切換語言為:簡體

透過 Socket 以及 Netty 兩種方式實現 Redis 客戶端

  • 爱糖宝
  • 2024-10-11
  • 2044
  • 0
  • 0

前言

實現客戶端與服務端間通訊,在傳輸層協議 TCP 或 UDP 的基礎上,定義客戶端與服務端都能識別資訊的一套規則:一方進行編碼,另一方進行解碼,這便是【協議】。

在 Redis 中,定義了簡單的 RESP(REdis Serialization Protocol)文字協議,接下來,我們嘗試實現一款能與服務端通訊的 Redis 客戶端,其核心原理如市面上常見的 Redisson、Jedis 等類似。

由淺入深,理解市面上流行的開源元件會更加的得心應手。

實現

基礎

1. 網路通訊

我們知道,在傳統計算機網路模型中,傳輸層(TCP / UDP)的上一層便是應用層。應用層協議一般專注於資料的編解碼等約定,比如經典的 HTTP 協議。

RESP 協議本質和 HTTP 是一個級別,都屬於應用層協議

在 redis 中,傳輸層協議使用的是 TCP,服務端從 TCP socket 緩衝區中讀取資料,然後經過 RESP 協議解碼得到我們的指令。

而寫入資料則是相反,伺服器先將響應資料使用 RESP 編碼,然後將編碼後的資料寫入 TCP Socket 緩衝區傳送給客戶端。

2. 協議格式

在 RESP 協議中,第一個位元組決定了具體資料型別:

  • 簡單字串:Simple Strings,首位元組響應 +

  • 錯誤:Errors,首位元組響應 -

  • 整型:Integers,首位元組響應 :

  • 批次字串:Bulk Strings,首位元組響應 $

  • 陣列:Arrays,首位元組響應 *

我們來看看一具體的例子,我們一條正常指令 PSETEX test_redisson_batch_key8 120000 test_redisson_batch_key=>value:8,經 RESP 協議編碼後長這樣:

*4
$6
PSETEX
$24
test_redisson_batch_key8
$6
120000
$32
test_redisson_batch_key=>value:8

值得注意的是,在 RESP 協議中的每一部分都是以 \R\N 結尾。

基於 Socket 實現

實現一個簡單的 Redis 客戶端可以使用 Java 的 Socket 程式設計來直接與 Redis 伺服器進行通訊。

Redis 使用 RESP(REdis Serialization Protocol)協議進行通訊,因此我們需要按照該協議格式化請求並解析響應。

1. 建立 RedisClient 類

首先,我們建立一個 RedisClient 類,用於管理與 Redis 伺服器的連線和通訊。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;

public class RedisClient {
    private Socket socket;
    private BufferedReader reader;
    private OutputStream writer;

    // 連線到 Redis 伺服器
    // 透過 Java 的 Socket 進行連線
    public RedisClient(String host, int port) throws IOException {
        this.socket = new Socket(host, port);
        this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        this.writer = socket.getOutputStream();
    }

    public void close() throws IOException {
        reader.close();
        writer.close();
        socket.close();
    }

    // 傳送命令到服務端,並讀取響應
    private String sendCommand(String command) throws IOException {
        writer.write(command.getBytes());
        writer.flush();
        return parseResponse();
    }
    
    // 解析響應資料
    private String parseResponse() throws IOException {
    String response = reader.readLine();
    if (response == null) {
        throw new IOException("Empty response from server");
    }

    char prefix = response.charAt(0);
    switch (prefix) {
        case '+': // Simple String
            return response.substring(1);
        case '-': // Error
            throw new IOException("Error response from server: " + response.substring(1));
        case ':': // Integer
            return response.substring(1);
        case '$': // Bulk String
            int length = Integer.parseInt(response.substring(1));
            if (length == -1) {
                return null; // Null Bulk String
            }
            char[] bulkString = new char[length];
            reader.read(bulkString, 0, length);
            reader.readLine(); // Read the trailing \r\n
            return new String(bulkString);
        case '*': // Array
            int count = Integer.parseInt(response.substring(1));
            StringBuilder arrayResponse = new StringBuilder();
            for (int i = 0; i < count; i++) {
                arrayResponse.append(parseResponse()).append("\n");
            }
            return arrayResponse.toString();
        default:
            throw new IOException("Unknown response prefix: " + prefix);
     }
   }

    // 設定鍵值對
    // 這裏需要按照 RESP 協議進行編碼資料,然後透過 sendCommand 傳送
    public String set(String key, String value) throws IOException {
        String command = String.format("*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", key.length(), key, value.length(), value);
        return sendCommand(command);
    }

    // 獲取鍵值對
    public String get(String key) throws IOException {
        String command = String.format("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", key.length(), key);
        return sendCommand(command);
    }

    public String del(String key) throws IOException {
        String command = String.format("*2\r\n$3\r\nDEL\r\n$%d\r\n%s\r\n", key.length(), key);
        return sendCommand(command);
    }

    public String setex(String key, int seconds, String value) throws IOException {
        String command = String.format("*4\r\n$5\r\nSETEX\r\n$%d\r\n%s\r\n$%d\r\n%d\r\n$%d\r\n%s\r\n", key.length(), key, String.valueOf(seconds).length(), seconds, value.length(), value);
        return sendCommand(command);
    }
}

