Skip to content

event

Classes

EventHandler

Source code in pyflowlauncher/event.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class EventHandler:

    def __init__(self):
        self._events = {}
        self._handlers = {}

    def _get_callable_name(self, method: Union[Callable[..., Any], Exception]):
        return getattr(method, '__name__', method.__class__.__name__).lower()

    def add_event(self, event: Callable[..., Any], *, name=None) -> str:
        key = name or self._get_callable_name(event)
        self._events[key] = event
        return key

    def add_events(self, events: Iterable[Callable[..., Any]]):
        for event in events:
            self.add_event(event)

    def add_exception_handler(self, exception: Type[Exception], handler: Callable[..., Any]):
        self._handlers[exception] = handler

    def get_event(self, event: str) -> Callable[..., Any]:
        try:
            return self._events[event]
        except KeyError:
            raise EventNotFound(event)

    async def _await_maybe(self, result: Any) -> Any:
        if asyncio.iscoroutine(result):
            return await result
        return result

    async def trigger_exception_handler(self, exception: Exception) -> Any:
        try:
            handler = self._handlers[exception.__class__]
            return await self._await_maybe(handler(exception))
        except KeyError:
            raise exception

    async def trigger_event(self, event: str, *args, **kwargs) -> Any:
        try:
            result = self.get_event(event)(*args, **kwargs)
            return await self._await_maybe(result)
        except Exception as e:
            return await self.trigger_exception_handler(e)

Functions

__init__
__init__()
Source code in pyflowlauncher/event.py
14
15
16
def __init__(self):
    self._events = {}
    self._handlers = {}
add_event
add_event(event: Callable[..., Any], *, name=None) -> str
Source code in pyflowlauncher/event.py
21
22
23
24
def add_event(self, event: Callable[..., Any], *, name=None) -> str:
    key = name or self._get_callable_name(event)
    self._events[key] = event
    return key
add_events
add_events(events: Iterable[Callable[..., Any]])
Source code in pyflowlauncher/event.py
26
27
28
def add_events(self, events: Iterable[Callable[..., Any]]):
    for event in events:
        self.add_event(event)
add_exception_handler
add_exception_handler(exception: Type[Exception], handler: Callable[..., Any])
Source code in pyflowlauncher/event.py
30
31
def add_exception_handler(self, exception: Type[Exception], handler: Callable[..., Any]):
    self._handlers[exception] = handler
get_event
get_event(event: str) -> Callable[..., Any]
Source code in pyflowlauncher/event.py
33
34
35
36
37
def get_event(self, event: str) -> Callable[..., Any]:
    try:
        return self._events[event]
    except KeyError:
        raise EventNotFound(event)
trigger_event async
trigger_event(event: str, *args, **kwargs) -> Any
Source code in pyflowlauncher/event.py
51
52
53
54
55
56
async def trigger_event(self, event: str, *args, **kwargs) -> Any:
    try:
        result = self.get_event(event)(*args, **kwargs)
        return await self._await_maybe(result)
    except Exception as e:
        return await self.trigger_exception_handler(e)
trigger_exception_handler async
trigger_exception_handler(exception: Exception) -> Any
Source code in pyflowlauncher/event.py
44
45
46
47
48
49
async def trigger_exception_handler(self, exception: Exception) -> Any:
    try:
        handler = self._handlers[exception.__class__]
        return await self._await_maybe(handler(exception))
    except KeyError:
        raise exception

EventNotFound

Bases: Exception

Source code in pyflowlauncher/event.py
5
6
7
8
9
class EventNotFound(Exception):

    def __init__(self, event: str):
        self.event = event
        super().__init__(f"Event '{event}' not found.")

Attributes

event instance-attribute
event = event

Functions

__init__
__init__(event: str)
Source code in pyflowlauncher/event.py
7
8
9
def __init__(self, event: str):
    self.event = event
    super().__init__(f"Event '{event}' not found.")

handler: python options: separate_signature: true show_signature_annotations: true