У меня есть следующее тестовое приложение, которое имитирует мой сценарий приложения:Как обрабатывать исключения в TPL Dataflow - Producer/Consumer
class Program
{
static void Main()
{
Console.WriteLine("***Press R = record to new file, S = Stop Recording, E = Exit");
var timer = new Timer(Callback, null, 0, 1000);
while(!_Close) HandleInput(Console.ReadLine());
Console.WriteLine("Finished");
}
private static bool _Close;
private static void HandleInput(string input)
{
switch (input.ToLower())
{
case "r": CreateWriter();
break;
case "s": Console.WriteLine("File Closed: {0}", _FileWriter.Name);
_FileWriter.Dispose();
_FileWriter = null;
break;
case "e":
_Close = true;
break;
}
}
private static void CreateWriter()
{
if (_FileWriter != null)
_FileWriter.Dispose();
string filename = Path.Combine("C:\\", string.Format("{0:yyyy_MM_dd HH_mm_ss}.txt",DateTime.Now));
_FileWriter = new AsyncFileWriter(filename);
Console.WriteLine("New File Created: {0}", filename);
}
private static void Callback(object state)
{
if (_FileWriter != null)
_FileWriter.Produce(MakeData());
}
private static byte[] MakeData()
{
string data = string.Empty;
for (int i = 0; i < 50; i++)
{
data += string.Format("{0:yyyy-MM-dd HH:mm:ss.fff}{1}", DateTime.Now, Environment.NewLine);
}
return Encoding.UTF8.GetBytes(data);
}
private static AsyncFileWriter _FileWriter;
}
Он использует следующий класс в качестве потребителей (/ продюсер):
public class AsyncFileWriter : IDisposable
{
private readonly FileStream _Filewriter;
private readonly Task _WriteTask;
private readonly BufferBlock<byte[]> _BufferBlock;
public AsyncFileWriter(string filename)
{
_Filewriter = new FileStream(filename, FileMode.CreateNew, FileAccess.Write, FileShare.None);
_BufferBlock = new BufferBlock<byte[]>();
_WriteTask = WriteToFile();
}
public void Produce(byte[] data)
{
_BufferBlock.Post(data);
}
public long Filesize { get; private set; }
public string Name { get { return _Filewriter.Name; } }
private async Task WriteToFile()
{
while (await _BufferBlock.OutputAvailableAsync())
{
byte[] data = _BufferBlock.Receive();
await _Filewriter.WriteAsync(data, 0, data.Length);
Filesize = _Filewriter.Length;
}
}
private async Task Complete()
{
_BufferBlock.Complete();
await Task.WhenAll(_WriteTask, _BufferBlock.Completion);
//now close the file
_Filewriter.Dispose();
}
public void Dispose()
{
Complete();
}
}
важные вещи, чтобы отметить следующие:
- у меня есть непрерывный поток байтов, которые передаются мне от проприетарная библиотека через обратный вызов. Я не знаю, когда я получу эти данные.
- Мне нужно иметь возможность контролировать, когда начинать запись этих данных (нажмите «R»)
- Мне нужно иметь возможность переключиться на новый файл, но убедитесь, что предыдущие данные были записаны в последний файл (нажмите «R» снова во время записи)
У меня есть несколько вопросов, так как я совершенно новичок в библиотеке потоков данных TPL.
- Во-первых, является ли моя реализация звуком продюсера/потребителя (то есть AsyncFileWriter)?
- Является ли моя реализация IDisposable ОК?
- Как обрабатывать исключения в Потребителе (то есть WriteToFile)? Мне действительно нужно получить уведомление, если что-то пойдет не так, но, как вы видите, единственный публичный метод -
Produce()
, и исключение не произойдет здесь ... Существует ли наилучшая практика для этого шаблона?
Спасибо, что сообщение в блоге также очищает некоторые вещи. – Simon