【Redis】AOF 基础

因为 Redis AOF 的实现有些绕, 就分成 2 篇进行分析, 本篇主要是介绍一下 AOF 的一些特性和依赖的其他函数的逻辑,为下一篇 (Redis AOF 源码) 源码分析做一些铺垫。

AOF 全称: Append Only File, 是 Redis 提供了一种数据保存模式, Redis 默认不开启。
AOF 采用日志的形式来记录每个写操作, 并追加到文件。开启后, 执行更改 Redis 数据的命令时, 就会把命令写入到 AOF 文件中。
Redis 重启时会根据日志文件的内容把写指令从前到后执行一次以完成数据的恢复工作。

1 AOF 相关的配置

appendonly no                   # AOF 开关, 默认为关闭
appendfilename "appendonly.aof" # 保存的文件名
appendfsync everysec            # AOF 持久化策略 (硬盘缓存写入到硬盘) 

AOF 简单使用的话就这 3 个配置, 前 2 个就是字面的意思, 理解其他比较简单。
我们说明一下第三个配置的使用。

开启 AOF 后, 每次修改的命令都会存到 Redis 的一个缓存区
缓存区的数据最终是需要写入到磁盘的, 而 Redis 是通过 write 函数, 将缓存中的数据写入到磁盘中。
但是 write 函数实际是先将数据先保存到系统层级的缓存, 后续由系统自身将数据保存到磁盘, 系统默认为 30 秒保存一次。这样的话, 可能有风险, 如果系统直接宕机了
可能会丢失 30 秒左右的数据, 所以系统提供了一个 fsync 函数, 可以把系统层级的缓存立即写入到磁盘中, 但是这是一个阻塞且缓慢的操作, 会影响到执行的线程。

所以上面的配置的第 3 项就是控制这个 Redis 缓存到磁盘的行为

  1. everysec: AOF 默认的持久化策略。每秒执行一次 fsync, 可能导致丢失 1s 数据, 这种策略兼顾了安全性和效率
  2. no: 表示不执行 fsync, 由操作系统保证数据同步到磁盘, 速度最快, 但是不太安全
  3. always: 表示每次写入到执行 fsync, 保证数据同步到磁盘, 效率很低

除了上面的 3 个基础配置, 还有几个关于 AOF 执行中的行为配置

# 默认为 100
# 当目前 AOF 文件大小超过上次重写的 AOF 文件的百分之多少进行重写 (重写的含义可以看下面的重写机制), 即当 AOF 文件增长到一定大小的时候, Redis 能够调用 bgrewriteaof 对日志文件进行重写
auto-aof-rewrite-percentag  100# 默认为 64m
# 设置允许重写的最小 AOF 文件大小, 避免达到约定百分比但占用的容量仍然很小的情况就重写
auto-aof-rewrite-min-size  64mb# 默认为 no
# 在 AOF 重写时, 是否不要执行 fsync, 将缓存写入到磁盘, 默认为 no。
# 如果对低延迟要求很高的应用, 这里可以设置为 yes, 否则设置为 no, 这样对持久化特性来说这是更安全的选择
# 设置为 yes 表示重写期间对新的写操作不 fsync, 暂时存在内存中, 等重新操作完成后再写入
# 默认为 no, 建议改为 yes, 因为 Linux 的默认 fsync 策略为 30 秒, 所以可能丢失 30 秒数据
no-appendfsync-on-rewrite  no# 默认为 yes
# 当 Redis 启动的时候, AOF 文件的数据会被重新载入内存
# 但是 AOF 文件可能在尾部是不完整的, 比如突然的断电宕机什么的, 可能导致 AOF 文件数据不完整
# 对于不完整的 AOF 文件如何处理
# 配置为 yes, 当截断的 AOF 文件被导入的时候, 会自动发布一个 log 给客户端, 然后继续加载文件中的数据
# 配置为 no, 用户必须手动 redis-check-aof 修复 AOF 文件才可以
aof-load-truncated yes

