Commit 52a2ad0e authored by 宋新宇's avatar 宋新宇

Merge branch '3-抖音短剧及短剧app归因接入' into 'dev'

短剧APP归因

See merge request !7
parents 7c53db15 95e76b7b
...@@ -85,7 +85,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -85,7 +85,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
up.incrby(channelTotal, 1); up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) { if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1 //回传,回传个数 + 1
up.incrby(channelCallback, 1); up.incrby(channelCallback, 1);
DYSTORY_SYS_LOG.info( DYSTORY_SYS_LOG.info(
...@@ -97,6 +97,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -97,6 +97,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
DYSTORY_SYS_LOG.info( DYSTORY_SYS_LOG.info(
"PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}", "PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0); platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0);
action.stop(true);
} }
} }
......
...@@ -2,15 +2,19 @@ package com.lwby.marketing.att.dyvideo; ...@@ -2,15 +2,19 @@ package com.lwby.marketing.att.dyvideo;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.StoreUserUploadEventBO; import com.lwby.marketing.vo.StoreUserUploadEventBO;
import com.lwby.marketing.vo.StoryNovelAction; import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List; import java.util.List;
...@@ -29,6 +33,15 @@ public class DyVideoBehavoirConsumer { ...@@ -29,6 +33,15 @@ public class DyVideoBehavoirConsumer {
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error"); private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@Resource
ApplicationContext ctx;
FlowExecutor<StoryNovelAction> executorByDyVideoBehavoir;
@PostConstruct
public void init(){
executorByDyVideoBehavoir = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").SWITCH("dyvideo_behavior"));
}
@KafkaListener(topics = {"${system.consumer.dyvideo.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoBehavoir") @KafkaListener(topics = {"${system.consumer.dyvideo.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) { public void onMessage(List<ConsumerRecord<String, String>> datas) {
...@@ -52,7 +65,7 @@ public class DyVideoBehavoirConsumer { ...@@ -52,7 +65,7 @@ public class DyVideoBehavoirConsumer {
StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(), StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(),
event.getUserUploadEvent()); event.getUserUploadEvent());
dyVideoFlowExecutor.getExecutorByStory().execute(action); executorByDyVideoBehavoir.execute(action);
} catch (Throwable e) { } catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.behavoir.onMessage failed, data={}, costTime={} ms", data.value(), DYVIDEO_ERROR_LOG.error("dy.video.behavoir.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e); System.currentTimeMillis() - begin, e);
......
...@@ -18,7 +18,7 @@ public class DyVideoFlowExecutor { ...@@ -18,7 +18,7 @@ public class DyVideoFlowExecutor {
@PostConstruct @PostConstruct
public void init(){ public void init(){
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").SWITCH("dyvideo_behavior","dyvideo_dedu").THEN("dyvideo_uploadcall")); executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").THEN("dyvideo_uploadcall"));
} }
public FlowExecutor<StoryNovelAction> getExecutorByStory(){ public FlowExecutor<StoryNovelAction> getExecutorByStory(){
......
...@@ -2,7 +2,10 @@ package com.lwby.marketing.att.dyvideo.handle; ...@@ -2,7 +2,10 @@ package com.lwby.marketing.att.dyvideo.handle;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess; import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.flow.NodeSwitchFlow; import com.lwby.marketing.flow.NodeSwitchFlow;
import com.lwby.marketing.po.ThirdAccountDy; import com.lwby.marketing.po.ThirdAccountDy;
import com.lwby.marketing.po.VideoUpload; import com.lwby.marketing.po.VideoUpload;
...@@ -15,13 +18,17 @@ import org.slf4j.Logger; ...@@ -15,13 +18,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.util.*; import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
...@@ -55,6 +62,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -55,6 +62,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
} }
@Override @Override
@Async("commonTaskExecutor")
public void process(StoryNovelAction action) { public void process(StoryNovelAction action) {
VideoUpload videoUpload = action.getVideoUpload(); VideoUpload videoUpload = action.getVideoUpload();
long userId = action.getUserId(); long userId = action.getUserId();
...@@ -64,7 +72,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -64,7 +72,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
long hour = TimeUnit.MILLISECONDS.toHours(currentTime - activeTime); long hour = TimeUnit.MILLISECONDS.toHours(currentTime - activeTime);
if (hour > 6) { if (hour > 6) {
log.info("DyvideoBehaviorFlow expeed six hour ,videoUpload={},userId={}",JSON.toJSONString(videoUpload),userId); log.info("DyvideoBehaviorFlow expeed six hour ,videoUpload={},userId={}",JSON.toJSONString(videoUpload),userId);
action.stop(true); //action.stop(true);
return; return;
} }
int platformId = action.getPlatformId(); int platformId = action.getPlatformId();
...@@ -91,7 +99,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -91,7 +99,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.cacheModel in table is set value,djChanel={},userId={},ecpmAvgModelCount={},pecpmModelCount={},motivateModelCount={}",channelId,userId,ecpmAvgModelCount,pecpmModelCount,motivateModelCount); DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.cacheModel in table is set value,djChanel={},userId={},ecpmAvgModelCount={},pecpmModelCount={},motivateModelCount={}",channelId,userId,ecpmAvgModelCount,pecpmModelCount,motivateModelCount);
} else { } else {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.cacheModel in table is not set value,djChanel={},userId={}",channelId,userId); DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.cacheModel in table is not set value,djChanel={},userId={}",channelId,userId);
action.stop(true); //action.stop(true);
return; return;
} }
...@@ -176,7 +184,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -176,7 +184,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
} else { } else {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow tvcandmo not up to the standard,tvcCount={},motivateCount={},djChanel={},userId={}", tvcCount, DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow tvcandmo not up to the standard,tvcCount={},motivateCount={},djChanel={},userId={}", tvcCount,
motivateCount, channelId, userId); motivateCount, channelId, userId);
action.stop(true); //action.stop(true);
return; return;
} }
} }
...@@ -185,7 +193,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -185,7 +193,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
if (motivateCount < motivateModelCount) { if (motivateCount < motivateModelCount) {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow motivate not up to the standard,motivateCount={},motivateModelCount={},djChanel={},userId={}", DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow motivate not up to the standard,motivateCount={},motivateModelCount={},djChanel={},userId={}",
motivateCount, motivateModelCount, channelId, userId); motivateCount, motivateModelCount, channelId, userId);
action.stop(true); //action.stop(true);
return; return;
} }
videoUpload.setMotivateCount(String.valueOf(motivateCount)); videoUpload.setMotivateCount(String.valueOf(motivateCount));
...@@ -195,7 +203,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -195,7 +203,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
if ((ecpmAvgCount < ecpmAvgModelCount)) { if ((ecpmAvgCount < ecpmAvgModelCount)) {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow ecpm not up to the standard,ecpmAvgCount={},ecpmAvgModelCount={},djChanel={},userId={}", DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow ecpm not up to the standard,ecpmAvgCount={},ecpmAvgModelCount={},djChanel={},userId={}",
ecpmAvgCount, ecpmAvgModelCount, channelId, userId); ecpmAvgCount, ecpmAvgModelCount, channelId, userId);
action.stop(true); //action.stop(true);
return; return;
} }
videoUpload.setEcpmAvgCount(String.valueOf(ecpmAvgCount)); videoUpload.setEcpmAvgCount(String.valueOf(ecpmAvgCount));
...@@ -207,7 +215,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -207,7 +215,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info( DYVIDEO_SYS_LOG.info(
"DyvideoBehaviorFlow pecpmCount not up to the standard,pecpmModelCount={},pecpmCount={},motivateModelCount={},motivateCount={},djChanel={},userId={}", "DyvideoBehaviorFlow pecpmCount not up to the standard,pecpmModelCount={},pecpmCount={},motivateModelCount={},motivateCount={},djChanel={},userId={}",
pecpmModelCount, pecpmCount, motivateModelCount, motivateCount, channelId, userId); pecpmModelCount, pecpmCount, motivateModelCount, motivateCount, channelId, userId);
action.stop(true); //action.stop(true);
return; return;
} }
videoUpload.setPecpmCount(String.valueOf(pecpmModelCount)); videoUpload.setPecpmCount(String.valueOf(pecpmModelCount));
...@@ -219,15 +227,55 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -219,15 +227,55 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info( DYVIDEO_SYS_LOG.info(
"DyvideoBehaviorFlow pecpmCount is null not to the standard,pecpmModelCount={},pecpmCount={},motivateModelCount={},motivateCount={},djChanel={},userId={}", "DyvideoBehaviorFlow pecpmCount is null not to the standard,pecpmModelCount={},pecpmCount={},motivateModelCount={},motivateCount={},djChanel={},userId={}",
pecpmModelCount, pecpmCount, motivateModelCount, motivateCount, channelId, userId); pecpmModelCount, pecpmCount, motivateModelCount, motivateCount, channelId, userId);
action.stop(true); //action.stop(true);
return; return;
} }
} }
//满足之后扣量
Integer sprDedu = appChannel.getSprDedu();
CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
//等于空 或 100直接回传
if (sprDedu == null || sprDedu == 100) {
action.getMedia().notify(action);
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),up.getExpire(action),"1");
return;
}
//总数
String channelTotal = up.getTotalCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
//回传
String channelCallback = up.getCallbackCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
action.getMedia().notify(action);
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),up.getExpire(action),"1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 1);
} else {
up.notifyResult(action,"ocpc_behavior_test", AttributionStatus.NORMAL_DEDUCTION_CALLBACK);
up.set(up.getFirstCheckerKey(action), up.getExpire(action), "1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 0);
}
//action.stop(true);
} else { } else {
action.stop(true); //action.stop(true);
} }
} else { } else {
action.stop(true); //action.stop(true);
} }
} }
...@@ -310,4 +358,16 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -310,4 +358,16 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
} }
return resultCountMap; return resultCountMap;
} }
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
if (channelTotalCount == 0) {
//首次随机
return ThreadLocalRandom.current().nextInt(1, 3) == 1;
}
//计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传
double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
}
} }
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.flow.NodeSwitchFlow;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author songxinyu
* @version PaySpduFlow.java, v 0.1 2024年03月04日 18:46 songxinyu Exp $
*/
@Slf4j
@Component("dyvideo_dedu")
public class DyvideoDeduFlow extends NodeSwitchFlow<StoryNovelAction> {
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
@Resource
DyVideoUniversalProcess up;
@Override
public boolean checked(StoryNovelAction action) {
return action.getType() == 3;
}
@Override
public void process(StoryNovelAction action) {
int platformId = action.getPlatformId();
Long channelId = action.getChannelId();
AppChannelVO appChannel = up.getAppChannelByPlatformAndChannel(platformId,channelId);
Integer sprDedu = appChannel.getSprDedu();
//等于空 或 100直接回传
if (sprDedu == null || sprDedu == 100) {
return;
}
//总数
String channelTotal = up.getTotalCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
//回传
String channelCallback = up.getCallbackCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 1);
} else {
up.notifyResult(action,"ocpc_behavior_test", AttributionStatus.NORMAL_DEDUCTION_CALLBACK);
up.set(up.getFirstCheckerKey(action), up.getExpire(action), "1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 0);
}
}
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
if (channelTotalCount == 0) {
//首次随机
return ThreadLocalRandom.current().nextInt(1, 3) == 1;
}
//计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传
double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
}
}
...@@ -35,7 +35,10 @@ jetcache: ...@@ -35,7 +35,10 @@ jetcache:
type: linkedhashmap type: linkedhashmap
keyConvertor: fastjson keyConvertor: fastjson
limit: 10000 limit: 10000
core:
thread:
pool:
size: 10
logging: logging:
file: file:
path: /data/marketing/logs path: /data/marketing/logs
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment