PostgreSQL中StartLogStreamer分析

发布时间:2021-11-09 15:15:44 作者:iii
来源:亿速云 阅读:147

本篇内容主要讲解“PostgreSQL中StartLogStreamer分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中StartLogStreamer分析”吧!

本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的BaseBackup中对WAL数据进行备份的实现函数StartLogStreamer.

一、数据结构

logstreamer_param
WAL data streamer参数.

typedef struct
{
     ////后台连接
    PGconn     *bgconn;
    //开始位置
    XLogRecPtr  startptr;
    //目录或者tar文件,依赖于使用的模式
    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
    //系统标识符
    char       *sysidentifier;
    //时间线
    int         timeline;
} logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数

/*
 * Global parameters when receiving xlog stream. For details about the individual fields,
 * see the function comment for ReceiveXlogStream().
 * 接收xlog流数据时的全局参数.
 * 每个域字段的详细解释,参见ReceiveXlogStream()函数注释.
 */
typedef struct StreamCtl
{
    //streaming的开始位置
    XLogRecPtr  startpos;       /* Start position for streaming */
    //时间线
    TimeLineID  timeline;       /* Timeline to stream data from */
    //系统标识符
    char       *sysidentifier;  /* Validate this system identifier and
                                 * timeline */
    //standby超时信息
    int         standby_message_timeout;    /* Send status messages this often */
    //是否同步(写入时是否马上Flush WAL data)
    bool        synchronous;    /* Flush immediately WAL data on write */
    //在已归档的数据中标记segment为已完成
    bool        mark_done;      /* Mark segment as done in generated archive */
    //刷新到磁盘上以确保数据的一致性状态(是否已刷新到磁盘上)
    bool        do_sync;        /* Flush to disk to ensure consistent state of
                                 * data */
    //在返回T时停止streaming
    stream_stop_callback stream_stop;   /* Stop streaming when returns true */
    //如有效,监测该socket中的输入并检查stream_stop()的返回
    pgsocket    stop_socket;    /* if valid, watch for input on this socket
                                 * and check stream_stop() when there is any */
    //如何写WAL
    WalWriteMethod *walmethod;  /* How to write the WAL */
    //附加到部分接受文件的后缀
    char       *partial_suffix; /* Suffix appended to partially received files */
    //使用的replication slot,如无则为NULL
    char       *replication_slot;   /* Replication slot to use, or NULL */
} StreamCtl;

二、源码解读

StartLogStreamer
StartLogStreamer用于在备份时初始化后台进程用于接收WAL.接收进程将创建自己的数据库连接以并行的方式对文件进行streaming复制.

/*
 * Initiate background process for receiving xlog during the backup.
 * The background stream will use its own database connection so we can
 * stream the logfile in parallel with the backups.
 * 在备份时初始化后台进程用于接收WAL.
 * 后台stream进程将用自己的数据库连接以使以并行的方式stream文件.
 */
static void
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
{
    //参数
    logstreamer_param *param;
    uint32      hi,
                lo;//高位/低位
    char        statusdir[MAXPGPATH];
    param = pg_malloc0(sizeof(logstreamer_param));
    param->timeline = timeline;
    param->sysidentifier = sysidentifier;
    /* Convert the starting position */
    //转换开始位置(高低位转换)
    if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
    {
        fprintf(stderr,
                _("%s: could not parse write-ahead log location \"%s\"\n"),
                progname, startpos);
        exit(1);
    }
    //开始位置,转换为64bit的地址
    param->startptr = ((uint64) hi) << 32 | lo;
    /* Round off to even segment position */
    //按segment取整
    param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
#ifndef WIN32
    //WIN32使用的代码
    /* Create our background pipe */
    if (pipe(bgpipe) < 0)
    {
        fprintf(stderr,
                _("%s: could not create pipe for background process: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
#endif
    /* Get a second connection */
    //获取第二个连接
    param->bgconn = GetConnection();
    if (!param->bgconn)
        /* Error message already written in GetConnection() */
        exit(1);
    /* In post-10 cluster, pg_xlog has been renamed to pg_wal */
    //在PG 10,pg_xlog已命名为pg_wal
    snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
             basedir,
             PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
             "pg_xlog" : "pg_wal");
    /* Temporary replication slots are only supported in 10 and newer */
    //临时复制slots只在PG10+支持
    if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
        temp_replication_slot = false;
    /*
     * Create replication slot if requested
     * 如要求,则创建复制slot
     */
    //static char *replication_slot = NULL;
    //static bool temp_replication_slot = true;
    if (temp_replication_slot && !replication_slot)
        //创建replication slot
        replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
    if (temp_replication_slot || create_slot)
    {
        //创建replication slot
        if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
                                   temp_replication_slot, true, true, false))
            exit(1);
        if (verbose)
        {
            //显示诊断信息
            if (temp_replication_slot)
                fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
                        progname, replication_slot);
            else
                fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
                        progname, replication_slot);
        }
    }
    if (format == 'p')
    {
        /*
         * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
         * pg_wal or pg_xlog) depending on the target server so we can write
         * to basedir/pg_wal or basedir/pg_xlog as the directory entry in the
         * tar file may arrive later.
         * 基于目标服务器创建pg_wal/archive_status或pg_xlog/archive_status,
         * 这样可以写入到basedir/pg_wal 货 basedir/pg_xlog,可作为后续访问的tar文件目录条目
         */
        snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
                 basedir,
                 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
                 "pg_xlog" : "pg_wal");
        if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
        {
            fprintf(stderr,
                    _("%s: could not create directory \"%s\": %s\n"),
                    progname, statusdir, strerror(errno));
            exit(1);
        }
    }
    /*
     * Start a child process and tell it to start streaming. On Unix, this is
     * a fork(). On Windows, we create a thread.
     * 启动子进程开始streaming.
     * 在UNIX平台,是一个fork进程,在Windows平台,创建线程.
     */
