【Redis】AOF 基础
因为 Redis AOF 的实现有些绕, 就分成 2 篇进行分析, 本篇主要是介绍一下 AOF 的一些特性和依赖的其他函数的逻辑,为下一篇 (Redis AOF 源码) 源码分析做一些铺垫。
AOF 全称: Append Only File, 是 Redis 提供了一种数据保存模式, Redis 默认不开启。
AOF 采用日志的形式来记录每个写操作, 并追加到文件。开启后, 执行更改 Redis 数据的命令时, 就会把命令写入到 AOF 文件中。
Redis 重启时会根据日志文件的内容把写指令从前到后执行一次以完成数据的恢复工作。
1 AOF 相关的配置
appendonly no # AOF 开关, 默认为关闭
appendfilename "appendonly.aof" # 保存的文件名
appendfsync everysec # AOF 持久化策略 (硬盘缓存写入到硬盘)
AOF 简单使用的话就这 3 个配置, 前 2 个就是字面的意思, 理解其他比较简单。
我们说明一下第三个配置的使用。
开启 AOF 后, 每次修改的命令都会存到 Redis 的一个缓存区。
缓存区的数据最终是需要写入到磁盘的, 而 Redis 是通过 write 函数, 将缓存中的数据写入到磁盘中。
但是 write 函数实际是先将数据先保存到系统层级的缓存, 后续由系统自身将数据保存到磁盘, 系统默认为 30 秒保存一次。这样的话, 可能有风险, 如果系统直接宕机了
可能会丢失 30 秒左右的数据, 所以系统提供了一个 fsync 函数, 可以把系统层级的缓存立即写入到磁盘中, 但是这是一个阻塞且缓慢的操作, 会影响到执行的线程。
所以上面的配置的第 3 项就是控制这个 Redis 缓存到磁盘的行为
- everysec: AOF 默认的持久化策略。每秒执行一次 fsync, 可能导致丢失 1s 数据, 这种策略兼顾了安全性和效率
- no: 表示不执行 fsync, 由操作系统保证数据同步到磁盘, 速度最快, 但是不太安全
- always: 表示每次写入到执行 fsync, 保证数据同步到磁盘, 效率很低
除了上面的 3 个基础配置, 还有几个关于 AOF 执行中的行为配置
# 默认为 100
# 当目前 AOF 文件大小超过上次重写的 AOF 文件的百分之多少进行重写 (重写的含义可以看下面的重写机制), 即当 AOF 文件增长到一定大小的时候, Redis 能够调用 bgrewriteaof 对日志文件进行重写
auto-aof-rewrite-percentag 100# 默认为 64m
# 设置允许重写的最小 AOF 文件大小, 避免达到约定百分比但占用的容量仍然很小的情况就重写
auto-aof-rewrite-min-size 64mb# 默认为 no
# 在 AOF 重写时, 是否不要执行 fsync, 将缓存写入到磁盘, 默认为 no。
# 如果对低延迟要求很高的应用, 这里可以设置为 yes, 否则设置为 no, 这样对持久化特性来说这是更安全的选择
# 设置为 yes 表示重写期间对新的写操作不 fsync, 暂时存在内存中, 等重新操作完成后再写入
# 默认为 no, 建议改为 yes, 因为 Linux 的默认 fsync 策略为 30 秒, 所以可能丢失 30 秒数据
no-appendfsync-on-rewrite no# 默认为 yes
# 当 Redis 启动的时候, AOF 文件的数据会被重新载入内存
# 但是 AOF 文件可能在尾部是不完整的, 比如突然的断电宕机什么的, 可能导致 AOF 文件数据不完整
# 对于不完整的 AOF 文件如何处理
# 配置为 yes, 当截断的 AOF 文件被导入的时候, 会自动发布一个 log 给客户端, 然后继续加载文件中的数据
# 配置为 no, 用户必须手动 redis-check-aof 修复 AOF 文件才可以
aof-load-truncated yes
2 AOF 重写机制
上面的配置中有好几个提示到重写的概念, 那么什么是重写呢?
由于 AOF 持久化是 Redis 不断将写命令记录到 AOF 文件中, 随着 Redis 不断的运行, AOF 文件将会越来越大, 占用服务器磁盘越来越大, 同时 AOF 恢复要求时间越长。
为了解决这个问题, Redis 新增了重写机制, 当 AOF 文件的大小超过了所设定的阈值时, Redis 就会自动启动 AOF 文件的内容压缩, 只保留可以恢复数据的最小指令集。
AOF 文件不是对原文件进行整理, 而是直接读取服务器现有的键值对, 然后用一条命令去代替之前记录这个键值对的多条命令, 生成一个新的文件替换原来的 AOF 文件。
用户可以通过 bgrewriteaof 命令来手动触发 AOF 文件的重写, 这个重写的过程也是通过子进程实现的。
在子进程进行 AOF 重写时, 主线程需要保证
- 处理客户端的请求
- 将新增和更新命令追加到现有的 AOF 文件中
- 将新增和更新命令追加到 AOF 重写缓存中
3 AOF 文件的优势和劣势
优势
- AOF 持久化的方法提供了多种的同步频率, 即使使用默认的同步频率每秒同步一次, Redis 最多也就丢失 1 秒的数据而已
- AOF 日志文件以 append-only 模式写入, 所以没有任何磁盘寻址的开销, 写入性能非常高, 而且文件不容易受损, 即使文件尾部受损, 也能很容易恢复, 打开文件, 把后面损坏的数据删除即可
劣势
- 对于具有相同数据的的 Redis, AOF 文件通常会比 RDF 文件体积更大 (RDB 存的是数据快照)
- 虽然 AOF 提供了多种同步的频率, 默认情况下, 每秒同步一次的频率也具有较高的性能。但是在高并发的情况下, RDB 比 AOF 具好更好的性能保证
4 AOF 和 RDB 两种方案比较
如果可以忍受一小段时间内数据的丢失, 使用 RDB 是最好的, 定时生成 RDB 快照 (snapshot) 非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快, 否则使用 AOF。
但是一般情况下建议不要单独使用某一种持久化机制, 而是应该两种一起用, 在这种情况下, 当 Redis 重启的时候会优先载入 AOF 文件来恢复原始的数据, 因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。
在 Redis 4.0 带来了一个新的持久化选项 —— 混合持久化。将 RDB 文件的内容和增量的 AOF 日志文件存在一起。
这里的 AOF 日志不再是全量的日志, 而是自持久化开始到持久化结束的这段时间发生的增量 AOF 日志, 通常这部分 AOF 日志很小。
在 Redis 重启的时候, 可以先加载 RDB 的内容, 然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放, 重启效率因此大幅得到提升。
5 AOF 的过程
高度概括如下:
- 所有的修改命令会追加到 Redis 的一个 AOF 缓存区
- AOF 缓存区根据配置的策略向硬盘做同步操作
- 随着 AOF 文件越来越大, 达到配置的条件, 对 AOF 文件进行重写, 达到压缩的目的
到此, AOF 的理论知识就没了, 下面是介绍几个比较重要的函数的逻辑。
6 AOF 文件结构
如果现在向 Redis 中写入一个 key 为 redis-key, value 为 redis-value 的字符串键值对后, 这对键值对会以下面的格式保存在 AOF 文件中:
*3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n
以 *数字
的格式开始, 表示后面的命令的参数个数, 然后通过 $数字
表示后面参数的长度, 然后各个分隔之间通过 \r\n
进行分隔。
整体的格式就是 Redis 自定义的 RESP 协议, 具体的 RESP 介绍, 可以看一下这篇文章。
可以看到这种文本格式具有很高的可读性, 同时可以直接进行修改。
注: Redis 中有多个数据库, 写入的数据是保存在哪个数据库的?
在写入对应的数据库数据时, 内部会自动插入一条 select 数据库的编号 的命令到 AOF 文件, 表明对应的数据库, 解析时也是通过这条命令切换到对应的数据库。
源码中将 key 和 value 转换为上面的文件格式的实现是由 2 个函数实现的: catAppendOnlyGenericCommand 和 catAppendOnlyExpireAtCommand, 前者处理的是正常的命令, 而后者处理的是命令的过期时间。
6.1 catAppendOnlyGenericCommand - 没有过期时间的命令
/*** 将入参的参数转为 RESP 格式写入到入参的 dst* @param dst 当前未写入到文件的命令文本, 新的命令会追加到这个的后面* @param argc 命令参数的个数* @param argv 命令参数, 比如 set key value*/
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {char buf[32];int len, j;robj *o;// 上面的 set redis-key redis-value, 按照 RESP 协议转换的内容如下// *3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n// 这里面可以拆为 2 部分处理 // 1. *3\r\n --> 命令的参数个数// 2. $3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n --> 具体的命令// 1. 处理命令的参数个数部分// 命令开始的前缀为 *buf[0] = '*';// argc 表示的是写入命令的个数, 经过这一步 buf = *参数个数len = 1+ll2string(buf+1,sizeof(buf)-1,argc);// 追加 \r\n, 到了这一步 经过这一步 buf = *参数个数\r\nbuf[len++] = '\r';buf[len++] = '\n';// 先将处理的文本第一步拼接到 dst 的后面, 此时 dst = *参数个数\r\ndst = sdscatlen(dst,buf,len);// 2. 处理具体的命令// 拼接参数列表for (j = 0; j < argc; j++) {// 将对应的参数转为字符串类型o = getDecodedObject(argv[j]);buf[0] = '$';// 将命令的长度写入到 buf 中len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));// 继续在后面拼接 \r\n, 到这一步 buf = $命令的长度\r\nbuf[len++] = '\r';buf[len++] = '\n';// 同样将 $命令的长度\r\n 写入到 dstdst = sdscatlen(dst,buf,len);// 将当前的命令具体的内容 写入到 dstdst = sdscatlen(dst,o->ptr,sdslen(o->ptr));// 在 dist 后面追加一个 \r\ndst = sdscatlen(dst,"\r\n",2);// 经过一次循环, dist 里面多了一段 $命令的长度\r\n命令\r\n 的内容// 引用此数 - 1decrRefCount(o);}return dst;
}/*** 如果入参的对象是 raw 或者 embstr 编码, 引用次数 + 1* 如果为 int 编码, 根据这个整数创建出一个字符串, 同时返回这个字符串* 其他类型不会处理*/
robj *getDecodedObject(robj *o) {robj *dec;// 判断一个对象的编码是否为 OBJ_ENCODING_EMBSTR 或者 OBJ_ENCODING_RAWif (sdsEncodedObject(o)) {// 对象的引用次数还没达到最大值时, 进行引用次数 + 1incrRefCount(o);return o;}// 是字符串类型同时编码为 OBJ_ENCODING_INTif (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {char buf[32];// 整形转为 char 数组ll2string(buf,32,(long)o->ptr);// 转为字符串dec = createStringObject(buf,strlen(buf));return dec;} else {serverPanic("Unknown encoding type");}
}
2.7.2 catAppendOnlyExpireAtCommand - 带过期时间的命令
/*** 将入参的过期时间转为 RESP 格式的字符串并存入 buf* @param buf 当前未写入到文件的命令文本, 新的命令会追加到这个的后面, 同时将命令修改为 pexpireat 的格式* @param cmd 执行的命令* @param key redis 的 key 值* @param second 过期的时间, 单位秒*/
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {long long when;robj *argv[3];// 转为字符串类型, 以便使用 strtoll 函数seconds = getDecodedObject(seconds);// 根据指定的进制将入参的 char 数组转为一个整数, 10 --> 10 进制// 得到过期的时间when = strtoll(seconds->ptr,NULL,10);// 当前执行的命令为 expire, setex expireat 将参数的秒转换成毫秒if (cmd->proc == expireCommand || cmd->proc == setexCommand || cmd->proc == expireatCommand) {when *= 1000;}// 将 expire, setex expireat 命令的参数,从相对时间设置为绝对时间// 以前可能是 10s 后过期, 经过这一步,得到的是 xxx 年 yyy 月 的具体时间if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == setexCommand || cmd->proc == psetexCommand) {when += mstime();}// 减少 second 的引用次数, 便于回收decrRefCount(seconds);// 拼接为 pexpireat key 超时时间 的命令格式argv[0] = createStringObject("PEXPIREAT",9);argv[1] = key;argv[2] = createStringObjectFromLongLong(when);// 将上面的 pexpireat key 过期时间 的命令通过 catAppendOnlyGenericCommand 转为 RESP 格式的字符串buf = catAppendOnlyGenericCommand(buf, 3, argv);decrRefCount(argv[0]);decrRefCount(argv[2]);// 返回bufreturn buf;}
7 代码中涉及的几个模块
在 Redis 的 AOF 中涉及了几个模块功能, 这些功能辅助着整个 AOF 的功能, 这里对这些功能进行一个简单的讲解, 需要说明的时, 这些功能具体的实现可以不用去深入理解, 到了 AOF 源码时, 知道对应的函数的功能就行了。
这里也只是简单的介绍一下, 感兴趣了解一下大体的思路而已, 可以在后面 AOF 源码分析后, 再回来看一下。
7.1 延迟统计
Redis 中会对一些比较耗时的操作做一下统计, 便于后面的性能分析。
而在 AOF 中在调用 write 函数等操作也会进行延迟操作的统计。
大体的延迟统计实现如下:
统计延迟信息的配置
#define CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD 0struct redisServer {// 延迟监视的阈值, 默认值为 0, 如果配置为大于 0 的值, 表示开启延迟监控, 同时超过了这个时间就进行延迟记录long long latency_monitor_threshold;// 字典, 也就是延迟记录的保存地方, 保存的格式是 延迟记录的事件名 和 latencyTimeSeries (一个数组)dict *latency_events;
}
统计延迟样本对象的定义
struct latencyTimeSeries {// 用于记录的下一个延迟样本的位置, 超过了数组的长度, 会重新被赋值为 0 int idx;// 最大的延时uint32_t max;// 最近的延时记录样本数组struct latencySample samples[LATENCY_TS_LEN];
}struct latencySample {// 延时样本创建的时间int32_t time; // 延迟样本的延迟时间, 单位毫秒uint32_t latency;
}
统计延迟样本的函数定义
// 下面的函数做了小改动, 逻辑一样的// 获取延迟事件的时间
void latencyStartMonitor(var) { // mstime() 获取到当前的时间var = server.latency_monitor_threshold ? mstime() : 0;
}void latencyEndMonitor(var) {if (server.latency_monitor_threshold) {var = mstime() - var;}
}/*** 判断是否需要记录延迟时间 * @param event 事件名* @param var 延迟事件的耗时时间*/
void latencyAddSampleIfNeeded(event,var) {if (server.latency_monitor_threshold && (var) >= server.latency_monitor_threshold)latencyAddSample((event),(var));
}/*** 添加延迟事件到 redisServer 的 latency_events 字典*/
void latencyAddSample(char *event, mstime_t latency) {// 找出 event 对应的延时事件记录结构体struct latencyTimeSeries *ts = dictFetchValue(server.latency_events,event);time_t now = time(NULL);int prev;// 没有对应事件的 latencyTimeSeries, 添加一个if (ts == NULL) {ts = zmalloc(sizeof(*ts));ts->idx = 0;ts->max = 0;memset(ts->samples,0,sizeof(ts->samples));dictAdd(server.latency_events,zstrdup(event),ts);}// 获取存储的位置prev = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;// 数组对应位置的样本的创建时间等于当前时间if (ts->samples[prev].time == now) {// 当前的延迟时间大于样本里面的延迟时间, 更新为当前时间if (latency > ts->samples[prev].latency)ts->samples[prev].latency = latency;return;}// 修改对应位置的样本的时间信息ts->samples[ts->idx].time = time(NULL);ts->samples[ts->idx].latency = latency;// 如果大于当前所有样本的时间, 更新最大延迟时间为当前的延迟时间if (latency > ts->max) ts->max = latency;ts->idx++;// 超过了上限, 重新设置为 0 if (ts->idx == LATENCY_TS_LEN) ts->idx = 0; }
上面就是延迟事件的创建和保存, 至于在哪里使用的, 如何汇总分析, AOF 这里没有涉及, 就跳过了, 如果需要继续研究可以查看 latency.h 和 latency.c 这 2 个文件。
7.2 BIO - 后台线程
在上面的介绍中可以知道 fsync 是一个很耗时的过程, 如果把这个过程同样放在 Redis 的主线程中, 那么可能影响到整个 Redis 的性能, 所以 Redis 将 fsync 的过程交给了后台的线程处理。
Reids 将后台相关耗时的操作封装为了一个 BIO 的功能, 可以看出是一个线程池, 线程池在启动时初始了几个线程, 然后生产者向这个池中添加任务。
而 Redis 主线程在执行到 fsync 时, 会提交一个 fsync 的任务到 BIO 中, 完成结束。真正的 fsync 由后台线程处理。
大体的实现如下:
任务类型定义
// 执行 close 函数, 也就是文件的关闭
#define BIO_CLOSE_FILE 0 // 执行 redis_fsync 函数, 也就是 fsync 函数
#define BIO_AOF_FSYNC 1// 延迟对象释放
#define BIO_LAZY_FREE 2 // 任务类型的总数
#define BIO_NUM_OPS 3
BIO 的初始化
// 存放声明的线程数组
static pthread_t bio_threads[BIO_NUM_OPS];// 线程锁, 这里是多线程场景了, 所以有并发问题
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];// 添加任务相关的 condition, 简单的理解就是, 线程会被阻塞在这个 condition, 另一个线程可以唤醒这个 condition 上的线程
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];// 执行过程相关的 condition
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];// 任务列表
static list *bio_jobs[BIO_NUM_OPS];// 存放对应的任务类型还有多少个任务等待执行
static unsigned long long bio_pending[BIO_NUM_OPS];void bioInit(void) {pthread_attr_t attr;pthread_t thread;size_t stacksize;int j;// 初始锁, condition, 任务列表for (j = 0; j < BIO_NUM_OPS; j++) {pthread_mutex_init(&bio_mutex[j],NULL);pthread_cond_init(&bio_newjob_cond[j],NULL);pthread_cond_init(&bio_step_cond[j],NULL);bio_jobs[j] = listCreate();bio_pending[j] = 0;}//设置线程栈空间pthread_attr_init(&attr);pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);// 创建线程, 并存放到 bio_threads 这个线程数组for (j = 0; j < BIO_NUM_OPS; j++) {void *arg = (void*)(unsigned long) j;// 创建线程, 线程执行的逻辑为 bioProcessBackgroundJobsif (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");exit(1);}bio_threads[j] = thread;}
}
线程执行的逻辑
void *bioProcessBackgroundJobs(void *arg) {struct bio_job *job;unsigned long type = (unsigned long) arg;sigset_t sigset;// 任务类型校验if (type >= BIO_NUM_OPS) {serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type);return NULL;}// 配置 thread 能够在任何时候被杀掉pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);// 获取锁pthread_mutex_lock(&bio_mutex[type]);sigemptyset(&sigset);sigaddset(&sigset, SIGALRM);if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));// 线程主逻辑while(1) {listNode *ln; // 没有任务, 进行等待, if (listLength(bio_jobs[type]) == 0) {pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);continue;} // 获取列表的第一个任务ln = listFirst(bio_jobs[type]);job = ln->value; // 释放锁, 这个锁的功能主要是为了确保任务的获取pthread_mutex_unlock(&bio_mutex[type]);if (type == BIO_CLOSE_FILE) {close((long)job->arg1);} else if (type == BIO_AOF_FSYNC) {redis_fsync((long)job->arg1);} else if (type == BIO_LAZY_FREE) {if (job->arg1)lazyfreeFreeObjectFromBioThread(job->arg1);else if (job->arg2 && job->arg3)lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);else if (job->arg3)lazyfreeFreeSlotsMapFromBioThread(job->arg3);} else {serverPanic("Wrong job type in bioProcessBackgroundJobs().");}zfree(job);// 获取锁pthread_mutex_lock(&bio_mutex[type]);// 删除任务listDelNode(bio_jobs[type],ln);// 需要执行的任务减 1bio_pending[type]--;// 唤醒所有等待在 bio_step_cond 上的线程pthread_cond_broadcast(&bio_step_cond[type]);}
}
添加任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {struct bio_job *job = zmalloc(sizeof(*job));job->time = time(NULL);job->arg1 = arg1;job->arg2 = arg2;job->arg3 = arg3;// 获取锁pthread_mutex_lock(&bio_mutex[type]);// 添加任务listAddNodeTail(bio_jobs[type],job);bio_pending[type]++;// 通知等待在 bio_newjob_cond 的线程pthread_cond_signal(&bio_newjob_cond[type]);// 释放锁pthread_mutex_unlock(&bio_mutex[type]);
}
7.3 pipe - 父子进程通信
Redis 的 AOF 重写机制和 RDB 类型, 也是通过 fork 创建子进程, 将整个 AOF 重写过程交给子进程处理。
不同的时: AOF 重写过程中会不断和父进程通信获取父进程的命令缓存。 父子进程之间就是通过 pipe 进行通讯的。
这里只做一下简单的了解。
#include <unistd.h>int Pipe(int pipefd[2]);
通过 pipe 的函数可以在 2 个文件描述符之间建立一个通道, 第一个用来读 read(fd[0]), 第二个用来写 write(fd[1])。
具体的分析可以看一下这篇文章 APUE读书笔记—进程间通信(IPC)之管道和有名管道(FIFO)
而 Redis 中建立了 3 套通道
int aofCreatePipes(void) {int fds[6] = {-1, -1, -1, -1, -1, -1};int j;// parent -> children dataif (pipe(fds) == -1) goto error;// children -> parent ackif (pipe(fds+2) == -1) goto error; // parent -> children ackif (pipe(fds+4) == -1) goto error; // 同步非阻塞if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;// 注册一个事件, 执行的函数为 aofChildPipeReadable, 里面的逻辑就是读取 aof_pipe_read_ack_from_child 的数据到 aof_pipe_write_ack_to_childif (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;// 6 个通道server.aof_pipe_write_data_to_child = fds[1];server.aof_pipe_read_data_from_parent = fds[0];server.aof_pipe_write_ack_to_parent = fds[3];server.aof_pipe_read_ack_from_child = fds[2];server.aof_pipe_write_ack_to_child = fds[5];server.aof_pipe_read_ack_from_parent = fds[4];server.aof_stop_sending_diff = 0;return C_OK;
}
最终形成的效果如下:
父进程写入到 aof_pipe_read_data_from_parent 的数据会自动同步到子进程的 aof_pipe_read_data_from_parent, 另外 2 个类似。
至此, AOF 的一些概念和源码中相关的一些代码就介绍完了, 下篇开始真正的 AOF 源码分析。
相关文章:
【Redis】AOF 基础
因为 Redis AOF 的实现有些绕, 就分成 2 篇进行分析, 本篇主要是介绍一下 AOF 的一些特性和依赖的其他函数的逻辑,为下一篇 (Redis AOF 源码) 源码分析做一些铺垫。 AOF 全称: Append Only File, 是 Redis 提供了一种数据保存模式, Redis 默认不开启。 AOF 采用日志的形式来记…...
C语言—每日选择题—Day50
一天一天的更新,也是达到50天了,精选的题有250道,博主累计做了不下500道选择题,最喜欢的题型就是指针和数组之间的计算呀,不知道关注我的小伙伴是不是一直在坚持呢?文末有投票,大家可以投票让博…...
[C/C++]——内存管理
学习C/C的内存管理 前言:一、C/C的内存分布二、C语言中动态内存管理方式三、C中动态内存管理方式3.1、new/delete操作符3.1.2、new/delete操作内置类型3.1.3、new/delete操作自定义类型 3.2、认识operator new和operator delete函数3.3、了解new和delete的实现原理3…...
PDF文件的限制编辑,如何设置?
想要给PDF文件设置一个密码防止他人对文件进行编辑,那么我们可以对PDF文件设置限制编辑,设置方法很简单,我们在PDF编辑器中点击文件 – 属性 – 安全,在权限下拉框中选中【密码保护】 然后在密码保护界面中,我们勾选【…...
Linux 中使用 docker 安装 Elasticsearch 及 Kibana
Linux 中使用 docker 安装 Elasticsearch 及 Kibana 安装 Elasticsearch 和 Kibana安装分词插件 ik_smart 安装 Elasticsearch 和 Kibana 查看当前运行的镜像及本地已经下载的镜像,确认之前没有安装过 ES 和 Kibana 镜像 docker ps docker images从远程镜像仓库拉…...
在Flutter中使用PhotoViewGallery指南
介绍 Flutter中的PhotoViewGallery是一个功能强大的插件,用于在应用中展示可缩放的图片。无论是构建图像浏览器、相册应用,还是需要在应用中查看大图的场景,PhotoViewGallery都是一个不错的选择。 添加依赖 首先,需要在pubspec…...
c语言中的static静态(1)static修饰局部变量
#include<stdio.h> void test() {static int i 1;i;printf("%d ", i); } int main() {int j 0;while (j < 5){test();j j 1;}return 0; } 在上面的代码中,static修饰局部变量。 当用static定义一个局部变量后,这时局部变量就是…...
生信算法4 - 获取overlap序列索引和序列的算法
生信序列基本操作算法 建议在Jupyter实践,python版本3.9 1. 获取overlap序列索引和序列的算法实现 # min_length 最小overlap碱基数量3个 def getOverlapIndexAndSequence(a, b, min_length3):""" Return length of longest suffix of a matching…...
springboot 学习网站
Spring Boot 系列教程https://www.docs4dev.com/ Spring Boot 教程汇总 http://www.springboot.wiki/ Spring Cloud 微服务教程 http://www.springboot.wiki/ 1、自定义banner https://www.cnblogs.com/cc11001100/p/7456145.html 2、事件和监听器 https://blog.csd…...
论文笔记:A review on multi-label learning
一、介绍 传统的监督学习是单标签学习,但是现实中一个实例可能对应多个标签。这篇文章介绍了多标签分类的定义和评价指标、多标签学习的算法还有其他相关的任务。 二、问题相关定义 2.1 多标签学习任务 假设 X R d X R^d XRd,表示d维的输入空间&am…...
接口文档 YAPI介绍
YAPI介绍 YAPI使用流程...
LeetCode 300最长递增子序列 674最长连续递增序列 718最长重复子数组 | 代码随想录25期训练营day52
动态规划算法10 LeetCode 300 最长递增子序列 2023.12.15 题目链接代码随想录讲解[链接] int lengthOfLIS(vector<int>& nums) {//创建变量result存储最终答案,设默认值为1int result 1;//1确定dp数组,dp[i]表示以nums[i]为结尾的子数组的最长长度ve…...
Improving IP Geolocation with Target-Centric IP Graph (Student Abstract)
ABSTRACT 准确的IP地理定位对于位置感知的应用程序是必不可少的。虽然基于以路由器为中心(router-centric )的IP图的最新进展被认为是前沿的,但一个挑战仍然存在:稀疏IP图的流行(14.24%,少于10个节点,9.73%孤立)限制了图的学习。为了缓解这个问题,我们将目标主机(ta…...
华为技面三轮面试题
1. 最长回文子串 -- 中心扩散法 给你一个字符串 s,找到 s 中最长的回文子串。 如果字符串的反序与原始字符串相同,则该字符串称为回文字符串。 示例 1: 输入:s "babad" 输出:"bab" 解释&…...
Linux arm架构下构建Electron安装包
上篇文章我们介绍 Electron 基本的运行开发与 windows 安装包构建简单流程,这篇文章我们从零到一构建 Linux arm 架构下安装包,实际上 Linux arm 的构建流程,同样适用于 Linux x86 环境,只不过需要各自的环境依赖,Linu…...
【CCF BDCI 2023】多模态多方对话场景下的发言人识别 Baseline 0.71 NLP 部分
【CCF BDCI 2023】多模态多方对话场景下的发言人识别 Baseline 0.71 NLP 部分 概述NLP 简介文本处理词嵌入上下文理解 文本数据加载to_device 函数构造数据加载样本数量 len获取样本 getitem 分词构造函数调用函数轮次嵌入 RobertaRoberta 创新点NSP (Next Sentence Prediction…...
推免那些事
平生第一次搞推免,也是最后一次。错失了一些机会,也有幸获得了一些机会,值得祝庆,也值得反思。 以下记录为个人流水账。 个人背景 我的背景可以算不是非常好了,况且今年211受歧视比较严重。 学校:211&…...
华清远见嵌入式学习——QT——作业2
作业要求: 代码运行效果图: 登录失败 和 最小化 和 取消登录 登录成功 和 X号退出 代码: ①:头文件 #ifndef LOGIN_H #define LOGIN_H#include <QMainWindow> #include <QLineEdit> //行编辑器类 #include…...
C# Winfrm 编写一个天气查看助手
#前言# 最近这个北方的天气啊经常下雪,让我想起来我上学时候写的那个天气预报小功能了,今天又复现了一下,哈哈哈,大家当个乐子看哈! 1.创建项目 2.添加引用 上图所示,下载所需天气预报标识,网站…...
基于SpringBoot和微信小程序的农场信息管理系统
文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SpringBoot和微信小程序的农场信息管…...
Linux统计网卡流量
cat /proc/net/dev Linux 内核提供了一种通过 /proc 文件系统,在运行时访问内核内部数据结构、改变内核设置的机制。proc文件系统是一个伪文件系统,它只存在内存当中,而不占用外存空间。它以文件系统的方式为访问系统内核数据的操作提供接口。…...
设计可编辑表格组件
前言 什么是可编辑表格呢?简单来说就是在一个表格里面进行表单操作,执行增删改查。这在一些后台管理系统中是尤为常见的。 今天我们根据vue2 element-ui来设计一个表单表格组件。(不涉及完整代码,想要使用完整功能可以看底部连…...
低代码是美食!!!
一、什么是低代码 低代码是一种软件开发方法,通过图形化界面和少量手写代码,让开发者能够更迅速、简单地构建应用程序。相比传统的编码方式,低代码平台提供了可视化的开发工具和预构建的组件,使开发过程更加快捷高效。 二、低代码…...
计算机网络网络层(期末、考研)
计算机网络总复习链接🔗 目录 路由算法静态路由与动态路由距离-向量算法链路状态路由算法层次路由 IPv4(这个必考)IPv4分组IPv4地址与NAT子网划分与子网掩码、CIDRARP、DHCP与ICMP地址解析协议ARP动态主机配置协议DHCP IPv6IPv6特点 路由协议…...
LCR 120. 寻找文件副本
解题思路: 利用增强for循环遍历documents,将遇见的id加入hmap中,如果id在hamp中存在,则直接返回id class Solution {public int findRepeatDocument(int[] documents) {Set<Integer> hmapnew HashSet<>();for(int d…...
git切换分支
切换到你想要保留的分支: 确保你在本地已经切换到了你想要保留的分支。 git checkout 要保留的分支名更改远程仓库地址: 如果你还没有更改远程仓库地址,使用 git remote set-url 来更改它。 git remote set-url origin 新的仓库地址推送当前分…...
Android 在UploadEventService使用ThreadPoolManager线程管理传递数据给后台
Android 在UploadEventService使用ThreadPoolManager线程管理传递数据给后台,如何实现呢? 可以通过以下步骤使用ThreadPoolManager线程管理传递数据给后台: 创建一个ThreadPoolManager类来管理线程池,比如: public cl…...
网络(十)ACL和NAT
前言 网络管理在生产环境和生活中,如何实现拒绝不希望的访问连接,同时又要允许正常的访问连接?当下公网地址消耗殆尽,且公网IP地址费用昂贵,企业访问Internet全部使用公网IP地址不够现实,如何让私网地址也…...
JavaScript算法46- 最长连续序列(leetCode:128middle)
128. 最长连续序列 一、题目 给定一个未排序的整数数组 nums ,找出数字连续的最长序列(不要求序列元素在原数组中连续)的长度。 请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 输入:nums [100,4,200,1,3,2] 输出…...
提升 API 可靠性的五种方法
API 在我们的数字世界中发挥着关键的作用,使各种不同的应用能够相互通信。然而,这些 API 的可靠性是保证依赖它们的应用程序功能正常、性能稳定的关键因素。本文,我们将探讨提高 API 可靠性的五种主要策略。 1.全面测试 要确保 API 的可靠性…...
网站怎么做?/查询网站服务器
百分点是一个推荐服务的提供商,但是已经转型为大数据解决方案的提供商。 首先看一下大数据与应用画像的关系,现在大数据是炙手可热的,大数据的4个V都比较了解,大数据应该说是信息技术的自然延伸,意味的无所不在的数据…...
wordpress 培训插件/网站排名优化方法
1. 编写程序,声明一个method方法,在方法中打印一个108 的型矩形, 在main方法中调用该方法。 2. 修改上一个程序,在method方法中,除打印一个108的型矩形外,再 计算该矩形的面积,并将其作为方法返…...
线上推广员是干什么的/网站seo系统
‘can not read a block mapping entry; a multiline key may not be an implicit key’ 问题出现的地方: title: 2.8 Vue初体验 date: 2022-02-08 19:27:12 tags: 笔记 在这三个标题的:后面都必须添加一个空格,否则就会出错。...
wordpress 国内云/想做个网络推广
编者按Branch-and-Cut 是求解整数规划或混合整数规划问题最常用的算法之一。通常,把全部可行解空间反复地分割为越来越小的子集,称为分支;并且对每个子集内的解集计算一个目标下界(对于最小值问题),称为定界…...
网站建设公司做前端/私人网站管理软件
1.1 问题 编写一个send_mail.py脚本,实现以下功能: 1.创建bob和alice帐户 2.编写发送邮件件程序,发件人为root,收件人是本机的bob和alice帐户 1.2 步骤 实现此案例需要按照如下步骤进行。 步骤一:创建bob和alice帐…...
做网站接活全流程/网站推广多少钱
VLAN(Virtual Local Area Network)虚拟局域网一种将局域网内的设备通过逻辑地划分成为一个个网段来进行管理的技术VLAN是建立在物理网络基础上的一种逻辑子网,因此建立VLAN需要相应的支持VLAN技术的网络设备当网络中的不同VLAN间进行相互通信…...