2016-01-31 5 views
5

Я много работаю с асинхронными рабочими процессами и агентами в F #, в то время как я немного углублялся в Events. Я заметил, что тип события < _>() не является Thread- безопасно. Здесь я не говорю об общей проблеме поднятия события. Я на самом деле говорю о подписке и удалении/удалении из мероприятия. Для целей тестирования я написал эту короткую программуИспользование события F # в многопоточном коде

let event = Event<int>() 
let sub = event.Publish 

[<EntryPoint>] 
let main argv = 
    let subscribe sub x = async { 
     let mutable disposables = [] 
     for i=0 to x do 
      let dis = Observable.subscribe (fun x -> printf "%d" x) sub 
      disposables <- dis :: disposables 
     for dis in disposables do 
      dis.Dispose() 
    } 

    Async.RunSynchronously(async{ 
     let! x = Async.StartChild (subscribe sub 1000) 
     let! y = Async.StartChild (subscribe sub 1000) 
     do! x 
     do! y 
     event.Trigger 1 
     do! Async.Sleep 2000 
    }) 
    0 

Программа проста, я создать событие и функцию, которая подписывается определенное количеством событий, к нему, и после этого настраивает каждый обработчик. Я использую другое вычисление async для создания двух экземпляров этих функций с помощью Async.StartChild. После того как обе функции закончены, я запускаю событие, чтобы увидеть, остались ли какие-то обработчики.

Но когда event.Trigger(1) называется результатом, все еще есть некоторые обработчики, связанные с событием. Поскольку некоторые «1» будут напечатаны на консоли. Это в общем означает, что подписка и/или Disposing не являются потокобезопасными.

И это то, чего я не ожидал. Если подписка и Disposing не являются потокобезопасными, как можно безопасно использовать события в целом? Уверенные события также могут использоваться вне потоков, а триггер не вызывает какую-либо функцию параллельно или на разных потоках. Но для меня как-то нормально, что события используются в коде Async, Agent или вообще с Threads. Они часто используются в качестве сообщения для сбора информации о потоках Backroundworker. С Async.AwaitEvent можно подписаться на событие. Если подписка и Disposing не являются потокобезопасными, как можно использовать события в такой среде? И в чем цель Async.AwaitEvent? Учитывая, что рабочий процесс Async действительно использует Thread, просто используя Async.AwaitEvent, в основном «разбит по дизайну», если подписка/удаление на событие по умолчанию не является потокобезопасной.

Общий вопрос, с которым я столкнулся. Правильно ли, что подписка и утилизация не являются потокобезопасными? Из моего примера это похоже на это, но, вероятно, я пропустил некоторые важные детали. В настоящее время я использую Event много в моем проекте, я обычно имею MailboxProcessors и использую события для уведомления. Так что вопрос. Если события не являются потокобезопасными, весь проект, который я использую в настоящее время, не является потокобезопасным вообще. Так в чем же проблема? Создание целой новой поточной реализации событий? У них уже есть некоторые реализации, которые сталкиваются с этой проблемой? Или есть другие варианты безопасного использования события в среде с высокой степенью потоковой передачи?

ответ

3

FYI; реализация для Event<int> может быть найдена here.

Интересный бит, кажется:

member e.AddHandler(d) = 
    x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>) 
member e.RemoveHandler(d) = 
    x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>) 

Подписавшись на событие объединяет текущий обработчик событий с помощью обработчика событий, перешедшим в подписке. Этот комбинированный обработчик событий заменяет текущий.

Проблема с точки зрения параллелизма заключается в том, что здесь у нас есть условие гонки в том, что одновременные подписчики могут использовать обработчик текущего события, который должен сочетаться с «последним», который записывает обратный вызов обработчика (последний сложный концепция в параллелизм в эти дни, но nvm).

Что можно сделать здесь, это ввести цикл CAS с использованием Interlocked.CompareAndExchange, но это добавит дополнительные издержки производительности, которые нанесут ущерб несовместимым пользователям. Это то, что можно было бы отключить от PR, и посмотреть, хорошо ли оно одобрено сообществом F #.

WRT на ваш второй вопрос о том, что с этим делать. Я могу просто сказать, что я буду делать.Я бы пошел на вариант создания версии FSharpEvent, которая поддерживает защищенные подписки/отписки. Возможно, это базовое значение FSharpEvent, если это позволяет ваша политика FOSS вашей компании. Если это окажется успешным, тогда он сможет сформировать будущую базовую библиотеку PR для F #.

Я не знаю ваших требований, но также возможно, что если вам нужны сопрограммы (то есть Async), а не потоки, тогда можно перезаписать программу, чтобы использовать только 1 поток, и, следовательно, на вас не повлияет это состояние гонки.

+0

Что касается вашего последнего предложения. Каждый асинк начал с Async.Start запускается в пуле потоков, но также использует let! или делать! может переключить асинхронный вызов на другой поток, например, просто используя «Async.Sleep», он переключается на пул потоков или использует Async.StartChild. Наверху я запускаю событие из MailboxProcessor, и они также всегда запускают некоторые потоки в пуле потоков. Поэтому я не вижу, как заставить все работать на одном потоке. Но я тоже не хотел бы этого поведения. Я ожидаю такого поведения, поскольку я все равно не хочу запускать все в одном потоке. –

+0

Похоже, что отсутствие безопасности потоков на событиях является серьезной проблемой для библиотеки ядра F #. Не могли бы вы рассмотреть вопрос об этом? –

+0

Вы можете перевернуть свои собственные сопрограммы или убедиться, что каждый 'let!' 'Do!' Отправляется обратно в «основной поток». Но, как вы сказали, вы не хотите этого поведения, так что это не имеет значения. – FuleSnabel

