Redis是一个CS架构的软件,通信一般分两步(不包括pipeline和PubSub):
- 客户端(client)向服务端(server)发送一条命令
- 服务端解析并执行命令,返回响应结果给客户端
客户端发送命令的格式、服务端响应结果的格式遵顼 Redis Serialization Protocol,Redis 序列化协议,简称 RESP。
- Redis 1.2版本引入了RESP协议
- Redis 2.0版本中成为与Redis服务端通信的标准,称为RESP2
- Redis 6.0版本中,从RESP2升级到了RESP3协议,增加了更多数据类型并且支持6.0的新特性–客户端缓存
RESP2
在RESP中,通过首字节的字符来区分不同数据类型
单行字符串:首字节是 ‘+’ ,后面跟上单行字符串,以CRLF( “\r\n” )结尾。例如返回”OK”: “+OK\r\n”
错误(Errors):首字节是 ‘-’ ,与单行字符串格式一样,只是字符串是异常信息,例如:”-Error message\r\n”
数值:首字节是 ‘:’ ,后面跟上数字格式的字符串,以CRLF结尾。例如:”:10\r\n”
多行字符串:首字节是 ‘$’ ,表示二进制安全的字符串,最大支持512MB:
- 如果大小为0,则代表空字符串:”$0\r\n\r\n”
- 如果大小为-1,则代表不存在:”$-1\r\n”
- 若大小为5,$5\r\nhello\r\n
数组:首字节是 ‘*’,后面跟上数组元素个数,再跟上元素,元素数据类型不限
*2\r\n$3\r\nage\r\n:10\r\n
Redis支持TCP通信,可以使用Socket来模拟客户端,与Redis服务端建立连接。
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* RESP
*/
public class Main {
static Socket socket;
static PrintWriter writer;
static BufferedReader reader;
public static void main(String[] args) {
String host = "192.168.25.128";
int port = 6379;
try {
socket = new Socket(host, port);
writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
sendRequest("set", "name", "大哥");
Object o = handleResponse();
System.out.println("o = " + o);
sendRequest("get", "name");
o = handleResponse();
System.out.println("o = " + o);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (writer != null) {
writer.close();
}
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private static Object handleResponse() throws IOException {
int prefix = reader.read();
switch (prefix) {
case '+'://单行字符串
return reader.readLine();
case '-'://异常,一行数据
throw new RuntimeException(reader.readLine());
case ':'://数字
return Long.parseLong(reader.readLine());
case '$'://多行字符串,先读长度,再读按照长度读数据(字节流)
int length = Integer.parseInt(reader.readLine());
if (length == -1) return null;
if (length == 0) return "";
case '*':
return readBulkString();
default:
throw new RuntimeException("错误的数据");
}
}
private static Object readBulkString() throws IOException {
int length = Integer.parseInt(reader.readLine());
if ((length <= 0)) return null;
List<Object> list = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
list.add(handleResponse());
}
return list;
}
private static void sendRequest(String... args) {
writer.println("*" + args.length);
for (String arg : args) {
writer.println("$" + arg.getBytes(StandardCharsets.UTF_8).length);
writer.println(arg);
}
writer.flush();
}
}
gossip
集群内节点通信机制