среда, 25 мая 2011 г.

pyflowctrl: управление процеcсами в python


Принципы, заложенные в командной строке Unix, позволяют добиться невероятной гибкости в обработке данных. Возможность передачи вывода одной команды в другую позволяет строить достаточно сложные цепочки обработки данных. Чтобы оценить достоинства командной строки достаточно посетить сайт commandlinefu.com. Встречающиеся решения впечатляют. Всегда хотелось иметь под рукой инструмент, позволяющий легко оперировать созданными ранее компонентами. Поиск готовых решений привел к проекту Kamaelia, о котором я уже упоминал в одном из постов ‘Practical concurrent systems made simple using Kamaelia’, а именно к статье ‘MiniAxon - Build your own Kamaelia Core’. Попытка упрощения реализации этого проекта послужила основой создания pyflowctrl.

В основе лежат три основных класса: Stream, Process и ProcessFlow. Логику Stream и Process проще объяснить при помощи диаграммы:
Процесс (Process) может содержать либо не содержать входные и выходные потоки Stream. Так, например, процесс для чтения лог файла (LogFileReader) может содержать только один выходной поток, а имя файла передаваться в момент создания процесса как параметр, процесс записи информации в лог файл, может содержать только входной поток или потоки.

Соединение процессов между собой осуществляется с помощью входных/выходных потоков. Каждому выходному потоку одного процесса соответствует входной поток другого процесса. В текущей реализации соединение один к многим не поддерживается для случая, когда необходимо, чтобы данные одного выхода дублировались на все входы другого процесса. Тем не менее данное свойство можно использовать для организации распределения данных между процессами.
Если все-таки необходимо дублирование данных между процессами, это можно сделать путем добавления выходов с одинаковыми данными на уровне процесса.
Для лучшего понимания работы, рассмотрим более подробно каждый из элементов системы:

Класс Stream представляет собой очередь данных, типа FIFO (first in - first out). Очередь управляется двумя методами put() и get(). Метод put() размещает данные в очереди, метод get() данные из очереди извлекает. При попытки извлечь данные из пустого потока, формируется исключение EmptyStream.

Класс Process - это простой класс-генератор. Весь секрет этого класса заключен в методе main() - выполнив одну итерацию, управление передается другим процессам с помощью ProcessFlow. Детали реализации и применения будут рассмотрены на примерах ниже.

Класс ProcessFlow управляет работой процессов, отвечает за связь между ними согласно топологии входов/выходов.

Использование pyflowctrl рассмотрим на простом примере. Допустим, необходимо к каждому целому числу в диапазоне от 1 до 1 млн. добавить 100 и результат вывести на экран. Все конечно можно реализовать на базе одного цикла, но мы используем этот пример для того, чтобы показать связь между процессами. Для этого нам понадобится:

- процесс-генератор целых чисел от 1 до бесконечности;
- процесс, увеличивающий каждое входное значение на 100 и помещающий результат в выходной поток;
- процесс, выводящий на экран значения из входного потока;

Генератор целых чисел
class IntGeneratorTask(Process):
    def __init__(self):
           super(IntGeneratorTask, self).__init__()
           self.io = {
                   'output': Stream(),
           }
   
    def main(self):
           counter = 0
           while True:
                   counter += 1
                   self.io['output'].put(counter)
                   yield
Генератор целых чисел имеет только один выход, определяется аргументом self.io в методе __init__. Единственное, что выполняется в методе main() - это в цикле значение counter увеличивается на 1 и записывается в выходной поток. После чего выполнение метода main() приостанавливается yield.

Добавление 100
class Add100Process(Process):
    def __init__(self):
           super(Add100Process, self).__init__()
           self.io = {
                   'input': Stream(),
                   'output': Stream(),
           }
   
    def main(self):
           while True:
                   data = self.io['input'].get()
                   self.io['output'].put(100+data)
                   yield
В методе __init__() уже два потока: один входной и один выходной. По ним не трудно догадаться, что будет выполняться в методе main(): к полученному входному значению, добавляется 100 и результат записывается в выходной поток.

Вывод результата на экран
class PrinterProcess(Process):
    def __init__(self):
           super(PrinterProcess, self).__init__()
           self.io = {
                   'input': Stream(),
           }
   
    def main(self):
           while True:
                   print self.io['input'].get(),
                   yield
Входные данные с помощью print выводятся на экран.

Объединим процессы вместе

Код, объединяющий процессы в одну логическую цепочку приведен ниже. Пояснение работы в комментариях:
# создается поток процессов
pflow1 = ProcessFlow()

# в поток добавляются процессы. Процессы нужно добавлять в той
# последовательности, в которой они должны будут выполняться.
# метод new() возвращает id процесса
id_igen = pflow1.new(IntGeneratorProcess())
id_add100proc1 = pflow1.new(Add100Process())
id_printer = pflow1.new(PrinterProcess())

# создаем топологию процессов, добавляем связи между процессами.
# линк между процессам представляет собой словарь, в котором,
# sid - source process id, son - source output name,
# tid - target process id, tin - target input name

# IntGenerator -> Add100
pflow1.link({'sid': id_igen, 'son': 'output', 'tid': id_add100proc1, 'tin': 'input'})

# Add100 -> Printer
pflow1.link({'sid': id_add100proc1, 'son': 'output', 'tid': id_printer, 'tin': 'input'})

# метод run() запускает на выполнение поток процессов. в цикле, каждый процесс
# выполняет одну итерацию, результаты передаются другим процессам согласно
# топологии связей
pflow1.run()

Результат выполнения будет виден на экране как последовательность чисел:

$ python core1.test.py
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 ...

Пока все достаточно просто :)

Исходный код pyflowctrl

0 комментариев:

Отправить комментарий