博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[hadoop源码阅读][9]-mapreduce-job提交过程
阅读量:7297 次
发布时间:2019-06-30

本文共 4052 字,大约阅读时间需要 13 分钟。

1.从wordcount作为入口

 
public
class WordCount {
public
static
void main(String[] args) throws Exception { Configuration conf =
new Configuration(); String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs(); Job job =
new Job(conf, "
word count");//job类的主要工作就是设置各种参数 job.setJarByClass(WordCount.
class);//将包含WordCount.
class的jar找到,后面要上传的 job.setMapperClass(TokenizerMapper.
class); job.setCombinerClass(IntSumReducer.
class); job.setReducerClass(IntSumReducer.
class); job.setOutputKeyClass(Text.
class); job.setOutputValueClass(IntWritable.
class); FileInputFormat.addInputPath(job,
new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job,
new Path(otherArgs[1])); System.exit(job.waitForCompletion(
true) ? 0 : 1);//如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束 } }
 
 

 

2.job.waitForCompletion(true)函数内主要干活的是submit

 
public
void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE);
 
//Job.setMapperClass(xxx.class):实际上设置的是mapreduce.map.class,即New。JobConf.setMapperClass(xxx.class):实际上设置的是mapred.mapper.class,即Old。可见不调用JobConf.setMapperClass,就应该是使用的NewAPI。
 
setUseNewAPI(); info = jobClient.submitJobInternal(conf);//实际的job提交过程.info用来和jobtracker进行交互,对提交的job进行监控以及杀死等操作
state = JobState.RUNNING; }

 

3.在看jobClient.submitJobInternal(conf)函数之前,先看jobclient这个对象的构造过程:

3.1先将mapred-site.xml和core-site.xml包含到conf中.static代码

3.2 init函数

 
public
void init(JobConf conf) throws IOException { String tracker = conf.
get("
mapred.job.tracker", "
local");//如果没有设置,或者没有找到上面2个xml配置文件
if ("
local".equals(tracker)) { conf.setNumMapTasks(1);//local模式 reduce只能是1
this.jobSubmitClient =
new LocalJobRunner(conf); }
else {//创建rpc
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); } }
//jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner)

3.3 submitJobInternal函数

 
public RunningJob submitJobInternal(JobConf job) { JobID jobId = jobSubmitClient.getNewJobId(); Path submitJobDir =
new Path(getSystemDir(), jobId.toString());
//conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system") Path submitJarFile =
new Path(submitJobDir, "
job.jar"); Path submitSplitFile =
new Path(submitJobDir, "
job.split");
/*1.建立submitJobDir目录,2.将参数中指定的jars,files,archives放到分布式缓存中, * 3.将main函数所在的jar包上传为submitJarFile * 4.设置user,group,这个对hdfs的文件操作是有权限影响的,设置当前工作目录 * */ configureCommandLineOptions(job, submitJobDir, submitJarFile); Path submitJobFile =
new Path(submitJobDir, "
job.xml");
int reduces = job.getNumReduceTasks(); JobContext context =
new JobContext(job, jobId);
// 检测输出目录是否存在,如果存在是会报错的 org.apache.hadoop.mapreduce.OutputFormat <? , ? > output = ReflectionUtils.newInstance(context.getOutputFormatClass(), job); output.checkOutputSpecs(context);
// Create the splits for the job LOG.debug("
Creating splits at " + fs.makeQualified(submitSplitFile));
int maps = writeNewSplits(context, submitSplitFile);
确定split信息 job.
set("
mapred.job.split.file", submitSplitFile.toString()); job.setNumMapTasks(maps);
//确定map个数
// 将所有job的参数写到hdfs上的job.xml FSDataOutputStream
out = FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION)); job.writeXml(
out); JobStatus status = jobSubmitClient.submitJob(jobId);
//实际的提交job 后面就交给jobtracker来处理了
if (status !=
null) {
return
new NetworkedJob(status); }
else {
throw
new IOException("
Could not launch job"); } }

 

 

 

剩下的注重来看下如何确定split,主要涉及到的函数有如下过程,以fileinputformat为例:

1.1 int writeNewSplits(JobContext job, Path submitSplitFile)

             2.1---->>List<InputSplit> getSplits(JobContext job)

                     3.1---->>List<FileStatus> listStatus(JobContext job)//过滤掉输入路径下,以_和.开头的路径,以及根据用户设置的mapred.input.pathFilter.class对文件进行过滤,得到文件列表

                     3.2 如果文件是压缩的,也即是不可splitable的,那么整个文件作为一个split

                     3.3 如果文件是splitable的,那么首先计算每个split的大小,mapred.min.split.size的,默认大小是1,

                            mapred.max.split.size的默认值是Long.MAX_VALUE,blockSize的默认大小是64M,

                            那么split的大小为Math.max(minSize, Math.min(maxSize, blockSize));从公式可以看出,

                            如果maxSize设置大于blockSize,那么每个block就是一个分片,否则就会将一个block文件分隔为多个分片,

                            如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片。

                     3.4 将每个split的路径,大小,下标,以及位置信息保存到split数组

            2.2 安装每个split的大小进行排序,将大的放在前面,然后序列化到文件中.

 

 

参考文献

转载地址:http://enfnm.baihongyu.com/

你可能感兴趣的文章
myeclipse安装pydev
查看>>
【桌面虚拟化】之五PCoIP
查看>>
linux 监控CPU memory disk process 脚本
查看>>
Nginx启动脚本/etc/init.d/nginx
查看>>
Visual Studio 2017 离线安装方式
查看>>
枚举出局域网上所有网络资源
查看>>
Android深入浅出之Binder机制
查看>>
动态磁盘的管理
查看>>
zookeeper 集群安装(单点与分布式成功安装)
查看>>
python中list,dirt方法说明
查看>>
lamp环境一键部署(yum)
查看>>
一个IT大学生来深圳2年半的经历感受
查看>>
VMware View 5.0 桌面虚拟化方案介绍视频
查看>>
理解Spring中的事务抽象
查看>>
java 设计模式 建造者模式
查看>>
mysql备份和恢复工作记录
查看>>
我的友情链接
查看>>
vFrank考VCDX的过程
查看>>
jQuery input同步发sims
查看>>
memcached起步
查看>>