2016-08-07 1 views
3

Предположим, что Flink получает поток из 1000 твитов в секунду и что где-то в процессе, ему нужно классифицировать их как спам или нет. У меня есть группа, например. 20 машин, которые обеспечивают «классификацию» микросервиса через REST API, и они могут обеспечить максимальную пропускную способность в 10 тыс. Твитов в секунду, а их задержка составляет 3 секунды. Это означает, что в худшем случае у меня может быть 30k твитов «на лету», и все в порядке. Я предполагаю, что потреблять эту услугу от Флинка, реализация будет что-то вроде этого:Преобразование Flink, которое выполняет вызов REST (async, Future, Netty)

public class Classifier implements MapFunction<Tweet, TweetWithClass> { 
    @Override 
    public TweetWithClass map(Tweet tweet) { 
    TweetWithClass twc = new TweetWithClass(tweet); 
    twc.classes = (new Post('http://my.classifier.com', data = tweet.body)).bodyAsStringArrayFromJson(); 
    return twc; 
    } 
} 

DataSet<TweetWithClass> outTweets = inTweets.map(new Classifier()).setParallelism(30000); 

Теперь, с учетом этого API, я думаю, что Flink не будет иметь никакого другого выбора, кроме запуска 30k темы, и это было бы потенциально плохо. Я вижу в исходном коде, что Flink использует Netty, я думаю, что он мог бы поддерживать эту операцию более эффективно, используя асинхронные вызовы ... Если бы существовали фиктивные красивые Netty, Flink и Java API, это выглядело бы примерно так:

public class Classifier implements MapFunction<Tweet, TweetWithClass> { 
    @Override 
    public Future<TweetWithClass> map(Tweet tweet) { 
    Future<String[]> classes = (new NettyPost('http://my.classifier.com', data = tweet.body)).asyncBodyAsStringArrayFromJson(); 
    return classes.onGet((String[] classes) -> new TweetWithClass(tweet, twc.classes)); 
    } 
} 

DataSet<TweetWithClass> outTweets = inTweets.nettyMap(new Classifier()).setMaxParallelism(30000); 

Есть ли способ использовать асинхронные вызовы, чтобы иметь массивную масштабируемость с очень небольшим количеством потоков во Flink?

ответ

1

Я знаю, что это относительно старый вопрос, но с Flink 1.2 (который вышел в феврале 2017 года) Flink предлагает API для этой цели. Он называется асинхронным вводом/выводом.

С асинхронным вводом-выводом вы можете выполнять асинхронные вызовы внешним базам данных или в вашем внешнем веб-сервисе и получать результаты с обратным вызовом в будущем.

Дополнительную информацию можно найти здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

Смежные вопросы