2016-04-21 1 views
2

У меня такая же проблема, как и у here, и попытайтесь ее решить, но я не знаю, как правильно форматировать хранилище данных, поэтому cygnus не будет вызывать ошибку сохранения.Как создать хранилище данных в ckan для подключения cygnus в режиме устойчивости столбцов?

Мой Ориона suscription это одна:

(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-Type: application/json' \ 
    --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF 
{ 
    "entities": [ 
     { 
      "type": "Event", 
      "isPattern": "false", 
      "id": "es-leon-0" 
     }, 
     { 
      "type": "Event", 
      "isPattern": "false", 
      "id": "es-leon-1" 
     } 

    ], 
    "attributes": [ 
     "IdEvent", "IdUser", "Title" 
    ], 
    "reference": "http://localhost:5050/notify", 
    "duration": "P1M", 
    "notifyConditions": [ 
     { 
      "type": "ONCHANGE", 
      "condValues": [ ] 
     } 
    ], 
    "throttling": "PT5S" 
} 
EOF 

Мой Cygnus конфигурации:

ygnusagent.sources = http-source 
cygnusagent.sinks = ckan-sink 
cygnusagent.channels = ckan-channel 

cygnusagent.sources.http-source.channels = ckan-channel 
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource 
cygnusagent.sources.http-source.port = 5050 
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler 
cygnusagent.sources.http-source.handler.notification_target = /notify 
cygnusagent.sources.http-source.handler.default_service = Papel 
cygnusagent.sources.http-source.handler.default_service_path = Test 
cygnusagent.sources.http-source.handler.events_ttl = 5 
cygnusagent.sources.http-source.interceptors = ts gi 
cygnusagent.sources.http-source.interceptors.ts.type = timestamp 
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder 
cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /Applications/apache-flume-1.4.0-bin/conf/grouping_rules.conf 

cygnusagent.channels.ckan-channel.type = memory 
cygnusagent.channels.ckan-channel.capacity = 1000 
cygnusagent.channels.ckan-channel.transactionCapacity = 100 

# ============================================ 
# OrionCKANSink configuration 
# channel name from where to read notification events 
cygnusagent.sinks.ckan-sink.channel = ckan-channel 

# sink class, must not be changed 
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink 

# true if the grouping feature is enabled for this sink, false otherwise 
cygnusagent.sinks.ckan-sink.enable_grouping = false 

# true if lower case is wanted to forced in all the element names, false otherwise 
cygnusagent.sinks.hdfs-sink.enable_lowercase = false 

# the CKAN API key to use 
cygnusagent.sinks.ckan-sink.api_key = xxxxx 

# the FQDN/IP address for the CKAN API endpoint 
cygnusagent.sinks.ckan-sink.ckan_host = ckan-demo.ckan.io 

# the port for the CKAN API endpoint 
cygnusagent.sinks.ckan-sink.ckan_port = 80 

# Orion URL used to compose the resource URL with the convenience operation URL to query it 
cygnusagent.sinks.ckan-sink.orion_url = http://localhost:1026 

# how the attributes are stored, either per row either per column (row, column) 
cygnusagent.sinks.ckan-sink.attr_persistence = column 

# enable SSL for secure Http transportation; 'true' or 'false' 
cygnusagent.sinks.ckan-sink.ssl = false 

# number of notifications to be included within a processing batch 
cygnusagent.sinks.ckan-sink.batch_size = 100 

# timeout for batch accumulation 
cygnusagent.sinks.ckan-sink.batch_timeout = 60 

# number of retries upon persistence error 
cygnusagent.sinks.ckan-sink.batch_ttl = 10 

Cygnus получает право его, но затем появляется следующее сообщение об ошибке:

time=2016-04-21T07:44:57.504CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.OrionRestHandler[231] : Starting transaction (1461242686-614-0000000001) 
time=2016-04-21T07:44:57.528CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.OrionRestHandler[258] : Received data ({ "subscriptionId" : "571897360e94f9fa53829885", "originator" : "localhost", "contextResponses" : [ {  "contextElement" : {  "type" : "Event",  "isPattern" : "false",  "id" : "es-leon-0",  "attributes" : [   {   "name" : "IdEvent",   "type" : "text",   "value" : "1084"   },   {   "name" : "IdUser",   "type" : "text",   "value" : "18"   },   {   "name" : "Title",   "type" : "text",   "value" : "Papes"   }  ]  },  "statusCode" : {  "code" : "200",  "reasonPhrase" : "OK"  } } ]}) 
time=2016-04-21T07:44:57.528CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.OrionRestHandler[280] : Event put in the channel, id=2024732986 
time=2016-04-21T07:45:50.771CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=persistAggregation | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.OrionCKANSink[417] : [ckan-sink] Persisting data at OrionCKANSink (orgName=papel, pkgName=papel_test, resName=es-leon-0_event, data={"recvTime": "2016-04-21T12:44:57.497Z","fiwareServicePath": "Test","entityId": "es-leon-0","entityType": "Event","Title": "Papes"},{"recvTime": "2016-04-21T12:44:57.528Z","fiwareServicePath": "Test","entityId": "es-leon-0","entityType": "Event","IdEvent": "1084","IdUser": "18","Title": "Papes"}) 
time=2016-04-21T07:45:51.875CDT | lvl=ERROR | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=processNewBatches | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.OrionSink[426] : Runtime error (Cannot persist the data (orgName=papel, pkgName=papel_test, resName=es-leon-0_event)) 

Как сказал в here, я создал соответствующий файл хранилища: http://ckan-demo.ckan.io/dataset/papel-test/resource/8d7cb489-878e-465e-8c8c-60ea537411e0 Но не знаю, как отформатировать его, или если csv - правильный формат.

Благодаря

* Примечание: Я пробовал в режиме строки и все работает, но это не то, что я хочу.

** Примечание. Я также обнаружил ошибку в программном обеспечении предварительного просмотра, изменив заголовок моего столбца «Заголовок» на заголовок страницы «CKAN Demo».

EDITED:

Я сделал то, что сказано в документации:

Колонка: Одна строка upserted для всех уполномоченной контекста атрибуты. Этот вид строки будет содержать два поля за атрибут каждого субъекта (по одному для значения, называемого <attrName>, а другой для метаданных, называется <attrName>_md), плюс четыре дополнительных поля:

  • recvTime: UTC метка времени в человеке -защищенный формат (ISO 8601).
  • fiwareServicePath: Уведомленный или по умолчанию.
  • entityId: Обозначенный идентификатор объекта.
  • entityType: Уведомленный тип сущности.

Но до сих пор имеют ту же ошибку:

time=2016-04-25T05:17:48.790CDT | lvl=ERROR | trans=1461579403-571-0000000000 | srv=Papel | subsrv=Test | function=processNewBatches | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.OrionSink[426] : Runtime error (Cannot persist the data (orgName=papel, pkgName=papel_test, resName=es-leon-0_event)) 

ответ

1

Прежде всего, вы будете нуждаться в организации CKAN и пакет/набор данных перед созданием ресурса и связанный с ним хранилищу с целью сохранения данных.

Создание организации, скажем в demo.ckan.org; название организации является frb, потому что наша организация будет в этом FIWARE службы:

$ curl -X POST "http://demo.ckan.org/api/3/action/organization_create" -d '{"name":"frb"}' -H "Authorization: xxxxxxxx" 

Создание пакета/набора данных в рамках вышеупомянутой организации; имя пакета frb_test, потому что наша организация будет в службе FIWARE frb и в пути FIWARE службы test:

$ curl -X POST "http://demo.ckan.org/api/3/action/package_create" -d '{"name":"frb_test","owner_org":"frb"}' -H "Authorization: xxxxxxxx" 

Создание ресурса в пределах вышеуказанного пакета/набора данных (пакет ID дается в ответ на указанный выше запрос создания пакета); имя ресурса room1_room, поскольку идентификатор объекта будет room1 и его тип room:

$ curl -X POST "http://demo.ckan.org/api/3/action/resource_create" -d '{"name":"room1_room","url":"none","format":"","package_id":"d35fca28-732f-4096-8376-944563f175ba"}' -H "Authorization: xxxxxxxx" 

Наконец, и отвечая на ваш вопрос, создавая хранилищу, связанный с выше ресурсов и подходит для приема данных Cgynus в колонке режиме (идентификатор ресурса дается в ответ на запрос создания вышеуказанного ресурса):

$ curl -X POST "http://demo.ckan.org/api/3/action/datastore_create" -d '{"fields":[{"id":"recvTime","type":"text"}, {"id":"fiwareServicePath","type":"text"}, {"id":"entityId","type":"text"}, {"id":"entityType","type":"text"}, {"id":"temperature","type":"float"}, {"id":"temperature_md","type":"json"}],"resource_id":"48c120df-5bcd-48c7-81fa-8ecf4e4ef9d7","force":"true"}' -H "Authorization: xxxxxxxx" 

Теперь Cygnus способен сохранять данные для объекта с идентификатором room1 типа room в frb службы,Путьслужбы:

time=2016-04-26T15:54:45.753CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=frb | subsvc=/test | function=getEvents | comp=cygnusagent | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[240] : Starting internal transaction (b465ffb8-710f-4cd3-9573-dc3799f774f9) 
time=2016-04-26T15:54:45.754CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=frb | subsvc=/test | function=getEvents | comp=cygnusagent | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[256] : Received data ({ "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8", "originator" : "localhost", "contextResponses" : [ {  "contextElement" : {  "attributes" : [   {   "name" : "temperature",   "type" : "centigrade",   "value" : "26.5"   }  ],  "type" : "room",  "isPattern" : "false",  "id" : "room1"  },  "statusCode" : {  "code" : "200",  "reasonPhrase" : "OK"  } } ]}) 
time=2016-04-26T15:55:07.843CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=frb | subsvc=/test | function=processNewBatches | comp=cygnusagent | msg=com.telefonica.iot.cygnus.sinks.NGSISink[342] : Batch accumulation time reached, the batch will be processed as it is 
time=2016-04-26T15:55:07.844CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=frb | subsvc=/test | function=processNewBatches | comp=cygnusagent | msg=com.telefonica.iot.cygnus.sinks.NGSISink[396] : Batch completed, persisting it 
time=2016-04-26T15:55:07.846CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=frb | subsvc=/test | function=persistAggregation | comp=cygnusagent | msg=com.telefonica.iot.cygnus.sinks.NGSICKANSink[419] : [ckan-sink] Persisting data at OrionCKANSink (orgName=frb, pkgName=frb_test, resName=room1_room, data={"recvTime": "2016-04-26T13:54:45.756Z","fiwareServicePath": "/test","entityId": "room1","entityType": "room","temperature": "26.5"}) 
time=2016-04-26T15:55:08.948CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=frb | subsvc=/test | function=processNewBatches | comp=cygnusagent | msg=com.telefonica.iot.cygnus.sinks.NGSISink[400] : Finishing internal transaction (b465ffb8-710f-4cd3-9573-dc3799f774f9) 

Встраивание можно проверить через CKAN API, а также:

$ curl -X POST "http://demo.ckan.org/api/3/action/datastore_search" -d '{"resource_id":"48c120df-5bcd-48c7-81fa-8ecf4e4ef9d7"}' -H "Authorization: xxxxxxxx" 
+0

Спасибо большое, это было невероятно полезно. –

Смежные вопросы