2015-07-30 3 views
1

У меня есть следующий метод:Non функция блокировки, которая сохраняет порядок

void store(SomeObject o) { 

} 

Идея этого метода заключается в хранении o для постоянного хранения но функция не должна блокировать. То есть Я не могу/не должен/не должен делать фактическое хранение в том же потоке, который называется store.
Я также не могу начать поток и сохранить объект из другого потока, потому что store можно назвать «огромным» количеством раз, и я не хочу начинать нерестовые потоки.
Так что я варианта, которые я не вижу, как они могут хорошо работать:
1) Использование пула потоков (Executor семьи)
2) В store сохранить объект в списке массива и возвращении. Когда список массивов достигает, например, 1000 (случайное число), затем запустите другой поток, чтобы «сбросить» список массивов в хранилище. Но у меня все еще была бы проблема с большим количеством потоков (пул потоков?)
Так что в обоих случаях единственным требованием я являюсь то, что я постоянно храню объекты в точно таком же порядке, который был передан store. И использование нескольких потоков смешивает вещи.
Как это можно решить?
Как я могу гарантировать:
1) неблокирующем store
2) Точный порядок добавления
3) Я не забочусь о каких-либо гарантий хранения. Если, например, что-то сбой, я не забочусь о потере данных, например. кэшированные в списке массивов перед их сохранением.

ответ

1

Имейте один отдельный поток, который получает элементы из конца очереди (блокирование пустой очереди) и записывает их на диск. Функция вашего основного потока store() просто добавляет элементы в начало очереди.

Вот примерная идея (хотя я предполагаю, что там будет чище и более быстрые способы для осуществления этого в производстве кода, в зависимости от того, как быстро вы нужны вещи, чтобы быть):

import java.util.*; 
import java.io.*; 
import java.util.concurrent.*; 

class ObjectWriter implements Runnable { 

    private final Object END = new Object(); 
    BlockingQueue<Object> queue = new LinkedBlockingQueue(); 

    public void store(Object o) throws InterruptedException { 
     queue.put(o); 
    } 

    public ObjectWriter() { 
     new Thread(this).start(); 
    } 

    public void close() throws InterruptedException { 
     queue.put(END); 
    } 

    public void run() { 
     while (true) { 
      try { 
       Object o = queue.take(); 
       if (o == END) { 
        // close output file. 
        return; 
       } 

       System.out.println(o.toString()); // serialize as appropriate 
      } catch (InterruptedException e) { 
      } 
     } 
    } 
} 

public class Test { 
    public static void main(String[] args) throws Exception { 
     ObjectWriter w = new ObjectWriter(); 
     w.store("hello"); 
     w.store("world"); 
     w.close(); 
    } 
} 
+1

Почему не 'ConcurrentLinkedQueue'? – Jim

+0

'put' блокирует право? – Jim

+0

'put' блоки в полной очереди. По умолчанию максимальный размер очереди равен 'Integer.MAX_VALUE', поэтому на практике все будет хорошо. Тем не менее, «ConcurrentLinkedQueue» звучит лучше, спасибо за то, что указали! – Vlad

2

Я хотел бы использовать SingleThreadExecutor и BlockingQueue.

SingleThreadExecutor как имя sais имеет одну единственную тему. Используйте его для опроса в очереди и сохраняйте объекты, блокируя, если они пусты.

Вы можете добавить, не блокируя очередь в методе хранилища.

EDIT На самом деле, вам даже не нужно, что дополнительные очереди - JavaDoc of newSingleThreadExecutor саис:

Создает Исполнителю, который использует единый рабочий поток работает от с неограниченной очередью. (Обратите внимание, что если этот единственный поток завершается из-за сбоя во время выполнения до выключения, новый, если потребуется, будет занят, чтобы выполнять последующие задачи.) Гарантируется выполнение задач последовательно и не более одной задачи будет активна в любой момент времени. В отличие от эквивалентного newFixedThreadPool (1), возвращенный исполнитель гарантированно не может быть перенастроен для использования дополнительных потоков.

Так что я думаю, что это именно то, что вам нужно.

private final ExecutorService persistor = Executors.newSingleThreadExecutor(); 
public void store(final SomeObject o){ 
    persistor.submit(new Runnable(){ 
      @Override public void run(){ 
       // your persist-code here. 
      } 
     }); 
} 

Преимущество использования Runnable, который имеет квазибесконечной петлю и с помощью дополнительной очереди будет возможность кодировать некоторые «взрывную» -functionality. Например, вы можете заставить его ждать, пока он не останется в очереди или не будет найден старейший элемент по крайней мере 1 минута назад.

+0

Почему блокировка, а не параллельная ссылка? – Jim

+1

Потому что поток будет блокироваться в пустой очереди без дорогостоящего ожидания. Выбор конкретной реализации зависит от вас. – Fildor

1

Предлагаю использовать Chronicle-Queue, который является созданной мной библиотекой.

Это позволяет вам писать в текущем потоке без блокировки. Он был первоначально разработан для систем с низкой задержкой. Для небольших сообщений для написания сообщения требуется около 300 нс.

Вам не нужно использовать обратную линию заземления или очередь на кучу, и она не дожидается, когда данные будут записаны на диск по умолчанию. Он также обеспечивает последовательный порядок для всех читателей. Если программа умирает в любой момент после вызова finish(), сообщение не будет потеряно. (Если ОС не выйдет из строя/не потеряет питание), она также поддерживает репликацию, чтобы избежать потери данных.

+3

Звучит как очень большая пушка для очень маленькой задачи ... – Holger

+0

@ Хольгер, возможно, он сделает все, что хочет OP, без потоков в очереди кучи. +1 –

0

Замечания в вашем вопросе звучат так, как будто вы нерелигиозны с многопоточными, но это действительно не так сложно.

Вам просто нужен другой поток, ответственный за запись в хранилище, которое выбирает элементы из очереди. - ваша функция store просто добавляет объекты в очередь в памяти и продолжает свой путь.

Некоторые псевдо-иш код:

final List<SomeObject> queue = new List<SomeObject>(); 

void store(SomeObject o) { 
    // add it to the queue - note that modifying o after this will also alter the 
    // instance in the queue 
    synchronized(queue) { 
     queue.add(queue); 
     queue.notify(); // tell the storage thread there's something in the queue 
    } 
} 

void storageThread() { 
    SomeObject item; 
    while (notfinished) { 
     synchronized(queue) { 
      if (queue.length > 0) { 
      item = queue.get(0); // get from start to ensure same order 
      queue.removeAt(0); 
      } else { 
      // wait for something 
      queue.wait(); 
      continue; 
      } 
     } 

     writeToStorage(item); 
    } 
} 
+0

Код, который вы разместили в блоке 'store' – Jim

+0

@Jim Я так не думаю. Где, по вашему мнению, он заблокирован? – adelphus

+0

Вы выполняете синхронизацию в очереди. – Jim