Commit 451d76cd authored by 宋新宇's avatar 宋新宇

Merge remote-tracking branch 'origin/main'

# Conflicts:
#	src/main/java/com/lwby/marketing/att/bystory/handle/ParameterSetupStoryNovelFlow.java
parents 4be701c1 691087f6
package com.lwby.marketing.att.novel; package com.lwby.marketing.att;
public enum AttributionStatus { public enum AttributionStatus {
ACTIVE_CALLBACK(2,"激活回传"), ACTIVE_CALLBACK(2,"激活回传"),
......
package com.lwby.marketing.att.bystory; package com.lwby.marketing.att.bystory;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
/** /**
* @author songxinyu * @author songxinyu
......
...@@ -23,7 +23,7 @@ public class DyStoryActiveConsumer implements MessageListener<String, String> { ...@@ -23,7 +23,7 @@ public class DyStoryActiveConsumer implements MessageListener<String, String> {
@Resource @Resource
DyStoryFlowExecutor dyStoryFlowExecutor; DyStoryFlowExecutor dyStoryFlowExecutor;
@KafkaListener(topics = {"${story.novel.active.consumer.topic:testStoryNovelActive}"},groupId = "${story.novel.active.consumer.group.id:test_story_dy_active}") @KafkaListener(topics = {"${system.consumer.dystory.active.topic}"},groupId = "${system.consumer.dystory.active.topic}")
@Override @Override
public void onMessage(ConsumerRecord<String, String> data) { public void onMessage(ConsumerRecord<String, String> data) {
......
...@@ -24,7 +24,7 @@ public class DyStoryPayConsumer extends DyStoryFlowExecutor implements MessageLi ...@@ -24,7 +24,7 @@ public class DyStoryPayConsumer extends DyStoryFlowExecutor implements MessageLi
@Resource @Resource
DyStoryFlowExecutor dyStoryFlowExecutor; DyStoryFlowExecutor dyStoryFlowExecutor;
@KafkaListener(topics = {"${story.novel.pay.consumer.topic:testStoryNovelpay}"},groupId = "${story.novel.pay.consumer.group.id:test_story_dy_pay}") @KafkaListener(topics = {"${system.consumer.dystory.pay.topic}"},groupId = "${system.consumer.dystory.pay.group_id}")
@Override @Override
public void onMessage(ConsumerRecord<String, String> data) { public void onMessage(ConsumerRecord<String, String> data) {
......
...@@ -5,7 +5,7 @@ import com.alicp.jetcache.anno.CacheRefresh; ...@@ -5,7 +5,7 @@ import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType; import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached; import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.UniversalProcess; import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.novel.AttributionType; import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.vo.DeliveryDeviceInfo; import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction; import com.lwby.marketing.vo.StoryNovelAction;
......
...@@ -41,7 +41,7 @@ public class ParameterSetupStoryNovelFlow extends NodeFlow<StoryNovelAction> { ...@@ -41,7 +41,7 @@ public class ParameterSetupStoryNovelFlow extends NodeFlow<StoryNovelAction> {
//StoryLogin storyLogin = up.get(StoryLogin.class,assembleKey(openId)); //StoryLogin storyLogin = up.get(StoryLogin.class,assembleKey(openId));
action.setStoryLogin(storyLogin); action.setStoryLogin(storyLogin);
//NovelAction对像参数填充 //StoryNovelAction对像参数填充
action.setPlatformId(platformId); action.setPlatformId(platformId);
action.setUserId(action.getUserId()); action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString()); action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
......
package com.lwby.marketing.att.bystory.handle; package com.lwby.marketing.att.bystory.handle;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.novel.AttributionType; import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.att.bystory.DyStoryUniversalProcess; import com.lwby.marketing.att.bystory.DyStoryUniversalProcess;
import com.lwby.marketing.flow.NodeSwitchFlow; import com.lwby.marketing.flow.NodeSwitchFlow;
...@@ -27,18 +27,13 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -27,18 +27,13 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
@Resource @Resource
DyStoryUniversalProcess up; DyStoryUniversalProcess up;
@Override
public void process(StoryNovelAction action) {
process0(action, AttributionType.CHANNEL);
}
@Override @Override
public boolean checked(StoryNovelAction action) { public boolean checked(StoryNovelAction action) {
return action.getType() == 2; return action.getType() == 2;
} }
public void process0(StoryNovelAction action, AttributionType type) { @Override
public void process(StoryNovelAction action) {
int platformId = action.getPlatformId(); int platformId = action.getPlatformId();
int channelId = action.getChannelId(); int channelId = action.getChannelId();
String shaveMakeCall = up.getAttributeRuleByPlatformIdAndChannelId(platformId,channelId); String shaveMakeCall = up.getAttributeRuleByPlatformIdAndChannelId(platformId,channelId);
...@@ -77,9 +72,9 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -77,9 +72,9 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
} }
//总数 //总数
String channelTotal = up.getTotalCountKey(type, action.getPlatformId(), action.getChannelId(),goodId, sprDedu, action.getCurrentDateStr()); String channelTotal = up.getTotalCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(),goodId, sprDedu, action.getCurrentDateStr());
//回传 //回传
String channelCallback = up.getCallbackCountKey(type, action.getPlatformId(), action.getChannelId(),goodId, sprDedu, action.getCurrentDateStr()); String channelCallback = up.getCallbackCountKey(AttributionType.CHANNEL, action.getPlatformId(), action.getChannelId(),goodId, sprDedu, action.getCurrentDateStr());
long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24); long channelTotalCount = up.incrby(channelTotal, 0, 60 * 60 * 24);
long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24); long channelCallbackCount = up.incrby(channelCallback, 0, 60 * 60 * 24);
...@@ -99,7 +94,6 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> { ...@@ -99,7 +94,6 @@ public class PaySpduFlow extends NodeSwitchFlow<StoryNovelAction> {
"PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}", "PaySpduFlow.process0.deduction doing dynamic, platformId = {}, channel = {}, sprDedu = {}, goodId = {}, channelTotalCount = {}, channelCallbackCount = {}, v = {}",
platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0); platformId, channelId, sprDedu, goodId, channelTotalCount, channelCallbackCount, 0);
} }
} }
private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) { private boolean isCallback(long channelTotalCount, long channelCallbackCount, Integer sprDedu) {
......
package com.lwby.marketing.att.bystory.handle; package com.lwby.marketing.att.bystory.handle;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType; import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.bystory.DyStoryUniversalProcess; import com.lwby.marketing.att.bystory.DyStoryUniversalProcess;
import com.lwby.marketing.flow.NodeFlow; import com.lwby.marketing.flow.NodeFlow;
......
...@@ -20,15 +20,11 @@ public class UploadCallFlow extends NodeFlow<StoryNovelAction> { ...@@ -20,15 +20,11 @@ public class UploadCallFlow extends NodeFlow<StoryNovelAction> {
@Override @Override
public void process(StoryNovelAction action) { public void process(StoryNovelAction action) {
process0(action, CallBackType.getCallBackTypeByType(action.getType())); CallBackType type = CallBackType.getCallBackTypeByType(action.getType());
}
private void process0(StoryNovelAction action, CallBackType callBackTypeByType) {
boolean success = action.getMedia().notify(action); boolean success = action.getMedia().notify(action);
if (success) { if (success) {
up.notifyResult(action, callBackTypeByType.getTopic(), up.notifyResult(action, type.getTopic(),type.getStatus());
callBackTypeByType.getStatus());
up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1"); up.set(up.getFirstCheckerKey(action),60 * 60 * 24,"1");
} }
} }
......
...@@ -33,7 +33,7 @@ public class NovelActiveConsumer implements MessageListener<String, String> { ...@@ -33,7 +33,7 @@ public class NovelActiveConsumer implements MessageListener<String, String> {
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel")); executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel"));
} }
@KafkaListener(topics = {"${novel.active.consumer.topic:testNovelActive}"},groupId = "${novel.active.consumer.group.id:test_novel_dy_active}") @KafkaListener(topics = {"${system.consumer.novel.active.topic}"},groupId = "${system.consumer.novel.active.group_id}")
@Override @Override
public void onMessage(ConsumerRecord<String, String> data) { public void onMessage(ConsumerRecord<String, String> data) {
......
package com.lwby.marketing.att.novel; package com.lwby.marketing.att.novel;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alicp.jetcache.anno.CacheRefresh; import com.lwby.marketing.att.AttributionStatus;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.UniversalProcess; import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.vo.NovelAction; import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.DeliveryDeviceInfo; import com.lwby.marketing.vo.DeliveryDeviceInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
...@@ -46,7 +43,7 @@ public class NovelUniversalProcess extends UniversalProcess { ...@@ -46,7 +43,7 @@ public class NovelUniversalProcess extends UniversalProcess {
ddi.setDevice_status(exists(PRIZE_FLOW_PREFIX.concat(String.valueOf(ddi.getUserId()))) ? 2 : 0); ddi.setDevice_status(exists(PRIZE_FLOW_PREFIX.concat(String.valueOf(ddi.getUserId()))) ? 2 : 0);
String jsonString = JSONObject.toJSONString(ddi); String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = kafkaTemplate.send("ocpc_result_test", jsonString); ListenableFuture<SendResult<String, String>> active_result = kafkaTemplate.send("ocpc_result", jsonString);
active_result.addCallback( active_result.addCallback(
result -> log.info("归因成功[{}],归因类型[{}]", jsonString, status.desc), result -> log.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> log.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex) ex -> log.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
......
package com.lwby.marketing.att.novel.handle; package com.lwby.marketing.att.novel.handle;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.novel.AttributionType; import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.att.novel.NovelUniversalProcess; import com.lwby.marketing.att.novel.NovelUniversalProcess;
import com.lwby.marketing.flow.NodeSwitchFlow; import com.lwby.marketing.flow.NodeSwitchFlow;
......
...@@ -3,7 +3,7 @@ package com.lwby.marketing.att.novel.handle; ...@@ -3,7 +3,7 @@ package com.lwby.marketing.att.novel.handle;
import com.alicp.jetcache.anno.CacheRefresh; import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheType; import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached; import com.alicp.jetcache.anno.Cached;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.novel.AttributionType; import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.att.novel.NovelUniversalProcess; import com.lwby.marketing.att.novel.NovelUniversalProcess;
import com.lwby.marketing.flow.NodeSwitchFlow; import com.lwby.marketing.flow.NodeSwitchFlow;
......
package com.lwby.marketing.att.novel.handle; package com.lwby.marketing.att.novel.handle;
import com.lwby.marketing.att.novel.AttributionStatus; import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.novel.DeviceType; import com.lwby.marketing.att.novel.DeviceType;
import com.lwby.marketing.att.novel.NovelUniversalProcess; import com.lwby.marketing.att.novel.NovelUniversalProcess;
import com.lwby.marketing.flow.NodeFlow; import com.lwby.marketing.flow.NodeFlow;
......
...@@ -38,26 +38,20 @@ bi: ...@@ -38,26 +38,20 @@ bi:
host: r-2zethzp7pjl3rjbelp.redis.rds.aliyuncs.com host: r-2zethzp7pjl3rjbelp.redis.rds.aliyuncs.com
port: 6379 port: 6379
story:
novel:
active:
consumer:
topic: storyNovelActive
group:
id: story_dy_active
pay:
consumer:
topic: storeNovelpay
group:
id: story_dy_pay
system:
novel: consumer:
active: dystory:
consumer: active:
topic: novelActive topic: storyNovelActive
group: group_id: story_dy_active
id: novel_dy_active pay:
topic: storeNovelpay
group_id: story_dy_pay
novel:
active:
topic: novelActive
group_id: novel_dy_active
management: management:
health: health:
......
...@@ -38,26 +38,19 @@ bi: ...@@ -38,26 +38,19 @@ bi:
host: 172.17.243.150 host: 172.17.243.150
port: 6379 port: 6379
story: system:
novel: consumer:
active: dystory:
consumer: active:
topic: testStoryNovelActive topic: testStoryNovelActive
group: group_id: test_story_dy_active
id: test_story_dy_active pay:
pay:
consumer:
topic: testStoryNovelpay topic: testStoryNovelpay
group: group_id: test_story_dy_pay
id: test_story_dy_pay novel:
active:
topic: testNovelActive
novel: group_id: test_novel_dy_active
active:
consumer:
topic: testNovelActive
group:
id: test_novel_dy_active
management: management:
health: health:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment