Commit 918a7a1d authored by 宋新宇's avatar 宋新宇

短剧APP归因

parent b1240e45
......@@ -27,6 +27,9 @@ public class UniversalProcess {
@Resource
public JdbcTemplate videoJdbcTemplate;
@Resource
public JdbcTemplate videoInlandJdbcTemplate;
@Resource(name = "novelKafka")
public KafkaTemplate<String, String> novelKafkaTemplate;
......@@ -51,25 +54,12 @@ public class UniversalProcess {
redisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS);
}
public void del(String key) {
redisTemplate.delete(key);
}
public void hset(String key, String field, int expire, String value) {
redisTemplate.opsForHash().put(key,field,value);
redisTemplate.expire(key,expire,TimeUnit.SECONDS);
}
public <T> Map<String,T> hgetAll(Class<T> clazz,String key) {
Map<Object, Object> json = redisTemplate.opsForHash().entries(key);
Map<String, T> map = new HashMap<String, T>();
for (Map.Entry<Object, Object> item : json.entrySet()) {
T t = JSON.parseObject(item.getValue().toString(), clazz);
map.put(item.getKey().toString(), t);
}
return map;
}
public <T> T get(Class<T> clazz, String key) {
String value = redisTemplate.opsForValue().get(key);
......
......@@ -46,7 +46,7 @@ public class ParameterSetupStoryNovelFlow extends NodeFlow<StoryNovelAction> {
action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(0)) {
if (CallBackType.active.getType().equals(action.getType())) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
......
......@@ -10,6 +10,7 @@ import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.po.ThirdAccountDy;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
import com.lwby.marketing.vo.StoryNovelAction;
......@@ -17,15 +18,18 @@ import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
......@@ -35,6 +39,9 @@ public class DyVideoUniversalProcess extends UniversalProcess {
private static final Logger DYVIDEO_ERROR_LOG = LoggerFactory.getLogger("dyvideo.error");
@Resource
private RedisTemplate<String,String> oldMarketRedisTemplate;
/**
* 通知处理结果
*/
......@@ -67,6 +74,25 @@ public class DyVideoUniversalProcess extends UniversalProcess {
);
}
@Override
public <T> T get(Class<T> clazz, String key) {
String value = oldMarketRedisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
return JSON.parseObject(value,clazz);
}
return null;
}
public void setToken(String key, int expires, String value) {
oldMarketRedisTemplate.opsForValue().set(key,value,expires, TimeUnit.SECONDS);
}
public void delToken(String key) {
oldMarketRedisTemplate.delete(key);
}
public String getTotalCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
......@@ -81,23 +107,29 @@ public class DyVideoUniversalProcess extends UniversalProcess {
: String.format("fc_%d_%d_%s", action.getUserId(), action.getPlatformId(),action.getMediaName());
}
public String getDyAccessToken(String client_id, String client_secret,String dyTokenUrl) {
public ThirdAccountDy getDyAccessToken(String client_id, String client_secret,String dyTokenUrl) {
Map<String,String> upMap = new HashMap<>();
upMap.put("client_key",client_id);
upMap.put("client_secret",client_secret);
upMap.put("grant_type","client_credential");
String mapAction = JSONObject.toJSONString(upMap);
String accessToken = "";
ThirdAccountDy thirdAccountDy = null;
try {
String result = HttpUtil.post(dyTokenUrl, mapAction);
Map data = (Map) JSON.parseObject(result).get("data");
Integer resultCode = (Integer) data.get("error_code");
String accessToken = "";
if (resultCode == 0) {
//成功
accessToken = (String) data.get("access_token");
Integer expiresIn = (Integer) data.get("expires_in");
thirdAccountDy = new ThirdAccountDy();
thirdAccountDy.setAccessToken(accessToken);
thirdAccountDy.setExpireIn(expiresIn);
thirdAccountDy.setClientId(client_id);
String tokenDy = "token_dy_" + client_id;
set(tokenDy, Integer.parseInt(String.valueOf(expiresIn)),accessToken);
//往老的market缓存写,防止token一直失效
setToken(tokenDy, Integer.parseInt(String.valueOf(expiresIn)),JSON.toJSONString(thirdAccountDy));
} else {
log.warn("dy_access_token_error,code={},resultdy={}", resultCode, JSONObject.toJSONString(result));
}
......@@ -105,7 +137,7 @@ public class DyVideoUniversalProcess extends UniversalProcess {
e.printStackTrace();
}
return accessToken;
return thirdAccountDy;
}
......@@ -117,7 +149,7 @@ public class DyVideoUniversalProcess extends UniversalProcess {
try {
RowMapper<AppChannelVO> rowMapper = BeanPropertyRowMapper.newInstance(AppChannelVO.class);
return lwbyJdbcTemplate.queryForObject(String.format("select id,ecpm_avg_count,motivation_count,ecpm_per_count,spr_dedu "
return videoJdbcTemplate.queryForObject(String.format("select id,ecpm_avg_count,motivation_count,ecpm_per_count,spr_dedu "
+ "from app_channel where channel_id=%d and platform_id=%d",channelId,platformId),rowMapper);
} catch (EmptyResultDataAccessException e) {
return null;
......
......@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeSwitchFlow;
import com.lwby.marketing.po.ThirdAccountDy;
import com.lwby.marketing.po.VideoUpload;
import com.lwby.marketing.util.CacheKeyUtils;
import com.lwby.marketing.util.HttpUtil;
......@@ -107,14 +108,14 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
dyMarketPlatformAppIdList = Arrays.asList(dyMarketPlatformAppId.split("_"));
String tokenDy = "token_dy_" + dyMarketPlatformAppIdList.get(0);
String accessToken = null;
ThirdAccountDy thirdAccountDy = null;
if (up.exists(tokenDy)) {
accessToken = up.get(String.class, tokenDy);
thirdAccountDy = up.get(ThirdAccountDy.class, tokenDy);
} else {
accessToken = up.getDyAccessToken(dyMarketPlatformAppIdList.get(0), dyMarketPlatformAppIdList.get(1),
Objects.equals(env,"prod") ? tokenDyProdUrl : tokenDyDevUrl);
thirdAccountDy = up.getDyAccessToken(dyMarketPlatformAppIdList.get(0), dyMarketPlatformAppIdList.get(1),
Objects.equals(env.getActiveProfiles()[0],"prod") ? tokenDyProdUrl : tokenDyDevUrl);
}
if (accessToken != null) {
if (thirdAccountDy != null) {
//平均ecpm次数
Integer ecpmAvgCount = null;
//每次ecpm次数
......@@ -126,6 +127,7 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
LocalDateTime now = LocalDateTime.now(); // 获取当前日期时间
LocalTime six = LocalTime.of(6, 0, 0); // 获取当天6点的时间
String accessToken = thirdAccountDy.getAccessToken();
Date date = new Date();
Map<String, Integer> resultCountMap = null;
......@@ -293,8 +295,8 @@ public class DyvideoBehaviorFlow extends NodeSwitchFlow<StoryNovelAction> {
DYVIDEO_SYS_LOG.info("DouyinBehaviorKafkaConsumer.douyin code error,userId={},result={}",userId,JSON.toJSONString(result));
if (resultCode == 28001008) {
//删除授权过期token
up.del(tokenDy);
up.getDyAccessToken(dyMarketPlatformAppIdList.get(0), dyMarketPlatformAppIdList.get(1), Objects.equals(env,"prod") ? tokenDyProdUrl : tokenDyDevUrl);
up.delToken(tokenDy);
up.getDyAccessToken(dyMarketPlatformAppIdList.get(0), dyMarketPlatformAppIdList.get(1), Objects.equals(env.getActiveProfiles()[0],"prod") ? tokenDyProdUrl : tokenDyDevUrl);
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.douyin code auth token expired,userId={},result={}",userId,JSON.toJSONString(result));
}
return null;
......
package com.lwby.marketing.att.dyvideo.handle;
import com.alibaba.fastjson.JSONObject;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.dyvideo.DyVideoUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
......@@ -34,12 +35,11 @@ public class ParameterSetupDyVideoFlow extends NodeFlow<StoryNovelAction> {
//VO对像
DeliveryDeviceInfo deliveryDeviceInfo = null;
//String s = "{\"creativeId\":\"112331\",\"creativeType\":\"3\",\"adid\":\"12321\","
// + "\"clickId\":\"12312143232\",\"channel\":\"216011231\",\"bookId\":\"4322111\","
// + "\"media\":\"jrtt\",\"clickTime\":123123123,\"code\":\"12ede3e231\"}";;
String s = "{\"activeTime\":1711439536879,\"adid\":\"7347520002541764646\",\"channel\":\"214122006\",\"clickId\":\"B.1SLXpxqLP8EXNpAqlfUjsQvppp7bkeHT9LpYmfnxMwxH61cZtBGopyXg3glnl8880WYw3XUYfeh21oLLnT9ovSPFtvFp5faNJQbetTFwBfZiesPw0IpoWF2GtLeJW66Bf8vMnUilTWN7sUlexUfVKQiAefYso0MEqhkcUJhpqzz21B\",\"clickTime\":1711394346686,\"code\":\"FN1za8huKsDQ3TyZYhtyzFgjFTDt-ddTEAzYFwrEpYGMyF6NBImpXtzv9ZTZVnoaI6nSyHU4hQ0kjnwNFsXwLV02x0cpIiod-3L9G35DyTOh_K5LQd6Nvs2FQ20\",\"creativeId\":\"1793848259919929\",\"creativeType\":\"15\",\"media\":\"jrtt\",\"videoResourceId\":\"100013\"}";
//StoryLogin storyLogin = JSONObject.parseObject(s,StoryLogin.class);
//匹配OpenId
VideoUpload videoUpload = up.get(VideoUpload.class,assembleKey(openId));
VideoUpload videoUpload = JSONObject.parseObject(s,VideoUpload.class);
// VideoUpload videoUpload = up.get(VideoUpload.class,assembleKey(openId));
action.setVideoUpload(videoUpload);
//StoryNovelAction对像参数填充
......@@ -47,7 +47,7 @@ public class ParameterSetupDyVideoFlow extends NodeFlow<StoryNovelAction> {
action.setUserId(action.getUserId());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(0)) {
if (CallBackType.active.getType().equals(action.getType())) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
......@@ -71,6 +71,6 @@ public class ParameterSetupDyVideoFlow extends NodeFlow<StoryNovelAction> {
}
public String assembleKey(String openId) {
return String.format("story:upload:%s", openId);
return String.format("video:upload:%s", openId);
}
}
......@@ -5,6 +5,7 @@ import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
......@@ -26,19 +27,12 @@ import java.util.List;
public class VideoAppActiveConsumer {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorNovel;
VideoAppFlowExecutor videoAppFlowExecutor;
private static final Logger VIDEO_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEO_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@PostConstruct
public void init(){
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel"));
}
@KafkaListener(topics = {"${system.consumer.videoapp.active.topic}"} ,containerFactory = "kafkaListenerContainerFactoryVideoappActive")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
......@@ -57,12 +51,12 @@ public class VideoAppActiveConsumer {
VIDEO_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
StoreUserUploadEventBO event = JSON.parseObject(data.value(), StoreUserUploadEventBO.class);
VIDEO_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
NovelAction action = new NovelAction(event.getClientInfo(), data.value(),event.getUserUploadEvent());
executorNovel.execute(action);
videoAppFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
VIDEO_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
......
......@@ -5,6 +5,7 @@ import com.lwby.marketing.flow.FlowExecutor;
import com.lwby.marketing.flow.Rule;
import com.lwby.marketing.vo.BookStoreEvent;
import com.lwby.marketing.vo.NovelAction;
import com.lwby.marketing.vo.StoreUserUploadEventBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
......@@ -26,19 +27,12 @@ import java.util.List;
public class VideoAppBehavoirConsumer {
@Resource
ApplicationContext ctx;
FlowExecutor<NovelAction> executorNovel;
VideoAppFlowExecutor videoAppFlowExecutor;
private static final Logger VIDEO_SYS_LOG = LoggerFactory.getLogger("videoapp.sys");
private static final Logger VIDEO_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@PostConstruct
public void init(){
executorNovel = new FlowExecutor<>(ctx, Rule.create().THEN("novel_setup").THEN("novel_prize").THEN("novel_store").THEN("novel_checkerfirst").SWITCH("novel_cross","novel_plan","novel_channel"));
}
@KafkaListener(topics = {"${system.consumer.videoapp.behavoir.topic}"} ,containerFactory = "kafkaListenerContainerFactoryVideoappBehavoir")
public void onMessage(List<ConsumerRecord<String, String>> datas) {
......@@ -57,12 +51,12 @@ public class VideoAppBehavoirConsumer {
VIDEO_SYS_LOG.info("media.active.onMessage start, data={}", data == null ? null : data.value());
BookStoreEvent event = JSON.parseObject(data.value(), BookStoreEvent.class);
StoreUserUploadEventBO event = JSON.parseObject(data.value(), StoreUserUploadEventBO.class);
VIDEO_SYS_LOG.info("media.active.topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
NovelAction action = new NovelAction(event.getClientInfo(), data.value());
NovelAction action = new NovelAction(event.getClientInfo(), data.value(),event.getUserUploadEvent());
executorNovel.execute(action);
videoAppFlowExecutor.getExecutorByStory().execute(action);
} catch (Throwable e) {
VIDEO_ERROR_LOG.info("media.active.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
......
......@@ -15,12 +15,14 @@ import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
......@@ -33,6 +35,9 @@ public class VideoAppUniversalProcess extends UniversalProcess {
private static final Logger VIDEOAPP_ERROR_LOG = LoggerFactory.getLogger("videoapp.error");
@Resource
private RedisTemplate<String,String> oldMarketRedisTemplate;
/**
* 通知处理结果
*/
......@@ -64,6 +69,25 @@ public class VideoAppUniversalProcess extends UniversalProcess {
);
}
public <T> T getOldMarket(Class<T> clazz, String key) {
String value = oldMarketRedisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
return JSON.parseObject(value,clazz);
}
return null;
}
public <T> Map<String,T> hgetAllOldMarket(Class<T> clazz,String key) {
Map<Object, Object> json = oldMarketRedisTemplate.opsForHash().entries(key);
Map<String, T> map = new HashMap<String, T>();
for (Map.Entry<Object, Object> item : json.entrySet()) {
T t = JSON.parseObject(item.getValue().toString(), clazz);
map.put(item.getKey().toString(), t);
}
return map;
}
public String getTotalCountKey(AttributionType attributionType, int platformId, Long channelOrPlanId ,int sprDedu, String dateStr) {
return String.format("%s_total_%d_%d_%d_%s", attributionType, platformId, channelOrPlanId, sprDedu, dateStr);
}
......@@ -91,7 +115,7 @@ public class VideoAppUniversalProcess extends UniversalProcess {
try {
RowMapper<AppChannelVO> rowMapper = BeanPropertyRowMapper.newInstance(AppChannelVO.class);
return lwbyJdbcTemplate.queryForObject(String.format("select id,video_count,mv_count,cpm_count,ecpm_avg_count,motivation_count,arpu_count,ecpm_per_count"
return videoInlandJdbcTemplate.queryForObject(String.format("select id,video_count,mv_count,cpm_count,ecpm_avg_count,motivation_count,arpu_count,ecpm_per_count"
+ " from app_channel where channel_id=%d and platform_id=%d",channelId,platformId),rowMapper);
} catch (EmptyResultDataAccessException e) {
return null;
......
......@@ -34,7 +34,7 @@ public class ParameterSetupVideoAppFlow extends NodeFlow<NovelAction> {
ClientInfo clientInfo = action.getClientInfo();
//获取用户注册时间
UserProfile userProfile = clientInfo.getUser();
if (userProfile == null || userProfile.getRegistrationDate() == null) {
if (userProfile == null || userProfile.getCreateDate() == null) {
VIDEOAPP_ERROR_LOG.error("ClientInfo.userProfile 对像为[{}],ClientInfo对像[{}]",userProfile, clientInfo);
action.stop(true);
return;
......@@ -50,7 +50,7 @@ public class ParameterSetupVideoAppFlow extends NodeFlow<NovelAction> {
//匹配设备ID
for(DeviceVideoType type: DeviceVideoType.values()){
deviceIdKey = type.getDeviceId(clientInfo);
if(null != deviceIdKey && null != (deliveryDeviceInfo = up.get(DeliveryDeviceInfo.class,assembleKey(deviceIdKey,platformId)))){
if(null != deviceIdKey && null != (deliveryDeviceInfo = up.getOldMarket(DeliveryDeviceInfo.class,assembleKey(deviceIdKey,platformId)))){
clientInfo.setOaid(deliveryDeviceInfo.getOaid()); //回补OAID
action.setDeliveryDeviceInfo(deliveryDeviceInfo);
action.setDeviceVideoType(type);
......@@ -63,7 +63,7 @@ public class ParameterSetupVideoAppFlow extends NodeFlow<NovelAction> {
action.setDeviceId(clientInfo.getDID());
action.setCurrentDateStr(DateTimUtils.getCurrentDateString());
if (CallBackType.active.getType().equals(0)) {
if (CallBackType.active.getType().equals(action.getType())) {
action.setChannelId((long)action.getClientInfo().getChannel());
}
......
package com.lwby.marketing.att.videoapp.handle;
import com.lwby.marketing.att.AttributionStatus;
import com.lwby.marketing.att.bystory.CallBackType;
import com.lwby.marketing.att.videoapp.VideoAppUniversalProcess;
import com.lwby.marketing.flow.NodeFlow;
import com.lwby.marketing.vo.NovelAction;
......@@ -18,10 +19,12 @@ public class StoreVideoAppAttributionFlow extends NodeFlow<NovelAction> {
public void process(NovelAction action) {
//商店归因通知
if(Objects.isNull(action.getDeliveryDeviceInfo())){
//商店判断30天活跃,不在活跃天数内,发BI
String aliveDeviceKey = up.buildKey(action.getPlatformId(), action.getDeviceId());
if (!up.existsIsAlive(aliveDeviceKey)) {
up.notifyResult(action,"ocpc_result_test", AttributionStatus.STORE_CALLBACK);
if (action.getType().equals(CallBackType.active.getType())) {
//商店判断30天活跃,不在活跃天数内,发BI
String aliveDeviceKey = up.buildKey(action.getPlatformId(), action.getDeviceId());
if (!up.existsIsAlive(aliveDeviceKey)) {
up.notifyResult(action,"ocpc_result_test", AttributionStatus.STORE_CALLBACK);
}
}
action.stop(true); //结束后面所有执行流程
}
......
......@@ -85,7 +85,7 @@ public class VideoAppBehaviorFlow extends NodeSwitchFlow<NovelAction> {
Double arpuCount = null ;
String upcBehaviorKey = CacheKeyUtils.getBehavoirKey(userId);
if (up.exists(upcBehaviorKey)) {
Map<String, Integer> behaviorMap = up.hgetAll(Integer.class, upcBehaviorKey);
Map<String, Integer> behaviorMap = up.hgetAllOldMarket(Integer.class, upcBehaviorKey);
for (Map.Entry<String,Integer> entry : behaviorMap.entrySet()) {
switch (entry.getKey()) {
case "tc": tcpmCount = entry.getValue()/100;break;
......
package com.lwby.marketing.conf;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class OldMarketRedisConfig {
@Value("${old.market.redis.host}")
private String hostName;
@Value("${old.market.redis.port}")
private int hostPort;
@Bean("oldMarketJedisFactory")
public JedisConnectionFactory jedisConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName(hostName);
config.setPort(hostPort);
return new JedisConnectionFactory(config);
}
@Bean("oldMarketRedisTemplate")
public RedisTemplate<Object, Object> redisTemplate(@Qualifier("oldMarketJedisFactory") RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
\ No newline at end of file
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.dyvideo.DyVideoActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.dyvideo.DyVideoBehavoirConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.bystory.DyStoryActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.novel.NovelActiveConsumer;
import com.lwby.marketing.att.novel.NovelActiveStoreConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.bystory.DyStoryActiveConsumer;
import com.lwby.marketing.att.bystory.DyStoryPayConsumer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.bystory.DyStoryPayConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.videoapp.VideoAppActiveConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......
package com.lwby.marketing.conf;
package com.lwby.marketing.conf.kafka;
import com.lwby.marketing.att.videoapp.VideoAppActiveConsumer;
import com.lwby.marketing.att.videoapp.VideoAppBehavoirConsumer;
......
......@@ -46,6 +46,9 @@ public class AttrController {
@Resource(name = "storyKafka")
private KafkaTemplate<String, String> storyKafkaTemplate;
@Resource(name = "novelKafka")
public KafkaTemplate<String, String> novelKafkaTemplate;
@Resource
DyStoryFlowExecutor dyStoryFlowExecutor;
......@@ -218,4 +221,59 @@ public class AttrController {
return ls.stream()
.collect(Collectors.toMap(CrossCallback::getMediaName, CrossPlatformAccount::new));
}
@RequestMapping("/sendDyVideo")
public void testSendDyVideo() {
String msg = "{\"clientInfo\":{\"channel\":214122006,\"clientIp\":\"125.45.110.52\",\"pkv\":1,\"platformGroupId\":412,"
+ "\"platformId\":412,\"sessionid\":\"undefined\",\"ua\":\"Mozilla/5.0 (Linux; Android 12; PEHM00 Build/SKQ1.210216.001; "
+ "wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/117.0.0.0 Mobile Safari/537.36 aweme.lite/29.2.0 "
+ "ToutiaoMicroApp/3.19.0 PluginVersion/29209076\",\"version\":\"2.0.0\",\"xClient\":\"version=2.0.0;platformId=412;"
+ "equipment=android\"},\"createTime\":1711394346711,\"extraData\":{},\"id\":\"ec792ce2-f448-4daa-b5b9-582e1f1e1bf7\","
+ "\"userId\":913286,\"userUploadEvent\":3,\"wechatAppId\":\"ttd3dda5604ce230b401\","
+ "\"wechatOpenId\":\"_000GI00qFjk0oFW9iYwBmDTC-K257VHXUVL\"}";
StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
novelKafkaTemplate.send("testDouyinRegisterTopic",JSON.toJSONString(event)).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("dyvideo注册发送消息成功: topic={},partition={},offset={}", topic, partition, offset);
}, failure -> {
log.error("dyvideo注册发送消息失败:fail={}", JSON.toJSONString(failure));
});
}
@RequestMapping("/sendVideoApp")
public void testSendVideoapp() {
String msg = "{\"clientInfo\":{\"channel\":682000,\"clientIp\":\"183.226.74.112\","
+ "\"dID\":\"2705A240-1875-4C38-9529-0343B56B6C49\",\"fixVersion\":4,\"idfa\":\"00000000-0000-0000-0000-000000000000\","
+ "\"language\":\"zh\",\"mainVersion\":1,\"oaid\":\"2705A240-1875-4C38-9529-0343B56B6C49\",\"os\":\"1\","
+ "\"phoneModel\":\"iPhone14,2\",\"pkv\":0,\"platformGroupId\":68,\"platformId\":68,\"pm\":\"iPhone14,2\","
+ "\"screenSize\":\"390*844\",\"subVersion\":0,\"systemVersion\":\"17.3.1\",\"ua\":\"Mozilla/5.0 (iPhone; CPU iPhone OS "
+ "17_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148\","
+ "\"user\":{\"avatarUrl\":\"https://lwby-video-res.oss-accelerate.aliyuncs.com/avatarUrl/1673322782333.png\","
+ "\"createDate\":1711440818292,\"deviceId\":\"2705A240-1875-4C38-9529-0343B56B6C49\",\"goldCoinNum\":0,\"id\":553262,"
+ "\"mainversion\":1,\"nickname\":\"游客545250\",\"platform\":68,\"subversion\":0,\"userStatus\":1},\"version\":\"68.1.0.4"
+ ".682000\",\"xClient\":\"sv=17.3.1;pm=iPhone14,2;ss=390*844;version=68.1.0.4.682000;pkv=0;platformId=68;language=zh;"
+ "dID=2705A240-1875-4C38-9529-0343B56B6C49;idfa=00000000-0000-0000-0000-000000000000;os=1\"},"
+ "\"createTime\":1711440818298,\"extraData\":{},\"id\":\"61e3c546-dee4-4cfb-9430-a50cdc63b732\",\"userUploadEvent\":0}";
StoreUserUploadEventBO event = JSONObject.parseObject(msg, StoreUserUploadEventBO.class);
novelKafkaTemplate.send("testFreeVideoEvent",JSON.toJSONString(event)).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("dyvideo注册发送消息成功: topic={},partition={},offset={}", topic, partition, offset);
}, failure -> {
log.error("dyvideo注册发送消息失败:fail={}", JSON.toJSONString(failure));
});
}
}
......@@ -46,10 +46,12 @@ public class DyVideoJRTTNotify extends DYNotify {
String userJson = JSONObject.toJSONString(request);
try {
String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
Integer resultCode = (Integer) JSON.parseObject(result).get("code");
DYVIDEO_SYS_LOG.info("DyVideoJRTTNotify.video.{}.upload,result={},userId={},channel={},platform={}",eventType,JSON.toJSONString(result),na.getUserId(),na.getChannelId(),na.getPlatformId());
return resultCode == 0;
//TODO;测试不执行
//String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
//Integer resultCode = (Integer) JSON.parseObject(result).get("code");
//return resultCode == 0;
DYVIDEO_SYS_LOG.info("DyVideoJRTTNotify.video.{}.upload,userId={},channel={},platform={}",eventType,na.getUserId(),na.getChannelId(),na.getPlatformId());
return true;
} catch (Exception e) {
return false;
}
......
......@@ -47,7 +47,7 @@ public class VideoAppJRTTNotify extends MobileNotify {
// String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
// Integer resultCode = (Integer) JSON.parseObject(result).get("code");
// return resultCode == 0;
log.info("Callback info:{}",userJson);
log.info("VideoAppJRTTNotify android Callback info:{}",userJson);
return true;
} catch (Exception e) {
return false;
......@@ -78,7 +78,7 @@ public class VideoAppJRTTNotify extends MobileNotify {
// String result = HttpUtil.post(ATTRIBUTE_URL, userJson);
// Integer resultCode = (Integer) JSON.parseObject(result).get("code");
// return resultCode == 0;
log.info("Callback info:{}",userJson);
log.info("VideoAppJRTTNotify ios Callback info:{}",userJson);
return true;
} catch (Exception e) {
return false;
......
package com.lwby.marketing.po;
import lombok.Data;
import java.io.Serializable;
/**
* @author songxinyu
* @version ThirdAccountDy.java, v 0.1 2024年01月30日 15:30 songxinyu Exp $
*/
@Data
public class ThirdAccountDy implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键ID
*/
private Integer id;
private String clientId;
private Integer platformId;
private String accessToken;
private Integer expireIn;
}
......@@ -32,7 +32,11 @@ public class NovelAction extends Action {
public NovelAction(ClientInfo clientInfo,String msg){
this(clientInfo,null,msg);
this(clientInfo,null,msg);
}
public NovelAction(ClientInfo clientInfo,String msg,Integer type){
this(clientInfo,null,msg,type);
}
public NovelAction(ClientInfo clientInfo, DeliveryDeviceInfo deliveryDeviceInfo) {
......@@ -44,4 +48,11 @@ public class NovelAction extends Action {
this.deliveryDeviceInfo = deliveryDeviceInfo;
this.body = msg;
}
public NovelAction(ClientInfo clientInfo, DeliveryDeviceInfo deliveryDeviceInfo,String msg,Integer type){
this.clientInfo = clientInfo;
this.deliveryDeviceInfo = deliveryDeviceInfo;
this.body = msg;
this.type = type;
}
}
\ No newline at end of file
......@@ -44,4 +44,5 @@ public class UserProfile {
private Integer subversion;
private Integer channel;
private Integer userStatus;
private Date createDate;
}
......@@ -68,6 +68,12 @@ bi:
host: 172.17.243.150
port: 6379
old:
market:
redis:
host: 172.17.243.150
port: 6380
system:
consumer:
dystory:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment