ayaka.driver.ayakabot.bot

  1import json
  2from typing import Any, Callable, Awaitable, List, Union
  3from functools import partial
  4from typing_extensions import Protocol
  5
  6# from .utils import safe_cqhttp_utf8
  7from .result import ResultStore
  8from .event import Event
  9from .websocket import FastAPIWebSocket
 10from .message import Message, MessageSegment
 11from .model import DataclassEncoder
 12
 13from ayaka import logger
 14
 15
 16class _ApiCall(Protocol):
 17    async def __call__(self, **kwargs: Any) -> Any:
 18        ...
 19
 20
 21class Bot:
 22    """
 23    OneBot v11 协议 Bot 适配。
 24    """
 25
 26    _calls: List[Callable[["Bot", str, dict], Awaitable]] = []
 27
 28    def __init__(self, ws: FastAPIWebSocket, self_id: str):
 29        self.ws = ws
 30        self.self_id = self_id
 31
 32    def __getattr__(self, name: str) -> _ApiCall:
 33        return partial(self.call_api, name)
 34
 35    @classmethod
 36    def on_call(self, func: Callable[["Bot", str, dict], Awaitable]):
 37        self._calls.append(func)
 38
 39    async def call_api(self, api: str, **data) -> Any:
 40        """
 41        :说明:
 42
 43          调用 OneBot 协议 API
 44        """
 45        logger.opt(colors=True).debug(f"Calling API <y>{api}</y>")
 46
 47        websocket = self.ws
 48        if not websocket:
 49            logger.warning("没有建立ws连接,无法发送消息")
 50            return
 51
 52        # 调用钩子
 53        for call in self._calls:
 54            await call(self, api, data)
 55
 56        # # 解决cqhttp莫名其妙被风控的问题
 57        # data = safe_cqhttp_utf8(api, data)
 58
 59        # 生成 seq码 和 待发送的json数据
 60        seq = ResultStore.get_seq()
 61        json_data = json.dumps(
 62            {"action": api, "params": data, "echo": {"seq": seq}},
 63            cls=DataclassEncoder,
 64        )
 65
 66        try:
 67            await websocket.send(json_data)
 68            # 默认30s超时
 69            result = await ResultStore.fetch(seq, 30)
 70            if isinstance(result, dict):
 71                if result.get("status") == "failed":
 72                    raise
 73                return result.get("data")
 74
 75        except:
 76            logger.exception("发送消息失败")
 77
 78    async def send(
 79        self,
 80        event: Event,
 81        message: Union[str, Message, MessageSegment],
 82        **kwargs,
 83    ) -> Any:
 84        """
 85        :说明:
 86
 87          根据 ``event``  向触发事件的主体发送消息。
 88        """
 89
 90        # 生成Message
 91        if isinstance(message, Message):
 92            msg = message
 93        else:
 94            msg = Message(message)
 95
 96        params = {}
 97
 98        # 填写目标id
 99        if getattr(event, "user_id", None):
100            params["user_id"] = getattr(event, "user_id")
101        if getattr(event, "group_id", None):
102            params["group_id"] = getattr(event, "group_id")
103
104        # 加载其他信息
105        params.update(kwargs)
106
107        # 填写群聊/私聊
108        if "message_type" not in params:
109            if params.get("group_id", None):
110                params["message_type"] = "group"
111            elif params.get("user_id", None):
112                params["message_type"] = "private"
113            else:
114                raise ValueError("Cannot guess message type to reply!")
115
116        params["message"] = msg
117
118        return await self.call_api("send_msg", **params)
class Bot:
 22class Bot:
 23    """
 24    OneBot v11 协议 Bot 适配。
 25    """
 26
 27    _calls: List[Callable[["Bot", str, dict], Awaitable]] = []
 28
 29    def __init__(self, ws: FastAPIWebSocket, self_id: str):
 30        self.ws = ws
 31        self.self_id = self_id
 32
 33    def __getattr__(self, name: str) -> _ApiCall:
 34        return partial(self.call_api, name)
 35
 36    @classmethod
 37    def on_call(self, func: Callable[["Bot", str, dict], Awaitable]):
 38        self._calls.append(func)
 39
 40    async def call_api(self, api: str, **data) -> Any:
 41        """
 42        :说明:
 43
 44          调用 OneBot 协议 API
 45        """
 46        logger.opt(colors=True).debug(f"Calling API <y>{api}</y>")
 47
 48        websocket = self.ws
 49        if not websocket:
 50            logger.warning("没有建立ws连接,无法发送消息")
 51            return
 52
 53        # 调用钩子
 54        for call in self._calls:
 55            await call(self, api, data)
 56
 57        # # 解决cqhttp莫名其妙被风控的问题
 58        # data = safe_cqhttp_utf8(api, data)
 59
 60        # 生成 seq码 和 待发送的json数据
 61        seq = ResultStore.get_seq()
 62        json_data = json.dumps(
 63            {"action": api, "params": data, "echo": {"seq": seq}},
 64            cls=DataclassEncoder,
 65        )
 66
 67        try:
 68            await websocket.send(json_data)
 69            # 默认30s超时
 70            result = await ResultStore.fetch(seq, 30)
 71            if isinstance(result, dict):
 72                if result.get("status") == "failed":
 73                    raise
 74                return result.get("data")
 75
 76        except:
 77            logger.exception("发送消息失败")
 78
 79    async def send(
 80        self,
 81        event: Event,
 82        message: Union[str, Message, MessageSegment],
 83        **kwargs,
 84    ) -> Any:
 85        """
 86        :说明:
 87
 88          根据 ``event``  向触发事件的主体发送消息。
 89        """
 90
 91        # 生成Message
 92        if isinstance(message, Message):
 93            msg = message
 94        else:
 95            msg = Message(message)
 96
 97        params = {}
 98
 99        # 填写目标id
