2010-11-27 6 views
14

Прошел поиск в Интернете, нашел простые «учебные пособия» для использования именованных каналов. Однако, когда я делаю что-либо с фоновыми заданиями, я, кажется, теряю много данных.Использование именованных каналов с bash - Проблема с потерей данных

[[Изменить]: найдено гораздо более простое решение, см. Ответ на сообщение. Итак, вопрос, который я задал, теперь академичен - в случае, если вам может понадобиться сервер задания]]

Использование Ubuntu 10.04 с Linux 2.6.32-25-generiC# 45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash, версия 4.1.5 (1) -release (x86_64-pc-linux-gnu).

Моя функция Баш:

function jqs 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read txt <"$pipe" 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     fi 
    fi 
    done 
} 

я запускаю это в фоновом режиме:

> jqs& 
[1] 5336 

И теперь я кормлю его:

for i in 1 2 3 4 5 6 7 8 
do 
    (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &) 
done 

Выход противоречива. Я часто не получаю отклика на успех. Я получаю как можно больше новых текстовых эхо, так как успех эха, а иногда и меньше.

Если я удаляю '&' из 'feed', это, похоже, сработает, но я заблокирован до тех пор, пока не будет прочитан вывод. Следовательно, я хочу, чтобы субпроцессы блокировались, но не основной процесс.

Цель состоит в том, чтобы написать простой сценарий управления заданиями, чтобы я мог запускать, скажем, 10 заданий в большинстве случаев и очереди остальных для последующей обработки, но надежно знают, что они выполняются.

Полный менеджер Работа ниже:

function jq_manage 
{ 
    export __gn__="$1" 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 
    trap "rm -f $pipe" EXIT 
    trap "break"  SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    date 
    jobs 
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__)) 
    then 
     echo "Waiting for new job" 
     if read new_job <"$pipe" 
     then 
    echo "new job is [[$new_job]]" 

    if [[ "$new_job" == 'quit' ]] 
    then 
     break 
    fi 

    echo "In group $__gn__, starting job $new_job" 
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &" 
     fi 
    else 
     sleep 3 
    fi 
    done 
} 

function jq 
{ 
    # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs) 
    # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently 

    export __gn__="$1" 
    shift 
    export __jN__="$1" 
    shift 

    export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l) 
    if (($__jq__ '<' 1)) 
    then 
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &" 
    fi 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 

    echo [email protected] >$pipe 
} 

Вызов

jq <name> <max processes> <command> 
jq abc 2 sleep 20 

начнет один процесс. Эта часть работает нормально. Начните второй, отлично. Один за другим, кажется, работает нормально. Но запуск 10 в цикле, похоже, потерял систему, как в более простом примере выше.

Любые подсказки относительно того, что я могу сделать для решения этой очевидной потери данных МПК, будут весьма признательны.

С уважением, Alain.

+0

Посмотрите на [second 2018 Edit to * Как установить переменную на выход из команды *] (https://stackoverflow.com/a/41236640/1765658) или на [GitHub.com: Connector- Баш] (https://github.com/F-Hauri/Connector-bash). Где я * подключаю * подпроцесс hoding * tools * к текущему сеансу оболочки. – 2018-02-12 15:59:24

ответ

26

Ваша проблема if утверждение ниже:

while true 
do 
    if read txt <"$pipe" 
    .... 
done 

Что происходит в том, что сервер очереди задания, открывая и закрывая трубу каждый раз вокруг петли. Это означает, что некоторые из клиентов получают ошибку «сломанной трубы», когда они пытаются записать в трубу, то есть читатель трубки уходит после того, как писатель открывает его.

Чтобы это исправить, изменить свой цикл на сервере открыть трубу один раз для всего цикла:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 

сделано таким образом, труба открыт один раз и остается открытым.

Вам нужно будет быть осторожным с тем, что вы запускаете внутри цикла, поскольку вся обработка внутри цикла будет иметь stdin, прикрепленный к именованному каналу. Вы захотите убедиться, что вы перенаправляете stdin всех своих процессов внутри цикла из другого места, иначе они могут потреблять данные из канала.

Редактировать: Если проблема заключается в том, что вы получаете EOF при чтении, когда последний клиент закрывает трубу, вы можете использовать метод jilles для обнуления файловых дескрипторов или вы можете просто убедиться, что вы тоже клиент, и держать на стороне записи в открытой трубе:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 3> "$pipe" 

Это будет держать сторону записи трубы открыто на дескрипторе 3. То же самое относится и предостережение с этим дескриптором файла как с стандартным вводом. Вам нужно будет закрыть его, чтобы любые дочерние процессы не наследовали его. Это, вероятно, имеет меньшее значение, чем для stdin, но это было бы чище.

+0

Wow, отличный ответ. Имеет смысл. Благодарю. Попробуем немедленно. – asoundmove 2010-11-27 17:16:42

0

С одной стороны, проблема хуже, чем я думал: Теперь в моем более сложном примере (jq_manage) есть случай, когда одни и те же данные снова и снова считываются из трубы (хотя нет к нему записываются новые данные).

С другой стороны, я нашел простое решение (отредактирован следующий комментарий Дэнниса):

function jqn # compute the number of jobs running in that group 
{ 
    __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l) 
} 