2

Во-первых, благодаря FuleSnable за его ответ. Он указал мне в правильном направлении. Основываясь на информации, которую он представил, я сам внедрил тип ConcurrentEvent. Этот тип использует Interlocked.CompareExchange для добавления/удаления его обработчиков, поэтому он не блокируется и, надеюсь, самый быстрый способ сделать это. Я начал реализацию, скопировав тип Event из компилятора F #. (Я также оставляю комментарий как есть). Текущая реализация выглядит так.

type ConcurrentEvent<'T> = 
    val mutable multicast : Handler<'T> 
    new() = { multicast = null } 

    member x.Trigger(arg:'T) = 
     match x.multicast with 
     | null ->() 
     | d -> d.Invoke(null,arg) |> ignore 
    member x.Publish = 
     // Note, we implement each interface explicitly: this works around a bug in the CLR 
     // implementation on CompactFramework 3.7, used on Windows Phone 7 
     { new obj() with 
      member x.ToString() = "<published event>" 
      interface IEvent<'T> 
      interface IDelegateEvent<Handler<'T>> with 
      member e.AddHandler(d) = 
       let mutable exchanged = false 
       while exchanged = false do 
        System.Threading.Thread.MemoryBarrier() 
        let dels = x.multicast 
        let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T> 
        let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) 
        if obj.ReferenceEquals(dels,result) then 
         exchanged <- true 
      member e.RemoveHandler(d) = 
       let mutable exchanged = false 
       while exchanged = false do 
        System.Threading.Thread.MemoryBarrier() 
        let dels = x.multicast 
        let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T> 
        let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) 
        if obj.ReferenceEquals(dels,result) then 
         exchanged <- true 
      interface System.IObservable<'T> with 
      member e.Subscribe(observer) = 
       let h = new Handler<_>(fun sender args -> observer.OnNext(args)) 
       (e :?> IEvent<_,_>).AddHandler(h) 
       { new System.IDisposable with 
        member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } } 

Некоторые замечания по дизайну:

  • Я начал с рекурсией. Но делая это и глядя на скомпилированный код, он создает анонимный класс и вызывает AddHandler или RemoveHandler, создавая объект этого. При прямой реализации цикла while он избегает создания экземпляра объекта всякий раз, когда новый обработчик добавляется/удаляется.
  • Я явно использовал obj.ReferenceEquals, чтобы избежать Generic Hash Equality.

По крайней мере, в моих тестах Добавление/удаление обработчика теперь кажется потокобезопасным. ConcurrentEvent может быть заменен только Event по мере необходимости. Если у кого-то есть дополнительные улучшения для правильности или производительности, эти комментарии приветствуются. В противном случае я бы отметил этот ответ как решение.


Зафиксированного если люди любопытны о том, сколько медленнее ConcurrentEvent будет по сравнению с Event

let stopWatch() = System.Diagnostics.Stopwatch.StartNew() 

let event = Event<int>() 
let sub = event.Publish 

let cevent = ConcurrentEvent<int>() 
let csub = cevent.Publish 

let subscribe sub x = async { 
    let mutable disposables = [] 
    for i=0 to x do 
     let dis = Observable.subscribe (fun x -> printf "%d" x) sub 
     disposables <- dis :: disposables 
    for dis in disposables do 
     dis.Dispose() 
} 

let sw = stopWatch() 
Async.RunSynchronously(async{ 
    // Amount of tries 
    let tries = 10000 

    // benchmarking Event subscribe/unsubscribing 
    let sw = stopWatch() 
    let! x = Async.StartChild (subscribe sub tries) 
    let! y = Async.StartChild (subscribe sub tries) 
    do! x 
    do! y 
    sw.Stop() 
    printfn "Event: %O" sw.Elapsed 
    do! Async.Sleep 1000 
    event.Trigger 1 
    do! Async.Sleep 2000 

    // benchmarking ConcurrentEvent subscribe/unsubscribing 
    let sw = stopWatch() 
    let! x = Async.StartChild (subscribe csub tries) 
    let! y = Async.StartChild (subscribe csub tries) 
    do! x 
    do! y 
    sw.Stop() 
    printfn "\nConcurrentEvent: %O" sw.Elapsed 
    do! Async.Sleep 1000 
    cevent.Trigger 1 
    do! Async.Sleep 2000 
}) 

В моей системе подписок/отписка 10000 обработчика с не-поточно-Event занимает около 1.4 seconds завершить.

Потолочный ConcurrentEvent занимает около 1.8 seconds. Поэтому я думаю, что накладные расходы довольно низки.

+0

Что-то, что нужно учитывать: я не уверен, что «CompareExchange» вставляет барьер памяти (или какой он тип), или вам нужно сделать ручной барьер памяти. – FuleSnabel

+0

Кажется, что «CompareExchange» подразумевает полный барьер (http: // stackoverflow.ком/вопросы/1581718/делает-сблокированы-CompareExchange использования-а-памяти барьер). Чтение AFAIK x86 использует барьер 'приобретать', так что это должно означать, что вы хороши на x86, но если вы хотите, чтобы код нацелился на ARM/PowerPC, вам, возможно, придется вставлять вызовы' Interlocked' при чтении 'x.multicast' – FuleSnabel

+0

@FuleSnabel i ошибка, поскольку я использовал «x.multicast» в 'Delegate.Combined' и' Delegate.Remove'. Операция должна лучше работать с выбранной переменной 'dels'. Насколько я думаю, тогда мне нужно только один MemoryBarrier перед чтением из 'x.multicast', чтобы обеспечить свежесть' x.multicast'. –

Смежные вопросы