async def send(
    self,
    event: BaseEvent,
    context: "Context",
) -> None:
    """Run ``event`` through the interrupters, then deliver it to subscribers.

    Interrupters are visited first, in the iteration order of
    ``self._interrupters``. An entry whose condition is set and rejects the
    current event is skipped. A falsy result from an interrupter stops the
    dispatch immediately, so no subscriber runs; a truthy result becomes the
    event seen by later interrupters and by every subscriber.

    Subscribers are then visited with the same condition check. Each
    ``asolve`` call gets its own ``AsyncExitStack`` and a new, empty
    ``cache_dependencies`` dict.

    Args:
        event: The event to dispatch; interrupters may replace it.
        context: Supplies ``dependency_provider`` and is also passed to each
            ``asolve`` call under the ``CONTEXT_OPTION_NAME`` keyword.
    """
    # interrupters should follow registration order
    interrupter_entries = tuple(self._interrupters.values())
    for accepts, interrupter in interrupter_entries:
        # No condition at all, or a condition that accepts the event.
        if not accepts or accepts(event):
            async with AsyncExitStack() as exit_stack:
                replacement = await interrupter.asolve(
                    event,
                    cache_dependencies={},
                    stack=exit_stack,
                    dependency_provider=context.dependency_provider,
                    **{CONTEXT_OPTION_NAME: context},
                )
                if not replacement:
                    # A falsy result cancels delivery of this event.
                    return
                # Everything downstream sees the interrupter's result.
                event = replacement

    # TODO: publishing should happen under an RWLock so the subscribers
    # dictionary cannot be mutated mid-dispatch. For now the tuple snapshot
    # below protects the iteration.
    subscriber_entries = tuple(self._subscribers.values())
    for accepts, subscriber in subscriber_entries:
        if not accepts or accepts(event):
            async with AsyncExitStack() as exit_stack:
                await subscriber.asolve(
                    event,
                    cache_dependencies={},
                    stack=exit_stack,
                    dependency_provider=context.dependency_provider,
                    **{CONTEXT_OPTION_NAME: context},
                )