100        if getattr(event, "user_id", None):
101            params["user_id"] = getattr(event, "user_id")
102        if getattr(event, "group_id", None):
103            params["group_id"] = getattr(event, "group_id")
104
105        # 加载其他信息
106        params.update(kwargs)
107
108        # 填写群聊/私聊
109        if "message_type" not in params:
110            if params.get("group_id", None):
111                params["message_type"] = "group"
112            elif params.get("user_id", None):
113                params["message_type"] = "private"
114            else:
115                raise ValueError("Cannot guess message type to reply!")
116
117        params["message"] = msg
118
119        return await self.call_api("send_msg", **params)

OneBot v11 协议 Bot 适配。

Bot(ws: ayaka.driver.ayakabot.websocket.FastAPIWebSocket, self_id: str)
29    def __init__(self, ws: FastAPIWebSocket, self_id: str):
30        self.ws = ws
31        self.self_id = self_id
@classmethod
def on_call(self, func) -> None:
36    @classmethod
37    def on_call(self, func: Callable[["Bot", str, dict], Awaitable]):
38        self._calls.append(func)
async def call_api(self, api: str, **data) -> Any:
40    async def call_api(self, api: str, **data) -> Any:
41        """
42        :说明:
43
44          调用 OneBot 协议 API
45        """
46        logger.opt(colors=True).debug(f"Calling API <y>{api}</y>")
47
48        websocket = self.ws
49        if not websocket:
50            logger.warning("没有建立ws连接,无法发送消息")
51            return
52
53        # 调用钩子
54        for call in self._calls:
55            await call(self, api, data)
56
57        # # 解决cqhttp莫名其妙被风控的问题
58        # data = safe_cqhttp_utf8(api, data)
59
60        # 生成 seq码 和 待发送的json数据
61        seq = ResultStore.get_seq()
62        json_data = json.dumps(
63            {"action": api, "params": data, "echo": {"seq": seq}},
64            cls=DataclassEncoder,
65        )
66
67        try:
68            await websocket.send(json_data)
69            # 默认30s超时
70            result = await ResultStore.fetch(seq, 30)
71            if isinstance(result, dict):
72                if result.get("status") == "failed":
73                    raise
74                return result.get("data")
75
76        except:
77            logger.exception("发送消息失败")

:说明:

调用 OneBot 协议 API

async def send( self, event: ayaka.driver.ayakabot.event.Event, message: Union[str, ayaka.driver.ayakabot.message.Message, ayaka.driver.ayakabot.message.MessageSegment], **kwargs) -> Any:
 79    async def send(
 80        self,
 81        event: Event,
 82        message: Union[str, Message, MessageSegment],
 83        **kwargs,
 84    ) -> Any:
 85        """
 86        :说明:
 87
 88          根据 ``event``  向触发事件的主体发送消息。
 89        """
 90
 91        # 生成Message
 92        if isinstance(message, Message):
 93            msg = message
 94        else:
 95            msg = Message(message)
 96
 97        params = {}
 98
 99        # 填写目标id
100        if getattr(event, "user_id", None):
101            params["user_id"] = getattr(event, "user_id")
102        if getattr(event, "group_id", None):
103            params["group_id"] = getattr(event, "group_id")
104
105        # 加载其他信息
106        params.update(kwargs)
107
108        # 填写群聊/私聊
109        if "message_type" not in params:
110            if params.get("group_id", None):
111                params["message_type"] = "group"
112            elif params.get("user_id", None):
113                params["message_type"] = "private"
114            else:
115                raise ValueError("Cannot guess message type to reply!")
116
117        params["message"] = msg
118
119        return await self.call_api("send_msg", **params)

:说明:

根据 event 向触发事件的主体发送消息。