RocketMQ DLedger种日志复制的实现方法

发布时间:2021-06-24 14:15:10 作者:chen
来源:亿速云 阅读:305

这篇文章主要介绍“RocketMQ DLedger种日志复制的实现方法”,在日常操作中,相信很多人在RocketMQ DLedger种日志复制的实现方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ DLedger种日志复制的实现方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

> 温馨提示:由于本篇幅较长,为了更好的理解其实现,大家可以带着如下疑问来通读本篇文章: 1、raft 协议中有一个非常重要的概念:已提交日志序号,该如何实现。 2、客户端向 DLedger 集群发送一条日志,必须得到集群中大多数节点的认可才能被认为写入成功。 3、raft 协议中追加、提交两个动作如何实现。

日志复制(日志转发)由 DLedgerEntryPusher 实现,具体类图如下: RocketMQ DLedger种日志复制的实现方法

主要由如下4个类构成:

接下来我们将详细介绍上述4个类,从而揭晓日志复制的核心实现原理。

1、DLedgerEntryPusher

1.1 核心类图

RocketMQ DLedger种日志复制的实现方法

DLedger 多副本日志推送的核心实现类,里面会创建 EntryDispatcher、QuorumAckChecker、EntryHandler 三个核心线程。其核心属性如下:

接下来介绍一下其核心方法的实现。

1.2 构造方法

public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
    DLedgerRpcService dLedgerRpcService) {
    this.dLedgerConfig = dLedgerConfig;
    this.memberState = memberState;
    this.dLedgerStore = dLedgerStore;
    this.dLedgerRpcService = dLedgerRpcService;
    for (String peer : memberState.getPeerMap().keySet()) {
        if (!peer.equals(memberState.getSelfId())) {
            dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
        }
    }
}

构造方法的重点是会根据集群内的节点,依次构建对应的 EntryDispatcher 对象。

1.3 startup

DLedgerEntryPusher#startup

public void startup() {
    entryHandler.start();
    quorumAckChecker.start();
    for (EntryDispatcher dispatcher : dispatcherMap.values()) {
        dispatcher.start();
    }
}

依次启动 EntryHandler、QuorumAckChecker 与 EntryDispatcher 线程。

> 备注:DLedgerEntryPusher 的其他核心方法在详细分析其日志复制原理的过程中会一一介绍。

接下来将从 EntryDispatcher、QuorumAckChecker、EntryHandler 来阐述 RocketMQ DLedger(多副本)的实现原理。

2、EntryDispatcher 详解

2.1 核心类图

RocketMQ DLedger种日志复制的实现方法

其核心属性如下。

2.2 Push 请求类型

DLedger 主节点向从从节点复制日志总共定义了4类请求类型,其枚举类型为 PushEntryRequest.Type,其值分别为 COMPARE、TRUNCATE、APPEND、COMMIT。

对主从节点的请求类型有了一个初步的认识后,我们将从 EntryDispatcher 的业务处理入口 doWork 方法开始讲解。

2.3 doWork 方法详解

public void doWork() {
    try {
        if (!checkAndFreshState()) {                                            // [@1](https://my.oschina.net/u/1198)
            waitForRunning(1);
            return;
        }

        if (type.get() == PushEntryRequest.Type.APPEND) {   // @2
            doAppend();
        } else {
            doCompare();                                                           // [@3](https://my.oschina.net/u/2648711)
        }
        waitForRunning(1);
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
        DLedgerUtils.sleep(500);
    }
}

代码@1:检查状态,是否可以继续发送 append 或 compare。

代码@2:如果推送类型为APPEND,主节点向从节点传播消息请求。

代码@3:主节点向从节点发送对比数据差异请求(当一个新节点被选举成为主节点时,往往这是第一步)。

2.3.1 checkAndFreshState 详解

EntryDispatcher#checkAndFreshState

private boolean checkAndFreshState() {
    if (!memberState.isLeader()) {     // @1
        return false;
    }
    if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {     // @2
        synchronized (memberState) {
            if (!memberState.isLeader()) {
                return false;
            }
            PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
            term = memberState.currTerm();
            leaderId = memberState.getSelfId();
            changeState(-1, PushEntryRequest.Type.COMPARE);
        }
    }
    return true;
}

代码@1:如果节点的状态不是主节点,则直接返回 false。则结束 本次 doWork 方法。因为只有主节点才需要向从节点转发日志。

代码@2:如果当前节点状态是主节点,但当前的投票轮次与状态机轮次或 leaderId 还未设置,或 leaderId 与状态机的 leaderId 不相等,这种情况通常是集群触发了重新选举,设置其term、leaderId与状态机同步,即将发送COMPARE 请求。

接下来看一下 changeState (改变状态)。

private synchronized void changeState(long index, PushEntryRequest.Type target) {
    logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
    switch (target) {
        case APPEND:      // @1
            compareIndex = -1;
            updatePeerWaterMark(term, peerId, index);
            quorumAckChecker.wakeup();
            writeIndex = index + 1;
            break;
        case COMPARE:    // @2
            if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                compareIndex = -1;
                pendingMap.clear();
            }
            break;
        case TRUNCATE:     // @3
            compareIndex = -1;
            break;
        default:
            break;
    }
    type.set(target);
}

代码@1:如果将目标类型设置为 append,则重置 compareIndex ,并设置 writeIndex 为当前 index 加1。

代码@2:如果将目标类型设置为 COMPARE,则重置 compareIndex 为负一,接下将向各个从节点发送 COMPARE 请求类似,并清除已挂起的请求。

代码@3:如果将目标类型设置为 TRUNCATE,则重置 compareIndex 为负一。

接下来具体来看一下 APPEND、COMPARE、TRUNCATE 等请求。

2.3.2 append 请求详解

EntryDispatcher#doAppend

private void doAppend() throws Exception {
    while (true) {
        if (!checkAndFreshState()) {                                                 // @1
            break;
        }
        if (type.get() != PushEntryRequest.Type.APPEND) {        // @2
            break;
        }
        if (writeIndex > dLedgerStore.getLedgerEndIndex()) {    // @3
            doCommit();
            doCheckAppendResponse();
            break;
        }
        if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) {     // @4
            long peerWaterMark = getPeerWaterMark(term, peerId);
            for (Long index : pendingMap.keySet()) {
                if (index < peerWaterMark) {
                    pendingMap.remove(index);
                }
            }
            lastCheckLeakTimeMs = System.currentTimeMillis();
        }
        if (pendingMap.size() >= maxPendingSize) {    // @5
            doCheckAppendResponse();
            break;
        }
        doAppendInner(writeIndex);                               // @6
        writeIndex++;
    }
}

代码@1:检查状态,已经在上面详细介绍。

代码@2:如果请求类型不为 APPEND,则退出,结束本轮 doWork 方法执行。

代码@3:writeIndex 表示当前追加到从该节点的序号,通常情况下主节点向从节点发送 append 请求时,会附带主节点的已提交指针,但如何 append 请求发不那么频繁,writeIndex 大于 leaderEndIndex 时(由于pending请求超过其 pending 请求的队列长度(默认为1w),时,会阻止数据的追加,此时有可能出现 writeIndex 大于 leaderEndIndex 的情况,此时单独发送 COMMIT 请求。

代码@4:检测 pendingMap(挂起的请求数量)是否发送泄漏,即挂起队列中容量是否超过允许的最大挂起阀值。获取当前节点关于本轮次的当前水位线(已成功 append 请求的日志序号),如果发现正在挂起请求的日志序号小于水位线,则丢弃。

代码@5:如果挂起的请求(等待从节点追加结果)大于 maxPendingSize 时,检查并追加一次 append 请求。

代码@6:具体的追加请求。

2.3.2.1 doCommit 发送提交请求

EntryDispatcher#doCommit

private void doCommit() throws Exception {
    if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > 1000) {   // @1
        PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT);   // @2
        //Ignore the results
        dLedgerRpcService.push(request);                                                                                        // @3
        lastPushCommitTimeMs = System.currentTimeMillis();
    }
}

代码@1:如果上一次单独发送 commit 的请求时间与当前时间相隔低于 1s,放弃本次提交请求。

代码@2:构建提交请求。

代码@3:通过网络向从节点发送 commit 请求。

接下来先了解一下如何构建 commit 请求包。

EntryDispatcher#buildPushRequest

private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
    PushEntryRequest request = new PushEntryRequest();
    request.setGroup(memberState.getGroup());  
    request.setRemoteId(peerId);                          
    request.setLeaderId(leaderId);
    request.setTerm(term);
    request.setEntry(entry);
    request.setType(target);
    request.setCommitIndex(dLedgerStore.getCommittedIndex());
    return request;
}