function jq 
{ 
    __groupn__="$1"; shift # job group name (the pool within which to allocate $__jmax__ jobs) 
    __jmax__="$1"; shift # maximum of job numbers to run concurrently 

    jqn 
    while (($__jqty__ '>=' $__jmax__)) 
    do 
    sleep 1 
    jqn 
    done 

    eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; [email protected]) &" 
} 

работает как шарм. Не используется сокет или трубка. Простой.

+1

Нет причин для экспорта `__jqty__` (или любого из экспорта в оригинале). Почему вы эхом что-то прямо на `/ dev/null`? Зачем использовать `eval`? Почему бы просто не «$ @ &»? Нет необходимости указывать `> =`. Я согласен с ответом Camh. – 2010-11-27 15:32:24

+0

Все это сводится к чтению и фильтрации выходных данных ps. echo to/dev/null, потому что на самом деле я не хочу вывода, я просто хочу, чтобы правильные строки выводились на «ps». То же самое происходит с eval, иначе ps показывает имена переменных, а не расширенные переменные, eval делает расширение. Я никогда не использовал ((...)) раньше, поэтому спасибо за указание, что мне не нужны кавычки, я просто ушел с примера, который я где-то читал, и спасибо также об экспорте, это осталось от предыдущего более сложные скрипты, которые имеют подпроцессы и требуют экспорта. – asoundmove 2010-11-27 17:14:13

+0

Жаль, что я имел в виду 'jobs', а не 'ps' – asoundmove 2010-11-27 17:45:48

1

Как camh & Деннис Уильямсон говорит, что не сломайте трубу.

Теперь у меня есть меньшие примеры, прямые в командной строке:

Сервер:

(
    for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9}; 
    do 
    if read s; 
     then echo ">>$i--$s//"; 
    else 
     echo "<<$i"; 
    fi; 
    done < tst-fifo 
)& 

Клиент:

(
    for i in {%a,#b}{1,2}{0,1}; 
    do 
    echo "Test-$i" > tst-fifo; 
    done 
)& 

Может заменить ключевую линию:

(echo "Test-$i" > tst-fifo&); 

Все clien t, отправленные в трубу, считываются, хотя с вариантом второй из клиентов, возможно, потребуется запустить сервер пару раз, прежде чем все данные будут прочитаны.

Но хотя чтение ожидает данных в трубе, чтобы начать с того, как данные были нажаты, он читает пустую строку навсегда.

Любой способ остановить это?

Спасибо за понимание.

6

Как сказано в других ответах, вы должны постоянно держать fifo открытым во избежание потери данных.

Однако, как только все авторы покинули после того, как фиолетовый был открыт (так был писатель), немедленно возвращается обратно (и poll() возвращает POLLHUP). Единственный способ очистить это состояние - это снова открыть fifo.

POSIX не дает решения для этого, но, по крайней мере, Linux и FreeBSD: если чтение начинается с сбоя, снова откройте fifo, сохраняя исходный дескриптор открытым. Это работает, потому что в Linux и FreeBSD состояние «зависания» локально относится к определенному описанию открытого файла, а в POSIX оно глобально относится к fifo.

Это может быть сделано в сценарии оболочки, как это:

while :; do 
    exec 3<tmp/testfifo 
    exec 4<&- 
    while read x; do 
     echo "input: $x" 
    done <&3 
    exec 4<&3 
    exec 3<&- 
done 
1

Только для тех, кто может быть заинтересован, [[переиздан]] следующие комментарии CAMH и jilles, вот две новые версии скрипт тестового сервера.

Обе версии теперь работают точно так, как надеялись. версия

CAMH для управления труб: версия

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    fi 
    done 3< "$pipe" 4> "$pipe" # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF 
} 

jille для управления труб:

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    exec 3< "$pipe" 
    exec 4<&- 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    else 
     # Close the pipe and reconnect it so that the next read does not end up returning EOF 
     exec 4<&3 
     exec 3<&- 
     exec 3< "$pipe" 
     exec 4<&- 
    fi 
    done 
} 

Спасибо всем за вашу помощь.

0

запуска говорят 10 рабочих мест параллельно самое большее и очереди остальные для последующей обработки, но достоверно известно, что они бегут

Вы можете сделать это с помощью GNU Parallel. Вам не понадобится этот скриптинг.

http://www.gnu.org/software/parallel/man.html#options

Вы можете установить максимальное количество-проки "Количество jobslots. Запуск до N заданий параллельно." Существует возможность установить количество ядер процессора, которые вы хотите использовать. Вы можете сохранить список выполненных заданий в файл журнала, но это бета-функция.

Смежные вопросы