2012-02-21 4 views
7

Скажем, у меня есть AtomicReference к списку объектов:AtomicReference на изменяемый объект и видимость

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>()); 

Поток А добавляет элементы в этот список: batch.get().add(o);

Позже нить B принимает список и, например, хранит его в БД: insertBatch(batch.get());

Должен ли я выполнять дополнительную синхронизацию при записи (Thr ead A) и чтения (Thread B), чтобы гарантировать, что нить B видит список так, как он покинул его, или об этом заботится AtomicReference?

Другими словами: если у меня есть AtomicReference к изменяемому объекту, и один поток изменяет этот объект, то другие потоки сразу видят это изменение?

Edit:

Может быть несколько примеров кода в порядке:

public void process(Reader in) throws IOException { 
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(4); 

    for (int i = 0; i < 4; ++i) { 
     tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() { 
      @Override public AtomicReference<List<Object>> call() throws IOException { 

       final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize)); 

       Processor.this.parser.parse(in, new Parser.Handler() { 
        @Override public void onNewObject(Object event) { 
          batch.get().add(event); 

          if (batch.get().size() >= batchSize) { 
           dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize))); 
          } 
        } 
       }); 

       return batch; 
      } 
     })); 
    } 

    List<Object> remainingBatches = new ArrayList<Object>(); 

    for (Future<AtomicReference<List<Object>>> task : tasks) { 
     try { 
      AtomicReference<List<Object>> remainingBatch = task.get(); 
      remainingBatches.addAll(remainingBatch.get()); 
     } catch (ExecutionException e) { 
      Throwable cause = e.getCause(); 

      if (cause instanceof IOException) { 
       throw (IOException)cause; 
      } 

      throw (RuntimeException)cause; 
     } 
    } 

    // these haven't been flushed yet by the worker threads 
    if (!remainingBatches.isEmpty()) { 
     dao.insertBatch(remainingBatches); 
    } 
} 

Что здесь происходит то, что я создаю четыре рабочих потоков, чтобы разобрать текст (это Reader in параметр метода process()) , Каждый работник сохраняет строки, которые он разбирает в партии, и сбрасывает пакет, когда он заполнен (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));).

Поскольку количество строк в тексте не кратно размеру партии, последние объекты попадают в пакет, который не очищается, так как он не заполнен. Эти оставшиеся партии, следовательно, вставлены основной нитью.

Я использую AtomicReference.getAndSet(), чтобы заменить полную партию пустым. Это эта программа правильная в отношении резьбы?

ответ

9

Ум ... на самом деле это не так. AtomicReference гарантирует, что сама ссылка видна по потокам, то есть если вы присвоите ей другую ссылку, чем оригинальную, обновление будет видимым. Он не дает никаких гарантий относительно фактического содержимого объекта, на который ссылается ссылка.

Следовательно, операции чтения/записи содержимого списка требуют отдельной синхронизации.

Редактировать: Итак, судя по вашему обновленному коду и комментарию, опубликованному вами, установка локальной ссылки на volatile является достаточной для обеспечения видимости.

+0

Хорошо, я добавил код примера к моему вопросу выше. Я использую 'AtomicReference.getAndSet()', чтобы заменить полную партию свежей пустой. Мне еще нужна дополнительная синхронизация? –

+1

Да, ваш код верен, хотя использование «AtomicReference» здесь не требуется. – Tudor

+0

@ Тьюдор думал о том же самом.На самом деле getAndSet() может не делать то, что он хочет, потому что он получит текущее значение, а затем изменит значение AtomicReference. –

0

Добавление к Tudor «s ответа: Вы будет должен сделать сам ArrayList поточно или - в зависимости от ваших требований - даже большие блоки кода.

Если вы можете уйти с многопоточной ArrayList вы можете «украсить» это так:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>()); 

Но имейте в виду: даже «простой» строит подобные этому не поточно с этим:

Object o = batch.get(batch.size()-1); 
0

AtomicReference поможет вам со ссылкой на список, он ничего не сделает с самим списком. Более конкретно, в вашем сценарии вы почти наверняка столкнетесь с проблемами, когда система находится под нагрузкой, когда потребитель взял список, пока производитель добавляет к нему элемент.

Этот звук для меня, как будто вы должны использовать BlockingQueue. Затем вы можете ограничить объем памяти, если производитель быстрее, чем ваш потребитель, и пусть очередь обрабатывает все утверждения.

Что-то вроде:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50); 

// ... Producer 
queue.put(o); 

// ... Consumer 
List<Object> queueContents = new ArrayList<Object>(); 
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items. 
queue.drainTo(queueContents); 

Добавлено

Благодаря @Tudor для указывая на архитектуру, которую вы используете. ... Я должен признать, что это довольно странно. Насколько мне известно, вам действительно не нужно AtomicReference. Каждый поток имеет свой собственный ArrayList, пока он не будет передан по адресу dao, после чего он будет заменен, поэтому нет никаких утверждений нигде.

Я немного обеспокоен тем, что вы создаете четыре парсера на одном Reader. Надеюсь, у вас есть способ обеспечить, чтобы каждый парсер не затрагивал других.

Я лично использовал бы какую-то форму шаблона производителя-потребителя, как я описал в приведенном выше коде. Возможно, что-то вроде этого.

static final int PROCESSES = 4; 
static final int batchSize = 10; 

public void process(Reader in) throws IOException, InterruptedException { 

    final List<Future<Void>> tasks = new ArrayList<Future<Void>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(PROCESSES); 
    // Queue of objects. 
    final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2); 
    // The final object to post. 
    final Object FINISHED = new Object(); 

    // Start the producers. 
    for (int i = 0; i < PROCESSES; i++) { 
    tasks.add(exec.submit(new Callable<Void>() { 
     @Override 
     public Void call() throws IOException { 

     Processor.this.parser.parse(in, new Parser.Handler() { 
      @Override 
      public void onNewObject(Object event) { 
      queue.add(event); 
      } 
     }); 
     // Post a finished down the queue. 
     queue.add(FINISHED); 
     return null; 
     } 
    })); 
    } 

    // Start the consumer. 
    tasks.add(exec.submit(new Callable<Void>() { 
    @Override 
    public Void call() throws IOException { 
     List<Object> batch = new ArrayList<Object>(batchSize); 
     int finishedCount = 0; 
     // Until all threads finished. 
     while (finishedCount < PROCESSES) { 
     Object o = queue.take(); 
     if (o != FINISHED) { 
      // Batch them up. 
      batch.add(o); 
      if (batch.size() >= batchSize) { 
      dao.insertBatch(batch); 
      // If insertBatch takes a copy we could merely clear it. 
      batch = new ArrayList<Object>(batchSize); 
      } 
     } else { 
      // Count the finishes. 
      finishedCount += 1; 
     } 
     } 
     // Finished! Post any incopmplete batch. 
     if (batch.size() > 0) { 
     dao.insertBatch(batch); 
     } 
     return null; 
    } 
    })); 

    // Wait for everything to finish. 
    exec.shutdown(); 
    // Wait until all is done. 
    boolean finished = false; 
    do { 
    try { 
     // Wait up to 1 second for termination. 
     finished = exec.awaitTermination(1, TimeUnit.SECONDS); 
    } catch (InterruptedException ex) { 
    } 
    } while (!finished); 
} 
+0

Умм ... из его кода он не похож на производителя-потребителя. Он на самом деле создает команду нитей, каждый из которых выполняет какую-то работу, затем присоединяется к ним и заканчивает работу в основном потоке. Между потоками не передаются фактические данные. – Tudor

1

Я думаю, что, забыв весь код здесь, вы точный вопрос:

Должен ли я делать дополнительную синхронизацию при записи (Thread A) и чтения (Thread B), чтобы убедитесь, что нить B видит список так, как он оставил его, или это позаботится об AtomicReference?

Таким образом, точный ответ на это: ДА, атомное уход за взятие видимости. И это не мое мнение, а JDK documentation one:

Память эффектов для доступов и обновлений Атомикс обычно следуют правилам для летучих веществ, как указано в спецификации языка Java, третье издание (17,4 модели памяти).

Надеюсь, это поможет.

Смежные вопросы