python版MapReduce基础实战
使用MapReduce计算班级每个学生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。输入文件在你每次点击评测的时候,平台会为你创建,无需你自己创建,只需要启动HDF
第1关:成绩统计
编程要求
使用MapReduce计算班级每个学生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。
测试说明
输入文件在你每次点击评测的时候,平台会为你创建,无需你自己创建,只需要启动HDFS,编写python代码即可。
输入文件的数据格式如下:
张三 12\n 李四 13
张三 89\n 李四 92
...
依照如上格式你应该输出:
张三 89
李四 92
#! /usr/bin/python3
import sys
def main():
for line in sys.stdin:
line = line.strip()
if line:
mapper(line)
# 使用name,age分别表示姓名和年龄
def mapper(line):
########## begin ############
ps=line.split("\n")
for p in ps:
itms=p.split()
print("{}\t{}".format(itms[0],itms[1]))
########### End #################
if __name__ == '__main__':
main()
#! /usr/bin/python3
import sys
# 找出values的最大值,并按name\tmax_age的形式输出。
def reducer(k, values):
##### Begin #########
maxv=int(values[0])
for v in values[1:]:
if int(v)>maxv:
maxv=int(v)
print("{}\t{}".format(k,maxv))
##### End #########
#############################################
# mapper的输出经过分区处理后,数据行按照键排序;
# 具有相同键的行排在一起;
# hadoop-streaming不会合并相同键的各个值。
# 下面的代码将相同键的各个值放到同一个列表中,
# 并调用reducer函数实现找出每个人的最高分数并输出;
# 输出格式为:姓名\分数
#############################################
def group():
"""将框架排序后的<k1,v1>,<k1,v2>,<k2,v3><k2,v4> 包装为
<k1,[v1,v2]>,<k2,[v3,v4]>后提交reduce函数执行"""
cur_key = None
last_key = None
value = None
values = []
for line in sys.stdin:
try: #如果不是键值对
last_key, value = line.strip().split("\t", 1)
except:
continue
if not cur_key: #输入的是第一条记录
cur_key = last_key
values.append(value)
elif cur_key == last_key: # 输入的键值没变化
values.append(value)
else: # 输入的是一个新键,表明前一个键的值都已输完
reducer(cur_key, values)
cur_key = last_key
values = []
values.append(value)
else: # 所有的记录都已处理完,将最后一个键值对交reduce处理
reducer(cur_key, values)
if __name__ == '__main__':
group()
第2关:文件内容合并去重
编程要求
接下来我们通过一个练习来巩固学习到的MapReduce知识吧。
对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。
为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:
第一列按学号排列;
学号相同,按x,y,z排列;
输入文件路径为:/user/tmp/input/;
输出路径为:/user/tmp/output/。
注意:输入文件后台已经帮你创建好了,不需要你再重复创建。
测试说明
程序会对你编写的代码进行测试:
输入已经指定了测试文本数据:需要你的程序输出合并去重后的结果。
下面是输入文件和输出文件的一个样例供参考。
输入文件file1的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件file2的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根据输入文件file1和file2合并得到的输出文件file3的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
#! /usr/bin/python3
import sys
def main():
for line in sys.stdin:
line = line.strip()
mapper(line)
def mapper(line):
########## Begin ###############
itms=line.split()
print("%s\t%s"%(itms[0],itms[1]))
########### End #############
if __name__ == '__main__':
main()
#! /usr/bin/python3
import sys
def reducer(k, values):
############ Begin ################
result=sorted(set(values))
for v in result:
print("%s\t%s"%(k,v))
############ End ################
def group():
"""将框架排序后的<k1,v1>,<k1,v2>,<k2,v3><k2,v4> 包装为
<k1,[v1,v2]>,<k2,[v3,v4]>后提交reduce函数执行"""
cur_key = None
last_key = None
value = None
values = []
for line in sys.stdin:
try: #如果不是键值对
last_key, value = line.strip().split("\t", 1)
except:
continue
if not cur_key: #输入的是第一条记录
cur_key = last_key
values.append(value)
elif cur_key == last_key: # 输入的键值没变化
values.append(value)
else: # 输入的是一个新键,表明前一个键的值都已输完
reducer(cur_key, values)
cur_key = last_key
values = []
values.append(value)
else: # 所有的记录都已处理完,将最后一个键值对交reduce处理
reducer(cur_key, values)
if __name__ == '__main__':
group()
第3关:信息挖掘 - 挖掘父子关系
编程要求
你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下:
孙子在前,祖父在后;
输入文件路径:/user/reduce/input;
输出文件路径:/user/reduce/output。
测试说明
程序会对你编写的代码进行测试:
下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。
输入文件内容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
#! /usr/bin/python3
import sys
def mapper(line):
############### Begin ############
c,p=line.split()
print("{}\tp-{}".format(c,p))
print("{}\tc-{}".format(p,c))
############### End #############
def main():
for line in sys.stdin:
line = line.strip()
if line.startswith('child'):
pass
else:
mapper(line)
if __name__ == '__main__':
main()
#! /usr/bin/python3
import sys
def reducer(k, values):
############## Begin ################
grandc = []
grandp = []
for value in values:
if value[:2] == 'c-':
grandc.append(value[2:])
elif value[:2] == 'p-':
grandp.append(value[2:])
for c in grandc:
for p in grandp:
print("{}\t{}".format(c,p))
############## End #################
def group():
"""将框架排序后的<k1,v1>,<k1,v2>,<k2,v3><k2,v4> 包装为
<k1,[v1,v2]>,<k2,[v3,v4]>后提交reduce函数执行"""
cur_key = None
last_key = None
value = None
values = []
for line in sys.stdin:
try: #如果不是键值对
last_key, value = line.strip().split("\t", 1)
except:
continue
if not cur_key: #输入的是第一条记录
cur_key = last_key
values.append(value)
elif cur_key == last_key: # 输入的键值没变化
values.append(value)
else: # 输入的是一个新键,表明前一个键的值都已输完
reducer(cur_key, values)
cur_key = last_key
values = []
values.append(value)
else: # 所有的记录都已处理完,将最后一个键值对交reduce处理
reducer(cur_key, values)
if __name__ == '__main__':
print("grand_child\tgrand_parent")
group()
更多推荐
所有评论(0)