ayaka.driver.ayakabot.message

  1import re
  2from io import BytesIO
  3from pathlib import Path
  4from base64 import b64encode
  5from typing import Any, Dict, List, Type, Tuple, Union, Mapping, Iterable, Optional, cast, overload
  6from copy import deepcopy
  7from dataclasses import field, asdict, dataclass
  8from pydantic import parse_obj_as
  9
 10
 11from ayaka import logger
 12from .utils import bool_to_str, escape, unescape
 13from .template import MessageTemplate
 14
 15
 16@dataclass
 17class MessageSegment:
 18    """
 19    OneBot v11 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
 20    """
 21
 22    type: str
 23    """消息段类型"""
 24    data: Dict[str, Any] = field(default_factory=dict)
 25    """消息段数据"""
 26
 27    def __len__(self) -> int:
 28        return len(str(self))
 29
 30    def __ne__(self, other: "MessageSegment") -> bool:
 31        return not self == other
 32
 33    def __add__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
 34        return self.get_message_class()(self) + other
 35
 36    def __radd__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
 37        return self.get_message_class()(other) + self
 38
 39    @classmethod
 40    def __get_validators__(cls):
 41        yield cls._validate
 42
 43    @classmethod
 44    def _validate(cls, value):
 45        if isinstance(value, cls):
 46            return value
 47        if not isinstance(value, dict):
 48            raise ValueError(
 49                f"Expected dict for MessageSegment, got {type(value)}")
 50        return cls(**value)
 51
 52    def get(self, key: str, default: Any = None):
 53        return asdict(self).get(key, default)
 54
 55    def keys(self):
 56        return asdict(self).keys()
 57
 58    def values(self):
 59        return asdict(self).values()
 60
 61    def items(self):
 62        return asdict(self).items()
 63
 64    def copy(self) -> "MessageSegment":
 65        return deepcopy(self)
 66
 67    @classmethod
 68    def get_message_class(cls) -> Type["Message"]:
 69        return Message
 70
 71    def __str__(self) -> str:
 72        type_ = self.type
 73        data = self.data.copy()
 74
 75        # process special types
 76        if type_ == "text":
 77            # type: ignore
 78            return escape(data.get("text", ""), escape_comma=False)
 79
 80        params = ",".join(
 81            [f"{k}={escape(str(v))}" for k, v in data.items() if v is not None]
 82        )
 83        return f"[CQ:{type_}{',' if params else ''}{params}]"
 84
 85    def __add__(self, other) -> "Message":
 86        return Message(self) + (
 87            MessageSegment.text(other) if isinstance(other, str) else other
 88        )
 89
 90    def __radd__(self, other) -> "Message":
 91        return (
 92            MessageSegment.text(other) if isinstance(
 93                other, str) else Message(other)
 94        ) + self
 95
 96    def is_text(self) -> bool:
 97        return self.type == "text"
 98
 99    @staticmethod
100    def anonymous(ignore_failure: Optional[bool] = None) -> "MessageSegment":
101        return MessageSegment("anonymous", {"ignore": bool_to_str(ignore_failure)})
102
103    @staticmethod
104    def at(user_id: Union[int, str]) -> "MessageSegment":
105        return MessageSegment("at", {"qq": str(user_id)})
106
107    @staticmethod
108    def contact(type_: str, id: int) -> "MessageSegment":
109        return MessageSegment("contact", {"type": type_, "id": str(id)})
110
111    @staticmethod
112    def contact_group(group_id: int) -> "MessageSegment":
113        return MessageSegment("contact", {"type": "group", "id": str(group_id)})
114
115    @staticmethod
116    def contact_user(user_id: int) -> "MessageSegment":
117        return MessageSegment("contact", {"type": "qq", "id": str(user_id)})
118
119    @staticmethod
120    def dice() -> "MessageSegment":
121        return MessageSegment("dice", {})
122
123    @staticmethod
124    def face(id_: int) -> "MessageSegment":
125        return MessageSegment("face", {"id": str(id_)})
126
127    @staticmethod
128    def forward(id_: str) -> "MessageSegment":
129        logger.warning("Forward Message only can be received!")
130        return MessageSegment("forward", {"id": id_})
131
132    @staticmethod
133    def image(
134        file: Union[str, bytes, BytesIO, Path],
135        type_: Optional[str] = None,
136        cache: bool = True,
137        proxy: bool = True,
138        timeout: Optional[int] = None,
139    ) -> "MessageSegment":
140        if isinstance(file, BytesIO):
141            file = file.getvalue()
142        if isinstance(file, bytes):
143            file = f"base64://{b64encode(file).decode()}"
144        elif isinstance(file, Path):
145            file = f"file:///{file.resolve()}"
146        return MessageSegment(
147            "image",
148            {
149                "file": file,
150                "type": type_,
151                "cache": bool_to_str(cache),
152                "proxy": bool_to_str(proxy),
153                "timeout": timeout,
154            },
155        )
156
157    @staticmethod
158    def json(data: str) -> "MessageSegment":
159        return MessageSegment("json", {"data": data})
160
161    @staticmethod
162    def location(
163        latitude: float,
164        longitude: float,
165        title: Optional[str] = None,
166        content: Optional[str] = None,
167    ) -> "MessageSegment":
168        return MessageSegment(
169            "location",
170            {
171                "lat": str(latitude),
172                "lon": str(longitude),
173                "title": title,
174                "content": content,
175            },
176        )
177
178    @staticmethod
179    def music(type_: str, id_: int) -> "MessageSegment":
180        return MessageSegment("music", {"type": type_, "id": id_})
181
182    @staticmethod
183    def music_custom(
184        url: str,
185        audio: str,
186        title: str,
187        content: Optional[str] = None,
188        img_url: Optional[str] = None,
189    ) -> "MessageSegment":
190        return MessageSegment(
191            "music",
192            {
193                "type": "custom",
194                "url": url,
195                "audio": audio,
196                "title": title,
197                "content": content,
198                "image": img_url,
199            },
200        )
201
202    @staticmethod
203    def node(id_: int) -> "MessageSegment":
204        return MessageSegment("node", {"id": str(id_)})
205
206    @staticmethod
207    def node_custom(
208        user_id: int, nickname: str, content: Union[str, "Message"]
209    ) -> "MessageSegment":
210        return MessageSegment(
211            "node", {"user_id": str(
212                user_id), "nickname": nickname, "content": content}
213        )
214
215    @staticmethod
216    def poke(type_: str, id_: str) -> "MessageSegment":
217        return MessageSegment("poke", {"type": type_, "id": id_})
218
219    @staticmethod
220    def record(
221        file: Union[str, bytes, BytesIO, Path],
222        magic: Optional[bool] = None,
223        cache: Optional[bool] = None,
224        proxy: Optional[bool] = None,
225        timeout: Optional[int] = None,
226    ) -> "MessageSegment":
227        if isinstance(file, BytesIO):
228            file = file.getvalue()
229        if isinstance(file, bytes):
230            file = f"base64://{b64encode(file).decode()}"
231        elif isinstance(file, Path):
232            file = f"file:///{file.resolve()}"
233        return MessageSegment(
234            "record",
235            {
236                "file": file,
237                "magic": bool_to_str(magic),
238                "cache": bool_to_str(cache),
239                "proxy": bool_to_str(proxy),
240                "timeout": timeout,
241            },
242        )
243
244    @staticmethod
245    def reply(id_: int) -> "MessageSegment":
246        return MessageSegment("reply", {"id": str(id_)})
247
248    @staticmethod
249    def rps() -> "MessageSegment":
250        return MessageSegment("rps", {})
251
252    @staticmethod
253    def shake() -> "MessageSegment":
254        return MessageSegment("shake", {})
255
256    @staticmethod
257    def share(
258        url: str = "",
259        title: str = "",
260        content: Optional[str] = None,
261        image: Optional[str] = None,
262    ) -> "MessageSegment":
263        return MessageSegment(
264            "share", {"url": url, "title": title,
265                      "content": content, "image": image}
266        )
267
268    @staticmethod
269    def text(text: str) -> "MessageSegment":
270        return MessageSegment("text", {"text": text})
271
272    @staticmethod
273    def video(
274        file: Union[str, bytes, BytesIO, Path],
275        cache: Optional[bool] = None,
276        proxy: Optional[bool] = None,
277        timeout: Optional[int] = None,
278    ) -> "MessageSegment":
279        if isinstance(file, BytesIO):
280            file = file.getvalue()
281        if isinstance(file, bytes):
282            file = f"base64://{b64encode(file).decode()}"
283        elif isinstance(file, Path):
284            file = f"file:///{file.resolve()}"
285        return MessageSegment(
286            "video",
287            {
288                "file": file,
289                "cache": bool_to_str(cache),
290                "proxy": bool_to_str(proxy),
291                "timeout": timeout,
292            },
293        )
294
295    @staticmethod
296    def xml(data: str) -> "MessageSegment":
297        return MessageSegment("xml", {"data": data})
298
299
class Message(List[MessageSegment]):
    """OneBot v11 message: a list of :class:`MessageSegment` objects."""

    def __init__(
        self,
        message: Union[str, None, Iterable[MessageSegment],
                       MessageSegment] = None,
    ):
        """Build a message.

        Args:
            message: ``None`` for an empty message, a ``str`` to be parsed
                for CQ codes, a single segment, or an iterable of segments.
        """
        super().__init__()
        if message is None:
            return
        elif isinstance(message, str):
            # ``str`` is itself iterable, so it must be handled first.
            self.extend(self._construct(message))
        elif isinstance(message, MessageSegment):
            self.append(message)
        elif isinstance(message, Iterable):
            self.extend(message)
        else:
            self.extend(self._construct(message))  # pragma: no cover

    @classmethod
    def template(cls: Type["Message"], format_string: Union[str, "Message"]) -> MessageTemplate["Message"]:
        """Create a message template.

        Usage is roughly the same as `str.format`, but the result is a message
        object, and a `Message` object may itself be used as the template.

        Extended format specifiers are also provided, so segments can be
        created through the `MessageSegment` factory methods suited to this
        message type.

        Args:
            format_string: the format template

        Returns:
            the message formatter
        """
        return MessageTemplate(format_string, cls)

    def __str__(self) -> str:
        """Concatenate the string renderings of all segments."""
        return "".join(str(seg) for seg in self)

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables for this type (pydantic hook)."""
        yield cls._validate

    @classmethod
    def _validate(cls, value):
        """Return ``value`` as an instance of ``cls``.

        Instances of ``cls`` are returned unchanged.  A ``str`` is passed to
        the constructor as-is, a ``dict`` is parsed into one segment, and any
        other iterable is parsed element by element into segments.

        Raises:
            ValueError: if ``value`` is a ``Message`` that is not an instance
                of ``cls``, or is not a str, dict or iterable.
        """
        if isinstance(value, cls):
            return value
        elif isinstance(value, Message):
            raise ValueError(
                f"Type {type(value)} can not be converted to {cls}")
        elif isinstance(value, str):
            pass
        elif isinstance(value, dict):
            value = parse_obj_as(cls.get_segment_class(), value)
        elif isinstance(value, Iterable):
            value = [parse_obj_as(cls.get_segment_class(), v) for v in value]
        else:
            raise ValueError(
                f"Expected str, dict or iterable for Message, got {type(value)}"
            )
        return cls(value)

    def __add__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message: a copy of this one followed by ``other``.

        A ``str`` operand is added verbatim as one text segment; it is not
        parsed for CQ codes (unlike ``+=`` with a ``str``).
        """
        # Must not delegate to ``list.__add__``: it rejects a single segment
        # and returns a plain ``list`` for a message operand.
        result = self.copy()
        result += MessageSegment.text(other) if isinstance(other, str) else other
        return result

    def __radd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message: ``other`` followed by this message.

        A ``str`` operand is added verbatim as one text segment; it is not
        parsed for CQ codes.
        """
        # ``list`` defines no ``__radd__``, so the prefix is built explicitly.
        prefix = self.__class__(
            MessageSegment.text(other) if isinstance(other, str) else other
        )
        return prefix + self

    def __iadd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Append ``other`` in place and return this message.

        A ``str`` is parsed for CQ codes, a segment is appended, and any other
        iterable is extended from.

        Raises:
            ValueError: if ``other`` is none of the supported types.
        """
        if isinstance(other, str):
            self.extend(self._construct(other))
        elif isinstance(other, MessageSegment):
            self.append(other)
        elif isinstance(other, Iterable):
            self.extend(other)
        else:
            raise ValueError(
                f"Unsupported type: {type(other)}")  # pragma: no cover
        return self

    @overload
    def __getitem__(self, __args: str) -> "Message":
        """
        Args:
            __args: segment type

        Returns:
            all segments whose type is `__args`
        """

    @overload
    def __getitem__(self, __args: Tuple[str, int]) -> MessageSegment:
        """
        Args:
            __args: segment type and index

        Returns:
            the `__args[1]`-th segment of type `__args[0]`
        """

    @overload
    def __getitem__(self, __args: Tuple[str, slice]) -> "Message":
        """
        Args:
            __args: segment type and slice

        Returns:
            the slice `__args[1]` of the segments of type `__args[0]`
        """

    @overload
    def __getitem__(self, __args: int) -> MessageSegment:
        """
        Args:
            __args: index

        Returns:
            the `__args`-th segment
        """

    @overload
    def __getitem__(self, __args: slice) -> "Message":
        """
        Args:
            __args: slice

        Returns:
            the message slice `__args`
        """

    def __getitem__(
        self,
        args: Union[
            str,
            Tuple[str, int],
            Tuple[str, slice],
            int,
            slice,
        ],
    ) -> Union[MessageSegment, "Message"]:
        """Index by position, by segment type, or by ``(type, position)``.

        Raises:
            ValueError: if ``args`` matches none of the supported forms.
        """
        arg1, arg2 = args if isinstance(args, tuple) else (args, None)
        if isinstance(arg1, int) and arg2 is None:
            return super().__getitem__(arg1)
        elif isinstance(arg1, slice) and arg2 is None:
            return self.__class__(super().__getitem__(arg1))
        elif isinstance(arg1, str) and arg2 is None:
            return self.__class__(seg for seg in self if seg.type == arg1)
        elif isinstance(arg1, str) and isinstance(arg2, int):
            return [seg for seg in self if seg.type == arg1][arg2]
        elif isinstance(arg1, str) and isinstance(arg2, slice):
            return self.__class__([seg for seg in self if seg.type == arg1][arg2])
        else:
            raise ValueError(
                "Incorrect arguments to slice")  # pragma: no cover

    def index(self, value: Union[MessageSegment, str], *args) -> int:
        """Return the index of a segment, or of the first segment of a type.

        Args:
            value: a segment, or a segment type given as ``str``.
            *args: forwarded to ``list.index``.

        Raises:
            ValueError: if no segment of the requested type is present (and
                whenever ``list.index`` raises it).
        """
        if isinstance(value, str):
            first_segment = next(
                (seg for seg in self if seg.type == value), None)
            if first_segment is None:
                raise ValueError(
                    f"Segment with type {value} is not in message")
            return super().index(first_segment, *args)
        return super().index(value, *args)

    def get(self, type_: str, count: Optional[int] = None) -> "Message":
        """Return the segments of type ``type_``, at most ``count`` of them.

        With ``count`` left as ``None`` every matching segment is returned.
        """
        if count is None:
            return self[type_]

        matching = (seg for seg in self if seg.type == type_)
        filtered = self.__class__()
        for _ in range(count):
            seg = next(matching, None)
            if seg is None:
                break
            filtered.append(seg)
        return filtered

    def count(self, value: Union[MessageSegment, str]) -> int:
        """Count the segments equal to ``value``, or of type ``value`` if it is a ``str``."""
        return len(self[value]) if isinstance(value, str) else super().count(value)

    def append(self, obj: Union[str, MessageSegment]) -> "Message":
        """Append a segment to the end of the message.

        A ``str`` is parsed for CQ codes and the resulting segments are added.

        Args:
            obj: the segment to add

        Raises:
            ValueError: if ``obj`` is neither a segment nor a ``str``.
        """
        if isinstance(obj, MessageSegment):
            super().append(obj)
        elif isinstance(obj, str):
            self.extend(self._construct(obj))
        else:
            raise ValueError(
                f"Unexpected type: {type(obj)} {obj}")  # pragma: no cover
        return self

    def extend(self, obj: Union["Message", Iterable[MessageSegment]]) -> "Message":
        """Append a message or several segments to the end of the message.

        Args:
            obj: the message (or iterable of segments) to add
        """
        for segment in obj:
            self.append(segment)
        return self

    def copy(self) -> "Message":
        """Return a deep copy of this message."""
        return deepcopy(self)

    @classmethod
    def get_segment_class(cls) -> Type[MessageSegment]:
        """Return the segment class used by this message type."""
        return MessageSegment

    @staticmethod
    def _construct(
        msg: Union[str, Mapping, Iterable[Mapping]]
    ) -> Iterable[MessageSegment]:
        """Yield the segments described by ``msg``.

        A mapping yields one segment built from its ``type`` and ``data``
        entries, a non-str iterable yields one such segment per element, and
        a ``str`` is split into text runs and CQ codes.
        """
        if isinstance(msg, Mapping):
            msg = cast(Mapping[str, Any], msg)
            yield MessageSegment(msg["type"], msg.get("data") or {})
            return
        elif isinstance(msg, Iterable) and not isinstance(msg, str):
            for seg in msg:
                yield MessageSegment(seg["type"], seg.get("data") or {})
            return
        elif isinstance(msg, str):

            def _iter_message(msg: str) -> Iterable[Tuple[str, str]]:
                # Yields ("text", run) for the text between CQ codes and
                # (type, "k=v,k=v") for each CQ code, in order of appearance.
                text_begin = 0
                for cqcode in re.finditer(
                    r"\[CQ:(?P<type>[a-zA-Z0-9-_.]+)"
                    r"(?P<params>"
                    r"(?:,[a-zA-Z0-9-_.]+=[^,\]]+)*"
                    r"),?\]",
                    msg,
                ):
                    yield "text", msg[text_begin: cqcode.pos + cqcode.start()]
                    text_begin = cqcode.pos + cqcode.end()
                    yield cqcode.group("type"), cqcode.group("params").lstrip(",")
                yield "text", msg[text_begin:]

            for type_, data in _iter_message(msg):
                if type_ == "text":
                    if data:
                        # only yield non-empty text segment
                        yield MessageSegment(type_, {"text": unescape(data)})
                else:
                    pairs = (
                        param.split("=", maxsplit=1)
                        for param in (raw.lstrip() for raw in data.split(","))
                        if param
                    )
                    yield MessageSegment(
                        type_, {key: unescape(value) for key, value in pairs}
                    )

    def extract_plain_text(self) -> str:
        """Return the concatenated ``text`` payloads of all text segments."""
        return "".join(seg.data["text"] for seg in self if seg.is_text())
class MessageSegment:
 18class MessageSegment:
 19    """
 20    OneBot v11 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
 21    """
 22
 23    type: str
 24    """消息段类型"""
 25    data: Dict[str, Any] = field(default_factory=dict)
 26    """消息段数据"""
 27
 28    def __len__(self) -> int:
 29        return len(str(self))
 30
 31    def __ne__(self, other: "MessageSegment") -> bool:
 32        return not self == other
 33
 34    def __add__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
 35        return self.get_message_class()(self) + other
 36
 37    def __radd__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
 38        return self.get_message_class()(other) + self
 39
 40    @classmethod
 41    def __get_validators__(cls):
 42        yield cls._validate
 43
 44    @classmethod
 45    def _validate(cls, value):
 46        if isinstance(value, cls):
 47            return value
 48        if not isinstance(value, dict):
 49            raise ValueError(
 50                f"Expected dict for MessageSegment, got {type(value)}")
 51        return cls(**value)
 52
 53    def get(self, key: str, default: Any = None):
 54        return asdict(self).get(key, default)
 55
 56    def keys(self):
 57        return asdict(self).keys()
 58
 59    def values(self):
 60        return asdict(self).values()
 61
 62    def items(self):
 63        return asdict(self).items()
 64
 65    def copy(self) -> "MessageSegment":
 66        return deepcopy(self)
 67
 68    @classmethod
 69    def get_message_class(cls) -> Type["Message"]:
 70        return Message
 71
 72    def __str__(self) -> str:
 73        type_ = self.type
 74        data = self.data.copy()
 75
 76        # process special types
 77        if type_ == "text":
 78            # type: ignore
 79            return escape(data.get("text", ""), escape_comma=False)
 80
 81        params = ",".join(
 82            [f"{k}={escape(str(v))}" for k, v in data.items() if v is not None]
 83        )
 84        return f"[CQ:{type_}{',' if params else ''}{params}]"
 85
 86    def __add__(self, other) -> "Message":
 87        return Message(self) + (
 88            MessageSegment.text(other) if isinstance(other, str) else other
 89        )
 90
 91    def __radd__(self, other) -> "Message":
 92        return (
 93            MessageSegment.text(other) if isinstance(
 94                other, str) else Message(other)
 95        ) + self
 96
 97    def is_text(self) -> bool:
 98        return self.type == "text"
 99
100    @staticmethod
101    def anonymous(ignore_failure: Optional[bool] = None) -> "MessageSegment":
102        return MessageSegment("anonymous", {"ignore": bool_to_str(ignore_failure)})
103
104    @staticmethod
105    def at(user_id: Union[int, str]) -> "MessageSegment":
106        return MessageSegment("at", {"qq": str(user_id)})
107
108    @staticmethod
109    def contact(type_: str, id: int) -> "MessageSegment":
110        return MessageSegment("contact", {"type": type_, "id": str(id)})
111
112    @staticmethod
113    def contact_group(group_id: int) -> "MessageSegment":
114        return MessageSegment("contact", {"type": "group", "id": str(group_id)})
115
116    @staticmethod
117    def contact_user(user_id: int) -> "MessageSegment":
118        return MessageSegment("contact", {"type": "qq", "id": str(user_id)})
119
120    @staticmethod
121    def dice() -> "MessageSegment":
122        return MessageSegment("dice", {})
123
124    @staticmethod
125    def face(id_: int) -> "MessageSegment":
126        return MessageSegment("face", {"id": str(id_)})
127
128    @staticmethod
129    def forward(id_: str) -> "MessageSegment":
130        logger.warning("Forward Message only can be received!")
131        return MessageSegment("forward", {"id": id_})
132
133    @staticmethod
134    def image(
135        file: Union[str, bytes, BytesIO, Path],
136        type_: Optional[str] = None,
137        cache: bool = True,
138        proxy: bool = True,
139        timeout: Optional[int] = None,
140    ) -> "MessageSegment":
141        if isinstance(file, BytesIO):
142            file = file.getvalue()
143        if isinstance(file, bytes):
144            file = f"base64://{b64encode(file).decode()}"
145        elif isinstance(file, Path):
146            file = f"file:///{file.resolve()}"
147        return MessageSegment(
148            "image",
149            {
150                "file": file,
151                "type": type_,
152                "cache": bool_to_str(cache),
153                "proxy": bool_to_str(proxy),
154                "timeout": timeout,
155            },
156        )
157
158    @staticmethod
159    def json(data: str) -> "MessageSegment":
160        return MessageSegment("json", {"data": data})
161
162    @staticmethod
163    def location(
164        latitude: float,
165        longitude: float,
166        title: Optional[str] = None,
167        content: Optional[str] = None,
168    ) -> "MessageSegment":
169        return MessageSegment(
170            "location",
171            {
172                "lat": str(latitude),
173                "lon": str(longitude),
174                "title": title,
175                "content": content,
176            },
177        )
178
179    @staticmethod
180    def music(type_: str, id_: int) -> "MessageSegment":
181        return MessageSegment("music", {"type": type_, "id": id_})
182
183    @staticmethod
184    def music_custom(
185        url: str,
186        audio: str,
187        title: str,
188        content: Optional[str] = None,
189        img_url: Optional[str] = None,
190    ) -> "MessageSegment":
191        return MessageSegment(
192            "music",
193            {
194                "type": "custom",
195                "url": url,
196                "audio": audio,
197                "title": title,
198                "content": content,
199                "image": img_url,
200            },
201        )
202
203    @staticmethod
204    def node(id_: int) -> "MessageSegment":
205        return MessageSegment("node", {"id": str(id_)})
206
207    @staticmethod
208    def node_custom(
209        user_id: int, nickname: str, content: Union[str, "Message"]
210    ) -> "MessageSegment":
211        return MessageSegment(
212            "node", {"user_id": str(
213                user_id), "nickname": nickname, "content": content}
214        )
215
216    @staticmethod
217    def poke(type_: str, id_: str) -> "MessageSegment":
218        return MessageSegment("poke", {"type": type_, "id": id_})
219
220    @staticmethod
221    def record(
222        file: Union[str, bytes, BytesIO, Path],
223        magic: Optional[bool] = None,
224        cache: Optional[bool] = None,
225        proxy: Optional[bool] = None,
226        timeout: Optional[int] = None,
227    ) -> "MessageSegment":
228        if isinstance(file, BytesIO):
229            file = file.getvalue()
230        if isinstance(file, bytes):
231            file = f"base64://{b64encode(file).decode()}"
232        elif isinstance(file, Path):
233            file = f"file:///{file.resolve()}"
234        return MessageSegment(
235            "record",
236            {
237                "file": file,
238                "magic": bool_to_str(magic),
239                "cache": bool_to_str(cache),
240                "proxy": bool_to_str(proxy),
241                "timeout": timeout,
242            },
243        )
244
245    @staticmethod
246    def reply(id_: int) -> "MessageSegment":
247        return MessageSegment("reply", {"id": str(id_)})
248
249    @staticmethod
250    def rps() -> "MessageSegment":
251        return MessageSegment("rps", {})
252
253    @staticmethod
254    def shake() -> "MessageSegment":
255        return MessageSegment("shake", {})
256
257    @staticmethod
258    def share(
259        url: str = "",
260        title: str = "",
261        content: Optional[str] = None,
262        image: Optional[str] = None,
263    ) -> "MessageSegment":
264        return MessageSegment(
265            "share", {"url": url, "title": title,
266                      "content": content, "image": image}
267        )
268
269    @staticmethod
270    def text(text: str) -> "MessageSegment":
271        return MessageSegment("text", {"text": text})
272
273    @staticmethod
274    def video(
275        file: Union[str, bytes, BytesIO, Path],
276        cache: Optional[bool] = None,
277        proxy: Optional[bool] = None,
278        timeout: Optional[int] = None,
279    ) -> "MessageSegment":
280        if isinstance(file, BytesIO):
281            file = file.getvalue()
282        if isinstance(file, bytes):
283            file = f"base64://{b64encode(file).decode()}"
284        elif isinstance(file, Path):
285            file = f"file:///{file.resolve()}"
286        return MessageSegment(
287            "video",
288            {
289                "file": file,
290                "cache": bool_to_str(cache),
291                "proxy": bool_to_str(proxy),
292                "timeout": timeout,
293            },
294        )
295
296    @staticmethod
297    def xml(data: str) -> "MessageSegment":
298        return MessageSegment("xml", {"data": data})

OneBot v11 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。

MessageSegment(type: str, data: Dict[str, Any] = <factory>)
type: str

消息段类型

data: Dict[str, Any]

消息段数据

def get(self, key: str, default: Any = None):
53    def get(self, key: str, default: Any = None):
54        return asdict(self).get(key, default)
def keys(self):
56    def keys(self):
57        return asdict(self).keys()
def values(self):
59    def values(self):
60        return asdict(self).values()
def items(self):
62    def items(self):
63        return asdict(self).items()
def copy(self) -> ayaka.driver.ayakabot.message.MessageSegment:
65    def copy(self) -> "MessageSegment":
66        return deepcopy(self)
@classmethod
def get_message_class(cls) -> type[ayaka.driver.ayakabot.message.Message]:
68    @classmethod
69    def get_message_class(cls) -> Type["Message"]:
70        return Message
def is_text(self) -> bool:
97    def is_text(self) -> bool:
98        return self.type == "text"
@staticmethod
def anonymous( ignore_failure: Union[bool, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
100    @staticmethod
101    def anonymous(ignore_failure: Optional[bool] = None) -> "MessageSegment":
102        return MessageSegment("anonymous", {"ignore": bool_to_str(ignore_failure)})
@staticmethod
def at(user_id: Union[int, str]) -> ayaka.driver.ayakabot.message.MessageSegment:
104    @staticmethod
105    def at(user_id: Union[int, str]) -> "MessageSegment":
106        return MessageSegment("at", {"qq": str(user_id)})
@staticmethod
def contact(type_: str, id: int) -> ayaka.driver.ayakabot.message.MessageSegment:
108    @staticmethod
109    def contact(type_: str, id: int) -> "MessageSegment":
110        return MessageSegment("contact", {"type": type_, "id": str(id)})
@staticmethod
def contact_group(group_id: int) -> ayaka.driver.ayakabot.message.MessageSegment:
112    @staticmethod
113    def contact_group(group_id: int) -> "MessageSegment":
114        return MessageSegment("contact", {"type": "group", "id": str(group_id)})
@staticmethod
def contact_user(user_id: int) -> ayaka.driver.ayakabot.message.MessageSegment:
116    @staticmethod
117    def contact_user(user_id: int) -> "MessageSegment":
118        return MessageSegment("contact", {"type": "qq", "id": str(user_id)})
@staticmethod
def dice() -> ayaka.driver.ayakabot.message.MessageSegment:
120    @staticmethod
121    def dice() -> "MessageSegment":
122        return MessageSegment("dice", {})
@staticmethod
def face(id_: int) -> ayaka.driver.ayakabot.message.MessageSegment:
124    @staticmethod
125    def face(id_: int) -> "MessageSegment":
126        return MessageSegment("face", {"id": str(id_)})
@staticmethod
def forward(id_: str) -> ayaka.driver.ayakabot.message.MessageSegment:
128    @staticmethod
129    def forward(id_: str) -> "MessageSegment":
130        logger.warning("Forward Message only can be received!")
131        return MessageSegment("forward", {"id": id_})
@staticmethod
def image( file: Union[str, bytes, _io.BytesIO, pathlib.Path], type_: Union[str, NoneType] = None, cache: bool = True, proxy: bool = True, timeout: Union[int, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
133    @staticmethod
134    def image(
135        file: Union[str, bytes, BytesIO, Path],
136        type_: Optional[str] = None,
137        cache: bool = True,
138        proxy: bool = True,
139        timeout: Optional[int] = None,
140    ) -> "MessageSegment":
141        if isinstance(file, BytesIO):
142            file = file.getvalue()
143        if isinstance(file, bytes):
144            file = f"base64://{b64encode(file).decode()}"
145        elif isinstance(file, Path):
146            file = f"file:///{file.resolve()}"
147        return MessageSegment(
148            "image",
149            {
150                "file": file,
151                "type": type_,
152                "cache": bool_to_str(cache),
153                "proxy": bool_to_str(proxy),
154                "timeout": timeout,
155            },
156        )
@staticmethod
def json(data: str) -> ayaka.driver.ayakabot.message.MessageSegment:
158    @staticmethod
159    def json(data: str) -> "MessageSegment":
160        return MessageSegment("json", {"data": data})
@staticmethod
def location( latitude: float, longitude: float, title: Union[str, NoneType] = None, content: Union[str, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
162    @staticmethod
163    def location(
164        latitude: float,
165        longitude: float,
166        title: Optional[str] = None,
167        content: Optional[str] = None,
168    ) -> "MessageSegment":
169        return MessageSegment(
170            "location",
171            {
172                "lat": str(latitude),
173                "lon": str(longitude),
174                "title": title,
175                "content": content,
176            },
177        )
@staticmethod
def music(type_: str, id_: int) -> ayaka.driver.ayakabot.message.MessageSegment:
179    @staticmethod
180    def music(type_: str, id_: int) -> "MessageSegment":
181        return MessageSegment("music", {"type": type_, "id": id_})
@staticmethod
def music_custom( url: str, audio: str, title: str, content: Union[str, NoneType] = None, img_url: Union[str, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
183    @staticmethod
184    def music_custom(
185        url: str,
186        audio: str,
187        title: str,
188        content: Optional[str] = None,
189        img_url: Optional[str] = None,
190    ) -> "MessageSegment":
191        return MessageSegment(
192            "music",
193            {
194                "type": "custom",
195                "url": url,
196                "audio": audio,
197                "title": title,
198                "content": content,
199                "image": img_url,
200            },
201        )
@staticmethod
def node(id_: int) -> ayaka.driver.ayakabot.message.MessageSegment:
203    @staticmethod
204    def node(id_: int) -> "MessageSegment":
205        return MessageSegment("node", {"id": str(id_)})
@staticmethod
def node_custom( user_id: int, nickname: str, content: Union[str, ayaka.driver.ayakabot.message.Message]) -> ayaka.driver.ayakabot.message.MessageSegment:
207    @staticmethod
208    def node_custom(
209        user_id: int, nickname: str, content: Union[str, "Message"]
210    ) -> "MessageSegment":
211        return MessageSegment(
212            "node", {"user_id": str(
213                user_id), "nickname": nickname, "content": content}
214        )
@staticmethod
def poke(type_: str, id_: str) -> ayaka.driver.ayakabot.message.MessageSegment:
216    @staticmethod
217    def poke(type_: str, id_: str) -> "MessageSegment":
218        return MessageSegment("poke", {"type": type_, "id": id_})
@staticmethod
def record( file: Union[str, bytes, _io.BytesIO, pathlib.Path], magic: Union[bool, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
220    @staticmethod
221    def record(
222        file: Union[str, bytes, BytesIO, Path],
223        magic: Optional[bool] = None,
224        cache: Optional[bool] = None,
225        proxy: Optional[bool] = None,
226        timeout: Optional[int] = None,
227    ) -> "MessageSegment":
228        if isinstance(file, BytesIO):
229            file = file.getvalue()
230        if isinstance(file, bytes):
231            file = f"base64://{b64encode(file).decode()}"
232        elif isinstance(file, Path):
233            file = f"file:///{file.resolve()}"
234        return MessageSegment(
235            "record",
236            {
237                "file": file,
238                "magic": bool_to_str(magic),
239                "cache": bool_to_str(cache),
240                "proxy": bool_to_str(proxy),
241                "timeout": timeout,
242            },
243        )
@staticmethod
def reply(id_: int) -> ayaka.driver.ayakabot.message.MessageSegment:
245    @staticmethod
246    def reply(id_: int) -> "MessageSegment":
247        return MessageSegment("reply", {"id": str(id_)})
@staticmethod
def rps() -> ayaka.driver.ayakabot.message.MessageSegment:
249    @staticmethod
250    def rps() -> "MessageSegment":
251        return MessageSegment("rps", {})
@staticmethod
def shake() -> ayaka.driver.ayakabot.message.MessageSegment:
253    @staticmethod
254    def shake() -> "MessageSegment":
255        return MessageSegment("shake", {})
@staticmethod
def share( url: str = '', title: str = '', content: Union[str, NoneType] = None, image: Union[str, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
257    @staticmethod
258    def share(
259        url: str = "",
260        title: str = "",
261        content: Optional[str] = None,
262        image: Optional[str] = None,
263    ) -> "MessageSegment":
264        return MessageSegment(
265            "share", {"url": url, "title": title,
266                      "content": content, "image": image}
267        )
@staticmethod
def text(text: str) -> ayaka.driver.ayakabot.message.MessageSegment:
269    @staticmethod
270    def text(text: str) -> "MessageSegment":
271        return MessageSegment("text", {"text": text})
@staticmethod
def video( file: Union[str, bytes, _io.BytesIO, pathlib.Path], cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) -> ayaka.driver.ayakabot.message.MessageSegment:
273    @staticmethod
274    def video(
275        file: Union[str, bytes, BytesIO, Path],
276        cache: Optional[bool] = None,
277        proxy: Optional[bool] = None,
278        timeout: Optional[int] = None,
279    ) -> "MessageSegment":
280        if isinstance(file, BytesIO):
281            file = file.getvalue()
282        if isinstance(file, bytes):
283            file = f"base64://{b64encode(file).decode()}"
284        elif isinstance(file, Path):
285            file = f"file:///{file.resolve()}"
286        return MessageSegment(
287            "video",
288            {
289                "file": file,
290                "cache": bool_to_str(cache),
291                "proxy": bool_to_str(proxy),
292                "timeout": timeout,
293            },
294        )
@staticmethod
def xml(data: str) -> ayaka.driver.ayakabot.message.MessageSegment:
296    @staticmethod
297    def xml(data: str) -> "MessageSegment":
298        return MessageSegment("xml", {"data": data})
class Message(typing.List[ayaka.driver.ayakabot.message.MessageSegment]):
class Message(List[MessageSegment]):
    """Message adapter for the OneBot v11 protocol.

    A message is a list of ``MessageSegment`` objects. It can be built from
    a CQ-code string, a single segment, or an iterable of segments.
    """

    def __init__(
        self,
        message: Union[str, None, Iterable[MessageSegment],
                       MessageSegment] = None,
    ):
        """Create a message.

        Args:
            message: ``None`` for an empty message, a string (parsed by
                ``_construct``), a single segment, or an iterable of
                segments. Any other object is handed to ``_construct``.
        """
        super().__init__()
        if message is None:
            return
        elif isinstance(message, str):
            self.extend(self._construct(message))
        elif isinstance(message, MessageSegment):
            self.append(message)
        elif isinstance(message, Iterable):
            self.extend(message)
        else:
            self.extend(self._construct(message))  # pragma: no cover

    @classmethod
    def template(cls: Type["Message"], format_string: Union[str, "Message"]) -> MessageTemplate["Message"]:
        """Create a message template.

        Usage is largely the same as ``str.format``, but the result is a
        message object, and a ``Message`` may itself be used as the template.

        Extended format specifiers are also provided, so that segments can be
        created through the ``MessageSegment`` factory methods that apply to
        this message type.

        Args:
            format_string: The format template.

        Returns:
            The message formatter.
        """
        return MessageTemplate(format_string, cls)

    def __str__(self) -> str:
        return "".join(str(seg) for seg in self)

    @classmethod
    def __get_validators__(cls):
        yield cls._validate

    @classmethod
    def _validate(cls, value):
        """Validate and convert ``value`` into an instance of ``cls``.

        Raises:
            ValueError: If ``value`` is a ``Message`` that is not an instance
                of ``cls``, or is not a str, dict or iterable.
        """
        if isinstance(value, cls):
            return value
        elif isinstance(value, Message):
            raise ValueError(
                f"Type {type(value)} can not be converted to {cls}")
        elif isinstance(value, str):
            pass
        elif isinstance(value, dict):
            value = parse_obj_as(cls.get_segment_class(), value)
        elif isinstance(value, Iterable):
            value = [parse_obj_as(cls.get_segment_class(), v) for v in value]
        else:
            raise ValueError(
                f"Expected str, dict or iterable for Message, got {type(value)}"
            )
        return cls(value)

    def __add__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message made of this message followed by ``other``.

        A string operand is appended as a single plain ``text`` segment; it
        is not parsed for CQ codes (unlike ``+=``, which goes through
        ``_construct``).
        """
        # This class used to define ``__add__`` twice; the later definition
        # won and delegated to ``list.__add__`` with a non-list operand,
        # which raises TypeError. Both intents are merged here.
        result = self.copy()
        result += MessageSegment.text(other) if isinstance(other, str) else other
        return result

    def __radd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message made of ``other`` followed by this message.

        A string operand becomes a single plain ``text`` segment; it is not
        parsed for CQ codes.
        """
        # ``list`` has no ``__radd__``, so delegating to the parent class
        # (as a shadowing duplicate definition once did) cannot work.
        result = self.__class__(
            MessageSegment.text(other) if isinstance(other, str) else other
        )
        return result + self

    def __iadd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Append ``other`` in place; strings are parsed by ``_construct``.

        Raises:
            ValueError: If ``other`` is not a str, segment or iterable.
        """
        if isinstance(other, str):
            self.extend(self._construct(other))
        elif isinstance(other, MessageSegment):
            self.append(other)
        elif isinstance(other, Iterable):
            self.extend(other)
        else:
            raise ValueError(
                f"Unsupported type: {type(other)}")  # pragma: no cover
        return self

    @overload
    def __getitem__(self, __args: str) -> "Message":
        """
        Args:
            __args: Segment type.

        Returns:
            All segments whose type is ``__args``.
        """

    @overload
    def __getitem__(self, __args: Tuple[str, int]) -> MessageSegment:
        """
        Args:
            __args: Segment type and index.

        Returns:
            The ``__args[1]``-th segment among those of type ``__args[0]``.
        """

    @overload
    def __getitem__(self, __args: Tuple[str, slice]) -> "Message":
        """
        Args:
            __args: Segment type and slice.

        Returns:
            The slice ``__args[1]`` of the segments of type ``__args[0]``.
        """

    @overload
    def __getitem__(self, __args: int) -> MessageSegment:
        """
        Args:
            __args: Index.

        Returns:
            The ``__args``-th segment.
        """

    @overload
    def __getitem__(self, __args: slice) -> "Message":
        """
        Args:
            __args: Slice.

        Returns:
            The message slice ``__args``.
        """

    def __getitem__(
        self,
        args: Union[
            str,
            Tuple[str, int],
            Tuple[str, slice],
            int,
            slice,
        ],
    ) -> Union[MessageSegment, "Message"]:
        """Index by position, slice, segment type, or ``(type, index/slice)``.

        Raises:
            ValueError: If the argument combination is not supported.
        """
        arg1, arg2 = args if isinstance(args, tuple) else (args, None)
        if isinstance(arg1, int) and arg2 is None:
            return super().__getitem__(arg1)
        elif isinstance(arg1, slice) and arg2 is None:
            return self.__class__(super().__getitem__(arg1))
        elif isinstance(arg1, str) and arg2 is None:
            return self.__class__(seg for seg in self if seg.type == arg1)
        elif isinstance(arg1, str) and isinstance(arg2, int):
            return [seg for seg in self if seg.type == arg1][arg2]
        elif isinstance(arg1, str) and isinstance(arg2, slice):
            return self.__class__([seg for seg in self if seg.type == arg1][arg2])
        else:
            raise ValueError(
                "Incorrect arguments to slice")  # pragma: no cover

    def index(self, value: Union[MessageSegment, str], *args) -> int:
        """Return the index of ``value``.

        A string ``value`` is treated as a segment type: the first segment
        of that type is located and its index is returned.

        Raises:
            ValueError: If no matching segment is present.
        """
        if isinstance(value, str):
            first_segment = next(
                (seg for seg in self if seg.type == value), None)
            if first_segment is None:
                raise ValueError(
                    f"Segment with type {value} is not in message")
            return super().index(first_segment, *args)
        return super().index(value, *args)

    def get(self, type_: str, count: Optional[int] = None) -> "Message":
        """Return the segments of type ``type_``.

        Args:
            type_: Segment type to select.
            count: Maximum number of segments to return; ``None`` returns
                all of them.
        """
        if count is None:
            return self[type_]

        iterator, filtered = (
            seg for seg in self if seg.type == type_
        ), self.__class__()
        for _ in range(count):
            seg = next(iterator, None)
            if seg is None:
                break
            filtered.append(seg)
        return filtered

    def count(self, value: Union[MessageSegment, str]) -> int:
        """Count occurrences of a segment, or of segments of a given type."""
        return len(self[value]) if isinstance(value, str) else super().count(value)

    def append(self, obj: Union[str, MessageSegment]) -> "Message":
        """Append a segment to the end of the message.

        Args:
            obj: The segment to add; a string is parsed by ``_construct``
                and every resulting segment is appended.

        Returns:
            This message.

        Raises:
            ValueError: If ``obj`` is neither a segment nor a string.
        """
        if isinstance(obj, MessageSegment):
            super().append(obj)
        elif isinstance(obj, str):
            self.extend(self._construct(obj))
        else:
            raise ValueError(
                f"Unexpected type: {type(obj)} {obj}")  # pragma: no cover
        return self

    def extend(self, obj: Union["Message", Iterable[MessageSegment]]) -> "Message":
        """Append a message or several segments to the end of the message.

        Args:
            obj: The message (or iterable of segments) to add.

        Returns:
            This message.
        """
        for segment in obj:
            self.append(segment)
        return self

    def copy(self) -> "Message":
        """Return a deep copy of this message."""
        return deepcopy(self)

    @classmethod
    def get_segment_class(cls) -> Type[MessageSegment]:
        """Return the segment class used by this message type."""
        return MessageSegment

    @staticmethod
    def _construct(
        msg: Union[str, Mapping, Iterable[Mapping]]
    ) -> Iterable[MessageSegment]:
        """Yield segments built from a mapping, mappings, or a CQ-code string.

        Args:
            msg: A mapping with ``type`` and optional ``data`` keys, an
                iterable of such mappings, or a string that may contain
                ``[CQ:type,key=value,...]`` codes.
        """
        if isinstance(msg, Mapping):
            msg = cast(Mapping[str, Any], msg)
            yield MessageSegment(msg["type"], msg.get("data") or {})
            return
        elif isinstance(msg, Iterable) and not isinstance(msg, str):
            for seg in msg:
                yield MessageSegment(seg["type"], seg.get("data") or {})
            return
        elif isinstance(msg, str):

            def _iter_message(msg: str) -> Iterable[Tuple[str, str]]:
                # Alternate between the plain text preceding each CQ code
                # and the CQ code itself, ending with the trailing text.
                text_begin = 0
                for cqcode in re.finditer(
                    r"\[CQ:(?P<type>[a-zA-Z0-9-_.]+)"
                    r"(?P<params>"
                    r"(?:,[a-zA-Z0-9-_.]+=[^,\]]+)*"
                    r"),?\]",
                    msg,
                ):
                    yield "text", msg[text_begin: cqcode.pos + cqcode.start()]
                    text_begin = cqcode.pos + cqcode.end()
                    yield cqcode.group("type"), cqcode.group("params").lstrip(",")
                yield "text", msg[text_begin:]

            for type_, data in _iter_message(msg):
                if type_ == "text":
                    if data:
                        # only yield non-empty text segment
                        yield MessageSegment(type_, {"text": unescape(data)})
                else:
                    data = {
                        k: unescape(v)
                        for k, v in map(
                            lambda x: x.split("=", maxsplit=1),
                            filter(lambda x: x, (x.lstrip()
                                   for x in data.split(","))),
                        )
                    }
                    yield MessageSegment(type_, data)

    def extract_plain_text(self) -> str:
        """Return the concatenated ``text`` data of all text segments."""
        return "".join(seg.data["text"] for seg in self if seg.is_text())

OneBot v11 协议 Message 适配。

Message( message: Union[str, NoneType, Iterable[ayaka.driver.ayakabot.message.MessageSegment], ayaka.driver.ayakabot.message.MessageSegment] = None)
306    def __init__(
307        self,
308        message: Union[str, None, Iterable[MessageSegment],
309                       MessageSegment] = None,
310    ):
311        super().__init__()
312        if message is None:
313            return
314        elif isinstance(message, str):
315            self.extend(self._construct(message))
316        elif isinstance(message, MessageSegment):
317            self.append(message)
318        elif isinstance(message, Iterable):
319            self.extend(message)
320        else:
321            self.extend(self._construct(message))  # pragma: no cover
323    @classmethod
324    def template(cls: Type["Message"], format_string: Union[str, "Message"]) -> MessageTemplate["Message"]:
325        """创建消息模板。
326
327        用法和 `str.format` 大致相同, 但是可以输出消息对象, 并且支持以 `Message` 对象作为消息模板
328
329        并且提供了拓展的格式化控制符, 可以用适用于该消息类型的 `MessageSegment` 的工厂方法创建消息
330
331        参数:
332            format_string: 格式化模板
333
334        返回:
335            消息格式化器
336        """
337        return MessageTemplate(format_string, cls)

创建消息模板。

用法和 str.format 大致相同, 但是可以输出消息对象, 并且支持以 Message 对象作为消息模板

并且提供了拓展的格式化控制符, 可以用适用于该消息类型的 MessageSegment 的工厂方法创建消息

参数: format_string: 格式化模板

返回: 消息格式化器

def index( self, value: Union[ayaka.driver.ayakabot.message.MessageSegment, str], *args) -> int:
461    def index(self, value: Union[MessageSegment, str], *args) -> int:
462        if isinstance(value, str):
463            first_segment = next(
464                (seg for seg in self if seg.type == value), None)
465            if first_segment is None:
466                raise ValueError(
467                    f"Segment with type {value} is not in message")
468            return super().index(first_segment, *args)
469        return super().index(value, *args)

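Return the index of the first occurrence of value. A string value is treated as a segment type and matches the first segment of that type.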

Raises ValueError if the value is not present.

def get( self, type_: str, count: Union[int, NoneType] = None) -> ayaka.driver.ayakabot.message.Message:
471    def get(self, type_: str, count: Optional[int] = None) -> "Message":
472        if count is None:
473            return self[type_]
474
475        iterator, filtered = (
476            seg for seg in self if seg.type == type_
477        ), self.__class__()
478        for _ in range(count):
479            seg = next(iterator, None)
480            if seg is None:
481                break
482            filtered.append(seg)
483        return filtered
def count( self, value: Union[ayaka.driver.ayakabot.message.MessageSegment, str]) -> int:
485    def count(self, value: Union[MessageSegment, str]) -> int:
486        return len(self[value]) if isinstance(value, str) else super().count(value)

Return the number of occurrences of value. A string value is treated as a segment type and counts the segments of that type.

def append( self, obj: Union[str, ayaka.driver.ayakabot.message.MessageSegment]) -> ayaka.driver.ayakabot.message.Message:
488    def append(self, obj: Union[str, MessageSegment]) -> "Message":
489        """添加一个消息段到消息数组末尾。
490
491        参数:
492            obj: 要添加的消息段
493        """
494        if isinstance(obj, MessageSegment):
495            super().append(obj)
496        elif isinstance(obj, str):
497            self.extend(self._construct(obj))
498        else:
499            raise ValueError(
500                f"Unexpected type: {type(obj)} {obj}")  # pragma: no cover
501        return self

添加一个消息段到消息数组末尾。

参数: obj: 要添加的消息段

503    def extend(self, obj: Union["Message", Iterable[MessageSegment]]) -> "Message":
504        """拼接一个消息数组或多个消息段到消息数组末尾。
505
506        参数:
507            obj: 要添加的消息数组
508        """
509        for segment in obj:
510            self.append(segment)
511        return self

拼接一个消息数组或多个消息段到消息数组末尾。

参数: obj: 要添加的消息数组

def copy(self) -> ayaka.driver.ayakabot.message.Message:
513    def copy(self) -> "Message":
514        return deepcopy(self)

Return a deep copy of the message (the override uses `deepcopy`, so the segments are copied too, unlike the shallow `list.copy`).

def extract_plain_text(self) -> str:
579    def extract_plain_text(self) -> str:
580        return "".join(seg.data["text"] for seg in self if seg.is_text())

提取消息内纯文本消息

@classmethod
def get_segment_class(cls) -> Type[ayaka.driver.ayakabot.message.MessageSegment]:
521    @classmethod
522    def get_segment_class(cls) -> Type[MessageSegment]:
523        return MessageSegment
Inherited Members
builtins.list
clear
insert
pop
remove
reverse
sort