event
Classes
EventHandler
Source code in pyflowlauncher/event.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class EventHandler:
    """Registry that maps event names to callables and dispatches them.

    Events may be plain callables or callables returning a coroutine; the
    result is awaited when it is a coroutine. An exception raised while an
    event runs is passed to the handler registered for the exception's exact
    class; when no such handler exists the exception propagates.
    """

    def __init__(self):
        """Create an empty registry of events and exception handlers."""
        # Event name -> callable invoked by ``trigger_event``.
        self._events = {}
        # Exception class -> callable invoked with the raised instance.
        self._handlers = {}

    def _get_callable_name(self, method: Union[Callable[..., Any], Exception]):
        """Return the lower-cased ``__name__`` of *method*.

        Objects without a ``__name__`` attribute fall back to the name of
        their class.
        """
        return getattr(method, '__name__', method.__class__.__name__).lower()

    def add_event(self, event: Callable[..., Any], *, name=None) -> str:
        """Register *event* and return the key it was stored under.

        Args:
            event: The callable to register.
            name: Explicit key. When falsy, the key is derived from the
                callable's lower-cased name.

        Returns:
            The key under which the callable was stored. An existing entry
            with the same key is replaced.
        """
        key = name or self._get_callable_name(event)
        self._events[key] = event
        return key

    def add_events(self, events: Iterable[Callable[..., Any]]):
        """Register every callable in *events* under its derived name."""
        for event in events:
            self.add_event(event)

    def add_exception_handler(self, exception: Type[Exception], handler: Callable[..., Any]):
        """Register *handler* to be called for exceptions of class *exception*."""
        self._handlers[exception] = handler

    def get_event(self, event: str) -> Callable[..., Any]:
        """Return the callable registered under *event*.

        Raises:
            EventNotFound: If no callable is registered under *event*.
        """
        try:
            return self._events[event]
        except KeyError:
            # The KeyError is an implementation detail of the lookup; keep it
            # out of the displayed traceback.
            raise EventNotFound(event) from None

    async def _await_maybe(self, result: Any) -> Any:
        """Await *result* if it is a coroutine, otherwise return it as is."""
        if asyncio.iscoroutine(result):
            return await result
        return result

    async def trigger_exception_handler(self, exception: Exception) -> Any:
        """Dispatch *exception* to the handler registered for its exact class.

        Returns:
            Whatever the handler returns (awaited when it is a coroutine).

        Raises:
            Exception: *exception* itself when no handler is registered for
                its class. Errors raised by the handler propagate unchanged.
        """
        handlers = self._handlers
        exc_type = exception.__class__
        # Only the lookup decides whether a handler exists. A KeyError raised
        # by the handler itself must not be mistaken for a missing handler.
        if exc_type not in handlers:
            raise exception
        return await self._await_maybe(handlers[exc_type](exception))

    async def trigger_event(self, event: str, *args, **kwargs) -> Any:
        """Call the callable registered under *event* and return its result.

        Positional and keyword arguments are forwarded to the callable. Any
        ``Exception`` raised while looking up or running the event is passed
        to ``trigger_exception_handler``.
        """
        try:
            result = self.get_event(event)(*args, **kwargs)
            return await self._await_maybe(result)
        except Exception as e:
            return await self.trigger_exception_handler(e)
|
Functions
__init__
Source code in pyflowlauncher/event.py
| def __init__(self):
self._events = {}
self._handlers = {}
|
add_event
add_event(event: Callable[..., Any], *, name=None) -> str
Source code in pyflowlauncher/event.py
def add_event(self, event: Callable[..., Any], *, name=None) -> str:
    """Register *event* and return the key it was stored under.

    When *name* is falsy the key is derived from the callable via
    ``self._get_callable_name``. An existing entry with the same key is
    replaced.
    """
    registry_key = name if name else self._get_callable_name(event)
    self._events[registry_key] = event
    return registry_key
|
add_events
add_events(events: Iterable[Callable[..., Any]])
Source code in pyflowlauncher/event.py
def add_events(self, events: Iterable[Callable[..., Any]]):
    """Register each callable in *events* through ``self.add_event``."""
    register = self.add_event
    for candidate in events:
        register(candidate)
|
add_exception_handler
add_exception_handler(exception: Type[Exception], handler: Callable[..., Any])
Source code in pyflowlauncher/event.py
def add_exception_handler(self, exception: Type[Exception], handler: Callable[..., Any]):
    """Store *handler* as the callable to use for exceptions of class *exception*.

    A handler previously stored for the same class is replaced.
    """
    handler_table = self._handlers
    handler_table[exception] = handler
|
get_event
get_event(event: str) -> Callable[..., Any]
Source code in pyflowlauncher/event.py
def get_event(self, event: str) -> Callable[..., Any]:
    """Return the callable registered under *event*.

    Raises:
        EventNotFound: If no callable is registered under *event*.
    """
    try:
        return self._events[event]
    except KeyError:
        # The KeyError is an implementation detail of the lookup; keep it
        # out of the displayed traceback.
        raise EventNotFound(event) from None
|
trigger_event
async
trigger_event(event: str, *args, **kwargs) -> Any
Source code in pyflowlauncher/event.py
async def trigger_event(self, event: str, *args, **kwargs) -> Any:
    """Call the callable registered under *event* and return its result.

    Positional and keyword arguments are forwarded to the callable and the
    outcome goes through ``self._await_maybe``. Any ``Exception`` raised
    during lookup or execution is handed to
    ``self.trigger_exception_handler`` and that call's result is returned.
    """
    try:
        callback = self.get_event(event)
        outcome = callback(*args, **kwargs)
        return await self._await_maybe(outcome)
    except Exception as error:
        return await self.trigger_exception_handler(error)
|
trigger_exception_handler
async
trigger_exception_handler(exception: Exception) -> Any
Source code in pyflowlauncher/event.py
async def trigger_exception_handler(self, exception: Exception) -> Any:
    """Dispatch *exception* to the handler registered for its exact class.

    Returns:
        Whatever the handler returns, passed through ``self._await_maybe``.

    Raises:
        Exception: *exception* itself when no handler is registered for its
            class. Errors raised by the handler propagate unchanged.
    """
    handlers = self._handlers
    exc_type = exception.__class__
    # Only the lookup decides whether a handler exists. A KeyError raised by
    # the handler itself must not be mistaken for a missing handler.
    if exc_type not in handlers:
        raise exception
    return await self._await_maybe(handlers[exc_type](exception))
|
EventNotFound
Bases: Exception
Source code in pyflowlauncher/event.py
class EventNotFound(Exception):
    """Error carrying the name of an event that could not be found.

    Attributes:
        event: The event name passed to the constructor.
    """

    def __init__(self, event: str):
        message = f"Event '{event}' not found."
        super().__init__(message)
        self.event = event
|
Attributes
Functions
__init__
Source code in pyflowlauncher/event.py
| def __init__(self, event: str):
self.event = event
super().__init__(f"Event '{event}' not found.")
|
handler: python
options:
separate_signature: true
show_signature_annotations: true