2016-05-12 5 views
0

Мне нужно использовать фильтр цветения в алгоритме сборочного сокращения для фильтрации одного из моих входов, но у меня есть проблема с функцией readFields, которая де-сериализует входной поток распределенного кеша (фильтр цветения) в цветной фильтр.Bloom Filter in MapReduce

public class BloomJoin { 

    //function map : input transaction.txt 
    public static class TransactionJoin extends 
      Mapper<LongWritable, Text, Text, Text> { 

     private Text CID=new Text(); 
     private Text outValue=new Text(); 

     public void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 

      String line = value.toString(); 
       String record[] = line.split(",", -1); 
       CID.set(record[1]); 

       outValue.set("A"+value); 
       context.write(CID, outValue); 
       } 
     } 
    //function map : input customer.txt 
      public static class CustomerJoinMapper extends 
        Mapper<LongWritable, Text, Text, Text> { 

       private Text outkey=new Text(); 
       private Text outvalue = new Text(); 
       private BloomFilter bfilter = new BloomFilter(); 
       public void setup(Context context) throws IOException { 

        URI[] files = DistributedCache 
          .getCacheFiles(context.getConfiguration()); 

        // if the files in the distributed cache are set 
        if (files != null) { 
        System.out.println("Reading Bloom filter from: " 
        + files[0].getPath()); 
        // Open local file for read. 

        DataInputStream strm = new DataInputStream(new FileInputStream(
        files[0].toString())); 
        bfilter.readFields(strm); 
        strm.close(); 

        // Read into our Bloom filter. 

        } else { 
        throw new IOException(
        "Bloom filter file not set in the DistributedCache."); 
        } 
       }; 

       public void map(LongWritable key, Text value, Context context) 
         throws IOException, InterruptedException { 
        String line = value.toString(); 
        String record[] = line.split(",", -1); 

         outkey.set(record[0]); 
         if (bfilter.membershipTest(new Key(outkey.getBytes()))) { 
         outvalue.set("B"+value); 
         context.write(outkey, outvalue); 
         } 
      } 
      } 

    //function reducer: join customer with transaction 
    public static class JoinReducer extends 
      Reducer<Text, Text, Text, Text> { 

     private ArrayList<Text> listA = new ArrayList<Text>(); 
     private ArrayList<Text> listB = new ArrayList<Text>(); 


     @Override 
     public void reduce(Text key, Iterable<Text> values, Context context) 
       throws IOException, InterruptedException { 

      listA.clear(); 
      listB.clear(); 

        for (Text t : values) { 
       if (t.charAt(0) == 'A') { 
        listA.add(new Text(t.toString().substring(1))); 
        System.out.println("liste A: "+listA); 
       } else /* if (t.charAt('0') == 'B') */{ 
        listB.add(new Text(t.toString().substring(1))); 
        System.out.println("listeB :"+listB); 
       } 
      } 

      executeJoinLogic(context); 
     } 

     private void executeJoinLogic(Context context) throws IOException, 
       InterruptedException { 
       if (!listA.isEmpty() && !listB.isEmpty()) { 
        for (Text A : listB) { 
         for (Text B : listA) { 
          context.write(A, B); 
          System.out.println("A="+A+",B="+B); 
         } 
        } 
       } 

     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 
     Path bloompath=new Path("/user/biadmin/ezzaki/bloomfilter/output/part-00000"); 
     DistributedCache.addCacheFile(bloompath.toUri(),conf); 
     Job job = new Job(conf, "Bloom Join"); 
     job.setJarByClass(BloomJoin.class); 
     String[] otherArgs = new GenericOptionsParser(conf, args) 
     .getRemainingArgs(); 
     if (otherArgs.length != 3) { 
    System.err 
      .println("ReduceSideJoin <Transaction data> <Customer data> <out> "); 
    System.exit(1); 
            } 
     MultipleInputs.addInputPath(job, new Path(otherArgs[0]), 
       TextInputFormat.class,TransactionJoin.class); 
     MultipleInputs.addInputPath(job, new Path(otherArgs[1]), 
       TextInputFormat.class, CustomerJoinMapper.class); 

     job.setReducerClass(JoinReducer.class); 

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 
     //job.setMapOutputKeyClass(Text.class); 
     //job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     System.exit(job.waitForCompletion(true) ? 0 : 3); 
    } 
} 

Как я могу решить эту проблему?

+0

Пожалуйста, добавьте код и исключение ошибки в свой вопрос ... –

+0

Я добавил код моего класса и изображение с ошибкой, если оно не отображается: java.io.EOFException: at java.io. DataInputStream.readInt/at org.apache.hadoop.util.bloom.Filter.readFields/at org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

ответ

1

Вы можете попробовать изменить

URI[] files = DistributedCache.getCacheFiles(context.getConfiguration()); 

в

Path[] cacheFilePaths = DistributedCache.getLocalCacheFiles(conf); 
for (Path cacheFilePath : cacheFilePaths) {  
    DataInputStream fileInputStream = fs.open(cacheFilePath); 
} 
bloomFilter.readFields(fileInputStream); 
fileInputStream.close(); 

Кроме того, я думаю, что вы используете на стороне карты присоединиться и не уменьшить сторону, так как вы используете Распределенный кэш Mapper.

+0

Благодарим вас за ответ, но проблема по-прежнему связана с функцией: readFields, которую я получил в ecxeption: java.io.EOFException: при java.io.DataInputStream.readInt/at org.apache.hadoop.util.bloom.Filter.readFields/at org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

+0

это соединение сбоку сокращается, потому что соединение выполняется со стороны уменьшения, и я пытаюсь отфильтровать один из моих входов на стороне карты перед отправкой PKV в редуктор – Fatiso