2012-02-10 2 views
2

Я пишу клиент-серверное приложение на Netty, которое будет собирать статистику сборки мусора (GC) JVM примерно с 300 серверов и записывать её в базу данных временных рядов для последующего анализа. Однако мой конвейер (pipeline) бросает множество таких исключений (StreamCorruptedException возникает при декодировании объекта):

10/02/2012 10:14:23.415 New I/O server worker #2-2 ERROR GCEventsCollector - netty error 
java.io.StreamCorruptedException: invalid type code: 6E 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source) 
    at java.io.ObjectInputStream.readSerialData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.readObject(Unknown Source) 
    at org.jboss.netty.handler.codec.serialization.ObjectDecoder.decode(ObjectDecoder.java:94) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:280) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:200) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Похоже, что создаётся новый OutputStream, хотя один уже существует, — поэтому и бросается именно это исключение. Ошибка проявляется в моей среде AIT, где подключается более 300 серверов, но не в DEV, где подключается только один агент.

Я подозреваю, что это либо ошибка в декодере объекта, либо где-то есть неправильный код. Пожалуйста, может кто-нибудь объяснить, почему это происходит?

Вот коллектор:

public class GCEventsCollector extends AbstractDataCollector {
    protected static final Logger logger = Logger.getLogger(GCEventsCollector.class);
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool();
    private static final ChannelGroup GROUP = new DefaultChannelGroup("gc-events");
    private final int port;
    private final ServerBootstrap bootstrap;

/**
 * Creates a collector for the given port. The server bootstrap is configured
 * here; the socket itself is only bound later, in {@link #doStart()}.
 *
 * @param port TCP port the collector will listen on
 */
public GCEventsCollector(final int port) {
    logger.info("Creating GC Events collector on port " + port);
    // The bootstrap factory only uses its argument, so the field order is irrelevant.
    this.bootstrap = newServerBootstrap(port);
    this.port = port;
}



/**
 * Builds the server bootstrap used to bind the listening socket. Every
 * accepted channel gets a pipeline from {@link CollectorPipelineFactory};
 * all pipelines share a single {@link CollectorHandler} instance.
 *
 * @param port local port to store as the bootstrap's "localAddress" option
 * @return the configured (not yet bound) bootstrap
 */
private ServerBootstrap newServerBootstrap(int port) {
    ServerBootstrap serverBootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),    // boss threads
            Executors.newCachedThreadPool()));  // worker threads
    serverBootstrap.setOption("localAddress", new InetSocketAddress(port));
    serverBootstrap.setPipelineFactory(new CollectorPipelineFactory(new CollectorHandler()));
    return serverBootstrap;
}

/**
 * @return the categories this collector writes; only {@code GC_EVENTS}
 */
protected KDBCategory[] getKDBCategories() {
    final KDBCategory[] categories = { KDBCategory.GC_EVENTS };
    return categories;
}

/**
 * Binds the server socket and registers the resulting channel in the shared
 * channel group so that {@link #doStop()} can close it.
 *
 * @throws Exception if binding fails
 */
public void doStart() throws Exception {
    GROUP.add(bootstrap.bind());
}

/**
 * Closes every channel registered in the shared channel group (the bound
 * server channel and all accepted channels) and blocks until they are closed.
 *
 * @throws Exception declared for subclasses; not thrown by this implementation
 */
public void doStop() throws Exception {
    logger.info("disconnecting");
    // Blocks the caller; interruption is ignored while waiting.
    GROUP.close()
         .awaitUninterruptibly();
}

/**
 * Upstream handler installed at the end of every collector pipeline. It
 * tracks channels in the shared group, logs connection state changes and
 * hands each decoded message to the worker pool for persistence.
 */
class CollectorHandler extends SimpleChannelHandler {

