В будущем на нем будут публиковаться изменения начиная с ветки core4. С переходом на новую версию изменена структура директорий проекта:
- core4.py перенесен в директорию core4 и__init__.py
- в директорию core4 добавлены под-директории processes/ для хранения кода процессов, streams/ для потоков, tests/ для тестов, examples/ для примеров работы с библиотекой, libs/ для хранения сторонних библиотек.
Изменения в логике работы core4 от core3 (core4 базируется на core3):
В классе Process изменены атрибуты доступа к входным/выходным потокам. Сейчас доступ к ним осуществляется с помощью атрибутов input/output, которые нужно явно описать для нового процесса
class TestProcess(Process):
def __init__(self):
super(TestProcess, self).__init__()
self.input = Stream()
self.output = Stream()Пример определения основной функции процессаdef main(self):
while True:
try:
packet = self.input.get()
except EmptyStream:
yield WAITING
continue
# основной код для работы с данными пакета
self.output.put(packet)
yield PROCESSINGВ классе Stream убрана поддержка описания типизации потоков. В core3 давать описание потоков было не обязательно и зачастую потоки в процессах так и оставались без описаний, так как типизация данных приводила к сложностям соединения процессов. В будущем механизм описания данных в потоках будет доработан. В текущем релизе - это открытый вопрос.Основной единицей переноса данных между процессами является информационный пакет, класс Packet, см. “Информационные пакеты в pyflowctrl”
При создании пакета можно передавать “корневые ” переменные, которые он будет содержать
>>> p = Packet(counter=10, name=’test_process’) >>> p.counter, p.name 10 test_processВ классе ProcessFlow упрощен механизм создания сети взаимодействия процессов - словарь с описанием:
network = {
'processes': {
'process1': TestProcess(),
'process2': TestProcess(),
'printer': Printer(),
},
'links': {
# соединение между process1 и process2
('process1.output', 'process2.input', p1p2_handler),
# соединение между process2 и printer
('process2.output', 'printer.input', p2prn_handler),
}
}network[‘processes’] описывает процессы, участвующие в работе. Доступ к процессам осуществляется по их именам. network[‘links’] описывает взаимодействие между процессами. Каждое соединение - это список из трех аргументов: источник данных, получатель данных, обработчик данных. Источник и получатель данных - это символьное описание, состоящее из имени процесса и имени потока. Обработчик данных позволяем изменить структуру пакета на этапе передачи его от процесса-источника до процесса-получателя. Это дает возможность адаптировать структуру пакета под нужды следующего процесса. Обработчик данных - это функция на вход которой поступает пакет и на выходе она должна возвращать его же, пример:
def p1p2_handler(packet):
# process1 -> process2
packet.set_namespace('test-env1')
packet['test-env1'].name = packet.name
packet['test-env1'].url = packet.url
del packet.name
del packet.url
return packetЕсли данные в обработке не нуждаются, то вместо обработчика данных передается None. 'links': {
('process1.output', 'process2.input', None),
}Обработчики данных можно использовать также как функции-отладчики для вывода данных обменивающихся между процессами.'links': {
('process1.output', 'process2.input', p1p2debug),
}Загрузка сети network в ProcessFlow выполняется с помощью метода upload(network)flow = ProcessFlow() flow.upload(network) for i in xrange(10): flow.pmap['process1'].input.put(Packet(counter=i)) flow.run()Метод run() выполняет запуск сети в работу
Пример работы с core4 расположен в директории examples репозитория




0 комментариев:
Отправить комментарий