2016-12-30 4 views
1

Я использую Spark Streaming для создания системы для обогащения входящих данных из базы данных облачных вычислений. Пример -Оптимальный способ создания кеша в среде PySpark

Incoming Message: {"id" : 123} 
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"} 

Мой код для класса драйвера заключается в следующем:

from Sample.Job import EnrichmentJob 
from Sample.Job import FunctionJob 
import pyspark 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark import SparkContext, SparkConf, SQLContext 
from pyspark.streaming import StreamingContext 
from pyspark.sql import SparkSession 

from kafka import KafkaConsumer, KafkaProducer 
import json 

class SampleFramework(): 

    def __init__(self): 
     pass 

    @staticmethod 
    def messageHandler(m): 
     return json.loads(m.message) 

    @staticmethod 
    def processData(rdd): 

     if (rdd.isEmpty()): 
      print("RDD is Empty") 
      return 

     # Expand 
     expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich) 

     # Score 
     scored_rdd = expanded_rdd.map(FunctionJob.function) 

     # Publish RDD 


    def run(self, ssc): 

     self.ssc = ssc 

     directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \ 
                  {"metadata.broker.list": META, 
                  "bootstrap.servers": SERVER}, \ 
                  messageHandler= SampleFramework.messageHandler) 

     directKafkaStream.foreachRDD(SampleFramework.processData) 

     ssc.start() 
     ssc.awaitTermination() 

Код для Обогащение работы заключается в следующем: класса EnrichmentJob:

cache = {} 

@staticmethod 
def enrich(data): 

    # Assume that Cloudant Connector using the available config 
    cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"]) 
    final_data = [] 
    for row in data: 
     id = row["id"] 
     if(id not in EnrichmentJob.cache.keys()): 
      data = cloudantConnector.getOne({"id": id}) 
      row["data"] = data 
      EnrichmentJob.cache[id]=data 
     else: 
      data = EnrichmentJob.cache[id] 
      row["data"] = data 
     final_data.append(row) 

    cloudantConnector.close() 

    return final_data 

Мой вопрос состоит в - Есть ли способ сохранить [1] «глобальный кэш в основной памяти, доступный для всех работников» или [2] «локальные кеши для каждого из рабочих, чтобы они оставались на переднем крае achRDD setting "?

я уже исследовал следующие -

  1. Broadcast Переменные - Здесь мы идем [1] путь. Насколько я понимаю, они предназначены для чтения и неизменности. Я проверил это reference, но он приводит пример неустановления/сохранения передаваемой переменной. Это хорошая практика?

  2. Статические переменные - здесь мы идем по пути [2]. Класс, на который ссылается («Enricher» в этом случае) поддерживает кеш в виде словаря статической переменной. Но оказывается, что функция ForEachRDD генерирует совершенно новый процесс для каждого входящего RDD, и это удаляет ранее инициированную статическую переменную. Это тот, который был закодирован выше.

У меня есть два возможных решения прямо сейчас -

  1. поддерживать автономный кэш в файловой системе.
  2. Выполняйте все вычисления этой задачи обогащения на моем узле драйвера. Это приведет к тому, что все данные вернутся на драйвер и будут поддерживаться там. Кэш-объект будет отправлен в задание обогащения в качестве аргумента функции сопоставления.

Здесь очевидно, что первый выглядит лучше второго, но я хочу заключить, что эти два являются единственными способами, прежде чем совершать их. Любые указатели будут оценены!

ответ

1

Есть ли каким-то образом поддерживать [1] «глобальный кэш на основной памяти, доступной для всех работников»

Нет. Там нет «основной памяти», которые могут быть доступны всем работников. Каждый рабочий работает в отдельном процессе и взаимодействует с внешним миром с сокетами. Не говоря уже о разделении различных физических узлов в нелокальном режиме.

Есть несколько методов, которые могут быть применены для обеспечения кэша рабочей области с данными, привязанными к памяти (с использованием SQLite, являющегося самым простым), но для реализации правильного пути требуются дополнительные усилия (избегайте конфликтов и т. Д.).

или [2] "местные тайники для каждого из рабочих, чтобы они сохранялись в настройке foreachRDD"?

Вы можете использовать стандартные методы кеширования с областью действия, ограниченной отдельными рабочими процессами. В зависимости от конфигурации (статический vs. dynamic resource allocation, spark.python.worker.reuse) он может быть или не быть сохранен между несколькими задачами и партиями.

Рассмотрим следующую, упрощенную, пример:

  • map_param.py:

    from pyspark import AccumulatorParam 
    from collections import Counter 
    
    class CounterParam(AccumulatorParam): 
        def zero(self, v: Counter) -> Counter: 
         return Counter() 
    
        def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter: 
         acc1.update(acc2) 
         return acc1 
    
  • my_utils.py:

    from pyspark import Accumulator 
    from typing import Hashable 
    from collections import Counter 
    
    # Dummy cache. In production I would use functools.lru_cache 
    # but it is a bit more painful to show with accumulator 
    cached = {} 
    
    def f_cached(x: Hashable, counter: Accumulator) -> Hashable: 
        if cached.get(x) is None: 
         cached[x] = True 
         counter.add(Counter([x])) 
        return x 
    
    
    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable: 
        counter.add(Counter([x])) 
        return x 
    
  • main.py:

    from pyspark.streaming import StreamingContext 
    from pyspark import SparkContext 
    
    from counter_param import CounterParam 
    import my_utils 
    
    from collections import Counter 
    
    def main(): 
        sc = SparkContext("local[1]") 
        ssc = StreamingContext(sc, 5) 
    
        cnt_cached = sc.accumulator(Counter(), CounterParam()) 
        cnt_uncached = sc.accumulator(Counter(), CounterParam()) 
    
        stream = ssc.queueStream([ 
         # Use single partition to show cache in work 
         sc.parallelize(data, 1) for data in 
         [[1, 2, 3], [1, 2, 5], [1, 3, 5]] 
        ]) 
    
        stream.foreachRDD(lambda rdd: rdd.foreach(
         lambda x: my_utils.f_cached(x, cnt_cached))) 
        stream.foreachRDD(lambda rdd: rdd.foreach(
         lambda x: my_utils.f_uncached(x, cnt_uncached))) 
    
        ssc.start() 
        ssc.awaitTerminationOrTimeout(15) 
        ssc.stop(stopGraceFully=True) 
    
        print("Counter cached {0}".format(cnt_cached.value)) 
        print("Counter uncached {0}".format(cnt_uncached.value)) 
    
    if __name__ == "__main__": 
        main() 
    

Пример запуска:

bin/spark-submit main.py 
Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1}) 
Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2}) 

Как вы можете видеть, мы получаем ожидаемые результаты:

  • Для "кэшированные" объекты аккумулятор обновляется только один раз в уникальный ключ на рабочий процесс (раздел).
  • Для аккумуляторов, не кэшированных, каждый раз обновляется.
Смежные вопросы