Hadoop-Python实现Hadoop Streaming分组和二次排序

分组(partition)

Hadoop streaming框架默认情况下会以’/t’作为分隔符,将每行第一个’/t’之前的部分作为key,其余内容作为value,
如果没有’/t’分隔符,则整行作为key;这个key/tvalue对又作为该map对应的reduce的输入。
-D stream.map.output.field.separator 指定分割key分隔符,默认是/t
-D stream.num.map.output.key.fields 选择key的范围
-D map.output.key.field.separator 指定key内部的分隔符
-D num.key.fields.for.partition 指定对key分出来的前几部分做partition而不是整个key

准备数据

鲁V73930,鲁,549
黑ML1711,黑,235
鲁V75066,鲁,657
桂J73031,桂,900
晋M42387,晋,432
桂J73138,桂,456
晋M41665,晋,879
晋M42529,晋,790

step_run.sh

1
#!/bin/bash
EXEC_PATH=$(dirname "$0")
HPHOME=/opt/cloudera/parcels/CDH
JAR_PACKAGE=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
IN_PATH=/user/h_chenliling/test.txt.lzo
OUT_PATH=/user/h_chenliling/testout.txt
MAP_FILE=${EXEC_PATH}/step_map.py
RED_FILE=${EXEC_PATH}/step_red.py
$HPHOME/bin/hadoop fs -rm -r  $OUT_PATH
$HPHOME/bin/hadoop jar $JAR_PACKAGE \
-D mapred.job.queue.name=bdev \
-D stream.map.input.ignoreKey=true \
-D map.output.key.field.separator=, \ #内部key分隔符
-D num.key.fields.for.partition=1 \   #key分组范围
-numReduceTasks 2 \
-input $IN_PATH \
-output $OUT_PATH \
-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
-mapper $MAP_FILE \
-reducer $RED_FILE \
-file $MAP_FILE \
-file $RED_FILE \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  #指定分区类
$HPHOME/bin/hadoop fs -ls $OUT_PATH

step_map.py

1
#!/usr/bin/env python
#coding=utf-8
import sys
for line in sys.stdin:
	line = line.strip()
	seq = line.split(",")
	if len(seq) >=3:
		plate  = seq[0] #车牌号
		province = seq[1] #注册地
		mile = seq[2] #里程
		print province+ "," + plate + "\t" + mile

step_red.py

1
#!/usr/bin/env python
#coding=utf-8
import sys
prov = ""
sum_mile = 0
for line in sys.stdin:
	line = line.strip()
	seq = line.split("\t")
	mile = int(seq[1])
	if prov == "":
		prov = seq[0].split(",")[0]
		sum_mile = mile
	else:
		if prov == seq[0].split(",")[0]:
			# 相同组
			sum_mile = sum_mile + mile
		else:
			# 不同组,输出上一组数据
			print "%s\t%d" % (prov, sum_mile)
			sum_mile = mile
			prov = seq[0].split(",")[0]
print "%s\t%d" % (prov, sum_mile)

输出结果:

hadoop fs -text /user/h_chenliling/testout.txt/part-00000
晋 2101
鲁 1775
hadoop fs -text /user/h_chenliling/testout.txt/part-00001
桂 1356
黑 235

补充

事实上KeyFieldBasePartitioner还有一个高级参数 mapred.text.key.partitioner.options,这个参数可以认为是 num.key.fields.for.partition的升级版,它可以指定不仅限于key中的前几个字段用做partition,而是可以单独指定 key中某个字段或者某几个字段一起做partition。
比如上面的需求用mapred.text.key.partitioner.options表示为 mapred.text.key.partitioner.options=-k1,1

二次排序(Secondary Sort)

mapper的输出被partition到各个reducer之后,会有一步排序。默认是按照key做二次排序,如果key是多列组成,先按照第一列排序,第一列相同的,按照第二列排序
如果需要自定义排序。这里要控制的就是key内部的哪些元素用来做排序依据,是排字典序还是数字序,倒序还是正序。用来控制的参数是mapred.text.key.comparator.options。
通过org.apache.hadoop.mapred.lib.KeyFieldBasedComparator来自定义使用key中的部分字段做比较。

准备数据

鲁V73930,鲁,2,549
黑ML1711,黑,1,235
鲁V75066,鲁,1,657
桂J73031,桂,1,900
晋M42387,晋,3,432
桂J73138,桂,2,456
晋M41665,晋,2,879
晋M42529,晋,1,790
鲁V75530,鲁,3,569

step_run.sh

1
#!/bin/bash
EXEC_PATH=$(dirname "$0")
HPHOME=/opt/cloudera/parcels/CDH
JAR_PACKAGE=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
IN_PATH=/user/h_chenliling/test1.txt.lzo
OUT_PATH=/user/h_chenliling/testout1.txt
MAP_FILE=${EXEC_PATH}/step_map.py
RED_FILE=${EXEC_PATH}/step_red.py
$HPHOME/bin/hadoop fs -rm -r  $OUT_PATH
$HPHOME/bin/hadoop jar $JAR_PACKAGE \
-D mapred.job.queue.name=bdev \
-D stream.map.input.ignoreKey=true \
-D stream.map.output.field.separator=, \  #分割key/value
-D stream.num.map.output.key.fields=3 \   #取key范围
-D map.output.key.field.separator=, \     #内部key分割符
-D num.key.fields.for.partition=1 \       #取分区范围
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ #排序类
-D mapred.text.key.comparator.options=-k3,3nr \  #第三个元素倒序
-numReduceTasks 5 \
-input $IN_PATH \
-output $OUT_PATH \
-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
-mapper $MAP_FILE \
-reducer $RED_FILE \
-file $MAP_FILE \
-file $RED_FILE \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  #分区类
$HPHOME/bin/hadoop fs -ls $OUT_PATH

step_map.py

1
#!/usr/bin/env python
#coding=utf-8
import sys
for line in sys.stdin:
	line = line.strip()
	seq = line.split(",")
	if len(seq) >=3:
		plate  = seq[0] #车牌号
		province = seq[1] #注册地
		order = seq[2]
		mile = seq[3] #里程
		print province + "," +plate+","+ order  + "," + mile

step_red.py

1
#!/usr/bin/env python
#coding=utf-8
import sys
for line in sys.stdin:
	line = line.strip()
	print line

输出结果

hadoop fs -text /user/h_chenliling/testout1.txt/part-00000
鲁,鲁V73930,2 549
鲁,鲁V75066,1 657
黑,黑ML1711,1 235
hadoop fs -text /user/h_chenliling/testout1.txt/part-00001
桂,桂J73138,2 456
桂,桂J73031,1 900
hadoop fs -text /user/h_chenliling/testout1.txt/part-00002
晋,晋M42387,3 432
晋,晋M41665,2 879
晋,晋M42529,1 790