ayaka.driver.ayakabot.bot
"""OneBot v11 bot adapter: sends API calls over a websocket and awaits replies."""
import json
from typing import Any, Callable, Awaitable, List, Union
from functools import partial
from typing_extensions import Protocol

# from .utils import safe_cqhttp_utf8
from .result import ResultStore
from .event import Event
from .websocket import FastAPIWebSocket
from .message import Message, MessageSegment
from .model import DataclassEncoder

from ayaka import logger


class _ApiCall(Protocol):
    """Structural type of the callables produced by ``Bot.__getattr__``."""

    async def __call__(self, **kwargs: Any) -> Any:
        ...


class Bot:
    """Bot adapter for the OneBot v11 protocol.

    Attributes that are not found through normal lookup are treated as API
    names: ``await bot.send_msg(...)`` is forwarded to
    ``bot.call_api("send_msg", ...)``.
    """

    # Hooks awaited before every API call. The list is defined on the class,
    # so it is shared by all ``Bot`` instances.
    _calls: List[Callable[["Bot", str, dict], Awaitable]] = []

    def __init__(self, ws: FastAPIWebSocket, self_id: str):
        self.ws = ws
        self.self_id = self_id

    def __getattr__(self, name: str) -> _ApiCall:
        """Return a callable that invokes the API called ``name``.

        Raises:
            AttributeError: if ``name`` is a dunder name. Protocol probes such
                as ``__deepcopy__`` or ``__getstate__`` must fail normally
                instead of being answered with an API call.
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return partial(self.call_api, name)

    @classmethod
    def on_call(
        cls, func: Callable[["Bot", str, dict], Awaitable]
    ) -> Callable[["Bot", str, dict], Awaitable]:
        """Register ``func`` as a hook awaited before every API call.

        The hook is called as ``func(bot, api, data)``. ``func`` is returned
        unchanged so this method can also be used as a decorator.
        """
        cls._calls.append(func)
        return func

    async def call_api(self, api: str, **data) -> Any:
        """Call a OneBot protocol API and wait for its response.

        Args:
            api: Name of the API action.
            **data: Parameters sent as the ``params`` field of the request.

        Returns:
            The ``data`` field of the response when the response is a dict
            whose status is not ``"failed"``; ``None`` when there is no
            websocket, when sending or fetching raises, when the response is
            not a dict, or when the API reports failure.
        """
        logger.opt(colors=True).debug(f"Calling API <y>{api}</y>")

        websocket = self.ws
        if not websocket:
            logger.warning("没有建立ws连接,无法发送消息")
            return None

        # Run the registered hooks. They receive the very dict that is
        # serialised below, so in-place changes end up in the request.
        for call in self._calls:
            await call(self, api, data)

        # # Work around cqhttp being risk-controlled for no obvious reason
        # data = safe_cqhttp_utf8(api, data)

        # Build the sequence number and the JSON payload to send.
        seq = ResultStore.get_seq()
        json_data = json.dumps(
            {"action": api, "params": data, "echo": {"seq": seq}},
            cls=DataclassEncoder,
        )

        try:
            await websocket.send(json_data)
            # 30 is the timeout in seconds according to the original comment.
            result = await ResultStore.fetch(seq, 30)
        except Exception:
            # ``Exception`` (not a bare ``except``) so that task cancellation
            # and KeyboardInterrupt still propagate to the caller.
            logger.exception("发送消息失败")
            return None

        if not isinstance(result, dict):
            return None
        if result.get("status") == "failed":
            # Report the failure explicitly instead of relying on a bare
            # ``raise`` with no active exception.
            logger.error("发送消息失败: {} -> {}", api, result)
            return None
        return result.get("data")

    async def send(
        self,
        event: Event,
        message: Union[str, Message, MessageSegment],
        **kwargs,
    ) -> Any:
        """Send ``message`` to the subject that triggered ``event``.

        Args:
            event: Event whose ``user_id`` / ``group_id`` (when truthy) are
                used as the target.
            message: Content to send; wrapped in ``Message`` unless it
                already is one.
            **kwargs: Extra ``send_msg`` parameters. They override the ids
                taken from ``event``.

        Returns:
            Whatever ``call_api("send_msg", ...)`` returns.

        Raises:
            ValueError: if ``message_type`` is not given and neither a group
                id nor a user id is available.
        """
        # Build the Message.
        msg = message if isinstance(message, Message) else Message(message)

        params = {}

        # Fill in the target ids taken from the event.
        for field in ("user_id", "group_id"):
            target = getattr(event, field, None)
            if target:
                params[field] = target

        # Merge caller-supplied parameters.
        params.update(kwargs)

        # Decide between group chat and private chat.
        if "message_type" not in params:
            if params.get("group_id"):
                params["message_type"] = "group"
            elif params.get("user_id"):
                params["message_type"] = "private"
            else:
                raise ValueError("Cannot guess message type to reply!")

        # Set last so that kwargs cannot replace the built message.
        params["message"] = msg

        return await self.call_api("send_msg", **params)
class
Bot:
class Bot:
    """Bot adapter for the OneBot v11 protocol.

    Attributes that are not found through normal lookup are treated as API
    names: ``await bot.send_msg(...)`` is forwarded to
    ``bot.call_api("send_msg", ...)``.
    """

    # Hooks awaited before every API call. The list is defined on the class,
    # so it is shared by all ``Bot`` instances.
    _calls: List[Callable[["Bot", str, dict], Awaitable]] = []

    def __init__(self, ws: FastAPIWebSocket, self_id: str):
        self.ws = ws
        self.self_id = self_id

    def __getattr__(self, name: str) -> _ApiCall:
        """Return a callable that invokes the API called ``name``.

        Raises:
            AttributeError: if ``name`` is a dunder name. Protocol probes such
                as ``__deepcopy__`` or ``__getstate__`` must fail normally
                instead of being answered with an API call.
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return partial(self.call_api, name)

    @classmethod
    def on_call(
        cls, func: Callable[["Bot", str, dict], Awaitable]
    ) -> Callable[["Bot", str, dict], Awaitable]:
        """Register ``func`` as a hook awaited before every API call.

        The hook is called as ``func(bot, api, data)``. ``func`` is returned
        unchanged so this method can also be used as a decorator.
        """
        cls._calls.append(func)
        return func

    async def call_api(self, api: str, **data) -> Any:
        """Call a OneBot protocol API and wait for its response.

        Args:
            api: Name of the API action.
            **data: Parameters sent as the ``params`` field of the request.

        Returns:
            The ``data`` field of the response when the response is a dict
            whose status is not ``"failed"``; ``None`` when there is no
            websocket, when sending or fetching raises, when the response is
            not a dict, or when the API reports failure.
        """
        logger.opt(colors=True).debug(f"Calling API <y>{api}</y>")

        websocket = self.ws
        if not websocket:
            logger.warning("没有建立ws连接,无法发送消息")
            return None

        # Run the registered hooks. They receive the very dict that is
        # serialised below, so in-place changes end up in the request.
        for call in self._calls:
            await call(self, api, data)

        # # Work around cqhttp being risk-controlled for no obvious reason
        # data = safe_cqhttp_utf8(api, data)

        # Build the sequence number and the JSON payload to send.
        seq = ResultStore.get_seq()
        json_data = json.dumps(
            {"action": api, "params": data, "echo": {"seq": seq}},
            cls=DataclassEncoder,
        )

        try:
            await websocket.send(json_data)
            # 30 is the timeout in seconds according to the original comment.
            result = await ResultStore.fetch(seq, 30)
        except Exception:
            # ``Exception`` (not a bare ``except``) so that task cancellation
            # and KeyboardInterrupt still propagate to the caller.
            logger.exception("发送消息失败")
            return None

        if not isinstance(result, dict):
            return None
        if result.get("status") == "failed":
            # Report the failure explicitly instead of relying on a bare
            # ``raise`` with no active exception.
            logger.error("发送消息失败: {} -> {}", api, result)
            return None
        return result.get("data")

    async def send(
        self,
        event: Event,
        message: Union[str, Message, MessageSegment],
        **kwargs,
    ) -> Any:
        """Send ``message`` to the subject that triggered ``event``.

        Args:
            event: Event whose ``user_id`` / ``group_id`` (when truthy) are
                used as the target.
            message: Content to send; wrapped in ``Message`` unless it
                already is one.
            **kwargs: Extra ``send_msg`` parameters. They override the ids
                taken from ``event``.

        Returns:
            Whatever ``call_api("send_msg", ...)`` returns.

        Raises:
            ValueError: if ``message_type`` is not given and neither a group
                id nor a user id is available.
        """
        # Build the Message.
        msg = message if isinstance(message, Message) else Message(message)

        params = {}

        # Fill in the target ids taken from the event.
        for field in ("user_id", "group_id"):
            target = getattr(event, field, None)
            if target:
                params[field] = target

        # Merge caller-supplied parameters.
        params.update(kwargs)

        # Decide between group chat and private chat.
        if "message_type" not in params:
            if params.get("group_id"):
                params["message_type"] = "group"
            elif params.get("user_id"):
                params["message_type"] = "private"
            else:
                raise ValueError("Cannot guess message type to reply!")

        # Set last so that kwargs cannot replace the built message.
        params["message"] = msg

        return await self.call_api("send_msg", **params)
