2013-06-17 2 views
0

Текущий проект, над которым я работаю, требует, чтобы я реализовал способ эффективно передавать набор объектов из одного потока, который выполняется непрерывно, в основной поток. Текущая настройка выглядит примерно так:Передача набора объектов между потоками

У меня есть основной поток, который создает новую тему. Этот новый поток работает непрерывно и вызывает метод, основанный на таймере. Этот метод выбирает группу сообщений из онлайн-источника и организует их в TreeSet.

Этот TreeSet затем должен быть передан обратно в основной поток, чтобы содержащиеся в нем сообщения могли обрабатываться независимо от повторяющегося таймера.

Для лучшей ссылки мой код выглядит следующим образом

// Called by the main thread on start. 
void StartProcesses() 
{ 
    if(this.IsWindowing) 
    { 
     return; 
    } 

    this._windowTimer = Executors.newSingleThreadScheduledExecutor(); 

    Runnable task = new Runnable() { 
     public void run() { 
      WindowCallback(); 
     } 
    }; 

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task, 
      0, this.SQSWindow, TimeUnit.MILLISECONDS); 

    this.IsWindowing = true; 
} 

///////////////////////////////////////////////////////////////////////////////// 

private void WindowCallback() 
{ 
    ArrayList<Message> messages = new ArrayList<Message>(); 

    //TODO create Monitor 
    if((!CancellationToken)) 
    { 
     try 
     { 
      //TODO fix epochWindowTime 
      long epochWindowTime = 0; 
      int numberOfMessages = 0; 
      Map<String, String> attributes; 

      // Setup the SQS client 
      AmazonSQS client = new AmazonSQSClient(new 
        ClasspathPropertiesFileCredentialsProvider()); 

      client.setEndpoint(this.AWSSQSServiceUrl); 

      // get the NumberOfMessages to optimize how to 
      // Receive all of the messages from the queue 

      GetQueueAttributesRequest attributesRequest = 
        new GetQueueAttributesRequest(); 
      attributesRequest.setQueueUrl(this.QueueUrl); 
      attributesRequest.withAttributeNames(
        "ApproximateNumberOfMessages"); 
      attributes = client.getQueueAttributes(attributesRequest). 
        getAttributes(); 

      numberOfMessages = Integer.valueOf(attributes.get(
        "ApproximateNumberOfMessages")).intValue(); 

      // determine if we need to Receive messages from the Queue 
      if (numberOfMessages > 0) 
      { 

       if (numberOfMessages < 10) 
       { 
        // just do it inline it's less expensive than 
        //spinning threads 
        ReceiveTask(numberOfMessages); 
       } 
       else 
       { 
        //TODO Create a multithreading version for this 
        ReceiveTask(numberOfMessages); 
       } 
      } 

      if (!CancellationToken) 
      { 

       //TODO testing 
       _setLock.lock(); 

       Iterator<Message> _setIter = _set.iterator(); 
       //TODO 
       while(_setIter.hasNext()) 
       { 
        Message temp = _setIter.next(); 

        Long value = Long.valueOf(temp.getAttributes(). 
          get("Timestamp")); 
        if(value.longValue() < epochWindowTime) 
        { 
         messages.add(temp); 
         _set.remove(temp); 
        } 
       } 

       _setLock.unlock(); 

       // TODO deduplicate the messages 

       // TODO reorder the messages 

       // TODO raise new Event with the results 
      } 

      if ((!CancellationToken) && (messages.size() > 0)) 
      { 
       if (messages.size() < 10) 
       { 
        Pair<Integer, Integer> range = 
          new Pair<Integer, Integer>(Integer.valueOf(0), 
            Integer.valueOf(messages.size())); 
        DeleteTask(messages, range); 
       } 
       else 
       { 
        //TODO Create a way to divide this work among 
        //several threads 
        Pair<Integer, Integer> range = 
          new Pair<Integer, Integer>(Integer.valueOf(0), 
            Integer.valueOf(messages.size())); 
        DeleteTask(messages, range); 
       } 
      } 
     }catch (AmazonServiceException ase){ 
      ase.printStackTrace(); 
     }catch (AmazonClientException ace) { 
      ace.printStackTrace(); 
     } 
    } 
} 

Как можно увидеть некоторые из комментариев, мой текущий предпочтительный способ справиться с этим путем создания события в потоке таймера, если есть сообщения , Затем основной поток будет прослушивать это событие и обрабатывать его соответствующим образом.

В настоящее время я не знаком с тем, как Java обрабатывает события или как их создавать/слушать. Я также не знаю, можно ли создавать события и содержать информацию, содержащуюся в них, между потоками.

Может кто-нибудь, пожалуйста, дайте мне несколько советов/соображений относительно того, возможны ли мои методы? Если да, где я могу найти некоторую информацию о том, как их реализовать, поскольку мои текущие поисковые попытки не доказывают плодотворности.

Если нет, могу ли я получить некоторые предложения о том, как я буду это делать, имея в виду, что я бы хотел избежать необходимости управлять сокетами, если это вообще возможно.

EDIT 1:

Основной поток также будет отвечать за выдачу команд на основе сообщений, которые он получает, или выдавать команды, чтобы получить необходимую информацию. По этой причине основной поток не может ждать при получении сообщений и должен обрабатывать их на основе событий.

ответ

1

Производитель-Потребитель Узор:

Один поток (продюсер) непрерывно стеки объектов (сообщений) в очереди. другой поток (потребитель) считывает и удаляет объекты из очереди.

Если ваша проблема соответствует этому, попробуйте «BlockingQueue». http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

Это простой и эффективный способ.

Если очередь пуста, потребитель будет «блокирован», что означает, что поток ожидает (так что не использует время процессора), пока производитель не поместит некоторые объекты. иначе cosumer постоянно потребляет объекты. И если очередь заполнена, создатель будет заблокирован, пока потребитель не будет потреблять некоторые объекты, чтобы сделать комнату в очереди, наоборот.

Вот пример: (очередь должна быть такой же объект как производитель и потребитель)


(Producer резьба)

Message message = createMessage(); 
queue.put(message); 

(Consumer нить)

Message message = queue.take(); 
handleMessage(message); 
+0

К сожалению, BlockingQueue не будет работать, основной поток, который потребляет сообщения, должен также иметь возможность генерировать ели команды к сервису между прочим. – JME

+0

Разве это не просто запрос на две очереди? Передача сообщений на таких языках, как модель Erlang и Scala, имеет очередь почтовых ящиков/сообщений в потоке. – selig

+0

Не так просто, как создание двух очередей, основной поток не передает никакой информации в поток «сбор сообщений» и получает информацию только после его запуска. Основной поток затем создает http-соединения с внешними службами, которые могут или не могут генерировать сообщения для потока «сбора сообщений» для сбора. – JME

Смежные вопросы