如何在砖和删除文件列表更快

学习如何和删除文件列表在砖更快。

写的亚当Pavlacka

去年发表在:2022年5月31日

场景

假设您需要删除一个分区表一年,,日期,地区,服务。然而,桌子上是巨大的,将会有1000左右部分每个分区的文件。你能列出在每个分区的所有文件,然后删除它们使用Apache火花工作。

例如,假设您有一个表分区的,b和c:

% scala Seq ((1、2、3、4、5), (2、3、4、5、6), (3、4、5、6、7), (4、5、6、7、8)) .toDF (“a”、“b”、“c”,“d”,“e”) .write.mode .partitionBy(“覆盖”)(“a”、“b”、“c”) .parquet (“/ mnt /道路/表”)

列表文件

您可以使用这个函数所有的部分文件列表:

% scala org.apache.hadoop.conf进口。配置进口org.apache.hadoop.fs。{路径,文件系统}进口org.apache.spark.deploy.SparkHadoopUtil org.apache.spark.sql.execution.datasources进口。InMemoryFileIndeximport java.net.URI def listFiles(basep: String, globp: String): Seq[String] = { val conf = new Configuration(sc.hadoopConfiguration) val fs = FileSystem.get(new URI(basep), conf) def validated(path: String): Path = { if(path startsWith "/") new Path(path) else new Path("/" + path) } val fileCatalog = InMemoryFileIndex.bulkListLeafFiles( paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))), hadoopConf = conf, filter = null, sparkSession = spark, areRootPaths=true) // If you are using Databricks Runtime 6.x and below, // remove  from the bulkListLeafFiles function parameter. fileCatalog.flatMap(_._2.map(_.path)) } val root = "/mnt/path/table" val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*" val files = listFiles(root, globp) files.toDF("path").show()
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | |路径+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | dbfs: / mnt /道路/表/ = 1 / b = 2 / c = 3 / - 00000部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——5.。拼花| | dbfs: / mnt /道路/表/ = 2 / b = 3 / c = 4 /部分- 00001 tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——6.。拼花| | dbfs: / mnt /道路/表/ = 3 / b = 4 / c - 00002 = 5 /部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——7.。拼花| | dbfs: / mnt /道路/表/ = 4 / b = 5 / - 00003 c = 6 /部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——8.。拼花| + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +

listFiles函数接受一个基地路径和一个一团路径作为参数,扫描的文件和匹配一团模式,然后返回所有匹配叶文件作为一个字符串序列。

还使用效用函数的函数globPathSparkHadoopUtil包中。这个函数列出所有在一个目录路径指定的前缀,并没有进一步列表叶孩子(文件)。路径传入的列表InMemoryFileIndex.bulkListLeafFiles方法,这是一个引发内部API分布式文件清单。

这两个清单效用函数单独工作。通过结合他们你可以得到你想要的顶级目录列表列出使用globPath函数,它将运行在司机,您可以分发清单为所有孩子离开的顶级目录引发工人使用bulkListLeafFiles

周围的加速可以根据Amdahl法则20-50x更快。原因是,您可以很容易地控制一团文件路径根据实际物理布局和控制的并行性spark.sql.sources.parallelPartitionDiscovery.parallelismInMemoryFileIndex

删除文件

当你从一个非托管表删除文件或分区,您可以使用砖效用函数dbutils.fs.rm。这个函数利用原生云存储文件系统API,这是所有文件操作的优化。然而,您不能删除一个巨大的表直接使用dbutils.fs.rm(“路径/ / /表”)

你可以使用上面的脚本列表文件有效。对于小表,收集路径的文件删除适合司机内存,那么您可以使用一个火花工作分发文件删除任务。

对于巨大的表,即使对于一个顶级分区,文件路径的字符串表示不能适应司机记忆。解决这个问题最简单的方法是递归地收集内部分区的路径,并行路径列表,删除它们。

% scala scala.util进口。{尝试,成功,失败}def删除(p:字符串):单位= {dbutils.fs.ls (p) . map (_.path) .toDF。= > dbutils.fs.rm foreach{文件(文件(0)。toString, true) println (s“删除文件:$文件”)}}最后def walkDelete(根:字符串)(水平:Int):单位= {dbutils.fs.ls(根). map (_.path)。为each { p => println(s"Deleting: $p, on level: ${level}") val deleting = Try { if(level == 0) delete(p) else if(p endsWith "/") walkDelete(p)(level-1) // // Set only n levels of recursion, so it won't be a problem // else delete(p) } deleting match { case Success(v) => { println(s"Successfully deleted $p") dbutils.fs.rm(p, true) } case Failure(e) => println(e.getMessage) } } }

同时确保代码删除内部分区被删除的分区是足够小。它通过每个级别的分区递归搜索,只有开始删除当它击中级别设置。例如,如果你想开始删除顶级分区,使用walkDelete(根)(0)。火花将会删除所有的文件dbfs: / mnt /道路/表/ = 1 /,然后删除= 2 /…/模式后,直到精疲力竭。

火花的工作分配删除任务使用删除功能上面,清单文件dbutils.fs.ls与假设子分区的数量在这个级别很小。你也可以通过替换更有效率dbutils.fs.ls函数与listFiles功能上面,只有轻微的修改。

总结

这两种方法突出的方法清单和删除的表。他们用一些火花效用函数和函数具体砖环境。即使你不能直接使用它们,您可以创建自己的效用函数在一个类似的方式解决这个问题。

这篇文章有用吗?