基本步驟:

  1. 透過 Socket 與服務端建立連線

  2. 指令編碼(使用 RESP 協議)

  3. 指令傳送以及解析響應資料

2. 使用 RedisClient 類

透過 RedisClient 進行通訊:

public class RedisClientTest {
    public static void main(String[] args) {
        try {
            RedisClient client = new RedisClient("localhost", 6379);

            // 設定一個鍵值對
            System.out.println(client.set("name", "神醫"));

            // 獲取鍵對應的值
            System.out.println(client.get("name"));

            // 設定一個帶有過期時間的鍵值對(10秒後過期)
            System.out.println(client.setex("temp_key", 10, "臨時值"));

            // 獲取帶有過期時間的鍵值對
            System.out.println(client.get("temp_key"));

            // 刪除一個鍵
            System.out.println(client.del("name"));

            // 嘗試獲取已刪除的鍵
            System.out.println(client.get("name"));

            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

基於 Netty 實現

如果想要非同步通訊能力、高吞吐等能力,使用 Java Socket 實現會更加繁瑣,而成熟的  Netty 元件提供了非同步處理和更高階的網路通訊功能,已經應用於各類元件底層通訊能力,使用 Netty 實現 Redis 客戶端將更容易獲得這些能力。

注:常見的開源 Redis 客戶端 Redisson 底層便使用 Netty 實現。

接下來我們也使用 Netty 實現 Redis 客戶端:

RESP 編碼器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class RedisCommandEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
        String[] parts = msg.split(" ");
        out.writeByte('*');
        out.writeBytes(Integer.toString(parts.length).getBytes());
        out.writeBytes("\r\n".getBytes());

        for (String part : parts) {
            out.writeByte('$');
            out.writeBytes(Integer.toString(part.length()).getBytes());
            out.writeBytes("\r\n".getBytes());
            out.writeBytes(part.getBytes());
            out.writeBytes("\r\n".getBytes());
        }
    }
}

RESP 解碼器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class RedisResponseDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (!in.isReadable()) {
            return;
        }

        byte firstByte = in.readByte();
        switch (firstByte) {
            case '+':
                out.add(readSimpleString(in));
                break;
            case '-':
                out.add(readError(in));
                break;
            case ':':
                out.add(readInteger(in));
                break;
            case '$':
                out.add(readBulkString(in));
                break;
            case '*':
                out.add(readArray(in));
                break;
            default:
                throw new IllegalArgumentException("Unknown RESP type: " + (char) firstByte);
        }
    }

    private String readSimpleString(ByteBuf in) {
        return readLine(in);
    }

    private String readError(ByteBuf in) {
        return readLine(in);
    }

    private long readInteger(ByteBuf in) {
        return Long.parseLong(readLine(in));
    }

    private String readBulkString(ByteBuf in) {
        int length = Integer.parseInt(readLine(in));
        if (length == -1) {
            return null;
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        in.readByte(); // \r
        in.readByte(); // \n
        return new String(bytes);
    }

    private Object[] readArray(ByteBuf in) {
        int length = Integer.parseInt(readLine(in));
        if (length == -1) {
            return null;
        }
        Object[] array = new Object[length];
        for (int i = 0; i < length; i++) {
            array[i] = decode(in);
        }
        return array;
    }

    private String readLine(ByteBuf in) {
        StringBuilder sb = new StringBuilder();
        while (in.isReadable()) {
            char c = (char) in.readByte();
            if (c == '\r') {
                in.readByte(); // \n
                break;
            }
            sb.append(c);
        }
        return sb.toString();
    }
}

Redis 客戶端實現

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NettyRedisClient {
    private final String host;
    private final int port;
    private Channel channel;
    private BlockingQueue<Object> responseQueue = new ArrayBlockingQueue<>(1);

    public NettyRedisClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new RedisCommandEncoder(), new RedisResponseDecoder(), new SimpleChannelInboundHandler<Object>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
                                    responseQueue.offer(msg);
                                }
                            });
                        }
                    });

            channel = bootstrap.connect(host, port).sync().channel();
        } finally {
            // group.shutdownGracefully();
        }
    }

    public Object sendCommand(String command) throws InterruptedException {
        channel.writeAndFlush(command);
        return responseQueue.take();
    }

    public void stop() {
        channel.close();
    }

    public static void main(String[] args) throws InterruptedException {
        NettyRedisClient client = new NettyRedisClient("localhost", 6379);
        client.start();
        System.out.println(client.sendCommand("PING"));
        System.out.println(client.sendCommand("SET mykey myvalue"));
        System.out.println(client.sendCommand("GET mykey"));
        client.stop();
    }
}

使用 Netty 可以顯著提高網路通訊的效能和效率,同時簡化開發過程,提供更好的資源管理和錯誤處理機制。

對於需要處理大量併發連線和高效能要求的應用程式,Netty 是一個非常好的選擇。

以上是最小可用版的 Redis 客戶端,你可以基於此基礎上實現更豐富的功能~

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.