2015-09-08 5 views
3

я столкнулся вопрос с Flume (1.5 на Cloudera CDH 5.3):Flume памяти до Шанель HDFS тонуть

spoolDir source -> memory channel -> HDFS sink 

Что я пытаюсь сделать: Каждый 5mins, около 20 файлов помещаются в каталог буферизации (захвачен с удаленного хранилища). Каждый файл содержит несколько строк, каждая строка - журнал (в JSON). Размер файлов составляет от 10 КБ до 1 МБ.

Когда я запускаю агент, все файлы успешно переносятся в HDFS. Через 1 минуту (это то, что я установил в flume.conf) файлы свертываются (удалите суффикс .tmp и закрыты).

Но, когда новые файлы находятся в каталоге спулинга, я получаю сообщение:

org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds 

Перепробовав много различных конфигураций без успеха (увеличение/уменьшение канала transactionCapacity и мощности, увеличение/уменьшение BATCHSIZE , и т. д.), прошу вас о помощи.

Вот моя последняя конфигурация желоба:

# source definition 
sebanalytics.sources.spooldir-source.type = spooldir 
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in 
sebanalytics.sources.spooldir-source.basenameHeader = true 
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename 
sebanalytics.sources.spooldir-source.batchSize = 10 
sebanalytics.sources.spooldir-source.deletePolicy = immediate 
# Max blob size: 1.5Go 
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder 
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000 
# Attach the interceptor to the source 
sebanalytics.sources.spooldir-source.interceptors = json-interceptor 
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.app.flume.interceptor.JsonInterceptor$Builder 
# Define event's headers. basenameHeader must be the same than source.basenameHeaderKey (defaults is basename) 
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename 
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources 
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid 

# channel definition 
sebanalytics.channels.mem-channel-1.type = memory 
sebanalytics.channels.mem-channel-1.capacity = 1000000 
sebanalytics.channels.mem-channel-1.transactionCapacity = 10 

# sink definition 
sebanalytics.sinks.hdfs-sink-1.type = hdfs 
sebanalytics.sinks.hdfs-sink-1.hdfs.path = hdfs://StandbyNameNode/data/in 
sebanalytics.sinks.hdfs-sink-1.hdfs.filePrefix = %{resources}_%{ssid} 
sebanalytics.sinks.hdfs-sink-1.hdfs.fileSuffix = .json 
sebanalytics.sinks.hdfs-sink-1.hdfs.fileType = DataStream 
sebanalytics.sinks.hdfs-sink-1.hdfs.writeFormat = Text 
sebanalytics.sinks.hdfs-sink-1.hdfs.rollInterval = 3600 
sebanalytics.sinks.hdfs-sink-1.hdfs.rollSize = 63000000 
sebanalytics.sinks.hdfs-sink-1.hdfs.rollCount = 0 
sebanalytics.sinks.hdfs-sink-1.hdfs.batchSize = 10 
sebanalytics.sinks.hdfs-sink-1.hdfs.idleTimeout = 60 

# connect source and sink to channel 
sebanalytics.sources.spooldir-source.channels = mem-channel-1 
sebanalytics.sinks.hdfs-sink-1.channel = mem-channel-1 

ответ

0

Полный канал означает, что: канал не в состоянии полученных более события от источника, так как раковина потребляет эти события медленнее, чем источник.

Увеличение пропускной способности канала связано только с проблемой. Возможные решения:

  • Улучшение обработки на раковине ... если раковина является обычным (улучшение/исключение циклов, использование более эффективного API-интерфейса и т. Д.). В этом случае это кажется невозможным, поскольку вы используете приемник HDFS по умолчанию.
  • Уменьшение частоты передачи данных на источник. Тем не менее, я думаю, вы не хотите/не можете этого сделать из-за требований к обработке.
  • Добавить больше раковины, работающих параллельно. Я не уверен в этом, но я могу представить дизайнеров Flume, решивших запустить каждую раковину в отдельной ветке. Если это правда, вы можете попробовать несколько параллельных HDFS-приемников. Чтобы разделить данные на несколько приемников, вам нужно будет использовать multiplexing selector, отличный от default replicating one.

HTH!

+0

Я пробовал ваши решения без успеха. Тем не менее, я изменил исходный код на HTTP, и я больше никогда не сталкивался с проблемой «channel full», даже с огромным вкладом. Итак, я решил свою проблему ... но я бы предпочел понять, почему он не работает с SpoolDir. Но я мог бы справиться без :) – Adagyo

+0

Причина должна быть простой: SpoolDir должен быть медленнее с точки зрения обработки данных, чем источник Http. – frb

Смежные вопросы