Commit b1240e45 authored by 宋新宇's avatar 宋新宇

短剧APP归因

parent 7f90382e
......@@ -13,6 +13,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
......@@ -59,6 +60,17 @@ public class UniversalProcess {
redisTemplate.expire(key,expire,TimeUnit.SECONDS);
}
public <T> Map<String,T> hgetAll(Class<T> clazz,String key) {
Map<Object, Object> json = redisTemplate.opsForHash().entries(key);
Map<String, T> map = new HashMap<String, T>();
for (Map.Entry<Object, Object> item : json.entrySet()) {
T t = JSON.parseObject(item.getValue().toString(), clazz);
map.put(item.getKey().toString(), t);
}
return map;
}
public <T> T get(Class<T> clazz, String key) {
String value = redisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
......
......@@ -29,7 +29,7 @@ public class DyVideoActiveConsumer {
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@KafkaListener(topics = {"${system.consumer.dyvideo.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryStoryActive")
@KafkaListener(topics = {"${system.consumer.dyvideo.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
......
package com.lwby.marketing.att.dyvideo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version DyVideoActiveConsumer.java, v 0.1 2024年03月21日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class DyVideoBehavoirConsumer {
@Resource
DyVideoFlowExecutor dyVideoFlowExecutor;
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@KafkaListener(topics = {"${system.consumer.dyvideo.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (DYVIDEO_SYS_LOG.isInfoEnabled()) {
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
log.warn("media.behavoir.dyVideo.onMessage listen 消费数据为null");
return;
}
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.onMessage start, data={}", data == null ? null : data.value());
StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(),
event.getUserUploadEvent());
dyVideoFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.behavoir.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
......@@ -20,7 +20,7 @@ public class StoreDyVideoAttributionFlow extends NodeFlow<StoryNovelAction> {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
if (action.getType().equals(CallBackType.active.getType())) {
up.notifyResult(action,"ocpc_result", AttributionStatus.STORE_CALLBACK);
up.notifyResult(action,"ocpc_result_test", AttributionStatus.STORE_CALLBACK);
}
action.stop(true); //结束后面所有执行流程
}
......
......@@ -25,7 +25,7 @@ public class UploadDyVideoCallFlow extends NodeFlow<StoryNovelAction> {
boolean success = action.getMedia().notify(action);
if (success) {
action.getVideoUpload().setActiveTime(System.currentTimeMillis());
up.notifyResult(action, type.getTopic(),type.getStatus());
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1");
}
}
......
package com.lwby.marketing.att.videoapp;
import com.lwby.marketing.vo.ClientInfo;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;
public enum DeviceVideoType {
IMEI("imei",(c) -> isNotEmptyAndSNull(c.getImei())?c.getImei():null),
OAID("oaid", (c) -> isNotEmptyAndSNull(c.getOaid())?c.getOaid():null),
IDFA("idfa",(c) -> isNotEmptyAndSNull(c.getIdfa())?c.getIdfa():null),
IP_UA("ipua",(c) ->
(isNotEmptyAndSNull(c.getClientIp()) && isNotEmptyAndSNull(c.getUa())) ?
(("0".equals(c.getOs()) ? c.getClientIp().concat(StringUtils.substringBefore(c.getUa(), " Chrome/")) :
( c.getUa().startsWith("bi kan duan ju") ? Arrays.stream(c.getSystemVersion().split("\\.")).collect(Collectors.joining("_")):
c.getClientIp().concat(c.getUa())))):null),
IP_MODEL("ipmodel",(c) -> isNotEmptyAndSNull(c.getClientIp()) && isNotEmptyAndSNull(c.getPhoneModel())?c.getClientIp().concat(c.getPhoneModel()):null);
private String value;
private Function<ClientInfo,String> fun;
DeviceVideoType(String value, Function<ClientInfo,String> fun) {
this.value = value;
this.fun = fun;
}
public String getValue() {
return this.value;
}
public String getDeviceId(ClientInfo clientInfo){
return fun.apply(clientInfo);
}
private static boolean isNotEmptyAndSNull(String str) {
return StringUtils.isNotEmpty(str) && !"null".equals(str);
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version VideoAppActiveConsumer.java, v 0.1 2024年03月25日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class VideoAppActiveConsumer {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorNovel;
private static final Logger VIDEO_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEO_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@PostConstruct
public void init(){
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel"));
}
@KafkaListener(topics = {"${system.consumer.videoapp.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryVideoappActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (VIDEO_SYS_LOG.isInfoEnabled()) {
VIDEO_SYS_LOG.info("media.active.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
VIDEO_SYS_LOG.warn("media.active.story.onMessage listen 消费数据为null");
return;
}
VIDEO_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
VIDEO_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
executorNovel.execute(action);
} catch (Throwable e) {
VIDEO_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version VideoAppActiveConsumer.java, v 0.1 2024年03月25日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class VideoAppBehavoirConsumer {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorNovel;
private static final Logger VIDEO_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEO_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@PostConstruct
public void init(){
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel"));
}
@KafkaListener(topics = {"${system.consumer.videoapp.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryVideoappBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (VIDEO_SYS_LOG.isInfoEnabled()) {
VIDEO_SYS_LOG.info("media.active.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
VIDEO_SYS_LOG.warn("media.active.story.onMessage listen 消费数据为null");
return;
}
VIDEO_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
VIDEO_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
executorNovel.execute(action);
} catch (Throwable e) {
VIDEO_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class VideoAppFlowExecutor {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorByStory;
@PostConstruct
public void init(){
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("videoapp_setup").THEN("videoapp_store").THEN("videoapp_checkerfirst").SWITCH("videoapp_behavior").THEN("videoapp_uploadcall"));
}
public FlowExecutor<NovelAction> getExecutorByStory(){
return this.executorByStory;
}
}
package com.lwby.marketing.att.videoapp;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.vo.*;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Component
public class VideoAppUniversalProcess extends UniversalProcess {
private static final Logger VIDEOAPP_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEOAPP_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
/**
* 通知处理结果
*/
public void notifyResult(NovelAction action, String topic, AttributionStatus status) {
DeliveryDeviceInfo ddi = action.getDeliveryDeviceInfo();
if (Objects.isNull(ddi)) {
ddi = new DeliveryDeviceInfo();
String channelStr = String.valueOf(action.getChannelId());
ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr));
ddi.setAd_plan_id("0");
ddi.setAd_group_id("0");
ddi.setAd_creative_id("0");
ddi.setPlatform_id(String.valueOf(action.getPlatformId()));
ddi.setDj_channel(channelStr);
}
ddi.setIs_call(status.id);
ddi.setDevice_id(String.valueOf(action.getUserId()));
ddi.setActive_time(System.currentTimeMillis());
ddi.setChannel(String.valueOf(action.getChannelId()));
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> VIDEOAPP_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> VIDEOAPP_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
);
}
public String getTotalCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getCallbackCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId, int sprDedu, String dateStr) {
return String.format("%s_callback_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getFirstCheckerKey(NovelAction action) {
return Objects.equals(action.getType(), CallBackType.active.getType())
? String.format("fc_%d_%d_%s_%s", action.getDeviceId(), action.getPlatformId(),action.getMediaName(),action.getCurrentDateStr())
: String.format("fc_%d_%d_%s", action.getDeviceId(), action.getPlatformId(),action.getMediaName());
}
public String buildKey(Integer platformId,String deviceId) {
return String.format("market:activeness:30:%d:%s",platformId,deviceId);
}
/******************************************** JDBC *************************************************************/
@Cached(name="appchannel_video_app", cacheType = CacheType.LOCAL)
@CacheRefresh(refresh = 300)
public AppChannelVO getAppChannelByPlatformAndChannel(int platformId,Long channelId) {
try {
RowMapper<AppChannelVO> rowMapper = BeanPropertyRowMapper.newInstance(AppChannelVO.class);
return lwbyJdbcTemplate.queryForObject(String.format("select id,video_count,mv_count,cpm_count,ecpm_avg_count,motivation_count,arpu_count,ecpm_per_count"
+ " from app_channel where channel_id=%d and platform_id=%d",channelId,platformId),rowMapper);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
}
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component("videoapp_checkerfirst")
public class CheckerVideoAppFirstFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
@Override
public void process(NovelAction action) {
if(up.exists(up.getFirstCheckerKey(action))){
action.stop(true);
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp.handle;
import cn.hutool.crypto.SecureUtil;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.videoapp.DeviceVideoType;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.notify.Media;
import com.lwby.marketing.notify.media.Platform;
import com.lwby.marketing.util.DateTimUtils;
import com.lwby.marketing.util.NumberUtils;
import com.lwby.marketing.vo.ClientInfo;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.UserProfile;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Slf4j
@Component("videoapp_setup")
public class ParameterSetupVideoAppFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
private static final Logger VIDEOAPP_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@Override
public void process(NovelAction action) {
ClientInfo clientInfo = action.getClientInfo();
//获取用户注册时间
UserProfile userProfile = clientInfo.getUser();
if (userProfile == null || userProfile.getRegistrationDate() == null) {
VIDEOAPP_ERROR_LOG.error("ClientInfo.userProfile 对像为[{}],ClientInfo对像[{}]",userProfile, clientInfo);
action.stop(true);
return;
}
//平台ID
Integer platformId = clientInfo.getPlatformId();
//VO对像
DeliveryDeviceInfo deliveryDeviceInfo = null;
//设备ID
String deviceIdKey = null;
//匹配设备ID
for(DeviceVideoType type: DeviceVideoType.values()){
deviceIdKey = type.getDeviceId(clientInfo);
if(null != deviceIdKey && null != (deliveryDeviceInfo = up.get(DeliveryDeviceInfo.class,assembleKey(deviceIdKey,platformId)))){
clientInfo.setOaid(deliveryDeviceInfo.getOaid()); //回补OAID
action.setDeliveryDeviceInfo(deliveryDeviceInfo);
action.setDeviceVideoType(type);
break;
}
}
action.setPlatformId(platformId);
action.setUserId(clientInfo.getUser().getId());
action.setDeviceId(clientInfo.getDID());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(0)) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
//非商店吊起参数设置
if (Objects.nonNull(deliveryDeviceInfo)) {
action.setChannelId(NumberUtils.parseLong(deliveryDeviceInfo.getDj_channel()));
action.setPlanId(NumberUtils.parseLong((deliveryDeviceInfo.getAd_plan_id())));
action.setAdvertiserId(deliveryDeviceInfo.getAdvertiser_id());
action.setMediaName(deliveryDeviceInfo.getMedia());
action.setMedia(Media.getMedia(action.getMediaName(), Platform.VIDEO));
}
}
public String assembleKey(String deviceId, int platformId) {
return String.format("getClickByIdfaAndPlatformId_%s_%d", SecureUtil.md5(deviceId), platformId);
}
}
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Component("videoapp_store")
public class StoreVideoAppAttributionFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
@Override
public void process(NovelAction action) {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
//商店判断30天活跃,不在活跃天数内,发BI
String aliveDeviceKey = up.buildKey(action.getPlatformId(), action.getDeviceId());
if (!up.existsIsAlive(aliveDeviceKey)) {
up.notifyResult(action,"ocpc_result_test", AttributionStatus.STORE_CALLBACK);
}
action.stop(true); //结束后面所有执行流程
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author songxinyu
* @version ActiveCallFlow.java, v 0.1 2024年03月04日 18:28 songxinyu Exp $
*/
@Component("videoapp_uploadcall")
public class UploadVideoAppCallFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
@Override
public void process(NovelAction action) {
CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
boolean success = action.getMedia().notify(action);
if (success) {
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1");
}
}
}
......@@ -28,4 +28,10 @@ public class DataSourceConfig {
DataSource videoDataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name = "videoInlandDataSource")
@ConfigurationProperties("spring.datasource.video-inland")
DataSource videoInlandDataSource() {
return DataSourceBuilder.create().build();
}
}
package com.lwby.marketing.conf;
import com.lwby.marketing.att.dyvideo.DyVideoActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class DyVideoActiveKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.novel.active.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryDyVideoActive")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "dyvideo_active")
public DyVideoActiveConsumer listener() {
return new DyVideoActiveConsumer();
}
}
package com.lwby.marketing.conf;
import com.lwby.marketing.att.dyvideo.DyVideoBehavoirConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class DyVideoBehavoirKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.novel.active.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryDyVideoBehavoir")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "dyvideo_behavoir")
public DyVideoBehavoirConsumer listener() {
return new DyVideoBehavoirConsumer();
}
}
......@@ -24,4 +24,9 @@ public class JdbcTemplateConfig {
return new JdbcTemplate(dataSource);
}
@Bean(name = "videoInlandJdbcTemplate")
JdbcTemplate videoInlandJdbc(@Qualifier("videoInlandDataSource") DataSource dataSource){
return new JdbcTemplate(dataSource);
}
}
package com.lwby.marketing.conf;
import com.lwby.marketing.att.videoapp.VideoAppActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class VideoAppActiveKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.novel.active.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryVideoappActive")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "videoapp_active")
public VideoAppActiveConsumer listener() {
return new VideoAppActiveConsumer();
}
}
package com.lwby.marketing.conf;
import com.lwby.marketing.att.videoapp.VideoAppActiveConsumer;
import com.lwby.marketing.att.videoapp.VideoAppBehavoirConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class VideoAppBehavoirKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.novel.active.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryVideoappBehavoir")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "videoapp_behavoir")
public VideoAppBehavoirConsumer listener() {
return new VideoAppBehavoirConsumer();
}
}
......@@ -7,6 +7,7 @@ import com.lwby.marketing.notify.media.gdt.NovelGDTNotify;
import com.lwby.marketing.notify.media.jrtt.DyVideoJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.NovelJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.StoryJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.VideoAppJRTTNotify;
import java.util.Arrays;
import java.util.Map;
......@@ -17,7 +18,8 @@ public enum Media {
NOVEL_GDT("gdt", Platform.NOVEL, "广点通",new NovelGDTNotify()),
NOVEL_JRTT("jrtt", Platform.NOVEL, "今日头条",new NovelJRTTNotify()),
DY_STORY_JRTT("jrtt", Platform.DY_STORY, "今日头条",new StoryJRTTNotify()),
DY_VIDEO_JRTT("jrtt", Platform.DY_VIDEO, "今日头条",new DyVideoJRTTNotify());
DY_VIDEO_JRTT("jrtt", Platform.DY_VIDEO, "今日头条",new DyVideoJRTTNotify()),
VIDEOAPP_JRTT("jrtt_freevideo", Platform.VIDEO, "今日头条",new VideoAppJRTTNotify());
final String name;
final Platform platform;
......
package com.lwby.marketing.notify.media.jrtt;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.notify.MobileNotify;
import com.lwby.marketing.notify.media.jrtt.dto.JrttAttributeRequest;
import com.lwby.marketing.vo.ClientInfo;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.NovelAction;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VideoAppJRTTNotify extends MobileNotify {
public static final String ATTRIBUTE_URL = "https://analytics.oceanengine.com/api/v2/conversion";
@Override
public boolean android(NovelAction na) {
return na.getType() == 0 ? android(na,"active") :android(na,"game_addiction");
}
@Override
public boolean ios(NovelAction na) {
return na.getType() == 0 ? android(na,"active") :android(na,"game_addiction");
}
public boolean android(NovelAction na,String eventType){
ClientInfo ci = na.getClientInfo();
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Device device = new JrttAttributeRequest.Device();
device.setPlatform("android");
device.setImei(ci.getImei());
device.setOaid(ci.getOaid());
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
context.setDevice(device);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type(eventType).timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
//TODO;测试不执行
// String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
// Integer resultCode = (Integer) JSON.parseObject(result).get("code");
// return resultCode == 0;
log.info("Callback info:{}",userJson);
return true;
} catch (Exception e) {
return false;
}
}
public boolean ios(NovelAction na,String eventType) {
ClientInfo ci = na.getClientInfo();
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Device device = new JrttAttributeRequest.Device();
device.setPlatform("ios");
device.setIdfa(ci.getIdfa());
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
context.setDevice(device);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type(eventType).timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
//TODO;测试不执行
// String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
// Integer resultCode = (Integer) JSON.parseObject(result).get("code");
// return resultCode == 0;
log.info("Callback info:{}",userJson);
return true;
} catch (Exception e) {
return false;
}
}
}
package com.lwby.marketing.util;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author songxinyu
* @version CacheKeyUtils.java, v 0.1 2024年03月22日 17:56 songxinyu Exp $
......@@ -8,6 +11,12 @@ public class CacheKeyUtils {
private static final String CACHE_BEHAVIOR_PREFIX = "up_c_";
static SimpleDateFormat df = new SimpleDateFormat("MMdd");//设置日期格式
public static String getBehavoirKey(Long userId) {
return CACHE_BEHAVIOR_PREFIX + userId + "_" + df.format(new Date()) + "_fv";
}
public static String getVideoBehavoirKey(Long userId) {
return CACHE_BEHAVIOR_PREFIX + userId + "_fv";
}
......
package com.lwby.marketing.vo;
import com.lwby.marketing.att.novel.DeviceType;
import com.lwby.marketing.att.videoapp.DeviceVideoType;
import com.lwby.marketing.flow.Action;
import com.lwby.marketing.notify.Media;
import lombok.Data;
......@@ -24,7 +25,12 @@ public class NovelAction extends Action {
String body;
DeviceType deviceType;
String currentDateStr;
Media media;
Media media;
DeviceVideoType deviceVideoType;
String goodId;
Integer type;
public NovelAction(ClientInfo clientInfo,String msg){
this(clientInfo,null,msg);
}
......
......@@ -21,6 +21,13 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
video-inland:
jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com:3306/video-inland?zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=utf8&autoReconnect=true
username: lwby
password: VjxYfmY8J77ISChp
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
redis:
host: 172.17.243.150
port: 6379
......@@ -77,6 +84,20 @@ system:
storeActive:
topic: testBookStoreEvent1
group_id: test_novel_store_active_1
dyvideo:
active:
topic: testDouyinRegisterTopic
group_id: test_douyin_video_active_1
behavoir:
topic: testDouyinBehaviorTopic
group_id: test_douyin_video_behavior_1
videoapp:
active:
topic: testFreeVideoEvent
group_id: test_videoapp_active_1
behavoir:
topic: testFreeVideoKeyBehaviorEvent
group_id: test_videoapp_behavior_1
management:
health:
......
......@@ -68,6 +68,39 @@
<appender-ref ref="dyvideo_error" />
</logger>
<!--短剧app-->
<appender name="videoapp_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/videoapp_sys.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/videoapp_sys/videoapp_sys.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="videoapp.sys" level="INFO" additivity="false">
<appender-ref ref="videoapp_sys" />
</logger>
<appender name="videoapp_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/videoapp_error.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/videoapp_error/videoapp_error.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="videoapp.error" level="INFO" additivity="true">
<appender-ref ref="videoapp_error" />
</logger>
<!--小说-->
<appender name="novel_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/novel_sys.log</File>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment