Моя команда застряла с запуском алгоритма с нечеткой логикой на двух больших наборах данных. Первый (подмножество) - около 180 тыс. Строк содержит имена, адреса и электронные письма для людей, которые нам нужно сопоставить во втором (надмножество). Надмножество содержит 2.5M записи. Оба имеют ту же самую структуру, и данные были очищены уже, т.е. адреса анализируется, имена нормализованы, и т.д.Нечеткая логика в больших наборах данных с использованием Python

  • ContactID INT,
  • ПолноеИмя VARCHAR (150),
  • Адрес VARCHAR (100) ,
  • Электронная почта VARCHAR (100)

цель состоит в том, чтобы соответствовать значениям в строке подмножества к соответствующим значениям в надмножество, поэтому выход будет сочетать подмножество и надмножество и соответствующие проценты подобия для каждого поля (токена).

  • ContactID,
  • LookupContactID,
  • ПолноеИмя,
  • LookupFullName,
  • FullName_Similarity,
  • Адрес,
  • LookupAddress,
  • Address_Similarity,
  • Email,
  • LookupEmail,
  • Email_Similarity

Для упрощения и проверки кода первого, мы сцепленные строки, и мы знаем, что код работает на очень небольшой супернабор; однако, как только мы увеличиваем количество записей, он застревает. Мы пробовали разные алгоритмы, Levenshtein, FuzzyWuzzy и т. Д. Безрезультатно. Проблема, на мой взгляд, в том, что Python делает это подряд за строкой; однако, я не уверен. Мы даже попытались запустить его на нашем кластере Hadoop с помощью потоковой передачи; однако он не дал никаких положительных результатов.

#!/usr/bin/env python 
import sys 
from fuzzywuzzy import fuzz 
import datetime 
import time 
import Levenshtein 

#init for comparison 
with open('normalized_set_record_set.csv') as normalized_records_ALL_file: 
# with open('delete_this/xab') as normalized_records_ALL_file: 
    normalized_records_ALL_dict = {} 
    for line in normalized_records_ALL_file: 
     key, value = line.strip('\n').split(':', 1) 
     normalized_records_ALL_dict[key] = value 
     # normalized_records_ALL_dict[contact_id] = concat_record 

def score_it_bag(target_contact_id, target_str, ALL_records_dict): 
    INPUT target_str, ALL_records_dict 
    OUTPUT sorted list by highest fuzzy match 
    return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str)) 
     for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1] 

def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete): 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match 
    # simply drop this index target_contact_id 
    df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x)) 

    return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax() 

def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input): 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str) 
    best_score = 100 

    #score it 
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = Levenshtein.distance(target_str, comparison_record_str) 

      if current_score < best_score: 
       best_score = current_score 
       best_match_id = comparison_contactid 
       best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id) 

def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input): 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str) 
    best_score = 0 

    #score it 
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = fuzz.ratio(target_str, comparison_record_str) 

      if current_score > best_score: 
       best_score = current_score 
       best_match_id = comparison_contactid 
       best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id) 

def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input): 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str) 
    score_threshold = 95 

    #score it 
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = fuzz.ratio(target_str, comparison_record_str) 

      if current_score > score_threshold: 
       return (comparison_record_str, current_score, comparison_contactid) 

    return (None, None, None) 

def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict): 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match 
    threshold_score = 80 
    top_matches_list = [] 
    #score it 
    #iterate through dictionary 
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = fuzz.ratio(target_str, comparison_record_str) 

      if current_score > threshold_score: 
       top_matches_list.append((comparison_record_str, current_score, comparison_contactid)) 

    if len(top_matches_list) > 0: return top_matches_list 

def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict): 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match 
    threshold_score = 80 

    #iterate through dictionary 
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems(): 
     if target_contact_id != comparison_contactid: 

      #score it 
      current_score = fuzz.ratio(target_str, comparison_record_str) 
      if current_score > threshold_score: 
       print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid)) 


#stream in all contacts ie large set 
for line in sys.stdin: 
    ts = time.time() 
    st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') 
    print >> sys.stderr, line, st 

    contact_id, target_str = line.strip().split(':', 1) 

    score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict) 
    # output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict)) 
    # output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict)) 
    # print contact_id + ':' + str(output) 



Ваш подход требует от вас составить 180 000 * 2500 000 = 450 000 000 000 сравнений.

450 миллиардов - это много.

Чтобы уменьшить количество сравнений, вы можете сначала сгруппировать записи, которые имеют некоторые общие черты, такие как первые пять символов поля адреса или общий токен. Затем сравните только записи, в которых есть функция. Эта идея называется «блокировкой» и обычно уменьшает количество полных сравнений, которые вы должны сделать, чтобы что-то управляемое.

Общая проблема, которую вы пытаетесь решить, называется «record linkage». Поскольку вы используете python, вы можете посмотреть на , который предлагает комплексный подход.