2 AOF 重写机制

上面的配置中有好几个提示到重写的概念, 那么什么是重写呢?

由于 AOF 持久化是 Redis 不断将写命令记录到 AOF 文件中, 随着 Redis 不断的运行, AOF 文件将会越来越大, 占用服务器磁盘越来越大, 同时 AOF 恢复要求时间越长。

为了解决这个问题, Redis 新增了重写机制, 当 AOF 文件的大小超过了所设定的阈值时, Redis 就会自动启动 AOF 文件的内容压缩, 只保留可以恢复数据的最小指令集。
AOF 文件不是对原文件进行整理, 而是直接读取服务器现有的键值对, 然后用一条命令去代替之前记录这个键值对的多条命令, 生成一个新的文件替换原来的 AOF 文件。

用户可以通过 bgrewriteaof 命令来手动触发 AOF 文件的重写, 这个重写的过程也是通过子进程实现的。
在子进程进行 AOF 重写时, 主线程需要保证

  1. 处理客户端的请求
  2. 将新增和更新命令追加到现有的 AOF 文件中
  3. 将新增和更新命令追加到 AOF 重写缓存中

3 AOF 文件的优势和劣势

优势

  1. AOF 持久化的方法提供了多种的同步频率, 即使使用默认的同步频率每秒同步一次, Redis 最多也就丢失 1 秒的数据而已
  2. AOF 日志文件以 append-only 模式写入, 所以没有任何磁盘寻址的开销, 写入性能非常高, 而且文件不容易受损, 即使文件尾部受损, 也能很容易恢复, 打开文件, 把后面损坏的数据删除即可

劣势

  1. 对于具有相同数据的的 Redis, AOF 文件通常会比 RDF 文件体积更大 (RDB 存的是数据快照)
  2. 虽然 AOF 提供了多种同步的频率, 默认情况下, 每秒同步一次的频率也具有较高的性能。但是在高并发的情况下, RDB 比 AOF 具好更好的性能保证

4 AOF 和 RDB 两种方案比较

如果可以忍受一小段时间内数据的丢失, 使用 RDB 是最好的, 定时生成 RDB 快照 (snapshot) 非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快, 否则使用 AOF。

但是一般情况下建议不要单独使用某一种持久化机制, 而是应该两种一起用, 在这种情况下, 当 Redis 重启的时候会优先载入 AOF 文件来恢复原始的数据, 因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。

在 Redis 4.0 带来了一个新的持久化选项 —— 混合持久化。将 RDB 文件的内容和增量的 AOF 日志文件存在一起。
这里的 AOF 日志不再是全量的日志, 而是自持久化开始到持久化结束的这段时间发生的增量 AOF 日志, 通常这部分 AOF 日志很小。

在 Redis 重启的时候, 可以先加载 RDB 的内容, 然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放, 重启效率因此大幅得到提升。

5 AOF 的过程

高度概括如下:

  1. 所有的修改命令会追加到 Redis 的一个 AOF 缓存区
  2. AOF 缓存区根据配置的策略向硬盘做同步操作
  3. 随着 AOF 文件越来越大, 达到配置的条件, 对 AOF 文件进行重写, 达到压缩的目的

到此, AOF 的理论知识就没了, 下面是介绍几个比较重要的函数的逻辑。

6 AOF 文件结构

如果现在向 Redis 中写入一个 key 为 redis-key, value 为 redis-value 的字符串键值对后, 这对键值对会以下面的格式保存在 AOF 文件中:

*3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n

*数字 的格式开始, 表示后面的命令的参数个数, 然后通过 $数字 表示后面参数的长度, 然后各个分隔之间通过 \r\n 进行分隔。

整体的格式就是 Redis 自定义的 RESP 协议, 具体的 RESP 介绍, 可以看一下这篇文章。

