2013-03-05 2 views
1

Я довольно новичок в Mule, используя 3.3.0, но я стараюсь, по моему мнению, быть довольно хорошим примером. У меня есть конфиг мула, который будет читать csv-файл и попытаться обработать строки и столбцы в разных потоках async. Однако мы видим ConcurrentModificationException, когда сообщение «передается» одному из потоков async. Мне было интересно, видел ли кто-нибудь еще эту проблему и что они, возможно, сделали для решения этой проблемы.Видя ConcurrentModificationException в Mule 3.3.0

java.util.ConcurrentModificationException в org.apache.commons.collections.map.AbstractHashedMap $ HashIterator.nextEntry (AbstractHashedMap.java:1113) в org.apache.commons.collections.map.AbstractHashedMap $ KeySetIterator .next (AbstractHashedMap.java:938) at org.mule.DefaultMuleEvent.setMessage (DefaultMuleEvent.java:933) at org.mule.DefaultMuleEvent. (DefaultMuleEvent.java:318) at org.mule.DefaultMuleEvent. (DefaultMuleEvent .java: 290) at org.mule.DefaultMuleEvent.copy (По умолчаниюMuleEvent.java:948)

<queued-asynchronous-processing-strategy poolExhaustedAction="RUN" name="commonProcessingStrategy" maxQueueSize="1000" doc:name="Queued Asynchronous Processing Strategy"/> 

<file:connector name="inboundFileConnector" fileAge="1000" autoDelete="true" pollingFrequency="1000" workDirectory="C:/mule/orca/dataprovider/work"/> 

<file:endpoint name="dataProviderInbound" path="C:\mule\orca\dataprovider\inbound" moveToPattern="#[function:datestamp]-#[header:originalFilename]" moveToDirectory="C:\mule\orca\dataprovider\history" connector-ref="inboundFileConnector" doc:name="Data Feed File" doc:description="new files are processed in 'work' folder, then moved to 'archive' folder"/> 

<flow name="dataProviderFeedFlow"> 
    <inbound-endpoint ref="dataProviderInbound"/> 
    <file:file-to-string-transformer /> 
    <flow-ref name="dataSub"/> 
</flow> 

<sub-flow name="dataSub" > 

    <splitter expression="#[rows=org.mule.util.StringUtils.split(message.payload, '\n\r')]" /> 
    <expression-transformer expression="#[org.mule.util.StringUtils.split(message.payload, ',')]" /> 
    <foreach> 

     <flow-ref name="storageFlow" /> 
     <flow-ref name="id" />    

    </foreach> 

</sub-flow> 

<flow name="storageFlow" processingStrategy="commonProcessingStrategy"> 
    <logger level="INFO" message="calling the 'storageFlow' sub flow."/> 
</flow> 

<flow name="id" processingStrategy="commonProcessingStrategy"> 
    <logger level="INFO" message="calling the 'id' sub flow."/> 
</flow> 
+0

Я не совсем понимаю вашу конфигурацию. Почему бы не использовать второй разделитель выражений после первого? Не знаете, для чего используется 'foreach'? Также нет 'all' вокруг' storageFlow' и 'id' flow-refs, поэтому это означает, что вы хотите передать результат' storageFlow' в 'id'? Но эти частные потоки имеют асинхронную стратегию обработки, поэтому они не могут возвращать никакого результата. Я смущен: $ –

+0

Моя неопытность, вероятно, вызывает у вас путаницу. Цель состоит в том, чтобы создать сообщение, которое затем может быть передано каждому потоку, когда я попробовал сплиттер и все вокруг потока-refs. Я получаю сообщение об ошибке: –

+0

Нет, я просто пытаюсь посмотреть, прошли ли вы другие подходы/попытки до достижения этой точки. В любом случае, можете ли вы поделиться простым входным файлом, чтобы мы могли попытаться воспроизвести? Также, пожалуйста, укажите ожидаемое поведение для вызова 'storageFlow' и' id': должны ли они оба получать одно и то же сообщение или привязываться друг к другу один за другим? –

ответ

4

Вот фиксированная версия dataSub суб-поток, который работает отлично:

<sub-flow name="dataSub"> 
    <splitter expression="#[org.mule.util.StringUtils.split(message.payload, '\n\r')]" /> 
    <splitter expression="#[org.mule.util.StringUtils.split(message.payload, ',')]" /> 
    <flow-ref name="storageFlow" /> 
    <all> 
     <async> 
      <flow-ref name="storageFlow" /> 
     </async> 
     <async> 
      <flow-ref name="id" /> 
     </async> 
    </all> 
</sub-flow> 

Обратите внимание, что:

  • Я использую два разветвителя выражения,
  • Я использую all сообщение процессор для обеспечения того, что одна и та же полезная нагрузка отправляется в оба потока,
  • Я должен обернуть поток ref ref async обработчик сообщений в противном случае вызов вызывает сбои, потому что частные потоки являются асинхронными, но all принудительно синхронно.
+0

Отличный и спасибо! Знаете ли вы, почему или вы можете предположить, почему произошло исключение? Концептуально мне показалось хорошо ... поэтому я хотел бы знать, что происходит, чтобы настроить мое понимание. –

+0

Не уверен на 100%, но я предполагаю, что, поскольку 'foreach' изменяет свойства сообщения (он устанавливает переменные потока) и потому, что сообщения обрабатываются асинхронно и сериализован из-за использования« стратегии очереди-асинхронной обработки », сообщения, где ожидая, что они будут в устойчивом состоянии, когда они не будут. В общем, избегайте 'foreach', если у вас нет сильного аргумента в пользу его использования. Наконец, если мое решение работает для вас, примите его. –

Смежные вопросы