Я использую Spark Streaming для создания системы для обогащения входящих данных из базы данных облачных вычислений. Пример -Оптимальный способ создания кеша в среде PySpark
Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
Мой код для класса драйвера заключается в следующем:
from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from kafka import KafkaConsumer, KafkaProducer
import json
class SampleFramework():
def __init__(self):
pass
@staticmethod
def messageHandler(m):
return json.loads(m.message)
@staticmethod
def processData(rdd):
if (rdd.isEmpty()):
print("RDD is Empty")
return
# Expand
expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)
# Score
scored_rdd = expanded_rdd.map(FunctionJob.function)
# Publish RDD
def run(self, ssc):
self.ssc = ssc
directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
{"metadata.broker.list": META,
"bootstrap.servers": SERVER}, \
messageHandler= SampleFramework.messageHandler)
directKafkaStream.foreachRDD(SampleFramework.processData)
ssc.start()
ssc.awaitTermination()
Код для Обогащение работы заключается в следующем: класса EnrichmentJob:
cache = {}
@staticmethod
def enrich(data):
# Assume that Cloudant Connector using the available config
cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"])
final_data = []
for row in data:
id = row["id"]
if(id not in EnrichmentJob.cache.keys()):
data = cloudantConnector.getOne({"id": id})
row["data"] = data
EnrichmentJob.cache[id]=data
else:
data = EnrichmentJob.cache[id]
row["data"] = data
final_data.append(row)
cloudantConnector.close()
return final_data
Мой вопрос состоит в - Есть ли способ сохранить [1] «глобальный кэш в основной памяти, доступный для всех работников» или [2] «локальные кеши для каждого из рабочих, чтобы они оставались на переднем крае achRDD setting "?
я уже исследовал следующие -
Broadcast Переменные - Здесь мы идем [1] путь. Насколько я понимаю, они предназначены для чтения и неизменности. Я проверил это reference, но он приводит пример неустановления/сохранения передаваемой переменной. Это хорошая практика?
Статические переменные - здесь мы идем по пути [2]. Класс, на который ссылается («Enricher» в этом случае) поддерживает кеш в виде словаря статической переменной. Но оказывается, что функция ForEachRDD генерирует совершенно новый процесс для каждого входящего RDD, и это удаляет ранее инициированную статическую переменную. Это тот, который был закодирован выше.
У меня есть два возможных решения прямо сейчас -
- поддерживать автономный кэш в файловой системе.
- Выполняйте все вычисления этой задачи обогащения на моем узле драйвера. Это приведет к тому, что все данные вернутся на драйвер и будут поддерживаться там. Кэш-объект будет отправлен в задание обогащения в качестве аргумента функции сопоставления.
Здесь очевидно, что первый выглядит лучше второго, но я хочу заключить, что эти два являются единственными способами, прежде чем совершать их. Любые указатели будут оценены!