可以看到这种文本格式具有很高的可读性, 同时可以直接进行修改。

注: Redis 中有多个数据库, 写入的数据是保存在哪个数据库的?
在写入对应的数据库数据时, 内部会自动插入一条 select 数据库的编号 的命令到 AOF 文件, 表明对应的数据库, 解析时也是通过这条命令切换到对应的数据库。

源码中将 key 和 value 转换为上面的文件格式的实现是由 2 个函数实现的: catAppendOnlyGenericCommand 和 catAppendOnlyExpireAtCommand, 前者处理的是正常的命令, 而后者处理的是命令的过期时间。

6.1 catAppendOnlyGenericCommand - 没有过期时间的命令

/*** 将入参的参数转为 RESP 格式写入到入参的 dst* @param dst 当前未写入到文件的命令文本, 新的命令会追加到这个的后面* @param argc 命令参数的个数* @param argv 命令参数, 比如 set key value*/
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {char buf[32];int len, j;robj *o;// 上面的 set redis-key redis-value, 按照 RESP 协议转换的内容如下// *3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n// 这里面可以拆为 2 部分处理 // 1. *3\r\n  --> 命令的参数个数// 2. $3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n  --> 具体的命令// 1. 处理命令的参数个数部分// 命令开始的前缀为 *buf[0] = '*';// argc 表示的是写入命令的个数, 经过这一步 buf = *参数个数len = 1+ll2string(buf+1,sizeof(buf)-1,argc);// 追加 \r\n, 到了这一步 经过这一步 buf = *参数个数\r\nbuf[len++] = '\r';buf[len++] = '\n';// 先将处理的文本第一步拼接到 dst 的后面, 此时 dst = *参数个数\r\ndst = sdscatlen(dst,buf,len);// 2. 处理具体的命令// 拼接参数列表for (j = 0; j < argc; j++) {// 将对应的参数转为字符串类型o = getDecodedObject(argv[j]);buf[0] = '$';// 将命令的长度写入到 buf 中len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));// 继续在后面拼接 \r\n, 到这一步 buf = $命令的长度\r\nbuf[len++] = '\r';buf[len++] = '\n';// 同样将 $命令的长度\r\n 写入到 dstdst = sdscatlen(dst,buf,len);// 将当前的命令具体的内容 写入到 dstdst = sdscatlen(dst,o->ptr,sdslen(o->ptr));// 在 dist 后面追加一个 \r\ndst = sdscatlen(dst,"\r\n",2);// 经过一次循环, dist 里面多了一段 $命令的长度\r\n命令\r\n 的内容// 引用此数 - 1decrRefCount(o);}return dst;
}/*** 如果入参的对象是 raw 或者 embstr 编码, 引用次数 + 1* 如果为 int 编码, 根据这个整数创建出一个字符串, 同时返回这个字符串* 其他类型不会处理*/
robj *getDecodedObject(robj *o) {robj *dec;// 判断一个对象的编码是否为 OBJ_ENCODING_EMBSTR 或者 OBJ_ENCODING_RAWif (sdsEncodedObject(o)) {// 对象的引用次数还没达到最大值时, 进行引用次数 + 1incrRefCount(o);return o;}// 是字符串类型同时编码为 OBJ_ENCODING_INTif (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {char buf[32];// 整形转为 char 数组ll2string(buf,32,(long)o->ptr);// 转为字符串dec = createStringObject(buf,strlen(buf));return dec;} else {serverPanic("Unknown encoding type");}
}

2.7.2 catAppendOnlyExpireAtCommand - 带过期时间的命令

