spark写入hdfs文件小文件解决办法
我们在使用spark写入hdfs文件时,会经常由于partition的数目问题,导致最后保存在hdfs中时产生了很多小文件,之前也用过repartition的方法,但是会出现虽然会大量减少生成文件的数目,但是会使得最后保存文件这一步的效率很低,主要也是当repartition数目很小时,task任务数目也减少了,每个task执行起来的时间变长了。我常识在spark写入hdfs文件后,再合并这些小.
·
我们在使用spark写入hdfs文件时,会经常由于partition的数目问题,导致最后保存在hdfs中时产生了很多小文件,之前也用过repartition的方法,但是会出现虽然会大量减少生成文件的数目,但是会使得最后保存文件这一步的效率很低,主要也是当repartition数目很小时,task任务数目也减少了,每个task执行起来的时间变长了。
我常识在spark写入hdfs文件后,再合并这些小文件,采取读取hdfs文件后,合并成一个大文件的方法,效果还可以,在此与大家分享下:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
def mergePath (srcStr : String):Unit={
val hdfs = FileSystem.get(new URI("hdfs:///"),new Configuration())
val src = srcStr.split("/").mkString("/")
val srcPath = new Path(src)
val tmpFile = new Path(src + "/_temporary_")
// val hdfs = FileSystem.get(new URI(src),new Configuration())
val outFile = "merge-00000"
//合并小于128M的小文件,大于128M的可以保留
val fileList = hdfs.listStatus(srcPath)
.filter(x => (!x.isDirectory
&& x.getPath.getName != tmpFile.getName
&& x.getLen < (128 * 1024 * 1024L)))
println(srcStr +"包含小文件"+fileList.size+"个:")
if (fileList.size <= 1) return
hdfs.delete(tmpFile,true)
val outputStream = hdfs.create(tmpFile)
for (file <- fileList){
val in = hdfs.open(file.getPath)
IOUtils.copyBytes(in,outputStream,4096,false)
in.close()
println("*********************Write File: " + file.getPath + "********************")
}
outputStream.close()
fileList.foreach(file => hdfs.delete(file.getPath,false))
hdfs.rename(tmpFile,new Path(src +"/" + outFile))
hdfs.close()
println("*********************Output Path: " + srcPath + " SUCCESS ********************")
}
更多推荐
所有评论(0)