2014-11-15 2 views
1

Я пытаюсь запустить пример JavaKinesisWordCountASL.Apache Spark Kinesis Образец не работает

Пример, похоже, подключается к моему потоку Kinesis и получает данные из потока (как показано в журнале ниже). Однако Sparks не вызывает функцию вызова, переданную в метод unionStreams.flatMap в примере, и не печатает ни одного слова.

Я пробовал работать с использованием как Java 8, так и Java 7. Я запускаю его на экземпляре ubuntu. Тот же пример работает на моем macbook.

14/11/15 01:59:42 ИНФО scheduler.ReceiverTracker: Поток 1 0 получил блоки 14/11/15 1:59:42 ИНФО storage.MemoryStore: ensureFreeSpace (264) вызывается с curMem = 3512, maxMem = 938244833 14/11/15 01:59:42 INFO storage.MemoryStore: Блок ввода-0-1416016781800, сохраненный как значения в памяти (примерный размер 264,0 B, бесплатно 894,8 МБ) 14/11/15 01:59: 42 INFO storage.BlockManagerInfo: Добавлен ввод-0-1416016781800 в память на ip-10-80-91-13.ec2.internal: 39149 (размер: 264,0 B, бесплатно: 894,8 МБ) 14/11/15 01:59 : 42 INFO storage.BlockManagerMaster: обновлена ​​информация о блочном вводе-0-1416016781800 14/11/15 01:59:42 INFO scheduler.JobScheduler: добавлены задания для времени 1416016782000 мс 14/11/15 01:59:42 INFO netwo rk.SendingConnection: инициирование подключения к [ip-10-80-91-13.ec2.internal/10.80.91.13: 39149] 14/11/15 01:59:42 INFO network.SendingConnection: подключен к [ip-10 -80-91-13.ec2.internal/10.80.91.13: 39149], 1 сообщение находится на рассмотрении 14/11/15 01:59:42 INFO network.ConnectionManager: принятое соединение из [ip-10-80-91-13 .ec2.internal/10.80.91.13: 56700] 14/11/15 01:59:42 WARN storage.BlockManager: вход блока - 0-1416016781800 уже существует на этом компьютере; не повторное добавление 14/11/15 01:59:42 INFO receiver.BlockGenerator: Вдвинутый вход блока 0-1416016781800 14/11/15 01:59:43 INFO storage.MemoryStore: обеспечитьFreeSpace (256), вызываемый с помощью curMem = 3776, maxMem = 938244833 14/11/15 01:59:43 INFO storage.MemoryStore: Блок ввода-0-1416016782800 сохранен как значения в памяти (примерный размер 256.0 B, бесплатно 894.8 MB) 14/11/15 01:59:43 INFO storage.BlockManagerInfo: Добавлен ввод-0-1416016782800 в память на ip-10-80-91-13.ec2.internal: 39149 (размер: 256,0 B, бесплатно: 894,8 МБ) 14/11/15 01:59:43 INFO storage.BlockManagerMaster: Обновлена ​​информация о блочном вводе-0-1416016782800 14/11/15 01:59:43 WARN storage.BlockManager: Блок ввода-0-1416016782800 уже существует на этой машине; не повторное добавление 14/11/15 01:59:43 INFO receiver.BlockGenerator: Вдвинутый блок ввода-0-1416016782800 14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 0 получил 2 блока 14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 1 получил 0 блоков 14/11/15 01:59:44 INFO scheduler.JobScheduler: Добавлены задания для времени 1416016784000 ms 14/11/15 01: 59:46 INFO scheduler.ReceiverTracker: Stream 0 получил 0 блоков 14/11/15 01:59:46 INFO scheduler.ReceiverTracker: Stream 1 получил 0 блоков 14/11/15 01:59:46 INFO scheduler.JobScheduler: Добавлены задания для времени 1416016786000 мс 14/11/15 01:59:46 INFO impl.CWPublisherRunnable: успешно опубликовано 17 баз данных. 14/11/15 01:59:46 INFO storage.MemoryStore: обеспечитьFreeSpace (248), вызываемый с curMem = 4032, maxMem = 938244833 14/11/15 01:59:46 INFO storage.MemoryStore: Блокировать вход-1- 1416016786000, хранящиеся как значения в памяти (оценочный размер 248,0 B, бесплатно 894,8 МБ) 14/11/15 01:59:46 INFO storage.BlockManagerInfo: Добавлен ввод-1-1416016786000 в память на ip-10-80-91-13 .ec2.internal: 39149 (размер: 248.0 B, бесплатно: 894.8 МБ) 14/11/15 01:59:46 INFO storage.BlockManagerMaster: Обновлена ​​информация о блочном вводе-1-1416016786000 14/11/15 01: 59:46 WARN хранилище.BlockManager: Блок ввода-1-1416016786000 уже существует на этой машине; не повторное добавление 14/11/15 01:59:46 INFO receiver.BlockGenerator: Вдвинутый блок ввода-1-1416016786000 14/11/15 01:59:46 INFO impl.CWPublisherRunnable: успешно опубликовано 14 баз данных. 14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 0 получил 0 блоков 14/11/15 01:59:48 INFO storage.MemoryStore: обеспечитьFreeSpace (264), вызываемый с curMem = 4280, maxMem = 938244833 14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 1 получил 1 блок 14/11/15 01:59:48 INFO storage.MemoryStore: Блок ввода-0-1416016787800 сохранен как значения в памяти (по оценкам размер 264,0 B, бесплатно 894,8 МБ) 14/11/15 01:59:48 INFO storage.BlockManagerInfo: Добавлен ввод-0-1416016787800 в память на ip-10-80-91-13.ec2.internal: 39149 (размер : 264.0 B, бесплатно: 894.8 MB) 14/11/15 01:59:48 INFO storage.BlockManagerMaster: Обновлена ​​информация о блочном вводе-0-1416016787800 14/11/15 01:59:48 INFO scheduler.JobScheduler: Добавлены задания для времени 1416016788000 мс 14/11/15 01:59:48 WARN storage.BlockManager: вход блока-0-1416016787800 уже существует на этой машине; не повторное добавление его 14/11/15 01:59:48 INFO receiver.BlockGenerator: Вдвинутый блок ввода-0-1416016787800 14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 0 получил 1 блок 14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 1 получил 0 блоков 14/11/15 01:59:50 INFO scheduler.JobScheduler: Добавлены задания для времени 1416016790000 ms 14/11/15 01: 59:51 INFO storage.MemoryStore: secureFreeSpace (264), вызываемый с curMem = 4544, maxMem = 938244833 14/11/15 01:59:51 INFO storage.MemoryStore: вход блока-0-1416016790800, сохраненный как значения в памяти (по оценкам размер 264,0 B, бесплатно 894,8 МБ) 14/11/15 01:59:51 INFO storage.BlockManagerInfo: Добавлен ввод-0-1416016790800 в память на ip-10-80-91-13.ec2.internal: 39149 (размер : 264,0 B, бесплатно: 894,8 МБ) 14/1 1/15 01:59:51 INFO storage.BlockManagerMaster: Обновлена ​​информация о блочном вводе-0-1416016790800 14/11/15 01:59:51 WARN storage.BlockManager: Блок ввода-0-1416016790800 уже существует на этом компьютере; не повторное добавление его 14/11/15 01:59:51 INFO receiver.BlockGenerator: Вдвинутый блок ввода-0-1416016790800

