问题描述:
一个trade table表
product1"trade1
product2"trade2
product3"trade3
一个pay table表
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6
建立两个表之间的连接,该两表是一对多关系的
如下:
trade1pay1
trade1pay4
trade2pay2
...
思路:
为了将两个表整合到一起,由于有相同的第一列,且第一个表与第二个表是一对多关系的。
这里依然采用分组,以及组内排序,只要保证一方最先到达reduce端,则就可以进行迭代处理了。
为了保证第一个表先到达reduce端,可以为定义一个组合键,包含两个值,第一个值为product,第二个值为0或者1,来分别代表第一个表和第二个表,只要按照组内升序排列即可。
具体代码:
自定义组合键策略
package whut.onetomany;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.WritableComparable;
public class TextIntPair implements WritableComparable{
//product1 0/1
private String firstKey;//product1
private int secondKey;//0,1;0代表是trade表,1代表是pay表
//只需要保证trade表在pay表前面就行,则只需要对组顺序排列
public String getFirstKey() {
return firstKey;
}
public void setFirstKey(String firstKey) {
this.firstKey = firstKey;
}
public int getSecondKey() {
return secondKey;
}
public void setSecondKey(int secondKey) {
this.secondKey = secondKey;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(firstKey);
out.writeInt(secondKey);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
firstKey=in.readUTF();
secondKey=in.readInt();
}
@Override
public int compareTo(Object o) {
// TODO Auto-generated method stub
TextIntPair tip=(TextIntPair)o;
return this.getFirstKey().compareTo(tip.getFirstKey());
}
}
分组策略
package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TextComparator extends WritableComparator{
protected TextComparator() {
super(TextIntPair.class,true);//注册比较器
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
TextIntPair tip1=(TextIntPair)a;
TextIntPair tip2=(TextIntPair)b;
return tip1.getFirstKey().compareTo(tip2.getFirstKey());
}
}
组内排序策略:目的是保证第一个表比第二个表先到达
package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分组内部进行排序,按照第二个字段进行排序
public class TextIntComparator extends WritableComparator {
public TextIntComparator()
{
super(TextIntPair.class,true);
}
//这里可以进行排序的方式管理
//必须保证是同一个分组的
//a与b进行比较
//如果a在前b在后,则会产生升序
//如果a在后b在前,则会产生降序
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
TextIntPair ti1=(TextIntPair)a;
TextIntPair ti2=(TextIntPair)b;
//首先要保证是同一个组内,同一个组的标识就是第一个字段相同
if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
return ti1.getFirstKey().compareTo(ti2.getFirstKey());
else
return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1
}
}
分区策略: