2015-11-17 6 views
0

Я пытаюсь получать сообщения из Kafka и обрабатывать их в Java-методе. Ниже приложены файлы, которые помогут понять и найти проблему. Проблема в том, что при запуске я получаю ошибку: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' (полная трассировка приведена ниже). Spring Integration Kafka Tomcat issue

web.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Deployment descriptor for the "cosmob" web application.
  It bootstraps a root Spring context through ContextLoaderListener and maps
  two servlets: CXFServlet under /rest/* and WsServlet under /ws/positions.
-->
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
    <display-name>cosmob</display-name>
    <welcome-file-list>
    <welcome-file>index.html</welcome-file>
    <welcome-file>index.htm</welcome-file>
    <welcome-file>index.jsp</welcome-file>
    <welcome-file>default.html</welcome-file>
    <welcome-file>default.htm</welcome-file>
    <welcome-file>default.jsp</welcome-file>
    </welcome-file-list>

    <!--
      Spring configuration files for the root application context.
      A context parameter name can carry only one value, so declaring
      contextConfigLocation twice leaves one of the two files unloaded.
      Both locations are therefore listed in a single value; Spring's
      ContextLoader accepts several locations separated by commas,
      semicolons or whitespace.
    -->
    <context-param>
     <param-name>contextConfigLocation</param-name>
     <param-value>
      /WEB-INF/beans.xml
      /WEB-INF/intergration-cfg-app.xml
     </param-value>
    </context-param>

    <!-- Apache CXF servlet, serving everything under /rest/ -->
    <servlet>
     <servlet-name>CXFServlet</servlet-name>
     <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>
    </servlet>
    <servlet-mapping>
     <servlet-name>CXFServlet</servlet-name>
     <url-pattern>/rest/*</url-pattern>
    </servlet-mapping>

    <!-- Application servlet mapped to /ws/positions -->
    <servlet>
     <servlet-name>WsServlet</servlet-name>
     <servlet-class>org.cosmob.ccc.web.service.WsServlet</servlet-class>
    </servlet>

    <servlet-mapping>
     <servlet-name>WsServlet</servlet-name>
     <url-pattern>/ws/positions</url-pattern>
    </servlet-mapping>

    <!-- Creates the root Spring context from contextConfigLocation at startup -->
    <listener>
     <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

</web-app>

Файл intergration-cfg-app.xml

<!--
  Spring Integration flow for consuming Kafka messages:
  kafkaInboundChannelAdapter puts messages on the inputFromKafka channel,
  and an outbound channel adapter hands them to kafkaConsumer.processMessage.
-->
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:int="http://www.springframework.org/schema/integration" 
     xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
     xmlns:task="http://www.springframework.org/schema/task" 
     xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 

    <!--
      The nested <int:queue/> makes this a queue-backed channel, so the
      endpoint that consumes from it has to poll it.
      NOTE(review): without <int:queue/> the channel is a plain subscribable
      channel and its consumer needs no poller.
    -->
    <int:channel id="inputFromKafka"> 
     <int:queue/> 
    </int:channel> 

    <!--
      Reads from Kafka through consumerContext and sends to inputFromKafka,
      polling every 1000 ms with at most 5 messages per poll.
      NOTE(review): auto-startup="false" means the adapter is not started with
      the context; unless something starts it explicitly, no messages will be
      received. Confirm whether "true" is intended.
      NOTE(review): default="true" on this nested poller does not appear to
      register a context-wide default poller (the startup failure reports that
      none is available).
    -->
    <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" 
              kafka-consumer-context-ref="consumerContext" 
              auto-startup="false" 
              channel="inputFromKafka"> 
      <int:poller default="true" fixed-delay="1000" time-unit="MILLISECONDS" max-messages-per-poll="5"/> 
    </int-kafka:inbound-channel-adapter> 

    <!--
      Kafka consumer settings: consumer group "default", topic "test2" read
      with 2 streams, ZooKeeper connection taken from the zookeeperConnect bean.
    -->
    <int-kafka:consumer-context id="consumerContext" 
            consumer-timeout="1000" 
            zookeeper-connect="zookeeperConnect"> 
      <int-kafka:consumer-configurations> 
       <int-kafka:consumer-configuration group-id="default" 
         max-messages="5000"> 
        <int-kafka:topic id="test2" streams="2"/> 
       </int-kafka:consumer-configuration> 

      </int-kafka:consumer-configurations> 
    </int-kafka:consumer-context> 

    <!-- ZooKeeper connection used by consumerContext -->
    <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="172.16.201.173:2181" zk-connection-timeout="6000" 
         zk-session-timeout="6000" 
         zk-sync-time="2000" /> 

    <!--
      Consumes from inputFromKafka and calls kafkaConsumer.processMessage.
      This endpoint has no <int:poller/> child although its input channel is
      queue-backed; this is the endpoint named in the
      "No poller has been defined for endpoint" startup error.
    -->
    <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage" /> 

    <!-- Handler bean whose processMessage method receives the Kafka payload -->
    <bean id="kafkaConsumer" class="org.cosmob.kafka.KafkaConsumer"/> 

</beans> 

KafkaConsumer.java

package org.cosmob.kafka; 

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Message handler that dumps a batch of Kafka messages to standard output.
 *
 * <p>The Spring Integration configuration wires {@link #processMessage(Map)}
 * as the target method of an outbound channel adapter.
 */
