Commit 281150d7 authored by dingjy's avatar dingjy

1

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lwby</groupId>
<artifactId>pika-proxy</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.32</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.3</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.96.Final</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.6.1</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<finalName>pika-proxy</finalName>
</build>
</project>
\ No newline at end of file
package com.pika.proxy;
import com.pika.proxy.conf.PikaProxyConfig;
import com.pika.proxy.handler.PikaCmdHandler;
import com.pika.proxy.server.Server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@SpringBootApplication
@RestController
public class WebApplication {
@Resource
private PikaProxyConfig config;
@Resource
private PikaCmdHandler handler;
@PostConstruct
public void start() {
Server server = new Server(config.getPort(), config.getMaxIdle(), config.getMaxTotal(), handler);
Runtime.getRuntime().addShutdownHook(new Thread(server::destroy));
}
public static void main(String... args) {
SpringApplication.run(WebApplication.class, args);
}
}
package com.pika.proxy.conf;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Data
@ConfigurationProperties(prefix = "pika-proxy")
public class PikaProxyConfig {
private int port;
private int maxWaitMillis;
private int timeBetweenEvictionRunsMillis;
private int minEvictableIdleTimeMillis;
private int timeout;
public int maxTotal;
public int maxIdle;
private List<Shard> shards;
}
\ No newline at end of file
package com.pika.proxy.conf;
import java.net.URI;
import java.util.BitSet;
import java.util.List;
import java.util.stream.IntStream;
import java.util.zip.CRC32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class PikaShardLoad {
private static final int SLOT_SIZE = 1024;
private final Logger logger = LoggerFactory.getLogger(PikaShardLoad.class);
private final JedisPool[] dict = new JedisPool[SLOT_SIZE];
@Resource
private PikaProxyConfig config;
public JedisPool getPool(byte[] key){
CRC32 ck = crc32.get();
ck.update(key);
return dict[(int) (ck.getValue()%SLOT_SIZE)];
}
@PostConstruct
private void initShards() {
checkpoint();
List<Shard> shards = config.getShards();
int maxWaitMillis = config.getMaxWaitMillis();
int timeBetweenEvictionRunsMillis = config.getTimeBetweenEvictionRunsMillis();
int minEvictableIdleTimeMillis = config.getMinEvictableIdleTimeMillis();
int timeout = config.getTimeout();
int maxIdle = config.getMaxIdle();
int maxTotal = config.getMaxTotal();
shards.forEach(e->{
String uri = e.getUri();
int slotEnd = e.getSlot_end();
int slotBegin = e.getSlot_begin();
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(maxTotal);
poolConfig.setMaxIdle(maxIdle);
poolConfig.setMaxWaitMillis(maxWaitMillis);
poolConfig.setTestWhileIdle(true);
poolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
poolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
JedisPool jp = new JedisPool(poolConfig, URI.create(uri), timeout);
IntStream.range(slotBegin,slotEnd+1).forEach(i -> {
dict[i] = jp;
});
});
}
private void checkpoint() {
BitSet bitSet = new BitSet(SLOT_SIZE);
config.getShards().forEach(e -> {
IntStream.range(e.getSlot_begin(), e.getSlot_end() + 1).forEach(i -> {
if (bitSet.get(i)) {
throw new RuntimeException("slot " + i + " is conflict used");
}
bitSet.set(i);
});
});
if (bitSet.cardinality() != SLOT_SIZE) {
throw new RuntimeException("slot is not full");
}
}
private static final ThreadLocal<CRC32> crc32 = new ThreadLocal<CRC32>() {
@Override
protected CRC32 initialValue() {
return new CRC32();
}
};
}
package com.pika.proxy.conf;
import lombok.Data;
@Data
public class Shard {
private String uri;
private int slot_begin;
private int slot_end;
}
package com.pika.proxy.exceptions;
public class PikaException extends Exception {
public PikaException() {
super();
}
public PikaException(Throwable cause) {
super(cause);
}
public PikaException(String message) {
super(message);
}
public PikaException(String message, Throwable cause) {
super(message, cause);
}
protected PikaException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
package com.pika.proxy.handler;
import com.pika.proxy.exceptions.PikaException;
import com.pika.proxy.result.ErrorResult;
import com.pika.proxy.result.InlineResult;
import com.pika.proxy.result.Result;
import com.pika.proxy.server.Server;
import com.pika.proxy.conf.PikaShardLoad;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public abstract class CmdHandler {
public Result exec(String key, PikaCmdHandler.IHandle handle) {
byte[] kb = getBytes(key);
JedisPool jp = getRedisShard().getPool(kb);
if (jp == null) {
return ErrorResult.SHARD_DISABLE_RESP;
}
try (Jedis jedis = jp.getResource()) {
return handle.handle(kb,jedis);
}
}
public abstract PikaShardLoad getRedisShard();
public interface IHandle {
Result handle(byte[] key, Jedis jedis);
}
protected byte[] getBytes(String key) {
return key.getBytes(StandardCharsets.UTF_8);
}
protected String getString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
public Result ping() {
return new InlineResult("PONG");
}
public abstract Result get(String key) throws PikaException;
public abstract Result set(String key, byte[] value) throws PikaException;
public abstract Result setEx(String key, int second, byte[] value) throws PikaException;
public abstract Result setNx(String key, byte[] value) throws PikaException;
public abstract Result getSet(String key, byte[] value) throws PikaException;
public abstract Result del(String key) throws PikaException;
public abstract Result exists(String key) throws PikaException;
public abstract Result expire(String key, int value) throws PikaException;
public abstract Result expireAt(String key, int value) throws PikaException;
public abstract Result ttl(String key) throws PikaException;
public abstract Result decr(String key) throws PikaException;
public abstract Result decrBy(String key, long value) throws PikaException;
public abstract Result incr(String key) throws PikaException;
public abstract Result incrBy(String key, long value) throws PikaException;
public abstract Result hSet(String key, byte[] field, byte[] value) throws PikaException;
public abstract Result hMSet(String key, byte[][] value) throws PikaException;
public abstract Result hGet(String key, byte[] field) throws PikaException;
public abstract Result hMGet(String key, byte[][] field) throws PikaException;
public abstract Result hGetAll(String key) throws PikaException;
public abstract Result hDel(String key, byte[][] field) throws PikaException;
public abstract Result hExists(String key, byte[] field) throws PikaException;
public abstract Result hIncrBy(String key, byte[] field, long value) throws PikaException;
public abstract Result hKeys(String key) throws PikaException;
public abstract Result hLen(String key) throws PikaException;
public abstract Result hSetNx(String key, byte[] field, byte[] value) throws PikaException;
public abstract Result hVals(String key) throws PikaException;
public abstract Result lPush(String key, byte[][] value) throws PikaException;
public abstract Result rPush(String key, byte[][] value) throws PikaException;
public abstract Result lPop(String key) throws PikaException;
public abstract Result rPop(String key) throws PikaException;
public abstract Result bLPop(String key, int timeout) throws PikaException;
public abstract Result bRPop(String key, int timeout) throws PikaException;
public abstract Result lLen(String key) throws PikaException;
public abstract Result lRange(String key, long start, long end) throws PikaException;
public abstract Result lRem(String key, long count, byte[] value) throws PikaException;
public abstract Result lSet(String key, long index, byte[] value) throws PikaException;
public abstract Result lTrim(String key, long start, long end) throws PikaException;
public abstract Result sAdd(String key, byte[][] value) throws PikaException;
public abstract Result sCard(String key) throws PikaException;
public abstract Result sisMember(String key, byte[] value) throws PikaException;
public abstract Result sMembers(String key) throws PikaException;
public abstract Result sPop(String key) throws PikaException;
public abstract Result sRem(String key, byte[][] value) throws PikaException;
public abstract Result zAdd(String key, byte[][] value) throws PikaException;
public abstract Result zCard(String key) throws PikaException;
public abstract Result zCount(String key, byte[] min, byte[] max) throws PikaException;
public abstract Result zIncrBy(String key, double score, byte[] value) throws PikaException;
public abstract Result zRange(String key, long stat, long stop) throws PikaException;
public abstract Result zRangeByScore(String key, byte[][] value) throws PikaException;
public abstract Result zRank(String key, byte[] value) throws PikaException;
public abstract Result zRem(String key, byte[][] value) throws PikaException;
public abstract Result zRemRangeByRank(String key, long start, long stop) throws PikaException;
public abstract Result zRemRangeByScore(String key, byte[] min, byte[] max) throws PikaException;
public abstract Result zRevRange(String key, long start, long stop) throws PikaException;
public abstract Result zRevRangeByScore(String key, byte[][] value) throws PikaException;
public abstract Result zRevRank(String key, byte[] value) throws PikaException;
public abstract Result zScore(String key, byte[] value) throws PikaException;
}
This diff is collapsed.
package com.pika.proxy.invoke;
import com.pika.proxy.server.codec.PikaCmd;
import com.pika.proxy.result.Result;
public interface CmdInvoke {
Result<?> execute(String name, PikaCmd command) throws Exception;
}
package com.pika.proxy.invoke;
import com.pika.proxy.server.codec.PikaCmd;
import com.pika.proxy.handler.CmdHandler;
import com.pika.proxy.result.Result;
import com.pika.proxy.result.ErrorResult;
import java.lang.reflect.Method;
public class PikaCmdInvokeImpl implements CmdInvoke {
private final Class<?>[] types;
private final Method method;
private final CmdHandler handler;
public PikaCmdInvokeImpl(Class<?>[] types, Method method, CmdHandler handler) {
this.types = types;
this.method = method;
this.handler = handler;
}
@Override
public Result<?> execute(String name, PikaCmd cmd) throws Exception {
if (!cmd.checkArgNum(types.length)) {
return new ErrorResult("wrong number of args for '" + name + "' command");
}
Object[] objects = new Object[types.length];
cmd.toArguments(objects, types);
return (Result<?>) method.invoke(handler, objects);
}
}
package com.pika.proxy.result;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public class BulkResult extends Result<ByteBuf> {
public static final BulkResult NIL_REPLY = new BulkResult();
public static final char MARKER = '$';
private final ByteBuf bytes;
private final int capacity;
public BulkResult() {
bytes = null;
capacity = -1;
}
public BulkResult(byte[] bytes) {
if (bytes != null) {
this.bytes = Unpooled.wrappedBuffer(bytes);
this.capacity = bytes.length;
} else {
this.bytes = null;
this.capacity = -1;
}
}
public BulkResult(Double value) {
byte[] data = value.toString().getBytes(StandardCharsets.UTF_8);
this.bytes = Unpooled.wrappedBuffer(data);
capacity = data.length;
}
@Override
public ByteBuf data() {
return bytes;
}
public String asUTF8String() {
if (bytes == null) {
return null;
}
return bytes.toString(Charsets.UTF_8);
}
@Override
public void write(ByteBuf os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(numToBytes(capacity, true));
if (capacity > 0) {
os.writeBytes(bytes);
os.writeBytes(CRLF);
}
}
public String toString() {
return asUTF8String();
}
}
package com.pika.proxy.result;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
public class ErrorResult extends Result<String> {
public static final char MARKER = '-';
public static final ErrorResult NYI_RESP = new ErrorResult("not implemented");
public static final ErrorResult SHARD_DISABLE_RESP = new ErrorResult("shard disable");
private final String error;
public ErrorResult(String error) {
this.error = error;
}
public ErrorResult(Exception e) {
this.error = e.getCause().toString();
}
@Override
public String data() {
return error;
}
@Override
public void write(ByteBuf os) {
os.writeByte(MARKER);
os.writeBytes(error.getBytes(Charsets.UTF_8));
os.writeBytes(CRLF);
}
public String toString() {
return error;
}
}
package com.pika.proxy.result;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
public class InlineResult extends Result<Object> {
private final Object o;
public InlineResult(Object o) {
this.o = o;
}
@Override
public Object data() {
return o;
}
@Override
public void write(ByteBuf os) throws IOException {
if (o == null) {
os.writeBytes(CRLF);
} else if (o instanceof String) {
os.writeByte('+');
os.writeBytes(((String) o).getBytes(Charsets.US_ASCII));
os.writeBytes(CRLF);
} else if (o instanceof ByteBuf) {
os.writeByte('+');
os.writeBytes(((ByteBuf) o).array());
os.writeBytes(CRLF);
} else if (o instanceof byte[]) {
os.writeByte('+');
os.writeBytes((byte[]) o);
os.writeBytes(CRLF);
} else if (o instanceof Long) {
os.writeByte(':');
os.writeBytes(numToBytes((Long) o, true));
} else {
os.writeBytes("ERR invalid inline response".getBytes(Charsets.US_ASCII));
os.writeBytes(CRLF);
}
}
}
package com.pika.proxy.result;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
public class IntegerResult extends Result<Long> {
public static final char MARKER = ':';
private final long integer;
private static IntegerResult[] replies = new IntegerResult[512];
static {
for (int i = -255; i < 256; i++) {
replies[i + 255] = new IntegerResult(i);
}
}
public static IntegerResult integer(long integer) {
if (integer > -256 && integer < 256) {
return replies[((int) (integer + 255))];
} else {
return new IntegerResult(integer);
}
}
public IntegerResult(long integer) {
this.integer = integer;
}
public IntegerResult(boolean integer) {
this.integer = integer ? 1 : 0;
}
@Override
public Long data() {
return integer;
}
@Override
public void write(ByteBuf os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(Result.numToBytes(integer, true));
}
public String toString() {
return data().toString();
}
}
package com.pika.proxy.result;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MultiBulkResult extends Result<Result[]> {
public static final char MARKER = '*';
public static final MultiBulkResult EMPTY = new MultiBulkResult(new Result[0]);
private Result[] replies;
public MultiBulkResult(Result[] replies) {
if (replies != null) {
this.replies = replies;
} else {
this.replies = new Result[0];
}
}
public MultiBulkResult(Map<byte[], byte[]> map) {
if (map == null || map.isEmpty()) {
this.replies = new Result[0];
} else {
int i = 0;
replies = new BulkResult[map.size() * 2];
Set<byte[]> keys = map.keySet();
for (byte[] key : keys) {
replies[i++] = new BulkResult(key);
replies[i++] = new BulkResult(map.get(key));
}
}
}
public MultiBulkResult(Set<byte[]> set) {
if (set == null || set.isEmpty()) {
this.replies = new Result[0];
} else {
int i = 0;
replies = new BulkResult[set.size()];
for (byte[] key : set) {
replies[i++] = new BulkResult(key);
}
}
}
public MultiBulkResult(List<String> set) {
if (set == null || set.isEmpty()) {
this.replies = new Result[0];
} else {
int i = 0;
replies = new BulkResult[set.size()];
for (String key : set) {
replies[i++] = new BulkResult(key.getBytes(Charset.forName("UTF-8")));
}
}
}
public MultiBulkResult(Collection<byte[]> coll) {
if (coll == null || coll.isEmpty()) {
this.replies = new Result[0];
} else {
int i = 0;
replies = new BulkResult[coll.size()];
for (byte[] key : coll) {
replies[i++] = new BulkResult(key);
}
}
}
@Override
public Result[] data() {
return replies;
}
@Override
public void write(ByteBuf os) throws IOException {
os.writeByte(MARKER);
if (replies == null) {
os.writeBytes(NEG_ONE_WITH_CRLF);
} else {
os.writeBytes(numToBytes(replies.length, true));
for (Result reply : replies) {
reply.write(os);
}
}
}
}
package com.pika.proxy.result;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
public abstract class Result<T> {
public static final byte[] NEG_ONE = convert(-1, false);
public static final byte[] NEG_ONE_WITH_CRLF = convert(-1, true);
public static final char LF = '\n';
public static final char CR = '\r';
public final byte[] CRLF = "\r\n".getBytes();
private static final int NUM_MAP_LENGTH = 256;
private static final byte[][] numMap = new byte[NUM_MAP_LENGTH][];
static {
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
numMap[i] = convert(i, false);
}
}
private static final byte[][] numMapWithCRLF = new byte[NUM_MAP_LENGTH][];
static {
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
numMapWithCRLF[i] = convert(i, true);
}
}
public static byte[] numToBytes(long value, boolean withCRLF) {
if (value >= 0 && value < NUM_MAP_LENGTH) {
int index = (int) value;
return withCRLF ? numMapWithCRLF[index] : numMap[index];
} else if (value == -1) {
return withCRLF ? NEG_ONE_WITH_CRLF : NEG_ONE;
}
return convert(value, withCRLF);
}
private static byte[] convert(long value, boolean withCRLF) {
boolean negative = value < 0;
long abs = Math.abs(value);
int index = (value == 0 ? 0 : (int) Math.log10(abs)) + (negative ? 2 : 1);
byte[] bytes = new byte[withCRLF ? index + 2 : index];
if (withCRLF) {
bytes[index] = CR;
bytes[index + 1] = LF;
}
if (negative) {
bytes[0] = '-';
}
long next = abs;
while ((next /= 10) > 0) {
bytes[--index] = (byte) ('0' + (abs % 10));
abs = next;
}
bytes[--index] = (byte) ('0' + abs);
return bytes;
}
public abstract T data();
public abstract void write(ByteBuf os) throws IOException;
}
package com.pika.proxy.result;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
public class StatusResult extends Result<String> {
public static final char MARKER = '+';
public static final StatusResult OK = new StatusResult("OK");
public static final StatusResult QUIT = new StatusResult("OK");
private final String status;
private final byte[] statusBytes;
public StatusResult(String status) {
this.status = status;
this.statusBytes = status.getBytes(Charsets.UTF_8);
}
@Override
public String data() {
return status;
}
@Override
public void write(ByteBuf os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(statusBytes);
os.writeBytes(CRLF);
}
}
package com.pika.proxy.server;
import com.pika.proxy.handler.CmdHandler;
import com.pika.proxy.server.codec.PikaCmdDecoder;
import com.pika.proxy.server.codec.PikaCmdEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
private EventLoopGroup boss;
public Server(final int port, final int minThreads, final int maxThreads, final CmdHandler handler) {
boss = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(boss);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new PikaCmdEncoder());
ch.pipeline().addLast(new PikaCmdDecoder());
ch.pipeline().addLast(new ServerIdleHandler());
ch.pipeline().addLast(new ServerHandler(minThreads, maxThreads, handler));
}
});
b.option(ChannelOption.SO_BACKLOG, 128);
b.option(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
b.bind(port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void destroy() {
try {
boss.shutdownGracefully();
} catch (Exception ignored) {
}
}
}
package com.pika.proxy.server;
import com.pika.proxy.server.codec.PikaCmd;
import com.pika.proxy.handler.CmdHandler;
import com.pika.proxy.invoke.CmdInvoke;
import com.pika.proxy.invoke.PikaCmdInvokeImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import com.pika.proxy.result.ErrorResult;
import com.pika.proxy.result.Result;
import com.pika.proxy.result.StatusResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ServerHandler extends SimpleChannelInboundHandler<PikaCmd> {
private final Logger logger = LoggerFactory.getLogger(ServerHandler.class);
private final ThreadPoolExecutor executor;
private final Map<String, CmdInvoke> methods;
public ServerHandler(int minThreads, int maxThreads, CmdHandler handler) {
executor = new ThreadPoolExecutor(minThreads, maxThreads, 30, TimeUnit.MINUTES, new LinkedBlockingQueue(Short.MAX_VALUE));
methods = new HashMap<>();
Class<? extends CmdHandler> clazz = handler.getClass();
for (final Method method : clazz.getMethods()) {
Class<?>[] types = method.getParameterTypes();
CmdInvoke invoke = new PikaCmdInvokeImpl(types, method, handler);
methods.put(method.getName(), invoke);
methods.put(method.getName().toUpperCase(), invoke);
methods.put(method.getName().toLowerCase(), invoke);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, PikaCmd cmd) {
executor.execute(() -> {
try {
handler(ctx, cmd);
} catch (Exception e) {
ctx.writeAndFlush(new ErrorResult(e));
e.printStackTrace();
}
});
}
private void handler(ChannelHandlerContext ctx, final PikaCmd cmd) throws Exception {
String name = new String(cmd.getName(), StandardCharsets.UTF_8);
CmdInvoke invoke = methods.get(name);
Result<?> resp;
if (invoke == null) {
resp = new ErrorResult("unknown command '" + name + "'");
} else {
resp = invoke.execute(name, cmd);
}
if (resp == StatusResult.QUIT) {
ctx.close();
} else {
if (resp == null) {
resp = ErrorResult.NYI_RESP;
}
ctx.writeAndFlush(resp);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
logger.info("远 程服务 " + ctx.channel().remoteAddress() + " 链路建立");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("远程服务 " + ctx.channel().remoteAddress() + " 链路断开");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(cause + "");
ctx.close();
}
}
package com.pika.proxy.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class ServerIdleHandler extends IdleStateHandler {
private static final int SET_IDLE_TIME = 12;
private final Logger logger = LoggerFactory.getLogger(ServerIdleHandler.class);
public ServerIdleHandler() {
super(0, 0, SET_IDLE_TIME, TimeUnit.HOURS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
if (IdleState.ALL_IDLE == evt.state()) {
logger.error("idle timeout " + SET_IDLE_TIME + " minutes, close connection " + ctx.toString());
ctx.close();
}
}
}
package com.pika.proxy.server.codec;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public class PikaCmd {
public static final byte[] EMPTY_BYTES = new byte[0];
private final Object[] objects;
public PikaCmd(Object[] objects) {
this.objects = objects;
}
public boolean checkArgNum(int argsNum) {
return argsNum <= objects.length - 1;
}
public byte[] getName() {
return getBytes(objects[0]);
}
private byte[] getBytes(Object object) {
byte[] argument;
if (object == null) {
argument = EMPTY_BYTES;
} else if (object instanceof byte[]) {
argument = (byte[]) object;
} else if (object instanceof ByteBuf) {
argument = ((ByteBuf) object).array();
} else if (object instanceof String) {
argument = ((String) object).getBytes(Charsets.UTF_8);
} else {
argument = object.toString().getBytes(Charsets.UTF_8);
}
return argument;
}
public void toArguments(Object[] arguments, Class<?>[] types) {
int position = 0;
for (Class<?> type : types) {
if (position <= 0 && type == String.class) {
arguments[position] = new String((byte[]) objects[1 + position], StandardCharsets.UTF_8);
} else if (type == int.class) {
arguments[position] = Integer.parseInt(new String((byte[]) objects[1 + position], StandardCharsets.UTF_8));
} else if (type == long.class) {
arguments[position] = Long.parseLong(new String((byte[]) objects[1 + position], StandardCharsets.UTF_8));
} else if (type == double.class) {
arguments[position] = Double.parseDouble(new String((byte[]) objects[1 + position], StandardCharsets.UTF_8));
} else if (type == byte[].class) {
arguments[position] = objects[1 + position];
} else {
int left = objects.length - position - 1;
byte[][] lastArgument = new byte[left][];
for (int i = 0; i < left; i++) {
lastArgument[i] = (byte[]) objects[i + position + 1];
}
arguments[position] = lastArgument;
}
position++;
}
}
}
package com.pika.proxy.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import com.pika.proxy.exceptions.PikaException;
import com.pika.proxy.result.BulkResult;
import com.pika.proxy.result.MultiBulkResult;
import com.pika.proxy.result.Result;
import java.io.IOException;
import java.util.List;
import static com.pika.proxy.result.Result.CR;
public class PikaCmdDecoder extends ReplayingDecoder<Void> {
private byte[][] bytes;
private int arguments = 0;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
if (bytes != null) {
int numArgs = bytes.length;
for (int i = arguments; i < numArgs; i++) {
if (in.readByte() == BulkResult.MARKER) {
int len = readInt(in);
bytes[i] = new byte[len];
in.readBytes(bytes[i]);
if (in.bytesBefore((byte) Result.CR) != 0) {
throw new PikaException("Argument doesn't end in CRLF");
}
in.skipBytes(2);
arguments++;
checkpoint();
} else {
throw new IOException("Unexpected character");
}
}
try {
out.add(new PikaCmd(bytes));
} finally {
bytes = null;
arguments = 0;
}
} else if (in.readByte() == MultiBulkResult.MARKER) {
int numArgs = readInt(in);
if (numArgs < 0) {
throw new PikaException("Invalid size: " + numArgs);
}
bytes = new byte[numArgs][];
checkpoint();
decode(ctx, in, out);
} else {
in.readerIndex(in.readerIndex() - 1);
byte[][] b = new byte[1][];
b[0] = in.readBytes(in.bytesBefore((byte) CR)).array();
in.skipBytes(2);
out.add(new PikaCmd(b));
}
}
public int readInt(ByteBuf is) throws IOException {
int size = 0;
int sign = 1;
int read = is.readByte();
if (read == '-') {
read = is.readByte();
sign = -1;
}
do {
if (read == CR) {
if (is.readByte() == Result.LF) {
break;
}
}
int value = read - '0';
if (value >= 0 && value < 10) {
size *= 10;
size += value;
} else {
throw new IOException("Invalid character in integer");
}
read = is.readByte();
} while (true);
return size * sign;
}
}
package com.pika.proxy.server.codec;
import com.pika.proxy.result.Result;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class PikaCmdEncoder extends MessageToByteEncoder<Result> {
@Override
public void encode(ChannelHandlerContext ctx, Result msg, ByteBuf out) throws Exception {
msg.write(out);
}
}
logging:
level:
root: INFO
file:
path: /data/pika-proxy/logs
pika-proxy:
port: 7788
timeout: 10000
maxTotal: 100
maxIdle: 10
maxWaitMillis: 60000
timeBetweenEvictionRunsMillis: 5000
minEvictableIdleTimeMillis: 300000
shards:
- uri: redis://127.0.0.1:6379
slot_begin: 0
slot_end: 340
- uri: redis://127.0.0.1:6380
slot_begin: 341
slot_end: 683
- uri: redis://127.0.0.1:6381
slot_begin: 684
slot_end: 1023
\ No newline at end of file
logging:
level:
root: INFO
file:
path: /data/pika-proxy/logs
pika-proxy:
port: 7788
timeout: 10000
maxTotal: 100
maxIdle: 10
maxWaitMillis: 60000
timeBetweenEvictionRunsMillis: 5000
minEvictableIdleTimeMillis: 300000
shards:
- uri: redis://127.0.0.1:6379
slot_begin: 0
slot_end: 340
- uri: redis://127.0.0.1:6379
slot_begin: 341
slot_end: 683
- uri: redis://127.0.0.1:6379
slot_begin: 684
slot_end: 1023
\ No newline at end of file
server:
port: 6661
tomcat:
accept-count: 10000
max-threads: 1000
min-spare-threads: 100
max-connections: 20000
connection-timeout: 10000
max-http-form-post-size: 10MB
spring:
application:
name: pika-proxy
profiles:
active: test
#prometheus监控平台配置
management:
endpoints:
web:
exposure:
include: "*"
exclude: configprops
endpoint:
health:
show-details: ALWAYS
metrics:
tags:
application: ${spring.application.name}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_DIR" value="${LOG_PATH:-.}" />
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%level] [%thread] [%logger] [tr=%mdc{TRACE_ID:-0}] %msg %n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<springProfile name="prod">
<root level="${logging.level.root}">
<appender-ref ref="console" />
</root>
</springProfile>
<springProfile name="test">
<root level="${logging.level.root}">
<appender-ref ref="console" />
</root>
</springProfile>
</configuration>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment