2016-02-20 2 views
0

У меня есть длинный список ID края (около 12 миллиардов), который я готов удалить из моего графика Titan (который размещен на бэкэнд HBase).Массовое удаление краев на Titan 1.0

Как я могу сделать это быстро и эффективно?

Я попытался удалить края через Gremlin, но это слишком медленно для этого количества ребер.

Можно ли напрямую выполнять команды Delete на HBase? Как мне это сделать? (Как собрать ключ, чтобы удалить?)

Благодаря

ответ

1

После двух дней исследований я пришел с решением.

Основная цель - дается очень большая коллекция строки edgeIds, реализует логику, которая удаляет их из графика - Реализация должна поддержать удаление миллиардов ребер, поэтому оно должно быть эффективным в памяти и время.

Непосредственное использование Titan дисквалифицировано, поскольку Titan выполняет множество ненужных экземпляров, которые являются избыточными - как правило, мы не хотим загружать ребра, мы просто хотим удалить их из HBase.

/** 
* Deletes the given edge IDs, by splitting it to chunks of 100,000 
* @param edgeIds Collection of edge IDs to delete 
* @throws IOException 
*/ 
public static void deleteEdges(Iterator<String> edgeIds) throws IOException { 
    IDManager idManager = new IDManager(NumberUtil.getPowerOf2(GraphDatabaseConfiguration.CLUSTER_MAX_PARTITIONS.getDefaultValue())); 
    byte[] columnFamilyName = "e".getBytes(); // 'e' is your edgestore column-family name 
    long deletionTimestamp = System.currentTimeMillis(); 
    int chunkSize = 100000; // Will contact HBase only once per 100,000 records two deletes (=> 50,000 edges, since each edge is removed one time as IN and one time as OUT) 

    org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); 
    config.set("hbase.zookeeper.quorum", "YOUR-ZOOKEEPER-HOSTNAME"); 
    config.set("hbase.table", "YOUR-HBASE-TABLE"); 
    List<Delete> deletions = Lists.newArrayListWithCapacity(chunkSize); 

    Connection connection = ConnectionFactory.createConnection(config); 
    Table table = connection.getTable(TableName.valueOf(config.get("hbase.table"))); 

    Iterators.partition(edgeIds, chunkSize) 
      .forEachRemaining(edgeIdsChunk -> deleteEdgesChunk(edgeIdsChunk, deletions, table, idManager, 
        columnFamilyName, deletionTimestamp)); 
} 

/** 
* Given a collection of edge IDs, and a list of Delete object (that is cleared on entrance), 
* creates two Delete objects for each edge (one for IN and one for OUT), 
* and deletes it via the given Table instance 
*/ 
public static void deleteEdgesChunk(List<String> edgeIds, List<Delete> deletions, Table table, IDManager idManager, 
            byte[] columnFamilyName, long deletionTimestamp) { 
    deletions.clear(); 

    for (String edgeId : edgeIds) 
    { 
     RelationIdentifier identifier = RelationIdentifier.parse(edgeId); 

     deletions.add(createEdgeDelete(idManager, columnFamilyName, deletionTimestamp, identifier.getRelationId(), 
       identifier.getTypeId(), identifier.getInVertexId(), identifier.getOutVertexId(), 
       IDHandler.DirectionID.EDGE_IN_DIR); 

     deletions.add(createEdgeDelete(idManager, columnFamilyName, deletionTimestamp, identifier.getRelationId(), 
       identifier.getTypeId(), identifier.getOutVertexId(), identifier.getInVertexId(), 
       IDHandler.DirectionID.EDGE_OUT_DIR)); 
    } 

    try { 
     table.delete(deletions); 
    } 
    catch (IOException e) 
    { 
     logger.error("Failed to delete a chunk due to inner exception: " + e); 
    } 
} 

/** 
* Creates an HBase Delete object for a specific edge 
* @return HBase Delete object to be used against HBase 
*/ 
private static Delete createEdgeDelete(IDManager idManager, byte[] columnFamilyName, long deletionTimestamp, 
             long relationId, long typeId, long vertexId, long otherVertexId, 
             IDHandler.DirectionID directionID) { 

    byte[] vertexKey = idManager.getKey(vertexId).getBytes(0, 8); // Size of a long 
    byte[] edgeQualifier = makeQualifier(relationId, otherVertexId, directionID, typeId); 

    return new Delete(vertexKey) 
      .addColumn(columnFamilyName, edgeQualifier, deletionTimestamp); 
} 

/** 
* Cell Qualifier for a specific edge 
*/ 
private static byte[] makeQualifier(long relationId, long otherVertexId, IDHandler.DirectionID directionID, long typeId) { 
    WriteBuffer out = new WriteByteBuffer(32); // Default length of array is 32, feel free to increase 

    IDHandler.writeRelationType(out, typeId, directionID, false); 
    VariableLong.writePositiveBackward(out, otherVertexId); 
    VariableLong.writePositiveBackward(out, relationId); 

    return out.getStaticBuffer().getBytes(0, out.getPosition()); 
} 

Имейте в виду, что я не считаю типов систем и так - я полагаю, что данные идентификаторы края являются пользовательские края.

Используя эту реализацию, я смог удалить края примерно 2 минут.

Смежные вопросы