/*** 将入参的过期时间转为 RESP 格式的字符串并存入 buf* @param buf 当前未写入到文件的命令文本, 新的命令会追加到这个的后面, 同时将命令修改为 pexpireat 的格式* @param cmd 执行的命令* @param key redis 的 key 值* @param second 过期的时间, 单位秒*/
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {long long when;robj *argv[3];// 转为字符串类型, 以便使用 strtoll 函数seconds = getDecodedObject(seconds);// 根据指定的进制将入参的 char 数组转为一个整数, 10 --> 10 进制// 得到过期的时间when = strtoll(seconds->ptr,NULL,10);// 当前执行的命令为 expire, setex expireat 将参数的秒转换成毫秒if (cmd->proc == expireCommand || cmd->proc == setexCommand || cmd->proc == expireatCommand) {when *= 1000;}// 将 expire, setex expireat 命令的参数,从相对时间设置为绝对时间// 以前可能是 10s 后过期, 经过这一步,得到的是 xxx 年 yyy 月 的具体时间if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == setexCommand || cmd->proc == psetexCommand) {when += mstime();}// 减少 second 的引用次数, 便于回收decrRefCount(seconds);// 拼接为 pexpireat key 超时时间 的命令格式argv[0] = createStringObject("PEXPIREAT",9);argv[1] = key;argv[2] = createStringObjectFromLongLong(when);// 将上面的 pexpireat key 过期时间 的命令通过 catAppendOnlyGenericCommand 转为 RESP 格式的字符串buf = catAppendOnlyGenericCommand(buf, 3, argv);decrRefCount(argv[0]);decrRefCount(argv[2]);// 返回bufreturn buf;}

7 代码中涉及的几个模块

在 Redis 的 AOF 中涉及了几个模块功能, 这些功能辅助着整个 AOF 的功能, 这里对这些功能进行一个简单的讲解, 需要说明的时, 这些功能具体的实现可以不用去深入理解, 到了 AOF 源码时, 知道对应的函数的功能就行了。

这里也只是简单的介绍一下, 感兴趣了解一下大体的思路而已, 可以在后面 AOF 源码分析后, 再回来看一下。

7.1 延迟统计

Redis 中会对一些比较耗时的操作做一下统计, 便于后面的性能分析。
而在 AOF 中在调用 write 函数等操作也会进行延迟操作的统计。
大体的延迟统计实现如下:

统计延迟信息的配置


#define CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD 0struct redisServer {// 延迟监视的阈值, 默认值为 0, 如果配置为大于 0 的值, 表示开启延迟监控, 同时超过了这个时间就进行延迟记录long long latency_monitor_threshold;// 字典, 也就是延迟记录的保存地方, 保存的格式是 延迟记录的事件名 和 latencyTimeSeries (一个数组)dict *latency_events;
}

统计延迟样本对象的定义

struct latencyTimeSeries {// 用于记录的下一个延迟样本的位置, 超过了数组的长度, 会重新被赋值为 0 int idx;// 最大的延时uint32_t max;// 最近的延时记录样本数组struct latencySample samples[LATENCY_TS_LEN];
}struct latencySample {// 延时样本创建的时间int32_t time; // 延迟样本的延迟时间, 单位毫秒uint32_t latency;
}

统计延迟样本的函数定义

