如何使用spark-core实现广度优先搜索

发布时间:2021-12-17 10:13:11 作者:柒染
来源:亿速云 阅读:167

如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

需求描述

数据源是一批网络日志数据,每条数据都有两个字段srcip和dstip,字段之间以逗号分隔,问题的需求是给定一个srcip和dstip,在给定的搜索深度下检索这两个ip之间所有的通联路径。这个问题是网络日志处理中的一个实际需求,之前在单机的程序中实现过,但是需要将所有的ip对加载到内存中。考虑到如果数据量太大的情况,可能单节点的内存无法支撑这样的操作,但是如果不将ip对全加载内存中,使用深度优先遍历的方法,搜索过程又会很慢。最近在学习spark框架,刚接触RDD,就是这用RDD来解决这个问题。以下是scala代码

package com.pxu.spark.core

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 *  pxu
 *  2021-01-29 16:57
 */
object FindIpRel {


  def main(args: Array[String]): Unit = {

    val srcIp = args(0) // 源ip
    val dstIp = args(1) // 目标ip
    val depth = args(2).toInt //搜索深度
    val resPath = args(3) //搜索结果的输出位置

    val conf = new SparkConf().setAppName("findIpRel")
    val sc = new SparkContext(conf)


    /**
     * 从数据源中构建原始rdd,每一行的数据形式为a,b
     */

    val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv")

    /**
     * 对原始Rdd进行元组形式转化,现在每一行的数据形式为(a,b)
     * 除此之外还对数据进行了去重处理,并显示使用hash分区器对RDD中的数据进行分区
     * 为后面的join操作,做一些优化
     */
    val base = ori.map(a => {
      val tmpArr = a.split(",")
      (tmpArr(0), tmpArr(1))
    }).distinct().partitionBy(new HashPartitioner(10))


    /**
     * 这是一个用于保存结果的RDD,其中每一行的形式为(dstIp,List(ip on path))
     * 在查找过程中,发现了搜索结果后,就会将其并入到res中
     */
    var res = sc.makeRDD[(String,List[String])](List())

    /**
     * 这是一个用于迭代的RDD,其初始化的内容是,首先从baseRdd中过滤出元组第一个元素a是参数SrcIp的,
     * 然后将其转化成(b,List(a))的格式,其中b总是代表当前搜索路径上的尾ip,list中的其他内容代表搜索
     * 路径上其他的ip
     */
    var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1)))

    for(i <- 2 to depth){

      /**
       * 1.首先iteration和base按照key进行join,这个操作的意义就是更深一层的搜索,结果RDD的格式是(b,(List(ip on path),c))
       * 2.对数据进行一次过滤,过去掉那些路径已经形成环的元素,成环的判据就是List(ip on path)中的数据已经包含c了
       * 3.进行map操作,b并入到List(ip on path),将c作为新的key,因此此时更深一层的搜索,导致c成为了当前搜索路径中的尾节点,
       *   此时RDD中的每一个元素的格式应该是(c,(List(ip on path))
       */
      val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1))

      /**
       * 将tmp中已经成功搜索的路径筛选出来,成功搜索的判据是(c,(List(ip on path)),c与dstIp相等
       */
      val success = tmp.filter(a => a._1.equals(dstIp))

      /**
       * 将成功搜索的数据合并到res中
       */
      res = res.union(success)
      
      /**
       * 更新iteration
       */
      iteration = tmp.subtract(success)

    }
    
    /**
     * 将成功搜索的路径并入到res中
     */
    res.union(iteration.filter(a => a._1.equals(dstIp)))

    /**
     * 执行一次转换操作,将res中的元素从(c,(List(ip on path))格式转换成List(all ip on path)
     */
    val finalResult = res.map(a => a._2 :+ a._1)

    finalResult.saveAsTextFile(resPath)

  }

}

关于如何使用spark-core实现广度优先搜索问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

推荐阅读:
  1. 广度优先搜索(bfs)
  2. python实现广度优先搜索过程解析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

上一篇:如何实现Ceph librados编程访问

下一篇:python匿名函数怎么创建

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》