Мы работаем на mule Community Edition 3.4.1. Инвестиции, чтобы сделать работу кода там, в настоящее время перевешивают варианты обновления.Mule Message/Thread management
Монгольский коннектор на этой версии так плохо утешает ресурсы, что в основном неприменимо для массовой обработки данных. Мы обрабатываем приблизительно 10M-100M записей в день (что не так уж и много) и с монгольским коннектором из коробки, мы должны перезапустить Mule примерно каждые три часа. Теперь вы можете сделать это разумно и нет, и мы можем инвестировать время, чтобы сделать его относительно безболезненным, однако с объемом автоматизации и зависимыми процессами мы вместо этого решили написать собственный Mongo-коннектор.
Для удобства использования и для того, чтобы соответствовать навыкам существующего пула талантов, мы хотели создать что-то, что обеспечивает удобство использования и отсутствие утечек ресурсов.
В основном мы создали простой Mule Transformer в Java, который предоставляет его функциональность с помощью статических методов. Каждый статический метод получает ссылку на экземпляр, и экземпляр имеет текущий MuleEvent. В принципе, мы просто переопределяем метод процесса, чтобы сохранить MuleEvent и метод doProcess для сохранения полезной нагрузки в частное свойство, а затем вернуть ссылку на экземпляр в качестве полезной нагрузки (this).
Следующий элемент в потоке - это трансформатор выражений, который вызывает статический метод с полезной нагрузкой в качестве экземпляра и содержит все выражения как простые выражения MEL, так же, как вы могли бы сделать в соединителе Mongo.
Мы используем POJO, а Mule Studio может собирать ссылки на статические методы, а использование этого разъема очень просто и просто. Он также может обрабатывать сотни миллионов вызовов без каких-либо проблем, поскольку мы очень тщательно управляем ресурсами (на самом деле, слишком консервативными), и все отлично. Кроме. Когда обработка выполняется очень быстро, иногда Mule теряет свою прохладу и бросает нулевое исключение. Мы знаем, что в этом случае в нормальном сценарии нет нулевого исключения. На самом деле мы даже явно добавляли фильтры выражений, которые не позволяли бы генерировать нулевое выражение, однако это все еще происходит. Но только, если есть асинхронный поток. Теперь, прежде чем мы прыгнем на это, внутри асинхронного потока компоненты все равно выполняются последовательно, и наше решение помещает экземпляр соединителя Mongo в полезную нагрузку сообщения и выбирает его прямо вверх. Мы хотим получить некоторые сведения о том, как эта проблема возникает и почему.
Вот некоторые части кода, я могу добавить еще. EDIT: Я добавил линию в поток раньше. Вы увидите, currentSku не может быть пустым
<set-session-variable variableName="currentSku" value="#[payload.?sku]" doc:name="currentSku"/>
<logger message="#[currentSku]" level="INFO" category="fiuze.plugins.contentproviders.ExtraData.asycnhFlow" doc:name="Logger"/>
<expression-filter expression="#[currentSku != null ]" doc:name="filterForNoSKUinpayload"/>
<custom-transformer class="com.fiuze.components.Mongo" doc:name="mongoInit" />
<expression-transformer expression="#[com.fiuze.components.Mongo.many(payload, "fizueDemoMongo","products","{'productChannelData.default.ExtraData':{$exists:false}}",100)]" doc:name="mongoGetProductsWithoutExtraData"/>
добавить здесь код Java, который отражает этот призыв:
public static synchronized Object many(Mongo instance, String configRef, String collectionName, String query, Integer limit) throws TransformerException {
return Mongo.exec(instance, configRef, collectionName, query, limit, null, null, false);
}
и здесь нам Exec реализации:
private static synchronized Object exec(Mongo instance, String configRef, String collectionName, Object query, Integer limit, Integer skip, Object sort, Boolean count) throws TransformerException {
instance.getMuleMessage().setPayload(instance.getPayload());
instance.setConfigref(configRef);
instance.setCollectionName(collectionName);
instance.setLimit(limit);
instance.setSkip(skip);
sort = instance.evaluate(sort);
instance.setQuery(instance.evaluate(query));
if (sort instanceof BasicDBObject) {
instance.setSort((BasicDBObject)sort);
} else if (sort instanceof String) {
instance.setSort((String)sort);
} else if (sort == null){
instance.setSort((String)null);
} else {
throw new TransformerException((Message) instance.getMuleMessage());
}
instance.setCount(count);
Object result = instance.execFind();
instance.dispose();
return result;
}
как вам заметил, мы используем синхронные вызовы, поэтому статические методы не путаются.
Так как это сложная проблема, и я не уверен, что я четко ее выражаю, пожалуйста, не стесняйтесь спрашивать подробности, я буду более чем счастлив предоставить.
EDIT:
Вот часть журнала, я буду включать часть без каких-либо проблем, ошибки, а затем никаких проблем в дальнейшем. В этом разделе вы увидите, что процесс продолжается, но конкретный поток Asynch испорчен.
INFO 2016-05-20 21:22:00,760 [[fiuze].getExtraProductData.async1.1579] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:00,769 [[fiuze].getExtraProductData.async1.1581] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,812 [[fiuze].getExtraProductData.async1.1590] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:00,825 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,831 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,887 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,959 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:00,995 [[fiuze].getExtraProductData.async1.1580] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84422
INFO 2016-05-20 21:22:00,996 [[fiuze].getExtraProductData.async1.1579] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84436
INFO 2016-05-20 21:22:00,998 [[fiuze].getExtraProductData.async1.1578] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84454
INFO 2016-05-20 21:22:00,999 [[fiuze].getExtraProductData.async1.1587] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84455
INFO 2016-05-20 21:22:01,000 [[fiuze].getExtraProductData.async1.1590] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84446
INFO 2016-05-20 21:22:01,002 [[fiuze].getExtraProductData.async1.1586] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84445
INFO 2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1589] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84470
INFO 2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84477
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1591] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84462
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1588] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84464
ERROR 2016-05-20 21:22:01,010 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
+ 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1583] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84447
INFO 2016-05-20 21:22:01,003 [[fiuze].getExtraProductData.async1.1585] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84456
ERROR 2016-05-20 21:22:01,014 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
+ 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.1582] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84482
INFO 2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.01] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84483
INFO 2016-05-20 21:22:01,023 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.api.processor.LoggerMessageProcessor: starting ForEach
WARN 2016-05-20 21:22:01,024 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
ERROR 2016-05-20 21:22:01,034 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,035 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,145 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,153 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,268 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,269 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
WARN 2016-05-20 21:22:01,385 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
WARN 2016-05-20 21:22:01,586 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
WARN 2016-05-20 21:22:01,600 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
INFO 2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.1580] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1583] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1586] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,820 [[fiuze].getExtraProductData.async1.1581] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84485
INFO 2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1584] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84488
INFO 2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84490
ERROR 2016-05-20 21:22:01,892 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:01,911 [[fiuze].getExtraProductData.async1.1585] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,136 [[fiuze].getExtraProductData.async1.1591] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,231 [[fiuze].getExtraProductData.async1.1588] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,356 [[fiuze].getExtraProductData.async1.1587] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:02,425 [[fiuze].getExtraProductData.async1.1587] org.mule.api.processor.LoggerMessageProcessor: no match found
Помощь в стеке исключений помогла бы. –
Добавил журнал. Дэвид - спасибо, что посмотрел, я уже многому научился у вас. –
Так что кажется 'update' методом на' com.fiuze.components.Mongo 'бросает NPE. Любая идея почему? –