raft理论与实践[6]-lab3a-基于raft构建分布式容错kv服务

发布时间:2020-04-02 16:49:41 作者:jonson_jackson
来源:网络 阅读:608

准备工作

前言

本服务实现的功能

实验思路

获取源代码

git clone git@github.com:dreamerjackson/golang-deep-distributed-lab.git
git reset --hard   d345b34bc

客户端

补充Clerk

type Clerk struct {
    ...
    leader int   // remember last leader
    seq    int   // RPC sequence number
    id     int64 // client id
}

补充Get方法

func (ck *Clerk) Get(key string) string {
    DPrintf("Clerk: Get: %q\n", key)
    cnt := len(ck.servers)
    for {
        args := &GetArgs{Key: key, ClientID: ck.id, SeqNo: ck.seq}
        reply := new(GetReply)

        ck.leader %= cnt
        done := make(chan bool, 1)
        go func() {
            ok := ck.servers[ck.leader].Call("KVServer.Get", args, reply)
            done <- ok
        }()
        select {
        case <-time.After(200 * time.Millisecond): // rpc timeout: 200ms
            ck.leader++
            continue
        case ok := <-done:
            if ok && !reply.WrongLeader {
                ck.seq++
                if reply.Err == OK {
                    return reply.Value
                }
                return ""
            }
            ck.leader++
        }
    }

    return ""
}

补充Append和Put方法

func (ck *Clerk) Put(key string, value string) {
    ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
    ck.PutAppend(key, value, "Append")
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
    // You will have to modify this function.
    DPrintf("Clerk: PutAppend: %q => (%q,%q) from: %d\n", op, key, value, ck.id)
    cnt := len(ck.servers)
    for {
        args := &PutAppendArgs{Key: key, Value: value, Op: op, ClientID: ck.id, SeqNo: ck.seq}
        reply := new(PutAppendReply)

        ck.leader %= cnt
        done := make(chan bool, 1)
        go func() {
            ok := ck.servers[ck.leader].Call("KVServer.PutAppend", args, reply)
            done <- ok
        }()
        select {
        case <-time.After(200 * time.Millisecond): // rpc timeout: 200ms
            ck.leader++
            continue
        case ok := <-done:
            if ok && !reply.WrongLeader && reply.Err == OK {
                ck.seq++
                return
            }
            ck.leader++
        }
    }
}

Server

type KVServer struct {
    ...
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    // Your definitions here.
    persist       *raft.Persister
    db            map[string]string
    notifyChs     map[int]chan struct{} // per log
    // duplication detection table
    duplicate map[int64]*LatestReply
}

完成PutAppend、Get方法


func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
    // not leader
    if _, isLeader := kv.rf.GetState(); !isLeader {
        reply.WrongLeader = true
        reply.Err = ""
        return
    }

    DPrintf("[%d]: leader %d receive rpc: PutAppend(%q => (%q,%q), (%d-%d).\n", kv.me, kv.me,
        args.Op, args.Key, args.Value, args.ClientID, args.SeqNo)

    kv.mu.Lock()
    // duplicate put/append request
    if dup, ok := kv.duplicate[args.ClientID]; ok {
        // filter duplicate
        if args.SeqNo <= dup.Seq {
            kv.mu.Unlock()
            reply.WrongLeader = false
            reply.Err = OK
            return
        }
    }

    // new request
    cmd := Op{Key: args.Key, Value: args.Value, Op: args.Op, ClientID: args.ClientID, SeqNo: args.SeqNo}
    index, term, _ := kv.rf.Start(cmd)
    ch := make(chan struct{})
    kv.notifyChs[index] = ch
    kv.mu.Unlock()

    reply.WrongLeader = false
    reply.Err = OK

    // wait for Raft to complete agreement
    select {
    case <-ch:
        // lose leadership
        curTerm, isLeader := kv.rf.GetState()
        if !isLeader || term != curTerm {
            reply.WrongLeader = true
            reply.Err = ""
            return
        }
    case <-kv.shutdownCh:
        return
    }
}

完成对于log的应用操作

func (kv *KVServer) applyDaemon() {
    for {
        select {
        case msg, ok := <-kv.applyCh:
            if ok {
                // have client's request? must filter duplicate command
                if msg.Command != nil {
                    cmd := msg.Command.(Op)
                    kv.mu.Lock()
                    if dup, ok := kv.duplicate[cmd.ClientID]; !ok || dup.Seq < cmd.SeqNo {
                        switch cmd.Op {
                        case "Get":
                            kv.duplicate[cmd.ClientID] = &LatestReply{Seq: cmd.SeqNo,
                                Reply: GetReply{Value: kv.db[cmd.Key],}}
                        case "Put":
                            kv.db[cmd.Key] = cmd.Value
                            kv.duplicate[cmd.ClientID] = &LatestReply{Seq: cmd.SeqNo,}
                        case "Append":
                            kv.db[cmd.Key] += cmd.Value
                            kv.duplicate[cmd.ClientID] = &LatestReply{Seq: cmd.SeqNo,}
                        default:
                            DPrintf("[%d]: server %d receive invalid cmd: %v\n", kv.me, kv.me, cmd)
                            panic("invalid command operation")
                        }
                        if ok {
                            DPrintf("[%d]: server %d apply index: %d, cmd: %v (client: %d, dup seq: %d < %d)\n",
                                kv.me, kv.me, msg.CommandIndex, cmd, cmd.ClientID, dup.Seq, cmd.SeqNo)
                        }
                    }
                    // notify channel
                    if notifyCh, ok := kv.notifyChs[msg.CommandIndex]; ok && notifyCh != nil {
                        close(notifyCh)
                        delete(kv.notifyChs, msg.CommandIndex)
                    }
                    kv.mu.Unlock()
                }
            }
        }
    }
}

测试

> go test -v -run=3A
rm -rf res
mkdir res
set int j = 0
for ((i = 0; i < 2; i++))
do
    for ((c = $((i*10)); c < $(( (i+1)*10)); c++))
    do
         (go test -v -run TestPersistPartitionUnreliableLinearizable3A) &> ./res/$c &
    done

    sleep 40

    if grep -nr "FAIL.*raft.*" res; then
        echo "fail"
    fi

done

总结

参考资料


推荐阅读:
  1. MySQL Installer is running in Community mode 的解决办法
  2. vue如何进行动画的封装

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

go raft la

上一篇:Jmeter_从jdbc请求的响应中获取参数做关联

下一篇:使用故事板

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》