// 下面的函数做了小改动, 逻辑一样的// 获取延迟事件的时间
void latencyStartMonitor(var) { // mstime() 获取到当前的时间var = server.latency_monitor_threshold ? mstime() : 0;
}void latencyEndMonitor(var) {if (server.latency_monitor_threshold) {var = mstime() - var;}
}/*** 判断是否需要记录延迟时间 * @param event 事件名* @param var 延迟事件的耗时时间*/
void latencyAddSampleIfNeeded(event,var) {if (server.latency_monitor_threshold &&  (var) >= server.latency_monitor_threshold)latencyAddSample((event),(var));
}/*** 添加延迟事件到 redisServer 的 latency_events 字典*/ 
void latencyAddSample(char *event, mstime_t latency) {// 找出 event 对应的延时事件记录结构体struct latencyTimeSeries *ts = dictFetchValue(server.latency_events,event);time_t now = time(NULL);int prev;// 没有对应事件的 latencyTimeSeries, 添加一个if (ts == NULL) {ts = zmalloc(sizeof(*ts));ts->idx = 0;ts->max = 0;memset(ts->samples,0,sizeof(ts->samples));dictAdd(server.latency_events,zstrdup(event),ts);}// 获取存储的位置prev = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;// 数组对应位置的样本的创建时间等于当前时间if (ts->samples[prev].time == now) {// 当前的延迟时间大于样本里面的延迟时间, 更新为当前时间if (latency > ts->samples[prev].latency)ts->samples[prev].latency = latency;return;}// 修改对应位置的样本的时间信息ts->samples[ts->idx].time = time(NULL);ts->samples[ts->idx].latency = latency;// 如果大于当前所有样本的时间, 更新最大延迟时间为当前的延迟时间if (latency > ts->max) ts->max = latency;ts->idx++;// 超过了上限, 重新设置为 0 if (ts->idx == LATENCY_TS_LEN) ts->idx = 0;    }

上面就是延迟事件的创建和保存, 至于在哪里使用的, 如何汇总分析, AOF 这里没有涉及, 就跳过了, 如果需要继续研究可以查看 latency.hlatency.c 这 2 个文件。

7.2 BIO - 后台线程

在上面的介绍中可以知道 fsync 是一个很耗时的过程, 如果把这个过程同样放在 Redis 的主线程中, 那么可能影响到整个 Redis 的性能, 所以 Redis 将 fsync 的过程交给了后台的线程处理。
Reids 将后台相关耗时的操作封装为了一个 BIO 的功能, 可以看出是一个线程池, 线程池在启动时初始了几个线程, 然后生产者向这个池中添加任务。
而 Redis 主线程在执行到 fsync 时, 会提交一个 fsync 的任务到 BIO 中, 完成结束。真正的 fsync 由后台线程处理。

大体的实现如下:

任务类型定义

// 执行 close 函数, 也就是文件的关闭
#define BIO_CLOSE_FILE    0 // 执行 redis_fsync 函数, 也就是 fsync 函数
#define BIO_AOF_FSYNC     1// 延迟对象释放
#define BIO_LAZY_FREE     2 // 任务类型的总数
#define BIO_NUM_OPS       3

BIO 的初始化

// 存放声明的线程数组
static pthread_t bio_threads[BIO_NUM_OPS];// 线程锁, 这里是多线程场景了, 所以有并发问题
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];// 添加任务相关的 condition, 简单的理解就是, 线程会被阻塞在这个 condition, 另一个线程可以唤醒这个 condition 上的线程
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];// 执行过程相关的 condition
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];// 任务列表
static list *bio_jobs[BIO_NUM_OPS];// 存放对应的任务类型还有多少个任务等待执行
static unsigned long long bio_pending[BIO_NUM_OPS];void bioInit(void) {pthread_attr_t attr;pthread_t thread;size_t stacksize;int j;// 初始锁, condition, 任务列表for (j = 0; j < BIO_NUM_OPS; j++) {pthread_mutex_init(&bio_mutex[j],NULL);pthread_cond_init(&bio_newjob_cond[j],NULL);pthread_cond_init(&bio_step_cond[j],NULL);bio_jobs[j] = listCreate();bio_pending[j] = 0;}//设置线程栈空间pthread_attr_init(&attr);pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);// 创建线程, 并存放到 bio_threads 这个线程数组for (j = 0; j < BIO_NUM_OPS; j++) {void *arg = (void*)(unsigned long) j;// 创建线程, 线程执行的逻辑为 bioProcessBackgroundJobsif (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");exit(1);}bio_threads[j] = thread;}
}

线程执行的逻辑

