前言
實現客戶端與服務端間通訊,在傳輸層協議 TCP 或 UDP 的基礎上,定義客戶端與服務端都能識別資訊的一套規則:一方進行編碼,另一方進行解碼,這便是【協議】。
在 Redis 中,定義了簡單的 RESP(REdis Serialization Protocol)文字協議,接下來,我們嘗試實現一款能與服務端通訊的 Redis 客戶端,其核心原理如市面上常見的 Redisson、Jedis 等類似。
由淺入深,理解市面上流行的開源元件會更加的得心應手。
實現
基礎
1. 網路通訊
我們知道,在傳統計算機網路模型中,傳輸層
(TCP / UDP)的上一層便是應用層
。應用層協議一般專注於資料的編解碼等約定,比如經典的 HTTP 協議。
RESP 協議本質和 HTTP 是一個級別,都屬於應用層協議
。
在 redis 中,傳輸層協議使用的是 TCP
,服務端從 TCP socket 緩衝區中讀取資料,然後經過 RESP 協議解碼得到我們的指令。
而寫入資料則是相反,伺服器先將響應資料使用 RESP 編碼,然後將編碼後的資料寫入 TCP Socket 緩衝區傳送給客戶端。
2. 協議格式
在 RESP 協議中,第一個位元組決定了具體資料型別:
簡單字串
:Simple Strings,首位元組響應+
錯誤
:Errors,首位元組響應-
整型
:Integers,首位元組響應:
批次字串
:Bulk Strings,首位元組響應$
陣列
:Arrays,首位元組響應*
我們來看看一具體的例子,我們一條正常指令 PSETEX test_redisson_batch_key8 120000 test_redisson_batch_key=>value:8,經 RESP 協議編碼後長這樣:
*4 $6 PSETEX $24 test_redisson_batch_key8 $6 120000 $32 test_redisson_batch_key=>value:8
值得注意的是,在 RESP 協議中的每一部分都是以 \R\N
結尾。
基於 Socket 實現
實現一個簡單的 Redis 客戶端可以使用 Java 的 Socket 程式設計來直接與 Redis 伺服器進行通訊。
Redis 使用 RESP(REdis Serialization Protocol)協議進行通訊,因此我們需要按照該協議格式化請求並解析響應。
1. 建立 RedisClient 類
首先,我們建立一個 RedisClient 類,用於管理與 Redis 伺服器的連線和通訊。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; public class RedisClient { private Socket socket; private BufferedReader reader; private OutputStream writer; // 連線到 Redis 伺服器 // 透過 Java 的 Socket 進行連線 public RedisClient(String host, int port) throws IOException { this.socket = new Socket(host, port); this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); this.writer = socket.getOutputStream(); } public void close() throws IOException { reader.close(); writer.close(); socket.close(); } // 傳送命令到服務端,並讀取響應 private String sendCommand(String command) throws IOException { writer.write(command.getBytes()); writer.flush(); return parseResponse(); } // 解析響應資料 private String parseResponse() throws IOException { String response = reader.readLine(); if (response == null) { throw new IOException("Empty response from server"); } char prefix = response.charAt(0); switch (prefix) { case '+': // Simple String return response.substring(1); case '-': // Error throw new IOException("Error response from server: " + response.substring(1)); case ':': // Integer return response.substring(1); case '$': // Bulk String int length = Integer.parseInt(response.substring(1)); if (length == -1) { return null; // Null Bulk String } char[] bulkString = new char[length]; reader.read(bulkString, 0, length); reader.readLine(); // Read the trailing \r\n return new String(bulkString); case '*': // Array int count = Integer.parseInt(response.substring(1)); StringBuilder arrayResponse = new StringBuilder(); for (int i = 0; i < count; i++) { arrayResponse.append(parseResponse()).append("\n"); } return arrayResponse.toString(); default: throw new IOException("Unknown response prefix: " + prefix); } } // 設定鍵值對 // 這裏需要按照 RESP 協議進行編碼資料,然後透過 sendCommand 傳送 public String set(String key, String value) throws IOException { String command = String.format("*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", key.length(), key, value.length(), value); return sendCommand(command); } // 獲取鍵值對 public String get(String key) throws IOException { String command = String.format("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", key.length(), key); return sendCommand(command); } public String del(String key) throws IOException { String command = String.format("*2\r\n$3\r\nDEL\r\n$%d\r\n%s\r\n", key.length(), key); return sendCommand(command); } public String setex(String key, int seconds, String value) throws IOException { String command = String.format("*4\r\n$5\r\nSETEX\r\n$%d\r\n%s\r\n$%d\r\n%d\r\n$%d\r\n%s\r\n", key.length(), key, String.valueOf(seconds).length(), seconds, value.length(), value); return sendCommand(command); } }
基本步驟:
透過 Socket 與服務端建立連線
指令編碼(使用 RESP 協議)
指令傳送以及解析響應資料
2. 使用 RedisClient 類
透過 RedisClient 進行通訊:
public class RedisClientTest { public static void main(String[] args) { try { RedisClient client = new RedisClient("localhost", 6379); // 設定一個鍵值對 System.out.println(client.set("name", "神醫")); // 獲取鍵對應的值 System.out.println(client.get("name")); // 設定一個帶有過期時間的鍵值對(10秒後過期) System.out.println(client.setex("temp_key", 10, "臨時值")); // 獲取帶有過期時間的鍵值對 System.out.println(client.get("temp_key")); // 刪除一個鍵 System.out.println(client.del("name")); // 嘗試獲取已刪除的鍵 System.out.println(client.get("name")); client.close(); } catch (IOException e) { e.printStackTrace(); } } }
基於 Netty 實現
如果想要非同步通訊能力、高吞吐等能力,使用 Java Socket 實現會更加繁瑣,而成熟的 Netty 元件提供了非同步處理和更高階的網路通訊功能,已經應用於各類元件底層通訊能力,使用 Netty 實現 Redis 客戶端將更容易獲得這些能力。
注:常見的開源 Redis 客戶端 Redisson 底層便使用 Netty 實現。
接下來我們也使用 Netty 實現 Redis 客戶端:
RESP 編碼器
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class RedisCommandEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) { String[] parts = msg.split(" "); out.writeByte('*'); out.writeBytes(Integer.toString(parts.length).getBytes()); out.writeBytes("\r\n".getBytes()); for (String part : parts) { out.writeByte('$'); out.writeBytes(Integer.toString(part.length()).getBytes()); out.writeBytes("\r\n".getBytes()); out.writeBytes(part.getBytes()); out.writeBytes("\r\n".getBytes()); } } }
RESP 解碼器
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class RedisResponseDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (!in.isReadable()) { return; } byte firstByte = in.readByte(); switch (firstByte) { case '+': out.add(readSimpleString(in)); break; case '-': out.add(readError(in)); break; case ':': out.add(readInteger(in)); break; case '$': out.add(readBulkString(in)); break; case '*': out.add(readArray(in)); break; default: throw new IllegalArgumentException("Unknown RESP type: " + (char) firstByte); } } private String readSimpleString(ByteBuf in) { return readLine(in); } private String readError(ByteBuf in) { return readLine(in); } private long readInteger(ByteBuf in) { return Long.parseLong(readLine(in)); } private String readBulkString(ByteBuf in) { int length = Integer.parseInt(readLine(in)); if (length == -1) { return null; } byte[] bytes = new byte[length]; in.readBytes(bytes); in.readByte(); // \r in.readByte(); // \n return new String(bytes); } private Object[] readArray(ByteBuf in) { int length = Integer.parseInt(readLine(in)); if (length == -1) { return null; } Object[] array = new Object[length]; for (int i = 0; i < length; i++) { array[i] = decode(in); } return array; } private String readLine(ByteBuf in) { StringBuilder sb = new StringBuilder(); while (in.isReadable()) { char c = (char) in.readByte(); if (c == '\r') { in.readByte(); // \n break; } sb.append(c); } return sb.toString(); } }
Redis 客戶端實現
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class NettyRedisClient { private final String host; private final int port; private Channel channel; private BlockingQueue<Object> responseQueue = new ArrayBlockingQueue<>(1); public NettyRedisClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new RedisCommandEncoder(), new RedisResponseDecoder(), new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { responseQueue.offer(msg); } }); } }); channel = bootstrap.connect(host, port).sync().channel(); } finally { // group.shutdownGracefully(); } } public Object sendCommand(String command) throws InterruptedException { channel.writeAndFlush(command); return responseQueue.take(); } public void stop() { channel.close(); } public static void main(String[] args) throws InterruptedException { NettyRedisClient client = new NettyRedisClient("localhost", 6379); client.start(); System.out.println(client.sendCommand("PING")); System.out.println(client.sendCommand("SET mykey myvalue")); System.out.println(client.sendCommand("GET mykey")); client.stop(); } }
使用 Netty 可以顯著提高網路通訊的效能和效率,同時簡化開發過程,提供更好的資源管理和錯誤處理機制。
對於需要處理大量併發連線和高效能要求的應用程式,Netty 是一個非常好的選擇。
以上是最小可用版的 Redis 客戶端,你可以基於此基礎上實現更豐富的功能~