2013-06-24 3 views
15

Я объединил код ниже, который не делает ничего сложного - просто создает переменную byte [], записывает ее в поле blob в Cassandra (v1. 2, через новую библиотеку Datastax CQL), затем снова считывает ее.Сериализация объектов Java в Cassandra 1.2 через ByteBuffer & CQL 3

Когда я положил его на 3 элемента, и когда я прочитал его, это 84 элемента длиной ...! Это означает, что вещь, которую я на самом деле пытаюсь сделать (сериализация объектов Java), с ошибкой org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008 при попытке десериализации снова.

Вот пример кода, который демонстрирует мою проблему:

import java.nio.ByteBuffer; 

import org.apache.commons.lang.SerializationUtils; 

import com.datastax.driver.core.BoundStatement; 
import com.datastax.driver.core.Cluster; 
import com.datastax.driver.core.Host; 
import com.datastax.driver.core.Metadata; 
import com.datastax.driver.core.PreparedStatement; 
import com.datastax.driver.core.ResultSet; 
import com.datastax.driver.core.Row; 
import com.datastax.driver.core.Session; 

public class TestCassandraSerialization { 


    private Cluster cluster; 
    private Session session; 

    public TestCassandraSerialization(String node) { 
     connect(node); 
    } 

    private void connect(String node) { 
     cluster = Cluster.builder().addContactPoint(node).build(); 
     Metadata metadata = cluster.getMetadata(); 
     System.out.printf("Connected to %s\n", metadata.getClusterName()); 
     for (Host host: metadata.getAllHosts()) { 
       System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", 
         host.getDatacenter(), host.getAddress(), host.getRack()); 
     } 
     session = cluster.connect(); 
    } 

    public void setUp() { 
     session.execute("CREATE KEYSPACE test_serialization WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};"); 

     session.execute("CREATE TABLE test_serialization.test_table (id text PRIMARY KEY, data blob)"); 
    } 

    public void tearDown() { 
     session.execute("DROP KEYSPACE test_serialization"); 
    } 

    public void insertIntoTable(String key, byte[] data) { 
     PreparedStatement statement = session.prepare("INSERT INTO test_serialization.test_table (id,data) VALUES (?, ?)"); 
     BoundStatement boundStatement = new BoundStatement(statement); 
     session.execute(boundStatement.bind(key,ByteBuffer.wrap(data))); 
    } 

    public byte[] readFromTable(String key) { 
     String q1 = "SELECT * FROM test_serialization.test_table WHERE id = '"+key+"';"; 

     ResultSet results = session.execute(q1); 
     for (Row row : results) { 
      ByteBuffer data = row.getBytes("data"); 
      return data.array(); 
     } 
     return null; 
    } 


    public static boolean compareByteArrays(byte[] one, byte[] two) { 
     if (one.length > two.length) { 
      byte[] foo = one; 
      one = two; 
      two = foo; 
     } 

     // so now two is definitely the longer array  
     for (int i=0; i<one.length; i++) { 
      //System.out.printf("%d: %s\t%s\n", i, one[i], two[i]); 
      if (one[i] != two[i]) { 
       return false; 
      } 
     } 
     return true; 
    } 


    public static void main(String[] args) { 
     TestCassandraSerialization tester = new TestCassandraSerialization("localhost"); 

     try { 
      tester.setUp(); 
      byte[] dataIn = new byte[]{1,2,3}; 
      tester.insertIntoTable("123", dataIn); 
      byte[] dataOut = tester.readFromTable("123"); 

      System.out.println(dataIn); 
      System.out.println(dataOut); 

      System.out.println(dataIn.length); // prints "3" 
      System.out.println(dataOut.length); // prints "84" 

      System.out.println(compareByteArrays(dataIn, dataOut)); // prints false   

      String toSave = "Hello, world!"; 
      dataIn = SerializationUtils.serialize(toSave); 
      tester.insertIntoTable("toSave", dataIn); 
      dataOut = tester.readFromTable("toSave"); 

      System.out.println(dataIn.length); // prints "20" 
      System.out.println(dataOut.length); // prints "104" 


      // The below throws org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008 
      String hasLoaded = (String) SerializationUtils.deserialize(dataOut); 
      System.out.println(hasLoaded); 

     } finally { 
      tester.tearDown(); 
     } 
    } 
} 

Похоже, правильный материал делает его в базу данных:

cqlsh:flight_cache> select * from test_serialization.test_table; 

id  | data 
--------+-------------------------------------------- 
    123 |         0x010203 
toSave | 0xaced000574000d48656c6c6f2c20776f726c6421 

cqlsh:flight_cache> 

Так выглядит как ошибка при чтении, а не запись двоичных данных. Может ли кто-нибудь дать мне какие-либо указания относительно того, что я делаю неправильно?

ответ

31

Проблема почти наверняка связана с тем, что массив, возвращаемый ByteBuffer.array(), является полным массивом поддержки, но данные могут содержаться только в его части.

Действительные данные, которые возвращаются, начинаются с ByteBuffer.arrayOffset() и имеют длину ByteBuffer.remaining(). Чтобы получить массив байтов, содержащий только действительные данные, используйте этот код в readFromTable:

byte[] result = new byte[data.remaining()]; 
data.get(result); 

тогда ваши данные в результате, и вы можете вернуть это.

+1

Ах! Это точно. Большое спасибо! –

+1

Спасибо, Ричард. Ты тоже спасал мой день :) – Easility

+0

@ Рихард, большое спасибо ... – AlexR

9

Поскольку вы уже используете DataStax Java Driver, есть также в com.datastax.driver.core.utilsutility class, которые вы можете использовать, как:

byte[] result = Bytes.getArray(data)