void *bioProcessBackgroundJobs(void *arg) {struct bio_job *job;unsigned long type = (unsigned long) arg;sigset_t sigset;// 任务类型校验if (type >= BIO_NUM_OPS) {serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type);return NULL;}// 配置 thread 能够在任何时候被杀掉pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);// 获取锁pthread_mutex_lock(&bio_mutex[type]);sigemptyset(&sigset);sigaddset(&sigset, SIGALRM);if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));// 线程主逻辑while(1) {listNode *ln; // 没有任务, 进行等待, if (listLength(bio_jobs[type]) == 0) {pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);continue;}  // 获取列表的第一个任务ln = listFirst(bio_jobs[type]);job = ln->value; // 释放锁, 这个锁的功能主要是为了确保任务的获取pthread_mutex_unlock(&bio_mutex[type]);if (type == BIO_CLOSE_FILE) {close((long)job->arg1);} else if (type == BIO_AOF_FSYNC) {redis_fsync((long)job->arg1);} else if (type == BIO_LAZY_FREE) {if (job->arg1)lazyfreeFreeObjectFromBioThread(job->arg1);else if (job->arg2 && job->arg3)lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);else if (job->arg3)lazyfreeFreeSlotsMapFromBioThread(job->arg3);} else {serverPanic("Wrong job type in bioProcessBackgroundJobs().");}zfree(job);// 获取锁pthread_mutex_lock(&bio_mutex[type]);// 删除任务listDelNode(bio_jobs[type],ln);// 需要执行的任务减 1bio_pending[type]--;// 唤醒所有等待在 bio_step_cond 上的线程pthread_cond_broadcast(&bio_step_cond[type]);}      
}

添加任务

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {struct bio_job *job = zmalloc(sizeof(*job));job->time = time(NULL);job->arg1 = arg1;job->arg2 = arg2;job->arg3 = arg3;// 获取锁pthread_mutex_lock(&bio_mutex[type]);// 添加任务listAddNodeTail(bio_jobs[type],job);bio_pending[type]++;// 通知等待在 bio_newjob_cond 的线程pthread_cond_signal(&bio_newjob_cond[type]);// 释放锁pthread_mutex_unlock(&bio_mutex[type]);
}

7.3 pipe - 父子进程通信

Redis 的 AOF 重写机制和 RDB 类型, 也是通过 fork 创建子进程, 将整个 AOF 重写过程交给子进程处理。
不同的时: AOF 重写过程中会不断和父进程通信获取父进程的命令缓存。 父子进程之间就是通过 pipe 进行通讯的。

这里只做一下简单的了解。

#include <unistd.h>int Pipe(int pipefd[2]);

通过 pipe 的函数可以在 2 个文件描述符之间建立一个通道, 第一个用来读 read(fd[0]), 第二个用来写 write(fd[1])

具体的分析可以看一下这篇文章 APUE读书笔记—进程间通信(IPC)之管道和有名管道(FIFO)

而 Redis 中建立了 3 套通道

int aofCreatePipes(void) {int fds[6] = {-1, -1, -1, -1, -1, -1};int j;//  parent -> children dataif (pipe(fds) == -1) goto error;// children -> parent ackif (pipe(fds+2) == -1) goto error; // parent -> children ackif (pipe(fds+4) == -1) goto error; // 同步非阻塞if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;// 注册一个事件, 执行的函数为 aofChildPipeReadable, 里面的逻辑就是读取 aof_pipe_read_ack_from_child 的数据到 aof_pipe_write_ack_to_childif (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;// 6 个通道server.aof_pipe_write_data_to_child = fds[1];server.aof_pipe_read_data_from_parent = fds[0];server.aof_pipe_write_ack_to_parent = fds[3];server.aof_pipe_read_ack_from_child = fds[2];server.aof_pipe_write_ack_to_child = fds[5];server.aof_pipe_read_ack_from_parent = fds[4];server.aof_stop_sending_diff = 0;return C_OK;
}

最终形成的效果如下:
Alt 'AOF 创建的通道'