提交包请求字段主要包含如下字段:DLedger 节点所属组、从节点 id、主节点 id,当前投票轮次、日志内容、请求类型与 committedIndex(主节点已提交日志序号)。

2.3.2.2 doCheckAppendResponse 检查并追加请求

EntryDispatcher#doCheckAppendResponse

private void doCheckAppendResponse() throws Exception {
    long peerWaterMark = getPeerWaterMark(term, peerId);   // @1
    Long sendTimeMs = pendingMap.get(peerWaterMark + 1); 
    if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) { // @2
        logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
        doAppendInner(peerWaterMark + 1);
    }
}

该方法的作用是检查 append 请求是否超时,其关键实现如下:

2.3.2.3 doAppendInner 追加请求

向从节点发送 append 请求。

EntryDispatcher#doAppendInner

private void doAppendInner(long index) throws Exception {
    DLedgerEntry entry = dLedgerStore.get(index);   // @1
    PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
    checkQuotaAndWait(entry);                                   // @2
    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);   // @3
    CompletableFuture<pushentryresponse> responseFuture = dLedgerRpcService.push(request);   // @4
    pendingMap.put(index, System.currentTimeMillis());                                                                          // @5
    responseFuture.whenComplete((x, ex) -&gt; {
        try {
            PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
            switch (responseCode) {
                case SUCCESS:                                                                                                                // @6
                    pendingMap.remove(x.getIndex());
                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
                    quorumAckChecker.wakeup();
                    break;
                case INCONSISTENT_STATE:                                                                                         // @7
                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                    break;
                default:
                    logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                    break;
            }
        } catch (Throwable t) {
            logger.error("", t);
        }
    });
    lastPushCommitTimeMs = System.currentTimeMillis();
}

代码@1:首先根据序号查询出日志。

代码@2:检测配额,如果超过配额,会进行一定的限流,其关键实现点:

代码@3:构建 PUSH 请求日志。

代码@4:通过 Netty 发送网络请求到从节点,从节点收到请求会进行处理(本文并不会探讨与网络相关的实现细节)。

代码@5:用 pendingMap 记录待追加的日志的发送时间,用于发送端判断是否超时的一个依据。

代码@6:请求成功的处理逻辑,其关键实现点如下:

代码@7:Push 请求出现状态不一致情况,将发送 COMPARE 请求,来对比主从节点的数据是否一致。

日志转发 append 追加请求类型就介绍到这里了,接下来我们继续探讨另一个请求类型 compare。

2.3.3 compare 请求详解

COMPARE 类型的请求有 doCompare 方法发送,首先该方法运行在 while (true) 中,故在查阅下面代码时,要注意其退出循环的条件。 EntryDispatcher#doCompare

if (!checkAndFreshState()) {
    break;
}
if (type.get() != PushEntryRequest.Type.COMPARE
    &amp;&amp; type.get() != PushEntryRequest.Type.TRUNCATE) {
    break;
}
if (compareIndex == -1 &amp;&amp; dLedgerStore.getLedgerEndIndex() == -1) {
    break;
}

Step1:验证是否执行,有几个关键点如下:

EntryDispatcher#doCompare

if (compareIndex == -1) {
    compareIndex = dLedgerStore.getLedgerEndIndex();
    logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
} else if (compareIndex &gt; dLedgerStore.getLedgerEndIndex() || compareIndex &lt; dLedgerStore.getLedgerBeginIndex()) {
    logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
    compareIndex = dLedgerStore.getLedgerEndIndex();
}

Step2:如果 compareIndex 为 -1 或compareIndex 不在有效范围内,则重置待比较序列号为当前已已存储的最大日志序号:ledgerEndIndex。

DLedgerEntry entry = dLedgerStore.get(compareIndex);
PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
CompletableFuture<pushentryresponse> responseFuture = dLedgerRpcService.push(request);
PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);

Step3:根据序号查询到日志,并向从节点发起 COMPARE 请求,其超时时间为 3s。

EntryDispatcher#doCompare

