У меня есть сценарий, который извлекает список активных «заданий» из таблицы MySQL, а затем создает экземпляр моего основного сценария один раз за активное задание с использованием библиотеки многопроцессорности. У моего многопроцессорного скрипта есть функция, которая проверяет, было ли задание задано другим потоком. Он делает это, проверяя, является ли конкретный столбец в таблице БД/не равным NULL. Запрос DB возвращает один кортеж пункта:python 27 - Boolean check сбой при многопроцессорной обработке
def check_if_job_claimed():
#...
job_claimed = cursor.fetchone() #Returns (claim_id,) for claimed jobs, and (None,) for unclaimed jobs
if job_claimed:
print "This job has already been claimed by another thread."
return
else:
do_stuff_to_claim_the_job
Когда я запускаю эту функцию без части многопроцессорной, проверка претензий работает нормально. Но когда я пытаюсь запустить задания параллельно, проверка заявки считывает все (None) кортежи как имеющие ценность и, следовательно, правду, и поэтому функция предполагает, что работа уже заявлена.
Я попытался настроить количество параллельных процессов, которые использует многопроцессор, но проверка претензий по-прежнему не работает ... даже когда я установил число процессов в 1. Я также попытался сыграть с утверждением if чтобы посмотреть, смогу ли я работать таким образом:
if job_claimed == True
if job_claimed == (None,)
# etc.
Не повезло.
Кто-нибудь знает что-то в многопроцессорной библиотеке, которая препятствовала бы правильной интерпретации моей функции проверки претензий с помощью job_claimed tuple? Может быть, что-то не так с моим кодом?
EDIT
я запустить некоторые truthiness тесты на job_claimed переменной в режиме отладки. Вот результаты этих тестов:
(pdb) job_claimed
(None,)
(pdb) len(job_claimed)
1
(pdb) job_claimed == True
False
(pdb) job_claimed == False
False
(pdb) job_claimed[0]
None
(pdb) job_claimed[0] == True
False
(pdb) job_claimed[0] == False
False
(pdb) any(job_claimed)
False
(pdb) all(job_claimed)
False
(pdb) job_claimed is not True
True
(pdb) job_claimed is not False
True
EDIT
В соответствии с просьбой:
with open('Resource_File.txt', 'r') as f:
creds = eval(f.read())
connection = mysql.connector.connect(user=creds["mysql_user"],password=creds["mysql_pw"],host=creds["mysql_host"],database=creds["mysql_db"],use_pure=False,buffered=True)
def check_if_job_claimed(job_id):
cursor = connection.cursor()
thread_id_query = "SELECT Thread_Id FROM jobs WHERE Job_ID=\'{}\';".format(job_id)
cursor.execute(thread_id_query)
job_claimed = cursor.fetchone()
job_claimed = job_claimed[0]
if job_claimed:
print "This job has already been claimed by another thread. Moving on to next job..."
cursor.close()
return False
else:
thread_id = socket.gethostname()+':'+str(random.randint(0,1000))
claim_job = "UPDATE jobs SET Thread_Id = \'{}\' WHERE Job_ID = \'{}\';".format(job_id)
cursor.execute(claim_job)
connection.commit()
print "Job is now claimed"
cursor.close()
return True
def call_the_queen(dict_of_job_attributes):
if check_if_job_claimed(dict_of_job_attributes['job_id']):
instance = OM(dict_of_job_attributes) #<-- Create instance of my target class
instance.queen_bee()
#multiprocessing code
import multiprocessing as mp
if __name__ == '__main__':
active_jobs = get_active_jobs()
pool = mp.Pool(processes = 4)
pool.map(call_the_queen,active_jobs)
pool.close()
pool.join()
Что касается этого - вместо этого сложного mumbo jumbo поместите все идентификаторы задания в очередь (например, список в Redis), а затем просто просто 'pop()' один идентификатор задания в момент времени. Это атомная операция, поэтому, когда рабочий получает идентификатор задания, ни один другой процесс не может его украсть. – yedpodtrzitko
Можете ли вы включить код многопроцессорности, а также код, который создает курсор. Я предполагаю, что вы повторно используете объект-курсор среди процессов, и есть только 1 элемент –
Да, те тесты на правдивость не полезны, это ожидаемые результаты для каждой программы python. –