    /** Registers every newly opened channel so that doStop() can close it. */
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelOpen(ctx, e);
        GROUP.add(e.getChannel());
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelConnected(ctx, e);
        logger.info("channel connected");
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        logger.info("channel disconnected");
    }

    /** Schedules persistence of the received message on the WORKERS pool. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Received GcStats event: " + e.toString());
        }
        WORKERS.execute(new PersistTask(e));
    }

    /** Logs the failure; the channel is left open. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        logger.error("netty error", e.getCause());
    }

    /** Saves the payload of one message event; runs on a worker thread. */
    private final class PersistTask implements Runnable {
        private final MessageEvent event;

        PersistTask(MessageEvent event) {
            this.event = event;
        }

        public void run() {
            // The cast happens on the worker thread, exactly when the task runs.
            saveData(KDBCategory.GC_EVENTS, (GcEventsPersister) event.getMessage());
        }
    }
}

/**
 * Creates the pipeline for each accepted channel: a fresh ObjectDecoder
 * followed by the single handler instance passed to the constructor.
 */
private static class CollectorPipelineFactory implements ChannelPipelineFactory {
    private final ChannelHandler handler;

    private CollectorPipelineFactory(ChannelHandler handler) {
        this.handler = handler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        // A new decoder per pipeline; the business handler is shared.
        final ChannelPipeline pipeline = Channels.pipeline(new ObjectDecoder(), handler);
        return pipeline;
    }
}

}

Вот агент:

public class GCEventsAgent { 
    // Class-wide log4j logger; also used by the nested handler and listener classes.
    private final static Logger logger = Logger.getLogger(GCEventsAgent.class); 
    // Boss/worker executors handed to the NIO client channel factory in configureProcesses().
    private static final ExecutorService bosses = Executors.newCachedThreadPool(); 
    private static final ExecutorService workers = Executors.newCachedThreadPool(); 
    // Timer used by AgentProcessHandler to schedule reconnect attempts.
    private static final Timer timer = new HashedWheelTimer(); 
    // Local host name resolved once in the static initializer; "" if resolution fails.
    private static final String localHostName; 
    // Parse-error listener passed to the LogLineListener instances created by AgentProcessHandler.
    private static final ParseExceptionListener exceptionListener = new ExceptionListener(); 

// Resolve the local host name once; fall back to the empty string (and log) on failure.
static {
    String resolved;
    try {
        resolved = InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
        logger.error("cannot retrieve local host name", e);
        resolved = "";
    }
    localHostName = resolved;
}

/**
 * Agent entry point.
 *
 * @param args {@code [log4j cfg] [mba cfg] [server cfg] [process 1] ... [process n]};
 *             at least the first three are required
 */
public static void main(final String[] args) {
    checkArgument(args.length >= 3, "Usage: GCEventsAgent [log4j cfg] [mba cfg] [server cfg] [process 1] ... [process n]");

    System.setProperty("log4j.configuration", "file:log4j.properties");
    DOMConfigurator.configure(args[0]);

    // args[1] (the "mba cfg" path) is accepted on the command line but not used here.
    final ServerConfig serverCfg = new ServerConfig(args[2]);
    setup(serverCfg, args);
}

/**
 * Reads the collector address from the configuration and selects the mode:
 * directory polling when no process names were given (exactly three
 * arguments), otherwise tailing of the explicitly named process logs.
 */
private static void setup(ServerConfig cfg, String[] args) {
    final String host = cfg.getParameter(String.class, "host");
    final int port = cfg.getParameter(Integer.class, "port");
    final boolean noProcessesGiven = (args.length == 3);
    if (!noProcessesGiven) {
        configureProcesses(cfg, args, host, port);
        return;
    }
    configurePolling(cfg, host, port);
}

/**
 * Connects to the collector and tails the GC logs of the processes named on
 * the command line. Blocks until the first connection attempt has completed.
 */
private static void configureProcesses(final ServerConfig cfg,
                                       final String[] args,
                                       final String host,
                                       final int port) {
    final List<File> files = logFiles(cfg, args);
    logger.info("Initializing GC Agent for [" + host + ":" + port + "]");

    final ClientBootstrap clientBootstrap =
        new ClientBootstrap(new NioClientSocketChannelFactory(bosses, workers));
    clientBootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));

    // One handler instance serves every pipeline created by this bootstrap,
    // including those created for reconnects.
    final AgentProcessHandler processHandler =
        new AgentProcessHandler(clientBootstrap, files, new DefaultParserFactory());
    final ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() throws Exception {
            return Channels.pipeline(new ObjectEncoder(), processHandler);
        }
    };
    clientBootstrap.setPipelineFactory(pipelineFactory);
    clientBootstrap.connect().awaitUninterruptibly();
}

