Commit ccfedbac authored by 宋新宇's avatar 宋新宇

Merge branch 'release_20240408_01' into 'main'

Release 20240408 01

See merge request !42
parents db10b7aa 03b7f069
package com.lwby.marketing.att;
import com.lwby.marketing.vo.AppChannelVO;
import java.util.Objects;
import java.util.function.Function;
public enum BehavoirType {
MOTIVATEMODELCOUNT("motivateModelCount",(a) -> Objects.nonNull(a.getMotivationCount()) ? a.getMotivationCount().intValue() : null),
ECPMAVGMODELCOUNT("ecpmAvgModelCount", (a) -> Objects.nonNull(a.getEcpmAvgCount()) ? a.getEcpmAvgCount().intValue() : null),
PECPMMODELCOUNT("pecpmModelCount",(a) -> Objects.nonNull(a.getEcpmPerCount()) ? a.getEcpmPerCount().intValue() : null),
ARPUMODELCOUNT("arpuModelCount",(a) -> Objects.nonNull(a.getArpuCount()) ? a.getArpuCount().doubleValue() : null);
private String value;
private Function<AppChannelVO,Number> fun;
BehavoirType(String value, Function<AppChannelVO,Number> fun) {
this.value = value;
this.fun = fun;
}
public String getValue() {
return this.value;
}
public Number getBehavoirType(AppChannelVO appChannel){
return fun.apply(appChannel);
}
}
\ No newline at end of file
......@@ -30,8 +30,8 @@ public class UniversalProcess {
@Resource
public JdbcTemplate videoInlandJdbcTemplate;
@Resource(name = "novelKafka")
public KafkaTemplate<String, String> novelKafkaTemplate;
@Resource(name = "storyKafka")
public KafkaTemplate<String, String> storyKafkaTemplate;
@Resource
public RedisTemplate<String, String> redisTemplate;
......
......@@ -53,7 +53,7 @@ public class DyStoryUniversalProcess extends UniversalProcess {
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
ListenableFuture<SendResult<String, String>> active_result = storyKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> DYSTORY_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> DYSTORY_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
......
......@@ -66,6 +66,7 @@ public class DyVideoBehavoirConsumer {
event.getUserUploadEvent());
executorByDyVideoBehavoir.execute(action);
DYVIDEO_SYS_LOG.info("media.behavoir.dyVideo.end,topic={}, bookStoreEvent={}", data.topic(), JSON.toJSONString(event));
} catch (Throwable e) {
DYVIDEO_ERROR_LOG.error("dy.video.behavoir.onMessage failed, data={}, costTime={} ms", data.value(),
System.currentTimeMillis() - begin, e);
......
......@@ -10,6 +10,7 @@ import com.lwby.marketing.att.UniversalProcess;
import com.lwby.marketing.att.CallBackType;
import com.lwby.marketing.att.novel.AttributionType;
import com.lwby.marketing.po.ThirdAccountDy;
import com.lwby.marketing.util.CacheKeyUtils;
import com.lwby.marketing.util.HttpUtil;
import com.lwby.marketing.vo.AppChannelVO;
import com.lwby.marketing.vo.DeliveryDeviceInfo;
......@@ -17,6 +18,7 @@ import com.lwby.marketing.vo.StoryNovelAction;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
......@@ -26,9 +28,8 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Slf4j
......@@ -42,6 +43,14 @@ public class DyVideoUniversalProcess extends UniversalProcess {
@Resource
private RedisTemplate<String,String> oldMarketRedisTemplate;
SimpleDateFormat dfh = new SimpleDateFormat("yyyy-MM-dd");//设置日期格式
private static final String url = "https://open.douyin.com/api/traffic/v1/rt_ecpm/query/";
private static final String tokenDyDevUrl = "https://open-sandbox.douyin.com/oauth/client_token/";
private static final String tokenDyProdUrl = "https://open.douyin.com/oauth/client_token/";
/**
* 通知处理结果
*/
......@@ -67,15 +76,14 @@ public class DyVideoUniversalProcess extends UniversalProcess {
ddi.setOpenId(action.getOpenId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
ListenableFuture<SendResult<String, String>> active_result = storyKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> DYVIDEO_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> DYVIDEO_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
);
}
@Override
public <T> T get(Class<T> clazz, String key) {
public <T> T getOld(Class<T> clazz, String key) {
String value = oldMarketRedisTemplate.opsForValue().get(key);
if(!Objects.isNull(value)){
return JSON.parseObject(value,clazz);
......@@ -169,4 +177,86 @@ public class DyVideoUniversalProcess extends UniversalProcess {
return null;
}
}
public Map<String,Integer> getResultCountList(Date day, String openid, String accessToken, Integer tvcCount, Integer motivateCount, Integer pecpmCount, Integer pecpmModelCount, Long userId, String tokenDy, List<String> dyMarketPlatformAppIdList,
Environment env) {
int pageNumer = 1;
Map<String,Integer> resultCountMap = new HashMap<>();
String upcBehaviorKey = CacheKeyUtils.getVideoBehavoirKey(userId);
String cursor = null;
for (int i=1 ; i<= pageNumer ; i++) {
Map<String,Object> mp = new HashMap<>();
mp.put("open_id",openid);
mp.put("date_hour",dfh.format(day));
if (cursor != null) {
mp.put("cursor",cursor);
}
String mapAction = JSONObject.toJSONString(mp);
try {
String result = HttpUtil.postDy(url, mapAction,accessToken);
Integer resultCode = (Integer) JSON.parseObject(result).get("err_no");
if (resultCode == 0) {
Map dataMap = (Map) JSON.parseObject(result).get("data");
List<JSONObject> records = (List<JSONObject>)dataMap.get("records");
String next_cursor = (String)dataMap.get("next_cursor");
if (records != null && records.size()>0) {
if (Objects.isNull(tvcCount)) {
tvcCount = 0;
}
if (Objects.isNull(motivateCount)) {
motivateCount = 0;
}
if (Objects.isNull(pecpmCount)) {
pecpmCount = 0;
}
motivateCount += records.size();
for(JSONObject jsonObject : records) {
String cost = jsonObject.getString("cost");
Integer costI = Integer.parseInt(cost);
Double c = (double)(costI / 100);
int ct = c.intValue();
if (pecpmModelCount != null) {
if (ct>=pecpmModelCount) {
pecpmCount ++;
}
}
tvcCount += costI;
}
hset(upcBehaviorKey,"tvc",60 * 60 * 24 * 3,JSON.toJSONString(tvcCount));
hset(upcBehaviorKey,"mvc",60 * 60 * 24 * 3,JSON.toJSONString(motivateCount));
hset(upcBehaviorKey,"vec",60 * 60 * 24 * 3,JSON.toJSONString(pecpmCount));
}
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.douyin code succ,userId={},result={}",userId,JSON.toJSONString(result));
if (records != null && records.size() == 500) {
pageNumer ++;
cursor = next_cursor;
}
} else {
DYVIDEO_SYS_LOG.info("DouyinBehaviorKafkaConsumer.douyin code error,userId={},result={}",userId,JSON.toJSONString(result));
if (resultCode == 28001008) {
//删除授权过期token
delToken(tokenDy);
getDyAccessToken(dyMarketPlatformAppIdList.get(0), dyMarketPlatformAppIdList.get(1), Objects.equals(env.getActiveProfiles()[0],"prod") ? tokenDyProdUrl : tokenDyDevUrl);
DYVIDEO_SYS_LOG.info("DyvideoBehaviorFlow.douyin code auth token expired,userId={},result={}",userId,JSON.toJSONString(result));
}
return null;
}
resultCountMap.put("tvc",tvcCount);
resultCountMap.put("motivate",motivateCount);
resultCountMap.put("pecpm",pecpmCount);
} catch (Throwable e) {
e.printStackTrace();
}
}
return resultCountMap;
}
}
......@@ -47,7 +47,7 @@ public class NovelUniversalProcess extends UniversalProcess {
ddi.setDevice_status(exists(PRIZE_FLOW_PREFIX.concat(String.valueOf(ddi.getUserId()))) ? 2 : 0);
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send("ocpc_result_test", jsonString);
ListenableFuture<SendResult<String, String>> active_result = storyKafkaTemplate.send("ocpc_result_test", jsonString);
active_result.addCallback(
result -> NOVEL_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> NOVEL_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
......
......@@ -61,7 +61,7 @@ public class VideoAppUniversalProcess extends UniversalProcess {
ddi.setUserId(action.getUserId());
String jsonString = JSONObject.toJSONString(ddi);
ListenableFuture<SendResult<String, String>> active_result = novelKafkaTemplate.send(topic, jsonString);
ListenableFuture<SendResult<String, String>> active_result = storyKafkaTemplate.send(topic, jsonString);
active_result.addCallback(
result -> VIDEOAPP_SYS_LOG.info("归因成功[{}],归因类型[{}]", jsonString, status.desc),
ex -> VIDEOAPP_ERROR_LOG.error("归因失败[{}],归因类型[{}]", jsonString, status.desc, ex)
......@@ -76,6 +76,10 @@ public class VideoAppUniversalProcess extends UniversalProcess {
return null;
}
public boolean existsOld(String key) {
return Boolean.TRUE.equals(oldMarketRedisTemplate.hasKey(key));
}
public <T> Map<String,T> hgetAllOldMarket(Class<T> clazz,String key) {
Map<Object, Object> json = oldMarketRedisTemplate.opsForHash().entries(key);
......
......@@ -38,6 +38,8 @@ public class BiRedisConfig {
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(stringRedisSerializer);
template.setConnectionFactory(redisConnectionFactory);
return template;
}
......
......@@ -34,6 +34,9 @@ public class OldMarketRedisConfig {
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(stringRedisSerializer);
template.setConnectionFactory(redisConnectionFactory);
return template;
}
......
......@@ -34,6 +34,8 @@ public class RedisConfig {
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(stringRedisSerializer);
template.setConnectionFactory(redisConnectionFactory);
return template;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment