2015-06-16 3 views
1

Подписана подписка на наблюдаемое, которое отправляет сообщения журнала. Некоторые сообщения журнала поступают из других потоков, поскольку они находятся в блоках асинхронных F #. Мне нужно написать сообщения из основного потока.Публичные сообщения из темы async в основной поток из F #

Вот the code, что в настоящее время отфильтровывает многие из журнала сообщений, потому что они не находятся на главной теме:

member x.RegisterTrace() = 
    Logging.verbose <- x.Verbose 
    let id = Threading.Thread.CurrentThread.ManagedThreadId 
    Logging.subscribe (fun trace -> 
     if id = Threading.Thread.CurrentThread.ManagedThreadId then 
      match trace.Level with 
      | TraceLevel.Warning -> x.WriteWarning trace.Text 
      | TraceLevel.Error -> x.WriteWarning trace.Text 
      | _ -> x.WriteObject trace.Text 
     else 
      Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace) 
    ) 

У меня есть различные формы использования System.Threading.SynchronizationContent.Current, .SetSynchronizationConent, .Send, .Post. Я также занимался System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext. Я также пробовал Async.SwitchToContext. Независимо от того, что я делаю, System.Threading.Thread.CurrentThread.ManagedThreadId заканчивается, и PowerShell жалуется. Я собираюсь сделать это неправильно?

Дальше это незавершенное производство pull request и более подробная информация о the problem.

UPDATE 2015-06-16 Вт 11:45 AM PST

@RCH Спасибо, но с помощью Async.SwitchToContext установить SynchronizationContext не появляется на работе. Вот код и отладки выходной, когда я Paket-Restore -Force:

member x.RegisterTrace() = 
    let a = Thread.CurrentThread.ManagedThreadId 
    Logging.verbose <- x.Verbose 
    let ctx = SynchronizationContext.Current 
    Logging.subscribe (fun trace -> 
     let b = Thread.CurrentThread.ManagedThreadId 
     async { 
      let c = Thread.CurrentThread.ManagedThreadId 
      do! Async.SwitchToContext ctx 
      let d = Thread.CurrentThread.ManagedThreadId 
      Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text) 
     } |> Async.Start 
    ) 

Debug Output

Эксперт по работе рекомендовал другое решение, которое я собираюсь попробовать, что включает прохождение в контексте при подписке.

UPDATE 2015-06-16 Вт 5:30 PM PST

я получил помощь в создании IObservable.SubscribeOn, что позволяет SynchrnonizationContext быть принят. К сожалению, это не решает проблему eithe, но может быть частью решения. Может быть, пользовательский SynchronizationContext нужен как SingleThreadSynchrnonizationContext. Мне бы очень хотелось помочь сделать это, но прежде чем я это сделаю, я попробую System.Reactive's Observable.ObserveOn(Scheduler.CurrentThread).

UPDATE 2015-06-16 вт 8:30 PM PST

Я не был в состоянии получить Rx работать либо. Scheduler.CurrentThreaddoesn't behave так, как я надеялся. Затем я попробовал these changes, и обратный вызов не вызван.

member x.RegisterTrace() = 
    Logging.verbose <- x.Verbose 
    let a = Threading.Thread.CurrentThread.ManagedThreadId 
    let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc 
    let sch = SynchronizationContextScheduler ctx 
    Logging.event.Publish.ObserveOn sch 
    |> Observable.subscribe (fun trace -> 
     let b = Threading.Thread.CurrentThread.ManagedThreadId 
     Debug.WriteLine(sprintf "%d %d %s" a b trace.Text) 

Пользовательский СинхронизацияКонтекст может быть тем, что необходимо. :/

+0

Ах, наконец-то. С самого начала нужно было подумать об этом: Консольные приложения не имеют SynchronizationContext, поэтому SynchronizationContext.Current всегда имеет значение null. -> см. отредактированный ответ. – CaringDev

+0

Да, если наличие Rx в качестве новой зависимости не является проблемой, это будет мой предпочтительный путь. Вместо «Event», Rx также предоставляет очень удобные «темы». – CaringDev

+0

Я думаю, что могу добавить зависимость к Paket.PowerShell, но не Paket.Core. К сожалению, он также не работает. Я опубликовал свою попытку. –

ответ

1

я в конечном итоге создание EventSink, который имеет очередь обратных вызовов, которые выполняются на главном потоке PowerShell с помощью Drain(). Я поместил основное вычисление в другой поток. pull request имеет полный код и более подробную информацию.

enter image description here

enter image description here

1

Для приложений пользовательского интерфейса (форм, WPF, F # интерактивные, ...) выбор SynchronizationContext.Current и Async.SwitchToContext от ваших испытаний, а также заемщики кода из Paket достаточно.

Для консольных приложений, однако, нет SynchronizationContext и, следовательно, нет нити, продолжения могут быть Post ed, поэтому они попадут в пул потоков. Возможное обходное решение можно найти на MSDN Blogs.

Решение только для приложений пользовательского интерфейса:

Имея

module Logging 

open System.Diagnostics 

type Trace = { Level: TraceLevel; Text: string } 

let public event = Event<Trace>() 

let subscribe callback = Observable.subscribe callback event.Publish 

и

[<AutoOpen>] 
module CmdletExt 

open System.Diagnostics 
open System.Threading 

type PSCmdletStandalone() = 
    member x.RegisterTrace() = 
     let syncContext = SynchronizationContext.Current 
     Logging.subscribe (fun trace -> 
      async { 
       do! Async.SwitchToContext syncContext 
       let threadId = Thread.CurrentThread.ManagedThreadId 
       match trace.Level with 
       | TraceLevel.Warning -> printfn "WARN (on %i): %s" threadId trace.Text 
       | TraceLevel.Error -> printfn "ERROR (on %i): %s" threadId trace.Text 
       | _ -> printfn "(on %i): %s" threadId trace.Text 
      } |> Async.Start // or Async.StartImmediate 
     ) 

затем, зарегистрировавшись на главном потоке

#load "Logging.fs" 
#load "SO.fs" 

open System.Diagnostics 
open System.Threading 
open System 

(new PSCmdletStandalone()).RegisterTrace() 

printfn "Main: %i" Thread.CurrentThread.ManagedThreadId 

for i in Enum.GetValues(typeof<TraceLevel>) do 
    async { 
     let workerId = Thread.CurrentThread.ManagedThreadId 
     do Logging.event.Trigger { Level = unbox i; Text = sprintf "From %i" workerId } 
    } |> Async.Start 

дает, например,

Main: 1 
WARN (on 1): From 23 
(on 1): From 25 
ERROR (on 1): From 22 
(on 1): From 13 
(on 1): From 14 
+0

Если вы хотите меня, я могу преобразовать этот вечер (CEST) в запрос на вытягивание для вашего филиала. – CaringDev

+0

Спасибо @RCH. К сожалению, он не работает, когда применяется. Я ответил более подробно. –

+0

@CameronTaggart, который выглядит интересным. Спасибо за обновление. Я попытаюсь воспроизвести это здесь, со взглядов, которые я бы никогда не ожидал, что «a, b, c, d» будут разными. – CaringDev

Смежные вопросы