OneBot v11 协议 Bot 适配。
Bot(ws: ayaka.driver.ayakabot.websocket.FastAPIWebSocket, self_id: str)
async def
call_api(self, api: str, **data) -> Any:
async def call_api(self, api: str, **data) -> Any:
    """Call a OneBot protocol API and wait for its response.

    Args:
        api: Name of the API action.
        **data: Parameters sent as the ``params`` field of the request.

    Returns:
        The ``data`` field of the response when the response is a dict whose
        status is not ``"failed"``; ``None`` when there is no websocket, when
        sending or fetching raises, when the response is not a dict, or when
        the API reports failure.
    """
    logger.opt(colors=True).debug(f"Calling API <y>{api}</y>")

    websocket = self.ws
    if not websocket:
        logger.warning("没有建立ws连接,无法发送消息")
        return None

    # Run the registered hooks. They receive the very dict that is
    # serialised below, so in-place changes end up in the request.
    for call in self._calls:
        await call(self, api, data)

    # # Work around cqhttp being risk-controlled for no obvious reason
    # data = safe_cqhttp_utf8(api, data)

    # Build the sequence number and the JSON payload to send.
    seq = ResultStore.get_seq()
    json_data = json.dumps(
        {"action": api, "params": data, "echo": {"seq": seq}},
        cls=DataclassEncoder,
    )

    try:
        await websocket.send(json_data)
        # 30 is the timeout in seconds according to the original comment.
        result = await ResultStore.fetch(seq, 30)
    except Exception:
        # ``Exception`` (not a bare ``except``) so that task cancellation
        # and KeyboardInterrupt still propagate to the caller.
        logger.exception("发送消息失败")
        return None

    if not isinstance(result, dict):
        return None
    if result.get("status") == "failed":
        # Report the failure explicitly instead of relying on a bare
        # ``raise`` with no active exception.
        logger.error("发送消息失败: {} -> {}", api, result)
        return None
    return result.get("data")
