如何将数据按指定格式存入zookeeper

发布时间:2021-12-22 14:10:47 作者:iii
来源:亿速云 阅读:115

这篇文章主要讲解了“如何将数据按指定格式存入zookeeper”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何将数据按指定格式存入zookeeper”吧!

环境:

  scala版本:2.11.8

  zookeeper版本:3.4.5-cdh6.7.0

package com.ruozedata.zk
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
  * Created by ganwei on 2018/08/21
  * 要求:
  * 1 通过storeOffsets方法把数据存入zookeeper中。
  *  存储格式:
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/0
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/1
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/2
  * 2 通过obtainOffsets方法把存入的数据读取出来
  * 输出格式:
  *           topic:ruoze_offset_topic	partition:0	offset:7
  *           topic:ruoze_offset_topic	partition:1	offset:3
  *           topic:ruoze_offset_topic	partition:2	offset:5
  */
object ZkConnectApp{
  val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass)
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("172.16.100.31:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("consumers")
      .build()
    client.start()
    client
  }
  def lock(path: String)(body: => Unit) {
    val lock = new InterProcessMutex(client, path)
    lock.acquire()
    try {
      body
    } finally {
      lock.release()
    }
  }
  def tryDo(path: String)(body: => Unit): Boolean = {
    val lock = new InterProcessMutex(client, path)
    if (!lock.acquire(10, TimeUnit.SECONDS)) {
      LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出")
      return false
    }
    try {
      LOG.info("获准运行")
      body
      true
    } finally {
      lock.release()
      LOG.info(s"释放锁 {$path}")
    }
  }
  //zookeeper创建路径
  def ensurePathExists(path: String): Unit = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }
  /**
    * OffsetRange类定义(偏移量对象)
    * 用于存储偏移量
    */
  case class OffsetRange(
                          val topic:String,     // 主题
                          val partition:Int,    // 分区
                          val fromOffset:Long,  // 起始偏移量
                          val utilOffset:Long   // 终止偏移量
                        )
  /**
    * zookeeper存储offset的方法
    * 写入格式:
    * /consumers/G322/offsets/ruoze_offset_topic/partition/0
    * /consumers/G322/offsets/ruoze_offset_topic/partition/1
    * /consumers/G322/offsets/ruoze_offset_topic/partition/2
    * @param OffsetsRanges
    * @param groupName
    */
  def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={
    val offsetRootPath = s"/"+groupName
    if (client.checkExists().forPath(offsetRootPath) == null) {
      client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
    }
    for(els <- OffsetsRanges ){
      val data = String.valueOf(els.utilOffset).getBytes
      val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}"
      // 创建路径
      ensurePathExists(path)
      // 写入数据
      client.setData().forPath(path, data)
    }
  }
  /**
    * TopicAndPartition类定义(偏移量key对象)
    *  用于提取偏移量
    */
  case class TopicAndPartition(
                                topic:String,  // 主题
                                partition:Int  // 分区
                              )
  /**
    * zookeeper提取offset的方法
    * @param topic
    * @param groupName
    * @return
    */
  def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={
    // 定义一个空的HashMap
    val maps = mutable.HashMap[TopicAndPartition,Long]()
    // offset的路径
    val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition"
    // 判断路径是否存在
    val stat = client.checkExists().forPath(s"$offsetRootPath")
    if (stat == null ){
      println(stat)  // 路径不存在 就将路径打印在控制台,检查路径
    }else{
      // 获取 offsetRootPath路径下一级的所有子目录
      // 我们这里是获取的所有分区
      val children = client.getChildren.forPath(s"$offsetRootPath")
     // 遍历所有的分区
      for ( lines <- children ){
        // 获取分区的数据
        val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong
        // 将 topic  partition  和数据赋值给 maps
        maps(TopicAndPartition(topic,lines.toInt)) = data
      }
    }
    // 按partition排序后 返回map对象
    maps.toList.sortBy(_._1.partition).toMap
  }
  def main(args: Array[String]) {
      //定义初始化数据
      val off1 = OffsetRange("ruoze_offset_topic",0,0,7)
      val off2 = OffsetRange("ruoze_offset_topic",1,0,3)
      val off3 = OffsetRange("ruoze_offset_topic",2,0,5)
      val arr = Array(off1,off2,off3)
      //获取到namespace
//      println(client.getNamespace)
      // 创建路径
//      val offsetRootPath = "/G322"
//      if (client.checkExists().forPath(offsetRootPath) == null) {
//        client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
//      }
      //存储值
      storeOffsets(arr,"G322")
      //获取值
      /**
        * 输出格式:
        * topic:ruoze_offset_topic	partition:0	offset:7
        * topic:ruoze_offset_topic	partition:1	offset:3
        * topic:ruoze_offset_topic	partition:2	offset:5
        */
      val result = obtainOffsets("ruoze_offset_topic","G322")
      for (map <- result){
        println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2)
      }
  }
}

感谢各位的阅读,以上就是“如何将数据按指定格式存入zookeeper”的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

推荐阅读:
  1. '\033'格式:指定输出格式
  2. 如何将labelme格式数据转化为标准的coco数据集格式方式

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zookeeper

上一篇:Spark RDD怎么创建

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》