大佬教程收集整理的这篇文章主要介绍了MapReduce执行流程,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
public List<InputSplit> getSplits(JobContext job) throws IOException {
。。。。。。
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLOCATIOn[] blkLOCATIOns;
if (file instanceof LocatedFileStatus) {
blkLOCATIOns = ((LocatedFileStatus) filE).getBlockLOCATIOns();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLOCATIOns = fs.getFileBlockLOCATIOns(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSizE);
long bytesRemaining = length;
while (((doublE) bytesRemaining)/splitSize > SPLIT_SLOp) {
int blkIndex = getBlockIndex(blkLOCATIOns, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLOCATIOns[blkIndex].getHosts(),
blkLOCATIOns[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLOCATIOns, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLOCATIOns[blkIndex].getHosts(),
blkLOCATIOns[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLOCATIOns[0].getHosts(),
blkLOCATIOns[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
。。。。。。
return splits;
}
其流程图如下所示:
@H_104_15@math.max(minSize, Math.min(maxSize, blockSizE)) set mapred.max.split.size=256000000;2.x版本默认约是128M,我们集群配置的是256M set mapred.min.split.size=10000000;2.x版本默认是约10M,我们集群配置的是1 blockSize 在hdfs-site.xml参数dfs.block.size中配置,我们集群设置的是默认的是134217728=128M set mapred.map.tasks 对map task数量仅仅是参考的作用,我们集群默认的是2 对应的是set mapred.reduce.tasks,我们集群默认的是-1 reducer数量可能起作用的 hive.exec.reducers.bytes.per.reducer=256000000 hive.exec.reducers.max=1009 min( hive.exec.reducers.max ,总输入数据量/hive.exec.reducers.bytes.per.reducer)
其中,minSize是配置文件中设置的分片最小值,minSize则为最大值,blockSize为HDFS中数据块的大小。 完成逻辑分片后,FileInputFormat的各个子类向MapTask映射k-v键值对(如TexTinputFormat)。FileInputFormat的子类是对数据分片中的数据进行处理。
@H_104_15@mapred-site.xml 文件中 mapreduce.task.io .sort.mb=300M mapreduce.map.sort.spill.percent 配置的默认只0.8
Reduce阶段更为详细的流程如下图所示:
public RawKeyValueIterator run() throws IOException, InterruptedException {
。。。。。。
// Start the map-output fetcher threads
Boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.geTint(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reducEID, scheduler,
merger, reporter, metrics, this, reduCETask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reducEID, scheduler, merger,
reporter, metrics, this,reduCETask.getShuffleSecret());
fetchers[i].start();
}
}
。。。。。。
eventFetcher.shutDown();
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
scheduler.close();
copyPhase.complete();
// copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduCETask.statusupdate(umbilical);
RawKeyValueIterator kvIter = null;
。。。。。。
return kvIter;
}
在run()方法中它是通过启动fetcher线程来拉取数据的。首先需要判断将要拉取的数据是否具有本地性,如果数据在本地则直接传入文件的地址否则创建fetcher线程来从其他节点远程拉取数据。Fetcher类类图如下:
protected void copyFromHost(MapHost host) throws IOException {
。。。。。。
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
。。。。。。
while (!remaining.isEmpty() && failedTasks == null) {
try {
failedTasks = copymapOutput(host, input, remaining, fetchRetryEnabled);
}
catch (IOException E) {
。。。。。
}
}
}
读取数据是在copymapOutput()方法中完成的,方法中用到了ShufferHeader类它实现了Writable接口从而可以完成序列化与反序列化的工作,它调用readFields()方法从数据流中读取数据。
@H_104_15@mapreduce.task.io .sort.factor =25
读取数据过程中需要注意的是,如果中间结果小则复制到内存缓冲区中否则复制到本地磁盘中。当内存缓冲区达到大小阈值或者文件数阈值则溢写到本地磁盘,与此同时后台线程会不停的合并溢写文件形成大的有序的文件。 在Shuffle-copy阶段进行的同时Shuffle-Sort也在处理数据,这个阶段就是针对内存中的数据和磁盘上的数据进行归并排序。 复制完所有的map输出做循环归并排序合并数据。举个例子更加好理解,若合并因子为10,50个输出文件,则合并5次,最后剩下5个文件不符合合并条件,则将这5个文件交给Reduce处理。 Reduce阶段会接收到已经排完序的k-v对,然后对k-v对进行逻辑处理最后输出结果k-v对到HDFS中.
以上是大佬教程为你收集整理的MapReduce执行流程全部内容,希望文章能够帮你解决MapReduce执行流程所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。