2014-11-15 3 views
0

У меня проблемы. Следующая строка выполняется нормально на клиенте при первом чтении, но не выполняется при втором чтении.ObjectInputStream возвращает ObjectStreamClass

WorkerNode.java:72 Message task = (Message) in.readObject();

в частный ObjectInputStream. Исключением является следующее:

Exception in thread "main" java.lang.ClassCastException: java.io.ObjectStreamClass cannot be cast to parallelprogramming.Message 
    at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72) 
    at parallelprogramming.WorkerNode.computeTillEndOfWork(WorkerNode.java:139) 
    at parallelprogramming.Worker.main(Worker.java:24) 
Nov 15, 2014 11:07:15 PM parallelprogramming.WorkerNode receiveTask 
SEVERE: null 
java.io.StreamCorruptedException: invalid type code: 00 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72) 
    at parallelprogramming.WorkerNode.lambda$startListeningForWork$0(WorkerNode.java:59) 
    at parallelprogramming.WorkerNode$$Lambda$1/798154996.run(Unknown Source) 
    at java.lang.Thread.run(Thread.java:745) 

Exception in thread "WorkListener" java.lang.ClassCastException: parallelprogramming.MatMulTask cannot be cast to parallelprogramming.Message 
    at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72) 
    at parallelprogramming.WorkerNode.lambda$startListeningForWork$0(WorkerNode.java:59) 
    at parallelprogramming.WorkerNode$$Lambda$1/798154996.run(Unknown Source) 
    at java.lang.Thread.run(Thread.java:745) 
Java Result: 1 

Сообщение имеет следующую структуру.

public class Message implements IMessage{ 
    private final MessageType type; 
    private final Object payload; 

IMessage расширяет Serializable. Я использую тот же ObjectInputStream и ObjectOutputStream как на стороне клиента, так и на стороне сервера. Я пробовал искать, но не повезло. Кто-нибудь другой нашел что-то подобное?

EDIT2: код, который посылает сообщение для работника:

private final Map<String, WorkerConn> nodes; 
//nodes initialized in constructor 
private void sendTaskToNode(ITask task, String node) { 
     if(task == null){ 
      return; 
     }   
     try{ 
      Message msg = new Message(MessageType.task, task); 
      nodes.get(node).sendObject(msg); 

      nodes.get(node).incrementworkCount(); 

      System.out.println("Sent work to "+node); 
     } catch (IOException ex) { 
      Logger.getLogger(MasterNode.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

Отправить объект в WorkerConn:

public WorkerConn(Socket socket, String name) throws IOException { 
     this.socket = socket; 
     this.out = new ObjectOutputStream(socket.getOutputStream()); 
     this.in = new ObjectInputStream(socket.getInputStream()); 
     this.name = name; 
     workCount = 0; 
    } 

void sendObject(Message msg) throws IOException { 
     out.writeObject(msg); 
    } 

Та часть, которая получает сообщение от работника:

public void receiveTask() throws NotConnectedToMasterException { 
     try { 
      Message task = (Message) in.readObject(); 
      if(task.getMessageType() == MessageType.task){ 
       tasks.add((ITask) task.getPayload()); 
       System.out.println("Received Task"); 
      }else if(task.getMessageType() == MessageType.endOfWork){ 
       ITask t = new AbstractTask() { 

        @Override 
        public Object call() throws Exception { 
         throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
        } 
       }; 
       t.setDeathPill(); 
       tasks.add(t); 
       System.out.println("added deathpill"); 
      }else{ 
       System.out.println("Received "+task.getMessageType()); 
      } 
     } catch(EOFException ex) { 
      return; 
     } catch (IOException ex) { 
      Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex); 
      return; 
     } catch (ClassNotFoundException ex) { 
      Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

EDIT: Нашел мою проблему. Я создавал поток для прослушивания входящих задач, чтения входного потока и также читал один и тот же поток ввода из основного потока, когда очередь задач была пуста. В Рабочий узел:

private void startListeningForWork(){ 
     workListener = new Thread(() -> { 
      while(!master.isClosed()){ 
       try { 
        receiveTask(); 
       } catch (NotConnectedToMasterException ex) { 
        break; 
       } 
      } 
     }); 
     workListener.setName("WorkListener"); 
     workListener.start(); 
    } 

и

while(!task.isDeathPill()){ 
      try {     
       results.addToResult(task.call()); 
       sendACKtoMaster();     
       task = tasks.remove(); 
      } catch (Exception ex) { 
       try { 
        receiveTask(); 
       } catch (NotConnectedToMasterException ex1) { 
        Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex1); 
        break; 
       } 
      } 
     } 

Это вызывало один из потоков, чтобы прочитать объект перед другими и натворить.

+0

Показать, куда вы отправляете данные –

+0

загрузил весь код в github. link в конце сообщения – kalgecin

+1

Прошу прощения, но я не буду просматривать все эти файлы для одного оператора writeObject, который коррелирует с этим конкретным оператором readObject (тем более, что я нахожусь на телефоне). По крайней мере, скажите, какой класс, в котором пакет –

ответ

0

Нашел мою проблему. Я создавал поток для прослушивания входящих задач, чтения входного потока и также читал один и тот же поток ввода из основного потока, когда очередь задач была пуста. В Рабочий узел:

private void startListeningForWork(){ 
     workListener = new Thread(() -> { 
      while(!master.isClosed()){ 
       try { 
        receiveTask(); 
       } catch (NotConnectedToMasterException ex) { 
        break; 
       } 
      } 
     }); 
     workListener.setName("WorkListener"); 
     workListener.start(); 
    } 

и

while(!task.isDeathPill()){ 
      try {     
       results.addToResult(task.call()); 
       sendACKtoMaster();     
       task = tasks.remove(); 
      } catch (Exception ex) { 
       try { 
        receiveTask(); 
       } catch (NotConnectedToMasterException ex1) { 
        Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex1); 
        break; 
       } 
      } 
     } 

Это вызывало один из потоков, чтобы прочитать объект перед другими и натворить.

+0

Как предсказал @jtahlborn. – EJP

+0

yeah :) отлаживал каждую строку до того, как я ее нашел :(java нуждается в более подробных объяснениях в исключениях для такого рода вещей: p или лучше ручные гонки – kalgecin