Я не знаю, почему мой движок StreamInsight не может обрабатывать более 140 событий/с, в условиях, когда до 8000 событий доставляются в секунду. Я вижу количество событий/s для моего запроса в Performance Monitor. И в консоли приложения походит на двигатель, позволяющий избежать множества событий. У меня есть событие с id: 200, затем следующее с id: 3330. Кто-нибудь знает, в чем проблема?Узкое место Streaminsight
Для моего тестирования у меня есть цепочка запросов. Результатом первого запроса является ввод второго и т. Д., И для этого сценария результат не более 140 событий/с.
Теперь я тестировал свое приложение с помощью простого запроса, который выводит все события, полученные из входного потока, и в этой ситуации сервер обрабатывает около 2000 событий/с. У меня есть некоторые изображения с результатами, но, к сожалению, я пока не могу их поставить. Меня беспокоит, почему количество событий/сек внезапно уменьшилось до 0 и почему двигатель все еще не учитывает все входные события.
Вот моя конфигурация сервера и запрос, который я использую.
using (var server = Server.Create("SIInstance23"))
{
log.Info("StreamInsight Server started");
Application application = server.CreateApplication("StreamInsight Application Test");
ServiceHost host = new ServiceHost(server.CreateManagementService());
WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);
binding.HostNameComparisonMode = HostNameComparisonMode.Exact;
host.AddServiceEndpoint(typeof(IManagementService),
binding,
"http://localhost:8081/StreamInsight/SIInstance23");
ScenarioWorkflow.NormalScenarioWorkflow(application, server, host);
}
public static void NormalScenarioWorkflow(this Application application, Server server, ServiceHost host)
{
host.Open();
IQStreamable<SensorDataEvent> inputStream = application.DefineSensorDataEventStream();
var simpleQuery = from e in inputStream
select e;
var simpleQueryConsumer = application.DefineConsumer(simpleQuery);
var simpleQueryBinding = simpleQuery.Bind(simpleQueryConsumer);
using (simpleQueryBinding.Run("process"))
{
Thread.Sleep(1000);
Console.WriteLine(string.Empty);
DiagnosticSettings settings = server.GetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"));
settings.Aspects |= DiagnosticAspect.PerformanceCounters;
server.SetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"), settings);
Console.WriteLine("***Hit Return to exit after viewing query output***");
Console.WriteLine();
Console.ReadLine();
}
host.Close();
}
Далее, когда я попытался запустить свой последний запрос (из цепочки), я получил приблизительно 1500 событий/с. Тем не менее существует проблема с внезапным уменьшением событий/s. Я думал, что каждое преобразование, сделанное в событие, создает новый, который должен обрабатываться движком. Таким образом, количество событий/с из цепочки запросов должно превышать 1500. Пожалуйста, исправьте меня, если я ошибаюсь. Я новичок в этом домене, и любые советы приветствуются.
Я думаю, проблема в классе ниже. Я пробовал также с PointEvent<SendorDataEvent>
вместо SensorDataEvent
, а также пытался вставить CTI, но результатов не найдено.
public class SocketEventInputAdapter : IObservable<SensorDataEvent>, IDisposable
{
public List<IObserver<SensorDataEvent>> observers { get; set; }
public object sync { get; set; }
public bool done { get; set; }
public SocketEventInputAdapter()
{
this.done = false;
this.observers = new List<IObserver<SensorDataEvent>>();
this.sync = new object();
SocketServer serverSocket = new SocketServer(4444, this);
}
internal void NotifyObservers(SensorDataEvent value)
{
lock (sync)
{
if (!done)
{
foreach (var observer in observers)
{
observer.OnNext(value);
}
}
}
}
public IDisposable Subscribe(IObserver<SensorDataEvent> observer)
{
lock (sync)
{
observers.Add(observer);
}
return new Subscription(this, observer);
}
void IDisposable.Dispose()
{
}
private sealed class Subscription : IDisposable
{
private readonly SocketEventInputAdapter _subject;
private IObserver<SensorDataEvent> _observer;
public Subscription(SocketEventInputAdapter subject, IObserver<SensorDataEvent> observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
IObserver<SensorDataEvent> observer = _observer;
if (null != observer)
{
lock (_subject.sync)
{
_subject.observers.Remove(observer);
}
_observer = null;
}
}
}
}
спасибо.
Вам нужно будет предоставить гораздо больше подробностей о том, что вы делаете с событиями, вашей конфигурацией и т. Д., Чтобы кто-нибудь мог вам помочь –