long truncateIndex = -1;
if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {   // @1
    if (compareIndex == response.getEndIndex()) {
        changeState(compareIndex, PushEntryRequest.Type.APPEND);
        break;
    } else {
        truncateIndex = compareIndex;
    }

} else if (response.getEndIndex() &lt; dLedgerStore.getLedgerBeginIndex() 
        || response.getBeginIndex() &gt; dLedgerStore.getLedgerEndIndex()) {    // @2
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex &lt; response.getBeginIndex()) {                                    // @3
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex &gt; response.getEndIndex()) {                                      // @4
    compareIndex = response.getEndIndex();
} else {                                                                                                              // @5
	compareIndex--;
}

if (compareIndex &lt; dLedgerStore.getLedgerBeginIndex()) {                          // @6
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
}

Step4:根据响应结果计算需要截断的日志序号,其主要实现关键点如下:

if (truncateIndex != -1) {
    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
    doTruncate(truncateIndex);
    break;
}

Step5:如果比较出来的日志序号不等于 -1 ,则向从节点发送 TRUNCATE 请求。

2.3.3.1 doTruncate 详解
private void doTruncate(long truncateIndex) throws Exception {
    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
    PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
    logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
    PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
    PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
    lastPushCommitTimeMs = System.currentTimeMillis();
    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}

该方法主要就是构建 truncate 请求到从节点。

关于服务端的消息复制转发就介绍到这里了,主节点负责向从服务器PUSH请求,从节点自然而然的要处理这些请求,接下来我们就按照主节点发送的请求,来具体分析一下从节点是如何响应的。

3、EntryHandler 详解

EntryHandler 同样是一个线程,当节点状态为从节点时激活。

3.1 核心类图

RocketMQ DLedger种日志复制的实现方法

其核心属性如下:

3.2 handlePush

从上文得知,主节点会主动向从节点传播日志,从节点会通过网络接受到请求数据进行处理,其调用链如图所示: RocketMQ DLedger种日志复制的实现方法

最终会调用 EntryHandler 的 handlePush 方法。

EntryHandler#handlePush

public CompletableFuture<pushentryresponse> handlePush(PushEntryRequest request) throws Exception {
    //The timeout should smaller than the remoting layer's request timeout
    CompletableFuture<pushentryresponse> future = new TimeoutFuture&lt;&gt;(1000);      // @1
    switch (request.getType()) {
        case APPEND:                                                                                                          // @2
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            long index = request.getEntry().getIndex();
            Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; old = writeRequestMap.putIfAbsent(index, new Pair&lt;&gt;(request, future));
            if (old != null) {
                logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
            }
            break;
        case COMMIT:                                                                                                           // @3
            compareOrTruncateRequests.put(new Pair&lt;&gt;(request, future));
            break;
        case COMPARE:
        case TRUNCATE:                                                                                                     // @4
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            writeRequestMap.clear();
            compareOrTruncateRequests.put(new Pair&lt;&gt;(request, future));
            break;
        default:
            logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
            future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
            break;
    }
    return future;
}

从几点处理主节点的 push 请求,其实现关键点如下。

代码@1:首先构建一个响应结果Future,默认超时时间 1s。

代码@2:如果是 APPEND 请求,放入到 writeRequestMap 集合中,如果已存在该数据结构,说明主节点重复推送,构建返回结果,其状态码为 REPEATED_PUSH。放入到 writeRequestMap 中,由 doWork 方法定时去处理待写入的请求。

代码@3:如果是提交请求, 将请求存入 compareOrTruncateRequests 请求处理中,由 doWork 方法异步处理。

代码@4:如果是 COMPARE 或 TRUNCATE 请求,将待写入队列 writeRequestMap 清空,并将请求放入 compareOrTruncateRequests 请求队列中,由 doWork 方法异步处理。

接下来,我们重点来分析 doWork 方法的实现。

3.3 doWork 方法详解

EntryHandler#doWork

public void doWork() {
    try {
        if (!memberState.isFollower()) {     // @1
            waitForRunning(1);
            return;
        }
        if (compareOrTruncateRequests.peek() != null) {    // @2
            Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair = compareOrTruncateRequests.poll();
            PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
            switch (pair.getKey().getType()) {
                case TRUNCATE:
                    handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                    break;
                case COMPARE:
                    handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                    break;
                case COMMIT:
                    handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
                    break;
                default:
                    break;
            }
        } else { // @3
            long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
            Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair = writeRequestMap.remove(nextIndex);
            if (pair == null) {
                checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                waitForRunning(1);
                return;
            }
            PushEntryRequest request = pair.getKey();
            handleDoAppend(nextIndex, request, pair.getValue());
        }
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
        DLedgerUtils.sleep(100);
    }
}

