Commit 306ad3ba authored by 宋新宇's avatar 宋新宇

Merge branch '3-抖音短剧及短剧app归因接入' into 'release_20240327_01'

Resolve "抖音短剧及短剧app归因接入"

See merge request !8
parents 17e05343 95e76b7b
......@@ -102,6 +102,11 @@
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
......
......@@ -6,7 +6,8 @@ public enum AttributionStatus {
NORMAL_DEDUCTION_CALLBACK(4,"正常扣量"),
OLDUSER_DEDUCTION_CALLBACK(7,"老用户扣量"),
IP_CALLBACK(9,"IP归因"),
PAY(10,"付费回传");
PAY(10,"付费回传"),
BEHAVIOR(8,"关键行为");
public int id;
......
......@@ -13,6 +13,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
......@@ -23,6 +24,12 @@ public class UniversalProcess {
@Resource
public JdbcTemplate lwbyJdbcTemplate;
@Resource
public JdbcTemplate videoJdbcTemplate;
@Resource
public JdbcTemplate videoInlandJdbcTemplate;
@Resource(name = "novelKafka")
public KafkaTemplate<String, String> novelKafkaTemplate;
......@@ -47,6 +54,13 @@ public class UniversalProcess {
redisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS);
}
public void hset(String key, String field, int expire, String value) {
redisTemplate.opsForHash().put(key,field,value);
redisTemplate.expire(key,expire,TimeUnit.SECONDS);
}
public <T> T get(Class<T> clazz, String key) {
String value = redisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
......
......@@ -8,7 +8,7 @@ import com.lwby.marketing.att.AttributionStatus;
*/
public enum CallBackType {
active(0,"ocpc_result", AttributionStatus.ACTIVE_CALLBACK),pay(2,"ocpc_pay",AttributionStatus.PAY);
active(0,"ocpc_result", AttributionStatus.ACTIVE_CALLBACK),pay(2,"ocpc_pay",AttributionStatus.PAY),behavior(3,"ocpc_behavior",AttributionStatus.BEHAVIOR);
private Integer type;
......
......@@ -46,7 +46,7 @@ public class ParameterSetupStoryNovelFlow extends NodeFlow<StoryNovelAction> {
action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(0)) {
if (CallBackType.active.getType().equals(action.getType())) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
......
......@@ -85,7 +85,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
DYSTORY_SYS_LOG.info(
......@@ -97,6 +97,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
DYSTORY_SYS_LOG.info(
"PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0);
action.stop(true);
}
}
......
package com.lwby.marketing.att.dyvideo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version DyVideoActiveConsumer.java, v 0.1 2024年03月21日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class DyVideoActiveConsumer {
@Resource
DyVideoFlowExecutor dyVideoFlowExecutor;
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@KafkaListener(topics = {"${system.consumer.dyvideo.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (DYVIDEO_SYS_LOG.isInfoEnabled()) {
DYVIDEO_SYS_LOG.info("media.active.dyVideo.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
log.warn("media.active.dyVideo.onMessage listen 消费数据为null");
return;
}
DYVIDEO_SYS_LOG.info("media.active.dyVideo.onMessage start, data={}", data == null ? null : data.value());
StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
DYVIDEO_SYS_LOG.info("media.active.dyVideo.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(),
event.getUserUploadEvent());
dyVideoFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
package com.lwby.marketing.att.dyvideo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version DyVideoActiveConsumer.java, v 0.1 2024年03月21日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class DyVideoBehavoirConsumer {
@Resource
DyVideoFlowExecutor dyVideoFlowExecutor;
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@Resource
ApplicationContext ctx;
FlowExecutor<StoryNovelAction> executorByDyVideoBehavoir;
@PostConstruct
public void init(){
executorByDyVideoBehavoir = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").SWITCH("dyvideo_behavior"));
}
@KafkaListener(topics = {"${system.consumer.dyvideo.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (DYVIDEO_SYS_LOG.isInfoEnabled()) {
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
log.warn("media.behavoir.dyVideo.onMessage listen 消费数据为null");
return;
}
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.onMessage start, data={}", data == null ? null : data.value());
StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(),
event.getUserUploadEvent());
executorByDyVideoBehavoir.execute(action);
} catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.behavoir.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
package com.lwby.marketing.att.dyvideo;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class DyVideoFlowExecutor {
@Resource
ApplicationContext ctx;
FlowExecutor<StoryNovelAction> executorByStory;
@PostConstruct
public void init(){
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").THEN("dyvideo_uploadcall"));
}
public FlowExecutor<StoryNovelAction> getExecutorByStory(){
return this.executorByStory;
}
}
package com.lwby.marketing.att.dyvideo;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.po.ThirdAccountDy;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DyVideoUniversalProcess extends UniversalProcess {
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@Resource
private RedisTemplate<String,String> oldMarketRedisTemplate;
/**
* 通知处理结果
*/
public void notifyResult(StoryNovelAction action,String topic, AttributionStatus status) {
DeliveryDeviceInfo ddi = action.getDeliveryDeviceInfo();
if (Objects.isNull(ddi)) {
ddi = new DeliveryDeviceInfo();
String channelStr = String.valueOf(action.getChannelId());
ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr));
ddi.setAd_plan_id("0");
ddi.setAd_group_id("0");
ddi.setAd_creative_id("0");
ddi.setPlatform_id(String.valueOf(action.getPlatformId()));
ddi.setDj_channel(channelStr);
}
ddi.setIs_call(status.id);
ddi.setUserId(action.getUserId());
ddi.setDevice_id(String.valueOf(action.getUserId()));
ddi.setActive_time(System.currentTimeMillis());
ddi.setChannel(String.valueOf(action.getChannelId()));
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> DYVIDEO_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> DYVIDEO_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
);
}
@Override
public <T> T get(Class<T> clazz, String key) {
String value = oldMarketRedisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
return JSON.parseObject(value,clazz);
}
return null;
}
public void setToken(String key, int expires, String value) {
oldMarketRedisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS);
}
public void delToken(String key) {
oldMarketRedisTemplate.delete(key);
}
public String getTotalCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getCallbackCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId, int sprDedu, String dateStr) {
return String.format("%s_callback_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getFirstCheckerKey(StoryNovelAction action) {
return Objects.equals(action.getType(), CallBackType.active.getType())
? String.format("fc_%d_%d_%s_%s", action.getUserId(), action.getPlatformId(),action.getMediaName(),action.getCurrentDateStr())
: String.format("fc_%d_%d_%s", action.getUserId(), action.getPlatformId(),action.getMediaName());
}
public int getExpire(StoryNovelAction action) {
return Objects.equals(action.getType(), CallBackType.active.getType())
? 60 * 60 * 24
: 60 * 60 * 24 * 7;
}
public ThirdAccountDy getDyAccessToken(String client_id, String client_secret,String dyTokenUrl) {
Map<String,String> upMap = new HashMap<>();
upMap.put("client_key",client_id);
upMap.put("client_secret",client_secret);
upMap.put("grant_type","client_credential");
String mapAction = JSONObject.toJSONString(upMap);
ThirdAccountDy thirdAccountDy = null;
try {
String result = HttpUtil.post(dyTokenUrl, mapAction);
Map data = (Map) JSON.parseObject(result).get("data");
Integer resultCode = (Integer) data.get("error_code");
String accessToken = "";
if (resultCode == 0) {
//成功
accessToken = (String) data.get("access_token");
Integer expiresIn = (Integer) data.get("expires_in");
thirdAccountDy = new ThirdAccountDy();
thirdAccountDy.setAccessToken(accessToken);
thirdAccountDy.setExpireIn(expiresIn);
thirdAccountDy.setClientId(client_id);
String tokenDy = "token_dy_" + client_id;
//往老的market缓存写,防止token一直失效
setToken(tokenDy, Integer.parseInt(String.valueOf(expiresIn)),JSON.toJSONString(thirdAccountDy));
} else {
log.warn("dy_access_token_error,code={},resultdy={}", resultCode, JSONObject.toJSONString(result));
}
} catch (Throwable e) {
e.printStackTrace();
}
return thirdAccountDy;
}
/******************************************** JDBC *************************************************************/
@Cached(name="appchannel_dy_video", cacheType = CacheType.LOCAL)
@CacheRefresh(refresh = 300)
public AppChannelVO getAppChannelByPlatformAndChannel(int platformId,Long channelId) {
try {
RowMapper<AppChannelVO> rowMapper = BeanPropertyRowMapper.newInstance(AppChannelVO.class);
return videoJdbcTemplate.queryForObject(String.format("select id,ecpm_avg_count,motivation_count,ecpm_per_count,spr_dedu "
+ "from app_channel where channel_id=%d and platform_id=%d",channelId,platformId),rowMapper);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
}
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component("dyvideo_checkerfirst")
public class CheckerDyVideoFirstFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
if(up.exists(up.getFirstCheckerKey(action))){
action.stop(true);
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.dyvideo.handle;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.notify.Media;
import com.lwby.marketing.notify.media.Platform;
import com.lwby.marketing.po.VideoUpload;
import com.lwby.marketing.util.DateTimUtils;
import com.lwby.marketing.util.NumberUtils;
import com.lwby.marketing.vo.ClientInfo;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
@Slf4j
@Component("dyvideo_setup")
public class ParameterSetupDyVideoFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
ClientInfo clientInfo = action.getClientInfo();
String openId = action.getOpenId();
//平台ID
Integer platformId = clientInfo.getPlatformId();
//VO对像
DeliveryDeviceInfo deliveryDeviceInfo = null;
String s = "{\"activeTime\":1711439536879,\"adid\":\"7347520002541764646\",\"channel\":\"214122006\",\"clickId\":\"B.1SLXpxqLP8EXNpAqlfUjsQvppp7bkeHT9LpYmfnxMwxH61cZtBGopyXg3glnl8880WYw3XUYfeh21oLLnT9ovSPFtvFp5faNJQbetTFwBfZiesPw0IpoWF2GtLeJW66Bf8vMnUilTWN7sUlexUfVKQiAefYso0MEqhkcUJhpqzz21B\",\"clickTime\":1711394346686,\"code\":\"FN1za8huKsDQ3TyZYhtyzFgjFTDt-ddTEAzYFwrEpYGMyF6NBImpXtzv9ZTZVnoaI6nSyHU4hQ0kjnwNFsXwLV02x0cpIiod-3L9G35DyTOh_K5LQd6Nvs2FQ20\",\"creativeId\":\"1793848259919929\",\"creativeType\":\"15\",\"media\":\"jrtt\",\"videoResourceId\":\"100013\"}";
//StoryLogin storyLogin = JSONObject.parseObject(s,StoryLogin.class);
//匹配OpenId
VideoUpload videoUpload = JSONObject.parseObject(s,VideoUpload.class);
// VideoUpload videoUpload = up.get(VideoUpload.class,assembleKey(openId));
action.setVideoUpload(videoUpload);
//StoryNovelAction对像参数填充
action.setPlatformId(platformId);
action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(action.getType())) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
//非商店吊起参数设置
if (Objects.nonNull(videoUpload)) {
deliveryDeviceInfo = new DeliveryDeviceInfo();
deliveryDeviceInfo.setAd_creative_id(videoUpload.getCreativeId());
deliveryDeviceInfo.setPlatform_id(String.valueOf(clientInfo.getPlatformId()));
deliveryDeviceInfo.setCallback_param(videoUpload.getClickId());
deliveryDeviceInfo.setUuid(UUID.randomUUID().toString());
deliveryDeviceInfo.setDj_channel(videoUpload.getChannel());
deliveryDeviceInfo.setPromotion_id(videoUpload.getAdid());
deliveryDeviceInfo.setVideoResourceId(videoUpload.getVideoResourceId());
action.setDeliveryDeviceInfo(deliveryDeviceInfo);
action.setChannelId(NumberUtils.parseLong(videoUpload.getChannel()));
action.setPlanId(NumberUtils.parseLong((videoUpload.getAdid())));
action.setMediaName(videoUpload.getMedia());
action.setMedia(Media.getMedia(action.getMediaName(), Platform.DY_VIDEO));
}
}
public String assembleKey(String openId) {
return String.format("video:upload:%s", openId);
}
}
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Component("dyvideo_store")
public class StoreDyVideoAttributionFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
if (action.getType().equals(CallBackType.active.getType())) {
up.notifyResult(action,"ocpc_result_test", AttributionStatus.STORE_CALLBACK);
}
action.stop(true); //结束后面所有执行流程
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author songxinyu
* @version ActiveCallFlow.java, v 0.1 2024年03月04日 18:28 songxinyu Exp $
*/
@Component("dyvideo_uploadcall")
public class UploadDyVideoCallFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
boolean success = action.getMedia().notify(action);
if (success) {
action.getVideoUpload().setActiveTime(System.currentTimeMillis());
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),up.getExpire(action),"1");
}
}
}
package com.lwby.marketing.att.videoapp;
import com.lwby.marketing.vo.ClientInfo;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;
public enum DeviceVideoType {
IMEI("imei",(c) -> isNotEmptyAndSNull(c.getImei())?c.getImei():null),
OAID("oaid", (c) -> isNotEmptyAndSNull(c.getOaid())?c.getOaid():null),
IDFA("idfa",(c) -> isNotEmptyAndSNull(c.getIdfa())?c.getIdfa():null),
IP_UA("ipua",(c) ->
(isNotEmptyAndSNull(c.getClientIp()) && isNotEmptyAndSNull(c.getUa())) ?
(("0".equals(c.getOs()) ? c.getClientIp().concat(StringUtils.substringBefore(c.getUa(), " Chrome/")) :
( c.getUa().startsWith("bi kan duan ju") ? Arrays.stream(c.getSystemVersion().split("\\.")).collect(Collectors.joining("_")):
c.getClientIp().concat(c.getUa())))):null),
IP_MODEL("ipmodel",(c) -> isNotEmptyAndSNull(c.getClientIp()) && isNotEmptyAndSNull(c.getPhoneModel())?c.getClientIp().concat(c.getPhoneModel()):null);
private String value;
private Function<ClientInfo,String> fun;
DeviceVideoType(String value, Function<ClientInfo,String> fun) {
this.value = value;
this.fun = fun;
}
public String getValue() {
return this.value;
}
public String getDeviceId(ClientInfo clientInfo){
return fun.apply(clientInfo);
}
private static boolean isNotEmptyAndSNull(String str) {
return StringUtils.isNotEmpty(str) && !"null".equals(str);
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version VideoAppActiveConsumer.java, v 0.1 2024年03月25日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class VideoAppActiveConsumer {
@Resource
VideoAppFlowExecutor videoAppFlowExecutor;
private static final Logger VIDEO_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEO_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@KafkaListener(topics = {"${system.consumer.videoapp.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryVideoappActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (VIDEO_SYS_LOG.isInfoEnabled()) {
VIDEO_SYS_LOG.info("media.active.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
VIDEO_SYS_LOG.warn("media.active.story.onMessage listen 消费数据为null");
return;
}
VIDEO_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
StoreUserUploadEventBO event = JSON.parseObject(data.value(), StoreUserUploadEventBO.class);
VIDEO_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value(),event.getUserUploadEvent());
videoAppFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
VIDEO_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version VideoAppActiveConsumer.java, v 0.1 2024年03月25日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class VideoAppBehavoirConsumer {
@Resource
VideoAppFlowExecutor videoAppFlowExecutor;
private static final Logger VIDEO_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEO_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@KafkaListener(topics = {"${system.consumer.videoapp.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryVideoappBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (VIDEO_SYS_LOG.isInfoEnabled()) {
VIDEO_SYS_LOG.info("media.active.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
VIDEO_SYS_LOG.warn("media.active.story.onMessage listen 消费数据为null");
return;
}
VIDEO_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
StoreUserUploadEventBO event = JSON.parseObject(data.value(), StoreUserUploadEventBO.class);
VIDEO_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value(),event.getUserUploadEvent());
videoAppFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
VIDEO_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class VideoAppFlowExecutor {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorByStory;
@PostConstruct
public void init(){
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("videoapp_setup").THEN("videoapp_store").THEN("videoapp_checkerfirst").SWITCH("videoapp_behavior").THEN("videoapp_uploadcall"));
}
public FlowExecutor<NovelAction> getExecutorByStory(){
return this.executorByStory;
}
}
package com.lwby.marketing.att.videoapp;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.vo.*;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Component
public class VideoAppUniversalProcess extends UniversalProcess {
private static final Logger VIDEOAPP_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEOAPP_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@Resource
private RedisTemplate<String,String> oldMarketRedisTemplate;
/**
* 通知处理结果
*/
public void notifyResult(NovelAction action, String topic, AttributionStatus status) {
DeliveryDeviceInfo ddi = action.getDeliveryDeviceInfo();
if (Objects.isNull(ddi)) {
ddi = new DeliveryDeviceInfo();
String channelStr = String.valueOf(action.getChannelId());
ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr));
ddi.setAd_plan_id("0");
ddi.setAd_group_id("0");
ddi.setAd_creative_id("0");
ddi.setPlatform_id(String.valueOf(action.getPlatformId()));
ddi.setDj_channel(channelStr);
}
ddi.setOs(action.getClientInfo().getOs());
ddi.setIs_call(status.id);
ddi.setDevice_id(action.getDeviceId());
ddi.setActive_time(System.currentTimeMillis());
ddi.setChannel(String.valueOf(action.getChannelId()));
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> VIDEOAPP_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> VIDEOAPP_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
);
}
public <T> T getOldMarket(Class<T> clazz, String key) {
String value = oldMarketRedisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
return JSON.parseObject(value,clazz);
}
return null;
}
public <T> Map<String,T> hgetAllOldMarket(Class<T> clazz,String key) {
Map<Object, Object> json = oldMarketRedisTemplate.opsForHash().entries(key);
Map<String, T> map = new HashMap<String, T>();
for (Map.Entry<Object, Object> item : json.entrySet()) {
T t = JSON.parseObject(item.getValue().toString(), clazz);
map.put(item.getKey().toString(), t);
}
return map;
}
public String getTotalCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getCallbackCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId, int sprDedu, String dateStr) {
return String.format("%s_callback_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getFirstCheckerKey(NovelAction action) {
return Objects.equals(action.getType(), CallBackType.active.getType())
? String.format("fc_%d_%d_%s_%s", action.getDeviceId(), action.getPlatformId(),action.getMediaName(),action.getCurrentDateStr())
: String.format("fc_%d_%d_%s", action.getDeviceId(), action.getPlatformId(),action.getMediaName());
}
public int getExpire(NovelAction action) {
return Objects.equals(action.getType(), CallBackType.active.getType())
? 60 * 60 * 24
: 60 * 60 * 24 * 7;
}
public String buildKey(Integer platformId,String deviceId) {
return String.format("market:activeness:30:%d:%s",platformId,deviceId);
}
/******************************************** JDBC *************************************************************/
@Cached(name="appchannel_video_app", cacheType = CacheType.LOCAL)
@CacheRefresh(refresh = 300)
public AppChannelVO getAppChannelByPlatformAndChannel(int platformId,Long channelId) {
try {
RowMapper<AppChannelVO> rowMapper = BeanPropertyRowMapper.newInstance(AppChannelVO.class);
return videoInlandJdbcTemplate.queryForObject(String.format("select id,video_count,mv_count,cpm_count,ecpm_avg_count,motivation_count,arpu_count,ecpm_per_count"
+ " from app_channel where channel_id=%d and platform_id=%d",channelId,platformId),rowMapper);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
}
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component("videoapp_checkerfirst")
public class CheckerVideoAppFirstFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
@Override
public void process(NovelAction action) {
if(up.exists(up.getFirstCheckerKey(action))){
action.stop(true);
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp.handle;
import cn.hutool.crypto.SecureUtil;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.videoapp.DeviceVideoType;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.notify.Media;
import com.lwby.marketing.notify.media.Platform;
import com.lwby.marketing.util.DateTimUtils;
import com.lwby.marketing.util.NumberUtils;
import com.lwby.marketing.vo.ClientInfo;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.UserProfile;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Slf4j
@Component("videoapp_setup")
public class ParameterSetupVideoAppFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
private static final Logger VIDEOAPP_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@Override
public void process(NovelAction action) {
ClientInfo clientInfo = action.getClientInfo();
//获取用户注册时间
UserProfile userProfile = clientInfo.getUser();
if (userProfile == null || userProfile.getCreateDate() == null) {
VIDEOAPP_ERROR_LOG.error("ClientInfo.userProfile 对像为[{}],ClientInfo对像[{}]",userProfile, clientInfo);
action.stop(true);
return;
}
//平台ID
Integer platformId = clientInfo.getPlatformId();
//VO对像
DeliveryDeviceInfo deliveryDeviceInfo = null;
//设备ID
String deviceIdKey = null;
//匹配设备ID
for(DeviceVideoType type: DeviceVideoType.values()){
deviceIdKey = type.getDeviceId(clientInfo);
if(null != deviceIdKey && null != (deliveryDeviceInfo = up.getOldMarket(DeliveryDeviceInfo.class,assembleKey(deviceIdKey,platformId)))){
clientInfo.setOaid(deliveryDeviceInfo.getOaid()); //回补OAID
action.setDeliveryDeviceInfo(deliveryDeviceInfo);
action.setDeviceVideoType(type);
break;
}
}
action.setPlatformId(platformId);
action.setUserId(clientInfo.getUser().getId());
action.setDeviceId(clientInfo.getDID());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(action.getType())) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
//非商店吊起参数设置
if (Objects.nonNull(deliveryDeviceInfo)) {
action.setChannelId(NumberUtils.parseLong(deliveryDeviceInfo.getDj_channel()));
action.setPlanId(NumberUtils.parseLong((deliveryDeviceInfo.getAd_plan_id())));
action.setAdvertiserId(deliveryDeviceInfo.getAdvertiser_id());
action.setMediaName(deliveryDeviceInfo.getMedia());
action.setMedia(Media.getMedia(action.getMediaName(), Platform.VIDEO));
}
}
public String assembleKey(String deviceId, int platformId) {
return String.format("getClickByIdfaAndPlatformId_%s_%d", SecureUtil.md5(deviceId), platformId);
}
}
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Component("videoapp_store")
public class StoreVideoAppAttributionFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
@Override
public void process(NovelAction action) {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
if (action.getType().equals(CallBackType.active.getType())) {
//商店判断30天活跃,不在活跃天数内,发BI
String aliveDeviceKey = up.buildKey(action.getPlatformId(), action.getDeviceId());
if (!up.existsIsAlive(aliveDeviceKey)) {
up.notifyResult(action,"ocpc_result_test", AttributionStatus.STORE_CALLBACK);
}
}
action.stop(true); //结束后面所有执行流程
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author songxinyu
* @version ActiveCallFlow.java, v 0.1 2024年03月04日 18:28 songxinyu Exp $
*/
@Component("videoapp_uploadcall")
public class UploadVideoAppCallFlow extends NodeFlow<NovelAction> {
@Resource
VideoAppUniversalProcess up;
@Override
public void process(NovelAction action) {
CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
boolean success = action.getMedia().notify(action);
if (success) {
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action), up.getExpire(action), "1");
}
}
}
......@@ -22,4 +22,16 @@ public class DataSourceConfig {
DataSource marketingDataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name = "videoDataSource")
@ConfigurationProperties("spring.datasource.video")
DataSource videoDataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name = "videoInlandDataSource")
@ConfigurationProperties("spring.datasource.video-inland")
DataSource videoInlandDataSource() {
return DataSourceBuilder.create().build();
}
}
......@@ -19,4 +19,14 @@ public class JdbcTemplateConfig {
return new JdbcTemplate(dataSource);
}
@Bean(name = "videoJdbcTemplate")
JdbcTemplate videoJdbc(@Qualifier("videoDataSource") DataSource dataSource){
return new JdbcTemplate(dataSource);
}
@Bean(name = "videoInlandJdbcTemplate")
JdbcTemplate videoInlandJdbc(@Qualifier("videoInlandDataSource") DataSource dataSource){
return new JdbcTemplate(dataSource);
}
}
package com.lwby.marketing.conf;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class OldMarketRedisConfig {
@Value("${old.market.redis.host}")
private String hostName;
@Value("${old.market.redis.port}")
private int hostPort;
@Bean("oldMarketJedisFactory")
public JedisConnectionFactory jedisConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName(hostName);
config.setPort(hostPort);
return new JedisConnectionFactory(config);
}
@Bean("oldMarketRedisTemplate")
public RedisTemplate<Object, Object> redisTemplate(@Qualifier("oldMarketJedisFactory") RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
\ No newline at end of file
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.dyvideo.DyVideoActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class DyVideoActiveKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.dyvideo.active.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryDyVideoActive")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "dyvideo_active")
public DyVideoActiveConsumer listener() {
return new DyVideoActiveConsumer();
}
}
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.dyvideo.DyVideoBehavoirConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class DyVideoBehavoirKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.dyvideo.behavoir.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryDyVideoBehavoir")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "dyvideo_behavoir")
public DyVideoBehavoirConsumer listener() {
return new DyVideoBehavoirConsumer();
}
}
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.bystory.DyStoryActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveStoreConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.bystory.DyStoryActiveConsumer;
import com.lwby.marketing.att.bystory.DyStoryPayConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.bystory.DyStoryPayConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.videoapp.VideoAppActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class VideoAppActiveKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.videoapp.active.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryVideoappActive")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "videoapp_active")
public VideoAppActiveConsumer listener() {
return new VideoAppActiveConsumer();
}
}
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.videoapp.VideoAppActiveConsumer;
import com.lwby.marketing.att.videoapp.VideoAppBehavoirConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author songxinyu
* @version KafkaConsumerConfig.java, v 0.1 2024年03月18日 17:01 songxinyu Exp $
*/
@Configuration
@EnableKafka
public class VideoAppBehavoirKafkaConsumerConfig {
@Value("${spring.kafka2.bootstrap-servers}")
private String servers;
@Value("${spring.kafka2.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka2.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${system.consumer.videoapp.behavoir.group_id}")
private String groupId;
@Value("${spring.kafka2.consumer.max-poll-records}")
private int maxPollRecordsConfig;
@Bean(name = "kafkaListenerContainerFactoryVideoappBehavoir")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean(name = "videoapp_behavoir")
public VideoAppBehavoirConsumer listener() {
return new VideoAppBehavoirConsumer();
}
}
......@@ -46,6 +46,9 @@ public class AttrController {
@Resource(name = "storyKafka")
private KafkaTemplate<String, String> storyKafkaTemplate;
@Resource(name = "novelKafka")
public KafkaTemplate<String, String> novelKafkaTemplate;
@Resource
DyStoryFlowExecutor dyStoryFlowExecutor;
......@@ -218,4 +221,59 @@ public class AttrController {
return ls.stream()
.collect(Collectors.toMap(CrossCallback::getMediaName, CrossPlatformAccount::new));
}
@RequestMapping("/sendDyVideo")
public void testSendDyVideo() {
String msg = "{\"clientInfo\":{\"channel\":214122006,\"clientIp\":\"125.45.110.52\",\"pkv\":1,\"platformGroupId\":412,"
+ "\"platformId\":412,\"sessionid\":\"undefined\",\"ua\":\"Mozilla/5.0 (Linux; Android 12; PEHM00 Build/SKQ1.210216.001; "
+ "wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/117.0.0.0 Mobile Safari/537.36 aweme.lite/29.2.0 "
+ "ToutiaoMicroApp/3.19.0 PluginVersion/29209076\",\"version\":\"2.0.0\",\"xClient\":\"version=2.0.0;platformId=412;"
+ "equipment=android\"},\"createTime\":1711394346711,\"extraData\":{},\"id\":\"ec792ce2-f448-4daa-b5b9-582e1f1e1bf7\","
+ "\"userId\":913286,\"userUploadEvent\":3,\"wechatAppId\":\"ttd3dda5604ce230b401\","
+ "\"wechatOpenId\":\"_000GI00qFjk0oFW9iYwBmDTC-K257VHXUVL\"}";
StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
novelKafkaTemplate.send("testDouyinRegisterTopic",JSON.toJSONString(event)).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("dyvideo注册发送消息成功: topic={},partition={},offset={}", topic, partition, offset);
}, failure -> {
log.error("dyvideo注册发送消息失败:fail={}", JSON.toJSONString(failure));
});
}
@RequestMapping("/sendVideoApp")
public void testSendVideoapp() {
String msg = "{\"clientInfo\":{\"channel\":682000,\"clientIp\":\"183.226.74.112\","
+ "\"dID\":\"2705A240-1875-4C38-9529-0343B56B6C49\",\"fixVersion\":4,\"idfa\":\"00000000-0000-0000-0000-000000000000\","
+ "\"language\":\"zh\",\"mainVersion\":1,\"oaid\":\"2705A240-1875-4C38-9529-0343B56B6C49\",\"os\":\"1\","
+ "\"phoneModel\":\"iPhone14,2\",\"pkv\":0,\"platformGroupId\":68,\"platformId\":68,\"pm\":\"iPhone14,2\","
+ "\"screenSize\":\"390*844\",\"subVersion\":0,\"systemVersion\":\"17.3.1\",\"ua\":\"Mozilla/5.0 (iPhone; CPU iPhone OS "
+ "17_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148\","
+ "\"user\":{\"avatarUrl\":\"https://lwby-video-res.oss-accelerate.aliyuncs.com/avatarUrl/1673322782333.png\","
+ "\"createDate\":1711440818292,\"deviceId\":\"2705A240-1875-4C38-9529-0343B56B6C49\",\"goldCoinNum\":0,\"id\":553262,"
+ "\"mainversion\":1,\"nickname\":\"游客545250\",\"platform\":68,\"subversion\":0,\"userStatus\":1},\"version\":\"68.1.0.4"
+ ".682000\",\"xClient\":\"sv=17.3.1;pm=iPhone14,2;ss=390*844;version=68.1.0.4.682000;pkv=0;platformId=68;language=zh;"
+ "dID=2705A240-1875-4C38-9529-0343B56B6C49;idfa=00000000-0000-0000-0000-000000000000;os=1\"},"
+ "\"createTime\":1711440818298,\"extraData\":{},\"id\":\"61e3c546-dee4-4cfb-9430-a50cdc63b732\",\"userUploadEvent\":0}";
StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
novelKafkaTemplate.send("testFreeVideoEvent",JSON.toJSONString(event)).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("dyvideo注册发送消息成功: topic={},partition={},offset={}", topic, partition, offset);
}, failure -> {
log.error("dyvideo注册发送消息失败:fail={}", JSON.toJSONString(failure));
});
}
}
......@@ -7,7 +7,7 @@ public abstract class DYNotify extends BaseNotiry {
public boolean delivery(Action action) {
//TODO;抖音短剧加入后,需要重构StoryNovelAction
StoryNovelAction storyNovelAction = (StoryNovelAction)action;
return storyNovelAction.getType() == 0?active(storyNovelAction):pay(storyNovelAction);
return storyNovelAction.getType() == 0?active(storyNovelAction): (storyNovelAction.getType() == 2 ? pay(storyNovelAction):behavior(storyNovelAction));
}
public abstract boolean active(StoryNovelAction na);
......@@ -15,4 +15,6 @@ public abstract class DYNotify extends BaseNotiry {
public abstract boolean pay(StoryNovelAction na);
public abstract boolean behavior(StoryNovelAction na);
}
......@@ -4,8 +4,10 @@ package com.lwby.marketing.notify;
import com.lwby.marketing.flow.Action;
import com.lwby.marketing.notify.media.Platform;
import com.lwby.marketing.notify.media.gdt.NovelGDTNotify;
import com.lwby.marketing.notify.media.jrtt.DyVideoJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.NovelJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.StoryJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.VideoAppJRTTNotify;
import java.util.Arrays;
import java.util.Map;
......@@ -15,7 +17,9 @@ import java.util.stream.Collectors;
public enum Media {
NOVEL_GDT("gdt", Platform.NOVEL, "广点通",new NovelGDTNotify()),
NOVEL_JRTT("jrtt", Platform.NOVEL, "今日头条",new NovelJRTTNotify()),
DY_STORY_JRTT("jrtt", Platform.DY_STORY, "今日头条",new StoryJRTTNotify());
DY_STORY_JRTT("jrtt", Platform.DY_STORY, "今日头条",new StoryJRTTNotify()),
DY_VIDEO_JRTT("jrtt", Platform.DY_VIDEO, "今日头条",new DyVideoJRTTNotify()),
VIDEOAPP_JRTT("jrtt_freevideo", Platform.VIDEO, "今日头条",new VideoAppJRTTNotify());
final String name;
final Platform platform;
......
package com.lwby.marketing.notify.media.jrtt;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.notify.DYNotify;
import com.lwby.marketing.notify.media.jrtt.dto.JrttAttributeRequest;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class DyVideoJRTTNotify extends DYNotify {
public static final String ATTRIBUTE_URL = "https://analytics.oceanengine.com/api/v2/conversion";
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
@Override
public boolean active(StoryNovelAction na) {
return notify(na,"active");
}
@Override
public boolean pay(StoryNovelAction na) {
return false;
}
@Override
public boolean behavior(StoryNovelAction na) {
return notify(na,"game_addiction");
}
public boolean notify(StoryNovelAction na,String eventType) {
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type(eventType).timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
//TODO;测试不执行
//String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
//Integer resultCode = (Integer) JSON.parseObject(result).get("code");
//return resultCode == 0;
DYVIDEO_SYS_LOG.info("DyVideoJRTTNotify.video.{}.upload,userId={},channel={},platform={}",eventType,na.getUserId(),na.getChannelId(),na.getPlatformId());
return true;
} catch (Exception e) {
return false;
}
}
}
......@@ -27,6 +27,11 @@ public class StoryJRTTNotify extends DYNotify {
return notify(na,"active_pay");
}
@Override
public boolean behavior(StoryNovelAction na) {
return false;
}
public boolean notify(StoryNovelAction na,String eventType) {
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
......
package com.lwby.marketing.notify.media.jrtt;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.notify.MobileNotify;
import com.lwby.marketing.notify.media.jrtt.dto.JrttAttributeRequest;
import com.lwby.marketing.vo.ClientInfo;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.NovelAction;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VideoAppJRTTNotify extends MobileNotify {
public static final String ATTRIBUTE_URL = "https://analytics.oceanengine.com/api/v2/conversion";
@Override
public boolean android(NovelAction na) {
return na.getType() == 0 ? android(na,"active") :android(na,"game_addiction");
}
@Override
public boolean ios(NovelAction na) {
return na.getType() == 0 ? android(na,"active") :android(na,"game_addiction");
}
public boolean android(NovelAction na,String eventType){
ClientInfo ci = na.getClientInfo();
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Device device = new JrttAttributeRequest.Device();
device.setPlatform("android");
device.setImei(ci.getImei());
device.setOaid(ci.getOaid());
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
context.setDevice(device);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type(eventType).timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
//TODO;测试不执行
// String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
// Integer resultCode = (Integer) JSON.parseObject(result).get("code");
// return resultCode == 0;
log.info("VideoAppJRTTNotify android Callback info:{}",userJson);
return true;
} catch (Exception e) {
return false;
}
}
public boolean ios(NovelAction na,String eventType) {
ClientInfo ci = na.getClientInfo();
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Device device = new JrttAttributeRequest.Device();
device.setPlatform("ios");
device.setIdfa(ci.getIdfa());
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
context.setDevice(device);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type(eventType).timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
//TODO;测试不执行
// String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
// Integer resultCode = (Integer) JSON.parseObject(result).get("code");
// return resultCode == 0;
log.info("VideoAppJRTTNotify ios Callback info:{}",userJson);
return true;
} catch (Exception e) {
return false;
}
}
}
package com.lwby.marketing.po;
import lombok.Data;
import java.io.Serializable;
/**
* @author songxinyu
* @version ThirdAccountDy.java, v 0.1 2024年01月30日 15:30 songxinyu Exp $
*/
@Data
public class ThirdAccountDy implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键ID
*/
private Integer id;
private String clientId;
private Integer platformId;
private String accessToken;
private Integer expireIn;
}
package com.lwby.marketing.po;
import lombok.Data;
/**
* @author songxinyu
* @version VideoUpload.java, v 0.1 2024年01月29日 18:38 songxinyu Exp $
*/
@Data
public class VideoUpload {
private String adid;
private String creativeId;
private String creativeType;
private String clickId;
private String channel;
private String videoResourceId;
private String media;
private String code;
private String ecpmAvgCount;
private String motivateCount;
//每次ecpm达标
private String pecpmCount;
//达标次数,与激励视频阈值比较
private String perecpmSize;
private long activeTime;
private long clickTime;
}
package com.lwby.marketing.util;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author songxinyu
* @version CacheKeyUtils.java, v 0.1 2024年03月22日 17:56 songxinyu Exp $
*/
public class CacheKeyUtils {
private static final String CACHE_BEHAVIOR_PREFIX = "up_c_";
static SimpleDateFormat df = new SimpleDateFormat("MMdd");//设置日期格式
public static String getBehavoirKey(Long userId) {
return CACHE_BEHAVIOR_PREFIX + userId + "_" + df.format(new Date()) + "_fv";
}
public static String getVideoBehavoirKey(Long userId) {
return CACHE_BEHAVIOR_PREFIX + userId + "_fv";
}
}
package com.lwby.marketing.util;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import java.io.*;
/**
* @author songxinyu
* @version HttpUtil.java, v 0.1 2024年03月22日 17:58 songxinyu Exp $
*/
public class HttpUtil {
public static String postDy (String url, String toJson,String token) throws IOException {
HttpClient httpClient = new HttpClient();
PostMethod postMethod = new PostMethod(url);
RequestEntity se = new StringRequestEntity(toJson ,"application/json" ,"UTF-8");
postMethod.setRequestEntity(se);
postMethod.setRequestHeader("Content-Type","application/json");
postMethod.setRequestHeader("access-token",token);
//默认的重试策略
postMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler());
postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, 5000);//设置超时时间
int httpStatus = httpClient.executeMethod(postMethod);
InputStream bodystreams = postMethod.getResponseBodyAsStream();
String body = convertStreamToString(bodystreams);
return body;
}
public static String convertStreamToString(InputStream is) throws UnsupportedEncodingException {
// BufferedReader reader = new BufferedReader(new InputStreamReader(is));//输出的中文乱码
BufferedReader reader = new BufferedReader(new InputStreamReader(is,"utf8")); //GBK
StringBuilder sb = new StringBuilder();
String line = null;
try {
while ((line = reader.readLine()) != null) {
sb.append(line + "\n");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return sb.toString();
}
}
package com.lwby.marketing.vo;
import com.lwby.marketing.po.AppChannel;
import lombok.Data;
/**
* @author songxinyu
* @version AppChannelVO.java, v 0.1 2023年05月24日 14:05 songxinyu Exp $
*/
@Data
public class AppChannelVO extends AppChannel {
private Integer videoCount;
private Long mvCount;
private Long cpmCount;
//平均ecpm达标次数
private Long ecpmAvgCount;
//每次ecpm达标次数
private Long ecpmPerCount;
//激励视频次数
private Long motivationCount;
//arpu
private Double arpuCount;
}
package com.lwby.marketing.vo;
import com.lwby.marketing.att.novel.DeviceType;
import com.lwby.marketing.att.videoapp.DeviceVideoType;
import com.lwby.marketing.flow.Action;
import com.lwby.marketing.notify.Media;
import lombok.Data;
......@@ -24,9 +25,18 @@ public class NovelAction extends Action {
String body;
DeviceType deviceType;
String currentDateStr;
Media media;
Media media;
DeviceVideoType deviceVideoType;
String goodId;
Integer type;
public NovelAction(ClientInfo clientInfo,String msg){
this(clientInfo,null,msg);
this(clientInfo,null,msg);
}
public NovelAction(ClientInfo clientInfo,String msg,Integer type){
this(clientInfo,null,msg,type);
}
public NovelAction(ClientInfo clientInfo, DeliveryDeviceInfo deliveryDeviceInfo) {
......@@ -38,4 +48,11 @@ public class NovelAction extends Action {
this.deliveryDeviceInfo = deliveryDeviceInfo;
this.body = msg;
}
public NovelAction(ClientInfo clientInfo, DeliveryDeviceInfo deliveryDeviceInfo,String msg,Integer type){
this.clientInfo = clientInfo;
this.deliveryDeviceInfo = deliveryDeviceInfo;
this.body = msg;
this.type = type;
}
}
\ No newline at end of file
......@@ -38,6 +38,8 @@ public class StoreUserUploadEventBO {
*/
String orderObject;
String wechatOpenId;
/**
* 扩展数据
*/
......@@ -154,4 +156,12 @@ public class StoreUserUploadEventBO {
public void setTotalValue(Double totalValue) {
this.totalValue = totalValue;
}
public String getWechatOpenId() {
return wechatOpenId;
}
public void setWechatOpenId(String wechatOpenId) {
this.wechatOpenId = wechatOpenId;
}
}
......@@ -3,6 +3,7 @@ package com.lwby.marketing.vo;
import com.lwby.marketing.flow.Action;
import com.lwby.marketing.notify.Media;
import com.lwby.marketing.po.StoryLogin;
import com.lwby.marketing.po.VideoUpload;
import lombok.Data;
/**
......@@ -13,7 +14,10 @@ import lombok.Data;
public class StoryNovelAction extends Action {
ClientInfo clientInfo;
//抖音故事会
StoryLogin storyLogin;
//抖音短剧
VideoUpload videoUpload;
DeliveryDeviceInfo deliveryDeviceInfo;
String openId;
int platformId;
......
......@@ -44,4 +44,5 @@ public class UserProfile {
private Integer subversion;
private Integer channel;
private Integer userStatus;
private Date createDate;
}
......@@ -14,6 +14,20 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 1
minIdle: 1
video:
jdbc-url: jdbc:mysql://rm-2zeyw42052h06f905.mysql.rds.aliyuncs.com:3306/video?zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=utf8&serverTimezone=Asia/Shanghai&autoReconnect=true
username: video
password: hdTImXLlzOEbP5bk
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
video-inland:
jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com:3306/video-inland?zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=utf8&autoReconnect=true
username: video
password: hdTImXLlzOEbP5bk
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
redis:
host: r-2zeknwihqn2v6q4x5x.tairpdb.rds.aliyuncs.com
port: 6379
......@@ -54,6 +68,11 @@ bi:
host: r-2zethzp7pjl3rjbelp.redis.rds.aliyuncs.com
port: 6379
old:
market:
redis:
host: r-2zemv0esbjz5tsnmxn.redis.rds.aliyuncs.com
port: 6379
system:
consumer:
......@@ -71,6 +90,20 @@ system:
storeActive:
topic: bookStoreEvent1
group_id: novel_store_active_1
dyvideo:
active:
topic: DouyinRegisterTopic
group_id: douyin_video_active_1
behavoir:
topic: DouyinBehaviorTopic
group_id: douyin_video_behavior_1
videoapp:
active:
topic: freeVideoEvent
group_id: videoapp_active_1
behavoir:
topic: freeVideoKeyBehaviorEvent
group_id: videoapp_behavior_1
management:
health:
db:
......
......@@ -14,6 +14,20 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
video:
jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com:3306/video?zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=utf8&autoReconnect=true
username: lwby
password: VjxYfmY8J77ISChp
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
video-inland:
jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com:3306/video-inland?zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=utf8&autoReconnect=true
username: lwby
password: VjxYfmY8J77ISChp
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
redis:
host: 172.17.243.150
port: 6379
......@@ -54,6 +68,12 @@ bi:
host: 172.17.243.150
port: 6379
old:
market:
redis:
host: 172.17.243.150
port: 6379
system:
consumer:
dystory:
......@@ -70,6 +90,20 @@ system:
storeActive:
topic: testBookStoreEvent1
group_id: test_novel_store_active_1
dyvideo:
active:
topic: testDouyinRegisterTopic
group_id: test_douyin_video_active_1
behavoir:
topic: testDouyinBehaviorTopic
group_id: test_douyin_video_behavior_1
videoapp:
active:
topic: testFreeVideoEvent
group_id: test_videoapp_active_1
behavoir:
topic: testFreeVideoKeyBehaviorEvent
group_id: test_videoapp_behavior_1
management:
health:
......
......@@ -35,7 +35,10 @@ jetcache:
type: linkedhashmap
keyConvertor: fastjson
limit: 10000
core:
thread:
pool:
size: 10
logging:
file:
path: /data/marketing/logs
\ No newline at end of file
......@@ -35,6 +35,72 @@
<appender-ref ref="dystory_error" />
</logger>
<!--抖音短剧小程序-->
<appender name="dyvideo_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/dyvideo_sys.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/dyvideo_sys/dyvideo_sys.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="dyvideo.sys" level="INFO" additivity="false">
<appender-ref ref="dyvideo_sys" />
</logger>
<appender name="dyvideo_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/dyvideo_error.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/dyvideo_error/dyvideo_error.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="dyvideo.error" level="INFO" additivity="true">
<appender-ref ref="dyvideo_error" />
</logger>
<!--短剧app-->
<appender name="videoapp_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/videoapp_sys.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/videoapp_sys/videoapp_sys.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="videoapp.sys" level="INFO" additivity="false">
<appender-ref ref="videoapp_sys" />
</logger>
<appender name="videoapp_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/videoapp_error.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/videoapp_error/videoapp_error.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="videoapp.error" level="INFO" additivity="true">
<appender-ref ref="videoapp_error" />
</logger>
<!--小说-->
<appender name="novel_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/novel_sys.log</File>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment