Commit 7a6d6a74 authored by dingjy

FROM harbor.ibreader.com/base/openjdk:8u342-jdk
WORKDIR /app
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
RUN echo 'Asia/Shanghai' >/etc/timezone
ADD target/sensors.jar app.jar
ENV JAVA_OPTS="-server -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:MaxDirectMemorySize=128m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:MaxTenuringThreshold=5 -XX:+ExplicitGCInvokesConcurrent -XX:SurvivorRatio=7 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/ets/dump"
ENTRYPOINT [ "/bin/sh", "-c", "java $JAVA_OPTS -Dspring.profiles.active=$APP_ENV -Djava.security.egd=file:/dev/./urandom -jar app.jar" ]
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bayread</groupId>
<artifactId>sensors</artifactId>
<version>0.12</version>
<packaging>jar</packaging>
<name>sensors</name>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.32</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.22</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.19</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.6.1</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<finalName>sensors</finalName>
</build>
<distributionManagement>
<repository>
<id>lwby-releases</id>
<name>releases repository</name>
<url>http://maven.bayread.com/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>lwby-snapshots</id>
<name>snapshots repository</name>
<url>http://maven.bayread.com/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
</project>
package com.bayread.sensors;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.RestController;
import java.util.TimeZone;
@SpringBootApplication
@RestController
@EnableScheduling
public class WebApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication.run(WebApplication.class, args);
}
}
package com.bayread.sensors.batch;
import com.bayread.sensors.batch.impl.SingleWorkerEvent;
import com.bayread.sensors.batch.impl.WorkerEvent;
import com.bayread.sensors.batch.impl.MultipleWorkerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Batch asynchronous processing queue.
*/
public class BatchWorkerCollector<T> implements WorkerCollector<T>{
/**
* Service name
*/
private String name;
/**
* Data queue
*/
private BlockingQueue<T> bq;
/**
* Task queue for the worker thread pool
*/
private BlockingQueue<Runnable> batchQueue;
/**
* Handler implementation
*/
private WorkerEvent event;
/**
* Poll wait time in milliseconds
*/
private int pollWaitTime;
/**
* Number of queued items handled per batch
*/
private int batchSize;
/**
* Number of worker threads
*/
private int threadSize;
/**
* Size of the thread pool task queue
*/
private int workQueueSize;
/**
* Whether the consumer service is running
*/
private volatile boolean isStart = false;
private boolean isBatch = true;
private Logger logger = LoggerFactory.getLogger(this.getClass());
public BatchWorkerCollector(String name, int queueSize, int workQueueSize, int threadSize, int batchSize, int pollWaitTime, WorkerEvent event){
this.name = name;
this.bq = new LinkedBlockingQueue<T>(queueSize);
this.batchQueue = new LinkedBlockingQueue<Runnable>(workQueueSize<=0?Integer.MAX_VALUE:workQueueSize);
this.event = event;
this.workQueueSize = workQueueSize;
this.batchSize = batchSize;
this.pollWaitTime = pollWaitTime;
this.threadSize = threadSize;
this.isBatch = !(event instanceof SingleWorkerEvent);
logger.info("BATCH QUEUE CONFIG[name:{} queuesize:{} threadPoolQueueSize:{} threadSize:{} batchSize:{}]",name,queueSize,workQueueSize,threadSize,pollWaitTime);
}
/**
* Starts the consumer service.
*/
public void start(){
isStart = true;
new Thread(new HandleThread(new InnerThreadPool(name, this.threadSize, workQueueSize))).start();
logger.info("[{}] service started.",name);
}
/**
* Stops the consumer service.
*/
public void stop(){
isStart = false;
if ( this.event instanceof Closeable) {
try {
((Closeable)this.event).close();
} catch (IOException e) {}
}
}
/**
* Blocks and waits if the queue is at its configured capacity.
*/
public void put(T obj) {
try {
bq.put(obj);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Drops the element and returns false if the queue is at its configured capacity.
*/
public boolean offer(T obj) {
if(null == obj)return false;
return bq.offer(obj);
}
/**
* Whether the data queue is empty.
*/
public boolean isEmpty(){
return bq.isEmpty();
}
/**
* Current size of the data queue.
*/
public int size(){
return bq.size();
}
/**
* Sets the poll wait time in milliseconds.
*/
public void setPollWaitTime(int pollWaitTime) {
this.pollWaitTime = pollWaitTime;
}
/**
* Sets the number of queued items handled per batch.
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
* Sets the number of worker threads.
*/
public void setThreadSize(int threadSize) {
this.threadSize = threadSize;
}
/**
* Sets the size of the thread pool task queue.
*/
public void setWorkQueueSize(int workQueueSize) {
this.workQueueSize = workQueueSize;
}
/**
* Current size of the batch (thread pool) task queue.
*/
@Override
public int batchQueueSize() {
return batchQueue.size();
}
/**
* Queue consumer thread.
*/
final private class HandleThread implements Runnable{
private ThreadPoolExecutor executor;
private HandleThread(ThreadPoolExecutor executor){
this.executor = executor;
}
@Override
public void run() {
while(isStart){
try {
if(isBatch){
List<T> li = new ArrayList<T>(batchSize);
for(int i=0;i<batchSize;i++){
T t = bq.poll(pollWaitTime, TimeUnit.MILLISECONDS);
if(null == t){break;}
li.add(t);
}
if(li.isEmpty()){
continue;
}
executor.execute(new Runnable(){
public void run(){
try {
((MultipleWorkerEvent)event).handle(li);
} catch (Exception e) {
event.handleException(li,e);
logger.error("[{}] batch task execution failed.",name,e);
}
}
});
}else{
T l = bq.poll(pollWaitTime, TimeUnit.MILLISECONDS);
try {
if(l != null) {
((SingleWorkerEvent) event).handle(l);
}
} catch (Exception e) {
event.handleException(l,e);
logger.error("[{}]批量队列任务执行异常.",name,e);
}
}
} catch (Exception e) {
logger.error("[{}]线程任务分配异常.",name,e);
}
}
logger.info("[{}]服务已关闭.",name);
executor.shutdown();
}
}
/**
* Worker thread pool.
*/
final private class InnerThreadPool extends ThreadPoolExecutor{
public InnerThreadPool(String name,int threadCount,int taskQueueSize) {
super(threadCount, (int) (threadCount * 1.618), 60, TimeUnit.SECONDS,batchQueue,new InnerThreadFactory(name),new CallerRunsPolicy());
}
}
/**
* Thread factory for the worker pool.
*/
final private class InnerThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
InnerThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
public int getBatchSize() {
return batchSize;
}
public int getPollWaitTime() {
return pollWaitTime;
}
public int getThreadSize() {
return threadSize;
}
public int getWorkQueueSize() {
return workQueueSize;
}
}
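For reference, a minimal usage sketch of the collector above (illustrative only, not part of this commit; the class name BatchWorkerCollectorDemo and all literal values are assumptions):

import com.bayread.sensors.batch.BatchWorkerCollector;
import com.bayread.sensors.batch.impl.MultipleWorkerEvent;
import java.util.List;

public class BatchWorkerCollectorDemo {
    public static void main(String[] args) throws InterruptedException {
        // Constructor arguments: name, queueSize, workQueueSize, threadSize, batchSize, pollWaitTime, event
        BatchWorkerCollector<String> collector = new BatchWorkerCollector<>(
                "demo", 10000, 1000, 4, 200, 1000,
                new MultipleWorkerEvent<String>() {
                    @Override
                    public void handle(List<String> batch) {
                        // invoked with up to batchSize items drained from the queue
                        System.out.println("handled " + batch.size() + " items");
                    }
                    @Override
                    public void handleException(Object obj, Exception ex) {
                        // fallback invoked when handling a batch throws
                        ex.printStackTrace();
                    }
                });
        collector.start();          // spawns the HandleThread that drains the queue
        collector.offer("line-1");  // non-blocking; returns false when the queue is full
        collector.put("line-2");    // blocking variant
        Thread.sleep(2000);         // give the worker a moment to drain the queue
        collector.stop();           // stops the consumer loop
    }
}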
package com.bayread.sensors.batch;
import java.util.List;
public interface ExceptionEvent<T>
{
void handleEventException(Throwable ex, List<T> li);
}
package com.bayread.sensors.batch;
public interface WorkerCollector<T> {
public void put(T obj);
public boolean offer(T obj);
public boolean isEmpty();
public int size();
public int batchQueueSize();
public void start();
public void stop();
}
package com.bayread.sensors.batch.impl;
import java.util.List;
/**
* @author martin.ad
* @date 2022/9/19 14:11
*/
public interface MultipleWorkerEvent<T> extends WorkerEvent {
public void handle(List<T> list);
}
package com.bayread.sensors.batch.impl;
/**
* @author martin.ad
* @date 2022/9/19 13:54
*/
public interface SingleWorkerEvent<T> extends WorkerEvent {
public void handle(T line);
}
package com.bayread.sensors.batch.impl;
public interface WorkerEvent {
void handleException(Object obj,Exception ex);
}
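For contrast with the batch path, a sketch of a single-item worker (illustrative only, not part of this commit; the class name is an assumption): BatchWorkerCollector tests event instanceof SingleWorkerEvent and, for such events, delivers queued items one at a time via handle(T) instead of draining a batch.

import com.bayread.sensors.batch.impl.SingleWorkerEvent;

public class PrintingSingleWorker implements SingleWorkerEvent<String> {
    @Override
    public void handle(String line) {
        // invoked once per queued item
        System.out.println(line);
    }

    @Override
    public void handleException(Object obj, Exception ex) {
        // fallback invoked when handling a single item throws
        ex.printStackTrace();
    }
}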
package com.bayread.sensors.conf;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* @author martin.ad
* @date 2022/11/3 14:58
*/
@Configuration
@Data
public class ServiceConfig {
@Value("${task.queueSize}")
public int queueSize;
@Value("${task.workQueueSize}")
public int workQueueSize;
@Value("${task.threadSize}")
public int threadSize;
@Value("${task.batchSize}")
public int batchSize;
@Value("${task.pollWaitTime}")
public int pollWaitTime;
}
package com.bayread.sensors.service;
import com.bayread.sensors.batch.BatchWorkerCollector;
import com.bayread.sensors.conf.ServiceConfig;
import com.bayread.sensors.task.LogsHandleWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
@Slf4j
@Controller
@RequestMapping(value = "/")
public class APIService {
private final static String OK_RESULT = "";
private static BatchWorkerCollector<String> workerCollector;
private static final Logger TRACKING_EVENT = LoggerFactory.getLogger("tracking.event");
@Resource
private ServiceConfig conf;
@Resource
private KafkaTemplate<String, String> kafka;
@PostConstruct
private void init() {
workerCollector = new BatchWorkerCollector<>(
"LogsHandleWorker", conf.queueSize, conf.workQueueSize, conf.threadSize, conf.batchSize, conf.pollWaitTime, new LogsHandleWorker(kafka));
workerCollector.start();
}
@ResponseBody
@RequestMapping(value = "sa")
public String reportLog(HttpServletRequest request) {
String dataList = request.getParameter("data_list");
if(StringUtils.isNotEmpty(dataList)){
TRACKING_EVENT.info(dataList);
workerCollector.put(dataList);
}
return OK_RESULT;
}
@ResponseBody
@RequestMapping(value = "ping")
public String test() {
return OK_RESULT;
}
}
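A hedged sketch of calling the /sa endpoint above (not part of this commit): the controller only reads the data_list request parameter, so a plain GET with a query string is enough for illustration; the host, port, and payload value are assumptions, and building the gzip+Base64 payload itself is sketched after LogsHandleWorker further down.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class ReportClientDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload: Base64 of a gzipped JSON array (see the encoding sketch below)
        String payload = "...";
        String query = "data_list=" + URLEncoder.encode(payload, "UTF-8");
        URL url = new URL("http://localhost:8080/sa?" + query);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        // reportLog returns an empty body, so the status code is the only success signal
        System.out.println("HTTP " + conn.getResponseCode());
        try (InputStream in = conn.getInputStream()) {
            while (in.read() != -1) { /* drain the (empty) response */ }
        }
        conn.disconnect();
    }
}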
package com.bayread.sensors.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
/**
* @author martin.ad
* @date 2022/11/4 17:13
*/
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
@ExceptionHandler(Exception.class)
public String exceptionHandler(Exception e) {
if(e instanceof HttpMessageNotReadableException){
log.error("timeout",e);
}else{
log.error("exceptionHandler",e);
}
return "error";
}
}
package com.bayread.sensors.task;
import cn.hutool.core.util.ZipUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.bayread.sensors.batch.impl.MultipleWorkerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.Base64;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class LogsHandleWorker implements MultipleWorkerEvent<String> {
private static final Logger ADAPTER_ERROR_LOG = LoggerFactory.getLogger("adapter.error");
private static final Logger SYS_ERROR_LOG = LoggerFactory.getLogger("sys.error");
private static final String TOPIC = "ets_app_topic";
private KafkaTemplate<String, String> kafka;
public LogsHandleWorker(KafkaTemplate<String, String> kafka){
this.kafka = kafka;
}
@Override
public void handle(List<String> list) {
if(list.size() == 0){return;}
list.forEach(log ->{
try {
handle(log);
}catch (Exception ex){
ADAPTER_ERROR_LOG.info("ERROR LOG:{}",log,ex);
}
});
}
void handle(String logs) {
// Decode with the JDK Base64 MIME decoder (ignores whitespace) instead of the internal sun.misc API.
byte[] dbyte = Base64.getMimeDecoder().decode(logs);
JSONArray ja = JSON.parseArray(new String(ZipUtil.unGzip(dbyte), StandardCharsets.UTF_8));
for(int i=0;i<ja.size();i++) {
send(ja.getJSONObject(i).toString());
}
}
public void send(String log){
kafka.send(TOPIC,log);
}
@Override
public void handleException(Object obj, Exception ex) {
SYS_ERROR_LOG.error("LogsHandleWorker Exception:",ex);
}
}
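For context, a sketch of the producer side of this contract (illustrative only, not part of this commit): handle(String) Base64-decodes the payload, un-gzips it, and parses a JSON array, so a client builds the payload as the mirror image. The field names in the example event are assumptions.

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

public class PayloadEncodeDemo {
    public static void main(String[] args) throws Exception {
        // One event per array element; the field names here are made up for the example.
        JSONArray events = new JSONArray();
        JSONObject event = new JSONObject();
        event.put("event", "app_start");
        event.put("time", System.currentTimeMillis());
        events.add(event);

        // gzip the JSON text, then Base64-encode the compressed bytes
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(events.toString().getBytes(StandardCharsets.UTF_8));
        }
        String dataList = Base64.getEncoder().encodeToString(bos.toByteArray());

        // dataList is what the client sends as the "data_list" request parameter
        System.out.println(dataList);
    }
}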
task:
  queueSize: 10000
  workQueueSize: 1000
  threadSize: 10
  batchSize: 200
  pollWaitTime: 1000
logging:
  level:
    root: INFO
  file:
    path: /data/sensors/logs
task:
  queueSize: 10000
  workQueueSize: 1000
  threadSize: 10
  batchSize: 200
  pollWaitTime: 1000
logging:
  level:
    root: INFO
  file:
    path: /data/sensors/logs
server:
  port: 8080
  tomcat:
    accept-count: 10000
    threads:
      max: 1000
      min-spare: 100
    max-connections: 20000
    connection-timeout: 10000
    max-http-form-post-size: 10MB
spring:
  application:
    name: sensors
  profiles:
    active: test
    group:
      test: delivery-test
      prod: delivery-prod
  kafka:
    bootstrap-servers: 172.17.243.58:9092,172.17.243.59:9092,172.17.243.60:9092,172.17.243.61:9092,172.17.243.62:9092
    producer:
      # Number of times a message is resent; when transactions are configured, an explicitly set retries value must be greater than 0
      retries: 0
      # Maximum size in bytes of a single producer batch
      batch-size: 327680
      # Producer memory buffer size in bytes
      buffer-memory: 335544320
      # Linger time in milliseconds, passed through as a raw producer property (Spring Boot has no dedicated linger setting)
      properties:
        linger.ms: 100
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # When transactions are configured, an explicitly set acks value must be -1 (all)
      acks: 1
# Prometheus / Actuator monitoring configuration
management:
  endpoints:
    web:
      exposure:
        include: "*"
        exclude: configprops
  endpoint:
    health:
      show-details: ALWAYS
  metrics:
    tags:
      application: ${spring.application.name}
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_DIR" value="${LOG_PATH:-.}" />
<appender name="tracking_event" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/tracking_event.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/tracking_event/%d{yyyy-MM-dd,aux}/tracking_event.%d{yyyy-MM-dd.HH}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="tracking.event" level="INFO" additivity="false">
<appender-ref ref="tracking_event" />
</logger>
<appender name="adapter_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/adapter_error.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/adapter_error/%d{yyyy-MM-dd,aux}/adapter_error.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="adapter.error" level="INFO" additivity="false">
<appender-ref ref="adapter_error" />
</logger>
<appender name="sys_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/sys_error.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/sys_error/%d{yyyy-MM-dd,aux}/sys_error.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="sys.error" level="INFO" additivity="true">
<appender-ref ref="sys_error" />
</logger>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
</encoder>
</appender>
<logger name="com.lwby.statistics.log" level="warn" />
<springProfile name="prod">
<root level="${logging.level.root}">
<appender-ref ref="console" />
</root>
</springProfile>
<springProfile name="test">
<root level="${logging.level.root}">
<appender-ref ref="console" />
</root>
</springProfile>
</configuration>