Если у процесса есть входной поток, процесс ожидает поступление из него данных. Отсутствие данных на входе приводит к формированию исключения EmptyStream, и следом за ним исключения StopIteration. Согласно специфики реализации ProcessFlow на базе core2, это событие приведет к останову процесса и некорректной работе для случаев, когда предыдущие в цепочке процессы могут не возвращать данные.
Например:
В этом случае, как только на процесс Output не поступят данные, процесс завершит свою работу. Это приведет к ситуации, когда Generator и Filter еще будут работать, а вывод результата будет уже невозможен из-за останова процесса Output.
Необходим новый подход в управлении потоков как данных, так и процессов. Исходная схема для нового ядра core3:
Новый подход позволяет выполнять контроль работы процессов на основании внутреннего статуса процессов.
При создании процесс имеет статус NOT_STARTED. После первой итерации, если данных во входном потоке нет, он переходит в статус WAITING. При получении данных и и их обработке, статус процесса меняется на PROCESSING. Когда все процессы переходят в состояние WAITING, работа ProcessFlow завершается.
Пример метода main() для процесса на базе нового ядра core3:
def main(self):
while True:
try:
data = self.io['input'].get()
except EmptyStream:
yield WAITING
continue
self.io['output'].put(handle_data(data))
yield PROCESSINGЧасть процессов уже переведена на работу с core3:Остальные будут переводиться по мере необходимости.







0 комментариев:
Отправить комментарий