+1

Из рекомендаций SO: вопросы, требующие помощи по отладке («почему этот код не работает?») должны включать в себя желаемое поведение, конкретную проблему или ошибку и кратчайший код, необходимый для воспроизведения в самом вопросе. Вопросы без четкого описания проблемы не полезны другим читателям. См. также: http://stackoverflow.com/ help/mcve – maasg

ответ

3

Это может иметь какое-то отношение к тому, сколько рабочих потоков вы получили. У меня была такая же проблема, когда я запускал приложение с помощью --master local [2]. Я много часов искал ответа и ничего не нашел. Просто из любопытства я перешел на «local» [4], и это сработало. Я не знаю причину. Может быть, кто-то, более знакомый с Спарком, может просветить нас.

Примечание: в моем случае поток Kinesis имел два осколка. Таким образом, приложение создало два входных потока, по одному для каждого осколка.

+1

В этом была проблема: я создал новый поток с 1 осколком, и программа начала работать. Я исследовал еще и обнаружил, что с 1 осколком вам нужно как минимум 2 ядра в вашем кластере, с двумя осколками вам нужно 4 ядра, и с 3-мя осколками вам нужно как минимум 6 ядер. – user3594557

1

Благодаря подсказке от пользователя @ user3594557.

Есть две большие заметки из https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#input-dstreams

Если количество ядер выделяется для приложения меньше или равно числу входных DStreams/приемников, то система будет получать данные, но не смогут обрабатывать их.

При запуске локально, если у вас установлен URL-адрес, установленный на «local», то - это только одно ядро ​​для запуска задач. Этого недостаточно для программ с , даже один вход DStream (потоки файлов в порядке), поскольку приемник будет занять это ядро, и для обработки данных не останется ядра.

+1

Я до сих пор не могу решить эту проблему, у меня есть 5 осколков и я упоминал о местных [20] в мастер-классе. Он все еще показывает мне пустые rdds. Пример wordcount. – RockSolid

+0

@RockSolid: Если бы у вас была возможность решить эту проблему, у меня тоже проблема. Я использую специальный приемник – user1050619

Смежные вопросы