/**
 * Connects to the collector and tails every file found in the configured
 * "logDirectory", re-scanning it every "frequency" seconds. Blocks until the
 * first connection attempt has completed.
 */
private static void configurePolling(ServerConfig cfg, String host, int port) {
    final int pollFrequency = cfg.getParameter(Integer.class, "frequency");

    final ClientBootstrap clientBootstrap = new ClientBootstrap(
        new NioClientSocketChannelFactory(newCachedThreadPool(), newCachedThreadPool()));
    clientBootstrap.setOption("remoteAddress", new InetSocketAddress(host, port));

    final GcParserSupplier parsers = new GcParserSupplier();
    final ConcurrentMap<File, Tailer> activeTailers = Maps.newConcurrentMap();
    // This mode uses its own listener and timer instances, not the class-level ones.
    final ParseExceptionListener parseErrors = new ExceptionListener();
    final Set<File> knownFiles = Sets.newHashSet();
    final File logDirectory = new File(cfg.getParameter(String.class, "logDirectory"));

    // knownFiles is shared between the tail manager and the detection task.
    final TailManager tails =
        new TailManager(knownFiles, activeTailers, parsers, parseErrors, localHostName);
    final DetectionTask detector = new DetectionTask(logDirectory, knownFiles, tails);

    final FileWatcher watcher =
        new FileWatcher(Executors.newScheduledThreadPool(1), detector, pollFrequency);
    final Timer reconnectTimer = new HashedWheelTimer();
    final EfxAgentHandler efxHandler =
        new EfxAgentHandler(clientBootstrap, tails, watcher, reconnectTimer);

    final ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
        @Override public ChannelPipeline getPipeline() throws Exception {
            return Channels.pipeline(new ObjectEncoder(), efxHandler);
        }
    };
    clientBootstrap.setPipelineFactory(pipelineFactory);
    clientBootstrap.connect().awaitUninterruptibly();
}

/**
 * Resolves the GC log file of every process named in {@code args[3..]},
 * creating an empty file (and its parent directories) when it does not exist.
 * Files that cannot be created are logged and left out of the result.
 *
 * @param cfg  configuration providing the "logDirectory" parameter
 * @param args full command line; process names start at index 3
 * @return the log files that exist or were created, in argument order
 */
private static List<File> logFiles(ServerConfig cfg, String[] args) {
    String logDir = cfg.getParameter(String.class, "logDirectory");
    final int n = args.length;
    // Guard against a negative capacity when fewer than three arguments are passed.
    List<File> logFiles = new ArrayList<File>(Math.max(0, n - 3));
    for (int i = 3; i < n; i++) {
        // NOTE(review): plain concatenation requires "logDirectory" to end with a
        // path separator; kept as-is so existing configurations keep resolving
        // to the same paths.
        String filePath = logDir + args[i] + ".gc.log";
        try {
            File file = new File(filePath);
            if (!file.exists()) {
                logger.info("missing log file so creating empty file at path: " + filePath);
                // getParentFile() is null for a path without a parent component.
                File dir = file.getParentFile();
                if (dir != null) {
                    dir.mkdirs();
                }
                if (file.createNewFile()) {
                    logger.info("successfully created empty file at path: " + filePath);
                }
            }
            logFiles.add(file);
        } catch (IOException e) {
            // Pass the exception so the cause and stack trace are not lost.
            logger.error("error creating log file at path: " + filePath, e);
        }
    }
    return logFiles;
}

