Если вы хотите передать данные по цепочке вызовов, самый простой способ — использовать аргументы функций.
Однако в некоторых случаях модифицировать все функции в цепочке, чтобы передать новые данные, крайне неудобно. Вместо этого вы можете настроить некоторый контекст, который будет доступен для всех функций по цепочке. Как это можно реализовать технически?
Самое простое решение — глобальная переменная. В Python также можно использовать модули и классы как хранилища контекста, так как они, строго говоря, тоже являются глобальными переменными. Вы, вероятно, делаете это ежедневно, например, для работы с логгерами.
Если ваше приложение многопоточное, обычная глобальная переменная не подойдет, так как она не является потокобезопасной. Одновременно может выполняться несколько цепочек вызовов, и каждая из них нуждается в собственном контексте. Модуль
threading
решает эту проблему с помощью объекта
threading.local()
, который является потокобезопасным. Вы можете хранить данные, просто устанавливая атрибуты, например:
threading.local().symbol = '@'
.
Но оба этих подхода не подходят для асинхронных приложений, где функции не только вызываются, но и могут быть приостановлены с помощью
await
. Если корутина выполняет
await
, цикл событий может переключиться на другую корутину из совершенно другой цепочки вызовов. Это приведет к некорректной работе, как в следующем примере:
import asyncio
import sys
global_symbol = '.'
async def indication(timeout):
while True:
print(global_symbol, end='')
sys.stdout.flush()
await asyncio.sleep(timeout)
async def sleep(t, indication_t, symbol='.'):
loop = asyncio.get_event_loop()
global global_symbol
global_symbol = symbol
loop.create_task(indication(indication_t))
await asyncio.sleep(t)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
sleep(1, 0.1, '0'),
sleep(1, 0.1, 'a'),
sleep(1, 0.1, 'b'),
sleep(1, 0.1, 'c'),
))
Решить эту проблему можно, если цикл событий будет устанавливать и восстанавливать контекст каждый раз, когда он возобновляет выполнение корутины. Модуль
aiotask_context
реализует это, изменяя способ создания задач с помощью
loop.set_task_factory
. Пример рабочей версии:
import asyncio
import sys
import aiotask_context as context
async def indication(timeout):
while True:
print(context.get('symbol'), end='')
sys.stdout.flush()
await asyncio.sleep(timeout)
async def sleep(t, indication_t, symbol='.'):
loop = asyncio.get_event_loop()
context.set(key='symbol', value=symbol)
loop.create_task(indication(indication_t))
await asyncio.sleep(t)
loop = asyncio.get_event_loop()
loop.set_task_factory(context.task_factory)
loop.run_until_complete(asyncio.gather(
sleep(1, 0.1, '0'),
sleep(1, 0.1, 'a'),
sleep(1, 0.1, 'b'),
sleep(1, 0.1, 'c'),
))
👉@BookPython