2015-12-07 4 views
0

Я партия загрузки Neo4j графа, используя py2neo с помощью этого скрипта:Пакетная загрузка Neo4j

batch = neo4j.WriteBatch(graph) 
counter = 0 
for each in ans: 
    n1 = graph.merge_one("Page", "url", each[0]) 
#  batch.create(n1) 
    counter +=1 
    for linkvalue in each[6]: 
     try: 
      text,link = linkvalue.split('!__!') 
      n2 = graph.merge_one("Page", "url", link) 
#    batch.create(n2) 
      counter+=1 
      rel = Relationship(n1,'LINKS',n2, anchor_text=text) 
      batch.create(rel) 

     except (KeyboardInterrupt, SystemExit): 
      print 'fail' 
      raise 

    if counter > 900: 
     counter = 0 
     batch.submit() 
     print 'submit' 
     batch = neo4j.WriteBatch(graph) 

merge_one ОБА сделать вызов на графике, который я считаю, замедляет мой алгоритм. Я прокомментировал batch.create(), потому что они воссоздавали узлы. Есть ли способ сделать эту функцию, но сохранить ее до тех пор, пока я не batch.submit() не ускорит процесс?

Я обрабатываю около 50 000 узлов и 1 000 000 отношений.

ответ

1

Вам необходимо добавить утверждения в WriteBatch, а затем run пакет, когда он достигнет некоторого количества операторов.

Вот пример:

import json 
from py2neo.neo4j import CypherQuery, GraphDatabaseService, WriteBatch 
from py2neo import neo4j 

db = neo4j.GraphDatabaseService() 

business_index_query = CypherQuery(db, "CREATE INDEX ON :Business(id)") 
business_index_query.execute() 

category_index_query = CypherQuery(db, "CREATE INDEX ON :Category(name)") 
category_index_query.execute() 

create_business_query = ''' 
    CREATE (b:Business {id: {business_id}, name: {name}, lat:{latitude}, 
    lon:{longitude}, stars: {stars}, review_count: {review_count}}) 
''' 

merge_category_query = ''' 
    MATCH (b:Business {id: {business_id}}) 
    MERGE (c:Category {name: {category}}) 
    CREATE UNIQUE (c)<-[:IS_IN]-(b) 
''' 

print "Beginning business batch" 
with open('data/yelp_academic_dataset_business.json', 'r') as f: 
    business_batch = WriteBatch(db) 
    count = 0 
    for b in (json.loads(l) for l in f): 
     business_batch.append_cypher(create_business_query, b) 
     count += 1 
     if count >= 10000: 
      business_batch.run() 
      business_batch.clear() 
      count = 0 
    if count > 0: 
     business_batch.run() 

print "Beginning category batch" 
with open('data/yelp_academic_dataset_business.json', 'r') as f: 
    category_batch = WriteBatch(db) 
    count = 0 
    for b in (json.loads(l) for l in f): 
     for c in b['categories']: 
      category_batch.append_cypher(merge_category_query, {'business_id': b['business_id'], 'category': c}) 
      count += 1 
      if count >= 10000: 
       category_batch.run() 
       category_batch.clear() 
       count = 0 
    if count > 0: 
     category_batch.run() 

Обратите внимание, что в этом примере используется только Cypher заявления и добавляет каждое заявление в WriteBatch. Также этот пример использует два разных экземпляра WriteBatch.

Смежные вопросы