Mongodb代理程序如何实现

发布时间:2022-03-04 14:08:37 作者:iii
来源:亿速云 阅读:174

这篇文章主要介绍“Mongodb代理程序如何实现”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Mongodb代理程序如何实现”文章能帮助大家解决问题。

根据一贯的风格,我们先来梳理下项目目录结构,结构如下:

|__ bin/                   # 用于存放编译后生成的二进制文件

|__ config/                # 用于存放配置文件

|__ connection/            # 存放连接相关的文件

|   |__ proxy.go           # 代理组件

|   |__ pool.go            # 连接池组件

|   |__ repl_set.go        # 复制集组件

|   |__ conn.go            # 连接对象组件

|__ internal/              # 存放 mongo 内部协议相关文件

|   |__ auth.go            # 握手鉴权组件

|   |__ protocol.go        # 协议解析组件

|   |__ request.go         # 请求重写组件

|   |__ response.go        # 响应重写组件

|__ statistics/            # 存放指标统计上报组件

|__ test/                  # 存放各种语言驱动测试代码的文件夹

|__ utils/                 # 工具函数文件夹

|__ glide.yaml             # 依赖包配置文件

|__ main.go                # 入口文件

proxy 实现

最简单的 proxy 实现套路就像下面这样:

// main.go

func main() {

  // 传入配置参数,实例化一个代理对象

  p := NewProxy(conf)

  // 卡住,循环接受客户端请求

  p.LoopAccept()

}

接着来实现 NewProxy、LoopAccept 方法:

// connection/proxy.go

type Proxy struct {

  sync.RWMutex

  listener            net.Listener

  writePool, readPool *pool

}

func NewProxy(conf config.UserConf) *Proxy {

  // 开始监听本地端口

  listener, err := net.Listen("tcp", ":"+conf.GetString("port"))

  if err != nil {

    log.Fatalln(err)

  }

  p := &Proxy{

    listener: listener,

  }

  // 实例化连接池

  p.readPool, p.writePool, err = newPool(p)

  if err != nil {

    panic(err)

  }

  return p 

}

func (p *Proxy) LoopAccept() {

  for {

    client, err := p.listener.Accept()

    go func(c net.Conn) {

      defer c.Close()

      // 一个连接在多次 messageHandler 中共用一个 Reader 对象

      cr := bufio.NewReader(c)

      // 因为一个连接可能会进行多次读或写操作

      for {

        // 将客户端请求代理给服务端,服务端响应代理回客户端

        // 同时中间对请求或响应进行重写操作

        err := p.messageHandler(cr, c)

        if err != nil {

          // 只要出现错误,就执行到上面的 defer c.Close() 来关闭连接

          return

        }

      }

    }(client)

  }

}

接着来实现核心逻辑 messageHandler:

// connection/proxy.go

func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {

  // 对请求报文进行解析操作

  req, err := internal.Decode(clientReader)

  if err != nil {

        return errors.New("decode error")

    }

  // 将客户端请求发送给数据库服务器

  res, err := p.clientToServer(req)

  if err != nil {

    return errors.New("request error")

  }

  // 将数据库服务器响应返回给客户端

  return res.WriteTo(c)

}

func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {

  var server net.Conn

  // 如果是读操作,就从读池中取出连接

  if req.IsReadOp() {

    host := req.GetHost()

    // 某些读操作需要发送到指定的读库上,所以需要传 host,来获取指定读库连接

    server = p.readPool.Acquire(host)

  // 反之,写操作从写池中取出连接

  } else {

    // 由于写库只有一个,所以不用传 host 参数了

    server = p.writePool.Acquire()

  }

  // 将客户端请求发送给数据库服务器

  err := req.WriteTo(server)

  if err != nil {

    return nil, err

  }

  // 获取解析数据库服务器响应

  res, err := internal.Decode(bufio.NewReader(server))

  return res, err

}

大致逻辑就是,客户端通过代理把请求发给服务端,服务端响应也通过代理响应回客户端。

------------  request  -----------  request  ------------

|          | --------> |         | --------> |          |

|  client  |           |  proxy  |           | repl_set |

|          | <-------- |         | <-------- |          |

------------  response -----------  response ------------

呐&mdash;&mdash;,当然还有非常多的细节,由于篇幅原因不得不省略...

pool 实现

由 proxy 的代码逻辑来看,我们取读或写库连接是通过读或写池的 Acquire 方法来取的:

// connection/pool.go

type pool struct {

  sync.RWMutex

  connCh   chan net.Conn

  newConn  func(string) (net.Conn, error)

  freeConn func(net.Conn) error

}

func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {

  host := ""

  if len(opts) > 0 {

    host, _ = (opts[0]).(string)

  }

  chLen := len(p.connCh)

  // 从 channel 中遍历剩余数量的 conn

  for i := 0; i < chLen; i++ {

    select {

    case conn, ok := <- ch:

      if ok {

        if len(host) > 0 {

          if conn.RemoteAddr().String() == host {

            return conn, nil

          }

          // 没有找到对应 host 的 conn,则把 conn 重新放回 channel

          // 你可以简单理解为只是执行了 p.connCh <- conn 操作

          p.freeConn(conn)

        } else {

          return conn, nil

        }

      }

    // 避免数量不足而导致 channel 阻塞等待

    default:

    }

  }

  // 若还没有从 channel 中取到 conn,则立马 new 一个

  conn, err := p.newConn(host)

  if err != nil {

    return nil, err

  }

  return conn, nil

}

池的实现大致就是实现了一个循环队列,连接从池中取,取出的连接在使用完后,可以放回池中。

关于“Mongodb代理程序如何实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。

推荐阅读:
  1. 如何实现squid传统代理
  2. MongoDB主从+php实现

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

mongodb

上一篇:JavaScript原型链是什么

下一篇:Django的signal信号机制如何使用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》