/**
 * Turns parser callbacks into GcEventsPersister messages written to the
 * channel. A small state byte tracks which callbacks of the current event
 * have been seen: a pause with no preceding heap-before callback is written
 * immediately, otherwise the message is written once heap-after arrives.
 */
static final class AgentPauseListener implements GCEventListener<CMSType, CMSHeap> {
    /** No callback of the current event seen yet. */
    private static final byte IDLE = 0x00;
    /** Bit set once onHeapBefore has been seen. */
    private static final byte HEAP_BEFORE_SEEN = 0x01;
    /** Bit set once onPause has been seen after onHeapBefore. */
    private static final byte PAUSE_SEEN = 0x02;
    /** Both bits set: heap-before and pause seen, heap-after expected. */
    private static final byte AWAITING_HEAP_AFTER = 0x03;

    private final Channel channel;
    private final GcEventsPersister.Builder builder;
    private byte state;

    private AgentPauseListener(Channel channel,
                               GcEventsPersister.Builder builder) {
        this.channel = channel;
        this.builder = builder;
    }

    @Override
    public void onPause(PauseDetail<CMSType> pauseDetail) {
        logger.info("onPause");
        checkState(state == IDLE || state == HEAP_BEFORE_SEEN);

        final String typeName;
        if (pauseDetail.getType() != null) {
            typeName = pauseDetail.getType().toString();
        } else {
            typeName = null;
        }
        final boolean isPause =
            pauseDetail.getType() != null && pauseDetail.getType().isPause();

        builder
            .collectionType(typeName)
            .start(new Instant(pauseDetail.getStartTimestamp()))
            .end(new Instant(pauseDetail.getEndTimestamp()))
            .pause(isPause)
            .duration(pauseDetail.getPauseMillis());

        if (state != IDLE) {
            // Heap-before was seen: wait for heap-after before sending.
            state |= PAUSE_SEEN;
        } else {
            channel.write(builder.build());
        }
    }

    @Override
    public void onHeapBefore(HeapDetail<CMSHeap> details) {
        logger.info("onHeapBefore");
        checkState(state == IDLE);
        builder.heapBefore(used(details)).heapBeforeTotal(total(details));
        state |= HEAP_BEFORE_SEEN;
    }

    @Override
    public void onHeapAfter(HeapDetail<CMSHeap> details) {
        logger.info("onHeapAfter");
        checkState(state == AWAITING_HEAP_AFTER);
        builder.heapAfter(used(details)).heapAfterTotal(total(details));
        channel.write(builder.build());
        state = IDLE;
    }

    /** Sum of the used sizes of the three CMS heap generations. */
    private final long used(HeapDetail<CMSHeap> details) {
        long sum = 0L;
        for (CMSHeap gen : generations()) {
            sum += used(details, gen);
        }
        return sum;
    }

    private final long used(HeapDetail<CMSHeap> heapDetail,
                            CMSHeap gen) {
        final long used = heapDetail.getSizes().get(gen).getUsed();
        logger.info("used = " + used);
        return used;
    }

    /** Sum of the total sizes of the three CMS heap generations. */
    private final long total(HeapDetail<CMSHeap> details) {
        long sum = 0L;
        for (CMSHeap gen : generations()) {
            sum += total(details, gen);
        }
        return sum;
    }

    private final long total(HeapDetail<CMSHeap> heapDetail,
                             CMSHeap gen) {
        final Map<CMSHeap, HeapDetail.HeapMetric> sizes = heapDetail.getSizes();
        return sizes.get(gen).getTotal();
    }

    /** Generations included in the sums, in the order they are read. */
    private static CMSHeap[] generations() {
        return new CMSHeap[] {
            CMSHeap.PAR_NEW_GENERATION,
            CMSHeap.CMS_GENERATION,
            CMSHeap.CMS_PERM_GENERATION
        };
    }
}

static final class ExceptionListener implements ParseExceptionListener { 
    @Override 
    public void onParseError(int lineNo, String input, String error) { 
     logger.error("error parsing: " + lineNo + " - " + input + " - " + error); 
    } 
} 

