У меня проблемы. Следующая строка выполняется нормально на клиенте при первом чтении, но не выполняется при втором чтении.ObjectInputStream возвращает ObjectStreamClass
WorkerNode.java:72 Message task = (Message) in.readObject();
в частный ObjectInputStream. Исключением является следующее:
Exception in thread "main" java.lang.ClassCastException: java.io.ObjectStreamClass cannot be cast to parallelprogramming.Message
at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72)
at parallelprogramming.WorkerNode.computeTillEndOfWork(WorkerNode.java:139)
at parallelprogramming.Worker.main(Worker.java:24)
Nov 15, 2014 11:07:15 PM parallelprogramming.WorkerNode receiveTask
SEVERE: null
java.io.StreamCorruptedException: invalid type code: 00
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72)
at parallelprogramming.WorkerNode.lambda$startListeningForWork$0(WorkerNode.java:59)
at parallelprogramming.WorkerNode$$Lambda$1/798154996.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "WorkListener" java.lang.ClassCastException: parallelprogramming.MatMulTask cannot be cast to parallelprogramming.Message
at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72)
at parallelprogramming.WorkerNode.lambda$startListeningForWork$0(WorkerNode.java:59)
at parallelprogramming.WorkerNode$$Lambda$1/798154996.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Java Result: 1
Сообщение имеет следующую структуру.
public class Message implements IMessage{
private final MessageType type;
private final Object payload;
IMessage расширяет Serializable. Я использую тот же ObjectInputStream и ObjectOutputStream как на стороне клиента, так и на стороне сервера. Я пробовал искать, но не повезло. Кто-нибудь другой нашел что-то подобное?
EDIT2: код, который посылает сообщение для работника:
private final Map<String, WorkerConn> nodes;
//nodes initialized in constructor
private void sendTaskToNode(ITask task, String node) {
if(task == null){
return;
}
try{
Message msg = new Message(MessageType.task, task);
nodes.get(node).sendObject(msg);
nodes.get(node).incrementworkCount();
System.out.println("Sent work to "+node);
} catch (IOException ex) {
Logger.getLogger(MasterNode.class.getName()).log(Level.SEVERE, null, ex);
}
}
Отправить объект в WorkerConn:
public WorkerConn(Socket socket, String name) throws IOException {
this.socket = socket;
this.out = new ObjectOutputStream(socket.getOutputStream());
this.in = new ObjectInputStream(socket.getInputStream());
this.name = name;
workCount = 0;
}
void sendObject(Message msg) throws IOException {
out.writeObject(msg);
}
Та часть, которая получает сообщение от работника:
public void receiveTask() throws NotConnectedToMasterException {
try {
Message task = (Message) in.readObject();
if(task.getMessageType() == MessageType.task){
tasks.add((ITask) task.getPayload());
System.out.println("Received Task");
}else if(task.getMessageType() == MessageType.endOfWork){
ITask t = new AbstractTask() {
@Override
public Object call() throws Exception {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
};
t.setDeathPill();
tasks.add(t);
System.out.println("added deathpill");
}else{
System.out.println("Received "+task.getMessageType());
}
} catch(EOFException ex) {
return;
} catch (IOException ex) {
Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex);
return;
} catch (ClassNotFoundException ex) {
Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex);
}
}
EDIT: Нашел мою проблему. Я создавал поток для прослушивания входящих задач, чтения входного потока и также читал один и тот же поток ввода из основного потока, когда очередь задач была пуста. В Рабочий узел:
private void startListeningForWork(){
workListener = new Thread(() -> {
while(!master.isClosed()){
try {
receiveTask();
} catch (NotConnectedToMasterException ex) {
break;
}
}
});
workListener.setName("WorkListener");
workListener.start();
}
и
while(!task.isDeathPill()){
try {
results.addToResult(task.call());
sendACKtoMaster();
task = tasks.remove();
} catch (Exception ex) {
try {
receiveTask();
} catch (NotConnectedToMasterException ex1) {
Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex1);
break;
}
}
}
Это вызывало один из потоков, чтобы прочитать объект перед другими и натворить.
Показать, куда вы отправляете данные –
загрузил весь код в github. link в конце сообщения – kalgecin
Прошу прощения, но я не буду просматривать все эти файлы для одного оператора writeObject, который коррелирует с этим конкретным оператором readObject (тем более, что я нахожусь на телефоне). По крайней мере, скажите, какой класс, в котором пакет –