程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理?

开发过程中遇到Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理的问题如何解决?下面主要结合日常开发的经验,给出你关于Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理的解决方法建议,希望对你解决Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理有所启发或帮助;

我有一个 Azure Data Lake Storage 容器,它充当 JsON 文件的登陆区域,供 Apache Spark 处理。

那里有数以万计的小(最多几 MB)文件。 Spark 代码会定期读取这些文件并执行一些转换。

我希望文件只被读取一次并且 Spark 脚本是幂等的。 如何确保不会一次又一次地读取文件?我如何以有效的方式做到这一点?

我是这样读取数据的:

spark.read.Json("/mnt/input_LOCATIOn/*.Json")

虑了以下方法:

  1. 使用已经处理过的文件名创建一个 Delta 表,并在输入 DataFrame 上运行 EXCEPT 转换
  2. 将处理过的文件移动到不同的位置(或重命名它们)。我宁愿不这样做。如果我需要重新处理数据,我需要再次运行重命名此操作需要很长时间。

希望有更好的方法。请提出建议。

解决方法

您可以使用已启用检查点和 trigger.once 的结构化流作业。

该作业的检查点文件将跟踪该作业已使用的 JSON 文件。此外,trigger.once 触发器将使此流式作业如同批处理作业一样。

来自 Databrick 的一篇很好的文章解释了“为什么 Streaming 和 RunOnce 比 Batch 更好”。

您的结构化流媒体作业可能如下所示:

val checkpointLOCATIOn = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_LOCATIOn/"
val streamDF = spark.readStream.format("json").scheR_367_11845@a(jsonscheR_367_11845@a).load(pathToJsonFiles)

val query = streamDF
  .[...] // apply your processing
  .writeStream
  .format("console") // change sink format accordingly
  .option("checkpointLOCATIOn",checkpointLOCATIOn)
  .trigger(trigger.oncE)
  .start()

query.awaitTermination()

大佬总结

以上是大佬教程为你收集整理的Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理全部内容,希望文章能够帮你解决Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。