代码@1:如果当前节点的状态不是从节点,则跳出。

代码@2:如果 compareOrTruncateRequests 队列不为空,说明有COMMIT、COMPARE、TRUNCATE 等请求,这类请求优先处理。值得注意的是这里使用是 peek、poll 等非阻塞方法,然后根据请求的类型,调用对应的方法。稍后详细介绍。

代码@3:如果只有 append 类请求,则根据当前节点最大的消息序号,尝试从 writeRequestMap 容器中,获取下一个消息复制请求(ledgerEndIndex + 1) 为 key 去查找。如果不为空,则执行 doAppend 请求,如果为空,则调用 checkAbnormalFuture 来处理异常情况。

接下来我们来重点分析各个处理细节。

3.3.1 handleDoCommit

处理提交请求,其处理比较简单,就是调用 DLedgerStore 的 updateCommittedIndex 更新其已提交偏移量,故我们还是具体看一下DLedgerStore 的 updateCommittedIndex 方法。

DLedgerMmapFileStore#updateCommittedIndex

public void updateCommittedIndex(long term, long newCommittedIndex) {   // @1
    if (newCommittedIndex == -1
            || ledgerEndIndex == -1
            || term &lt; memberState.currTerm()
            || newCommittedIndex == this.committedIndex) {                               // @2
            return;
    }
    if (newCommittedIndex &lt; this.committedIndex
            || newCommittedIndex &lt; this.ledgerBeginIndex) {                             // @3
        logger.warn("[MONITOR]Skip update committed index for new={} &lt; old={} or new={} &lt; beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);
        return;
    }
    long endIndex = ledgerEndIndex;
    if (newCommittedIndex &gt; endIndex) {                                                       // @4
            //If the node fall behind too much, the committedIndex will be larger than enIndex.
        newCommittedIndex = endIndex;
    }
    DLedgerEntry dLedgerEntry = get(newCommittedIndex);                        // @5                
    PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);
    this.committedIndex = newCommittedIndex;
    this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize();     // @6
}

代码@1:首先介绍一下方法的参数:

代码@2:如果待更新提交序号为 -1 或 投票轮次小于从节点的投票轮次或主节点投票轮次等于从节点的已提交序号,则直接忽略本次提交动作。

代码@3:如果主节点的已提交日志序号小于从节点的已提交日志序号或待提交序号小于当前节点的最小有效日志序号,则输出警告日志[MONITOR],并忽略本次提交动作。

代码@4:如果从节点落后主节点太多,则重置 提交索引为从节点当前最大有效日志序号。

代码@5:尝试根据待提交序号从从节点查找数据,如果数据不存在,则抛出 DISK_ERROR 错误。

代码@6:更新 commitedIndex、committedPos 两个指针,DledgerStore会定时将已提交指针刷入 checkpoint 文件,达到持久化 commitedIndex 指针的目的。

3.3.2 handleDoCompare

处理主节点发送过来的 COMPARE 请求,其实现也比较简单,最终调用 buildResponse 方法构造响应结果。

EntryHandler#buildResponse

private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
    PushEntryResponse response = new PushEntryResponse();
    response.setGroup(request.getGroup());
    response.setCode(code);
    response.setTerm(request.getTerm());
    if (request.getType() != PushEntryRequest.Type.COMMIT) {
        response.setIndex(request.getEntry().getIndex());
    }
    response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
    response.setEndIndex(dLedgerStore.getLedgerEndIndex());
    return response;
}

主要也是返回当前从几点的 ledgerBeginIndex、ledgerEndIndex 以及投票轮次,供主节点进行判断比较。

3.3.3 handleDoTruncate

handleDoTruncate 方法实现比较简单,删除从节点上 truncateIndex 日志序号之后的所有日志,具体调用dLedgerStore 的 truncate 方法,由于其存储与 RocketMQ 的存储设计基本类似故本文就不在详细介绍,简单介绍其实现要点:根据日志序号,去定位到日志文件,如果命中具体的文件,则修改相应的读写指针、刷盘指针等,并将所在在物理文件之后的所有文件删除。大家如有兴趣,可以查阅笔者的《RocketMQ技术内幕》第4章:RocketMQ 存储相关内容。

