2015-09-10 4 views
3

При использовании потоков Akka, есть ли способ закрыть/закрыть поток, который больше не требуется для очистки ресурсов?Закрытие потока Akka для очистки ресурсов

EDIT: Когда источник состоит из бесконечного потока, он никогда не может быть завершен, и я хотел бы остановить его до завершенного источника.

Пример использования:

Source.from(publisher) 
     .map((p) -> p) 
     .to(Sink.ignore()) 
     .run(materializer) 

Есть ли способ остановки потока?

+0

Поток должен прекращаться естественным образом, когда восходящий источник больше не имеет данных для удовлетворения потребностей в нисходящем потоке. Поэтому, когда издатель сигнализирует о завершении вниз, этот материализованный экземпляр потока должен остановиться. Или, если у вас есть длинный поток, вы можете рассмотреть возможность использования отдельного материализатора только для него, а затем вызвать 'shutdown()' на материализаторе – cmbaxter

+0

Поскольку Source является издателем, и он может быть бесконечным потоком (в моем случае это поток из Кафки), он никогда не будет завершен. – aseychell

+0

Через «Издатель» вы можете получить доступ к «Подписка» из потока «Поток», чтобы его можно было отменить? Что такое класс impl для вашего 'Publisher'? – cmbaxter

ответ

4

Вы можете запустить Stream на независимой ActorMaterializer и вызвать остановку на ActorMaterializer по истечению определенного периода времени:

val actorSystem = ActorSystem() 

val temporaryStream = { 

    val localMat = ActorMaterializer()(actorSystem) 

    import actorSystem.dispatcher 
    actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() } 

    Source.from(publisher) 
     .map((p) -> p) 
     .to(Sink.ignore()) 
     .run()(localMat) 
} 

Точно так же вы можете вернуть ActorMaterializer вместо материализованного потока и выключений ActorMaterializer на основе некоторые внешние условия, кроме времени.

+0

Я считаю, что новая система актеров для каждого потока довольно неэффективна. В моем сценарии я использую потоки akka для подписки на темы Apache Kafka, и для этого требуется несколько потоков, и я буду использовать много ресурсов (пул потоков для каждой темы), если я воспользуюсь вашим предложением. – aseychell

+0

См. Мое обновленное предложение, основанное на вашем новом требовании ... –

+0

Я использовал интерфейс 'Materializer' вместо' ActorMaterializer' и не нашел способ останова на интерфейсе. Благодаря! – aseychell

Смежные вопросы