ClickHouse是如何批量写入的

发布时间:2021-12-22 17:16:03 作者:柒染
来源:亿速云 阅读:510

这篇文章将为大家详细讲解有关ClickHouse是如何批量写入的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

简介

批量写入又称为bulk write,对于单表插入多条数据的场景,可以减少插入请求数量,提高吞吐量和效率。clickhouse官方Golang驱动clickhouse-go[1]支持该关键特性,但是文档的介绍不是很详细,只有一句:

Bulk write support : begin->prepare->(in loop exec)->commit
 

并没有详细介绍用法和原理,笔者在开发业务时使用的库是sqlx[2],sql也支持clickhouse-go驱动。参考了官方样例代码[3]

...
tx, err := connect.Begin()
checkErr(err)
stmt, err := tx.Prepare("INSERT INTO example (country_code, os_id, browser_id, categories, action_day, action_time) VALUES (?, ?, ?, ?, ?, ?)")
checkErr(err)

for i := 0; i < 100; i++ {
 if _, err := stmt.Exec(
  "RU",
  10+i,
  100+i,
  []int16{1, 2, 3},
  time.Now(),
  time.Now(),
 ); err != nil {
  log.Fatal(err)
 }
}
...
 

我写的bulk write类似上面的代码,但是提交给同事review时,他提出了疑问:stmt.Exec是每次执行都发送写请求到数据库吗?这个问题其实我不敢肯定,官方文档也说得不明确。考虑到严谨性,让自己的PR更有说服力,自己去翻看了相关源代码。

这里需要指出,如果利用编辑器里的代码跳转功能会跳到database/sql库中的Exec函数实现,实际上我们要看的代码是clickhouse-go中的实现,至于编辑器跳转到database/sql中的原因,书写此文时笔者也没弄清楚,先挖个坑吧

 

核心实现

stmt.Exec的核心代码如下[4]

func (stmt *stmt) execContext(ctx context.Context, args []driver.Value) (driver.Result, error) {
 if stmt.isInsert {
  stmt.counter++
  if err := stmt.ch.block.AppendRow(args); err != nil {
   return nil, err
  }
  if (stmt.counter % stmt.ch.blockSize) == 0 {
   stmt.ch.logf("[exec] flush block")
   if err := stmt.ch.writeBlock(stmt.ch.block); err != nil {
    return nil, err
   }
   if err := stmt.ch.encoder.Flush(); err != nil {
    return nil, err
   }
  }
  return emptyResult, nil
 }
 if err := stmt.ch.sendQuery(stmt.bind(convertOldArgs(args))); err != nil {
  return nil, err
 }
 if err := stmt.ch.process(); err != nil {
  return nil, err
 }
 return emptyResult, nil
}
 

上面的代码不多,非常清晰,当执行Exec时,stmt.ch.block.AppendRow(args)会先把sql参数附加到本地缓存block中,然后(stmt.counter % stmt.ch.blockSize)判断本地缓存大小是否到达阈值,到达则执行Flush(),将数据写入远端。综上,clickhouse-go中的核心实现逻辑是:

  1. 底层维护一个缓存block,同时设置block_size控制缓存大小
  2. 执行stmt.Exec时,不会直接写入远程ClickHouse中,而是将插入参数Append到block中
  3. 每次Append后,判断block的size和block_size的关系,如果正好整除,则刷新block(即写入clickhouse)

因此block_size这个参数很重要,它表示本地缓存的上限,如果很大的话,程序会占用一些内存。笔者起初设置为100000,在调试日志中看不到stmt.ch.logf("[exec] flush block")打印的log,设置小后就看到下面的输出:

...
[clickhouse][connect=1][begin] tx=false, data=false
[clickhouse][connect=1][prepare]
[clickhouse][connect=1][read meta] <- data: packet=1, columns=6, rows=0
[clickhouse][connect=1][exec] flush block
[clickhouse][connect=1][exec] flush block
....
   

关于ClickHouse是如何批量写入的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. python数据批量写入ScrolledText的优化方法
  2. Redis批量写入的案例

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

clickhouse

上一篇:如何实现IPFS分布式上传图片

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》