3.3.4 handleDoAppend
private void handleDoAppend(long writeIndex, PushEntryRequest request,
    CompletableFuture<pushentryresponse> future) {
    try {
        PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
        DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
        PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
}

其实现也比较简单,调用DLedgerStore 的 appendAsFollower 方法进行日志的追加,与appendAsLeader 在日志存储部分相同,只是从节点无需再转发日志。

3.3.5 checkAbnormalFuture

该方法是本节的重点,doWork 的从服务器存储的最大有效日志序号(ledgerEndIndex) + 1 序号,尝试从待写请求中获取不到对应的请求时调用,这种情况也很常见,例如主节点并么有将最新的数据 PUSH 给从节点。接下来我们详细来看看该方法的实现细节。 EntryHandler#checkAbnormalFuture

if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) &lt; 1000) {
    return;
}
lastCheckFastForwardTimeMs  = System.currentTimeMillis();
if (writeRequestMap.isEmpty()) {
    return;
}

Step1:如果上一次检查的时间距现在不到1s,则跳出;如果当前没有积压的append请求,同样跳出,因为可以同样明确的判断出主节点还未推送日志。

EntryHandler#checkAbnormalFuture

for (Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair : writeRequestMap.values()) {
    long index = pair.getKey().getEntry().getIndex();             // @1
    //Fall behind
    if (index &lt;= endIndex) {                                                   // @2
        try {
            DLedgerEntry local = dLedgerStore.get(index);
            PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
            logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);
        } catch (Throwable t) {
            logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }
        writeRequestMap.remove(index);
        continue;
    }
    //Just OK
    if (index ==  endIndex + 1) {    // @3
        //The next entry is coming, just return
        return;
    }
    //Fast forward
    TimeoutFuture<pushentryresponse> future  = (TimeoutFuture<pushentryresponse>) pair.getValue();    // @4
    if (!future.isTimeOut()) {
        continue;
    }
    if (index &lt; minFastForwardIndex) {                                                                                                                // @5
        minFastForwardIndex = index;
    }
}

Step2:遍历当前待写入的日志追加请求(主服务器推送过来的日志复制请求),找到需要快速快进的的索引。其关键实现点如下:

EntryHandler#checkAbnormalFuture

if (minFastForwardIndex == Long.MAX_VALUE) {
    return;
}
Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair = writeRequestMap.get(minFastForwardIndex);
if (pair == null) {
    return;
}

Step3:如果未找到需要快速失败的日志序号或 writeRequestMap 中未找到其请求,则直接结束检测。

EntryHandler#checkAbnormalFuture

logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));

Step4:则向主节点报告从节点已经与主节点发生了数据不一致,从节点并没有写入序号 minFastForwardIndex 的日志。如果主节点收到此种响应,将会停止日志转发,转而向各个从节点发送 COMPARE 请求,从而使数据恢复一致。

行为至此,已经详细介绍了主服务器向从服务器发送请求,从服务做出响应,那接下来就来看一下,服务端收到响应结果后的处理,我们要知道主节点会向它所有的从节点传播日志,主节点需要在指定时间内收到超过集群一半节点的确认,才能认为日志写入成功,那我们接下来看一下其实现过程。

4、QuorumAckChecker

日志复制投票器,一个日志写请求只有得到集群内的的大多数节点的响应,日志才会被提交。

4.1 类图

RocketMQ DLedger种日志复制的实现方法

其核心属性如下:

4.2 doWork 详解

QuorumAckChecker#doWork

if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) &gt; 3000) {    
    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
            memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
    lastPrintWatermarkTimeMs = System.currentTimeMillis();
}

Step1:如果离上一次打印 watermak 的时间超过3s,则打印一下当前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 这些数据日志。

QuorumAckChecker#doWork

if (!memberState.isLeader()) {   // @2
    waitForRunning(1);
    return;
}

Step2:如果当前节点不是主节点,直接返回,不作为。

QuorumAckChecker#doWork

