Commit 7926dd47 authored by 宋新宇's avatar 宋新宇

抖音短剧归因

parent 17e05343
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
[traceId=] 2024-03-07 16:45:12.921 -ERROR org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1:c.l.m.a.s.StoryNovelPayConsumer:46 - story.novel.pay.onMessage failed, data={"clientInfo":{"pkv":1,"platformId":601},"createTime":1709800560175,"extraData":{},"id":"17e45093-f226-4ee3-a6c8-c3bf363f61a6","openId":"_0001oR1eBGI2jG-UEcOlP4xI1qhuRLIbuo2","productId":"1","userId":3,"userUploadEvent":2}, costTime=6547 ms
java.lang.NullPointerException: null
at com.lwby.marketing.att.storynovel.handle.ParameterSetupStoryNovelFlow.process(ParameterSetupStoryNovelFlow.java:47)
at com.lwby.marketing.att.storynovel.handle.ParameterSetupStoryNovelFlow.process(ParameterSetupStoryNovelFlow.java:18)
at com.lwby.marketing.flow.FlowExecutor.executeNormalNode(FlowExecutor.java:51)
at com.lwby.marketing.flow.FlowExecutor.execute(FlowExecutor.java:32)
at com.lwby.marketing.att.storynovel.StoryNovelPayConsumer.onMessage(StoryNovelPayConsumer.java:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
This diff is collapsed.
[traceId=] 2024-03-13 22:07:55.156 -ERROR org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1:o.a.k.c.c.i.ConsumerCoordinator:1231 - [Consumer clientId=consumer-test_novel_dy_active-1, groupId=test_novel_dy_active] Offset commit failed on partition testNovelActive-0 at offset 0: The coordinator is not aware of this member.
[traceId=] 2024-03-13 22:07:55.156 -ERROR org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1:o.a.k.c.c.i.ConsumerCoordinator:1231 - [Consumer clientId=consumer-test_story_dy_pay-3, groupId=test_story_dy_pay] Offset commit failed on partition testStoryNovelpay-0 at offset 12: The coordinator is not aware of this member.
[traceId=] 2024-03-13 22:12:20.027 -ERROR org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1:o.a.k.c.c.i.ConsumerCoordinator:1231 - [Consumer clientId=consumer-test_story_dy_pay-3, groupId=test_story_dy_pay] Offset commit failed on partition testStoryNovelpay-0 at offset 12: The coordinator is not aware of this member.
[traceId=] 2024-03-13 22:12:20.028 -ERROR org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1:o.a.k.c.c.i.ConsumerCoordinator:1231 - [Consumer clientId=consumer-test_novel_dy_active-1, groupId=test_novel_dy_active] Offset commit failed on partition testNovelActive-0 at offset 0: The coordinator is not aware of this member.
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -102,6 +102,11 @@ ...@@ -102,6 +102,11 @@
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>2.8.1</version> <version>2.8.1</version>
</dependency> </dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -6,7 +6,8 @@ public enum AttributionStatus { ...@@ -6,7 +6,8 @@ public enum AttributionStatus {
NORMAL_DEDUCTION_CALLBACK(4,"正常扣量"), NORMAL_DEDUCTION_CALLBACK(4,"正常扣量"),
OLDUSER_DEDUCTION_CALLBACK(7,"老用户扣量"), OLDUSER_DEDUCTION_CALLBACK(7,"老用户扣量"),
IP_CALLBACK(9,"IP归因"), IP_CALLBACK(9,"IP归因"),
PAY(10,"付费回传"); PAY(10,"付费回传"),
BEHAVIOR(8,"关键行为");
public int id; public int id;
......
...@@ -23,6 +23,9 @@ public class UniversalProcess { ...@@ -23,6 +23,9 @@ public class UniversalProcess {
@Resource @Resource
public JdbcTemplate lwbyJdbcTemplate; public JdbcTemplate lwbyJdbcTemplate;
@Resource
public JdbcTemplate videoJdbcTemplate;
@Resource(name = "novelKafka") @Resource(name = "novelKafka")
public KafkaTemplate<String, String> novelKafkaTemplate; public KafkaTemplate<String, String> novelKafkaTemplate;
...@@ -47,6 +50,15 @@ public class UniversalProcess { ...@@ -47,6 +50,15 @@ public class UniversalProcess {
redisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS); redisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS);
} }
public void del(String key) {
redisTemplate.delete(key);
}
public void hset(String key, String field, int expire, String value) {
redisTemplate.opsForHash().put(key,field,value);
redisTemplate.expire(key,expire,TimeUnit.SECONDS);
}
public <T> T get(Class<T> clazz, String key) { public <T> T get(Class<T> clazz, String key) {
String value = redisTemplate.opsForValue().get(key); String value = redisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){ if(!Objects.isNull(value)){
......
...@@ -8,7 +8,7 @@ import com.lwby.marketing.att.AttributionStatus; ...@@ -8,7 +8,7 @@ import com.lwby.marketing.att.AttributionStatus;
*/ */
public enum CallBackType { public enum CallBackType {
active(0,"ocpc_result", AttributionStatus.ACTIVE_CALLBACK),pay(2,"ocpc_pay",AttributionStatus.PAY); active(0,"ocpc_result", AttributionStatus.ACTIVE_CALLBACK),pay(2,"ocpc_pay",AttributionStatus.PAY),behavior(3,"ocpc_behavior",AttributionStatus.BEHAVIOR);
private Integer type; private Integer type;
......
package com.lwby.marketing.att.dyvideo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author songxinyu
* @version DyVideoActiveConsumer.java, v 0.1 2024年03月21日 17:40 songxinyu Exp $
*/
@Slf4j
@Component
public class DyVideoActiveConsumer {
@Resource
DyVideoFlowExecutor dyVideoFlowExecutor;
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@KafkaListener(topics = {"${system.consumer.dyvideo.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryStoryActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
long begin = System.currentTimeMillis();
if (DYVIDEO_SYS_LOG.isInfoEnabled()) {
DYVIDEO_SYS_LOG.info("media.active.dyVideo.onMessage start ,size = {}", datas.size());
}
for (ConsumerRecord<String, String> data : datas) {
try {
if (data == null) {
log.warn("media.active.dyVideo.onMessage listen 消费数据为null");
return;
}
DYVIDEO_SYS_LOG.info("media.active.dyVideo.onMessage start, data={}", data == null ? null : data.value());
StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
DYVIDEO_SYS_LOG.info("media.active.dyVideo.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(),
event.getUserUploadEvent());
dyVideoFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
package com.lwby.marketing.att.dyvideo;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class DyVideoFlowExecutor {
@Resource
ApplicationContext ctx;
FlowExecutor<StoryNovelAction> executorByStory;
@PostConstruct
public void init(){
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").SWITCH("dyvideo_behavior","dyvideo_dedu").THEN("dyvideo_uploadcall"));
}
public FlowExecutor<StoryNovelAction> getExecutorByStory(){
return this.executorByStory;
}
}
package com.lwby.marketing.att.dyvideo;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Component
public class DyVideoUniversalProcess extends UniversalProcess {
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
/**
* 通知处理结果
*/
public void notifyResult(StoryNovelAction action,String topic, AttributionStatus status) {
DeliveryDeviceInfo ddi = action.getDeliveryDeviceInfo();
if (Objects.isNull(ddi)) {
ddi = new DeliveryDeviceInfo();
String channelStr = String.valueOf(action.getChannelId());
ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr));
ddi.setAd_plan_id("0");
ddi.setAd_group_id("0");
ddi.setAd_creative_id("0");
ddi.setPlatform_id(String.valueOf(action.getPlatformId()));
ddi.setDj_channel(channelStr);
}
ddi.setIs_call(status.id);
ddi.setUserId(action.getUserId());
ddi.setDevice_id(String.valueOf(action.getUserId()));
ddi.setActive_time(System.currentTimeMillis());
ddi.setChannel(String.valueOf(action.getChannelId()));
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> DYVIDEO_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> DYVIDEO_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
);
}
public String getTotalCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getCallbackCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId, int sprDedu, String dateStr) {
return String.format("%s_callback_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
public String getFirstCheckerKey(StoryNovelAction action) {
return Objects.equals(action.getType(), CallBackType.active.getType())
? String.format("fc_%d_%d_%s_%s", action.getUserId(), action.getPlatformId(),action.getMediaName(),action.getCurrentDateStr())
: String.format("fc_%d_%d_%s", action.getUserId(), action.getPlatformId(),action.getMediaName());
}
public String getDyAccessToken(String client_id, String client_secret,String dyTokenUrl) {
Map<String,String> upMap = new HashMap<>();
upMap.put("client_key",client_id);
upMap.put("client_secret",client_secret);
upMap.put("grant_type","client_credential");
String mapAction = JSONObject.toJSONString(upMap);
String accessToken = "";
try {
String result = HttpUtil.post(dyTokenUrl, mapAction);
Map data = (Map) JSON.parseObject(result).get("data");
Integer resultCode = (Integer) data.get("error_code");
if (resultCode == 0) {
//成功
accessToken = (String) data.get("access_token");
Integer expiresIn = (Integer) data.get("expires_in");
String tokenDy = "token_dy_" + client_id;
set(tokenDy, Integer.parseInt(String.valueOf(expiresIn)),accessToken);
} else {
log.warn("dy_access_token_error,code={},resultdy={}", resultCode, JSONObject.toJSONString(result));
}
} catch (Throwable e) {
e.printStackTrace();
}
return accessToken;
}
/******************************************** JDBC *************************************************************/
@Cached(name="appchannel_dy_video", cacheType = CacheType.LOCAL)
@CacheRefresh(refresh = 300)
public AppChannelVO getAppChannelByPlatformAndChannel(int platformId,Long channelId) {
try {
RowMapper<AppChannelVO> rowMapper = BeanPropertyRowMapper.newInstance(AppChannelVO.class);
return lwbyJdbcTemplate.queryForObject(String.format("select id,ecpm_avg_count,motivation_count,ecpm_per_count,spr_dedu "
+ "from app_channel where channel_id=%d and platform_id=%d",channelId,platformId),rowMapper);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
}
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component("dyvideo_checkerfirst")
public class CheckerDyVideoFirstFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
if(up.exists(up.getFirstCheckerKey(action))){
action.stop(true);
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.flow.NodeSwitchFlow;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author songxinyu
* @version PaySpduFlow.java, v 0.1 2024年03月04日 18:46 songxinyu Exp $
*/
@Slf4j
@Component("dyvideo_dedu")
public class DyvideoDeduFlow extends NodeSwitchFlow<StoryNovelAction> {
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
@Resource
DyVideoUniversalProcess up;
@Override
public boolean checked(StoryNovelAction action) {
return action.getType() == 3;
}
@Override
public void process(StoryNovelAction action) {
int platformId = action.getPlatformId();
Long channelId = action.getChannelId();
AppChannelVO appChannel = up.getAppChannelByPlatformAndChannel(platformId,channelId);
Integer sprDedu = appChannel.getSprDedu();
//等于空 或 100直接回传
if (sprDedu == null || sprDedu == 100) {
return;
}
//总数
String channelTotal = up.getTotalCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
//回传
String channelCallback = up.getCallbackCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 1);
} else {
up.notifyResult(action,"ocpc_behavior_test", AttributionStatus.NORMAL_DEDUCTION_CALLBACK);
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 0);
}
}
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
if (channelTotalCount == 0) {
//首次随机
return ThreadLocalRandom.current().nextInt(1, 3) == 1;
}
//计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传
double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
}
}
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.notify.Media;
import com.lwby.marketing.notify.media.Platform;
import com.lwby.marketing.po.VideoUpload;
import com.lwby.marketing.util.DateTimUtils;
import com.lwby.marketing.util.NumberUtils;
import com.lwby.marketing.vo.ClientInfo;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
@Slf4j
@Component("dyvideo_setup")
public class ParameterSetupDyVideoFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
ClientInfo clientInfo = action.getClientInfo();
String openId = action.getOpenId();
//平台ID
Integer platformId = clientInfo.getPlatformId();
//VO对像
DeliveryDeviceInfo deliveryDeviceInfo = null;
//String s = "{\"creativeId\":\"112331\",\"creativeType\":\"3\",\"adid\":\"12321\","
// + "\"clickId\":\"12312143232\",\"channel\":\"216011231\",\"bookId\":\"4322111\","
// + "\"media\":\"jrtt\",\"clickTime\":123123123,\"code\":\"12ede3e231\"}";;
//StoryLogin storyLogin = JSONObject.parseObject(s,StoryLogin.class);
//匹配OpenId
VideoUpload videoUpload = up.get(VideoUpload.class,assembleKey(openId));
action.setVideoUpload(videoUpload);
//StoryNovelAction对像参数填充
action.setPlatformId(platformId);
action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(0)) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
//非商店吊起参数设置
if (Objects.nonNull(videoUpload)) {
deliveryDeviceInfo = new DeliveryDeviceInfo();
deliveryDeviceInfo.setAd_creative_id(videoUpload.getCreativeId());
deliveryDeviceInfo.setPlatform_id(String.valueOf(clientInfo.getPlatformId()));
deliveryDeviceInfo.setCallback_param(videoUpload.getClickId());
deliveryDeviceInfo.setUuid(UUID.randomUUID().toString());
deliveryDeviceInfo.setDj_channel(videoUpload.getChannel());
deliveryDeviceInfo.setPromotion_id(videoUpload.getAdid());
deliveryDeviceInfo.setVideoResourceId(videoUpload.getVideoResourceId());
action.setDeliveryDeviceInfo(deliveryDeviceInfo);
action.setChannelId(NumberUtils.parseLong(videoUpload.getChannel()));
action.setPlanId(NumberUtils.parseLong((videoUpload.getAdid())));
action.setMediaName(videoUpload.getMedia());
action.setMedia(Media.getMedia(action.getMediaName(), Platform.DY_VIDEO));
}
}
public String assembleKey(String openId) {
return String.format("story:upload:%s", openId);
}
}
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Component("dyvideo_store")
public class StoreDyVideoAttributionFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
if (action.getType().equals(CallBackType.active.getType())) {
up.notifyResult(action,"ocpc_result", AttributionStatus.STORE_CALLBACK);
}
action.stop(true); //结束后面所有执行流程
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoryNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author songxinyu
* @version ActiveCallFlow.java, v 0.1 2024年03月04日 18:28 songxinyu Exp $
*/
@Component("dyvideo_uploadcall")
public class UploadDyVideoCallFlow extends NodeFlow<StoryNovelAction> {
@Resource
DyVideoUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
boolean success = action.getMedia().notify(action);
if (success) {
action.getVideoUpload().setActiveTime(System.currentTimeMillis());
up.notifyResult(action, type.getTopic(),type.getStatus());
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1");
}
}
}
...@@ -22,4 +22,10 @@ public class DataSourceConfig { ...@@ -22,4 +22,10 @@ public class DataSourceConfig {
DataSource marketingDataSource() { DataSource marketingDataSource() {
return DataSourceBuilder.create().build(); return DataSourceBuilder.create().build();
} }
@Bean(name = "videoDataSource")
@ConfigurationProperties("spring.datasource.video")
DataSource videoDataSource() {
return DataSourceBuilder.create().build();
}
} }
...@@ -19,4 +19,9 @@ public class JdbcTemplateConfig { ...@@ -19,4 +19,9 @@ public class JdbcTemplateConfig {
return new JdbcTemplate(dataSource); return new JdbcTemplate(dataSource);
} }
@Bean(name = "videoJdbcTemplate")
JdbcTemplate videoJdbc(@Qualifier("videoDataSource") DataSource dataSource){
return new JdbcTemplate(dataSource);
}
} }
...@@ -7,7 +7,7 @@ public abstract class DYNotify extends BaseNotiry { ...@@ -7,7 +7,7 @@ public abstract class DYNotify extends BaseNotiry {
public boolean delivery(Action action) { public boolean delivery(Action action) {
//TODO;抖音短剧加入后,需要重构StoryNovelAction //TODO;抖音短剧加入后,需要重构StoryNovelAction
StoryNovelAction storyNovelAction = (StoryNovelAction)action; StoryNovelAction storyNovelAction = (StoryNovelAction)action;
return storyNovelAction.getType() == 0?active(storyNovelAction):pay(storyNovelAction); return storyNovelAction.getType() == 0?active(storyNovelAction): (storyNovelAction.getType() == 2 ? pay(storyNovelAction):behavior(storyNovelAction));
} }
public abstract boolean active(StoryNovelAction na); public abstract boolean active(StoryNovelAction na);
...@@ -15,4 +15,6 @@ public abstract class DYNotify extends BaseNotiry { ...@@ -15,4 +15,6 @@ public abstract class DYNotify extends BaseNotiry {
public abstract boolean pay(StoryNovelAction na); public abstract boolean pay(StoryNovelAction na);
public abstract boolean behavior(StoryNovelAction na);
} }
...@@ -4,6 +4,7 @@ package com.lwby.marketing.notify; ...@@ -4,6 +4,7 @@ package com.lwby.marketing.notify;
import com.lwby.marketing.flow.Action; import com.lwby.marketing.flow.Action;
import com.lwby.marketing.notify.media.Platform; import com.lwby.marketing.notify.media.Platform;
import com.lwby.marketing.notify.media.gdt.NovelGDTNotify; import com.lwby.marketing.notify.media.gdt.NovelGDTNotify;
import com.lwby.marketing.notify.media.jrtt.DyVideoJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.NovelJRTTNotify; import com.lwby.marketing.notify.media.jrtt.NovelJRTTNotify;
import com.lwby.marketing.notify.media.jrtt.StoryJRTTNotify; import com.lwby.marketing.notify.media.jrtt.StoryJRTTNotify;
...@@ -15,7 +16,8 @@ import java.util.stream.Collectors; ...@@ -15,7 +16,8 @@ import java.util.stream.Collectors;
public enum Media { public enum Media {
NOVEL_GDT("gdt", Platform.NOVEL, "广点通",new NovelGDTNotify()), NOVEL_GDT("gdt", Platform.NOVEL, "广点通",new NovelGDTNotify()),
NOVEL_JRTT("jrtt", Platform.NOVEL, "今日头条",new NovelJRTTNotify()), NOVEL_JRTT("jrtt", Platform.NOVEL, "今日头条",new NovelJRTTNotify()),
DY_STORY_JRTT("jrtt", Platform.DY_STORY, "今日头条",new StoryJRTTNotify()); DY_STORY_JRTT("jrtt", Platform.DY_STORY, "今日头条",new StoryJRTTNotify()),
DY_VIDEO_JRTT("jrtt", Platform.DY_VIDEO, "今日头条",new DyVideoJRTTNotify());
final String name; final String name;
final Platform platform; final Platform platform;
......
package com.lwby.marketing.notify.media.jrtt;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.notify.DYNotify;
import com.lwby.marketing.notify.media.jrtt.dto.JrttAttributeRequest;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class DyVideoJRTTNotify extends DYNotify {
public static final String ATTRIBUTE_URL = "https://analytics.oceanengine.com/api/v2/conversion";
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
@Override
public boolean active(StoryNovelAction na) {
return notify(na,"active");
}
@Override
public boolean pay(StoryNovelAction na) {
return false;
}
@Override
public boolean behavior(StoryNovelAction na) {
return notify(na,"game_addiction");
}
public boolean notify(StoryNovelAction na,String eventType) {
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type(eventType).timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
Integer resultCode = (Integer) JSON.parseObject(result).get("code");
DYVIDEO_SYS_LOG.info("DyVideoJRTTNotify.video.{}.upload,result={},userId={},channel={},platform={}",eventType,JSON.toJSONString(result),na.getUserId(),na.getChannelId(),na.getPlatformId());
return resultCode == 0;
} catch (Exception e) {
return false;
}
}
}
...@@ -27,6 +27,11 @@ public class StoryJRTTNotify extends DYNotify { ...@@ -27,6 +27,11 @@ public class StoryJRTTNotify extends DYNotify {
return notify(na,"active_pay"); return notify(na,"active_pay");
} }
@Override
public boolean behavior(StoryNovelAction na) {
return false;
}
public boolean notify(StoryNovelAction na,String eventType) { public boolean notify(StoryNovelAction na,String eventType) {
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo(); DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
......
package com.lwby.marketing.po;
import lombok.Data;
/**
* @author songxinyu
* @version VideoUpload.java, v 0.1 2024年01月29日 18:38 songxinyu Exp $
*/
@Data
public class VideoUpload {
private String adid;
private String creativeId;
private String creativeType;
private String clickId;
private String channel;
private String videoResourceId;
private String media;
private String code;
private String ecpmAvgCount;
private String motivateCount;
//每次ecpm达标
private String pecpmCount;
//达标次数,与激励视频阈值比较
private String perecpmSize;
private long activeTime;
private long clickTime;
}
package com.lwby.marketing.util;
/**
* @author songxinyu
* @version CacheKeyUtils.java, v 0.1 2024年03月22日 17:56 songxinyu Exp $
*/
public class CacheKeyUtils {
private static final String CACHE_BEHAVIOR_PREFIX = "up_c_";
public static String getVideoBehavoirKey(Long userId) {
return CACHE_BEHAVIOR_PREFIX + userId + "_fv";
}
}
package com.lwby.marketing.util;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import java.io.*;
/**
* @author songxinyu
* @version HttpUtil.java, v 0.1 2024年03月22日 17:58 songxinyu Exp $
*/
public class HttpUtil {
public static String postDy (String url, String toJson,String token) throws IOException {
HttpClient httpClient = new HttpClient();
PostMethod postMethod = new PostMethod(url);
RequestEntity se = new StringRequestEntity(toJson ,"application/json" ,"UTF-8");
postMethod.setRequestEntity(se);
postMethod.setRequestHeader("Content-Type","application/json");
postMethod.setRequestHeader("access-token",token);
//默认的重试策略
postMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler());
postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, 5000);//设置超时时间
int httpStatus = httpClient.executeMethod(postMethod);
InputStream bodystreams = postMethod.getResponseBodyAsStream();
String body = convertStreamToString(bodystreams);
return body;
}
public static String convertStreamToString(InputStream is) throws UnsupportedEncodingException {
// BufferedReader reader = new BufferedReader(new InputStreamReader(is));//输出的中文乱码
BufferedReader reader = new BufferedReader(new InputStreamReader(is,"utf8")); //GBK
StringBuilder sb = new StringBuilder();
String line = null;
try {
while ((line = reader.readLine()) != null) {
sb.append(line + "\n");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return sb.toString();
}
}
package com.lwby.marketing.vo;
import com.lwby.marketing.po.AppChannel;
import lombok.Data;
/**
* @author songxinyu
* @version AppChannelVO.java, v 0.1 2023年05月24日 14:05 songxinyu Exp $
*/
@Data
public class AppChannelVO extends AppChannel {
private Integer videoCount;
private Long mvCount;
private Long cpmCount;
//平均ecpm达标次数
private Long ecpmAvgCount;
//每次ecpm达标次数
private Long ecpmPerCount;
//激励视频次数
private Long motivationCount;
//arpu
private Double arpuCount;
}
...@@ -38,6 +38,8 @@ public class StoreUserUploadEventBO { ...@@ -38,6 +38,8 @@ public class StoreUserUploadEventBO {
*/ */
String orderObject; String orderObject;
String wechatOpenId;
/** /**
* 扩展数据 * 扩展数据
*/ */
...@@ -154,4 +156,12 @@ public class StoreUserUploadEventBO { ...@@ -154,4 +156,12 @@ public class StoreUserUploadEventBO {
public void setTotalValue(Double totalValue) { public void setTotalValue(Double totalValue) {
this.totalValue = totalValue; this.totalValue = totalValue;
} }
public String getWechatOpenId() {
return wechatOpenId;
}
public void setWechatOpenId(String wechatOpenId) {
this.wechatOpenId = wechatOpenId;
}
} }
...@@ -3,6 +3,7 @@ package com.lwby.marketing.vo; ...@@ -3,6 +3,7 @@ package com.lwby.marketing.vo;
import com.lwby.marketing.flow.Action; import com.lwby.marketing.flow.Action;
import com.lwby.marketing.notify.Media; import com.lwby.marketing.notify.Media;
import com.lwby.marketing.po.StoryLogin; import com.lwby.marketing.po.StoryLogin;
import com.lwby.marketing.po.VideoUpload;
import lombok.Data; import lombok.Data;
/** /**
...@@ -13,7 +14,10 @@ import lombok.Data; ...@@ -13,7 +14,10 @@ import lombok.Data;
public class StoryNovelAction extends Action { public class StoryNovelAction extends Action {
ClientInfo clientInfo; ClientInfo clientInfo;
//抖音故事会
StoryLogin storyLogin; StoryLogin storyLogin;
//抖音短剧
VideoUpload videoUpload;
DeliveryDeviceInfo deliveryDeviceInfo; DeliveryDeviceInfo deliveryDeviceInfo;
String openId; String openId;
int platformId; int platformId;
......
...@@ -14,6 +14,13 @@ spring: ...@@ -14,6 +14,13 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2 initialSize: 2
minIdle: 2 minIdle: 2
video:
jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com:3306/video?zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=utf8&autoReconnect=true
username: lwby
password: VjxYfmY8J77ISChp
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 2
minIdle: 2
redis: redis:
host: 172.17.243.150 host: 172.17.243.150
port: 6379 port: 6379
......
...@@ -35,6 +35,39 @@ ...@@ -35,6 +35,39 @@
<appender-ref ref="dystory_error" /> <appender-ref ref="dystory_error" />
</logger> </logger>
<!--抖音短剧小程序-->
<appender name="dyvideo_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/dyvideo_sys.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/dyvideo_sys/dyvideo_sys.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="dyvideo.sys" level="INFO" additivity="false">
<appender-ref ref="dyvideo_sys" />
</logger>
<appender name="dyvideo_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/dyvideo_error.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_DIR}/dyvideo_error/dyvideo_error.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder charset="UTF-8">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="dyvideo.error" level="INFO" additivity="true">
<appender-ref ref="dyvideo_error" />
</logger>
<!--小说--> <!--小说-->
<appender name="novel_sys" class="ch.qos.logback.core.rolling.RollingFileAppender"> <appender name="novel_sys" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_DIR}/novel_sys.log</File> <File>${LOG_DIR}/novel_sys.log</File>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment