2017-02-22 21 views
0

В настоящее время я отправляю сообщения устройств в IoTHub на экземпляре Azure, а затем все сообщения отправляются в EventHub для обработки.Отправка/получение пакетных сообщений на шлюз Azure Protocol

Моя цель - использовать Azure Protocol Cloud Gateway, чтобы выступать в качестве посредника для получения пакетных сообщений, а затем разворачивать их перед отправкой их для обработки. Благодаря размещению сообщений, это позволит мне уменьшить количество передаваемых данных, сократив расходы на использование данных. После того, как данные находятся в облаке, он может быть без сжатия, а затем обработан в обычном режиме.

После некоторых исследований и игры с Gateway на моей локальной машине и использования некоторых тестов Unit, встроенных в решение, я видел, как сообщения отправляются на шлюз/IoTHub.

 ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString); 

     Stopwatch sw = Stopwatch.StartNew(); 

     await this.CleanupDeviceQueueAsync(hubConnectionStringBuilder.HostName, device); 

     var clientScenarios = new ClientScenarios(hubConnectionStringBuilder.HostName, this.deviceId, this.deviceSas); 

     var group = new MultithreadEventLoopGroup(); 
     string targetHost = this.tlsCertificate.GetNameInfo(X509NameType.DnsName, false); 

     var readHandler1 = new ReadListeningHandler(CommunicationTimeout); 
     Bootstrap bootstrap = new Bootstrap() 
      .Group(group) 
      .Channel<TcpSocketChannel>() 
      .Option(ChannelOption.TcpNodelay, true) 
      .Handler(this.ComposeClientChannelInitializer(targetHost, readHandler1)); 
     IChannel clientChannel = await bootstrap.ConnectAsync(this.ServerAddress, protocolGatewayPort); 
     this.ScheduleCleanup(() => clientChannel.CloseAsync()); 

     Task testWorkTask = Task.Run(async() => 
     { //Where the messaging actually starts and sends 
      Tuple<EventData, string>[] ehMessages = await CollectEventHubMessagesAsync(receivers, 2); //Async task for recieving messages back from the IoTHub 
      Tuple<EventData, string> qos0Event = Assert.Single(ehMessages.Where(x => TelemetryQoS0Content.Equals(x.Item2, StringComparison.Ordinal))); 
      Tuple<EventData, string> qos1Event = Assert.Single(ehMessages.Where(x => TelemetryQoS1Content.Equals(x.Item2, StringComparison.Ordinal))); 

      string qosPropertyName = ConfigurationManager.AppSettings["QoSPropertyName"]; 

      var qos0Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS0Content)); 
      qos0Notification.Properties[qosPropertyName] = "0"; 
      qos0Notification.Properties["subTopic"] = "tips"; 
      await serviceClient.SendAsync(this.deviceId, qos0Notification); 

      var qos1Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS1Content)); 
      qos1Notification.Properties["subTopic"] = "firmware-update"; 
      await serviceClient.SendAsync(this.deviceId, qos1Notification); 

      var qos2Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content)); 
      qos2Notification.Properties[qosPropertyName] = "2"; 
      qos2Notification.Properties["subTopic"] = "critical-alert"; 
      await serviceClient.SendAsync(this.deviceId, qos2Notification); 

      var qos2Notification2 = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content2)); 
      qos2Notification2.Properties[qosPropertyName] = "2"; 
      await serviceClient.SendAsync(this.deviceId, qos2Notification2); 
     }); 

Так что "ServiceClient" отправляет 4 сообщения в этом тестовом модуле: qos0Notification, qos1Notification, qos2Notification, qos2Notification2, и использует метод SendAsync для передачи информации.

Метод SendAsync является частью базового кода для приложения и недоступен для просмотра. Этот метод также принимает DeviceId и Message Object. Сообщение имеет 3 перегрузки для Object: Base, Byte Stream или Byte Array.

После шлюза инициализации он получает это сообщения, используя этот метод:

public override void ChannelRead(IChannelHandlerContext context, object message) 
    { 
     var packet = message as Packet; 
     if (packet == null) 
     { 
      CommonEventSource.Log.Warning($"Unexpected message (only `{typeof(Packet).FullName}` descendants are supported): {message}", this.ChannelId); 
      return; 
     } 

     this.lastClientActivityTime = DateTime.UtcNow; // notice last client activity - used in handling disconnects on keep-alive timeout 

     if (this.IsInState(StateFlags.Connected) || packet.PacketType == PacketType.CONNECT) 
     { 
      this.ProcessMessage(context, packet); 
     } 
     else 
     { 
      if (this.IsInState(StateFlags.ProcessingConnect)) 
      { 
       Queue<Packet> queue = this.connectPendingQueue ?? (this.connectPendingQueue = new Queue<Packet>(4)); 
       queue.Enqueue(packet); 
      } 
      else 
      { 
       // we did not start processing CONNECT yet which means we haven't received it yet but the packet of different type has arrived. 
       ShutdownOnError(context, string.Empty, new InvalidOperationException($"First packet in the session must be CONNECT. Observed: {packet}, channel id: {this.ChannelId}, identity: {this.identity}")); 
      } 
     } 
    } 

Я чувствую, что это будет самое лучшее место, чтобы разворачивать любые пакетные сообщения. Как только у нас будет список сообщений, мы отправим их в ProcessMessage, чтобы определить, какое это сообщение и как его обрабатывать.

Кажется, что для этого не так много информации, так как это очень новое.

+0

Каков ваш вопрос? – CSharpRocks

ответ

0

После некоторых исследований и играть со шлюзом на моей локальной машине и с помощью некоторых тестов Unit, встроенные в решения, я видел, как сообщения отправляются на шлюз/IoTHub.

Здесь ServiceClient отправляет сообщения устройствам не IoTHub. Такими сообщениями являются Cloud-To-Device messages. Вы можете использовать для отправки сообщений Device-To-Cloud.

Моя цель состоит в том, чтобы использовать Azure протокол Cloud Gateway, чтобы выступать в качестве посредника получать сообщения и пакетные затем UnWrap их перед отправляя их на переработку. Имея отправленные сообщения, позволит мне уменьшить объем передаваемых данных, сократив расходы на использование данных .

Вы можете ввести пользовательский процесс, чтобы уменьшить данные, которые будут переданы IoTHub. Ну, вы можете добавить собственные обработчики каналов.

Protocol gateway uses pipeline-based message processing using the following handlers: TLS encryption/decryption, MQTT codec (encoder and decoder), and MqttIotHubAdapter. One way for customizing the gateway behavior is by injecting additional handlers in this pipeline. By adding a custom handler between the MQTT codec and the MqttIotHubAdapter you can inspect, modify, discard, delay, or split the passing messages.

Более подробная информация и примеры кода вы можете ссылаться на "Microsoft Azure IoT Protocol Gateway Developer Guide".

+0

Таким образом, использование 'DeviceClient' дает возможность для' SendEventBatchAsync'.Этот метод отправляет все сообщения в пакете вместе или как отдельные сообщения? Я построил свой собственный модульный тест с помощью 'DeviceClient' и отправил пакет сообщений, но в настоящее время я не могу определить, принимаются ли сообщения на стороне шлюза в виде отдельной партии или отдельных сообщений. «Проводник устройств» показывает данные с устройства, входящего в IoTHub, и у всех их есть метка времени, но я не смог перехватить сообщения, когда они вошли в шлюз. –

+0

Да, ['SendEventBatchAsync'] (https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.devices.client.deviceclient#Microsoft_Azure_Devices_Client_DeviceClient_SendEventBatchAsync_System_Collections_Generic_IEnumerable_Microsoft_Azure_Devices_Client_Message__) метод отправляет сообщения в пакетном режиме. Azure iot sdk является открытым исходным кодом, и вы можете найти его реализацию [здесь] (https://github.com/Azure/azure-iot-sdk-csharp/blob/master/device/Microsoft.Azure.Devices.Client/DeviceClient. CS/# L664). –

+0

Когда я использую 'SendEventBatchAsync', он напрямую отправляется на шлюз протокола или в IotHub? Я построил модульный тест, используя «DeviceClient» и используя этот метод, и показал сообщения в Проводнике устройств, но IoTHub только показывал 1 сообщение, полученное на Azure Portal. Если он получен шлюзом, вы знаете, где получены сообщения? Метод ReadChannel в 'MqttAdapter', похоже, не получает сообщений. –

Смежные вопросы