Предположим, что Flink получает поток из 1000 твитов в секунду и что где-то в процессе, ему нужно классифицировать их как спам или нет. У меня есть группа, например. 20 машин, которые обеспечивают «классификацию» микросервиса через REST API, и они могут обеспечить максимальную пропускную способность в 10 тыс. Твитов в секунду, а их задержка составляет 3 секунды. Это означает, что в худшем случае у меня может быть 30k твитов «на лету», и все в порядке. Я предполагаю, что потреблять эту услугу от Флинка, реализация будет что-то вроде этого:Преобразование Flink, которое выполняет вызов REST (async, Future, Netty)
public class Classifier implements MapFunction<Tweet, TweetWithClass> {
@Override
public TweetWithClass map(Tweet tweet) {
TweetWithClass twc = new TweetWithClass(tweet);
twc.classes = (new Post('http://my.classifier.com', data = tweet.body)).bodyAsStringArrayFromJson();
return twc;
}
}
DataSet<TweetWithClass> outTweets = inTweets.map(new Classifier()).setParallelism(30000);
Теперь, с учетом этого API, я думаю, что Flink не будет иметь никакого другого выбора, кроме запуска 30k темы, и это было бы потенциально плохо. Я вижу в исходном коде, что Flink использует Netty, я думаю, что он мог бы поддерживать эту операцию более эффективно, используя асинхронные вызовы ... Если бы существовали фиктивные красивые Netty, Flink и Java API, это выглядело бы примерно так:
public class Classifier implements MapFunction<Tweet, TweetWithClass> {
@Override
public Future<TweetWithClass> map(Tweet tweet) {
Future<String[]> classes = (new NettyPost('http://my.classifier.com', data = tweet.body)).asyncBodyAsStringArrayFromJson();
return classes.onGet((String[] classes) -> new TweetWithClass(tweet, twc.classes));
}
}
DataSet<TweetWithClass> outTweets = inTweets.nettyMap(new Classifier()).setMaxParallelism(30000);
Есть ли способ использовать асинхронные вызовы, чтобы иметь массивную масштабируемость с очень небольшим количеством потоков во Flink?