Commit 95e76b7b authored by 宋新宇's avatar 宋新宇

短剧APP归因

parent 7e0af7f6
......@@ -85,7 +85,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
DYSTORY_SYS_LOG.info(
......@@ -97,6 +97,7 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
DYSTORY_SYS_LOG.info(
"PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0);
action.stop(true);
}
}
......
......@@ -2,15 +2,19 @@ package com.lwby.marketing.att.dyvideo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
......@@ -29,6 +33,15 @@ public class DyVideoBehavoirConsumer {
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@Resource
ApplicationContext ctx;
FlowExecutor<StoryNovelAction> executorByDyVideoBehavoir;
@PostConstruct
public void init(){
executorByDyVideoBehavoir = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").SWITCH("dyvideo_behavior"));
}
@KafkaListener(topics = {"${system.consumer.dyvideo.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryDyVideoBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
......@@ -52,7 +65,7 @@ public class DyVideoBehavoirConsumer {
StoryNovelAction action = new StoryNovelAction(event.getClientInfo(), event.getUserId(), event.getWechatOpenId(),
event.getUserUploadEvent());
dyVideoFlowExecutor.getExecutorByStory().execute(action);
executorByDyVideoBehavoir.execute(action);
} catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.behavoir.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
......
......@@ -18,7 +18,7 @@ public class DyVideoFlowExecutor {
@PostConstruct
public void init(){
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").SWITCH("dyvideo_behavior","dyvideo_dedu").THEN("dyvideo_uploadcall"));
executorByStory = new FlowExecutor<>(ctx, Rule.create().THEN("dyvideo_setup").THEN("dyvideo_store").THEN("dyvideo_checkerfirst").THEN("dyvideo_uploadcall"));
}
public FlowExecutor<StoryNovelAction> getExecutorByStory(){
......
......@@ -2,7 +2,10 @@ package com.lwby.marketing.att.dyvideo.handle;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.flow.NodeSwitchFlow;
import com.lwby.marketing.po.ThirdAccountDy;
import com.lwby.marketing.po.VideoUpload;
......@@ -15,13 +18,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
......@@ -55,6 +62,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
}
@Override
@Async("commonTaskExecutor")
public void process(StoryNovelAction action) {
VideoUpload videoUpload = action.getVideoUpload();
long userId = action.getUserId();
......@@ -64,7 +72,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
long hour = TimeUnit.MILLISECONDS.toHours(currentTime - activeTime);
if (hour > 6) {
log.info("DyvideoBehaviorFlow expeed six hour ,videoUpload={},userId={}",JSON.toJSONString(videoUpload),userId);
action.stop(true);
//action.stop(true);
return;
}
int platformId = action.getPlatformId();
......@@ -91,7 +99,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.cacheModel in table is set value,djChanel={},userId={},ecpmAvgModelCount={},pecpmModelCount={},motivateModelCount={}",channelId,userId,ecpmAvgModelCount,pecpmModelCount,motivateModelCount);
} else {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.cacheModel in table is not set value,djChanel={},userId={}",channelId,userId);
action.stop(true);
//action.stop(true);
return;
}
......@@ -176,7 +184,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
} else {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow tvcandmo not up to the standard,tvcCount={},motivateCount={},djChanel={},userId={}", tvcCount,
motivateCount, channelId, userId);
action.stop(true);
//action.stop(true);
return;
}
}
......@@ -185,7 +193,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
if (motivateCount < motivateModelCount) {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow motivate not up to the standard,motivateCount={},motivateModelCount={},djChanel={},userId={}",
motivateCount, motivateModelCount, channelId, userId);
action.stop(true);
//action.stop(true);
return;
}
videoUpload.setMotivateCount(String.valueOf(motivateCount));
......@@ -195,7 +203,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
if ((ecpmAvgCount < ecpmAvgModelCount)) {
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow ecpm not up to the standard,ecpmAvgCount={},ecpmAvgModelCount={},djChanel={},userId={}",
ecpmAvgCount, ecpmAvgModelCount, channelId, userId);
action.stop(true);
//action.stop(true);
return;
}
videoUpload.setEcpmAvgCount(String.valueOf(ecpmAvgCount));
......@@ -207,7 +215,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info(
"DyvideoBehaviorFlow pecpmCount not up to the standard,pecpmModelCount={},pecpmCount={},motivateModelCount={},motivateCount={},djChanel={},userId={}",
pecpmModelCount, pecpmCount, motivateModelCount, motivateCount, channelId, userId);
action.stop(true);
//action.stop(true);
return;
}
videoUpload.setPecpmCount(String.valueOf(pecpmModelCount));
......@@ -219,15 +227,55 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info(
"DyvideoBehaviorFlow pecpmCount is null not to the standard,pecpmModelCount={},pecpmCount={},motivateModelCount={},motivateCount={},djChanel={},userId={}",
pecpmModelCount, pecpmCount, motivateModelCount, motivateCount, channelId, userId);
action.stop(true);
//action.stop(true);
return;
}
}
//满足之后扣量
Integer sprDedu = appChannel.getSprDedu();
CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
//等于空 或 100直接回传
if (sprDedu == null || sprDedu == 100) {
action.getMedia().notify(action);
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),up.getExpire(action),"1");
return;
}
//总数
String channelTotal = up.getTotalCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
//回传
String channelCallback = up.getCallbackCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
action.getMedia().notify(action);
up.notifyResult(action, type.getTopic()+"_test",type.getStatus());
up.set(up.getFirstCheckerKey(action),up.getExpire(action),"1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 1);
} else {
up.notifyResult(action,"ocpc_behavior_test", AttributionStatus.NORMAL_DEDUCTION_CALLBACK);
up.set(up.getFirstCheckerKey(action), up.getExpire(action), "1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 0);
}
//action.stop(true);
} else {
action.stop(true);
//action.stop(true);
}
} else {
action.stop(true);
//action.stop(true);
}
}
......@@ -310,4 +358,16 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
}
return resultCountMap;
}
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
if (channelTotalCount == 0) {
//首次随机
return ThreadLocalRandom.current().nextInt(1, 3) == 1;
}
//计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传
double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
}
}
package com.lwby.marketing.att.dyvideo.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.flow.NodeSwitchFlow;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author songxinyu
* @version PaySpduFlow.java, v 0.1 2024年03月04日 18:46 songxinyu Exp $
*/
@Slf4j
@Component("dyvideo_dedu")
public class DyvideoDeduFlow extends NodeSwitchFlow<StoryNovelAction> {
private static final Logger DYVIDEO_SYS_LOG = LoggerFactory.getLogger("dyvideo.sys");
@Resource
DyVideoUniversalProcess up;
@Override
public boolean checked(StoryNovelAction action) {
return action.getType() == 3;
}
@Override
public void process(StoryNovelAction action) {
int platformId = action.getPlatformId();
Long channelId = action.getChannelId();
AppChannelVO appChannel = up.getAppChannelByPlatformAndChannel(platformId,channelId);
Integer sprDedu = appChannel.getSprDedu();
//等于空 或 100直接回传
if (sprDedu == null || sprDedu == 100) {
return;
}
//总数
String channelTotal = up.getTotalCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
//回传
String channelCallback = up.getCallbackCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(), sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 1);
} else {
up.notifyResult(action,"ocpc_behavior_test", AttributionStatus.NORMAL_DEDUCTION_CALLBACK);
up.set(up.getFirstCheckerKey(action), up.getExpire(action), "1");
DYVIDEO_SYS_LOG.info(
"ChannelAttributionFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, planId={}, sprDedu = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
action.getPlatformId(), action.getChannelId(),action.getPlanId(), sprDedu, channelTotalCount, channelCallbackCount, 0);
}
}
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
if (channelTotalCount == 0) {
//首次随机
return ThreadLocalRandom.current().nextInt(1, 3) == 1;
}
//计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传
double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
}
}
......@@ -35,7 +35,10 @@ jetcache:
type: linkedhashmap
keyConvertor: fastjson
limit: 10000
core:
thread:
pool:
size: 10
logging:
file:
path: /data/marketing/logs
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment