数据结构（作为 map 输出的 key，其 compareTo 决定排序）：
NewOrderBean
public class NewOrderBean implements WritableComparable<NewOrderBean>{ private String orderId; private String orderUser; private String orderName; private float orderPrice; private int orderNum; private float totalPrice; public NewOrderBean() {} public NewOrderBean(String orderId,String orderUser, String orderName, float orderPrice, int orderNum) { super(); this.orderId = orderId; this.orderUser = orderUser; this.orderName = orderName; this.orderPrice = orderPrice; this.orderNum = orderNum; this.totalPrice = orderPrice * orderNum; } public void set(String orderId,String orderUser,String orderName,String orderPrice,String orderNum) { this.orderId = orderId; this.orderUser = orderUser; this.orderName = orderName; this.orderPrice = Float.parseFloat(orderPrice); this.orderNum = Integer.parseInt(orderNum); this.totalPrice = this.orderPrice * this.orderNum; } public String getOrderUser() { return orderUser; } public void setOrderUser(String orderUser) { this.orderUser = orderUser; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getOrderName() { return orderName; } public void setOrderName(String orderName) { this.orderName = orderName; } public double getOrderPrice() { return orderPrice; } public void setOrderPrice(float orderPrice) { this.orderPrice = orderPrice; } public int getOrderNum() { return orderNum; } public void setOrderNum(int orderNum) { this.orderNum = orderNum; } public float getTotalPrice() { return totalPrice; } public void setTotalPrice(float totalPrice) { this.totalPrice = totalPrice; } @Override public String toString() { return "OrderBean [orderId=" + orderId + ", orderUser=" + orderUser + ", orderName=" + orderName + ", orderPrice=" + orderPrice + ", orderNum=" + orderNum + ", totalPrice=" + totalPrice + "]"; } public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.orderId = in.readUTF(); this.orderUser = in.readUTF(); 
this.orderName = in.readUTF(); this.orderPrice = in.readFloat(); this.orderNum = in.readInt(); this.totalPrice = this.orderPrice * this.orderNum; } public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(orderId); out.writeUTF(orderUser); out.writeUTF(orderName); out.writeFloat(orderPrice); out.writeInt(orderNum); } public int compareTo(NewOrderBean o) { // TODO Auto-generated method stub //return (int) (o.getOrderPrice() - this.orderPrice == 0 ? this.orderName.compareTo(o.getOrderName()) :o.getOrderPrice() - this.orderPrice) ; //return Float.compare(o.getTotalPrice(), this.totalPrice) == 0 ? this.orderName.compareTo(o.getOrderName()) : Double.compare(o.getTotalPrice(), this.totalPrice); return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getTotalPrice(), this.getTotalPrice()):this.orderId.compareTo(o.getOrderId()); } }
Partition 分区（按 orderId 把记录分发到 reduce 任务）：
public class OrderPartitioner extends Partitioner<NewOrderBean, NullWritable>{ @Override public int getPartition(NewOrderBean key, NullWritable value, int numPartitions) { // TODO Auto-generated method stub // 按照订单中的orderid来分发数据 return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
Grouping 分组（orderId 相同的记录进入同一次 reduce 调用）：
public class OrderGrouping extends WritableComparator { public OrderGrouping() { super(NewOrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub NewOrderBean o1 = (NewOrderBean)a; NewOrderBean o2 = (NewOrderBean)b; return o1.getOrderId().compareTo(o2.getOrderId()); } }
main 处理（作业配置与提交）：
public class GroupComparator { public static class GroupMapper extends Mapper<LongWritable, Text, NewOrderBean, NullWritable>{ NewOrderBean orderBean = new NewOrderBean(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NewOrderBean, NullWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(","); orderBean.set(fields[0], fields[1], fields[2], fields[3], fields[4]); /* k.set(fields[0]); context.write(k, orderBean); */ // 按照key写了一堆的数据 context.write(orderBean, NullWritable.get()); } } public static class GroupReducer extends Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>{ @Override protected void reduce(NewOrderBean key, Iterable<NullWritable> values, Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context) throws IOException, InterruptedException { //分组输出 int topn = context.getConfiguration().getInt("order.top.n",3); /* for(int i=0;i<topn;i++) { NewOrderBean o = key; context.write(o, NullWritable.get()); } */ int i=0; for (NullWritable v : values) { context.write(key, v); if(++i==topn) return; } } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); conf.setInt("order.top.n", 3); Job job = Job.getInstance(conf); job.setJarByClass(GroupComparator.class); job.setMapperClass(GroupMapper.class); //设置partition job.setPartitionerClass(OrderPartitioner.class); job.setReducerClass(GroupReducer.class); //设置grouping job.setGroupingComparatorClass(OrderGrouping.class); job.setMapOutputKeyClass(NewOrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(NewOrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("e:/mrdata/wordcount/input")); FileOutputFormat.setOutputPath(job, new Path("e:/mrdata/wordcount/output12")); job.setNumReduceTasks(2); boolean res = job.waitForCompletion(true); 
System.exit(res?0:-1); } }
“[大数据] MapReduce 排序控制、分区控制、分组控制”共有 0 条评论