2015-01-29 3 views
2

Я хочу начать группу из 10 потоков. В моей основной конструктор программы я использую:Непонимание ExecutorService - я думаю

executor = Executors.newFixedThreadPool(NTHREADS); 

Callable<String> poller; 
for (int i = 0; i < NTHREADS; ++i) { 
    Future<String> future = executor.submit(new Poller(0x3A, m_socket, ds_in, ds_out, socketLock)); 
    set.add(future); 
} 

Для метода вызова() в классе Poller у меня есть:

public String call() 
{ 
    // This has to be set here, otherwise all threads will have a name of "main". 
    myID = Thread.currentThread().getName(); 

    boolean shutup_loop = true; 
    do { 
     System.out.println("Hey, I'm thread " + myID); 
     System.out.println("Hey, I'm thread " + Thread.currentThread().getName()); 
     try { 
      Thread.sleep(10); 
     } 
     catch (java.lang.InterruptedException e) { 
      System.out.println("thread " + myID + ": " + e); 
     } 

     // Do if you want the printing to all match up on one line 
     synchronized (this) { 
      ByteArrayOutputStream baos = SendReceive(pollPacket); 

      System.out.print(myID + ": "); 
      if (baos != null) { 
       printStuff(baos); 
       System.out.println(); 
      } 
      notify(); 
     } 

    } while (shutup_loop); 

    return "poller is finished"; 
} 

Эти нити Poller называют SendReceive(), часть класса Poller:

public synchronized ByteArrayOutputStream SendReceive(byte[] toSend) 
{ 
    System.out.println("START"); 
    System.out.println("SendReceive()1 " + myID); 
    System.out.println("SendReceive()2 " + Thread.currentThread().getName()); 
    System.out.println("END"); 

    try { 
     ds_out.write(toSend, 0, toSend.length); 
     ds_out.flush(); 
    } 
    catch (java.io.IOException e) { 
     System.out.println("thread " + myID + ": " + e); 
    } 

    try { 
     m_socket.setSoTimeout(200);  // <-- might need tweaking 
    } 
    catch (java.net.SocketException e) { 
     System.out.println("thread " + myID + ": " + e); 
    } 

    ByteArrayOutputStream baos = null; 
    try { 
     baos = getResponse(ds_in); 
    } 
    catch (java.io.IOException e) { 
     System.out.println("thread " + myID + ": " + e); 
    } 

    return baos; 
} 

Поскольку это синхронизировано метод, который я бы ожидать выход напоминать:

START 
SendReceive()1 pool-1-thread-1 
SendReceive()2 pool-1-thread-1 
END 

START 
SendReceive()1 pool-1-thread-2 
SendReceive()2 pool-1-thread-2 
END 

Вместо этого он делает:

START 
START 
START 
START 
START 
START 
SendReceive()1 pool-1-thread-2 
START 
START 
START 
SendReceive()1 pool-1-thread-6 
SendReceive()1 pool-1-thread-7 
SendReceive()2 pool-1-thread-2 
SendReceive()1 pool-1-thread-3 
SendReceive()2 pool-1-thread-6 
SendReceive()1 pool-1-thread-1 
SendReceive()1 pool-1-thread-9 
SendReceive()1 pool-1-thread-8 
SendReceive()2 pool-1-thread-9 
END 
... 

Что дает?

+0

Ваш вопрос о 'synchronized', а не' ExecutorService'. – immibis

+0

Ваш вопрос о 'synchronized', а не' ExecutorService'. – immibis

+0

Удалить 'synchronized' из' SendReceive' декларации (это все равно ничего не делает). Измените 'synchronized (this)' вокруг вызова 'SendReceive' на' synchronized (Poller.class) '. Это заставит его работать должным образом. См. Ответ assylias, чтобы понять, почему. – Dima

ответ

2

synchronized использует this как блокировку: в вашем случае у вас есть несколько экземпляров Poller, поэтому каждый использует другую блокировку. Для того, чтобы сделать его работу вам нужен общий замок:

  • либо сделать метод static
  • или использовать общий private static final Object lock = new Object(); и использовать synchronized(lock) {...}
0

Если то, что вы после обмена сокета между пуллерами, вам не нужно использовать сервис исполнителя. В любом случае вам необходимо сериализовать использование сокета, поэтому вы можете просто перебрать список задач опроса и передать текущий сокет для работы.

С другой стороны, если вы действительно хотите провести опрос параллельно с использованием одного и того же сокета, ваш сетевой протокол должен поддерживать это, разрешив вам отправлять голосовое сообщение, не дожидаясь ответа на него; у вас будет несколько запросов в полете. Но я подозреваю, что это большая и другая проблема, которую вы пытаетесь решить.

+0

Правда, но я делал это для осуществления синхронизации и блокировки ресурсов. –

Смежные вопросы