/**
 * Timer task that starts a new connection attempt on the given bootstrap.
 * The result of the attempt is not awaited here.
 */
static final class ReconnectTask implements TimerTask {
    private final ClientBootstrap bootstrap;

    ReconnectTask(ClientBootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    public void run(Timeout timeout) throws Exception {
        // Fire and forget: the returned future is intentionally ignored.
        this.bootstrap.connect();
    }
}

static class AgentProcessHandler extends SimpleChannelHandler { 
    private final ClientBootstrap bootstrap; 
    private final List<File> logFiles; 
    private final GCParserFactory parserFactory; 
    private final Set<Tailer> tailers = new HashSet<Tailer>(4); 

    public AgentProcessHandler(ClientBootstrap bootstrap, 
           List<File> logFiles, 
           GCParserFactory parserFactory) { 
     this.bootstrap = bootstrap; 
     this.logFiles = logFiles; 
     this.parserFactory = parserFactory; 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) 
      throws Exception { 
     logger.info("channel connected"); 
     for (File logFile : logFiles) { 
      logger.info("setting up scraper for logfile: " + logFile); 
      GCParser parser = parserFactory.getParser(); 
      GcEventsPersister.Builder builder = 
        new GcEventsPersister.Builder(logFile.getName(), localHostName); 
      GCEventListener gcEventListener = 
        new AgentPauseListener(e.getChannel(), builder); 
      TailerListener listener = 
        new LogLineListener(parser, gcEventListener, exceptionListener); 
      Tailer tailer = Tailer.create(logFile, listener, 1000L, true); 
      tailers.add(tailer); 
     } 
    } 

    @Override 
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) 
      throws Exception { 
     logger.error("channel disconnected"); 
     stopTailers(); 
     scheduleReconnect(); 
    } 

    private void scheduleReconnect() { 
     timer.newTimeout(new ReconnectTask(bootstrap), 5L, TimeUnit.SECONDS); 
    } 

