Я храню небольшой объем данных (несколько МБ) в распределенном кеше и использую его для объединения с двумя большими файлами. Для нескольких строк данных в кеше функциональность работает нормально, но когда кэш имеет больше данных в процессе производства, он не может выполнять эту работу, но также не бросает никаких ошибок. Просто, что только несколько записей (около 20%) объединяются, а другие просто игнорируются. Итак, существует ли верхний предел количества записей, которые могут храниться в распределенном кеше? Почему он работает над некоторыми отчетами и игнорирует остальные? Любое предложение будет чрезвычайно полезно. пыльник мой кодРаспределенный кеш не работает
public class MyMapper extends Mapper<LongWritable, Text, Text, TextPair> {
Text albumKey = new Text();
Text photoKey = new Text();
private HashSet<String> photoDeleted = new HashSet<String>();
private HashSet<String> albDeleted = new HashSet<String>();
Text interKey = new Text();
private TextPair interValue = new TextPair();
private static final Logger LOGGER = Logger.getLogger(SharedStreamsSlMapper.class);
protected void setup(Context context) throws IOException, InterruptedException {
int count=0;
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
System.out.println(cacheFiles.length);
LOGGER.info(cacheFiles+"****");
try {
if (cacheFiles != null && cacheFiles.length > 0) {
for (Path path : cacheFiles) {
String line;
String[] tokens;
BufferedReader joinReader = new BufferedReader(new FileReader(path.toString()));
System.out.println(path.toString());
// BufferedReader joinReader = new BufferedReader(new FileReader("/Users/Kunal_Basak/Desktop/ss_test/dsitCache/part-m-00000"));
try {
while ((line = joinReader.readLine()) != null) {
count++;
tokens = line.split(SSConstants.TAB, 2);
if(tokens.length<2){
System.out.println("WL");
continue;
}
if (tokens[0].equals("P")) {
photoDeleted.add(tokens[1]);
}
else if (tokens[0].equals("A")) {
albDeleted.add(tokens[1]);
}
}
}
finally {
joinReader.close();
}
}
}
}
catch (IOException e) {
System.out.println("Exception reading DistributedCache: " + e);
}
System.out.println(count);
System.out.println("albdeleted *****"+albDeleted.size());
System.out.println("photo deleted *****"+photoDeleted.size());
LOGGER.info("albdeleted *****"+albDeleted.size());
LOGGER.info("albdeleted *****"+albDeleted.size());
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try{
//my mapper code
}
}
}
Когда вы печатаете количество файлов кеша, это так, как вы ожидаете, или некоторые файлы отсутствуют? – DNA