[大数据]MapReduce 排序控制、分区控制、分组控制

2019-3-18 流沙 大数据

数据结构:

NewOrderBean

public class NewOrderBean implements WritableComparable<NewOrderBean>{
	private String orderId;
	private String orderUser;
	private String orderName;
	private float orderPrice;
	private int orderNum;
	private float totalPrice;
	
	public NewOrderBean() {}

	public NewOrderBean(String orderId,String orderUser, String orderName, float orderPrice, int orderNum) {
		super();
		this.orderId = orderId;
		this.orderUser = orderUser;
		this.orderName = orderName;
		this.orderPrice = orderPrice;
		this.orderNum = orderNum;
		this.totalPrice  = orderPrice * orderNum;
	}
	
	public void set(String orderId,String orderUser,String orderName,String orderPrice,String orderNum) {
		this.orderId = orderId;
		this.orderUser = orderUser;
		this.orderName = orderName;
		this.orderPrice = Float.parseFloat(orderPrice);
		this.orderNum = Integer.parseInt(orderNum);
		this.totalPrice = this.orderPrice * this.orderNum;
		
	}
	
	
	public String getOrderUser() {
		return orderUser;
	}

	public void setOrderUser(String orderUser) {
		this.orderUser = orderUser;
	}

	public String getOrderId() {
		return orderId;
	}

	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}

	public String getOrderName() {
		return orderName;
	}

	public void setOrderName(String orderName) {
		this.orderName = orderName;
	}

	public double getOrderPrice() {
		return orderPrice;
	}

	public void setOrderPrice(float orderPrice) {
		this.orderPrice = orderPrice;
	}

	public int getOrderNum() {
		return orderNum;
	}

	public void setOrderNum(int orderNum) {
		this.orderNum = orderNum;
	}

	public float getTotalPrice() {
		return totalPrice;
	}

	public void setTotalPrice(float totalPrice) {
		this.totalPrice = totalPrice;
	}
	
	
	@Override
	public String toString() {
		return "OrderBean [orderId=" + orderId + ", orderUser=" + orderUser + ", orderName=" + orderName
				+ ", orderPrice=" + orderPrice + ", orderNum=" + orderNum + ", totalPrice=" + totalPrice + "]";
	}
	

	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		this.orderId = in.readUTF();
		this.orderUser = in.readUTF();
		this.orderName = in.readUTF();
		this.orderPrice = in.readFloat();
		this.orderNum = in.readInt();
		this.totalPrice = this.orderPrice * this.orderNum;
	}

	
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(orderId);
		out.writeUTF(orderUser);
		out.writeUTF(orderName);
		out.writeFloat(orderPrice);
		out.writeInt(orderNum);
	}

	public int compareTo(NewOrderBean o) {
		// TODO Auto-generated method stub
		//return (int) (o.getOrderPrice() - this.orderPrice == 0 ? this.orderName.compareTo(o.getOrderName()) :o.getOrderPrice() - this.orderPrice) ;
		//return Float.compare(o.getTotalPrice(), this.totalPrice) == 0 ? this.orderName.compareTo(o.getOrderName()) : Double.compare(o.getTotalPrice(), this.totalPrice);
		
		return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getTotalPrice(), this.getTotalPrice()):this.orderId.compareTo(o.getOrderId());
	}
	
	
}


partition分区

/**
 * Partitioner that sends every record with the same orderId to the same partition.
 */
public class OrderPartitioner extends Partitioner<NewOrderBean, NullWritable>{

	/**
	 * Chooses the partition from the hash of the key's orderId.
	 *
	 * @param key           map output key; only its orderId is used
	 * @param value         unused
	 * @param numPartitions number of partitions to distribute over
	 * @return an index in {@code [0, numPartitions)}
	 */
	@Override
	public int getPartition(NewOrderBean key, NullWritable value, int numPartitions) {
		// Mask off the sign bit so the modulo below can never be negative.
		final int nonNegativeHash = key.getOrderId().hashCode() & Integer.MAX_VALUE;
		return nonNegativeHash % numPartitions;
	}

}


grouping 分组

/**
 * Grouping comparator that treats two keys as the same group when their
 * orderId values are equal, ignoring every other field of the key.
 */
public class OrderGrouping extends WritableComparator {

	/** Registers NewOrderBean as the key class and asks the base class to create key instances. */
	public OrderGrouping() {
		super(NewOrderBean.class, true);
	}

	/**
	 * Compares two keys by orderId only.
	 *
	 * @return the result of comparing the two orderId strings
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		final NewOrderBean left = (NewOrderBean) a;
		final NewOrderBean right = (NewOrderBean) b;
		final String leftId = left.getOrderId();
		final String rightId = right.getOrderId();
		return leftId.compareTo(rightId);
	}
}


main处理

/**
 * MapReduce job that emits, for each order id, the first N records in key
 * sort order (N comes from the {@code order.top.n} configuration value).
 *
 * <p>The job wires together {@link OrderPartitioner} (partition by orderId) and
 * {@link OrderGrouping} (group by orderId), with {@link NewOrderBean} as the
 * composite key and {@link NullWritable} as the value.
 *
 * <p>Usage: {@code GroupComparator [inputPath [outputPath]]}. When an argument
 * is omitted the built-in default path is used.
 */
public class GroupComparator {

	/** Configuration key holding how many records to emit per order. */
	private static final String TOP_N_KEY = "order.top.n";
	private static final int DEFAULT_TOP_N = 3;

	private static final String DEFAULT_INPUT_PATH = "e:/mrdata/wordcount/input";
	private static final String DEFAULT_OUTPUT_PATH = "e:/mrdata/wordcount/output12";

	/** Input lines are comma separated: orderId, orderUser, orderName, orderPrice, orderNum. */
	private static final int EXPECTED_FIELD_COUNT = 5;

	private static final String COUNTER_GROUP = "GroupComparator";
	private static final String MALFORMED_LINES_COUNTER = "MALFORMED_LINES";

	/**
	 * Parses one comma-separated input line into a {@link NewOrderBean} key and
	 * emits it with a {@link NullWritable} value.
	 */
	public static class GroupMapper extends Mapper<LongWritable, Text, NewOrderBean, NullWritable>{

		// Reused across map() calls; every field is overwritten by set(...) before each write.
		private final NewOrderBean orderBean = new NewOrderBean();

		/**
		 * Emits one key per well-formed line.
		 *
		 * <p>Blank lines are skipped. Lines with fewer than five fields or with a
		 * non-numeric price/quantity are skipped and counted under the
		 * {@code GroupComparator / MALFORMED_LINES} counter instead of failing the task.
		 */
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, NewOrderBean, NullWritable>.Context context)
				throws IOException, InterruptedException {

			String line = value.toString();
			if (line.trim().isEmpty()) {
				return;
			}

			String[] fields = line.split(",");
			if (fields.length < EXPECTED_FIELD_COUNT) {
				context.getCounter(COUNTER_GROUP, MALFORMED_LINES_COUNTER).increment(1);
				return;
			}

			try {
				orderBean.set(fields[0], fields[1], fields[2], fields[3], fields[4]);
			} catch (NumberFormatException ignored) {
				// Price or quantity is not numeric: record the bad line and move on.
				context.getCounter(COUNTER_GROUP, MALFORMED_LINES_COUNTER).increment(1);
				return;
			}

			// The whole record travels in the key; the value carries no data.
			context.write(orderBean, NullWritable.get());
		}

	}


	/**
	 * Emits at most {@code order.top.n} records per reduce group.
	 */
	public static class GroupReducer extends Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>{

		/**
		 * Writes the first N keys of the group, where N is read from the
		 * {@code order.top.n} configuration value (default 3). A value of zero
		 * or less emits nothing.
		 */
		@Override
		protected void reduce(NewOrderBean key, Iterable<NullWritable> values,
				Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context)
				throws IOException, InterruptedException {

			int topn = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
			if (topn <= 0) {
				return;
			}

			int emitted = 0;
			// NOTE: writing `key` once per value relies on Hadoop refreshing the key
			// object as the values iterator advances, so each write sees the next
			// record of the group.
			for (NullWritable v : values) {
				context.write(key, v);
				if (++emitted >= topn) {
					return;
				}
			}
		}
	}


	/**
	 * Configures and runs the job, then exits with 0 on success and -1 on failure.
	 *
	 * @param args optional: {@code args[0]} input path, {@code args[1]} output path;
	 *             missing arguments fall back to the built-in defaults
	 */
	public static void main(String[] args) throws Exception{

		String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
		String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

		Configuration conf = new Configuration();
		conf.setInt(TOP_N_KEY, DEFAULT_TOP_N);
		Job job = Job.getInstance(conf);

		job.setJarByClass(GroupComparator.class);

		job.setMapperClass(GroupMapper.class);
		// Partition by orderId so one order never spans two reducers.
		job.setPartitionerClass(OrderPartitioner.class);
		job.setReducerClass(GroupReducer.class);

		// Group by orderId so one reduce() call sees a whole order.
		job.setGroupingComparatorClass(OrderGrouping.class);

		job.setMapOutputKeyClass(NewOrderBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setOutputKeyClass(NewOrderBean.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		job.setNumReduceTasks(2);

		boolean res = job.waitForCompletion(true);
		System.exit(res?0:-1);

	}

}

发表评论:

Powered by 流沙团