ayaka.driver.ayakabot.result

 1from typing import Any, Dict, Optional
 2import sys
 3import asyncio
 4
 5
 6class ResultStore:
 7    _seq = 1
 8    _futures: Dict[int, asyncio.Future] = {}
 9
10    @classmethod
11    def get_seq(cls) -> int:
12        s = cls._seq
13        cls._seq = (cls._seq + 1) % sys.maxsize
14        return s
15
16    @classmethod
17    def add_result(cls, result: Dict[str, Any]):
18        echo = result.get("echo")
19        if not isinstance(echo, dict):
20            return
21
22        seq = echo.get("seq")
23        if not isinstance(seq, int):
24            return
25
26        future = cls._futures.get(seq)
27        if not future:
28            return
29
30        future.set_result(result)
31
32    @classmethod
33    async def fetch(
34        cls, seq: int, timeout: Optional[float]
35    ) -> Dict[str, Any]:
36        future = asyncio.get_event_loop().create_future()
37        cls._futures[seq] = future
38        try:
39            return await asyncio.wait_for(future, timeout)
40        except asyncio.TimeoutError:
41            raise
42        finally:
43            del cls._futures[seq]
class ResultStore:
 7class ResultStore:
 8    _seq = 1
 9    _futures: Dict[int, asyncio.Future] = {}
10
11    @classmethod
12    def get_seq(cls) -> int:
13        s = cls._seq
14        cls._seq = (cls._seq + 1) % sys.maxsize
15        return s
16
17    @classmethod
18    def add_result(cls, result: Dict[str, Any]):
19        echo = result.get("echo")
20        if not isinstance(echo, dict):
21            return
22
23        seq = echo.get("seq")
24        if not isinstance(seq, int):
25            return
26
27        future = cls._futures.get(seq)
28        if not future:
29            return
30
31        future.set_result(result)
32
33    @classmethod
34    async def fetch(
35        cls, seq: int, timeout: Optional[float]
36    ) -> Dict[str, Any]:
37        future = asyncio.get_event_loop().create_future()
38        cls._futures[seq] = future
39        try:
40            return await asyncio.wait_for(future, timeout)
41        except asyncio.TimeoutError:
42            raise
43        finally:
44            del cls._futures[seq]
ResultStore()
@classmethod
def get_seq(cls) -> int:
11    @classmethod
12    def get_seq(cls) -> int:
13        s = cls._seq
14        cls._seq = (cls._seq + 1) % sys.maxsize
15        return s
@classmethod
def add_result(cls, result: Dict[str, Any]):
17    @classmethod
18    def add_result(cls, result: Dict[str, Any]):
19        echo = result.get("echo")
20        if not isinstance(echo, dict):
21            return
22
23        seq = echo.get("seq")
24        if not isinstance(seq, int):
25            return
26
27        future = cls._futures.get(seq)
28        if not future:
29            return
30
31        future.set_result(result)
@classmethod
async def fetch(cls, seq: int, timeout: Union[float, NoneType]) -> Dict[str, Any]:
33    @classmethod
34    async def fetch(
35        cls, seq: int, timeout: Optional[float]
36    ) -> Dict[str, Any]:
37        future = asyncio.get_event_loop().create_future()
38        cls._futures[seq] = future
39        try:
40            return await asyncio.wait_for(future, timeout)
41        except asyncio.TimeoutError:
42            raise
43        finally:
44            del cls._futures[seq]