2015-11-19 2 views
17

Мой текущий подход к тестированию Java/Spark Unit Test (подробный here) путем создания экземпляра SparkContext с использованием «локальных» и запущенных модульных тестов с использованием JUnit.Как я могу протестировать программы PySpark?

Код должен быть организован для ввода/вывода в одной функции, а затем для вызова другого с несколькими RDD.

Это прекрасно работает. У меня очень проверенное преобразование данных, написанное на Java + Spark.

Могу ли я сделать то же самое с Python?

Как запустить тесты Spark с помощью Python?

+2

вы можете сделать то же самое с pySpark и использованием UnitTest модуль. Сам тест проекта использует этот модуль: https://github.com/apache/spark/blob/master/python/pyspark/tests.py –

ответ

6

Я использую pytest, что позволяет тестировать светильники, чтобы вы могли создавать экземпляр контекста pyspark и вводить его во все ваши тесты, которые его требуют. Что-то вдоль линий

@pytest.fixture(scope="session", 
       params=[pytest.mark.spark_local('local'), 
         pytest.mark.spark_yarn('yarn')]) 
def spark_context(request): 
    if request.param == 'local': 
     conf = (SparkConf() 
       .setMaster("local[2]") 
       .setAppName("pytest-pyspark-local-testing") 
       ) 
    elif request.param == 'yarn': 
     conf = (SparkConf() 
       .setMaster("yarn-client") 
       .setAppName("pytest-pyspark-yarn-testing") 
       .set("spark.executor.memory", "1g") 
       .set("spark.executor.instances", 2) 
       ) 
    request.addfinalizer(lambda: sc.stop()) 

    sc = SparkContext(conf=conf) 
    return sc 

def my_test_that_requires_sc(spark_context): 
    assert spark_context.textFile('/path/to/a/file').count() == 10 

Затем вы можете запускать тесты в локальном режиме, позвонив py.test -m spark_local или в пряжу с py.test -m spark_yarn. Это сработало хорошо для меня.

14

Я бы рекомендовал использовать py.test. py.test позволяет легко создавать повторно используемые тестовые приборы SparkContext и использовать его для написания кратких тестовых функций. Вы также можете специализировать приборы (например, для создания StreamingContext) и использовать один или несколько из них в своих тестах.

я написал сообщение в блоге среды на эту тему:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Вот отрывок из поста:

pytestmark = pytest.mark.usefixtures("spark_context") 
def test_do_word_counts(spark_context): 
    """ test word couting 
    Args: 
     spark_context: test fixture SparkContext 
    """ 
    test_input = [ 
     ' hello spark ', 
     ' hello again spark spark' 
    ] 

    input_rdd = spark_context.parallelize(test_input, 1) 
    results = wordcount.do_word_counts(input_rdd) 

    expected_results = {'hello':2, 'spark':3, 'again':1} 
    assert results == expected_results 
+2

Добро пожаловать в SO! В основном ответы на ссылки не одобряются. (То есть ответы, которые, если бы ссылка исчезла, не имели бы никакой ценности.) Рекомендуется добавить немного полезного текста, суммирующего или выделяющего ключевые моменты из связанного ресурса. – sclv

+0

@Vikas Kawadia не могли бы вы взглянуть на 'https: // stackoverflow.com/questions/49420660/unit-test-pyspark-code-using-python' – user9367133

4

Вот решение с pytest, если вы используете Спарк 2.x и SparkSession. Я также импортирую сторонний пакет.

import logging 

import pytest 
from pyspark.sql import SparkSession 

def quiet_py4j(): 
    """Suppress spark logging for the test context.""" 
    logger = logging.getLogger('py4j') 
    logger.setLevel(logging.WARN) 


@pytest.fixture(scope="session") 
def spark_session(request): 
    """Fixture for creating a spark context.""" 

    spark = (SparkSession 
      .builder 
      .master('local[2]') 
      .config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1') 
      .appName('pytest-pyspark-local-testing') 
      .enableHiveSupport() 
      .getOrCreate()) 
    request.addfinalizer(lambda: spark.stop()) 

    quiet_py4j() 
    return spark 


def test_my_app(spark_session): 
    ... 

Примечание при использовании Python 3, я должен был указать, что в качестве переменной среды PYSPARK_PYTHON:

import os 
import sys 

IS_PY2 = sys.version_info < (3,) 

if not IS_PY2: 
    os.environ['PYSPARK_PYTHON'] = 'python3' 

В противном случае вы получите ошибку:

Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

+0

Плагин avro не работает, когда я использую этот код на Spark 2.0.2 – clay

+1

Плагин Avro может быть загружен с помощью Spark 2.1, но не Spark 2.0.2. Вы не получите ошибку, пока не попытаетесь использовать формат Avro. Я сам это испытал. – clay

+1

Немного проще установить правильное значение PYSPARK_PYTHON: 'os.environ ['PYSPARK_PYTHON'] = sys.executable' - это установит, что будет в текущем запущенном python, и будет лучше справляться с venvs надеюсь, –

0

Некоторое время назад я также столкнулись с одной и той же проблемой, и после прочтения нескольких статей, форумов и некоторых ответов StackOverflow я закончил писать небольшой плагин для pytest: pytest-spark

Я уже использую его в течение нескольких месяцев и общий рабочий процесс выглядит хорошо на Linux: (распределение настройки виртуальной машины Java + распаковывать искры из какой-нибудь каталог)

  1. Установка Apache Спарк
  2. Установить «pytest» + плагин «pytest-spark»
  3. Создайте «pytest.ini» в каталоге проекта и укажите там место искры.
  4. Выполняйте ваши тесты с помощью pytest, как обычно.
  5. При необходимости вы можете использовать прибор «spark_context» в ваших тестах, который предоставляется плагином - он пытается минимизировать журналы Spark на выходе.
1

Если у вас есть pyspark установлена, вы можете использовать ниже класс для UnitTest его в unittest:

import unittest 
import pyspark 


class PySparkTestCase(unittest.TestCase): 

    @classmethod 
    def setUpClass(cls): 
     conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing") 
     cls.sc = pyspark.SparkContext(conf=conf) 

    @classmethod 
    def tearDownClass(cls): 
     cls.sc.stop() 

Пример:

class SimpleTestCase(PySparkTestCase): 

    def test_basic(self): 
     test_input = [ 
      ' hello spark ', 
      ' hello again spark spark' 
     ] 

     input_rdd = self.sc.parallelize(test_input, 1) 

     from operator import add 

     results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect() 
     self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])