父进程写入到 aof_pipe_read_data_from_parent 的数据会自动同步到子进程的 aof_pipe_read_data_from_parent, 另外 2 个类似。

至此, AOF 的一些概念和源码中相关的一些代码就介绍完了, 下篇开始真正的 AOF 源码分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/219590.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

写好ChatGPT提示词原则之:清晰且具体(clear specific)

ChatGPT 的优势在于它允许用户跨越机器学习和深度学习的复杂门槛&#xff0c;直接利用已经训练好的模型。然而&#xff0c;即便是这些先进的大型语言模型也面临着上下文理解和模型固有局限性的挑战。为了最大化这些大型语言模型&#xff08;LLM&#xff09;的潜力&#xff0c;关…

.NET core 搭建一个跨平台的 Web Service

以前搭建的webservice 都是基于.NET fromwork的&#xff0c;我们知道.NET fromwork是非跨平台的&#xff0c;只能部署在iis上&#xff0c;今天教大家用.NET core搭建一个可跨平台的Web Service 新建一个.net core空项目 给项目起一个名字 选一个.net框架&#xff0c;我这里选…

电脑开机出现:CLIENT MAD ADDR (网卡启动系统)的解决办法

文章目录 前言步骤1、确定情况2、对症下药——关闭网卡启动 补充1、关于BIOS2、关于PXE 前言 最近给旧电脑重装系统安了下开发环境和常用软件啥的&#xff0c;之前还好好启动的电脑&#xff0c;开机突然需要额外加载一个页面&#xff0c;虽然最后正常启动了不影响使用&#xf…

60.Sentinel源码分析

Sentinel源码分析 1.Sentinel的基本概念 Sentinel实现限流、隔离、降级、熔断等功能&#xff0c;本质要做的就是两件事情&#xff1a; 统计数据&#xff1a;统计某个资源的访问数据&#xff08;QPS、RT等信息&#xff09; 规则判断&#xff1a;判断限流规则、隔离规则、降级规…

亚马逊云科技 re:Invent 大会 - S3 对象存储华丽升级

亚马逊云科技 re:Invent 大会 - S3 对象存储华丽升级 本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道。 文章目录 亚马逊云科技 re:Inv…

Flink系列之:大状态与 Checkpoint 调优

Flink系列之&#xff1a;大状态与 Checkpoint 调优 一、概述二、监控状态和 Checkpoints三、Checkpoint 调优四、RocksDB 调优五、增量 Checkpoint六、RocksDB 或 JVM 堆中的计时器七、RocksDB 内存调优八、容量规划九、压缩十、Task 本地恢复十一、主要&#xff08;分布式存储…

智能优化算法应用:基于JAYA算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于JAYA算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于JAYA算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.JAYA算法4.实验参数设定5.算法结果6.参考文献7.MA…

【MySQL】MySQL表的操作-创建查看删除和修改

文章目录 1.创建表2.查看表结构3.修改表4.删除表 1.创建表 语法&#xff1a; CREATE TABLE table_name (field1 datatype,field2 datatype,field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎;说明&#xff1a; field 表示列名datatype 表示列的类型…

flask简单应用-1

目标&#xff1a; 做一个搜索网页&#xff0c;搜索当前路径下是否含有指定关键字的文件&#xff0c;如果有就列出来&#xff0c;没有返回消息 第一步&#xff1a;我们需要先显示一个搜索页面&#xff0c;页面上需要有一个可以输入的对话框&#xff0c;一个按钮执行搜索 建立ht…

基于开源的JAVA mongodb jdbc 驱动 使用教程

基于开源的JAVA mongodb jdbc 驱动 使用教程介绍 介绍 本文介绍一款开源的基于JAVA的 Mongodb JDBC 驱动使用教程 开源地址 https://gitee.com/bgong/jdbc-mongodb-driver功能价值 与mybaits融合&#xff1a;复用mybatis的功能特性&#xff0c;如:缓存,if动态判断标签等特…

