Redis
主从复制有以下三个阶段:
建立连接。
数据同步。
命令传播。
这一阶段的作用是主从节点建立连接,为数据同步做好准备,具体的步骤如下:
slave
节点保存master
节点信息从节点服务器内部维护了两个字段,即masterhost
和masterport
字段,用于存储主节点的IP
地址和端口号。开启主从的replicaof
命令实现原理在replication.c
中的replicaofCommand
方法中:
void replicaofCommand(client *c) {
/* SLAVEOF is not allowed in cluster mode as replication is automatically
* configured using the current address of the master node. */
if (server.cluster_enabled) {
addReplyError(c,"REPLICAOF not allowed in cluster mode.");
return;
}
if (server.failover_state != NO_FAILOVER) {
addReplyError(c,"REPLICAOF not allowed while failing over.");
return;
}
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
}
} else {
long port;
if (c->flags & CLIENT_SLAVE)
{
/* If a client is already a replica they cannot run this command,
* because it involves flushing all replicas (including this
* client) */
addReplyError(c, "Command is not valid when client is a replica.");
return;
}
if (getRangeLongFromObjectOrReply(c, c->argv[2], 0, 65535, &port,
"Invalid master port") != C_OK)
return;
/* Check if we are already attached to the specified master */
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
"with the master we are already connected "
"with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified "
"master\r\n"));
return;
}
/* There was no previous master or the user specified a different one,
* we can continue. */
replicationSetMaster(c->argv[1]->ptr, port);
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
server.masterhost, server.masterport, client);
sdsfree(client);
}
addReply(c,shared.ok);
}
建立socket
连接从节点每秒调用复制定时函数replicationCron
一次,当定时任务发现有主节点可以连接,就会调用connectWithMaster
方法,根据主节点的IP
地址和端口号来建立socket
连接。如果连接成功,则从节点为该socket
建立一个专门的文件事件处理程序,负责后续的复制工作,如接收RDB
文件、接收命令传播等。当主节点接收到从节点的socket
连接请求后,为该socket
创建相应的客户端状态,并将从节点看作连接到主节点的一个客户端。replicationCron
源码如下:
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
static long long replication_cron_loops = 0;
/* Check failover status first, to see if we need to start
* handling the failover. */
updateFailoverStatus();
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake(1);
}
/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
cancelReplicationHandshake(1);
}
/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
connectWithMaster();
}
/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
listIter li;
listNode *ln;
robj *ping_argv[1];
/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
/* Note that we don't send the PING if the clients are paused during
* a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
((server.cluster_enabled &&
clusterManualFailoverTimeLimit()) ||
server.failover_end_time) &&
isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
}
}
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
if (is_presync) {
connWrite(slave->conn, "\n", 1);
}
}
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_ONLINE) {
if (slave->flags & CLIENT_PRE_PSYNC)
continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
continue;
}
}
/* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
* by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
* from terminating. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
if (slave->repl_last_partial_write != 0 &&
(server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
continue;
}
}
}
}
/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
if (idle > server.repl_backlog_time_limit) {
/* When we free the backlog, we always use a new
* replication ID and clear the ID2. This is needed
* because when there is no backlog, the master_repl_offset
* is not updated, but we would still retain our replication
* ID, leading to the following problem:
*
* 1. We are a master instance.
* 2. Our slave is promoted to master. It's repl-id-2 will
* be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset.
* 4. Later we are turned into a slave, connect to the new
* master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency
* because we received writes. */
changeReplicationId();
clearReplicationId2();
freeReplicationBacklog();
serverLog(LL_NOTICE,
"Replication backlog freed after %d seconds "
"without connected replicas.",
(int) server.repl_backlog_time_limit);
}
}
replicationStartPendingFork();
/* Remove the RDB file used for replication if Redis is not running
* with any persistence. */
removeRDBUsedToSyncReplicas();
/* Sanity check replication buffer, the first block of replication buffer blocks
* must be referenced by someone, since it will be freed when not referenced,
* otherwise, server will OOM. also, its refcount must not be more than
* replicas number + 1(replication backlog). */
if (listLength(server.repl_buffer_blocks) > 0) {
replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
serverAssert(o->refcount > 0 &&
o->refcount <= (int)listLength(server.slaves)+1);
}
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
发送ping命令从节点成为主节点的客户端之后,发送ping
命令进行首次请求,目的是检查socket
连接是否可用,以及主节点当前是否能够处理请求。从节点发送ping
命令后,可能会出现下面三种情况:
返回pong
:说明socket
连接正常,且主节点当前可以处理请求、可以进行复制。
超时:超过一定时间后从节点仍未收到主节点的回复,说明socket
连接不可用,则从节点断开socket
连接,并尝试重连。
如果主节点返回其他结果,如正在处理超时运行的Lua
脚本,说明主节点当前无法处理命令,则从节点断开socket
连接并且尝试重连。
身份验证如果从节点中设置了身份验证masterauth
选项,那么从节点需要向主节点进行身份验证;如果没有设置该选项,则不需要验证。从节点进行身份验证是通过向主节点发送auth
命令,auth
命令的参数即为配置文件中的主节点密码的值。如果主节点设置的密码与从节点提供的密码一致,则身份验证通过,复制过程继续;如果不一致,则从节点断开socket
连接并且尝试重连。
发送从节点端口信息身份验证之后,从节点会向主节点发送其监听的端口号,主节点将该信息保存到该从节点对应的客户端的slave_listening_port
字段中。
主从节点之间的连接建立好以后便可以开始进行数据同步,该阶段可以理解为从节点数据的初始化,具体执行的方式是:从节点向主节点发送psync
命令,并且开始同步。数据同步阶段是主从复制的核心阶段,根据主从节点的当前状态可以分为全量复制和部分复制。
当启动一个从节点的时候,它会发送一个PSYNC
命令给主节点,如果这时从节点重新连接主节点,那么主节点仅仅会复制给从节点部分缺少的数据;如果是从节点第一次连接主节点,就会触发一次全量重新同步ull resynchronization
。
开始全量重新同步时,主节点收到全量复制的命令后,执行bgsave
命令,在后台生成RDB
文件,并使用一个缓冲区(称为复制缓冲区)记录从现在开始执行的所有写命令来保证数据的完整性。
主节点的bgsave
命令执行完成后,将RDB
文件发送给从节点。从节点首先清除自己的旧数据,然后载入接收的RDB
文件,将数据库状态更新至主节点执行bgsave
命令时的数据库状态。
主节点将复制缓冲区中的所有写命令并发送给从节点,从节点执行这些写命令,将数据库状态更新至主节点的最新状态。所以这里会存在一个问题,如果缓冲区有修改的数据,那么从节点收到的数据就不是最新的数据。
全量复制源码请参阅replication.c
中的readSyncBulkPayload
方法。
从全量复制的过程可以看出,全量复制有如下几个非常重要的操作:
主节点通过bgsave
命令派生或创建fork
子进程进行RDB
持久化,这个过程非常消耗CPU
、内存以及硬盘IO的资源。
主节点通过网络将全量的RDB
文件发送给从节点,会大量消耗主从节点的带宽流量。
为了保证数据的一致性,从节点清空旧数据、载入新RDB
文件的过程是阻塞的,无法响应客户端的命令;如果从节点执行bgrewriteaof
命令,就会带来额外的消耗。
Redis 2.8
开始就支持主从复制的断点续传,如果主从复制过程中网络连接断掉了,那么可以接着上次复制的地方继续复制下去,而不是从头开始复制,这样在很大程度上保证了主从复制的性能。部分复制的实现主要依赖于三个重要的特征:
复制便宜量主节点和从节点分别维护一个复制偏移量offset
,表示主节点向从节点传送的字节数;主节点每次向从节点传播N
字节数据时,主节点的偏移量增加N
;从节点每次收到主节点传来的N
字节数据时,从节点的偏移量增加N
。偏移量用于判断主从节点的数据库状态是否一致:如果主从节点的偏移量相同,则数据一致;如果偏移量不同,则数据不一致,此时可以根据主从节点的偏移量找出从节点缺少的那部分数据。比如,主节点的偏移量是1000、从节点的偏移量是500,那么部分复制就需要将偏移量为501到1000的数据传送给从节点,而偏移量为501到1000的数据存储在复制积压缓冲区backlog
。
复制积压缓冲区backlog
复制积压缓冲区是专门由主节点维护、固定长度、先进先出的队列,默认大小是1MB
。当主节点开始有从节点时才会创建,否则不会创建。它的作用是备份主节点最近发送给从节点的数据。无论主节点有一个还是多个从节点,都只需要一个复制积压缓冲区,不会存在多个复制积压缓冲区。在命令传播阶段,主节点除了将写命令发送给从节点,还会发送一份给复制积压缓冲区作为写命令的备份,除了存储写命令,复制积压缓冲区中还存储了其中每个字节对应的复制偏移量。因为复制积压缓冲区是定长的且是先进先出的,所以它保存的是主节点最近执行的写命令,时间较早的写命令会被挤出缓冲区。该缓冲区的长度固定且有限,因此可以备份的写命令也有限,当主从节点偏移量的差距过大超过缓冲区长度时将无法执行部分复制,只能执行全量复制。所以,为了提高网络中断时部分复制执行的概率,可以根据需要修改配置repl-backlog-size
来增大复制积压缓冲区的大小。比如,网络中断的平均时间是60
秒,而主节点平均每秒产生的写命令所占的字节数为200KB
,则复制积压缓冲区的平均大小为12MB
,那么可以设置为24MB
来保证绝大多数网络断线情况都可以使用部分复制。从节点将偏移量发送给主节点后,主节点根据偏移量和缓冲区大小决定能否执行部分复制。如果偏移量之后的数据仍然都在复制积压缓冲区里,则执行部分复制。如果偏移量之后的数据已不在复制积压缓冲区中(数据已被挤出),则执行全量复制。所以,复制积压缓冲区的大小是非常重要的一个参数。
服务运行ID
可以通过info server
命令查询到服务运行ID
,如下所示:
127.0.0.1:6379> info server
# Server
redis_version:7.2.0
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:7f39debc4ae51812
redis_mode:standalone
os:Linux 6.11.10-200.fc40.x86_64 x86_64
arch_bits:64
monotonic_clock:POSIX clock_gettime
multiplexing_api:epoll
atomicvar_api:c11-builtin
gcc_version:12.2.0
process_id:1
process_supervised:no
run_id:4294f51a00ab3f5d90164a3f47b78ff49a5fdbf9
tcp_port:6379
server_time_usec:1733479267733986
uptime_in_seconds:32101
uptime_in_days:0
hz:10
configured_hz:10
lru_clock:5426019
executable:/data/redis-server
config_file:/etc/redis/redis.conf
io_threads_active:0
listener0:name=tcp,bind=*,bind=-::*,port=6379
如果从节点保存的runid
与主节点当前的runid
相同,就说明主从节点之前同步过,主节点会继续尝试使用部分复制。如果从节点保存的runid
与主节点现在的runid
不同,就说明从节点在断线前同步的Redis
节点并不是当前的主节点,只能进行全量复制。
部分复制源码请参阅replication.c
中的replicationResurrectCachedMaster
方法。
数据同步阶段完成后,主从节点进入命令传播阶段。在这一阶段,主节点将自己执行的写命令发送给从节点,从节点接收到命令并且执行,从而保证主从节点数据的一致性。
在命令传播阶段,除了发送写命令,主从节点还维持着心跳机制:ping
和replconf ack
。心跳机制是指主从复制的超时判断,即每隔指定的时间主节点会向从节点发送ping
命令,ping
命令的目的主要是让从节点进行超时判断。心跳机制是在replicationCron
函数中实现的,源码如下:
/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
/* Note that we don't send the PING if the clients are paused during
* a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
((server.cluster_enabled &&
clusterManualFailoverTimeLimit()) ||
server.failover_end_time) &&
isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
}
}
ping
命令的作用如下:
虽然主从节点成功建立起了套接字连接,但是双方并未使用该套接字进行过任何通信,通过发送ping
命令可以检查套接字的读写状态。
复制工作接下来的几个步骤都需要在主节点可以正常处理命令请求的状态下进行,通过ping
可以知道主节点是否能正常处理命令请求。
如果主节点返回给从节点一个命令回复,但是从节点不能在规定的时限内读取出命令回复的内容,那么表示主从节点之间的网络连接状态不是很好,不能继续执行复制工作的后续步骤。当出现这种情况时,从节点断开并重新创建连接主节点的套接字。
在命令传播阶段,从节点会向主节点发送replconf ack
命令,频率也是每秒1次。命令格式为:
replconf ack{offset}
其中offset
是指从节点保存的复制偏移量。replconf ack
命令的意义和作用如下:
实时监测主从节点的网络状态:该命令会被主节点用于复制超时的判断。此外,在主节点中执行info Replication
命令,可以看到其从节点的状态中的lag
值(包含从节点的IP
地址和端口号以及数据同步偏移量),代表的是主节点上次收到该replconf ack
命令的时间间隔,在正常情况下,该值应该是0或1。
检测命令丢失:从节点发送了自身的偏移量,主节点会与自己的偏移量对比,如果从节点因为网络丢包等原因造成数据缺失,主节点会推送缺失的数据。
辅助保证从节点的数量和延迟:Redis
主节点中使用min-slaves-to-write
和min-slaves-max-lag
参数来保证主节点在不安全的情况下不会执行写命令。所谓不安全,是指从节点数量太少或延迟过高。
命令传播是异步的过程,即主节点发送写命令后并不会等待从节点的回复,因此实际上主从节点之间很难保持实时的一致性。数据不一致的程度与主从节点之间的网络状况、主节点写命令的执行频率以及主节点中的repl-disable-tcp-nodelay
配置有关,该配置有两个值:
关闭no
:无论数据大小都会及时同步到从节点,耗费带宽,适用于主从网络稳定的应用场景。
开启yes
:主节点每隔指定时间把数据合并为TCP
包,节省带宽,默认为40
毫秒同步一次。
一般来说,只有当应用对Redis
数据不一致的容忍度较高,且主从节点之间网络状况不好时才会设置为yes
,多数情况下使用默认设置值no
。