    private final void stopTailers() { 
     for (Tailer tailer : tailers) { 
      tailer.stop(); 
     } 
     tailers.clear(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 
      throws Exception { 
     final Throwable cause = e.getCause(); 
     logger.error(cause); 
     if (cause instanceof ConnectException) { 
      stopTailers(); 
      scheduleReconnect(); 
     } 
    } 
}; 

/**
 * Tailer listener that passes every tailed line to the GC parser together
 * with the event listener and the parse-error listener.
 */
private static class LogLineListener extends TailerListenerAdapter {
    private final GCParser gcParser;
    private final GCEventListener eventSink;
    private final ParseExceptionListener parseErrorSink;

    public LogLineListener(GCParser parser,
                           GCEventListener listener,
                           ParseExceptionListener exceptionLister) {
        this.gcParser = parser;
        this.eventSink = listener;
        this.parseErrorSink = exceptionLister;
    }

    @Override
    public void handle(String line) {
        logger.info("handle(String line)");
        gcParser.matchLine(line, eventSink, parseErrorSink);
    }
}

/** Supplies a new CMSParser on every call. */
private static final class GcParserSupplier
        implements Supplier<GCParser<CMSType, CMSHeap>> {
    @Override
    public GCParser<CMSType, CMSHeap> get() {
        final GCParser<CMSType, CMSHeap> freshParser = new CMSParser();
        return freshParser;
    }
}

/**
 * FileHandler that owns the Tailer of every discovered log file.
 *
 * <p>{@code discoveredSet} is the same set instance the DetectionTask uses to
 * decide whether a file is new. The task guards it with
 * {@code synchronized (discovered)}, so every mutation made here takes the
 * same monitor.
 */
private static final class TailManager implements FileHandler {
    private final Set<File> discoveredSet;
    private final ConcurrentMap<File, Tailer> tailerMap;
    private final Supplier<GCParser<CMSType, CMSHeap>> parserSupplier;
    private final ParseExceptionListener exceptionListener;
    private final String host;
    private volatile boolean go;

    private TailManager(final Set<File> discoveredSet,
                        final ConcurrentMap<File, Tailer> tailerMap,
                        final Supplier<GCParser<CMSType, CMSHeap>> parserSupplier,
                        final ParseExceptionListener exceptionListener,
                        final String host) {
        this.discoveredSet = discoveredSet;
        this.tailerMap = tailerMap;
        this.parserSupplier = parserSupplier;
        this.exceptionListener = exceptionListener;
        this.host = host;
    }

    /**
     * Stops and forgets every tailer. The discovered set is cleared as well:
     * otherwise, after a later {@link #start()}, the detection task would
     * still consider every file known, never call {@link #onNew} again, and
     * nothing would be tailed after a reconnect.
     */
    public void stop() {
        go = false;
        for (Tailer tailer : tailerMap.values()) {
            tailer.stop();
        }
        tailerMap.clear();
        synchronized (discoveredSet) {
            discoveredSet.clear();
        }
    }

    /** Allows {@link #onNew} and {@link #onDelete} to be called again. */
    public void start() {
        go = true;
    }

    /**
     * Starts tailing a newly discovered file; parsed events are written to
     * {@code channel}.
     *
     * @throws IllegalStateException if the manager is not started
     */
    @Override public void onNew(final File file,
                                final Channel channel) {
        checkState(go);
        GCParser<CMSType, CMSHeap> parser = parserSupplier.get();
        String fileName = file.getName();
        GcEventsPersister.Builder builder =
            new GcEventsPersister.Builder(fileName, host);
        AgentPauseListener eventListener =
            new AgentPauseListener(channel, builder);
        // Callback the tail adaptor uses to drop this file so it can be rediscovered.
        Function<Void, Void> removeTail = new Function<Void, Void>() {
            @Override
            public Void apply(@Nullable final Void input) {
                final Tailer tailer = tailerMap.remove(file);
                // Null when the tailer was already removed (repeated callback or stop()).
                if (tailer != null) {
                    tailer.stop();
                }
                synchronized (discoveredSet) {
                    discoveredSet.remove(file);
                }
                return null;
            }
        };
        GcTailAdaptor listener =
            new GcTailAdaptor(logger, parser, eventListener, exceptionListener, removeTail);
        tailerMap.put(file, Tailer.create(file, listener, 1000L, true));
    }

    /**
     * Stops tailing a file that disappeared from the directory.
     *
     * @throws IllegalStateException if the manager is not started
     */
    @Override public void onDelete(File file, Channel channel) {
        checkState(go);
        final Tailer tailer = tailerMap.remove(file);
        // The file may be known without an active tailer; nothing to stop then.
        if (tailer != null) {
            tailer.stop();
        }
    }
}

static class EfxAgentHandler extends SimpleChannelHandler { 
    private final ClientBootstrap bootstrap; 
    private final TailManager tailManager; 
    private final FileWatcher fileWatcher; 
    private final Timer timer; 

    public EfxAgentHandler(ClientBootstrap bootstrap, 
          TailManager tailManager, 
          FileWatcher fileWatcher, 
          Timer timer) { 
     this.bootstrap = bootstrap; 
     this.tailManager = tailManager; 
     this.fileWatcher = fileWatcher; 
     this.timer = timer; 
    } 

    @Override public void channelConnected(ChannelHandlerContext ctx, 
              ChannelStateEvent e) throws Exception { 
     logger.info("channel connected"); 
     tailManager.start(); 
     fileWatcher.start(e.getChannel()); 
    } 

    @Override public void channelDisconnected(ChannelHandlerContext ctx, 
               ChannelStateEvent e) throws Exception { 
     logger.error("channel disconnected"); 
     tailManager.stop(); 
     fileWatcher.stop(); 
     scheduleReconnect(); 
    } 

    private void scheduleReconnect() { 
     timer.newTimeout(new ReconnectTask(bootstrap), 5L, TimeUnit.SECONDS); 
    } 

    @Override public void exceptionCaught(ChannelHandlerContext ctx, 
              ExceptionEvent e) throws Exception { 
     final Throwable cause = e.getCause(); 
     logger.error(cause); 
     if (cause instanceof ConnectException) { 
      tailManager.stop(); 
      scheduleReconnect(); 
     } 
    } 
} 

/**
 * Tailer listener for the directory-polling mode: feeds each tailed line to
 * the GC parser and invokes {@code removeTail} once the file has been tailed
 * for 30 minutes or more.
 */
static final class GcTailAdaptor extends TailerListenerAdapter {
    /** 30 minutes in milliseconds. */
    private static final long MAX_TAIL_MILLIS = 1800000L;

    private final Logger logger;
    private final GCParser parser;
    private final GCEventListener eventListener;
    private final ParseExceptionListener exceptionListener;
    private final Function<Void, Void> removeTail;
    private volatile long lastTail;

    GcTailAdaptor(final Logger logger,
                  final GCParser parser,
                  final GCEventListener eventListener,
                  final ParseExceptionListener exceptionListener,
                  final Function<Void, Void> removeTail) {
        this.logger = logger;
        this.parser = parser;
        this.eventListener = eventListener;
        this.exceptionListener = exceptionListener;
        this.removeTail = removeTail;
    }

    @Override public void handle(String line) {
        lastTail();
        parser.matchLine(line, eventListener, exceptionListener);
    }

    /**
     * Records the time of the first line and, on later lines, invokes
     * {@code removeTail} when at least 30 minutes have passed since then.
     */
    private final void lastTail() {
        final long t = System.currentTimeMillis();
        if (lastTail == 0L) {
            lastTail = t;
            return;
        }
        // NOTE(review): lastTail is never refreshed after the first line, so this
        // measures time since the FIRST line, and removeTail is invoked again for
        // every further line once the limit is exceeded. If the intent was "no
        // line for 30 minutes", lastTail must be updated on each call - confirm
        // before changing.
        if ((t - lastTail) >= MAX_TAIL_MILLIS) {
            removeTail.apply(null);
        }
    }

    @Override public void handle(Exception ex) {
        // Use the (message, throwable) overload so the stack trace is logged.
        logger.error("error while tailing GC log", ex);
    }
}

/**
 * Periodic task that scans a directory and reports changes to a
 * {@link FileHandler}: files not seen before are reported through
 * {@code onNew}; files that were seen earlier but are no longer listed are
 * reported through {@code onDelete}.
 *
 * <p>All access to the {@code discovered} set made by this class happens while
 * holding the set's monitor.
 */
@VisibleForTesting
final static class DetectionTask implements Runnable {
    private final File directory;
    private final Set<File> discovered;
    private final FileHandler handler;
    private volatile Channel channel;

    DetectionTask(final File directory,
                  final Set<File> discovered,
                  final FileHandler handler) {
        this.discovered = discovered;
        this.handler = handler;
        this.directory = directory;
    }

    /** Sets the channel passed to the handler callbacks; may be null. */
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**
     * Forgets a previously discovered file so the next scan reports it as new.
     *
     * @return {@code true} if the file was removed
     * @throws IllegalArgumentException if the file is not currently discovered
     */
    public boolean removeStaleFile(File file) {
        synchronized (discovered) {
            checkArgument(discovered.contains(file),
                "file is not discovered so cannot be stale");
            return discovered.remove(file);
        }
    }

    /** Performs one scan of the directory. */
    public void run() {
        final File[] files = directory.listFiles();
        if (files == null) {
            // listFiles() returns null when the path is not a directory or an I/O
            // error occurs. Skip this scan: an exception thrown from a
            // fixed-rate task would suppress all subsequent executions.
            return;
        }
        // Read the volatile field once so one scan uses one channel throughout.
        final Channel target = channel;
        synchronized (discovered) {
            for (final File file : files) {
                if (!discovered.contains(file)) {
                    discovered.add(file);
                    handler.onNew(file, target);
                }
            }
            final ImmutableSet<File> listed = ImmutableSet.copyOf(files);
            // Copy the difference first: it is a live view of 'discovered',
            // which is modified in the loop below.
            final ImmutableSet<File> vanished = Sets.difference(discovered, listed).immutableCopy();
            for (File file : vanished) {
                discovered.remove(file);
                handler.onDelete(file, target);
            }
        }
    }
}


/**
 * Callbacks invoked by DetectionTask while it scans the log directory.
 * The channel argument is whatever was last passed to
 * DetectionTask.setChannel and can be null.
 */
@VisibleForTesting static interface FileHandler { 
    /** Called for a listed file that is not yet in the discovered set. */
    void onNew(File file, Channel channel); 
    /** Called for a discovered file that is no longer listed in the directory. */
    void onDelete(File file, Channel channel); 
} 

/**
 * Runs a {@link DetectionTask} at a fixed rate on a scheduled executor.
 */
@VisibleForTesting
final static class FileWatcher {
    private final ScheduledExecutorService executor;
    private final DetectionTask detectionTask;
    private final int frequency;
    private volatile ScheduledFuture<?> task;

    @VisibleForTesting
    FileWatcher(ScheduledExecutorService executor,
                DetectionTask detectionTask,
                int frequency) {
        this.executor = executor;
        this.detectionTask = detectionTask;
        this.frequency = frequency;
    }

    /**
     * Starts periodic detection; events are delivered with the given channel.
     *
     * @param chanel channel handed to the detection task
     */
    public void start(Channel chanel) {
        // Publish the channel BEFORE scheduling: the initial delay is zero, so
        // the first run could otherwise start with the previous (null) channel.
        detectionTask.setChannel(chanel);
        task = executor.scheduleAtFixedRate(detectionTask, 0L, frequency, TimeUnit.SECONDS);
    }

    /** Cancels periodic detection; a no-op if {@link #start} was never called. */
    public void stop() {
        final ScheduledFuture<?> scheduled = task;
        if (scheduled != null) {
            scheduled.cancel(true);
        }
        detectionTask.setChannel(null);
    }

    /**
     * Creates a watcher with its own discovered set and single-threaded
     * scheduler.
     *
     * @param directory readable directory to scan
     * @param handler   receiver of new/deleted file callbacks
     * @param frequency scan period in seconds, must be positive
     * @throws NullPointerException     if directory or handler is null
     * @throws IllegalArgumentException if the directory is not a readable
     *                                  directory or frequency is not positive
     */
    public static FileWatcher on(File directory,
                                 FileHandler handler,
                                 int frequency) {
        checkNotNull(directory);
        checkNotNull(handler);
        checkArgument(directory.isDirectory(), "file is not a directory");
        checkArgument(directory.canRead(), "no read access to directory");
        checkArgument(0 < frequency, "frequency must be > 0");
        final HashSet<File> objects = Sets.newHashSet();
        final DetectionTask task = new DetectionTask(directory, objects, handler);
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        return new FileWatcher(executorService, task, frequency);
    }
}

}

+0

Показать код, пожалуйста. –

+0

Отредактировано для добавления агента и кода коллекционера – algolicious

ответ

1

Я обнаружил, почему это происходит. Я использую устаревший ObjectDecoder, который несовместим с ObjectEncoder моего клиента. Я просто отправляю ByteBuffer, и теперь все в порядке.

1

«У меня где-то есть неправильный код».

Верно. В частности, у вас почти наверняка различаются времена жизни ObjectInputStream/ObjectOutputStream на сервере и на клиенте. Используйте одни и те же потоки на протяжении всей жизни сокета и не выполняйте никаких операций ввода-вывода через эти сокеты какими-либо другими средствами.

+0

Я не трогаю никаких OutputStreams за пределами netty. Однако я использую только одно соединение для отправки сериализованных объектов Java из разных потоков - это проблема? – algolicious

+0

@algolicious Почти наверняка, если они не будут тщательно синхронизированы с обоих концов. – EJP

Смежные вопросы