2016-09-28 3 views
0

У меня есть сценарий, который извлекает список активных «заданий» из таблицы MySQL, а затем создает экземпляр моего основного сценария один раз за активное задание с использованием библиотеки многопроцессорности. У моего многопроцессорного скрипта есть функция, которая проверяет, было ли задание задано другим потоком. Он делает это, проверяя, является ли конкретный столбец в таблице БД/не равным NULL. Запрос DB возвращает один кортеж пункта:python 27 - Boolean check сбой при многопроцессорной обработке

def check_if_job_claimed(): 
    #... 
    job_claimed = cursor.fetchone() #Returns (claim_id,) for claimed jobs, and (None,) for unclaimed jobs 
    if job_claimed: 
     print "This job has already been claimed by another thread." 
     return 
    else: 
     do_stuff_to_claim_the_job 

Когда я запускаю эту функцию без части многопроцессорной, проверка претензий работает нормально. Но когда я пытаюсь запустить задания параллельно, проверка заявки считывает все (None) кортежи как имеющие ценность и, следовательно, правду, и поэтому функция предполагает, что работа уже заявлена.

Я попытался настроить количество параллельных процессов, которые использует многопроцессор, но проверка претензий по-прежнему не работает ... даже когда я установил число процессов в 1. Я также попытался сыграть с утверждением if чтобы посмотреть, смогу ли я работать таким образом:

if job_claimed == True 
if job_claimed == (None,) 
# etc. 

Не повезло.

Кто-нибудь знает что-то в многопроцессорной библиотеке, которая препятствовала бы правильной интерпретации моей функции проверки претензий с помощью job_claimed tuple? Может быть, что-то не так с моим кодом?

EDIT

я запустить некоторые truthiness тесты на job_claimed переменной в режиме отладки. Вот результаты этих тестов:

(pdb) job_claimed 
    (None,) 
(pdb) len(job_claimed) 
    1 
(pdb) job_claimed == True 
    False 
(pdb) job_claimed == False 
    False 
(pdb) job_claimed[0] 
    None 
(pdb) job_claimed[0] == True 
    False 
(pdb) job_claimed[0] == False 
    False 
(pdb) any(job_claimed) 
    False 
(pdb) all(job_claimed) 
    False 
(pdb) job_claimed is not True 
    True 
(pdb) job_claimed is not False 
    True 

EDIT

В соответствии с просьбой:

with open('Resource_File.txt', 'r') as f: 
    creds = eval(f.read()) 
connection = mysql.connector.connect(user=creds["mysql_user"],password=creds["mysql_pw"],host=creds["mysql_host"],database=creds["mysql_db"],use_pure=False,buffered=True) 

def check_if_job_claimed(job_id): 
    cursor = connection.cursor() 
    thread_id_query = "SELECT Thread_Id FROM jobs WHERE Job_ID=\'{}\';".format(job_id) 
    cursor.execute(thread_id_query) 
    job_claimed = cursor.fetchone() 
    job_claimed = job_claimed[0] 
    if job_claimed: 
     print "This job has already been claimed by another thread. Moving on to next job..." 
     cursor.close() 
     return False 
    else: 
     thread_id = socket.gethostname()+':'+str(random.randint(0,1000)) 
     claim_job = "UPDATE jobs SET Thread_Id = \'{}\' WHERE Job_ID = \'{}\';".format(job_id) 
     cursor.execute(claim_job) 
     connection.commit() 
     print "Job is now claimed" 
     cursor.close() 
     return True 

def call_the_queen(dict_of_job_attributes): 
    if check_if_job_claimed(dict_of_job_attributes['job_id']): 
     instance = OM(dict_of_job_attributes) #<-- Create instance of my target class 
     instance.queen_bee() 

#multiprocessing code 
import multiprocessing as mp 
if __name__ == '__main__': 
    active_jobs = get_active_jobs() 
    pool = mp.Pool(processes = 4) 
    pool.map(call_the_queen,active_jobs) 
    pool.close() 
    pool.join() 
+0

Что касается этого - вместо этого сложного mumbo jumbo поместите все идентификаторы задания в очередь (например, список в Redis), а затем просто просто 'pop()' один идентификатор задания в момент времени. Это атомная операция, поэтому, когда рабочий получает идентификатор задания, ни один другой процесс не может его украсть. – yedpodtrzitko

+0

Можете ли вы включить код многопроцессорности, а также код, который создает курсор. Я предполагаю, что вы повторно используете объект-курсор среди процессов, и есть только 1 элемент –

+0

Да, те тесты на правдивость не полезны, это ожидаемые результаты для каждой программы python. –

ответ

1

Любого непустым кортеж (или список, строка, итерация и т.д.) будет оценивать до True. Не имеет значения, если содержимое итерабельного не является True. Чтобы проверить это, вы можете использовать либо any(iterable), либо all(iterable), чтобы проверить, соответствуют ли все или все элементы в iterable True.

Однако на основе ваших изменений ваша проблема, скорее всего, вызвана использованием глобального объекта соединения для нескольких процессов.

Вместо этого каждый процесс должен создать свое собственное соединение.

def check_if_job_claimed(job_id): 
    connection = mysql.connector.connect(user=creds["mysql_user"],password=creds["mysql_pw"],host=creds["mysql_host"],database=creds["mysql_db"],use_pure=False,buffered=True) 

Вы также можете попробовать использовать connection pooling, но я не уверен, что будет работать через процесс, и, вероятно, потребует от вас, чтобы перейти на резьбу вместо.

Кроме того, я бы переместил весь код под if __name__ == '__main__': в функцию. Обычно вы хотите избежать загрязнения глобального пространства имен при использовании многопроцессорности, поскольку, когда python создает новый процесс, он пытается скопировать глобальное состояние в новый процесс. Это может привести к некоторым нечетным ошибкам, поскольку глобальные переменные больше не разделяют состояние (поскольку они находятся в отдельных процессах), или объект либо не может быть сериализован, либо теряет некоторую информацию во время сериализации, когда он восстанавливается в новом процессе.

+0

Спасибо за ответ Брендан. Я ценю это. Первоначально я считал одно и то же, но затем начал запускать тесты правности на переменной job_claimed (включая использование any() и all()). Я должен был включить эти результаты тестов в мой OP. Я обновлю его сейчас. Короче, я получил некоторые запутанные результаты. Смущение, я признаю, может быть связано с непониманием с моей стороны. Однако ничто из этого не объясняет, почему точно такая же функция работает отлично, когда она не вызывается мультипроцессором и не выполняется, когда она есть. –

+0

@NickMiller Используете ли вы один и тот же «курсор» в каждом процессе? Или вы создаете новый для каждого процесса? 'fetchone' возвращает' None', когда вы исчерпали список элементов. –

+0

Это действительно хороший вопрос. Я загляну в это быстро и вернусь к вам. –