ayaka.driver.ayakabot_driver.result
from typing import Any, Dict, Optional
import sys
import asyncio


class ResultStore:
    """Pair incoming result payloads with the coroutines waiting for them.

    All state lives on the class itself, so every caller shares the same
    sequence counter and the same table of pending futures.
    """

    # Next sequence number handed out by ``get_seq``.
    _seq = 1
    # Pending futures keyed by the sequence number passed to ``fetch``.
    _futures: Dict[int, asyncio.Future] = {}

    @classmethod
    def get_seq(cls) -> int:
        """Return the current sequence number and advance the counter.

        The counter wraps around modulo ``sys.maxsize``.
        """
        current = cls._seq
        cls._seq = (cls._seq + 1) % sys.maxsize
        return current

    @classmethod
    def add_result(cls, result: Dict[str, Any]) -> None:
        """Deliver ``result`` to the waiter registered for its sequence number.

        The sequence number is read from ``result["echo"]["seq"]``.  The
        payload is silently ignored when ``echo`` is not a dict, when
        ``seq`` is not an int, when nobody is waiting for that number, or
        when the waiting future has already been completed or cancelled.
        """
        echo = result.get("echo")
        if not isinstance(echo, dict):
            return

        seq = echo.get("seq")
        if not isinstance(seq, int):
            return

        future = cls._futures.get(seq)
        # A done future (duplicate payload, or cancelled by a timeout that
        # has not been cleaned up yet) would raise InvalidStateError.
        if future is None or future.done():
            return

        future.set_result(result)

    @classmethod
    async def fetch(
        cls, seq: int, timeout: Optional[float]
    ) -> Dict[str, Any]:
        """Wait for the result registered under ``seq`` and return it.

        Args:
            seq: Sequence number the result is expected to carry.
            timeout: Seconds to wait; ``None`` waits indefinitely.

        Returns:
            The payload handed to ``add_result`` for this ``seq``.

        Raises:
            asyncio.TimeoutError: No result arrived within ``timeout``.
        """
        future = asyncio.get_running_loop().create_future()
        cls._futures[seq] = future
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Remove only our own entry: it may already be gone or may have
            # been replaced by another waiter using the same number.
            if cls._futures.get(seq) is future:
                del cls._futures[seq]
class
ResultStore:
class ResultStore:
    """Pair incoming result payloads with the coroutines waiting for them.

    All state lives on the class itself, so every caller shares the same
    sequence counter and the same table of pending futures.
    """

    # Next sequence number handed out by ``get_seq``.
    _seq = 1
    # Pending futures keyed by the sequence number passed to ``fetch``.
    _futures: Dict[int, asyncio.Future] = {}

    @classmethod
    def get_seq(cls) -> int:
        """Return the current sequence number and advance the counter.

        The counter wraps around modulo ``sys.maxsize``.
        """
        current = cls._seq
        cls._seq = (cls._seq + 1) % sys.maxsize
        return current

    @classmethod
    def add_result(cls, result: Dict[str, Any]) -> None:
        """Deliver ``result`` to the waiter registered for its sequence number.

        The sequence number is read from ``result["echo"]["seq"]``.  The
        payload is silently ignored when ``echo`` is not a dict, when
        ``seq`` is not an int, when nobody is waiting for that number, or
        when the waiting future has already been completed or cancelled.
        """
        echo = result.get("echo")
        if not isinstance(echo, dict):
            return

        seq = echo.get("seq")
        if not isinstance(seq, int):
            return

        future = cls._futures.get(seq)
        # A done future (duplicate payload, or cancelled by a timeout that
        # has not been cleaned up yet) would raise InvalidStateError.
        if future is None or future.done():
            return

        future.set_result(result)

    @classmethod
    async def fetch(
        cls, seq: int, timeout: Optional[float]
    ) -> Dict[str, Any]:
        """Wait for the result registered under ``seq`` and return it.

        Args:
            seq: Sequence number the result is expected to carry.
            timeout: Seconds to wait; ``None`` waits indefinitely.

        Returns:
            The payload handed to ``add_result`` for this ``seq``.

        Raises:
            asyncio.TimeoutError: No result arrived within ``timeout``.
        """
        future = asyncio.get_running_loop().create_future()
        cls._futures[seq] = future
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Remove only our own entry: it may already be gone or may have
            # been replaced by another waiter using the same number.
            if cls._futures.get(seq) is future:
                del cls._futures[seq]
@classmethod
def
add_result(cls, result: Dict[str, Any]):
@classmethod
def add_result(cls, result: Dict[str, Any]) -> None:
    """Deliver ``result`` to the waiter registered for its sequence number.

    The sequence number is read from ``result["echo"]["seq"]`` and looked
    up in ``cls._futures``.  The payload is silently ignored when ``echo``
    is not a dict, when ``seq`` is not an int, when no future is stored
    for that number, or when the stored future is already done.
    """
    echo = result.get("echo")
    if not isinstance(echo, dict):
        return

    seq = echo.get("seq")
    if not isinstance(seq, int):
        return

    future = cls._futures.get(seq)
    # Calling set_result on a finished or cancelled future raises
    # asyncio.InvalidStateError, so late or duplicate payloads are dropped.
    if future is None or future.done():
        return

    future.set_result(result)
@classmethod
async def
fetch(cls, seq: int, timeout: Union[float, NoneType]) -> Dict[str, Any]:
@classmethod
async def fetch(
    cls, seq: int, timeout: Optional[float]
) -> Dict[str, Any]:
    """Wait for the result registered under ``seq`` and return it.

    A fresh future is stored in ``cls._futures[seq]`` and awaited; the
    entry is removed again once the wait ends, whatever the outcome.

    Args:
        seq: Sequence number the result is expected to carry.
        timeout: Seconds to wait; ``None`` waits indefinitely.

    Returns:
        The value the stored future was completed with.

    Raises:
        asyncio.TimeoutError: The future was not completed within
            ``timeout`` seconds.
    """
    # Inside a coroutine the running loop is the one that must own the
    # future; get_running_loop() states that explicitly.
    future = asyncio.get_running_loop().create_future()
    cls._futures[seq] = future
    try:
        return await asyncio.wait_for(future, timeout)
    finally:
        # Remove only our own entry: it may already be gone or may have
        # been replaced by another waiter using the same number, and a
        # KeyError here would mask the real result or exception.
        if cls._futures.get(seq) is future:
            del cls._futures[seq]