Hadoop 0.20.2 Bloom Filter Example
Published: 2019-06-14


1. Introduction

    See Hadoop in Action, p. 102, and 《Hadoop实战(第2版)》 (陆嘉恒), p. 69. A Bloom filter is a compact probabilistic set structure: membership tests may return false positives but never false negatives. That property makes it well suited to pre-filtering the larger side of a join during the map phase, so records that cannot possibly match are never shuffled to the reducers.
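    To make the API concrete, here is a minimal standalone sketch of the org.apache.hadoop.util.bloom.BloomFilter API that the job in section 3 relies on. The class name BloomFilterDemo and the sizing parameters (10,000-bit vector, 6 hash functions) are illustrative choices mirroring the job code, not values from the books:

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterDemo {
    public static void main(String[] args) {
        // Arguments: vector size in bits, number of hash functions, hash type.
        BloomFilter filter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);

        // Insert a couple of customer IDs as keys.
        filter.add(new Key("1".getBytes()));
        filter.add(new Key("3".getBytes()));

        // Membership tests: never a false negative, occasionally a false positive.
        System.out.println(filter.membershipTest(new Key("1".getBytes()))); // true
        System.out.println(filter.membershipTest(new Key("9".getBytes()))); // almost certainly false
    }
}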

2. Example

    Most explanations online simply reproduce the sample code from Hadoop in Action. This example targets Hadoop 0.20.2, which already ships a BloomFilter implementation (org.apache.hadoop.util.bloom.BloomFilter).

    The example input files are as follows:

    customers.txt

    1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000

    -----------------------------------------------------------------

    orders.txt

    3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009
    5,E,34.59,05-Jan-2010
    6,F,28.67,16-Jan-2008
    7,G,49.82,24-Jan-2009

    The two files are joined on customer ID, the first field of each record; the expected result is shown below.
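    For reference, this is the output the job in section 3 should produce on this data, a sketch assuming no Bloom filter false positives (TextOutputFormat writes a tab between key and value, and the relative order of the two rows for customer 3 is not guaranteed):

    1	Stephanie Leung,555-555-5555,B,88.25,20-May-2008
    2	Edward Kim,123-456-7890,C,32.00,30-Nov-2007
    3	Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
    3	Jose Madriz,281-330-8004,D,25.02,22-Jan-2009

    Customer 4 has no orders, and orders 5, 6, and 7 have no matching customer, so none of them survive the join. Even if the Bloom filter lets an unmatched order through (a false positive), the reducer still drops it because its left table is empty: the filter only reduces shuffle traffic, it does not affect correctness.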

3. Code

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomMRMain {

    public static class BloomMapper extends Mapper<Object, Text, Text, Text> {

        BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);

        // Load all customer IDs into the Bloom filter before any map() call.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt";
            Path file = new Path(path);
            FileSystem hdfs = FileSystem.get(conf);
            FSDataInputStream dis = hdfs.open(file);
            BufferedReader reader = new BufferedReader(new InputStreamReader(dis));
            String temp;
            while ((temp = reader.readLine()) != null) {
                String[] tokens = temp.split(",");
                if (tokens.length > 0) {
                    bloomFilter.add(new Key(tokens[0].getBytes()));
                }
            }
            reader.close();
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Use the input file path to tell which table this record belongs to.
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.contains("customers")) {
                String[] tokens = value.toString().split(",");
                if (tokens.length == 3) {
                    // Tag customer records with "0".
                    String outKey = tokens[0];
                    String outVal = "0" + ":" + tokens[1] + "," + tokens[2];
                    context.write(new Text(outKey), new Text(outVal));
                }
            } else if (pathName.contains("orders")) {
                String[] tokens = value.toString().split(",");
                if (tokens.length == 4) {
                    String outKey = tokens[0];
                    // Only emit orders whose customer ID is (probably) in customers.txt.
                    if (bloomFilter.membershipTest(new Key(outKey.getBytes()))) {
                        // Tag order records with "1".
                        String outVal = "1" + ":" + tokens[1] + "," + tokens[2] + "," + tokens[3];
                        context.write(new Text(outKey), new Text(outVal));
                    }
                }
            }
        }
    }

    public static class BloomReducer extends Reducer<Text, Text, Text, Text> {

        ArrayList<Text> leftTable = new ArrayList<Text>();
        ArrayList<Text> rightTable = new ArrayList<Text>();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            leftTable.clear();
            rightTable.clear();
            // Split the tagged values back into customer ("0") and order ("1") rows.
            for (Text val : values) {
                String outVal = val.toString();
                int index = outVal.indexOf(":");
                String flag = outVal.substring(0, index);
                if ("0".equals(flag)) {
                    leftTable.add(new Text(outVal.substring(index + 1)));
                } else if ("1".equals(flag)) {
                    rightTable.add(new Text(outVal.substring(index + 1)));
                }
            }
            // Emit the cross product; keys with an empty side produce nothing.
            if (leftTable.size() > 0 && rightTable.size() > 0) {
                for (Text left : leftTable) {
                    for (Text right : rightTable) {
                        context.write(key, new Text(left.toString() + "," + right.toString()));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: BloomMRMain <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "BloomMRMain");
        job.setJarByClass(BloomMRMain.class);
        job.setMapperClass(BloomMapper.class);
        job.setReducerClass(BloomReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Reprinted from: https://www.cnblogs.com/mengfanrong/p/5069058.html
