Скажем, у меня есть AtomicReference
к списку объектов:AtomicReference на изменяемый объект и видимость
AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
Поток А добавляет элементы в этот список: batch.get().add(o);
Позже нить B принимает список и, например, хранит его в БД: insertBatch(batch.get());
Должен ли я выполнять дополнительную синхронизацию при записи (Thr ead A) и чтения (Thread B), чтобы гарантировать, что нить B видит список так, как он покинул его, или об этом заботится AtomicReference?
Другими словами: если у меня есть AtomicReference к изменяемому объекту, и один поток изменяет этот объект, то другие потоки сразу видят это изменение?
Edit:
Может быть несколько примеров кода в порядке:
public void process(Reader in) throws IOException {
List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
ExecutorService exec = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; ++i) {
tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
@Override public AtomicReference<List<Object>> call() throws IOException {
final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));
Processor.this.parser.parse(in, new Parser.Handler() {
@Override public void onNewObject(Object event) {
batch.get().add(event);
if (batch.get().size() >= batchSize) {
dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
}
}
});
return batch;
}
}));
}
List<Object> remainingBatches = new ArrayList<Object>();
for (Future<AtomicReference<List<Object>>> task : tasks) {
try {
AtomicReference<List<Object>> remainingBatch = task.get();
remainingBatches.addAll(remainingBatch.get());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw (RuntimeException)cause;
}
}
// these haven't been flushed yet by the worker threads
if (!remainingBatches.isEmpty()) {
dao.insertBatch(remainingBatches);
}
}
Что здесь происходит то, что я создаю четыре рабочих потоков, чтобы разобрать текст (это Reader in
параметр метода process()
) , Каждый работник сохраняет строки, которые он разбирает в партии, и сбрасывает пакет, когда он заполнен (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
).
Поскольку количество строк в тексте не кратно размеру партии, последние объекты попадают в пакет, который не очищается, так как он не заполнен. Эти оставшиеся партии, следовательно, вставлены основной нитью.
Я использую AtomicReference.getAndSet()
, чтобы заменить полную партию пустым. Это эта программа правильная в отношении резьбы?
Хорошо, я добавил код примера к моему вопросу выше. Я использую 'AtomicReference.getAndSet()', чтобы заменить полную партию свежей пустой. Мне еще нужна дополнительная синхронизация? –
Да, ваш код верен, хотя использование «AtomicReference» здесь не требуется. – Tudor
@ Тьюдор думал о том же самом.На самом деле getAndSet() может не делать то, что он хочет, потому что он получит текущее значение, а затем изменит значение AtomicReference. –