An In-Depth Look at the Redis AOF Persistence Strategy
Reposted from https://www.cnblogs.com/daoluanxiaozi/p/3664922.html
This post is about AOF persistence: how the AOF data is organized and how the mechanism operates. Redis implements the AOF operations mainly in aof.c.
The rio data structure
Redis AOF persistence, like RDB, relies on struct rio. It is covered in detail in "An In-Depth Look at the Redis RDB Persistence Strategy".
AOF data organization
Suppose Redis holds the key-value pair "name: Jhon" (in database 8). After AOF persistence, the AOF file contains:
*2      # 2 arguments
$6      # first argument is 6 bytes long
SELECT  # first argument
$1      # second argument is 1 byte long
8       # second argument
*3      # 3 arguments
$3      # first argument is 3 bytes long
SET     # first argument
$4      # second argument is 4 bytes long
name    # second argument
$4      # third argument is 4 bytes long
Jhon    # third argument
Replaying this content therefore yields two familiar Redis commands: SELECT 8; SET name Jhon.
As you might imagine, Redis walks every key-value pair in the in-memory dataset and writes them to disk one by one; at startup, Redis reads the AOF file back and replays it to restore the data.
How AOF persistence operates
Unlike Redis RDB persistence, Redis AOF has two modes of operation: background rewriting, and append-while-serving.
(figure: aof_persistence)
1) The background mode resembles RDB: Redis forks a child; the main process keeps serving while the child performs the AOF persistence and dumps the data to disk. Unlike RDB, while the background child is persisting, the main process records every data change made in the meantime (it is still serving) in server.aof_rewrite_buf_blocks; once the child exits, Redis appends this update cache to the AOF file. RDB persistence has no such step.
A word about this "update cache". Whenever the Redis server applies a data change, e.g. set name Jhon, it not only modifies the in-memory dataset but also records the update, encoded exactly as described above.
The update cache can be stored in server.aof_buf, which you can think of as a small staging area: all accumulated updates land here first, and at certain moments they are either written to the file or moved into the server.aof_rewrite_buf_blocks list (detailed below). Data is added to server.aof_buf in propagate(), and every code path that updates data calls propagate() to accumulate the change. The update cache can also be stored in server.aof_rewrite_buf_blocks, a linked list whose elements are struct aofrwblock; think of it as a warehouse: while a background AOF child is running, the updates accumulated in server.aof_buf are also inserted into this list, and when the child exits the whole list is written to the file. The two buffers are therefore related.
Here is the main code for the background path:
// Start a background child process to perform the AOF rewrite. Called from
// bgrewriteaofCommand(), startAppendOnly() and serverCron().
/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrites the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' it exits.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    // A rewrite child is already running
    if (server.aof_child_pid != -1) return REDIS_ERR;
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        // Stop listening for new connections
        closeListeningSockets(0);
        // Set the process title
        redisSetProcTitle("redis-aof-rewrite");
        // Temp file name
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            // Dirty pages: memory consumed by the child via copy-on-write
            size_t private_dirty = zmalloc_get_private_dirty();

            // Log the copy-on-write cost
            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        // The rewrite has started; cancel any scheduled rewrite
        server.aof_rewrite_scheduled = 0;
        // Start time of the latest rewrite
        server.aof_rewrite_time_start = time(NULL);
        // Child process ID
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        // Since the update cache will be written into the file, force a SELECT
        // command to be generated first, so the merge cannot mix databases.
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
// Main AOF rewrite function. Only called from rewriteAppendOnlyFileBackground().
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    // Open the temp file
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }
    // Initialize the rio structure
    rioInitWithFile(&aof,fp);
    // If incremental fsync is configured, enable it
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    // Dump every database
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        // Get a safe iterator over the dataset
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }
        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        // Write the database index
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
        // Write every entry of this database
        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            // Wrap keystr in an robj
            initStaticStringObject(key,keystr);
            // Get the expire time
            expiretime = getExpire(db,&key);
            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;
            // Emit the write command that recreates this key-value pair
            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            // Write the expire time, if any
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        // Release the iterator
        dictReleaseIterator(di);
    }
    // Flush to disk
    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    aof_fsync(fileno(fp));
    fclose(fp);
    // Rename the temp file
    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generated DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    // Cleanup
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}
// After the background child exits, Redis appends the update cache in
// server.aof_rewrite_buf_blocks to the AOF file. backgroundRewriteDoneHandler()
// runs when the AOF rewrite finishes; its main job is writing that buffer out.
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ......
    // Write the AOF cache server.aof_rewrite_buf_blocks to disk
    if (aofRewriteBufferWrite(newfd) == -1) {
        redisLog(REDIS_WARNING,
            "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
        close(newfd);
        goto cleanup;
    }
    ......
}
// Sync the accumulated update cache server.aof_rewrite_buf_blocks to the given fd
/* Write the buffer (possibly composed of multiple blocks) into the specified
 * fd. If a short write or any other error happens -1 is returned,
 * otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        ssize_t nwritten;

        if (block->used) {
            nwritten = write(fd,block->buf,block->used);
            if (nwritten != block->used) {
                if (nwritten == 0) errno = EIO;
                return -1;
            }
            count += nwritten;
        }
    }
    return count;
}
2) In append-while-serving mode, the Redis server accumulates every data change in server.aof_buf and, at certain moments, writes the update cache to the configured file (server.aof_filename). There are three such moments:
- before entering the event loop
- in the server's periodic task serverCron()
- in stopAppendOnly(), when the AOF policy is being switched off
Redis simply does not want a sudden crash to cost too much data. Under the default everysec policy the accumulated updates are flushed about once a second, and a pending flush is postponed for at most two seconds while a background fsync is still running.
Why did Redis drop the option of doing AOF persistence directly in the serving process? Probably because producing an AOF file takes longer than producing an RDB file; doing it in the main process would tie it up for longer and stall the service for longer.
Here is the main code for the append-while-serving path:
// Sync to disk: write all accumulated updates in server.aof_buf out
/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering the
 * event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // Nothing buffered, nothing to sync
    if (sdslen(server.aof_buf) == 0) return;
    // Check whether a background fsync() job is still pending
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
    // Unless forced, the write may be postponed
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                // Record when the postponement started
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime; /* Unix time sampled every cron cycle. */
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                // Less than 2 seconds postponed so far: just return
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            // Otherwise, force the write to disk
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    // Clear the postponed-flush timestamp
    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    // The AOF file is already open; write everything in server.aof_buf to it
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
        } else {
            redisLog(REDIS_WARNING,"Exiting on short write while writing to "
                                   "the append-only file: %s (nwritten=%ld, "
                                   "expected=%ld)",
                                   strerror(errno),
                                   (long)nwritten,
                                   (long)sdslen(server.aof_buf));
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                redisLog(REDIS_WARNING, "Could not remove short write "
                         "from the append-only file. Redis may refuse "
                         "to load the AOF the next time it starts. "
                         "ftruncate: %s", strerror(errno));
            }
        }
        exit(1);
    }
    // Update the recorded AOF file size
    server.aof_current_size += nwritten;

    /* When server.aof_buf is small enough, reuse it to avoid frequent
     * allocations; when it has grown large, free it instead. Redis is
     * clearly careful with memory here. */
    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    // Sync the data to disk
    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
More on the update cache
The "update cache" mentioned twice above is simply the data changes Redis has accumulated.
The update cache can be stored in server.aof_buf, or in the server.aof_rewrite_buf_blocks linked list. Their relationship: every data-change record is written into server.aof_buf, and if a background child is persisting at the time, the record is also written into server.aof_rewrite_buf_blocks. server.aof_buf is written to the configured file at the moments listed above, while server.aof_rewrite_buf_blocks is appended to the file once the background persistence finishes.
In the source the call chain is: propagate() -> feedAppendOnlyFile() -> aofRewriteBufferAppend()
Note: feedAppendOnlyFile() appends the update to server.aof_buf; it then checks whether an AOF child exists and, if so, calls aofRewriteBufferAppend() to insert the same record into the server.aof_rewrite_buf_blocks list.
A diagram (easy on the eyes) of how AOF persistence works:
(figure: how_aof_works)
Here is the main code:
// Publish a data update to the AOF and to the slaves
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // If AOF is enabled and the AOF flag is set, feed the update to the local file
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // If the replication flag is set, feed the update to the slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
// Record a data update into the AOF caches
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }
    // Append the generated record to server.aof_buf; it will be flushed to
    // disk before the next entry into the event loop.
    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
    // If an AOF child is already running, also accumulate the difference
    // between the child's snapshot and the current dataset:
    // aofRewriteBufferAppend() appends buf to server.aof_rewrite_buf_blocks.
    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    sdsfree(buf);
}
// Write a data-change record into server.aof_rewrite_buf_blocks. Only called
// from feedAppendOnlyFile().
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    // Append at the tail of the list
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            // Allocate a new block and insert it at the tail
            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
                                                         REDIS_NOTICE;
                redisLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
}
These two ways of landing data on disk are the two main lines of Redis AOF persistence: background rewriting and append-while-serving. Grasp both and you have understood Redis AOF.
One question arises: both lines write a file. The background rewrite writes one AOF file and append-while-serving writes another, so which one wins?
The background rewrite first writes its data to "temp-rewriteaof-bg-%d.aof", where "%d" is the AOF child's pid. After the child exits, "temp-rewriteaof-bg-%d.aof" is reopened in append mode and the update cache in server.aof_rewrite_buf_blocks is written into it; finally the file is renamed to server.aof_filename, so the previous file of that name is deleted, which means the file written by append-while-serving is deleted. Append-while-serving, meanwhile, keeps writing to server.aof_filename all along.
So two files are indeed produced, but in the end only the server.aof_filename file remains.
Another question: with background persistence available, why also append while serving? Because the append-while-serving log accumulates redundancy and even stale data over time, and a background rewrite eliminates exactly that. This is Redis's double insurance.
The AOF loading process
The design of AOF data recovery is excellent: it simulates serving. Redis first creates a fake client, reads commands and their arguments back from the AOF file, then executes the corresponding command handlers just as it would for a real client, thereby restoring the data. This is implemented mainly in loadAppendOnlyFile().
// Load the AOF file and rebuild the dataset
/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;

    // A zero-length file is a non-fatal error
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    // While loading, temporarily disable all AOF operations to avoid confusion
    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    // Create a fake client, i.e. a redisClient
    fakeClient = createFakeClient();
    startLoading(fp);

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        // Every 1000 iterations, also serve connected clients while loading;
        // aeProcessEvents() enters the event loop
        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

        // Possibly the end of the AOF file
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        // Each record must start with '*', otherwise the format is wrong
        if (buf[0] != '*') goto fmterr;
        // Number of arguments
        argc = atoi(buf+1);
        // Bad argument count
        if (argc < 1) goto fmterr;

        // Allocate space for the arguments
        argv = zmalloc(sizeof(robj*)*argc);
        // Read the arguments one by one
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
        }

        // Look up the command
        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }
        // Execute the command, simulating a client request, thereby writing data
        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient);

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        // Release the fake client's argument vector
        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    // Cleanup
    fclose(fp);
    freeFakeClient(fakeClient);
    // Restore the previous AOF state
    server.aof_state = old_aof_state;
    stopLoading();
    // Record the size of the loaded AOF file
    aofUpdateCurrentSize();
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

readerr:
    // Error, clean up
    if (feof(fp)) {
        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
    } else {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
    }
    exit(1);
fmterr:
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    exit(1);
}
When to use AOF
If you care about your data and every second of it counts, use AOF persistence; an AOF file is also easy to analyze.
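For reference, a minimal AOF section of redis.conf might look like this (directive names are from the 2.8-era configuration file; the values are illustrative, tune them for your workload):

```
appendonly yes                      # enable AOF persistence
appendfilename "appendonly.aof"     # server.aof_filename
appendfsync everysec                # always / everysec / no
no-appendfsync-on-rewrite no        # if yes, skip fsync while a child persists
auto-aof-rewrite-percentage 100     # trigger BGREWRITEAOF after 100% growth
auto-aof-rewrite-min-size 64mb      # ...but only once past this size
aof-rewrite-incremental-fsync yes   # fsync incrementally during the rewrite
```

The last directive corresponds to server.aof_rewrite_incremental_fsync in rewriteAppendOnlyFile() above.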
----
http://daoluan.net
深入剖析 redis AOF 持久化策略
转自 https://www.cnblogs.com/daoluanxiaozi/p/3664922.html
本篇主要讲的是 AOF 持久化,了解 AOF 的数据组织方式和运作机制。redis 主要在 aof.c 中实现 AOF 的操作。
数据结构 rio
redis AOF 持久化同样借助了 struct rio. 详细内容在《深入剖析 redis RDB 持久化策略》中有介绍。
AOF 数据组织方式
假设 redis 内存有「name:Jhon」的键值对,那么进行 AOF 持久化后,AOF 文件有如下内容:
*2 # 2个参数
$6 # 第一个参数长度为 6
SELECT # 第一个参数
$1 # 第二参数长度为 1
8 # 第二参数
*3 # 3个参数
$3 # 第一个参数长度为 4
SET # 第一个参数
$4 # 第二参数长度为 4
name # 第二个参数
$4 # 第三个参数长度为 4
Jhon # 第二参数长度为 4
所以对上面的内容进行恢复,能得到熟悉的一条 redis 命令:SELECT 8;SET name Jhon.
可以想象的是,redis 遍历内存数据集中的每个 key-value 对,依次写入磁盘中;redis 启动的时候,从 AOF 文件中读取数据,恢复数据。
AOF 持久化运作机制
和 redis RDB 持久化运作机制不同,redis AOF 有后台执行和边服务边备份两种方式。
aof_persistence
1)AOF 后台执行的方式和 RDB 有类似的地方,fork 一个子进程,主进程仍进行服务,子进程执行 AOF 持久化,数据被 dump 到磁盘上。与 RDB 不同的是,后台子进程持久化过程中,主进程会记录期间的所有数据变更(主进程还在服务),并存储在 server.aof_rewrite_buf_blocks 中;后台子进程结束后,redis 更新缓存追加到 AOF 文件中,是 RDB 持久化所不具备的。
来说说更新缓存这个东西。redis 服务器产生数据变更的时候,譬如 set name Jhon,不仅仅会修改内存数据集,也会记录此更新(修改)操作,记录的方式就是上面所说的数据组织方式。
更新缓存可以存储在 server.aof_buf 中,你可以把它理解为一个小型临时中转站,所有累积的更新缓存都会先放入这里,它会在特定时机写入文件或者插入到 server.aof_rewrite_buf_blocks 下链表(下面会详述);server.aof_buf 中的数据在 propagrate() 添加,在涉及数据更新的地方都会调用 propagrate() 以累积变更。更新缓存也可以存储在 server.aof_rewrite_buf_blocks,这是一个元素类型为 struct aofrwblock 的链表,你可以把它理解为一个仓库,当后台有 AOF 子进程的时候,会将累积的更新缓存(在 server.aof_buf 中)插入到链表中,而当 AOF 子进程结束,它会被整个写入到文件。两者是有关联的。
下面是后台执行的主要代码:
// 启动后台子进程,执行 AOF 持久化操作。bgrewriteaofCommand(),startAppendOnly(),serverCron() 中会调用此函数
/* This is how rewriting of the append only file in background works:
*
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrite the append only file in a temp file.
* 2b) the parent accumulates differences in server.aof_rewrite_buf.
* 3) When the child finished ''2a'' exists.
* 4) The parent will trap the exit code, if it''s OK, will append the
* data accumulated into server.aof_rewrite_buf into the temp file, and
* finally will rename(2) the temp file in the actual file name.
* The the new file is reopened as the new append only file. Profit!
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
// 已经有正在执行备份的子进程
if (server.aof_child_pid != -1) return REDIS_ERR;
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
// 子进程
/* Child */
// 关闭监听
closeListeningSockets(0);
// 设置进程 title
redisSetProcTitle("redis-aof-rewrite");
// 临时文件名
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
// 脏数据,其实就是子进程所消耗的内存大小
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
// 获取脏数据大小
size_t private_dirty = zmalloc_get_private_dirty();
// 记录脏数据
if (private_dirty) {
redisLog(REDIS_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can''t rewrite append only file in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
// AOF 已经开始执行,取消 AOF 计划
server.aof_rewrite_scheduled = 0;
// AOF 最近一次执行的起始时间
server.aof_rewrite_time_start = time(NULL);
// 子进程 ID
server.aof_child_pid = childpid;
updateDictResizePolicy();
// 因为更新缓存都将写入文件,要强制产生选择数据集的指令 SELECT ,以防出现数据合并错误。
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
// AOF 持久化主函数。只在 rewriteAppendOnlyFileBackground() 中会调用此函数
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
// 打开文件
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return REDIS_ERR;
}
// 初始化 rio 结构体
rioInitWithFile(&aof,fp);
// 如果设置了自动备份参数,将进行设置
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
// 备份每一个数据集
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
// 获取数据集的迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
// 写入 AOF 操作码
/* SELECT the new DB */
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
// 写入数据集序号
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
// 写入数据集中每一个数据项
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
keystr = dictGetKey(de);
o = dictGetVal(de);
// 将 keystr 封装在 robj 里
initStaticStringObject(key,keystr);
// 获取过期时间
expiretime = getExpire(db,&key);
// 如果已经过期,放弃存储
/* If this key is already expired skip it */
if (expiretime != -1 && expiretime < now) continue;
// 写入键值对应的写操作
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
// 写入过期时间
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
// 释放迭代器
dictReleaseIterator(di);
}
// 写入磁盘
/* Make sure data will not remain on the OS''s output buffers */
fflush(fp);
aof_fsync(fileno(fp));
fclose(fp);
// 重写文件名
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;
werr:
// 清理工作
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
// 后台子进程结束后,redis 更新缓存 server.aof_rewrite_buf_blocks 追加到 AOF 文件中
// 在 AOF 持久化结束后会执行这个函数, backgroundRewriteDoneHandler() 主要工作是将 server.aof_rewrite_buf_blocks,即 AOF 缓存写入文件
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
......
// 将 AOF 缓存 server.aof_rewrite_buf_blocks 的 AOF 写入磁盘
if (aofRewriteBufferWrite(newfd) == -1) {
redisLog(REDIS_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
......
}
// 将累积的更新缓存 server.aof_rewrite_buf_blocks 同步到磁盘
/* Write the buffer (possibly composed of multiple blocks) into the specified
* fd. If no short write or any other error happens -1 is returned,
* otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;
listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;
if (block->used) {
nwritten = write(fd,block->buf,block->used);
if (nwritten != block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}
2)边服务边备份的方式,即 redis 服务器会把所有的数据变更存储在 server.aof_buf 中,并在特定时机将更新缓存写入预设定的文件(server.aof_filename)。特定时机有三种:
进入事件循环之前
redis 服务器定时程序 serverCron() 中
停止 AOF 策略的 stopAppendOnly() 中
redis 无非是不想服务器突然崩溃终止,导致过多的数据丢失。redis 默认是每两秒钟进行一次边服务边备份,即隔两秒将累积的写入文件。
redis 为什么取消直接在本进程进行 AOF 持久化的方法?原因可能是产生一个 AOF 文件要比 RDB 文件消耗更多的时间;如果在当前进程执行 AOF 持久化,会占用服务进程(主进程)较多的时间,停止服务的时间也更长(?)
下面是边服务边备份的主要代码:
// 同步磁盘;将所有累积的更新 server.aof_buf 写入磁盘
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
// 无数据,无需同步到磁盘
if (sdslen(server.aof_buf) == 0) return;
// 创建线程任务,主要调用 fsync()
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
// 如果没有设置强制同步的选项,可能不会立即进行同步
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
// 推迟执行 AOF
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
// 设置延迟冲洗时间选项
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime; // /* Unix time sampled every cron cycle. */
return;
// 没有超过 2s,直接结束
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
// 否则,要强制写入磁盘
/* Otherwise fall through, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
// 取消延迟冲洗时间设置
/* If you are following this code path, then we are going to write so
* set reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
// AOF 文件已经打开了。将 server.aof_buf 中的所有缓存数据写入文件
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
/* Ooops, we are in troubles. The best thing to do for now is
* aborting instead of giving the illusion that everything is
* working as expected. */
if (nwritten == -1) {
redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
} else {
redisLog(REDIS_WARNING,"Exiting on short write while writing to "
"the append-only file: %s (nwritten=%ld, "
"expected=%ld)",
strerror(errno),
(long)nwritten,
(long)sdslen(server.aof_buf));
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
}
exit(1);
}
// 更新 AOF 文件的大小
server.aof_current_size += nwritten;
/*当 server.aof_buf 足够小,重新利用空间,防止频繁的内存分配。
相反,当 server.aof_buf 占据大量的空间,采取的策略是释放空间,可见 redis 对内存很敏感。*/
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// sync,写入磁盘
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}
细说更新缓存
上面两次提到了「更新缓存」,它即是 redis 累积的数据变更。
更新缓存可以存储在 server.aof_buf 中,也可以存储在 server.aof_rewrite_buf_blocks 链表中。它们的关系是:每一次数据变更记录都会写入 server.aof_buf 中,同时如果后台子进程在持久化,变更记录还会被写入 server.aof_rewrite_buf_blocks 中。server.aof_buf 会在特定时期写入指定文件,server.aof_rewrite_buf_blocks 会在后台持久化结束后追加到文件。
redis 源码中是这么实现的:propagate()->feedAppendOnlyFile()->aofRewriteBufferAppend()
注意:feedAppendOnlyFile() 会把更新添加到 server.aof_buf;接下来会有一个判断,如果存在 AOF 子进程,则调用 aofRewriteBufferAppend() 将 server.aof_buf 中的所有数据插入到 server.aof_rewrite_buf_blocks 链表。
一幅可以缓解视力疲劳的图片——AOF 持久化运作机制(原文配图 how_aof_works):
下面是主要的代码:
// 向 AOF 和从机发布数据更新
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// AOF 策略需要打开,且设置 AOF 传播标记,将更新发布给本地文件
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
// 设置了从机传播标记,将更新发布给从机
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
// 将数据更新记录到 AOF 缓存中
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appendend. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
// 将生成的 AOF 追加到 server.aof_buf 中。在下一次进入事件循环之前,server.aof_buf 中的内容将会写到磁盘上
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
// 如果已经有 AOF 子进程运行,redis 采取的策略是累积子进程 AOF 备份的数据和内存中数据集的差异。 aofRewriteBufferAppend() 把 buf 的内容追加到 server.aof_rewrite_buf_blocks 数组中
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
// 将数据更新记录写入 server.aof_rewrite_buf_blocks,此函数只由 feedAppendOnlyFile() 调用
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
// 尾插法
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
if (len) { /* First block to allocate, or need another block. */
int numblocks;
// 创建新的节点,插到尾部
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
// 尾插法
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively
* as a notice or warning. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
REDIS_NOTICE;
redisLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
}
两种数据落地的方式,就是 redis AOF 持久化机制的两条主线:后台执行和边服务边备份,抓住这两点就能理解 redis AOF 了。
这里有一个疑问,两条主线都会涉及文件的写:后台执行会写一个 AOF 文件,边服务边备份也会写一个,以哪个为准?
后台持久化的数据首先会被写入「temp-rewriteaof-bg-%d.aof」,其中「%d」是 AOF 子进程 id;待 AOF 子进程结束后,「temp-rewriteaof-bg-%d.aof」会被以追加的方式打开,继而写入 server.aof_rewrite_buf_blocks 中的更新缓存,最后「temp-rewriteaof-bg-%d.aof」文件被命名为 server.aof_filename,所以之前的名为 server.aof_filename 的文件会被删除,也就是说边服务边备份写入的文件会被删除。边服务边备份的数据会被一直写入到 server.aof_filename 文件中。
因此,确实会产生两个文件,但是最后都会变成 server.aof_filename 文件。
这里还有一个疑问,既然有了后台持久化,为什么还要边服务边备份?边服务边备份时间长了会产生数据冗余甚至备份过旧的数据,而后台持久化可以消除这些东西。看,这里是 redis 的双保险。
AOF 恢复过程
AOF 的数据恢复过程设计实在是棒极了,它模拟一个服务过程。redis 首先虚拟一个客户端,读取 AOF 文件恢复 redis 命令和参数;然后就像服务客户端一样执行命令相应的函数,从而恢复数据。这些过程主要在loadAppendOnlyFile() 中实现。
// 加载 AOF 文件,恢复数据
/* Replay the append log file. On error REDIS_OK is returned. On non fatal
* error (the append only file is zero-length) REDIS_ERR is returned. On
* fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
struct redisClient *fakeClient;
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
// 文件大小不能为 0
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return REDIS_ERR;
}
if (fp == NULL) {
redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}
// 正在执行 AOF 加载操作,于是暂时禁止 AOF 的所有操作,以免混淆
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
* to the same file we're about to read. */
server.aof_state = REDIS_AOF_OFF;
// 虚拟出一个客户端,即 redisClient
fakeClient = createFakeClient();
startLoading(fp);
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;
// 每循环 1000 次,在恢复数据的同时,服务器也为客户端服务。aeProcessEvents() 会进入事件循环
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
// 可能 aof 文件到了结尾
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp))
break;
else
goto readerr;
}
// 必须以“*”开头,格式不对,退出
if (buf[0] != '*') goto fmterr;
// 参数的个数
argc = atoi(buf+1);
// 参数个数错误
if (argc < 1) goto fmterr;
// 为参数分配空间
argv = zmalloc(sizeof(robj*)*argc);
// 依次读取参数
for (j = 0; j < argc; j++) {
if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
if (buf[0] != '$') goto fmterr;
len = strtol(buf+1,NULL,10);
argsds = sdsnewlen(NULL,len);
if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
argv[j] = createObject(REDIS_STRING,argsds);
if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
}
// 找到相应的命令
/* Command lookup */
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
exit(1);
}
// 执行命令,模拟服务客户端请求的过程,从而写入数据
/* Run the command in the context of a fake client */
fakeClient->argc = argc;
fakeClient->argv = argv;
cmd->proc(fakeClient);
/* The fake client should not have a reply */
redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
/* The fake client should never get blocked */
redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
// 释放虚拟客户端空间
/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
for (j = 0; j < fakeClient->argc; j++)
decrRefCount(fakeClient->argv[j]);
zfree(fakeClient->argv);
}
/* This point can only be reached when EOF is reached without errors.
* If the client is in the middle of a MULTI/EXEC, log error and quit. */
if (fakeClient->flags & REDIS_MULTI) goto readerr;
// 清理工作
fclose(fp);
freeFakeClient(fakeClient);
// 恢复旧的 AOF 状态
server.aof_state = old_aof_state;
stopLoading();
// 记录最近 AOF 操作的文件大小
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
return REDIS_OK;
readerr:
// 错误,清理工作
if (feof(fp)) {
redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
} else {
redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
}
exit(1);
fmterr:
redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
exit(1);
}
AOF 的适用场景
如果对数据比较关心,分秒必争,可以用 AOF 持久化,而且 AOF 文件很容易进行分析。
—-
http://daoluan.net
AOF 持久化策略
Redis 为了解决 AOF 后台重写造成的数据不一致问题,设置了 AOF 重写缓冲区。即使设置了 no-appendfsync-on-rewrite yes 也会造成短暂的主进程阻塞。原因就在于子进程完成 AOF 重写之后,会发送一个信号给主进程,而父进程会在这个时候调用信号处理函数,主要是将新的 AOF 文件替换旧的 AOF 文件,那么在这段时间内, 主进程是阻塞的。
简介
AOF 持久化和 RDB 持久化的最主要区别在于,前者记录了数据的变更,而后者是保存了数据本身。本篇主要讲的是 AOF 持久化,了解 AOF 的数据组织方式和运作机制。Redis 主要在 aof.c 中实现 AOF 的操作。
同样,AOF 持久化也会涉及文件的读写,会用到数据结构 rio。关于 rio 已经在上一个篇章已经讲述,在此不做展开。
AOF 数据组织方式
假设 redis 内存有「name:Jhon」的键值对,那么进行 AOF 持久化后,AOF 文件有如下内容:
*2 # 2 个参数
$6 # 第一个参数长度为 6
SELECT # 第一个参数
$1 # 第二个参数长度为 1
8 # 第二个参数
*3 # 3 个参数
$3 # 第一个参数长度为 3
SET # 第一个参数
$4 # 第二个参数长度为 4
name # 第二个参数
$4 # 第三个参数长度为 4
Jhon # 第三个参数
所以对上面的内容进行恢复,能得到熟悉的一条 Redis 命令:SELECT 8;SET name Jhon. 可以想象的是,Redis 遍历内存数据集中的每个 key-value 对,依次写入磁盘中;Redis 启动的时候,从 AOF 文件中读取数据,恢复数据。
AOF 持久化运作机制
和 redis RDB 持久化运作机制不同,redis AOF 有后台执行和边服务边备份两种方式。
1)AOF 后台执行的方式和 RDB 有类似的地方,fork 一个子进程,主进程仍进行服务,子进程执行 AOF 持久化,数据被 dump 到磁盘上。与 RDB 不同的是,后台子进程持久化过程中,主进程会记录期间的所有数据变更(主进程还在服务),并存储在 server.aof_rewrite_buf_blocks 中;后台子进程结束后,Redis 更新缓存追加到 AOF 文件中,是 RDB 持久化所不具备的。
来说说更新缓存这个东西。Redis 服务器产生数据变更的时候,譬如 set name Jhon,不仅仅会修改内存数据集,也会记录此更新(修改)操作,记录的方式就是上面所说的数据组织方式。
更新缓存可以存储在 server.aof_buf 中,你可以把它理解为一个小型临时中转站,所有累积的更新缓存都会先放入这里,它会在特定时机写入文件或者插入到 server.aof_rewrite_buf_blocks 链表(下面会详述);server.aof_buf 中的数据在 propagate() 中添加,在涉及数据更新的地方都会调用 propagate() 以累积变更。更新缓存也可以存储在 server.aof_rewrite_buf_blocks 中,这是一个元素类型为 struct aofrwblock 的链表,你可以把它理解为一个仓库,当后台有 AOF 子进程的时候,会将累积的更新缓存(在 server.aof_buf 中)插入到链表中,而当 AOF 子进程结束,它会被整个写入到文件。两者是有关联的。
这里的意图即是不用每次出现数据变更的时候都触发一个写操作,可以将写操作先缓存到内存中,待到合适的时机写入到磁盘,如此避免频繁的写操作。当然,完全可以实现让数据变更及时更新到磁盘中。两种做法的好坏就是一种博弈了。
下面是后台执行的主要代码:
// 启动后台子进程,执行AOF 持久化操作。bgrewriteaofCommand(),startAppendOnly(),
// serverCron() 中会调用此函数
/* This is how rewriting of the append only file in background works:
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
*    2a) the child rewrite the append only file in a temp file.
*    2b) the parent accumulates differences in server.aof_rewrite_buf.
* 3) When the child finished '2a' exits.
* 4) The parent will trap the exit code, if it's OK, will append the
*    data accumulated into server.aof_rewrite_buf into the temp file, and
*    finally will rename(2) the temp file in the actual file name.
*    The new file is reopened as the new append only file. Profit!
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
// 已经有正在执行备份的子进程
if (server.aof_child_pid != -1) return REDIS_ERR;
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
// 子进程
/* Child */
// 关闭监听
closeListeningSockets(0);
// 设置进程title
redisSetProcTitle("redis-aof-rewrite");
// 临时文件名
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
// 开始执行AOF 持久化
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
// 获取写时复制(copy-on-write)产生的脏页大小,即子进程额外消耗的内存
size_t private_dirty = zmalloc_get_private_dirty();
// 记录脏数据
if (private_dirty) {
redisLog(REDIS_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can''t rewrite append only file in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
// AOF 已经开始执行,取消AOF 计划
server.aof_rewrite_scheduled = 0;
// AOF 最近一次执行的起始时间
server.aof_rewrite_time_start = time(NULL);
// 子进程ID
server.aof_child_pid = childpid;
updateDictResizePolicy();
// 因为更新缓存都将写入文件,要强制产生选择数据集的指令SELECT ,以防出现数据
// 合并错误。
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
如上,子进程执行 AOF 持久化,父进程则会记录一些 AOF 的执行信息。下面来看看 AOF 持久化具体是怎么做的?
// AOF 持久化主函数。只在rewriteAppendOnlyFileBackground() 中会调用此函数
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
// 打开文件
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in"
"rewriteAppendOnlyFile(): %s", strerror(errno));
return REDIS_ERR;
}
// 初始化rio 结构体
rioInitWithFile(&aof,fp);
// 如果设置了自动备份参数,将进行设置
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
// 备份每一个数据集
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
// 获取数据集的迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
// 写入AOF 操作码
/* SELECT the new DB */
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
// 写入数据集序号
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
// 写入数据集中每一个数据项
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
keystr = dictGetKey(de);
o = dictGetVal(de);
// 将keystr 封装在robj 里
initStaticStringObject(key,keystr);
// 获取过期时间
expiretime = getExpire(db,&key);
// 如果已经过期,放弃存储
/* If this key is already expired skip it */
if (expiretime != -1 && expiretime < now) continue;
// 写入键值对应的写操作
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
// 写入过期时间
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
// 释放迭代器
dictReleaseIterator(di);
}
// 写入磁盘
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
aof_fsync(fileno(fp));
fclose(fp);
// 重写文件名
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp append only file on the "
"final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;
werr:
// 清理工作
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: "
"%s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
刚才所说,AOF 在持久化结束后,持久化过程产生的数据变更也会追加到 AOF 文件中。如果有留意定时处理函数 serverCron():父进程会在子进程结束后,将 AOF 持久化过程中产生的数据变更,追加到 AOF 文件。这就是 backgroundRewriteDoneHandler() 要做的:将 server.aof_rewrite_buf_blocks 追加到 AOF 文件。
// 后台子进程结束后,Redis 更新缓存server.aof_rewrite_buf_blocks 追加到AOF 文件中
// 在AOF 持久化结束后会执行这个函数, backgroundRewriteDoneHandler() 主要工作是
// 将server.aof_rewrite_buf_blocks,即AOF 缓存写入文件
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
......
// 将AOF 缓存server.aof_rewrite_buf_blocks 的AOF 写入磁盘
if (aofRewriteBufferWrite(newfd) == -1) {
redisLog(REDIS_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s",
strerror(errno));
close(newfd);
goto cleanup;
}
......
}
// 将累积的更新缓存server.aof_rewrite_buf_blocks 同步到磁盘
/* Write the buffer (possibly composed of multiple blocks) into the specified
* fd. If no short write or any other error happens -1 is returned,
* otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;
listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;
if (block->used) {
nwritten = write(fd,block->buf,block->used);
if (nwritten != block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}
2)边服务边备份的方式,即 Redis 服务器会把所有的数据变更存储在 server.aof_buf 中,并在特定时机将更新缓存写入预设定的文件(server.aof_filename)。特定时机有三种:
- 进入事件循环之前
- Redis 服务器定时程序 serverCron () 中
- 停止 AOF 策略的 stopAppendOnly () 中
Redis 无非是不想服务器突然崩溃终止,导致过多的数据丢失。默认的 everysec 策略下,Redis 每秒将累积的变更 fsync 到磁盘;若后台 fsync 尚未完成,写入最多被推迟两秒。
下面是边服务边执行 AOF 持久化的主要代码:
// 同步磁盘;将所有累积的更新server.aof_buf 写入磁盘
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
// 无数据,无需同步到磁盘
if (sdslen(server.aof_buf) == 0) return;
// 创建线程任务,主要调用fsync()
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
// 如果没有设置强制同步的选项,可能不会立即进行同步
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
// 推迟执行AOF
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
// 设置延迟冲洗时间选项
/* No previous write postponing, remember that we are
* postponing the flush and return. */
// /* Unix time sampled every cron cycle. */
server.aof_flush_postponed_start = server.unixtime;
return;
// 没有超过2s,直接结束
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
// 否则,要强制写入磁盘
/* Otherwise fall through, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk"
" is busy?). Writing the AOF buffer without waiting for fsync to "
"complete, this may slow down Redis.");
}
}
// 取消延迟冲洗时间设置
/* If you are following this code path, then we are going to write so
* set reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
// AOF 文件已经打开了。将server.aof_buf 中的所有缓存数据写入文件
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
/* Ooops, we are in troubles. The best thing to do for now is
* aborting instead of giving the illusion that everything is
* working as expected. */
if (nwritten == -1) {
redisLog(REDIS_WARNING,"Exiting on error writing to the append-only"
" file: %s",strerror(errno));
} else {
redisLog(REDIS_WARNING,"Exiting on short write while writing to "
"the append-only file: %s (nwritten=%ld, "
"expected=%ld)",
strerror(errno),
(long)nwritten,
(long)sdslen(server.aof_buf));
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
}
exit(1);
}
// 更新AOF 文件的大小
server.aof_current_size += nwritten;
// 当server.aof_buf 足够小, 重新利用空间,防止频繁的内存分配。
// 相反,当server.aof_buf 占据大量的空间,采取的策略是释放空间,可见redis
// 对内存很敏感。
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// sync, 写入磁盘
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}
细说更新缓存
上面两次提到了「更新缓存」,它即是 Redis 累积的数据变更。
更新缓存可以存储在 server.aof_buf 中,也可以存储在 server.aof_rewrite_buf_blocks 链表中。它们的关系是:每一次数据变更记录都会写入 server.aof_buf 中,同时如果后台子进程在持久化,变更记录还会被写入 server.aof_rewrite_buf_blocks 中。server.aof_buf 会在特定时期写入指定文件,server.aof_rewrite_buf_blocks 会在后台持久化结束后追加到文件。
Redis 源码中是这么实现的:propagate()->feedAppendOnlyFile()->aofRewriteBufferAppend()
注意,feedAppendOnlyFile () 会把更新添加到 server.aof_buf;接下来会有一个判断,如果存在 AOF 子进程,则调用 aofRewriteBufferAppend () 将 server.aof_buf 中的所有数据插入到 server.aof_rewrite_buf_blocks 链表。这样,就能够理解为什么在 AOF 持久化子进程结束后,父进程会将 server.aof_rewrite_buf_blocks 追加到 AOF 文件了。
// 向AOF 和从机发布数据更新
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// AOF 策略需要打开,且设置AOF 传播标记,将更新发布给本地文件
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
// 设置了从机传播标记,将更新发布给从机
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
// 将数据更新记录到AOF 缓存中
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv,
int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appendend. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
// 将生成的 AOF 追加到 server.aof_buf 中。在下一次进入事件循环之前,
// server.aof_buf 中的内容将会写到磁盘上
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
// 如果已经有AOF 子进程运行,redis 采取的策略是累积子进程AOF 备份的数据和
// 内存中数据集的差异。aofRewriteBufferAppend() 把buf 的内容追加到
// server.aof_rewrite_buf_blocks 数组中
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
// 将数据更新记录写入server.aof_rewrite_buf_blocks,此函数只由
// feedAppendOnlyFile() 调用
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
// 尾插法
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
if (len) { /* First block to allocate, or need another block. */
int numblocks;
// 创建新的节点,插到尾部
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
// 尾插法
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively
* as a notice or warning. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
REDIS_NOTICE;
redisLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
}
一幅可以缓解视力疲劳的图片 ——AOF 持久化运作机制(原文此处为示意图):
两种数据落地的方式,就是 redis AOF 持久化的两条主线:后台执行和边服务边备份,抓住这两点就能理解 redis AOF 了。
这里有一个疑问,两条主线都会涉及文件的写:后台执行会写一个 AOF 文件,边服务边备份也会写一个,以哪个为准?
后台持久化的数据首先会被写入 "temp-rewriteaof-bg-%d.aof",其中 "%d" 是 AOF 子进程的 id;待 AOF 子进程结束后,"temp-rewriteaof-bg-%d.aof" 会被以追加的方式打开,继而写入 server.aof_rewrite_buf_blocks 中的更新缓存,最后该临时文件被重命名为 server.aof_filename。于是,之前名为 server.aof_filename 的文件会被覆盖,也就是说边服务边备份写入的文件会被删除;在此之前,边服务边备份的数据会一直写入 server.aof_filename 文件中。
因此,确实会产生两个文件,但最后都会归于 server.aof_filename 这一个文件。这里可能还有一个疑问:既然有了后台持久化,为什么还要边服务边备份?因为边服务边备份时间长了会产生数据冗余甚至过旧的数据,而后台持久化(重写)可以消除这些问题。看,这就是 Redis 的双保险。
AOF 恢复过程
AOF 的数据恢复过程设计很巧妙,它模拟了一个 Redis 的服务过程。Redis 首先虚拟一个客户端,读取 AOF 文件恢复出 Redis 命令和参数;接着就像服务客户端请求一样执行命令相应的函数,从而恢复数据。这样做的目的无非是提高代码的复用率。这些过程主要在 loadAppendOnlyFile() 中实现。
// 加载AOF 文件,恢复数据
/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
struct redisClient *fakeClient;
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
// 文件大小不能为0
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return REDIS_ERR;
}
if (fp == NULL) {
redisLog(REDIS_WARNING,"Fatal error: can't open the append log file "
"for reading: %s",strerror(errno));
exit(1);
}
// 正在执行AOF 加载操作,于是暂时禁止AOF 的所有操作,以免混淆
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
* to the same file we're about to read. */
server.aof_state = REDIS_AOF_OFF;
// 虚拟出一个客户端,即redisClient
fakeClient = createFakeClient();
startLoading(fp);
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;
// 每循环1000 次,在恢复数据的同时,服务器也为客户端服务。
// aeProcessEvents() 会进入事件循环
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
// 可能aof 文件到了结尾
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp))
break;
else
goto readerr;
}
// 必须以“*”开头,格式不对,退出
if (buf[0] != '*') goto fmterr;
// 参数的个数
argc = atoi(buf+1);
// 参数个数错误
if (argc < 1) goto fmterr;
// 为参数分配空间
argv = zmalloc(sizeof(robj*)*argc);
// 依次读取参数
for (j = 0; j < argc; j++) {
if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
if (buf[0] != '$') goto fmterr;
len = strtol(buf+1,NULL,10);
argsds = sdsnewlen(NULL,len);
if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
argv[j] = createObject(REDIS_STRING,argsds);
if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
}
// 找到相应的命令
/* Command lookup */
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
redisLog(REDIS_WARNING,"Unknown command '%s' reading the "
"append only file", (char*)argv[0]->ptr);
exit(1);
}
// 执行命令,模拟服务客户端请求的过程,从而写入数据
/* Run the command in the context of a fake client */
fakeClient->argc = argc;
fakeClient->argv = argv;
cmd->proc(fakeClient);
/* The fake client should not have a reply */
redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply)
== 0);
/* The fake client should never get blocked */
redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
// 释放虚拟客户端空间
/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
for (j = 0; j < fakeClient->argc; j++)
decrRefCount(fakeClient->argv[j]);
zfree(fakeClient->argv);
}
/* This point can only be reached when EOF is reached without errors.
* If the client is in the middle of a MULTI/EXEC, log error and quit. */
if (fakeClient->flags & REDIS_MULTI) goto readerr;
// 清理工作
fclose(fp);
freeFakeClient(fakeClient);
// 恢复旧的AOF 状态
server.aof_state = old_aof_state;
stopLoading();
// 记录最近AOF 操作的文件大小
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
return REDIS_OK;
readerr:
// 错误,清理工作
if (feof(fp)) {
redisLog(REDIS_WARNING,"Unexpected end of file reading the append "
"only file");
} else {
redisLog(REDIS_WARNING,"Unrecoverable error reading the append only "
"file: %s", strerror(errno));
}
exit(1);
fmterr:
redisLog(REDIS_WARNING,"Bad file format reading the append only file: "
"make a backup of your AOF file, then use ./redis-check-aof --fix "
"<filename>");
exit(1);
}
AOF 的适用场景
如果对数据比较关心,分秒必争,可以用 AOF 持久化,而且 AOF 文件很容易进行分析。
Golang 实现 Redis(4): AOF 持久化与AOF重写
本文是使用 golang 实现 redis 系列的第四篇文章,将介绍如何使用 golang 实现 Append Only File 持久化及 AOF 文件重写。
本文完整源代码在作者 Github:HDT3213/godis
AOF 文件
AOF 持久化是典型的异步任务,主协程 (goroutine) 可以使用 channel 将数据发送到异步协程,由异步协程执行持久化操作。
在 DB 中定义相关字段:
type DB struct {
// 主协程使用此 channel 将要持久化的命令发送到异步协程
aofChan chan *reply.MultiBulkReply
// append file 文件描述符
aofFile *os.File
// append file 路径
aofFilename string
// aof 重写需要的缓冲区,将在AOF重写一节详细介绍
aofRewriteChan chan *reply.MultiBulkReply
// 在必要的时候使用此字段暂停持久化操作
pausingAof sync.RWMutex
}
在进行持久化时需要注意两个细节:
- get 之类的读命令并不需要进行持久化
- expire 命令要用等效的 expireat 命令替换。举例说明:10:00 执行 expire a 3600,表示键 a 在 11:00 过期;若在 10:30 载入 AOF 文件时原样重放 expire a 3600,就变成了 11:30 过期,与原数据不符。
我们在命令处理方法中返回 AOF 需要的额外信息:
type extra struct {
// 表示该命令是否需要持久化
toPersist bool
// 如上文所述 expire 之类的命令不能直接持久化
// 若 specialAof == nil 则将命令原样持久化,否则持久化 specialAof 中的指令
specialAof []*reply.MultiBulkReply
}
type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)
以 SET 命令为例:
func Set(db *DB, args [][]byte) (redis.Reply, *extra) {
//....
var result int
switch policy {
case upsertPolicy:
result = db.Put(key, entity)
case insertPolicy:
result = db.PutIfAbsent(key, entity)
case updatePolicy:
result = db.PutIfExists(key, entity)
}
extra := &extra{toPersist: result > 0} // 若实际写入了数据则 toPersist=true;若因为 XX 或 NX 选项没有实际写入数据则 toPersist=false
if result > 0 {
if ttl != unlimitedTTL { // 使用了 EX 或 PX 选项
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
db.Expire(key, expireTime)
// 持久化时使用 set key value 和 pexpireat 命令代替 set key value EX ttl 命令
extra.specialAof = []*reply.MultiBulkReply{
reply.MakeMultiBulkReply([][]byte{
[]byte("SET"),
args[0],
args[1],
}),
makeExpireCmd(key, expireTime),
}
} else {
db.Persist(key) // override ttl
}
}
return &reply.OkReply{}, extra
}
var pExpireAtCmd = []byte("PEXPIREAT")
func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
args := make([][]byte, 3)
args[0] = pExpireAtCmd
args[1] = []byte(key)
args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
return reply.MakeMultiBulkReply(args)
}
在处理命令的调度方法中将 aof 命令发送到 channel:
func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
// ....
// normal commands
var extra *extra
cmdFunc, ok := router[cmd] // 找到命令对应的处理函数
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmd + "'")
}
// 使用处理函数执行命令
if len(args) > 1 {
result, extra = cmdFunc(db, args[1:])
} else {
result, extra = cmdFunc(db, [][]byte{})
}
// AOF 持久化
if config.Properties.AppendOnly {
if extra != nil && extra.toPersist {
// 写入 specialAof
if extra.specialAof != nil && len(extra.specialAof) > 0 {
for _, r := range extra.specialAof {
db.addAof(r)
}
} else {
// 写入原始命令
r := reply.MakeMultiBulkReply(args)
db.addAof(r)
}
}
}
return
}
在异步协程中写入命令:
func (db *DB) handleAof() {
for cmd := range db.aofChan {
// 异步协程在持久化之前会尝试获取锁,若其他协程持有锁则会暂停持久化操作
// 锁也保证了每次写入完整的一条指令不会格式错误
db.pausingAof.RLock()
if db.aofRewriteChan != nil {
db.aofRewriteChan <- cmd
}
_, err := db.aofFile.Write(cmd.ToBytes())
if err != nil {
logger.Warn(err)
}
db.pausingAof.RUnlock()
}
}
读取过程与协议解析器一节基本相同,不在正文中赘述:loadAof。
AOF 重写
若我们对键 a 赋值 100 次,会在 AOF 文件中产生 100 条指令,但只有最后一条是有效的。为了减少持久化文件的大小,需要进行 AOF 重写以删除无用的指令。
重写必须在固定不变的数据集上进行,不能直接使用内存中的数据。Redis 重写的实现方式是进行 fork 并在子进程中遍历数据库内的数据重新生成AOF文件。由于 golang 不支持 fork 操作,我们只能采用读取AOF文件生成副本的方式来代替fork。
在进行AOF重写操作时需要满足两个要求:
- 若 AOF 重写失败或被中断,AOF 文件需保持重写之前的状态不能丢失数据
- 进行 AOF 重写期间执行的命令必须保存到新的AOF文件中, 不能丢失
因此我们设计了一套比较复杂的流程:
- 暂停AOF写入 -> 更改状态为重写中 -> 复制当前AOF文件 -> 恢复AOF写入
- 在重写过程中,持久化协程在将命令写入文件的同时也将其写入内存中的重写缓存区
- 重写协程读取AOF副本并将重写到临时文件(tmp.aof)中
- 暂停AOF写入 -> 将重写缓冲区中的命令写入tmp.aof -> 使用临时文件tmp.aof覆盖AOF文件(利用文件系统 rename 操作的原子性保证安全)-> 清空重写缓冲区 -> 恢复AOF写入
在不阻塞在线服务的同时进行其它操作是一项必需的能力,AOF 重写的思路在解决这类问题时具有重要的参考价值。比如 MySQL Online DDL 工具 gh-ost 就采用了类似的策略保证数据一致。
首先准备开始重写操作:
func (db *DB) startRewrite() (*os.File, error) {
// 暂停AOF写入, 数据会在 db.aofChan 中暂时堆积
db.pausingAof.Lock()
defer db.pausingAof.Unlock()
// 创建重写缓冲区
db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)
// 创建临时文件
file, err := ioutil.TempFile("", "aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return file, nil
}
在重写过程中,持久化协程进行双写:
func (db *DB) handleAof() {
for cmd := range db.aofChan {
db.pausingAof.RLock()
if db.aofRewriteChan != nil {
// 数据写入重写缓冲区
db.aofRewriteChan <- cmd
}
_, err := db.aofFile.Write(cmd.ToBytes())
if err != nil {
logger.Warn(err)
}
db.pausingAof.RUnlock()
}
}
执行重写:
func (db *DB) aofRewrite() {
file, err := db.startRewrite()
if err != nil {
logger.Warn(err)
return
}
// load aof file
tmpDB := &DB{
Data: dict.MakeSimple(),
TTLMap: dict.MakeSimple(),
Locker: lock.Make(lockerSize),
interval: 5 * time.Second,
aofFilename: db.aofFilename,
}
tmpDB.loadAof()
// rewrite aof file
tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
var cmd *reply.MultiBulkReply
entity, _ := raw.(*DataEntity)
switch val := entity.Data.(type) {
case []byte:
cmd = persistString(key, val)
case *List.LinkedList:
cmd = persistList(key, val)
case *set.Set:
cmd = persistSet(key, val)
case dict.Dict:
cmd = persistHash(key, val)
case *SortedSet.SortedSet:
cmd = persistZSet(key, val)
}
if cmd != nil {
_, _ = file.Write(cmd.ToBytes())
}
return true
})
tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool {
expireTime, _ := raw.(time.Time)
cmd := makeExpireCmd(key, expireTime)
if cmd != nil {
_, _ = file.Write(cmd.ToBytes())
}
return true
})
db.finishRewrite(file)
}
重写完毕后写入缓冲区中的数据并替换正式文件:
func (db *DB) finishRewrite(tmpFile *os.File) {
// 暂停AOF写入
db.pausingAof.Lock()
defer db.pausingAof.Unlock()
// 将重写缓冲区内的数据写入临时文件
// 因为handleAof已被暂停,在遍历期间aofRewriteChan中不会有新数据
loop:
for {
select {
case cmd := <-db.aofRewriteChan:
_, err := tmpFile.Write(cmd.ToBytes())
if err != nil {
logger.Warn(err)
}
default:
// 只有 channel 为空时才会进入此分支
break loop
}
}
// 释放重写缓冲区
close(db.aofRewriteChan)
db.aofRewriteChan = nil
// 使用临时文件代替aof文件
_ = db.aofFile.Close()
_ = os.Rename(tmpFile.Name(), db.aofFilename)
// 重新打开文件描述符以保证正常写入
aofFile, err := os.OpenFile(db.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
db.aofFile = aofFile
}
redis AOF 持久化
除了 RDB 持久化功能之外,Redis 还提供了 AOF (Append Only File) 持久化功能。与 RDB 持久化通过保存数据库中的键值对来记录数据库状态不同,AOF 持久化是通过保存 Redis 服务器所执行的写命令来记录数据库状态的
例如 set msg hello,RDB 持久化保存数据库状态的方法是将 msg 的键值对保存到 RDB 文件中,而 AOF 持久化保存数据库状态的方法则是将服务器执行的 set 命令保存到 AOF 文件中
被写入 AOF 文件的所有命令都是以 Redis 的命令请求协议格式保存的,因为 Redis 的命令请求协议是纯文本格式。服务器在启动时,可以通过载入和执行 AOF 文件中保存的命令来还原服务器关闭之前的数据库状态。
Redis 的服务器进程就是一个事件循环,这个循环中的文件事件负责接收客户端的命令请求以及向客户端发送命令回复,而时间事件则负责执行像 serverCron 函数这样需要定时运行的函数。因为服务器在处理文件事件时可能会执行写命令,使得一些内容被追加到 aof_buf 缓冲区里面,所以服务器每次结束一个事件循环之前,都会调用 flushAppendOnlyFile 函数,考虑是否需要将 aof_buf 缓冲区中的内容写入和保存到 AOF 文件里面。
AOF 文件的载入与数据还原
因为 AOF 文件里面包含了重建数据库状态所需要的所有写命令,所以服务器只要读入并重新执行一遍 AOF 文件里面保存的写命令,就可以还原服务器关闭之前的数据库状态
Redis 读取 AOF 文件并还原数据库状态的详细步骤如下:
1:创建一个不带网络连接的伪客户端 (fake client)
2:从 AOF 文件中分析并读取出一条写命令
3:使用伪客户端执行被读出的写命令
4:重复步骤 2 和 3,直到 AOF 文件中的所有写命令都处理完毕
AOF 重写
随着服务器运行时间的流逝,AOF 文件中的内容会越来越多,为了解决 AOF 文件体积膨胀的问题,Redis 提供了 AOF 文件重写 (rewrite) 功能。通过该功能,Redis 服务器可以创建一个新的 AOF 文件来替代现有的 AOF 文件,新旧两个 AOF 文件所保存的数据库状态相同,但新 AOF 文件不会包含任何浪费空间的冗余命令,所以新 AOF 文件的体积通常会比旧 AOF 文件的体积要小得多。
Redis 将 AOF 重写程序放到子进程里执行,这样做可以达到两个目的:
1:子进程进行 AOF 重写期间,服务器进程(父进程)可以继续处理命令请求
2:子进程带有服务器进程的数据副本,使用子进程而不是线程,可以在避免使用锁的情况下保证数据的安全性
不过使用子进程也有一个需要解决的问题:子进程在进行 AOF 重写期间,服务器进程还需要继续处理命令请求,而新的命令可能会对现有的数据库状态进行修改,从而使得服务器当前的数据库状态和重写后的 AOF 文件所保存的数据库状态不一致。为了解决这种数据不一致问题,Redis 服务器设置了一个 AOF 重写缓冲区,这个缓冲区在服务器创建子进程之后开始使用:当 Redis 服务器执行完一个写命令后,它会同时将这个写命令发送给 AOF 缓冲区和 AOF 重写缓冲区。
当子进程完成 AOF 重写工作之后它会向父进程发送一个信号,父进程在接到该信号之后,会调用一个信号处理函数,执行以下工作
1:将 AOF 重写缓冲区中的所有内容写入到新 AOF 文件中,这时新 AOF 文件所保存的数据库状态将和服务器当前的数据库状态一致
2:对新的 AOF 文件进行改名,原子地覆盖现有的 AOF 文件,完成新旧两个 AOF 文件的替换
这个信号处理函数执行完毕后,父进程就可以继续像往常一样接受命令请求了。整个 AOF 后台重写过程中,只有信号处理函数执行时会对服务器进程造成阻塞,其他时候 AOF 后台重写都不会阻塞父进程,这将 AOF 重写对服务器性能造成的影响降到了最低。
Redis RESP 协议与 AOF 持久化有什么关系?(Redis持久化原理)
现在就来看一下 AOF 和 RESP 协议的关系
- 从两种持久化方式说起。
- RESP 协议是什么
- 动手实现一个简单的协议解析命令行工具
先从持久化说起,虽然一提到 Redis,首先想到的就是缓存,但是 Redis 不仅仅是缓存这么简单,它的定位是内存型数据库,可以存储多种类型的数据结构,还可以当做简单消息队列使用。既然是数据库,持久化功能是必不可少的。
Redis 的两种持久化方式
Redis 提供了两种持久化方式,一种是 RDB 方式,另外一种是 AOF 方式,AOF 是目前比较流行的持久化方案。
RDB 方式
RDB 持久化是通过快照的方式,在指定的时间间隔内将内存中的数据集快照写入磁盘,以一种紧凑压缩的二进制文件的形式保存。可以将快照复制到其他服务器以创建相同数据的副本,或者在重启服务器后恢复数据。RDB 是 Redis 默认的持久化方式,也是早期版本唯一的持久化方案。
RDB 由下面几个参数控制。
# 设置 dump 的文件名
dbfilename dump.rdb
# 持久化文件的存储目录
dir ./
# 900秒内,如果至少有1个key发生变化,就会自动触发bgsave命令创建快照
save 900 1
# 300秒内,如果至少有10个key发生变化,就会自动触发bgsave命令创建快照
save 300 10
# 60秒内,如果至少有10000个key发生变化,就会自动触发bgsave命令创建快照
save 60 10000
持久化流程
上面说到了配置文件中的几个触发持久化的机制,比如 900 秒、300 秒、60 秒内的变更次数,当然也可以手动执行 save 或 bgsave 命令进行触发。bgsave 是非阻塞版本,通过 fork 出子进程的方式来进行快照生成;而 save 会阻塞主进程,不建议使用。
1、首先由 bgsave 命令触发;
2、父进程 fork 出一个子进程,这一步是比较重量级的操作,也是 RDB 方式性能不及 AOF 的一个重要原因;
3、父进程 fork 出子进程后就可以正常响应客户端发来的其他命令了;
4、子进程开始进行持久化工作,对现有数据进行完整的快照存储;
5、子进程完成操作后,通知父进程;
RDB 的优点:
- RDB 是一个紧凑压缩的二进制文件,代表 Redis 在某个时间点上的数据快照,非常适用于备份、全量复制等场景。比如每 6 小时执行 bgsave 备份,并把 RDB 文件拷贝到远程机器或者文件系统中(如 hdfs),用于灾难恢复。
- Redis 加载 RDB 恢复数据远远快于 AOF 的方式。
RDB 的缺点:
- RDB 方式数据没办法做到实时持久化/秒级持久化。因为 bgsave 每次运行都要执行 fork 操作创建子进程,属于重量级操作,频繁执行成本过高。
- RDB 文件使用特定二进制格式保存,Redis 版本演进过程中有多个格式的 RDB 版本,存在老版本 Redis 服务无法兼容新版 RDB 格式的问题。
AOF 方式
AOF 由下面几个参数控制。
# appendonly参数开启AOF持久化
appendonly yes
# AOF持久化的文件名,默认是appendonly.aof
appendfilename "appendonly.aof"
# AOF文件的保存位置和RDB文件的位置相同,都是通过dir参数设置的
dir ./
# 同步策略
# appendfsync always
appendfsync everysec
# appendfsync no
# aof重写期间是否同步
no-appendfsync-on-rewrite no
# 重写触发配置
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
# 加载aof出错如何处理
aof-load-truncated yes
# 文件重写策略
aof-rewrite-incremental-fsync yes
针对 RDB 不适合实时持久化的问题,Redis 提供了 AOF 持久化方式来解决,AOF 也是目前最流行的持久化方式。
AOF(append only file),以独立日志的方式记录每次写命令, 重启时再重新执行AOF文件中的命令达到恢复数据的目的。
1、所有的写入命令会追加到aof_buf(缓冲区)中;
2、AOF缓冲区根据对应的策略向硬盘做同步操作;
3、随着AOF文件越来越大,需要定期对AOF文件进行重写,达到压缩的目的;
4、当Redis服务器重启时,可以加载AOF文件进行数据恢复;
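上面四步中的前两步,可以用一个极简的 Go 草图来示意(aofBuf 为本文虚构的类型,真实的 Redis 用 C 实现,落盘还涉及 write 与 fsync 的区别):

```go
package main

import (
	"bytes"
	"fmt"
)

// aofBuf 模拟 aof_buf 的角色:写命令先累积在缓冲区,
// 再由同步策略(如 everysec 每秒一次)触发 Flush 落盘
type aofBuf struct {
	buf bytes.Buffer // 对应步骤 1:命令先追加到缓冲区
	out bytes.Buffer // 真实实现中是 AOF 文件,这里用内存模拟
}

// Append 追加一条已编码为 RESP 格式的写命令
func (a *aofBuf) Append(cmd string) {
	a.buf.WriteString(cmd)
}

// Flush 对应步骤 2:按策略把缓冲区内容同步到"硬盘"
func (a *aofBuf) Flush() {
	a.out.Write(a.buf.Bytes())
	a.buf.Reset() // 真实实现在 write 之后还需 fsync 才能保证真正落盘
}

func main() {
	a := &aofBuf{}
	a.Append("*1\r\n$4\r\nPING\r\n")
	a.Flush()
	fmt.Println(a.out.Len()) // 输出: 14
}
```

appendfsync everysec 策略相当于由一个定时任务每秒调用一次这里的 Flush;always 则是每条命令之后立即 Flush。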
AOF 文件里存的是什么
我在本地的测试 redis 环境中随便刷了几条命令,然后打开 appendonly.aof 文件查看,发现里面保存的就是一条条 RESP 格式的命令文本。
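原文此处为截图;按照 AOF 的保存格式,例如执行 select 0 和 set msg hello 之后,appendonly.aof 中大致是如下的文本(示意):

```
*2
$6
SELECT
$1
0
*3
$3
set
$3
msg
$5
hello
```

其中每一行实际都以 \r\n 结尾,这正是下文要介绍的 RESP 格式。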
RESP 协议
Redis 客户端与服务端之间使用 RESP (REdis Serialization Protocol) 协议通信,该协议是专门为 Redis 设计的,但也可以用于其它客户端-服务器通信的场景。
RESP 协议有如下几个特点:
- 实现简单;
- 快速解析;
- 可阅读;
客户端发送命令给服务端,服务端拿到命令后进行解析,然后执行对应的逻辑,之后返回给客户端,当然了,这一发一回复都是用的 RESP 协议特点的格式。
一般情况下我们会使用 redis-cli 或者一些客户端工具连接 Redis 服务端。
./redis-cli
然后就是命令发送与结果返回的交互过程(原文此处为 redis-cli 交互的截图,绿色部分为发送的命令,红色部分为返回的结果)。
这就是我们再熟悉不过的部分了。但是,这并不能看出 RESP 协议的真实面貌。
用 telnet 试试
RESP 是基于 TCP 协议实现的,所以除了用各种客户端工具以及 Redis 提供的 redis-cli
工具,还可以用 telnet 查看,用 telnet 就可以看出 RESP 返回的原始数据格式了。
我本地的 Redis 是用的默认 6379 端口,并且没有设置 requirepass ,我们来试一下用 telnet 连接。
telnet 127.0.0.1 6379
然后执行与前面相同的几条命令(原文此处为 telnet 交互的截图,绿色部分为发送的命令,红色为返回的结果)。
怎么样,有些命令的返回还好,但是像 get str:hello 这条,返回的结果除了 world 值本身,上面还多了一行 $5,是不是有点迷糊了。
协议规则
请求命令
一条客户端发往服务器的命令的规则如下:
*<参数数量> CR LF
$<参数 1 的字节数量> CR LF
<参数 1 的数据> CR LF
...
$<参数 N 的字节数量> CR LF
<参数 N 的数据> CR LF
RESP 用 \r\n 作为分隔符。首行表明此条命令的参数个数(在命令行上看,空格分隔的每一段都是一个参数,例如 set str:hello world 这条命令就是 3 个参数),随后依次给出每个参数的字节数和具体内容。
用这条命令举例,对应到 RESP 协议规则上就会变成下面这个样子:
*3\r\n$3\r\nset\r\n$9\r\nstr:hello\r\n$5\r\nworld\r\n
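按照这个规则,用几行 Go 就能写出一个命令编码函数(示意性的草图,encodeCommand 为本文虚构的名字,并非某个客户端的真实 API):

```go
package main

import (
	"fmt"
	"strings"
)

// encodeCommand 把命令参数列表编码为 RESP 多条批量格式
func encodeCommand(args []string) string {
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("*%d\r\n", len(args))) // 首行:参数个数
	for _, arg := range args {
		// 每个参数:$<字节数>\r\n<内容>\r\n
		sb.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg))
	}
	return sb.String()
}

func main() {
	// 输出: "*3\r\n$3\r\nset\r\n$9\r\nstr:hello\r\n$5\r\nworld\r\n"
	fmt.Printf("%q\n", encodeCommand([]string{"set", "str:hello", "world"}))
}
```

把编码结果原样写入 TCP 连接,就是 redis-cli 与服务端之间实际传输的内容。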
服务端回复
Redis 命令会返回多种不同类型的回复。
通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:
1、状态回复(status reply)的第一个字节是 "+",比如 ping 命令的回复:+PONG\r\n
2、错误回复(error reply)的第一个字节是 "-",比如输入一个 redis 中不存在的命令,或者给某些命令传了错误的参数。例如只输入 auth(auth 命令后面需要有一个密码参数),就会返回错误回复:
-ERR wrong number of arguments for 'auth' command\r\n
3、整数回复(integer reply)的第一个字节是 ":",例如 INCR、DECR 自增自减命令,返回的结果形如 :2\r\n
4、批量回复(bulk reply)的第一个字节是 "$",例如对 string 类型执行 get 操作,返回 $5\r\nworld\r\n,$ 后面的数字 5 表示返回的结果有 5 个字节,后面是返回结果的实际内容。
5、多条批量回复(multi bulk reply)的第一个字节是 "*",例如 LRANGE key start stop 或者 hgetall 等返回多条结果的命令,比如 lrange 命令返回的结果:
*2\r\n$6\r\nnews-2\r\n$6\r\nnews-1\r\n
多条批量回复和前面说的客户端发送命令的格式是一致的。
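服务端回复同样可以按第一个字节分发解析。下面是一个示意性的 Go 草图(parseReply 为本文虚构的函数),只处理状态、错误、整数、批量四种回复,多条批量回复可在此基础上递归处理:

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// parseReply 按回复的第一个字节判断类型并解析出结果
func parseReply(raw string) (string, error) {
	r := bufio.NewReader(strings.NewReader(raw))
	line, err := r.ReadString('\n')
	if err != nil {
		return "", err
	}
	line = strings.TrimSuffix(line, "\r\n")
	switch line[0] {
	case '+': // 状态回复,如 +PONG
		return line[1:], nil
	case '-': // 错误回复,如 -ERR ...
		return "", errors.New(line[1:])
	case ':': // 整数回复,如 :2
		return line[1:], nil
	case '$': // 批量回复:$<字节数>\r\n<内容>\r\n
		n, _ := strconv.Atoi(line[1:])
		if n == -1 {
			return "(nil)", nil // $-1 表示键不存在
		}
		buf := make([]byte, n+2) // 内容 + CRLF
		if _, err := io.ReadFull(r, buf); err != nil {
			return "", err
		}
		return string(buf[:n]), nil
	}
	return "", errors.New("unknown reply type")
}

func main() {
	v, _ := parseReply("$5\r\nworld\r\n")
	fmt.Println(v) // 输出: world
}
```

用它处理前面 telnet 看到的 $5\r\nworld\r\n,就能还原出 world 这个值本身。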
实现一个简单的 Redis 交互工具
了解了 Redis 的协议规则,我们就可以自己写一个简单的客户端了。当然,通过官网我们可以看到已经有各种语言,而且每种语言有不止一个客户端工具了。
比如 Java 语言的客户端就有很多种,其中 Jedis 应该是用得最多的。既然已经有这么好用的轮子了,当然没必要重复造轮子,这里主要是为了加深印象。
RESP 协议基于 TCP 协议,可以使用 socket 方式进行连接。
public Socket createSocket() throws IOException {
Socket socket = null;
try {
socket = new Socket();
socket.setReuseAddress(true);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.setSoLinger(true, 0);
socket.connect(new InetSocketAddress(host, port), DEFAULT_TIMEOUT);
socket.setSoTimeout(DEFAULT_TIMEOUT);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
return socket;
} catch (Exception ex) {
if (socket != null) {
socket.close();
}
throw ex;
}
}
然后剩下的就是对返回的结果进行字符串解析了,我做的工具就简陋到这一步为止,运行后可以看到一些简单命令的返回输出(原文此处为截图)。