Commit 735774ed authored by 宋新宇's avatar 宋新宇

store归因上报

parent 9cac9e22
package com.lwby.marketing.att.novel; package com.lwby.marketing.att;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.flow.FlowExecutor; import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule; import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent; import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction; import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreNovelAction;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
...@@ -17,15 +18,17 @@ import javax.annotation.Resource; ...@@ -17,15 +18,17 @@ import javax.annotation.Resource;
public class NovelAttributionMain { public class NovelAttributionMain {
@Resource @Resource
ApplicationContext ctx; ApplicationContext ctx;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
FlowExecutor<NovelAction> executorNovel; public FlowExecutor<NovelAction> executorNovel;
public FlowExecutor<StoreNovelAction> executorStoreNovelActive;
public FlowExecutor<StoreNovelAction> executorStoreNovelPay;
@PostConstruct @PostConstruct
public void init() { public void init() {
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("setup").THEN("prize").THEN("store").THEN("checkerfirst").SWITCH("cross","plan","channel")); executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("setup").THEN("prize").THEN("store").THEN("checkerfirst").SWITCH("cross","plan","channel"));
//test(); executorStoreNovelActive = new FlowExecutor<>(ctx, Rule.create().THEN("setupStoreNovel").THEN("storeNovelStore").THEN("checkerNovelStorefirst").THEN("uploadcall"));
executorStoreNovelPay = new FlowExecutor<>(ctx, Rule.create().THEN("setupStoreNovel").THEN("storeNovelStore").THEN("checkerNovelStorefirst").THEN("payspdu").THEN("uploadcall"));
} }
public void test(){ public void test(){
......
...@@ -5,11 +5,12 @@ public enum AttributionStatus { ...@@ -5,11 +5,12 @@ public enum AttributionStatus {
STORE_CALLBACK(3,"商店"), STORE_CALLBACK(3,"商店"),
NORMAL_DEDUCTION_CALLBACK(4,"正常扣量"), NORMAL_DEDUCTION_CALLBACK(4,"正常扣量"),
OLDUSER_DEDUCTION_CALLBACK(7,"老用户扣量"), OLDUSER_DEDUCTION_CALLBACK(7,"老用户扣量"),
IP_CALLBACK(9,"IP归因"); IP_CALLBACK(9,"IP归因"),
PAY(10,"付费回传");
final int id; public int id;
final String desc; public String desc;
AttributionStatus(int id,String desc){ AttributionStatus(int id,String desc){
this.id = id; this.id = id;
......
package com.lwby.marketing.att.novel;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.att.NovelAttributionMain;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;
/**
* @author songxinyu
* @version NovelActiveConsumer.java, v 0.1 2024年03月05日 10:40 songxinyu Exp $
*/
@Slf4j
@Component
public class NovelActiveConsumer implements MessageListener<String, String> {
@Autowired
NovelAttributionMain novelAttributionMain;
@KafkaListener(topics = {"${novel.active.consumer.topic:testNovelActive}"},groupId = "${novel.active.consumer.group.id:test_novel_dy_active}")
@Override
public void onMessage(ConsumerRecord<String, String> data) {
long begin = System.currentTimeMillis();
if (log.isInfoEnabled()) {
log.info("media.active.onMessage start, data={}", data == null ? null : data.value());
}
if (data != null) {
try {
BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
if (log.isInfoEnabled()) {
log.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
}
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
novelAttributionMain.executorNovel.execute(action);
} catch (Throwable e) {
log.error("novel.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
...@@ -43,7 +43,7 @@ public class UniversalProcess { ...@@ -43,7 +43,7 @@ public class UniversalProcess {
if (Objects.isNull(ddi)) { if (Objects.isNull(ddi)) {
ddi = new DeliveryDeviceInfo(); ddi = new DeliveryDeviceInfo();
String channelStr = String.valueOf(action.getChannelId()); String channelStr = String.valueOf(action.getClientInfo().getChannel());
ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr)); ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr));
ddi.setAd_plan_id("0"); ddi.setAd_plan_id("0");
ddi.setAd_group_id("0"); ddi.setAd_group_id("0");
......
...@@ -89,7 +89,8 @@ public class ChannelAttributionFlow extends NodeSwitchFlow<NovelAction> { ...@@ -89,7 +89,8 @@ public class ChannelAttributionFlow extends NodeSwitchFlow<NovelAction> {
//计算回传率 //计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP); BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传 //比较回传率和扣量比例,决定是否回传
return divide.compareTo(BigDecimal.valueOf(sprDedu.doubleValue()).setScale(4, RoundingMode.HALF_UP)) != 1; double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
} }
/** /**
......
...@@ -102,7 +102,8 @@ public class CrossPlatformAttributionFlow extends NodeSwitchFlow<NovelAction> { ...@@ -102,7 +102,8 @@ public class CrossPlatformAttributionFlow extends NodeSwitchFlow<NovelAction> {
//计算 //计算
BigDecimal divide = BigDecimal.valueOf(channelCallbackCount).divide(BigDecimal.valueOf(channelTotalCount), 4, RoundingMode.HALF_UP); BigDecimal divide = BigDecimal.valueOf(channelCallbackCount).divide(BigDecimal.valueOf(channelTotalCount), 4, RoundingMode.HALF_UP);
//扣量 //扣量
return divide.compareTo(BigDecimal.valueOf(sprDedu.doubleValue()).setScale(4, RoundingMode.HALF_UP)) != 1; double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
} }
} }
} }
......
package com.lwby.marketing.att.storenovel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.att.NovelAttributionMain;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreNovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;
/**
* @author songxinyu
* @version StoreNovelPayConsumer.java, v 0.1 2024年03月05日 10:40 songxinyu Exp $
*/
@Slf4j
@Component
public class StoreNovelActiveConsumer implements MessageListener<String, String> {
@Autowired
NovelAttributionMain novelAttributionMain;
@KafkaListener(topics = {"${store.novel.active.consumer.topic:testStoreNovelActive}"},groupId = "${store.novel.active.consumer.group.id:test_store_dy_active}")
@Override
public void onMessage(ConsumerRecord<String, String> data) {
long begin = System.currentTimeMillis();
if (log.isInfoEnabled()) {
log.info("media.active.onMessage start, data={}", data == null ? null : data.value());
}
if (data != null) {
try {
StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
if (log.isInfoEnabled()) {
log.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
}
StoreNovelAction action = new StoreNovelAction(event.getClientInfo(),event.getUserId(),event.getOpenId(),event.getUserUploadEvent());
novelAttributionMain.executorStoreNovelActive.execute(action);
} catch (Throwable e) {
log.error("novel.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
package com.lwby.marketing.att.storenovel;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreNovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class StoreNovelAttributionMain {
@Resource
ApplicationContext ctx;
public FlowExecutor<StoreNovelAction> executorStoreNovelActive;
@PostConstruct
public void init() {
executorStoreNovelActive = new FlowExecutor<>(ctx, Rule.create().THEN("setupStoreNovel").THEN("storeNovelStore").THEN("checkerNovelStorefirst").THEN("uploadcall"));
}
public void test(){
String msg = "{\"clientInfo\":{\"channel\":214120503,\"clientIp\":\"183.219.7.180\",\"pkv\":1,\"platformGroupId\":412,"
+ "\"platformId\":412,\"sessionid\":\"undefined\",\"ua\":\"Mozilla/5.0 (Linux; Android 12; OXF-AN00 Build/HUAWEIOXF-AN00;"
+ " wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/117.0.0.0 Mobile Safari/537.36 aweme.lite/28.9.0 "
+ "ToutiaoMicroApp/3.14.0 PluginVersion/28909073\",\"version\":\"2.0.0\",\"xClient\":\"version=2.0.0;platformId=412;"
+ "equipment=android\"},\"createTime\":1709533439693,\"extraData\":{},\"id\":\"93857260-8130-442f-bc92-b549dbf28ef0\","
+ "\"userId\":875325,\"userUploadEvent\":0,\"appId\":\"ttd3dda5604ce230b401\","
+ "\"openId\":\"_000HgDjWl-P5-WS9HTQIssNgTnMBRUqDHDu\"}";
StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
StoreNovelAction action = new StoreNovelAction(event.getClientInfo(),event.getUserId(),event.getOpenId(),event.getUserUploadEvent());
try {
executorStoreNovelActive.execute(action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// @KafkaListener(topics = "registerMediaEvent1", groupId = "test")
// public void listen(String message) {
// System.out.println("Received Message: " + message);
//
// BookStoreEvent event = JSONObject.parseObject(message, BookStoreEvent.class);
// NovelAction action = new NovelAction(event.getClientInfo(),message);
//
// try {
// executorNovel.execute(action);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
}
package com.lwby.marketing.att.storenovel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.att.NovelAttributionMain;
import com.lwby.marketing.vo.StoreNovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;
/**
* @author songxinyu
* @version StoreNovelPayConsumer.java, v 0.1 2024年03月05日 10:40 songxinyu Exp $
*/
@Slf4j
@Component
public class StoreNovelPayConsumer implements MessageListener<String, String> {
@Autowired
NovelAttributionMain novelAttributionMain;
@KafkaListener(topics = {"${store.novel.pay.consumer.topic:testStoreNovelpay}"},groupId = "${store.novel.pay.consumer.group.id:test_store_dy_pay}")
@Override
public void onMessage(ConsumerRecord<String, String> data) {
long begin = System.currentTimeMillis();
if (log.isInfoEnabled()) {
log.info("media.active.onMessage start, data={}", data == null ? null : data.value());
}
if (data != null) {
try {
StoreUserUploadEventBO event = JSONObject.parseObject(data.value(), StoreUserUploadEventBO.class);
if (log.isInfoEnabled()) {
log.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
}
StoreNovelAction action = new StoreNovelAction(event.getClientInfo(),null,event.getUserId(),event.getOpenId(),event.getUserUploadEvent(),event.getProductId());
novelAttributionMain.executorStoreNovelPay.execute(action);
} catch (Throwable e) {
log.error("novel.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
}
}
}
}
package com.lwby.marketing.att.storenovel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.google.common.collect.ImmutableMap;
import com.lwby.marketing.att.novel.AttributionStatus;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.po.AppChannel;
import com.lwby.marketing.vo.AttributeRule;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoreNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class UniversalStoreProcess {
@Resource
JdbcTemplate lwbyJdbcTemplate;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Resource
private RedisTemplate<String, String> redisTemplate;
/******************************************** GENERIC METHOD ***********************************************/
/**
* 通知处理结果
*/
public void notifyResult(StoreNovelAction action, AttributionStatus status) {
DeliveryDeviceInfo ddi = action.getDeliveryDeviceInfo();
if (Objects.isNull(ddi)) {
ddi = new DeliveryDeviceInfo();
String channelStr = String.valueOf(action.getChannelId());
ddi.setMedia(MediaMapping.getMediaNameByChannelId(channelStr));
ddi.setAd_plan_id("0");
ddi.setAd_group_id("0");
ddi.setAd_creative_id("0");
ddi.setPlatform_id(String.valueOf(action.getPlatformId()));
ddi.setDj_channel(channelStr);
}
ddi.setOs(action.getClientInfo().getOs());
ddi.setIs_call(status.id);
ddi.setUserId(action.getUserId());
ddi.setActive_time(System.currentTimeMillis());
ddi.setChannel(String.valueOf(action.getChannelId()));
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = kafkaTemplate.send("ocpc_result", jsonString);
active_result.addCallback(
result -> log.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> log.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
);
}
public String getTotalCountKey(AttributionType attributionType, int platformId, int channelOrPlanId,String goodsId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%s_%d_%s", attributionType, platformId, channelOrPlanId,goodsId, sprDedu, dateStr);
}
public String getCallbackCountKey(AttributionType attributionType, int platformId, int channelOrPlanId,String goodsId, int sprDedu, String dateStr) {
return String.format("%s_callback_%d_%d_%s_%d_%s", attributionType, platformId, channelOrPlanId,goodsId, sprDedu, dateStr);
}
public String getFirstCheckerKey(StoreNovelAction action) {
return Objects.equals(action.getType(),0)
? String.format("fc_%d_%d_%s_%s", action.getUserId(), action.getPlatformId(),action.getMediaName(),action.getCurrentDateStr())
: String.format("fc_%d_%d_%s", action.getUserId(), action.getPlatformId(),action.getMediaName());
}
/******************************************** REDIS **********************************************************/
public boolean exists(String key) {
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
public void set(String key, int expires, String value) {
redisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS);
}
public <T> T get(Class<T> clazz, String key) {
String value = redisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
return JSON.parseObject(value,clazz);
}
return null;
}
public long incrby(String key, int increment, int expireSecond) {
Long v = redisTemplate.opsForValue().increment(key,increment);
redisTemplate.expire(key,expireSecond, TimeUnit.SECONDS);
return Objects.isNull(v)?0: v;
}
public long incrby(String key, int increment) {
return redisTemplate.opsForValue().increment(key,increment);
}
/******************************************** JDBC *************************************************************/
@Cached(name="attribute_rule_store_novel", cacheType = CacheType.LOCAL)
@CacheRefresh(refresh = 300)
public AttributeRule getAttributeRuleByPlatformIdAndChannelId(int platformId,int channelId) {
return lwbyJdbcTemplate.queryForObject(String.format("select * from attribute_rule where channel_id=%d and platform_id=%d",channelId,platformId),AttributeRule.class);
}
/******************************************** INNER CLASS ******************************************************/
/**
* 媒体ID和名称关系映射
*/
static class MediaMapping{
static Map<String, String> mediaMap = ImmutableMap.<String, String>builder()
.put("11", "xiaomi")
.put("12", "huawei")
.put("13", "oppo")
.put("14", "vivo")
.put("15", "yingyongbao")
.put("16", "meizu")
.put("17", "360zhushou")
.put("18", "baiduzhushou")
.put("19", "other")
.put("21", "jinritoutiao")
.put("22", "guangdiantong")
.put("23", "kuaishou")
.put("24", "uc")
.put("25", "qutoutiao")
.put("28", "youdao")
.put("33", "dystore")
.build();
static String getMediaNameByChannelId(String channelId){
return mediaMap.get(channelId.substring(0, 2));
}
}
}
package com.lwby.marketing.att.storenovel.handle;
import com.lwby.marketing.att.novel.UniversalProcess;
import com.lwby.marketing.att.storenovel.UniversalStoreProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component("checkerNovelStorefirst")
public class CheckerNovelStoreFirstFlow extends NodeFlow<StoreNovelAction> {
@Resource
UniversalStoreProcess up;
@Override
public void process(StoreNovelAction action) {
if(up.exists(up.getFirstCheckerKey(action))){
action.stop(true);
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.storenovel.handle;
import cn.hutool.crypto.SecureUtil;
import com.lwby.marketing.att.novel.DeviceType;
import com.lwby.marketing.att.novel.UniversalProcess;
import com.lwby.marketing.att.novel.media.Media;
import com.lwby.marketing.att.storenovel.media.StoreMedia;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.po.StoryLogin;
import com.lwby.marketing.util.DateTimUtils;
import com.lwby.marketing.util.NumberUtils;
import com.lwby.marketing.vo.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
@Slf4j
@Component("setupStoreNovel")
public class ParameterSetupStoreNovelFlow extends NodeFlow<StoreNovelAction> {
@Resource
UniversalProcess up;
@Override
public void process(StoreNovelAction action) {
ClientInfo clientInfo = action.getClientInfo();
String openId = action.getOpenId();
//平台ID
Integer platformId = clientInfo.getPlatformId();
//VO对像
DeliveryDeviceInfo deliveryDeviceInfo = null;
//匹配OpenId
StoryLogin storyLogin = up.get(StoryLogin.class,assembleKey(openId));
action.setStoryLogin(storyLogin);
//NovelAction对像参数填充
action.setPlatformId(clientInfo.getPlatformId());
action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
action.setChannelId(action.getClientInfo().getChannel());
//非商店吊起参数设置
if (Objects.nonNull(storyLogin)) {
deliveryDeviceInfo = new DeliveryDeviceInfo();
deliveryDeviceInfo.setAd_creative_id(storyLogin.getCreativeId());
deliveryDeviceInfo.setPlatform_id(String.valueOf(clientInfo.getPlatformId()));
deliveryDeviceInfo.setCallback_param(storyLogin.getClickId());
deliveryDeviceInfo.setUuid(UUID.randomUUID().toString());
deliveryDeviceInfo.setDj_channel(storyLogin.getChannel());
deliveryDeviceInfo.setPromotion_id(storyLogin.getAdid());
action.setChannelId(NumberUtils.parseInteger(storyLogin.getChannel()));
action.setPlanId(NumberUtils.parseInteger((storyLogin.getAdid())));
action.setMediaName(storyLogin.getMedia());
action.setMedia(StoreMedia.getMedia(action.getMediaName()));
}
}
public String assembleKey(String openId) {
return String.format("story:upload:%s", openId);
}
}
package com.lwby.marketing.att.storenovel.handle;
import com.alibaba.fastjson.JSON;
import com.lwby.marketing.att.novel.AttributionStatus;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.att.storenovel.UniversalStoreProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.AttributeRule;
import com.lwby.marketing.vo.StoreNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author songxinyu
* @version PaySpduFlow.java, v 0.1 2024年03月04日 18:46 songxinyu Exp $
*/
@Slf4j
@Component("payspdu")
public class PaySpduFlow extends NodeFlow<StoreNovelAction> {
@Resource
UniversalStoreProcess up;
@Override
public void process(StoreNovelAction action) {
process0(action, AttributionType.CHANNEL);
}
public void process0(StoreNovelAction action,AttributionType type) {
int platformId = action.getPlatformId();
int channelId = action.getChannelId();
AttributeRule attributeRule = up.getAttributeRuleByPlatformIdAndChannelId(platformId,channelId);
if (attributeRule == null) {
//直接回传,走回传流
log.info("PaySpduFlow process0 attribute is null,platformId={},channelId={}",action.getPlatformId(),action.getChannelId());
return;
}
String shaveMakeCall = attributeRule.getShaveMakeCall();
/**
* 金额扣量规则
* 商品id : 回传
* {
* "1" : 85,
* "2" : 90,
* "3" : 40,
* "4" : 100
* }
*/
Map<String, Integer> map = JSON.parseObject(shaveMakeCall, Map.class);
Integer sprDedu = 0;
String goodId = action.getGoodId();
for (Map.Entry<String, Integer> entry : map.entrySet()) {
String key = entry.getKey();
if (key.equals(goodId)) {
sprDedu = entry.getValue();
break;
}
}
if (sprDedu == 100) {
log.info("PaySpduFlow process0 attribute.sprDeduChannel 100,platformId={},channelId={}",action.getPlatformId(),action.getChannelId());
return;
}
//总数
String channelTotal = up.getTotalCountKey(type, action.getPlatformId(), action.getChannelId(),goodId, sprDedu, action.getCurrentDateStr());
//回传
String channelCallback = up.getCallbackCountKey(type, action.getPlatformId(), action.getChannelId(),goodId, sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
up.incrby(channelTotal, 1);
if (isCallback(channelTotalCount, channelCallbackCount, sprDedu)) {
//回传,回传个数 + 1
up.incrby(channelCallback, 1);
log.info(
"PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 1);
} else {
up.notifyResult(action, AttributionStatus.NORMAL_DEDUCTION_CALLBACK);
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1");
log.info(
"PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0);
}
}
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
if (channelTotalCount == 0) {
//首次随机
return ThreadLocalRandom.current().nextInt(1, 3) == 1;
}
//计算回传率
BigDecimal divide = new BigDecimal(channelCallbackCount).divide(new BigDecimal(channelTotalCount), 4, RoundingMode.HALF_UP);
//比较回传率和扣量比例,决定是否回传
double percent = (double) Math.round(sprDedu * 100 / 100.0) / 100;
return divide.compareTo(BigDecimal.valueOf(percent).setScale(4, RoundingMode.HALF_UP)) != 1;
}
}
package com.lwby.marketing.att.storenovel.handle;
import com.lwby.marketing.att.novel.AttributionStatus;
import com.lwby.marketing.att.novel.DeviceType;
import com.lwby.marketing.att.novel.UniversalProcess;
import com.lwby.marketing.att.storenovel.UniversalStoreProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoreNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Component("storeNovelStore")
public class StoreNovelStoreAttributionFlow extends NodeFlow<StoreNovelAction> {
@Resource
UniversalStoreProcess up;
@Override
public void process(StoreNovelAction action) {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
up.notifyResult(action, AttributionStatus.STORE_CALLBACK);
action.stop(true); //结束后面所有执行流程
}
}
}
\ No newline at end of file
package com.lwby.marketing.att.storenovel.handle;
import com.lwby.marketing.att.novel.AttributionStatus;
import com.lwby.marketing.att.storenovel.UniversalStoreProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.StoreNovelAction;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author songxinyu
* @version ActiveCallFlow.java, v 0.1 2024年03月04日 18:28 songxinyu Exp $
*/
@Component("uploadcall")
public class UploadCallFlow extends NodeFlow<StoreNovelAction> {
@Resource
UniversalStoreProcess up;
@Override
public void process(StoreNovelAction action) {
action.getMedia().callback(action);
up.notifyResult(action, AttributionStatus.ACTIVE_CALLBACK);
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1"); //每天扣量过一次,不在重复走
return;
}
}
package com.lwby.marketing.att.storenovel.media;
import com.lwby.marketing.vo.StoreNovelAction;
/**
* @author songxinyu
* @version BaseStoreCallback.java, v 0.1 2024年03月04日 17:19 songxinyu Exp $
*/
public abstract class BaseStoreCallback implements IStoreCallback {
@Override
public boolean delivery(StoreNovelAction action) {
return call(action);
}
public abstract boolean call(StoreNovelAction na);
}
package com.lwby.marketing.att.storenovel.media;
import com.lwby.marketing.vo.StoreNovelAction;
/**
* @author songxinyu
* @version IStoreCallback.java, v 0.1 2024年03月04日 17:16 songxinyu Exp $
*/
public interface IStoreCallback {
boolean delivery(StoreNovelAction action);
}
package com.lwby.marketing.att.storenovel.media;
import com.lwby.marketing.att.storenovel.media.jrtt.JRTTActiveCallback;
import com.lwby.marketing.att.storenovel.media.jrtt.JRTTPayCallback;
import com.lwby.marketing.vo.StoreNovelAction;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public enum StoreMedia {
JRTT("jrtt","今日头条",(c)-> c == 0 ? new JRTTActiveCallback() : new JRTTPayCallback());
final String name;
final String desc;
final Function<Integer, IStoreCallback> fun;
final static Map<String, StoreMedia> mediaMap = Arrays.stream(StoreMedia.values())
.collect(Collectors.toMap(StoreMedia::name, Function.identity()));
StoreMedia(String name, String desc, Function<Integer,IStoreCallback> fun){
this.name = name;
this.desc = desc;
this.fun = fun;
}
public boolean callback(StoreNovelAction action){
return fun.apply(action.getType()).delivery(action);
}
public static StoreMedia getMedia(String mediaName){
return mediaMap.get(mediaName);
}
}
\ No newline at end of file
package com.lwby.marketing.att.storenovel.media.jrtt;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.novel.media.jrtt.dto.JrttAttributeRequest;
import com.lwby.marketing.att.storenovel.media.BaseStoreCallback;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoreNovelAction;
/**
* @author songxinyu
* @version JRTTActiveCallback.java, v 0.1 2024年03月04日 17:24 songxinyu Exp $
*/
public class JRTTActiveCallback extends BaseStoreCallback {
public static final String ATTRIBUTE_URL = "https://analytics.oceanengine.com/api/v2/conversion";
@Override
public boolean call(StoreNovelAction na) {
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type("active").timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
Integer resultCode = (Integer) JSON.parseObject(result).get("code");
return resultCode == 0;
} catch (Exception e) {
return false;
}
}
}
package com.lwby.marketing.att.storenovel.media.jrtt;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.novel.media.jrtt.dto.JrttAttributeRequest;
import com.lwby.marketing.att.storenovel.media.BaseStoreCallback;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoreNovelAction;
/**
* @author songxinyu
* @version JRTTPayCallback.java, v 0.1 2024年03月04日 17:25 songxinyu Exp $
*/
public class JRTTPayCallback extends BaseStoreCallback {
public static final String ATTRIBUTE_URL = "https://analytics.oceanengine.com/api/v2/conversion";
@Override
public boolean call(StoreNovelAction na) {
DeliveryDeviceInfo ddi = na.getDeliveryDeviceInfo();
JrttAttributeRequest.Ad ad = new JrttAttributeRequest.Ad();
ad.setCallback(ddi.getCallback_param());
JrttAttributeRequest.Context context = new JrttAttributeRequest.Context();
context.setAd(ad);
JrttAttributeRequest request = JrttAttributeRequest.builder().context(context).event_type("active_pay").timestamp(
System.currentTimeMillis()).build();
String userJson = JSONObject.toJSONString(request);
try {
String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
Integer resultCode = (Integer) JSON.parseObject(result).get("code");
return resultCode == 0;
} catch (Exception e) {
return false;
}
}
}
package com.lwby.marketing.att.storenovel.media.jrtt.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author songxinyu
* @version JrttAttributeRequest.java, v 0.1 2024年02月23日 17:05 songxinyu Exp $
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class JrttAttributeRequest {
/**
* {
* "event_type": "active",
* "context": {
* "ad": {
* "callback": "EPHk9cX3pv4CGJax4ZENKI7w4MDev_4C",
* }
* },
* "timestamp": 1604888786102
* }
*/
/**
* 回传的事件,例如”激活“、”付费“
*/
private String event_type;
/**
* 包含一些关键的上下文信息
*/
private Context context;
/**
* 事件发生的毫秒级时间戳
*/
private long timestamp;
@Data
public static class Context {
/**
* 包含一些关键的广告相关信息
*/
private Ad ad;
private Device device;
}
@Data
public static class Ad {
/**
* callback 字段有两个获取途径,对于监测链接归因的方式,需要从监测链接的__CALLBACK_PARAM__这个宏获取这个字段值;
*/
private String callback;
}
@Data
public static class Device {
/**
* 设备平台
*/
private String platform;
private String imei;
private String oaid;
private String idfa;
}
}
package com.lwby.marketing.controller;
import com.alibaba.fastjson2.JSONObject;
import com.lwby.marketing.att.NovelAttributionMain;
import com.lwby.marketing.vo.StoreNovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author songxinyu
* @version AttrController.java, v 0.1 2024年03月05日 17:30 songxinyu Exp $
*/
@RestController
@RequestMapping(value = "/attr")
public class AttrController {
@Autowired
NovelAttributionMain nm;
@RequestMapping("/testUpload")
public void testUpload() {
String msg = "{\"clientInfo\":{\"channel\":214120503,\"clientIp\":\"183.219.7.180\",\"pkv\":1,\"platformGroupId\":412,"
+ "\"platformId\":412,\"sessionid\":\"undefined\",\"ua\":\"Mozilla/5.0 (Linux; Android 12; OXF-AN00 Build/HUAWEIOXF-AN00;"
+ " wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/117.0.0.0 Mobile Safari/537.36 aweme.lite/28.9.0 "
+ "ToutiaoMicroApp/3.14.0 PluginVersion/28909073\",\"version\":\"2.0.0\",\"xClient\":\"version=2.0.0;platformId=412;"
+ "equipment=android\"},\"createTime\":1709533439693,\"extraData\":{},\"id\":\"93857260-8130-442f-bc92-b549dbf28ef0\","
+ "\"userId\":875325,\"userUploadEvent\":0,\"appId\":\"ttd3dda5604ce230b401\","
+ "\"openId\":\"_000HgDjWl-P5-WS9HTQIssNgTnMBRUqDHDu\"}";
StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
StoreNovelAction action = new StoreNovelAction(event.getClientInfo(),event.getUserId(),event.getOpenId(),event.getUserUploadEvent());
try {
nm.executorStoreNovelActive.execute(action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
package com.lwby.marketing.po;
import lombok.Data;
@Data
public class StoryLogin {
private String adid;
private String creativeId;
private String creativeType;
private String clickId;
private String channel;
private String bookId;
private String media;
private String code;
private long clickTime;
}
package com.lwby.marketing.vo;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
/**
* Package com.lwby.delivery.web.domain
*
* @author songxinyu
* @version AttributeRule.java, v 0.1 2024年03月05日 14:09 songxinyu Exp $
*/
@Data
public class AttributeRule implements Serializable {
private static final long serialVersionUID = 1L;
/** $column.columnComment */
private Long id;
/** 合作方 */
private String cpname;
/** 合作方式 */
private String type;
/** $column.columnComment */
private String media;
private Integer platformId;
private String platformName;
/** 渠道名 */
private String channel;
/** 渠道号 */
private String channelId;
/** 短篇id */
private Long bookId;
/** 短篇名称 */
private String bookName;
private Long createUid;
private String createUname;
private Map<String, Integer> shaveMakeAll;
private Map<String, Integer> clickMakeAll;
private String shaveMake;
private String shaveMakeCall;
private String link;
}
package com.lwby.marketing.vo;
import com.lwby.marketing.att.novel.media.Media;
import com.lwby.marketing.att.storenovel.media.StoreMedia;
import com.lwby.marketing.flow.Action;
import com.lwby.marketing.po.StoryLogin;
import lombok.Data;
/**
* @author songxinyu
* @version StoreNovelAction.java, v 0.1 2024年03月04日 15:44 songxinyu Exp $
*/
@Data
public class StoreNovelAction extends Action {
ClientInfo clientInfo;
StoryLogin storyLogin;
DeliveryDeviceInfo deliveryDeviceInfo;
String openId;
int platformId;
long userId;
int channelId;
int planId;
String mediaName;
Integer type;
StoreMedia media;
String currentDateStr;
String goodId;
public StoreNovelAction(ClientInfo clientInfo,Long userId,String openId,Integer type){
this(clientInfo,null,userId,openId,type);
}
public StoreNovelAction(ClientInfo clientInfo, DeliveryDeviceInfo deliveryDeviceInfo,Long userId,String openId,Integer type){
this(clientInfo,null,userId,openId,type,null);
}
public StoreNovelAction(ClientInfo clientInfo, DeliveryDeviceInfo deliveryDeviceInfo,Long userId,String openId,Integer type,String goodId){
this.clientInfo = clientInfo;
this.deliveryDeviceInfo = deliveryDeviceInfo;
this.userId = userId;
this.openId = openId;
this.type = type;
this.goodId = goodId;
}
}
package com.lwby.marketing.vo;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class StoreUserUploadEventBO {
/**
* 唯一id
*/
String id;
/**
* 事件id
*/
Integer userUploadEvent;
String appId;
String openId;
String productId;
String productName;
Double value;
/**
* 客户端信息
*/
ClientInfo clientInfo;
Long userId;
/**
* 下单之后,不限制用户关注时间的首次充值
*/
String orderObject;
/**
* 扩展数据
*/
Map<String, Object> extraData = new HashMap<>();
Date createTime = new Date();
Double totalValue;
public StoreUserUploadEventBO() {
setId(UUID.randomUUID().toString());
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Integer getUserUploadEvent() {
return userUploadEvent;
}
public void setUserUploadEvent(Integer userUploadEvent) {
this.userUploadEvent = userUploadEvent;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getOpenId() {
return openId;
}
public void setOpenId(String openId) {
this.openId = openId;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Double getValue() {
return value;
}
public void setValue(Double value) {
this.value = value;
}
public ClientInfo getClientInfo() {
return clientInfo;
}
public void setClientInfo(ClientInfo clientInfo) {
this.clientInfo = clientInfo;
}
public Map<String, Object> getExtraData() {
return extraData;
}
public void setExtraData(Map<String, Object> extraData) {
this.extraData = extraData;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getOrderObject() {
return orderObject;
}
public void setOrderObject(String orderObject) {
this.orderObject = orderObject;
}
public Double getTotalValue() {
return totalValue;
}
public void setTotalValue(Double totalValue) {
this.totalValue = totalValue;
}
}
...@@ -31,4 +31,30 @@ spring: ...@@ -31,4 +31,30 @@ spring:
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 100 max-poll-records: 100
listener: listener:
ack-mode: RECORD ack-mode: RECORD
\ No newline at end of file
store:
novel:
active:
consumer:
topic: storeNovelActive
group:
id: store_dy_active
pay:
consumer:
topic: storeNovelpay
group:
id: store_dy_pay
novel:
active:
consumer:
topic: novelActive
group:
id: novel_dy_active
management:
health:
db:
enabled: false
\ No newline at end of file
spring: spring:
datasource: datasource:
lwby: lwby:
jdbc-url: jdbc:mysql://rm-2zem654n5267sl284.mysql.rds.aliyuncs.com/lwby_novel?zeroDateTimeBehavior=convertToNull&characterEncoding=utf8&autoReconnect=true jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com/lwby?zeroDateTimeBehavior=convertToNull&characterEncoding=utf8&autoReconnect=true
username: lwby_read username: lwby
password: TXlEjAy0zO2S password: VjxYfmY8J77ISChp
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 1 initialSize: 2
minIdle: 1 minIdle: 2
marketing: marketing:
jdbc-url: jdbc:mysql://rm-2ze1e2ykb87kj2592.mysql.rds.aliyuncs.com:3306/lwby_marketing_growth?zeroDateTimeBehavior=convertToNull&characterEncoding=utf8&autoReconnect=true jdbc-url: jdbc:mysql://rm-2zeo09186ukqa8zh1.mysql.rds.aliyuncs.com/lwby?zeroDateTimeBehavior=convertToNull&characterEncoding=utf8&autoReconnect=true
username: read01 username: lwby
password: read01!@# password: VjxYfmY8J77ISChp
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 1 initialSize: 2
minIdle: 1 minIdle: 2
redis: redis:
host: 127.0.0.1 host: 172.17.243.150
port: 6379 port: 6379
kafka: kafka:
bootstrap-servers: 172.17.255.143:9092 bootstrap-servers: 172.17.255.113:9092
producer: producer:
retries: 3 retries: 3
acks: 1 acks: 1
...@@ -31,4 +31,31 @@ spring: ...@@ -31,4 +31,31 @@ spring:
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500 max-poll-records: 500
listener: listener:
ack-mode: RECORD ack-mode: RECORD
\ No newline at end of file
store:
novel:
active:
consumer:
topic: testStoreNovelActive
group:
id: test_store_dy_active
pay:
consumer:
topic: testStoreNovelpay
group:
id: test_store_dy_pay
novel:
active:
consumer:
topic: testNovelActive
group:
id: test_novel_dy_active
management:
health:
db:
enabled: false
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment