2016-12-05 3 views
1

Я пытаюсь запустить тест wordcount с помощью pytest с этого сайта - Unit testing Apache Spark with py.test. Проблема в том, что я не могу начать искровой контекст. Код я использую для запуска Искровой Context:Тестирование Spark с pytest - невозможно запустить Spark в локальном режиме

@pytest.fixture(scope="session") 
def spark_context(request): 
    """ fixture for creating a spark context 
    Args: 
     request: pytest.FixtureRequest object 
    """ 
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")) 
    sc = SparkContext(conf=conf) 
    request.addfinalizer(lambda: sc.stop()) 

    quiet_py4j() 
    return sc 

я выполнить этот код с помощью команды:

#first way 
pytest spark_context_fixture.py 

#second way 
python spark_context_fixture.py 

Выход:

platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0 
rootdir: /home/mgr/test, inifile: 
collected 0 items 

Тогда я хочу, чтобы запустить WordCount тест с использованием pytest.

pytestmark = pytest.mark.usefixtures("spark_context") 

def test_do_word_counts(spark_context): 
    """ test word couting 
    Args: 
     spark_context: test fixture SparkContext 
    """ 
    test_input = [ 
     ' hello spark ', 
     ' hello again spark spark' 
    ] 

    input_rdd = spark_context.parallelize(test_input, 1) 
    results = wordcount.do_word_counts(input_rdd) 

    expected_results = {'hello':2, 'spark':3, 'again':1} 
    assert results == expected_results 

Но выход:

________ ERROR at setup of test_do_word_counts _________ 
file /home/mgrabowski/test/wordcount_test.py, line 5 
    def test_do_word_counts(spark_context): 
E  fixture 'spark_context' not found 
>  available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory 
>  use 'pytest --fixtures [testpath]' for help on them. 

Кто-нибудь знает, что является причиной этой проблемы?

+0

Вы установили искру на вашу машину? – Yaron

+0

Да, я установил Spark 1.6. Я могу запустить pyspark в командной строке, чтобы он выглядел нормально. –

ответ

3

Я провел некоторое исследование и, наконец, нашел решение. Я использую Spark 1.6.

Прежде всего, я добавил две строки в мой .bashrc-файл.

export SPARK_HOME=/usr/hdp/2.5.0.0-1245/spark 
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPA‌​TH 

Тогда я создал файл "conftest.py". Имя файла действительно важно, вы не должны его изменять, иначе вы увидите ошибку с spark_context. Если вы используете искру в локальном режиме и не использовать пряжу, conftest.py должен выглядеть так:

import logging 
import pytest 

from pyspark import HiveContext 
from pyspark import SparkConf 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

def quiet_py4j(): 
    logger = logging.getLogger('py4j') 
    logger.setLevel(logging.WARN) 

@pytest.fixture(scope="session") 
def spark_context(request): 
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")) 
    request.addfinalizer(lambda: sc.stop()) 

    sc = SparkContext(conf=conf) 
    quiet_py4j() 
    return sc 

@pytest.fixture(scope="session") 
def hive_context(spark_context): 
    return HiveContext(spark_context) 

@pytest.fixture(scope="session") 
def streaming_context(spark_context): 
    return StreamingContext(spark_context, 1) 

Теперь вы можете запускать тесты с помощью простой команды pytest. Pytest должен запустить Spark и остановить его в конце концов.

Если вы используете НИТИ вы можете изменить conftest.py к: лесозаготовок импорта импорта pytest

from pyspark import HiveContext 
from pyspark import SparkConf 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

def quiet_py4j(): 
    """ turn down spark logging for the test context """ 
    logger = logging.getLogger('py4j') 
    logger.setLevel(logging.WARN) 

@pytest.fixture(scope="session", 
      params=[pytest.mark.spark_local('local'), 
        pytest.mark.spark_yarn('yarn')]) 
def spark_context(request): 
    if request.param == 'local': 
     conf = (SparkConf() 
       .setMaster("local[2]") 
       .setAppName("pytest-pyspark-local-testing") 
       ) 
    elif request.param == 'yarn': 
     conf = (SparkConf() 
       .setMaster("yarn-client") 
       .setAppName("pytest-pyspark-yarn-testing") 
       .set("spark.executor.memory", "1g") 
       .set("spark.executor.instances", 2) 
       ) 
    request.addfinalizer(lambda: sc.stop()) 

    sc = SparkContext(conf=conf) 
    return sc 

@pytest.fixture(scope="session") 
def hive_context(spark_context): 
    return HiveContext(spark_context) 

@pytest.fixture(scope="session") 
def streaming_context(spark_context): 
    return StreamingContext(spark_context, 1) 

Теперь вы можете запускать тесты в локальном режиме, вызвав py.test -m spark_local и в режиме ПРЯЖИ по телефону py.test -m spark_yarn.

пример WordCount

В той же папке создать три файла: conftest.py (выше), wordcount.py:

def do_word_counts(lines): 
    counts = (lines.flatMap(lambda x: x.split()) 
        .map(lambda x: (x, 1)) 
        .reduceByKey(lambda x, y: x+y) 
      ) 
    results = {word: count for word, count in counts.collect()} 
    return results 

И wordcount_test.py:

import pytest 
import wordcount 

pytestmark = pytest.mark.usefixtures("spark_context") 

def test_do_word_counts(spark_context): 
    test_input = [ 
     ' hello spark ', 
     ' hello again spark spark' 
    ] 

    input_rdd = spark_context.parallelize(test_input, 1) 
    results = wordcount.do_word_counts(input_rdd) 

    expected_results = {'hello':2, 'spark':3, 'again':1} 
    assert results == expected_results 

Теперь вы можете запускать тесты, вызывая pytest.

+0

Это фантастика. Благодарю. Один вопрос: Нет, если у меня есть более крупный проект, и я хочу организовать свои искровые тесты в нескольких папках; как мне теперь управлять работой с conftest.py, так как кажется, что они в той же папке, где это важно. –

Смежные вопросы