У меня есть producer
consumer
установки с использованием Wildfly
AS для JMS
, производитель использует newFixedThreadPool(126)
каждые 1 минуту каждого поток данных вытягивать вниз от REST
службы и толкает его на HornetQ
на Wildfly
AS.ConcurrentModificationException при использовании iter.hasNext()
Тогда у меня есть класс Потребителя, который расходует сообщения в HornetQ
и простой класс Parser
, для вставки БД, я пытаюсь буферизировать вставки БД, и я получаю исключение в потоке " main "java.util.ConcurrentModificationException Я подозреваю, что это связано с тем, что мой код не является потокобезопасным, но я не могу его сузить.
Потребитель:
public void Consume(Consumer asyncReceiver) throws Throwable {
try (javax.jms.Connection connection = connFactory.createConnection(props.getProperty("DEFAULT_USERNAME"), props.getProperty("
Session queueSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer msgConsumer = queueSession.createConsumer(queue)) {
msgConsumer.setMessageListener(asyncReceiver);
connection.setExceptionListener(asyncReceiver);
connection.start();
/** I think this is causing the problem */
System.out.println("waiting for messages");
int bufferCount = 0;
for (int i = 0; i < 47483647; i++) {
Thread.sleep(1000);
System.out.print(".");
if (bufferCount == 5) {
if(responseList.size() > 50){
this.buffer();
}
bufferCount = 0;
}
bufferCount++;
}
System.out.println();
}
} catch (Exception e) {
log.severe(e.getMessage());
throw e;
} finally {
if (context != null) {
context.close();
}
}
if (connection != null) {
connection.close();
}
}
public void buffer() throws Exception {
System.out.println("Parsing: " + responseList.size() + " messages");
Parser parser = new Parser();
parser.addList(responseList);
parser.parseApplication();
responseList.clear();
}
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
responseList.add(msg.getText());
} catch (Exception e) {
e.printStackTrace();
}
Parser:
public class Parser {
private ArrayList<String> responseList;
public void addList(ArrayList<String> list) {
this.responseList = list;
}
public void parseApplication() throws Exception {
DBConnection db = DBConnection.createApplication();
try (Connection connection = db.getConnection()) {
//Code removed for stack question
Iterator<String> iter = responseList.iterator();
while(iter.hasNext()) {
// This is where the error is thrown
while (fieldsIterator.hasNext()) {
//Cut code from here, basic JSON parsing done here
timeslices = parse(iter.Next())
for (int i = 0; i < timeslices.length(); i++) {
ThroughputEntry TP = new ThroughputEntry();
TP.setThroughput(values.getDouble(name));
TP.setEnvironment(envName);
TP.setName(appName);
TP.setRetrieved(from);
TP.setPeriodEnd(to);
db.addHistory(TP);
}
}
}
}
iter.remove();
}
}
}
Может быть, я должен сделать мой метод буфера в threadpool
?
Вау ... много кода здесь без четкой информации в вопросе о том, где именно в коде происходит ваша ошибка. Можете ли вы очистить это и уменьшить количество кода? – pczeus
Это сбрасывание кода. Пожалуйста, сделайте [mcve]. – Tunaki
Я очистил код, я не уверен, где проблема. – Johntk