spark delta如何读数据

发布时间:2021-12-16 16:14:02 作者:小新
来源:亿速云 阅读:141

小编给大家分享一下spark delta如何读数据,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

分析

spark 的delta datasource的构建要从DataSource.lookupDataSourceV2开始,之后会流向到loadV1Source,这里会进行dataSource.createRelation进行构建datasource的Relation的构建,直接转到deltaDataSource 的createRelation:

override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val maybePath = parameters.getOrElse("path", {
      throw DeltaErrors.pathNotSpecifiedException
    })

    // Log any invalid options that are being passed in
    DeltaOptions.verifyOptions(CaseInsensitiveMap(parameters))

    val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)
    DeltaTableV2(
      sqlContext.sparkSession,
      new Path(maybePath),
      timeTravelOpt = timeTravelByParams).toBaseRelation
  }
  1. DeltaOptions.verifyOptions进行参数校验,有效的参数如下:

val validOptionKeys : Set[String] = Set(
    REPLACE_WHERE_OPTION,
    MERGE_SCHEMA_OPTION,
    EXCLUDE_REGEX_OPTION,
    OVERWRITE_SCHEMA_OPTION,
    USER_METADATA_OPTION,
    MAX_FILES_PER_TRIGGER_OPTION,
    IGNORE_FILE_DELETION_OPTION,
    IGNORE_CHANGES_OPTION,
    IGNORE_DELETES_OPTION,
    OPTIMIZE_WRITE_OPTION,
    DATA_CHANGE_OPTION,
    "queryName",
    "checkpointLocation",
    "path",
    "timestampAsOf",
    "versionAsOf"
  )
  1. DeltaDataSource.getTimeTravelVersion根据指定的timestampAsOf或者versionAsOf获取指定的版本

  2. 直接调用DeltaTableV2的toBaseRelation方法:

def toBaseRelation: BaseRelation = {
    if (deltaLog.snapshot.version == -1) {
      val id = catalogTable.map(ct => DeltaTableIdentifier(table = Some(ct.identifier)))
        .getOrElse(DeltaTableIdentifier(path = Some(path.toString)))
      throw DeltaErrors.notADeltaTableException(id)
    }
    val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
      path.toString, deltaLog.snapshot, partitionFilters)

    // TODO(burak): We should pass in the snapshot here
    deltaLog.createRelation(partitionPredicates, timeTravelSpec)
  }

通过之前文章的分析,我们直到deltalog记录了AddFile和Remove记录,那现在读数据怎么读取呢?全部在allFiles方法。
重点看一下:allFiles方法:

def allFiles: Dataset[AddFile] = {
 val implicits = spark.implicits
 import implicits._
 state.where("add IS NOT NULL").select($"add".as[AddFile])
 }

这里调用了state方法,而它又调用了stateReconstruction方法,

private lazy val cachedState =
 cacheDS(stateReconstruction, s"Delta Table State #$version - $redactedPath")

 /** The current set of actions in this [[Snapshot]]. */
 def state: Dataset[SingleAction] = cachedState.getDS

stateReconstruction方法在checkpoint的时用到了,在这里也用到了,主要是重新构造文件状态,合并AddFile和RemoveFile:

private def stateReconstruction: Dataset[SingleAction] = {
 ...
 loadActions.mapPartitions { actions =>
     val hdpConf = hadoopConf.value.value
     actions.flatMap {
       _.unwrap match {
         case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
         case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
         case other if other == null => None
         case other => Some(other.wrap)
       }
     }
    }
   ...
   .mapPartitions { iter =>
     val state = new InMemoryLogReplay(time)
     state.append(0, iter.map(_.unwrap))
     state.checkpoint.map(_.wrap)
   }
  }

而关键在于InMemoryLogReplay的append方法和checkpoint方法,这里做到了文件状态的合并:

  assert(currentVersion == -1 || version == currentVersion + 1,
   s"Attempted to replay version $version, but state is at $currentVersion")
 currentVersion = version
 actions.foreach {
   case a: SetTransaction =>
     transactions(a.appId) = a
   case a: Metadata =>
     currentMetaData = a
   case a: Protocol =>
     currentProtocolVersion = a
   case add: AddFile =>
     activeFiles(add.pathAsUri) = add.copy(dataChange = false)
     // Remove the tombstone to make sure we only output one `FileAction`.
     tombstones.remove(add.pathAsUri)
   case remove: RemoveFile =>
     activeFiles.remove(remove.pathAsUri)
     tombstones(remove.pathAsUri) = remove.copy(dataChange = false)
   case ci: CommitInfo => // do nothing
   case null => // Some crazy future feature. Ignore
  }
 }

重点就在case add: AddFile和 case remove: RemoveFile处理以及checkpoint方法,能够很好的合并文件状态。

再调用collect方法,返回DeltaScan,之后获取文件路径作为要处理的文件路径。

注意:spark读取delta格式整个流程和spark读取其他数据格式流程一致,主要区别在于读取数据之前,会把文件状态在内存中进行一次合并,这样只需要读取文件状态为Addfile的就行了

以上是“spark delta如何读数据”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

推荐阅读:
  1. Spark 系列(十)—— Spark SQL 外部数据源
  2. Spark Streaming反压机制探秘

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark delta

上一篇:spark 3.0.1集成delta 0.7.0之如何实现delta自定义sql

下一篇:怎么解析Python中的Dict

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》