#ifndef WIN32
    //UNIX:fork进程
    bgchild = fork();
    if (bgchild == 0)
    {
        //这是子进程,返回0
        /* in child process */
        //启动新进程
        exit(LogStreamerMain(param));
    }
    else if (bgchild < 0)
    {
        fprintf(stderr, _("%s: could not create background process: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
    /*
     * Else we are in the parent process and all is well.
     * 在父进程中,返回的bgchild是子进程PID.
     */
    atexit(kill_bgchild_atexit);
#else                           /* WIN32 */
    //WIN32:创建线程
    bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
    if (bgchild == 0)
    {
        fprintf(stderr, _("%s: could not create background thread: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
#endif
}

LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用

static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;//接收xlog流数据时的全局参数
    in_log_streamer = true;
    //初始化StreamCtl结构体
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;
    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
    //接收数据
    if (!ReceiveXlogStream(param->bgconn, &stream))
        /*
         * Any errors will already have been reported in the function process,
         * but we need to tell the parent that we didn't shutdown in a nice
         * way.
         * 在函数执行过程中出现的错误已通过警告的方式发出,
         * 但仍需要告知父进程不能优雅的关闭本进程.
         */
        return 1;
    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }
    //结束连接
    PQfinish(param->bgconn);
    //普通文件格式
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    //是否内存
    pg_free(stream.walmethod);
    return 0;
}

三、跟踪分析

备份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

启动gdb跟踪

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b StartLogStreamer
Breakpoint 1 at 0x403e6b: file pg_basebackup.c, line 555.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/57000060 on timeline 16
pg_basebackup: starting background WAL receiver
Breakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0/57000060", timeline=16, 
    sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555
555     param = pg_malloc0(sizeof(logstreamer_param));
(gdb)

输入参数
startpos=0x7fffffffdf60 “0/57000060”,
timeline=16,
sysidentifier=0x61f1a0 “6666964067616600474”
构造参数

(gdb) n
556     param->timeline = timeline;
(gdb) 
557     param->sysidentifier = sysidentifier;
(gdb) 
560     if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
(gdb) 
567     param->startptr = ((uint64) hi) << 32 | lo;
(gdb) p hi
$1 = 0
(gdb) p lo
$2 = 1459617888
(gdb) n
569     param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
(gdb) n
573     if (pipe(bgpipe) < 0)
(gdb) p *param
$3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' <repeats 1023 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

建立连接,创建replication slot

(gdb) n
583     param->bgconn = GetConnection();
(gdb) 
584     if (!param->bgconn)
(gdb) 
591              PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
589     snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
(gdb) 
595     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
(gdb) 
601     if (temp_replication_slot && !replication_slot)
(gdb) 
602         replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
(gdb) 
603     if (temp_replication_slot || create_slot)
(gdb) 
605         if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
(gdb) 
609         if (verbose)
(gdb) 
611             if (temp_replication_slot)
(gdb) 
612                 fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
(gdb) 
pg_basebackup: created temporary replication slot "pg_basebackup_59378"
620     if (format == 'p')
(gdb) 
(gdb) n
630                  PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
628         snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",

创建备份目录

(gdb) 
633         if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
(gdb) p *param
$4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb) n
647     bgchild = fork();
(gdb) 
#############
[xdb@localhost backup]$ ls
pg_wal

fork进程,父进程返回子进程的PID

(gdb) n
647     bgchild = fork();
(gdb) n
Detaching after fork from child process 43001.
648     if (bgchild == 0)
(gdb) p bgchild
$5 = 43001
(gdb)

子进程(PID=43001)

[xdb@localhost backup]$ ps -ef|grep 43001
xdb      43001 42820  1 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[xdb@localhost backup]$ ps -ef|grep 192.168.26.25
xdb      42820 42756  0 11:48 pts/1    00:00:00 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
xdb      43001 42820  0 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

完成调用

(gdb) n
653     else if (bgchild < 0)
(gdb) 
672 }
(gdb) 
BaseBackup () at pg_basebackup.c:1937
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

pg_wal目录中的数据

[xdb@localhost backup]$ ls -l ./pg_wal/
total 16388
-rw-------. 1 xdb xdb 16777216 Mar 18 11:54 000000100000000000000057
-rw-------. 1 xdb xdb      217 Mar 18 11:54 00000010.history
drwx------. 2 xdb xdb       35 Mar 18 11:54 archive_status
[xdb@localhost backup]$

到此,相信大家对“PostgreSQL中StartLogStreamer分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. 分析PostgreSQL中的distinct和group by
  2. 分析PostgreSQL CreateFunction中的ProcedureCreate函数

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

startlogstreamer postgresql

上一篇:数据库集合相关命令有哪些

下一篇:Django中的unittest应用是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》