2009-04-13 2 views
1

У меня есть пул потоков, которые загружают задачи из очереди. Как правило, небольшое количество потоков может содержать пустую очередь. Иногда особенно большой пакет событий будет поддерживать размер очереди выше нуля в течение некоторого времени, но не надолго.Хороший метод для объединения записей в очередь

Моя озабоченность связана с событиями, которые дублируют или переносят данные, которые убирают предыдущие события. Во время большого объема такие события могут сосуществовать в очереди в течение короткого периода времени. Я хотел бы иметь возможность объединить их, чтобы сократить время, затрачиваемое на растрату.

Что такое хороший способ сконфигурировать такую ​​очередь? Я мог бы объединиться во время ввода, итерации со спины к голове и поиска кандидата для замены, однако это кажется слишком грубым. Если у вас есть рекомендации по коду или библиотеке, имейте в виду, что я использую Java.

ответ

2

Почему бы не просто реализовать hashCode() и equals() на основе ваших задач. Затем просто удалите задачу. Например.

queue.remove(task); 
queue.offer(task); 

Тогда у вас не будет дубликатов. Или альтернативно.

if(!queue.contains(task)) { 
    queue.offer(task); 
} 

Это позволит избежать постановки задачи, если она уже находится в очереди.

+0

Функция удаления() начнет поиск из головы очереди. Если он найдет совпадение, он удалит его, и я добавлю новый элемент в конце, но мне нужно добавить элемент в том же месте. Remove() также создаст проблему упорядочения, если в очереди есть несколько дубликатов. – omerkudat

+0

Затем не удаляйте, просто проверьте, прежде чем добавлять. Обратите внимание на два варианта, которые я предоставил. –

+0

Независимо от того, что я делаю, это все равно изменяет порядок очереди. Я выбрал это решение, но вместо проверки содержит: isEmpty()?offer(): remove(), offer(); Благодарю. – omerkudat

1

Если вы используете LinkedHashMap, вы можете сохранить порядок, в который записи были добавлены в очередь.

Когда приходит подходящая запись, я понимаю, что вы хотите добавить некоторые ее данные в исходную запись в очередь. В этом случае вы можете либо обновить хешированный объект на месте, либо использовать HashMap.put(key, value), чтобы заменить объект в очереди новым объектом. (Я думаю , что это сохраняет порядок исходного пункта, но я не проверял это.)

Обратите внимание, что код нужно будет явно синхронизировать чтения и доступ к LinkedHashMap и данные внутри него пишут. Вы не хотите обновлять элемент в очереди в то же время, когда другой поток захватывает его для обработки. Самый простой способ синхронизации - это, вероятно, доступ к LinkedHashMap по Collections.synchronizedMap().

2

Это conflator, кажется, делает то, что вы ищете: https://github.com/GuillaumeArnaud/conflator

В зависимости от ваших требований, реализация может быть изменена, чтобы объединить или заменить последнее событие с существующим событием, если он существует в очереди конфлации ,

Для примера. для следующего, каждое событие реализуется как «Tick», который определяет поведение слияния.

public class Tick implements Message<Tick> { 

    private final String ticker; 

    public long getInitialQuantity() { 
     return initialQuantity; 
    } 

    private final long initialQuantity; 

    public long getCurrentQuantity() { 
     return currentQuantity; 
    } 

    private long currentQuantity; 
    private int numberOfMerges; 

    public String getTicker() { 
     return ticker; 
    } 

    public Tick(String ticker, long quantity) { 
     this.ticker = ticker; 
     this.initialQuantity = quantity; 
     this.currentQuantity = quantity; 
    } 

    @Override 
    public String key() { 
     return this.ticker; 
    } 

    @Override 
    public String body() { 
     return String.valueOf(currentQuantity); 
    } 

    @Override 
    public boolean isMerged() { 
     return this.initialQuantity != this.currentQuantity; 
    } 

    @Override 
    public int mergesCount() { 
     return numberOfMerges; 
    } 

    @Override 
    public boolean isValid() { 
     return false; 
    } 

    @Override 
    public boolean merge(Tick message) { 
     if (this.equals(message)) { 
      this.currentQuantity += message.currentQuantity; 
      numberOfMerges++; 
      return true; 
     } 
     return false; 
    } 

    @Override 
    public int hashCode() { 
     return ticker.hashCode(); 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if (obj != null && obj instanceof Tick) { 
      Tick other = (Tick) obj; 
      return this.ticker.equals(other.getTicker()); 
     } 
     return false; 
    } 

Тестовый пример:

public class TickMergeTest { 
    MultiValuedMapConflator conflator; 

    @Test 
    public void two_unmergeable_ticks_should_be_remain_unmergeable() { 
     Tick tick1 = new Tick("GOOG", 100L); 
     Tick tick2 = new Tick("AAPL", 120L); 

     List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2)); 

     assertNotNull(messages); 
     assertEquals(messages.size(), 2); 
     assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getCurrentQuantity()); 
     assertEquals(Long.valueOf(messages.get(1).body()).longValue(), tick2.getCurrentQuantity()); 
    } 

    @Test(timeout = 1000) 
    public void two_mergeable_ticks_should_be_merged() { 
     Tick tick1 = new Tick("GOOG", 100L); 
     Tick tick2 = new Tick("GOOG", 120L); 

     List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2)); 

     assertNotNull(messages); 
     assertEquals(messages.size(), 1); 
     assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getInitialQuantity() + tick2.getInitialQuantity()); 
    } 

    @Test(timeout = 1000) 
    public void should_merge_messages_on_same_key() throws InterruptedException { 
     // given 
     conflator.put(new Tick("GOOG", 100L)); 
     conflator.put(new Tick("GOOG", 120L)); 

     // test 
     Thread.sleep(300); // waiting the conflation 
     Message message = conflator.take(); 

     // check 
     assertNotNull(message); 
     assertEquals(Long.valueOf(message.body()).longValue(), 220L); 
     assertTrue(message.isMerged()); 
    } 

    @Test(timeout = 1000) 
    public void should_not_merge_messages_on_diff_key() throws InterruptedException { 
     // given 
     conflator.put(new Tick("GOOG", 100L)); 
     conflator.put(new Tick("AAPL", 120L)); 

     // test 
     Thread.sleep(300); // waiting the conflation 
     Message message1 = conflator.take(); 
     Message message2 = conflator.take(); 

     // check 
     assertNotNull(message1); 
     assertNotNull(message2); 

     assertEquals(Long.valueOf(message1.body()).longValue(), 100L); 
     assertFalse(message1.isMerged()); 

     assertEquals(Long.valueOf(message2.body()).longValue(), 120L); 
     assertFalse(message2.isMerged()); 

    } 
    @Before 
    public void setUp() { 
     conflator = new MultiValuedMapConflator<Tick>(true); 
    } 

    @After 
    public void tearDown() { 
     conflator.stop(); 
    } 
} 
+0

Обновлено с помощью контекста и тестовых примеров – Mahesh

Смежные вопросы