:说明:
调用 OneBot 协议 API
async def
send( self, event: ayaka.driver.ayakabot.event.Event, message: Union[str, ayaka.driver.ayakabot.message.Message, ayaka.driver.ayakabot.message.MessageSegment], **kwargs) -> Any:
async def send(
    self,
    event: Event,
    message: Union[str, Message, MessageSegment],
    **kwargs,
) -> Any:
    """Send ``message`` to the subject that triggered ``event``.

    The target ids come from the truthy ``user_id`` / ``group_id`` attributes
    of ``event``; ``kwargs`` are merged on top of them. When ``message_type``
    is not supplied it is set to ``"group"`` if a group id is present,
    otherwise to ``"private"`` if a user id is present.

    Raises:
        ValueError: if the message type can be neither read nor guessed.
    """
    # Wrap anything that is not already a Message.
    outgoing = message if isinstance(message, Message) else Message(message)

    payload = {}

    # Copy the target ids from the event, skipping missing or falsy ones.
    for field in ("user_id", "group_id"):
        target = getattr(event, field, None)
        if target:
            payload[field] = target

    # Caller-supplied parameters take precedence over the event's ids.
    payload.update(kwargs)

    # Guess group chat vs. private chat when the caller did not say.
    if "message_type" not in payload:
        if payload.get("group_id"):
            payload["message_type"] = "group"
        elif payload.get("user_id"):
            payload["message_type"] = "private"
        else:
            raise ValueError("Cannot guess message type to reply!")

    # Assigned last, so kwargs cannot replace the built message.
    payload["message"] = outgoing

    return await self.call_api("send_msg", **payload)
:说明:
根据 event
向触发事件的主体发送消息。