2015-06-08 2 views
3

Я хотел бы иметь один конкретный поток, очередь для задач и задач процесса в этом отдельном потоке. Приложение будет выполнять задачи, основанные на использовании пользователей, и помещать их в очередь задач. Затем отдельный поток обрабатывает задачи. Жизненно важно сохранить поток в сети и использовать его для обработки поставленных задач, даже если очередь пуста.Запуск работы по определенной теме

Я попытался выполнить несколько реализаций TaskScheduler с BlockingCollection и ограничить параллелизм только одним потоком, но кажется, что поток становится удаленным, когда очередь становится пустой, а задача обрабатывается другим потоком.

Не могли бы вы хотя бы направить меня к некоторым источникам, как достичь этой цели?

tl; dr Пытается ограничить один конкретный поток для обработки задач, которые динамически добавляются в очередь.

Edit1:

Это экспериментальный веб-приложение, которое использует WCF и .NET Framework 4.6. В библиотеке WCF я пытаюсь реализовать это поведение с одной задачей обработки потоков. Этот один поток должен инициализировать пролог, используя внешнюю библиотеку dll, а затем работать с прологом. Если другой поток используется в процессе, библиотека выбрасывает AccessViolationException. Я провел некоторое исследование, и это, скорее всего, из-за плохо управляемых потоков в этой библиотеке. У меня была реализация, где у меня были блокировки повсюду, и это сработало. Теперь я пытаюсь переопределить и сделать его асинхронным, поэтому я не блокирую основной поток с помощью блокировки.

Я не на своем компьютере, но я предоставляю код, когда вернусь домой сегодня.

+3

Просьба указать образец кода, чтобы выделить проблему. С уважением, –

+2

В чем причина этих требований? –

+0

Я отредактировал мой вопрос, я предоставлю код, когда вернусь домой примерно через 10 часов. –

ответ

3

Ваш подход кажется прекрасным, поэтому вы, вероятно, просто сделали какую-то кропотливую глупую ошибку.

На самом деле довольно легко сделать простой обычай TaskScheduler. В вашем случае:

void Main() 
{ 
    var cts = new CancellationTokenSource(); 
    var myTs = new SingleThreadTaskScheduler(cts.Token); 

    myTs.Schedule(() => 
    { Print("Init start"); Thread.Sleep(1000); Print("Init done"); }); 
    myTs.Schedule(() => Print("Work 1")); 
    myTs.Schedule(() => Print("Work 2")); 
    myTs.Schedule(() => Print("Work 3")); 
    var lastOne = myTs.Schedule(() => Print("Work 4")); 

    Print("Starting TS"); 
    myTs.Start(); 

    // Wait for all of them to complete... 
    lastOne.GetAwaiter().GetResult(); 

    Thread.Sleep(1000); 

    // And try to schedule another 
    myTs.Schedule(() => Print("After emptied")).GetAwaiter().GetResult(); 

    // And shutdown; it's also pretty useful to have the 
    // TaskScheduler return a "complete task" to await 
    myTs.Complete(); 

    Print("On main thread again"); 
} 

void Print(string str) 
{ 
    Console.WriteLine("{0}: {1}", Thread.CurrentThread.ManagedThreadId, str); 
    Thread.Sleep(100); 
} 

public sealed class SingleThreadTaskScheduler : TaskScheduler 
{ 
    [ThreadStatic] 
    private static bool isExecuting; 
    private readonly CancellationToken cancellationToken; 

    private readonly BlockingCollection<Task> taskQueue; 

    public SingleThreadTaskScheduler(CancellationToken cancellationToken) 
    { 
     this.cancellationToken = cancellationToken; 
     this.taskQueue = new BlockingCollection<Task>(); 
    } 

    public void Start() 
    { 
     new Thread(RunOnCurrentThread) { Name = "STTS Thread" }.Start(); 
    } 

    // Just a helper for the sample code 
    public Task Schedule(Action action) 
    { 
     return 
      Task.Factory.StartNew 
       (
        action, 
        CancellationToken.None, 
        TaskCreationOptions.None, 
        this 
      ); 
    } 

    // You can have this public if you want - just make sure to hide it 
    private void RunOnCurrentThread() 
    { 
     isExecuting = true; 

     try 
     { 
      foreach (var task in taskQueue.GetConsumingEnumerable(cancellationToken)) 
      { 
       TryExecuteTask(task); 
      } 
     } 
     catch (OperationCanceledException) 
     { } 
     finally 
     { 
      isExecuting = false; 
     } 
    } 

    // Signaling this allows the task scheduler to finish after all tasks complete 
    public void Complete() { taskQueue.CompleteAdding(); } 
    protected override IEnumerable<Task> GetScheduledTasks() { return null; } 

    protected override void QueueTask(Task task) 
    { 
     try 
     { 
      taskQueue.Add(task, cancellationToken); 
     } 
     catch (OperationCanceledException) 
     { } 
    } 

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 
    { 
     // We'd need to remove the task from queue if it was already queued. 
     // That would be too hard. 
     if (taskWasPreviouslyQueued) return false; 

     return isExecuting && TryExecuteTask(task); 
    } 
} 

Это довольно легко изменить это, чтобы дать вам полный контроль над где планировщик задач на самом деле выполнения задачи - на самом деле, я адаптировано это из предыдущего планировщика задач, которые я использовал который просто использовал метод RunOnCurrentThread.

Для вашего случая, когда вы всегда хотите придерживаться только одной нити, подход в SingleThreadTaskScheduler, вероятно, лучше. Хотя это также имеет свои достоинства:

// On a new thread 
try 
{ 
    InitializeProlog(); 

    try 
    { 
    myTs.RunOnCurrentThread(); 
    } 
    finally 
    { 
    ReleaseProlog(); 
    } 
} 
catch (Exception ex) 
{ 
    // The global handler 
} 
+0

Вы были правы, глупые ошибки - у меня было слишком много потоков, пытающихся создать новый объект Engine, потому что никакой блокировки не было, поэтому больше потоков пыталось инициализировать пролог сразу. И спасибо за ваш Планировщик, у меня было что-то похожее, но это было очень тяжело и сбивало с толку, ваш кажется приятным и чистым! –

Смежные вопросы