[计网01] 物理层 详细解析笔记,特性

计算机网络的物理层是网络协议栈中的第一层&#xff0c;负责传输原始的比特流&#xff08;bitstream&#xff09;通过物理媒介进行通信。物理层主要关注传输介质、信号的编码和调制、数据传输速率以及数据传输的物理连接等方面。 相关特性 机械特性&#xff08;Mechanical Ch…

【JAVA】CyclicBarrier源码解析以及示例

文章目录 前言CyclicBarrier源码解析以及示例主要成员变量核心方法 应用场景任务分解与合并应用示例 并行计算应用示例 游戏开发应用示例输出结果 数据加载应用示例 并发工具的协同应用示例 CyclicBarrier和CountDownLatch的区别循环性&#xff1a;计数器的变化&#xff1a;用途…

C# 命令行参数解析库示例

写在前面 在日常开发中&#xff0c;我们经常会用到命令行参数&#xff0c;比如cmd下的各种指令&#xff1b;还有C#的控制台类型的项目&#xff0c;在默认入口Main函数中&#xff0c;那个args参数&#xff0c;就是有系统传入到程序进程的命令行参数&#xff1b;在传入的参数相对…

晚期食管癌肿瘤治疗线程分类

文章目录 1、肿瘤治疗的线数1.1 基础概念1.2 线程定义1.3 如何计算治疗线数 2 食管癌治疗指南2.1 食管癌诊疗指南2.1 CSCO 本文前半部分主要来源于参考文件1&#xff0c;其余部分来源于官方指南。无原创内容&#xff0c;全部为摘要。 1、肿瘤治疗的线数 1.1 基础概念 抗肿瘤药…

信息安全和网络安全的区别

信息安全与网络安全都属于安全领域&#xff0c;但它们的范围和重点不同。 信息安全主要关注数据的保护&#xff0c;包括对敏感数据进行加密、防止数据丢失或泄露等措施。信息安全通常与数据存储、传输和处理相关。 而网络安全更侧重于保护计算机系统和网络免受攻击、病毒、蠕…

SCI一区级 | Matlab实现GWO-CNN-GRU-selfAttention多变量多步时间序列预测

SCI一区级 | Matlab实现GWO-CNN-GRU-selfAttention多变量多步时间序列预测 目录 SCI一区级 | Matlab实现GWO-CNN-GRU-selfAttention多变量多步时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现GWO-CNN-GRU-selfAttention灰狼算法优化卷积门控循环…

I.MX RT1170双核学习(3):多核管理之MCMGR源码分析详解

本文通过SDK中最简单的hello_world例程来说明一下双核程序如何运行。在CM7和CM4的工程中都有一个MCMGR(Multicore Manager)文件夹&#xff0c;它是用来管理多核之间的操作的&#xff0c;当然也包括我们前面提到的那些寄存器的设置。 文章目录 1 MCMGR_EarlyInit1.1 MCMGR_Trigg…

数据库交付运维高级工程师-腾讯云TDSQL

数据库交付运维高级工程师-腾讯云TDSQL上机指导&#xff0c;付费指导&#xff0c;暂定99

设计模式—装饰模式

与其明天开始&#xff0c;不如现在行动&#xff01; 文章目录 装饰模式—穿衣服&#x1f48e;总结 装饰模式—穿衣服 装饰模式&#xff08;Decorator&#xff09;可以动态的给对象添加一些额外的职责。 Component是定义一个对象接口&#xff0c;可以给这些对象动态地添加职责。…

关于mysql存储过程中N/A和null的使用注意事项

oracle和mysql的存储过程大同小异&#xff0c;但是一些细节还是需要留意的。最近发现mysql的N/A和null在存储过程中容易忽略的一点&#xff0c;这会导致我们的存储过程提前结束。今天突然想起来了就记录一下。   mysql的N/A和null区别网上也说得很详细了&#xff0c;我就不赘…