я столкнулся вопрос с Flume (1.5 на Cloudera CDH 5.3):Flume памяти до Шанель HDFS тонуть
spoolDir source -> memory channel -> HDFS sink
Что я пытаюсь сделать: Каждый 5mins, около 20 файлов помещаются в каталог буферизации (захвачен с удаленного хранилища). Каждый файл содержит несколько строк, каждая строка - журнал (в JSON). Размер файлов составляет от 10 КБ до 1 МБ.
Когда я запускаю агент, все файлы успешно переносятся в HDFS. Через 1 минуту (это то, что я установил в flume.conf) файлы свертываются (удалите суффикс .tmp и закрыты).
Но, когда новые файлы находятся в каталоге спулинга, я получаю сообщение:
org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds
Перепробовав много различных конфигураций без успеха (увеличение/уменьшение канала transactionCapacity и мощности, увеличение/уменьшение BATCHSIZE , и т. д.), прошу вас о помощи.
Вот моя последняя конфигурация желоба:
# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
sebanalytics.sources.spooldir-source.batchSize = 10
sebanalytics.sources.spooldir-source.deletePolicy = immediate
# Max blob size: 1.5Go
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.app.flume.interceptor.JsonInterceptor$Builder
# Define event's headers. basenameHeader must be the same than source.basenameHeaderKey (defaults is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid
# channel definition
sebanalytics.channels.mem-channel-1.type = memory
sebanalytics.channels.mem-channel-1.capacity = 1000000
sebanalytics.channels.mem-channel-1.transactionCapacity = 10
# sink definition
sebanalytics.sinks.hdfs-sink-1.type = hdfs
sebanalytics.sinks.hdfs-sink-1.hdfs.path = hdfs://StandbyNameNode/data/in
sebanalytics.sinks.hdfs-sink-1.hdfs.filePrefix = %{resources}_%{ssid}
sebanalytics.sinks.hdfs-sink-1.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink-1.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink-1.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink-1.hdfs.rollInterval = 3600
sebanalytics.sinks.hdfs-sink-1.hdfs.rollSize = 63000000
sebanalytics.sinks.hdfs-sink-1.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink-1.hdfs.batchSize = 10
sebanalytics.sinks.hdfs-sink-1.hdfs.idleTimeout = 60
# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = mem-channel-1
sebanalytics.sinks.hdfs-sink-1.channel = mem-channel-1
Я пробовал ваши решения без успеха. Тем не менее, я изменил исходный код на HTTP, и я больше никогда не сталкивался с проблемой «channel full», даже с огромным вкладом. Итак, я решил свою проблему ... но я бы предпочел понять, почему он не работает с SpoolDir. Но я мог бы справиться без :) – Adagyo
Причина должна быть простой: SpoolDir должен быть медленнее с точки зрения обработки данных, чем источник Http. – frb