Commit 6a9bd727 authored by 宋新宇's avatar 宋新宇

小说消费修改

parent 7421bb36
...@@ -16,6 +16,7 @@ import org.springframework.stereotype.Component; ...@@ -16,6 +16,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List;
/** /**
* @author songxinyu * @author songxinyu
...@@ -23,7 +24,7 @@ import javax.annotation.Resource; ...@@ -23,7 +24,7 @@ import javax.annotation.Resource;
*/ */
@Slf4j @Slf4j
@Component @Component
public class NovelActiveConsumer implements MessageListener<String, String> { public class NovelActiveConsumer {
@Resource @Resource
ApplicationContext ctx; ApplicationContext ctx;
...@@ -40,25 +41,34 @@ public class NovelActiveConsumer implements MessageListener<String, String> { ...@@ -40,25 +41,34 @@ public class NovelActiveConsumer implements MessageListener<String, String> {
} }
@KafkaListener(topics = {"${system.consumer.novel.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryNovelActive") @KafkaListener(topics = {"${system.consumer.novel.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryNovelActive")
@Override public void onMessage(List<ConsumerRecord<String, String>> datas) {
public void onMessage(ConsumerRecord<String, String> data) {
long begin = System.currentTimeMillis();
//long begin = System.currentTimeMillis(); if (NOVEL_SYS_LOG.isInfoEnabled()) {
//NOVEL_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value()); NOVEL_SYS_LOG.info("media.active.onMessage start ,size = {}", datas.size());
}
//if (data != null) {
// try {
// BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class); for (ConsumerRecord<String, String> data : datas) {
// NOVEL_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event)); try {
// if (data == null) {
// NovelAction action = new NovelAction(event.getClientInfo(), data.value()); log.warn("media.active.story.onMessage listen 消费数据为null");
// return;
// executorNovel.execute(action); }
// } catch (Throwable e) {
// NOVEL_ERROR_LOG.info("novel.active.onMessage failed, data={}, costTime={} ms", data.value(), NOVEL_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
// System.currentTimeMillis() - begin, e);
// } BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
//} NOVEL_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
executorNovel.execute(action);
} catch (Throwable e) {
NOVEL_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
} }
} }
\ No newline at end of file
package com.lwby.marketing.att.novel;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version NovelActiveConsumer.java, v 0.1 2024年03月05日 10:40 songxinyu Exp $
*/
@Slf4j
@Component
public class NovelActiveStoreConsumer {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorNovel;
private static final Logger NOVEL_SYS_LOG = LoggerFactory.getLogger("novel.sys");
private static final Logger NOVEL_ERROR_LOG = LoggerFactory.getLogger("novel.error");
@PostConstruct
public void init(){
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store"));
}
@KafkaListener(topics = {"${system.consumer.novel.storeActive.topic}"} ,containerFactory = "kafkaListenerContainerFactoryNovelStoreActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (NOVEL_SYS_LOG.isInfoEnabled()) {
NOVEL_SYS_LOG.info("novel.store.active.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
log.warn("novel.store.active.story.onMessage listen 消费数据为null");
return;
}
NOVEL_SYS_LOG.info("novel.store.active.onMessage start, data={}", data == null ? null : data.value());
BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
NOVEL_SYS_LOG.info("novel.store.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
executorNovel.execute(action);
} catch (Throwable e) {
NOVEL_ERROR_LOG.info("novel.store.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
\ No newline at end of file
package com.lwby.marketing.conf;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveStoreConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class KafkaStoreConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.novel.storeActive.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryNovelStoreActive")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "novelstore")
public NovelActiveStoreConsumer listener() {
return new NovelActiveStoreConsumer();
}
}
...@@ -66,9 +66,11 @@ system: ...@@ -66,9 +66,11 @@ system:
group_id: story_dy_pay group_id: story_dy_pay
novel: novel:
active: active:
topic: novelActive topic: registerMediaEvent1
group_id: novel_dy_active_1 group_id: novel_media_active_1
storeActive:
topic: bookStoreEvent1
group_id: novel_store_active_1
management: management:
health: health:
db: db:
......
...@@ -65,8 +65,11 @@ system: ...@@ -65,8 +65,11 @@ system:
group_id: test_story_dy_pay group_id: test_story_dy_pay
novel: novel:
active: active:
topic: testNovelActive topic: testRegisterMediaEvent1
group_id: test_novel_dy_active group_id: test_novel_media_active_1
storeActive:
topic: testBookStoreEvent1
group_id: test_novel_store_active_1
management: management:
health: health:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment