Текущий проект, над которым я работаю, требует, чтобы я реализовал способ эффективно передавать набор объектов из одного потока, который выполняется непрерывно, в основной поток. Текущая настройка выглядит примерно так:Передача набора объектов между потоками
У меня есть основной поток, который создает новую тему. Этот новый поток работает непрерывно и вызывает метод, основанный на таймере. Этот метод выбирает группу сообщений из онлайн-источника и организует их в TreeSet.
Этот TreeSet затем должен быть передан обратно в основной поток, чтобы содержащиеся в нем сообщения могли обрабатываться независимо от повторяющегося таймера.
Для лучшей ссылки мой код выглядит следующим образом
// Called by the main thread on start.
void StartProcesses()
{
if(this.IsWindowing)
{
return;
}
this._windowTimer = Executors.newSingleThreadScheduledExecutor();
Runnable task = new Runnable() {
public void run() {
WindowCallback();
}
};
this.CancellationToken = false;
_windowTimer.scheduleAtFixedRate(task,
0, this.SQSWindow, TimeUnit.MILLISECONDS);
this.IsWindowing = true;
}
/////////////////////////////////////////////////////////////////////////////////
private void WindowCallback()
{
ArrayList<Message> messages = new ArrayList<Message>();
//TODO create Monitor
if((!CancellationToken))
{
try
{
//TODO fix epochWindowTime
long epochWindowTime = 0;
int numberOfMessages = 0;
Map<String, String> attributes;
// Setup the SQS client
AmazonSQS client = new AmazonSQSClient(new
ClasspathPropertiesFileCredentialsProvider());
client.setEndpoint(this.AWSSQSServiceUrl);
// get the NumberOfMessages to optimize how to
// Receive all of the messages from the queue
GetQueueAttributesRequest attributesRequest =
new GetQueueAttributesRequest();
attributesRequest.setQueueUrl(this.QueueUrl);
attributesRequest.withAttributeNames(
"ApproximateNumberOfMessages");
attributes = client.getQueueAttributes(attributesRequest).
getAttributes();
numberOfMessages = Integer.valueOf(attributes.get(
"ApproximateNumberOfMessages")).intValue();
// determine if we need to Receive messages from the Queue
if (numberOfMessages > 0)
{
if (numberOfMessages < 10)
{
// just do it inline it's less expensive than
//spinning threads
ReceiveTask(numberOfMessages);
}
else
{
//TODO Create a multithreading version for this
ReceiveTask(numberOfMessages);
}
}
if (!CancellationToken)
{
//TODO testing
_setLock.lock();
Iterator<Message> _setIter = _set.iterator();
//TODO
while(_setIter.hasNext())
{
Message temp = _setIter.next();
Long value = Long.valueOf(temp.getAttributes().
get("Timestamp"));
if(value.longValue() < epochWindowTime)
{
messages.add(temp);
_set.remove(temp);
}
}
_setLock.unlock();
// TODO deduplicate the messages
// TODO reorder the messages
// TODO raise new Event with the results
}
if ((!CancellationToken) && (messages.size() > 0))
{
if (messages.size() < 10)
{
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
else
{
//TODO Create a way to divide this work among
//several threads
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
}
}catch (AmazonServiceException ase){
ase.printStackTrace();
}catch (AmazonClientException ace) {
ace.printStackTrace();
}
}
}
Как можно увидеть некоторые из комментариев, мой текущий предпочтительный способ справиться с этим путем создания события в потоке таймера, если есть сообщения , Затем основной поток будет прослушивать это событие и обрабатывать его соответствующим образом.
В настоящее время я не знаком с тем, как Java обрабатывает события или как их создавать/слушать. Я также не знаю, можно ли создавать события и содержать информацию, содержащуюся в них, между потоками.
Может кто-нибудь, пожалуйста, дайте мне несколько советов/соображений относительно того, возможны ли мои методы? Если да, где я могу найти некоторую информацию о том, как их реализовать, поскольку мои текущие поисковые попытки не доказывают плодотворности.
Если нет, могу ли я получить некоторые предложения о том, как я буду это делать, имея в виду, что я бы хотел избежать необходимости управлять сокетами, если это вообще возможно.
EDIT 1:
Основной поток также будет отвечать за выдачу команд на основе сообщений, которые он получает, или выдавать команды, чтобы получить необходимую информацию. По этой причине основной поток не может ждать при получении сообщений и должен обрабатывать их на основе событий.
К сожалению, BlockingQueue не будет работать, основной поток, который потребляет сообщения, должен также иметь возможность генерировать ели команды к сервису между прочим. – JME
Разве это не просто запрос на две очереди? Передача сообщений на таких языках, как модель Erlang и Scala, имеет очередь почтовых ящиков/сообщений в потоке. – selig
Не так просто, как создание двух очередей, основной поток не передает никакой информации в поток «сбор сообщений» и получает информацию только после его запуска. Основной поток затем создает http-соединения с внешними службами, которые могут или не могут генерировать сообщения для потока «сбора сообщений» для сбора. – JME