public class KafkaConsumer {

    /** Creates the handler; it holds no state. */
    public KafkaConsumer() {
    }

    /**
     * Prints every message of the batch, grouped by topic.
     *
     * <p>For each outer entry the topic name is printed, then the keys of the
     * inner map (printed as partitions), then each message decoded as text.
     *
     * @param msgs batch keyed by topic name; each value maps an integer key
     *             (printed as the partition) to the raw message payloads.
     *             A {@code null} batch is ignored.
     */
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        if (msgs == null) {
            return;
        }
        for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : msgs.entrySet()) {
            System.out.println("Topic:" + entry.getKey());

            // Work against the Map interface: casting to ConcurrentHashMap
            // fails with ClassCastException for any other Map implementation.
            Map<Integer, List<byte[]>> messages = entry.getValue();

            System.out.println("\n**** Partition: \n");
            for (Integer partition : messages.keySet()) {
                System.out.println("p:" + partition);
            }
            System.out.println("\n**************\n");

            for (List<byte[]> payloads : messages.values()) {
                for (byte[] payload : payloads) {
                    // Decode with an explicit charset so the output does not
                    // depend on the platform default encoding.
                    String message = new String(payload, StandardCharsets.UTF_8);
                    System.out.println("Message: " + message);
                }
            }
        }
    }

}

Ошибка при запуске Tomcat

13:59:34.401 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.handler.MethodInvokingMessageHandler#0' 
13:59:34.404 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Invoking afterPropertiesSet() on bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 
13:59:34.404 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'inputFromKafka' 
13:59:35.364 [localhost-startStop-1] WARN o.s.w.c.s.XmlWebApplicationContext - Exception encountered during context initialization - cancelling refresh attempt 
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context. 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1578) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:305) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:301) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:753) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:834) ~[spring-context-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:446) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:328) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4994) [catalina.jar:7.0.57] 
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5492) [catalina.jar:7.0.57] 
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [catalina.jar:7.0.57] 
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1575) [catalina.jar:7.0.57] 
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1565) [catalina.jar:7.0.57] 
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_31] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_31] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_31] 
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_31] 
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context. 
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:260) ~[spring-integration-core-4.1.6.RELEASE.jar:na] 
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:211) ~[spring-integration-core-4.1.6.RELEASE.jar:na] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE] 
    ... 21 common frames omitted 
13:59:35.365 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@76967156: defining beans [channelInitializer,$autoCreateChannelCandidates,IntegrationConfigurationBeanFactoryPostProcessor,integrationEvaluationContext,org.springframework.integration.expression.IntegrationEvaluationContextAwareBeanPostProcessor#0,integrationGlobalProperties,integrationHeaderChannelRegistry,globalChannelInterceptorProcessor,toStringFriendlyJsonNodeToStringConverter,converterRegistrar,integrationConversionService,DefaultConfiguringBeanFactoryPostProcessor,datatypeChannelMessageConverter,messageBuilderFactory,inputFromKafka,kafkaInboundChannelAdapter.source,org.springframework.scheduling.support.PeriodicTrigger#0,kafkaInboundChannelAdapter,consumerContext,zookeeperConnect,org.springframework.integration.handler.MethodInvokingMessageHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,kafkaConsumer,org.springframework.integration.dsl.config.IntegrationFlowBeanPostProcessor,integrationRequestMappingHandlerMapping,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy 
13:59:35.366 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean '(inner bean)#382ecec5': [kafkaInboundChannelAdapter] 
13:59:35.367 [localhost-startStop-1] DEBUG o.s.b.f.s.DisposableBeanAdapter - Invoking destroy() on bean with name 'consumerContext' 
15/11/17 13:59:35 INFO consumer.ZookeeperConsumerConnector: [default_wassimd-1447757975520-3e9c50da], Connecting to zookeeper instance at 172.16.201.173:2181 
15/11/17 13:59:35 INFO zkclient.ZkEventThread: Starting ZkClient event thread. 

ответ

2

Ошибка, по-моему, вполне понятна — всегда смотрите в конец трассировки стека, на исходную причину (Caused by)...

Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.

Поскольку inputFromKafka — это QueueChannel, его потребителю нужен poller.

<int:outbound-channel-adapter channel="inputFromKafka"... > 
    <int:poller ... /> 
</int:outbound-channel-adapter> 

Однако использовать здесь QueueChannel не обязательно; просто удалите элемент <int:queue/> из этого канала, и poller вам не понадобится.

+0

Спасибо! Я удалил <int:queue/> из канала inputFromKafka, и теперь приложение запускается без ошибок. Однако метод processMessage, похоже, не получает никаких сообщений: ничего не выводится. Можете ли вы предложить решение? – Wassim

+0

Включите ведение журнала DEBUG и посмотрите на активность. –

+0

Мне нужно было установить auto-startup="true" – Wassim

Смежные вопросы