if (pendingAppendResponsesByTerm.size() &gt; 1) {   // @1
    for (Long term : pendingAppendResponsesByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        for (Map.Entry<long, timeoutfuture<appendentryresponse>&gt; futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setIndex(futureEntry.getKey());
            response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
            response.setLeaderId(memberState.getLeaderId());
            logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
            futureEntry.getValue().complete(response);
        }
        pendingAppendResponsesByTerm.remove(term);
    }
}
if (peerWaterMarksByTerm.size() &gt; 1) {
    for (Long term : peerWaterMarksByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
        peerWaterMarksByTerm.remove(term);
    }
}

Step3:清理pendingAppendResponsesByTerm、peerWaterMarksByTerm 中本次投票轮次的数据,避免一些不必要的内存使用。

Map<string, long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
long quorumIndex = -1;
for (Long index : peerWaterMarks.values()) {  // @1
    int num = 0;
    for (Long another : peerWaterMarks.values()) {  // @2
        if (another &gt;= index) {
            num++;
        }
    }
    if (memberState.isQuorum(num) &amp;&amp; index &gt; quorumIndex) {  // @3
        quorumIndex = index;
    }
}
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);  // @4

Step4:根据各个从节点反馈的进度,进行仲裁,确定已提交序号。为了加深对这段代码的理解,再来啰嗦一下 peerWaterMarks 的作用,存储的是各个从节点当前已成功追加的日志序号。例如一个三节点的 DLedger 集群,peerWaterMarks 数据存储大概如下:

{
“dledger_group_01_0” : 100,
"dledger_group_01_1" : 101,
}

其中 dledger_group_01_0 为从节点1的ID,当前已复制的序号为 100,而 dledger_group_01_1 为节点2的ID,当前已复制的序号为 101。再加上主节点,如何确定可提交序号呢?

ConcurrentMap<long, timeoutfuture<appendentryresponse>&gt; responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex &gt;= 0) {
    for (Long i = quorumIndex; i &gt;= 0; i--) {  // @1
        try {
            CompletableFuture<appendentryresponse> future = responses.remove(i);   // @2
            if (future == null) {                                                                                              // @3
                needCheck = lastQuorumIndex != -1 &amp;&amp; lastQuorumIndex != quorumIndex &amp;&amp; i != lastQuorumIndex;
                break;
            } else if (!future.isDone()) {                                                                                // @4
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(currTerm);
                response.setIndex(i);
                response.setLeaderId(memberState.getSelfId());
                response.setPos(((AppendFuture) future).getPos());
                future.complete(response);
            }
            ackNum++;                                                                                                      // @5
        } catch (Throwable t) {
            logger.error("Error in ack to index={} term={}", i, currTerm, t);
        }
    }
}

Step5:处理 quorumIndex 之前的挂起请求,需要发送响应到客户端,其实现步骤:

if (ackNum == 0) {
    for (long i = quorumIndex + 1; i &lt; Integer.MAX_VALUE; i++) {
        TimeoutFuture<appendentryresponse> future = responses.get(i);
        if (future == null) {
            break;
        } else if (future.isTimeOut()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
            response.setTerm(currTerm);
            response.setIndex(i);
            response.setLeaderId(memberState.getSelfId());
            future.complete(response);
        } else {
            break;
        }
    }
    waitForRunning(1);
}

Step6:如果本次确认的个数为0,则尝试去判断超过该仲裁序号的请求,是否已经超时,如果已超时,则返回超时响应结果。

if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) &gt; 1000 || needCheck) {
    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
    for (Map.Entry<long, timeoutfuture<appendentryresponse>&gt; futureEntry : responses.entrySet()) {
        if (futureEntry.getKey() &lt; quorumIndex) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setTerm(currTerm);
            response.setIndex(futureEntry.getKey());
            response.setLeaderId(memberState.getSelfId());
            response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
            futureEntry.getValue().complete(response);
            responses.remove(futureEntry.getKey());
        }
    }
    lastCheckLeakTimeMs = System.currentTimeMillis();
}

Step7:检查是否发送泄漏。其判断泄漏的依据是如果挂起的请求的日志序号小于已提交的序号,则移除。

Step8:一次日志仲裁就结束了,最后更新 lastQuorumIndex 为本次仲裁的的新的提交值。

到此,关于“RocketMQ DLedger种日志复制的实现方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. RocketMQ
  2. RocketMQ 集群部署

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rocketmq

上一篇:易语言表白网页生成器的示例分析

下一篇:Reactive中有哪些 Streams规范

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》