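redis
How Lettuce Sends a Command to Redis
代长亚 2024-10-31 2025-09-05
lettuce-core version: 5.1.7.RELEASE

The previous post covered how Lettuce establishes a connection to Redis on top of Netty, and it mentioned an important class, CommandHandler. This post looks at the role CommandHandler plays when a command is sent to Redis, and at how Lettuce lets multiple threads share a single physical connection. As before, start with the sample code. This post steps into sync.get to see how Lettuce sends a GET command to Redis and how it reads the reply that Redis sends back.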
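```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisStringCommands;

import java.util.concurrent.ExecutionException;

public class LettuceSimpleUse {

    private void testLettuce() throws ExecutionException, InterruptedException {
        // Create the client and open one connection to Redis
        RedisClient client = RedisClient.create("redis://localhost");
        StatefulRedisConnection<String, String> connection = client.connect();

        // Synchronous API: blocks until the reply arrives
        RedisStringCommands<String, String> sync = connection.sync();
        String value = sync.get("key");
        System.out.println("get redis value with lettuce sync command, value is :" + value);

        // Asynchronous API: returns a future immediately
        RedisAsyncCommands<String, String> async = connection.async();
        RedisFuture<String> getFuture = async.get("key");
        value = getFuture.get();
        System.out.println("get redis value with lettuce async command, value is :" + value);

        // Release the connection and the client's resources
        connection.close();
        client.shutdown();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new LettuceSimpleUse().testLettuce();
    }
}
```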
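Before looking at sync.get, see how RedisStringCommands is created. The code below shows that it is a synchronous, blocking version of the RedisAsyncCommands<String, String> methods: a JDK dynamic proxy forwards each call to the async API and then waits on the returned RedisFuture.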
```java
protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
    return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}

protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
    FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
    return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}

class FutureSyncInvocationHandler extends AbstractInvocationHandler {

    ...

    @Override
    @SuppressWarnings("unchecked")
    protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {

        try {

            Method targetMethod = this.translator.get(method);
            Object result = targetMethod.invoke(asyncApi, args);

            if (result instanceof RedisFuture<?>) {

                RedisFuture<?> command = (RedisFuture<?>) result;

                if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
                    return null;
                }

                long timeout = getTimeoutNs(command);

                return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
            }

            return result;

        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }
}
...
```
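The proxy technique above can be tried out without Lettuce. The following is a minimal sketch using only JDK types; `AsyncApi`, `SyncApi`, and `syncView` are hypothetical names invented for this illustration and are not part of Lettuce. It assumes the sync and async interfaces share method names and parameter types, which is roughly what Lettuce's method translator relies on.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncProxySketch {

    /** Hypothetical async API: every call returns a future. */
    public interface AsyncApi {
        CompletableFuture<String> get(String key);
    }

    /** Hypothetical sync API: same method names and parameters, plain return values. */
    public interface SyncApi {
        String get(String key);
    }

    /** Creates a blocking view of the async API using a JDK dynamic proxy. */
    public static SyncApi syncView(AsyncApi async, long timeoutMillis) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Find the async method with the same name and parameter types.
            Method target = AsyncApi.class.getMethod(method.getName(), method.getParameterTypes());
            Object result = target.invoke(async, args);
            if (result instanceof Future<?>) {
                // Block the calling thread until the future completes or the timeout expires.
                return ((Future<?>) result).get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return result;
        };
        return (SyncApi) Proxy.newProxyInstance(
                SyncApi.class.getClassLoader(), new Class<?>[] {SyncApi.class}, handler);
    }

    public static void main(String[] args) {
        // A fake async backend that answers on another thread.
        AsyncApi async = key -> CompletableFuture.supplyAsync(() -> "value-of-" + key);
        SyncApi sync = syncView(async, 1000);
        System.out.println(sync.get("key")); // prints "value-of-key"
    }
}
```

The sketch leaves out what the real handler adds on top: the transaction check and cancelling the command when the wait times out.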
So sync.get ultimately calls async.get. Next, look at what async.get does. As before, a sequence diagram gives the overall picture first.
AbstractRedisAsyncCommands

```java
@Override
public RedisFuture<V> get(K key) {
    return dispatch(commandBuilder.get(key));
}
```
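commandBuilder.get(key) builds a Command object from the user-supplied key, the command type (GET), and the codec used for serialization. The Command object is later responsible for encoding itself according to the Redis protocol format (RESP).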
```java
Command<K, V, V> get(K key) {
    notNullKey(key);

    return createCommand(GET, new ValueOutput<>(codec), key);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) {
    CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
    return createCommand(type, output, args);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
    return new Command<K, V, T>(type, output, args);
}
```
AbstractRedisAsyncCommands.dispatch

```java
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
    RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
    if (dispatched instanceof AsyncCommand) {
        return (AsyncCommand<K, V, T>) dispatched;
    }
    return asyncCommand;
}
```
StatefulRedisConnectionImpl.dispatch

```java
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {

    RedisCommand<K, V, T> toSend = preProcessCommand(command);

    try {
        return super.dispatch(toSend);
    } finally {
        if (command.getType().name().equals(MULTI.name())) {
            multi = (multi == null ? new MultiOutput<>(codec) : multi);
        }
    }
}

// super.dispatch(...), defined in the parent class
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {

    if (debugEnabled) {
        logger.debug("dispatching command {}", cmd);
    }

    if (tracingEnabled) {

        RedisCommand<K, V, T> commandToSend = cmd;
        TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);

        if (provider == null) {
            commandToSend = new TracedCommand<>(cmd,
                    clientResources.tracing().initialTraceContextProvider().getTraceContext());
        }

        return channelWriter.write(commandToSend);
    }

    return channelWriter.write(cmd);
}
```
DefaultEndpoint.write

```java
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

    LettuceAssert.notNull(command, "Command must not be null");

    try {
        sharedLock.incrementWriters();

        validateWrite(1);

        if (autoFlushCommands) {

            if (isConnected()) {
                writeToChannelAndFlush(command);
            } else {
                writeToDisconnectedBuffer(command);
            }

        } else {
            writeToBuffer(command);
        }
    } finally {
        sharedLock.decrementWriters();
        if (debugEnabled) {
            logger.debug("{} write() done", logPrefix());
        }
    }

    return command;
}
```
DefaultEndpoint.writeToChannelAndFlush

```java
private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {

    QUEUE_SIZE.incrementAndGet(this);

    ChannelFuture channelFuture = channelWriteAndFlush(command);

    if (reliability == Reliability.AT_MOST_ONCE) {
        channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
    }

    if (reliability == Reliability.AT_LEAST_ONCE) {
        channelFuture.addListener(RetryListener.newInstance(this, command));
    }
}

private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {

    if (debugEnabled) {
        logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
    }

    return channel.writeAndFlush(command);
}
```
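This is where Netty's Channel and EventLoop come in. Put simply, when writeAndFlush is called from a thread other than the channel's EventLoop, the channel places the object to be written on the task queue of that EventLoop and returns. The EventLoop is a SingleThreadEventExecutor; it runs the queued task and invokes the write method of the CommandHandler that was configured on the Bootstrap.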
```java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

    if (debugEnabled) {
        logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
    }

    if (msg instanceof RedisCommand) {
        writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
        return;
    }

    if (msg instanceof List) {

        List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;

        if (batch.size() == 1) {
            writeSingleCommand(ctx, batch.get(0), promise);
            return;
        }

        writeBatch(ctx, batch, promise);
        return;
    }

    if (msg instanceof Collection) {
        writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
    }
}
```
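To make the hand-off to the event loop concrete, here is a minimal sketch that replaces Netty's EventLoop with a plain single-thread executor. `EventLoopHandOffSketch` and its methods are hypothetical names for this illustration, not Netty or Lettuce APIs. It only shows the idea that callers on any thread enqueue work and return, while a single thread runs the handler logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopHandOffSketch {

    // Stand-in for the channel's event loop: exactly one thread runs all tasks.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Only touched from the event-loop thread, so a non-thread-safe queue is enough.
    private final Queue<String> written = new ArrayDeque<>();

    /** Called from any thread: enqueue the write and return immediately. */
    public void writeAndFlush(String command) {
        eventLoop.execute(() -> written.add(command));
    }

    /** Runs after all previously queued writes and returns what the handler saw. */
    public List<String> drain() {
        Callable<List<String>> snapshot = () -> new ArrayList<>(written);
        try {
            return eventLoop.submit(snapshot).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("event loop did not answer", e);
        }
    }

    public void shutdown() {
        eventLoop.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        EventLoopHandOffSketch channel = new EventLoopHandOffSketch();
        Thread[] callers = new Thread[4];
        for (int t = 0; t < callers.length; t++) {
            final int id = t;
            callers[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    channel.writeAndFlush("GET key-" + id + "-" + i);
                }
            });
            callers[t].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        System.out.println(channel.drain().size()); // prints 400
        channel.shutdown();
    }
}
```

Because every write runs on the same thread, the handler's internal state needs no locking, and the order in which tasks reach the queue is the order in which they are processed.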
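writeSingleCommand

This is the core. Lettuce lets multiple threads send commands to Redis concurrently over a single connection, so how does it match each request command to the result Redis returns? The secret is here.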
```java
private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {

    if (!isWriteable(command)) {
        promise.trySuccess();
        return;
    }

    // Record the command in the stack, then pass it down the pipeline
    addToStack(command, promise);

    if (tracingEnabled && command instanceof CompleteableCommand) {
        ...
    }

    ctx.write(command, promise);
}

private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {

    try {

        validateWrite(1);

        if (command.getOutput() == null) {
            complete(command);
        }

        RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);

        if (promise.isVoid()) {
            stack.add(redisCommand);
        } else {
            // Otherwise, adding to the stack is left to a listener on the write promise
            promise.addListener(AddToStack.newInstance(stack, redisCommand));
        }
    } catch (Exception e) {
        command.completeExceptionally(e);
        throw e;
    }
}
```
After Lettuce receives a reply from Redis, how does it notify the RedisCommand and match the result to it? When Netty receives data from the Redis server, it calls CommandHandler's channelRead method.
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    ByteBuf input = (ByteBuf) msg;

    ...

    try {
        ...
        decode(ctx, buffer);
    } finally {
        input.release();
    }
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {

    if (pristine && stack.isEmpty() && buffer.isReadable()) {
        ...
        return;
    }

    while (canDecode(buffer)) {

        RedisCommand<?, ?, ?> command = stack.peek();
        if (debugEnabled) {
            logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
        }

        pristine = false;

        try {
            if (!decode(ctx, buffer, command)) {
                return;
            }
        } catch (Exception e) {
            ctx.close();
            throw e;
        }

        if (isProtectedMode(command)) {
            onProtectedMode(command.getOutput().getError());
        } else {

            if (canComplete(command)) {
                stack.poll();

                try {
                    complete(command);
                } catch (Exception e) {
                    logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
                }
            }
        }

        afterDecode(ctx, command);
    }

    if (buffer.refCnt() != 0) {
        buffer.discardReadBytes();
    }
}
```
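As the code above shows, when Lettuce receives a reply from Redis it takes the first RedisCommand from the head of the stack, and that RedisCommand is the one the reply belongs to. Why does this match up? Because there is only one TCP connection between Lettuce and Redis: commands are added to the stack in order on the Lettuce side, TCP delivers bytes in order, and Redis processes requests on a single thread, so its replies also come back in order. Together these guarantee that the next reply from Redis always corresponds to the first RedisCommand in the stack. Of course, if the connection drops and is re-established, this correspondence no longer holds. Lettuce handles reconnection specially to prevent mismatches.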
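The matching rule described above can be sketched in a few lines. This is an illustration only, not Lettuce's code: `FifoCorrelationSketch`, `write`, and `onReply` are hypothetical names, replies are plain strings instead of decoded RESP data, and both methods are assumed to run on a single thread, as they would on the event loop.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class FifoCorrelationSketch {

    // Commands that were written but have no reply yet, oldest first.
    private final Queue<CompletableFuture<String>> stack = new ArrayDeque<>();

    /** Write path: remember the pending command in send order. */
    public CompletableFuture<String> write(String command) {
        // A real client would also encode the command and send it here.
        CompletableFuture<String> pending = new CompletableFuture<>();
        stack.add(pending);
        return pending;
    }

    /** Read path: the next reply always belongs to the oldest pending command. */
    public void onReply(String reply) {
        CompletableFuture<String> head = stack.poll();
        if (head == null) {
            throw new IllegalStateException("reply without a pending command");
        }
        head.complete(reply);
    }

    public static void main(String[] args) {
        FifoCorrelationSketch connection = new FifoCorrelationSketch();
        CompletableFuture<String> a = connection.write("GET a");
        CompletableFuture<String> b = connection.write("GET b");

        // Replies arrive in the same order the commands were sent.
        connection.onReply("1");
        connection.onReply("2");

        System.out.println(a.join() + "," + b.join()); // prints "1,2"
    }
}
```

No request identifier is needed on the wire: position in the queue is the only correlation key, which is why ordering on both sides matters so much.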
Command.encode

```java
public void encode(ByteBuf buf) {

    // Array header: the command name plus the number of arguments
    buf.writeByte('*');
    CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));

    buf.writeBytes(CommandArgs.CRLF);

    // The command name as a bulk string
    CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());

    if (args != null) {
        args.encode(buf);
    }
}
```
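To see what encode produces on the wire, the sketch below writes the same layout with plain JDK classes: an array header with the element count, followed by each element as a length-prefixed bulk string. `RespEncodeSketch` is a hypothetical helper for this illustration; it assumes String arguments encoded as UTF-8, whereas Lettuce delegates that step to the configured codec.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class RespEncodeSketch {

    private static final byte[] CRLF = {'\r', '\n'};

    /** Encodes a command name and its arguments as a RESP array of bulk strings. */
    public static byte[] encode(String type, String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Array header: one element for the command name plus one per argument.
        writeAscii(out, "*" + (1 + args.length));
        out.write(CRLF, 0, CRLF.length);
        writeBulk(out, type);
        for (String arg : args) {
            writeBulk(out, arg);
        }
        return out.toByteArray();
    }

    /** Writes "$<byte length>\r\n<bytes>\r\n". */
    private static void writeBulk(ByteArrayOutputStream out, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeAscii(out, "$" + bytes.length);
        out.write(CRLF, 0, CRLF.length);
        out.write(bytes, 0, bytes.length);
        out.write(CRLF, 0, CRLF.length);
    }

    private static void writeAscii(ByteArrayOutputStream out, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        out.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        String wire = new String(encode("GET", "key"), StandardCharsets.UTF_8);
        // Show the line breaks as visible escape sequences.
        System.out.println(wire.replace("\r\n", "\\r\\n"));
        // prints: *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
    }
}
```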