Commit 37d2de62 authored by 宋新宇

Kafka changes

parent ba5f7a3b
@@ -24,7 +24,7 @@ public class UniversalProcess {
     public JdbcTemplate lwbyJdbcTemplate;
     @Resource
-    public KafkaTemplate<String, String> kafkaTemplate;
+    public KafkaTemplate<String, String> novelKafkaTemplate;
     @Resource
     public RedisTemplate<String, String> redisTemplate;
...
@@ -13,6 +13,7 @@ import org.springframework.kafka.listener.MessageListener;
 import org.springframework.stereotype.Component;
 import javax.annotation.Resource;
+import java.util.List;
 /**
  * @author songxinyu
@@ -20,7 +21,7 @@ import javax.annotation.Resource;
  */
 @Slf4j
 @Component
-public class DyStoryActiveConsumer implements MessageListener<String, String> {
+public class DyStoryActiveConsumer {
     @Resource
     DyStoryFlowExecutor dyStoryFlowExecutor;
@@ -29,20 +30,28 @@ public class DyStoryActiveConsumer implements MessageListener<String, String> {
     private static final Logger DYSTORY_ERROR_LOG = LoggerFactory.getLogger("dystory.error");
-    @KafkaListener(topics = {"${system.consumer.dystory.active.topic}"}, groupId = "${system.consumer.dystory.active.topic}")
-    @Override
-    public void onMessage(ConsumerRecord<String, String> data) {
+    @KafkaListener(topics = {"${system.consumer.dystory.active.topic}"}, containerFactory = "kafkaListenerContainerFactoryStoryActive")
+    public void onMessage(List<ConsumerRecord<String, String>> datas) {
         long begin = System.currentTimeMillis();
-        DYSTORY_SYS_LOG.info("media.active.story.onMessage start, data={}", data == null ? null : data.value());
-        if (data != null) {
+        if (DYSTORY_SYS_LOG.isInfoEnabled()) {
+            DYSTORY_SYS_LOG.info("media.active.story.onMessage start, size = {}", datas.size());
+        }
+        for (ConsumerRecord<String, String> data : datas) {
             try {
+                if (data == null) {
+                    log.warn("media.active.story.onMessage listen: consumed record is null");
+                    return;
+                }
+                DYSTORY_SYS_LOG.info("media.active.story.onMessage start, data={}", data == null ? null : data.value());
                 StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
                 DYSTORY_SYS_LOG.info("media.active.story.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
-                StoryNovelAction action = new StoryNovelAction(event.getClientInfo(),event.getUserId(),event.getOpenId(),event.getUserUploadEvent());
+                StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getOpenId(),
+                        event.getUserUploadEvent());
                 dyStoryFlowExecutor.getExecutorByStory().execute(action);
             } catch (Throwable e) {
...
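A note on the new batch loop: with a batch listener, `return` on a null record abandons every record still left in the poll, while `continue` would skip just the bad one, and rethrowing would fail the whole batch. A minimal sketch of the per-record pattern this commit moves toward (hedged: the class wrapper and handle() are illustrative, not from this commit):

package com.lwby.marketing.example;

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchConsumerSketch {

    @KafkaListener(topics = "${system.consumer.dystory.active.topic}",
            containerFactory = "kafkaListenerContainerFactoryStoryActive")
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            if (record == null || record.value() == null) {
                continue; // skip only this record; 'return' would drop the rest of the batch
            }
            try {
                handle(record.value());
            } catch (Exception e) {
                // contain per-record failures so one bad record cannot poison the poll
            }
        }
    }

    private void handle(String value) { /* illustrative stand-in for the flow executor */ }
}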
@@ -14,6 +14,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
+import java.util.List;
 /**
  * @author songxinyu
@@ -21,7 +22,7 @@ import javax.annotation.Resource;
  */
 @Slf4j
 @Component
-public class DyStoryPayConsumer extends DyStoryFlowExecutor implements MessageListener<String, String> {
+public class DyStoryPayConsumer extends DyStoryFlowExecutor {
     @Resource
     DyStoryFlowExecutor dyStoryFlowExecutor;
@@ -30,20 +31,28 @@ public class DyStoryPayConsumer extends DyStoryFlowExecutor implements MessageListener<String, String> {
     private static final Logger DYSTORY_ERROR_LOG = LoggerFactory.getLogger("dystory.error");
-    @KafkaListener(topics = {"${system.consumer.dystory.pay.topic}"}, groupId = "${system.consumer.dystory.pay.group_id}")
-    @Override
-    public void onMessage(ConsumerRecord<String, String> data) {
+    @KafkaListener(topics = {"${system.consumer.dystory.pay.topic}"}, containerFactory = "kafkaListenerContainerFactoryStorypay")
+    public void onMessage(List<ConsumerRecord<String, String>> datas) {
         long begin = System.currentTimeMillis();
-        DYSTORY_SYS_LOG.info("media.pay.story.onMessage start, data={}", data == null ? null : data.value());
-        if (data != null) {
+        if (DYSTORY_SYS_LOG.isInfoEnabled()) {
+            DYSTORY_SYS_LOG.info("media.pay.story.onMessage start, size = {}", datas.size());
+        }
+        for (ConsumerRecord<String, String> data : datas) {
             try {
+                if (data == null) {
+                    log.warn("media.pay.story.onMessage listen: consumed record is null");
+                    return;
+                }
+                DYSTORY_SYS_LOG.info("media.pay.story.onMessage start, data={}", data == null ? null : data.value());
                 StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
                 DYSTORY_SYS_LOG.info("media.pay.story.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
-                StoryNovelAction action = new StoryNovelAction(event.getClientInfo(),event.getUserId(),event.getOpenId(),event.getUserUploadEvent(),event.getProductId());
+                StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getOpenId(),
+                        event.getUserUploadEvent(), event.getProductId());
                 dyStoryFlowExecutor.getExecutorByStory().execute(action);
             } catch (Throwable e) {
...
@@ -48,7 +48,7 @@ public class DyStoryUniversalProcess extends UniversalProcess {
         ddi.setUserId(action.getUserId());
         String jsonString = JSONObject.toJSONString(ddi);
-        ListenableFuture<SendResult<String, String>> active_result = kafkaTemplate.send(topic, jsonString);
+        ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
         active_result.addCallback(
                 result -> DYSTORY_SYS_LOG.info("Attribution succeeded [{}], attribution type [{}]", jsonString, status.desc),
                 ex -> DYSTORY_ERROR_LOG.error("Attribution failed [{}], attribution type [{}]", jsonString, status.desc, ex)
...
@@ -39,7 +39,7 @@ public class NovelActiveConsumer implements MessageListener<String, String> {
         executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel"));
     }
-    @KafkaListener(topics = {"${system.consumer.novel.active.topic}"}, groupId = "${system.consumer.novel.active.group_id}")
+    @KafkaListener(topics = {"${system.consumer.novel.active.topic}"}, containerFactory = "kafkaListenerContainerFactoryNovelActive")
     @Override
     public void onMessage(ConsumerRecord<String, String> data) {
...
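One mismatch worth flagging: this listener keeps the single-record MessageListener signature, but the factory it now points at (kafkaListenerContainerFactoryNovelActive, defined in KafkaConsumerConfig below) calls setBatchListener(true). A batch-enabled factory in Spring Kafka delivers a List per poll, so the method would normally need the batch form used by the story consumers. A sketch of that form (an assumption about intent, not what this commit ships):

    @KafkaListener(topics = {"${system.consumer.novel.active.topic}"},
            containerFactory = "kafkaListenerContainerFactoryNovelActive")
    public void onBatch(List<ConsumerRecord<String, String>> records) {
        // delegate each element to the existing single-record handler
        records.forEach(this::onMessage);
    }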
@@ -47,7 +47,7 @@ public class NovelUniversalProcess extends UniversalProcess {
         ddi.setDevice_status(exists(PRIZE_FLOW_PREFIX.concat(String.valueOf(ddi.getUserId()))) ? 2 : 0);
         String jsonString = JSONObject.toJSONString(ddi);
-        ListenableFuture<SendResult<String, String>> active_result = kafkaTemplate.send("ocpc_result", jsonString);
+        ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send("ocpc_result", jsonString);
         active_result.addCallback(
                 result -> NOVEL_SYS_LOG.info("Attribution succeeded [{}], attribution type [{}]", jsonString, status.desc),
                 ex -> NOVEL_ERROR_LOG.error("Attribution failed [{}], attribution type [{}]", jsonString, status.desc, ex)
...
package com.lwby.marketing.conf;

import com.lwby.marketing.att.novel.NovelActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author songxinyu
 * @version KafkaConsumerConfig.java, v 0.1 2024-03-18 17:01 songxinyu Exp $
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka2.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka2.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka2.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${system.consumer.novel.active.group_id}")
    private String groupId;

    @Value("${spring.kafka2.consumer.max-poll-records}")
    private int maxPollRecordsConfig;

    @Bean(name = "kafkaListenerContainerFactoryNovelActive")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
        return propsMap;
    }

    /**
     * Kafka listener bean.
     */
    @Bean
    public NovelActiveConsumer listener() {
        return new NovelActiveConsumer();
    }
}
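A registration detail to double-check: if NovelActiveConsumer also carries @Component (as the story consumers do), component scanning already creates one instance, and the listener() @Bean above creates a second; both instances' @KafkaListener methods subscribe, giving two containers in the same consumer group. If that duplication is unintended, one registration can go. A slimmer variant, purely as a sketch (class name hypothetical):

package com.lwby.marketing.conf;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@EnableKafka
public class KafkaConsumerConfigScanOnly {
    // factory and consumer-config methods exactly as above; no explicit
    // NovelActiveConsumer @Bean, since a @Component-annotated consumer is
    // already discovered for @KafkaListener registration by scanning.
}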
package com.lwby.marketing.conf;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author songxinyu
 * @version KafkaProducerConfig.java, v 0.1 2024-03-18 17:01 songxinyu Exp $
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka2.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka2.producer.retries}")
    private int retries;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> novelKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package com.lwby.marketing.conf;

import com.lwby.marketing.att.bystory.DyStoryActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author songxinyu
 * @version StoryActiveKafkaConsumerConfig.java, v 0.1 2024-03-18 16:35 songxinyu Exp $
 */
@Configuration
@EnableKafka
public class StoryActiveKafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${system.consumer.dystory.active.group_id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecordsConfig;

    @Bean(name = "kafkaListenerContainerFactoryStoryActive")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
        return propsMap;
    }

    /**
     * Kafka listener bean.
     */
    @Bean(name = "storyActive")
    public DyStoryActiveConsumer listener() {
        return new DyStoryActiveConsumer();
    }
}
package com.lwby.marketing.conf;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author songxinyu
 * @version StoryKafkaProducerConfig.java, v 0.1 2024-03-18 17:01 songxinyu Exp $
 */
@Configuration
@EnableKafka
public class StoryKafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.retries}")
    private int retries;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean(name = "storyKafkaTemplate")
    public KafkaTemplate<String, String> storyKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
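With two KafkaTemplate<String, String> beans now in the context (novelKafkaTemplate from KafkaProducerConfig and storyKafkaTemplate above), injection can no longer go by type alone; @Resource falls back to matching the field name against the bean name, which is exactly what the field renames elsewhere in this commit rely on. A minimal sketch (the client class and topics are illustrative):

package com.lwby.marketing.example;

import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class DualTemplateClient {

    // @Resource resolves by field name first, so each field binds to the
    // template for the intended cluster (spring.kafka2 vs spring.kafka).
    @Resource
    private KafkaTemplate<String, String> novelKafkaTemplate;

    @Resource
    private KafkaTemplate<String, String> storyKafkaTemplate;

    public void sendExamples() {
        novelKafkaTemplate.send("ocpc_result", "{}");          // kafka2 cluster
        storyKafkaTemplate.send("testStoryNovelActive", "{}"); // kafka cluster
    }
}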
package com.lwby.marketing.conf;

import com.lwby.marketing.att.bystory.DyStoryPayConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author songxinyu
 * @version StoryPayKafkaConsumerConfig.java, v 0.1 2024-03-18 16:34 songxinyu Exp $
 */
@Configuration
@EnableKafka
public class StoryPayKafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${system.consumer.dystory.pay.group_id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecordsConfig;

    @Bean(name = "kafkaListenerContainerFactoryStorypay")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
        return propsMap;
    }

    /**
     * Kafka listener bean.
     */
    @Bean(name = "storyPay")
    public DyStoryPayConsumer listener() {
        return new DyStoryPayConsumer();
    }
}
@@ -8,6 +8,7 @@ import com.alicp.jetcache.anno.Cached;
 import com.lwby.marketing.att.bystory.DyStoryFlowExecutor;
 import com.lwby.marketing.att.bystory.DyStoryUniversalProcess;
 import com.lwby.marketing.po.CrossCallback;
+import com.lwby.marketing.util.DateTimUtils;
 import com.lwby.marketing.vo.CrossPlatformAccount;
 import com.lwby.marketing.vo.StoryNovelAction;
 import com.lwby.marketing.vo.StoreUserUploadEventBO;
@@ -21,6 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import javax.annotation.Resource;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -40,7 +42,10 @@ public class AttrController {
     DyStoryUniversalProcess up;
     @Resource
-    private KafkaTemplate<String, String> kafkaTemplate;
+    private KafkaTemplate<String, String> novelKafkaTemplate;
+
+    @Resource
+    private KafkaTemplate<String, String> storyKafkaTemplate;
     @Resource
     DyStoryFlowExecutor dyStoryFlowExecutor;
@@ -52,7 +57,7 @@ public class AttrController {
             + " wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/117.0.0.0 Mobile Safari/537.36 aweme.lite/28.9.0 "
             + "ToutiaoMicroApp/3.14.0 PluginVersion/28909073\",\"version\":\"2.0.0\",\"xClient\":\"version=2.0.0;platformId=412;"
             + "equipment=android\"},\"createTime\":1709533439693,\"extraData\":{},\"id\":\"93857260-8130-442f-bc92-b549dbf28ef0\","
-            + "\"userId\":875325,\"userUploadEvent\":2,\"productId\":\"1\",\"appId\":\"ttd3dda5604ce230b401\","
+            + "\"userId\":875325,\"userUploadEvent\":0,\"appId\":\"ttd3dda5604ce230b401\","
             + "\"openId\":\"_000HgDjWl-P5-WS9HTQIssNgTnMBRUqDHDu\"}";
     StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
@@ -127,7 +132,7 @@ public class AttrController {
     StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
-    kafkaTemplate.send("testStoryNovelActive", JSON.toJSONString(event)).addCallback(success -> {
+    storyKafkaTemplate.send("testStoryNovelActive", JSON.toJSONString(event)).addCallback(success -> {
         // topic the message was sent to
         String topic = success.getRecordMetadata().topic();
         // partition the message was sent to
@@ -152,7 +157,7 @@ public class AttrController {
     StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
-    kafkaTemplate.send("testStoryNovelpay", JSON.toJSONString(event)).addCallback(success -> {
+    storyKafkaTemplate.send("testStoryNovelpay", JSON.toJSONString(event)).addCallback(success -> {
         // topic the message was sent to
         String topic = success.getRecordMetadata().topic();
         // partition the message was sent to
@@ -166,16 +171,31 @@ public class AttrController {
     }
     @Resource
     JdbcTemplate lwbyJdbcTemplate;
+    @Resource
+    JdbcTemplate marketingJdbcTemplate;
+
+    public Date getEarliestRegistrationTimeByDeviceId(String deviceId, Integer platformId) {
+        Date date = lwbyJdbcTemplate.queryForObject(String.format("select max(registration_date) from user_profiles where platform_id != %d and device_id = '%s'", platformId, deviceId), Date.class);
+        return Objects.isNull(date) ? new Date() : date;
+    }
     @RequestMapping("testcross")
     public void testCross() {
-        CrossPlatformAccount cpa = getCrossPlatformMedia().get("gdt");
-        // Check whether cross-platform attribution is configured; if not, continue to the next plan-attribution handler
-        if (Objects.isNull(cpa) || (!cpa.getAccount().contains("0") && !cpa.getAccount().contains("32422617"))
-                || (cpa.getOldAccount().contains("0") || cpa.getOldAccount().contains("32422617"))) {
-            System.out.println(111);
-        }
+        JSONObject platformPrizeExpireJson = com.alibaba.fastjson2.JSON.parseObject("{\n"
+                + "  \"5\": 259200,\n"
+                + "  \"6\": 259200,\n"
+                + "  \"51\": 259200\n"
+                + "}");
+        Integer platformPrizeExpireStr = platformPrizeExpireJson.getInteger(String.valueOf(5));
+
+        long daysSinceRegistration = DateTimUtils.calculateDaysBetweenDates(getEarliestRegistrationTimeByDeviceId("56d4428af47940fb", 5), new Date());
+        //CrossPlatformAccount cpa = getCrossPlatformMedia().get("gdt");
+        //// Check whether cross-platform attribution is configured; if not, continue to the next plan-attribution handler
+        //if (Objects.isNull(cpa) || (!cpa.getAccount().contains("0") && !cpa.getAccount().contains("32422617"))
+        //        || (cpa.getOldAccount().contains("0") || cpa.getOldAccount().contains("32422617"))) {
+        //    System.out.println(111);
+        //}
     }
     @Cached(name="cross_platform_account", cacheType = CacheType.LOCAL)
...
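The reworked testCross() stops after computing two values: the per-platform prize-expiry map (in seconds, so 259200 = 3 days) and the day count since the device's registration on another platform. How they are meant to combine is not part of this commit; one plausible reading, purely as a hedged sketch of a continuation inside testCross():

        // Hypothetical continuation, not committed code: convert the day count
        // back to seconds and compare against the expiry window for platform 5.
        boolean withinPrizeWindow =
                daysSinceRegistration * 86_400L < platformPrizeExpireStr;
        if (withinPrizeWindow) {
            // device registered recently enough elsewhere to count as a
            // cross-platform attribution candidate
        }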
@@ -33,6 +33,22 @@ spring:
     listener:
       ack-mode: RECORD
+  kafka2:
+    bootstrap-servers: 172.17.243.58:9092,172.17.243.59:9092,172.17.243.60:9092,172.17.243.61:9092,172.17.243.62:9092
+    producer:
+      retries: 3
+      acks: 1
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+    consumer:
+      enable-auto-commit: true
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      max-poll-records: 100
+    listener:
+      ack-mode: RECORD
 bi:
   redis:
     host: r-2zethzp7pjl3rjbelp.redis.rds.aliyuncs.com
...
@@ -33,6 +33,22 @@ spring:
     listener:
       ack-mode: RECORD
+  kafka2:
+    bootstrap-servers: 172.17.255.113:9092
+    producer:
+      retries: 3
+      acks: 1
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+    consumer:
+      enable-auto-commit: true
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      max-poll-records: 100
+    listener:
+      ack-mode: RECORD
 bi:
   redis:
     host: 172.17.243.150
...
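Worth noting about the kafka2 block: spring.kafka2.* is not a prefix Spring Boot auto-configures, so only the keys the new config classes read via @Value take effect (bootstrap-servers, producer.retries, consumer.enable-auto-commit, consumer.auto-offset-reset, consumer.max-poll-records). The serializer/deserializer entries merely duplicate values hard-coded in those classes, and producer.acks plus listener.ack-mode under kafka2 are never read. Wiring acks would need something like the following (a sketch; the @Value field is an assumption, not part of this commit):

    // Hypothetical addition to KafkaProducerConfig:
    @Value("${spring.kafka2.producer.acks}")
    private String acks;

    // ...and inside producerConfigs():
    props.put(ProducerConfig.ACKS_CONFIG, acks); // otherwise the YAML 'acks: 1' is inert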