ayaka.driver.ayakabot_driver.message
import re
from io import BytesIO
from pathlib import Path
from base64 import b64encode
from typing import Any, Dict, List, Type, Tuple, Union, Mapping, Iterable, Optional, cast, overload
from copy import deepcopy
from dataclasses import field, asdict, dataclass

from pydantic import parse_obj_as
from loguru import logger

from .utils import bool_to_str, escape, unescape
from .template import MessageTemplate


@dataclass
class MessageSegment:
    """OneBot v11 message segment: a ``type`` tag plus a ``data`` payload.

    ``str(segment)`` renders the CQ-code form, and the static factory
    methods build the individual segment types.
    """

    # Segment type, e.g. "text", "at" or "image".
    type: str
    # Segment payload; entries whose value is None are skipped by __str__.
    data: Dict[str, Any] = field(default_factory=dict)

    def __len__(self) -> int:
        """Return the length of the rendered string form."""
        return len(str(self))

    def __ne__(self, other: "MessageSegment") -> bool:
        """Return the negation of the dataclass-generated equality."""
        return not self == other

    def __add__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
        """Return a new message made of this segment followed by ``other``.

        A ``str`` operand is appended as one plain-text segment (it is not
        parsed for CQ codes); an iterable contributes each of its segments.
        """
        # Copy so that mutating the result never touches this segment.
        result = self.get_message_class()(self.copy())
        if isinstance(other, str):
            result.append(MessageSegment.text(other))
        elif isinstance(other, MessageSegment):
            result.append(other)
        else:
            result.extend(other)
        return result

    def __radd__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
        """Return a new message made of ``other`` followed by this segment.

        A ``str`` operand becomes one plain-text segment.
        """
        message_cls = self.get_message_class()
        if isinstance(other, str):
            result = message_cls(MessageSegment.text(other))
        else:
            # Deep-copy so the left operand's segments are not shared.
            result = message_cls(other).copy()
        result.append(self)
        return result

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables (pydantic custom-type hook)."""
        yield cls._validate

    @classmethod
    def _validate(cls, value):
        """Return ``value`` as a segment.

        Instances of ``cls`` pass through unchanged and dicts are expanded
        into the constructor.

        Raises:
            ValueError: if ``value`` is neither an instance nor a dict.
        """
        if isinstance(value, cls):
            return value
        if not isinstance(value, dict):
            raise ValueError(
                f"Expected dict for MessageSegment, got {type(value)}")
        return cls(**value)

    def get(self, key: str, default: Any = None):
        """Look up ``key`` in the ``asdict`` view (fields ``type``/``data``)."""
        return asdict(self).get(key, default)

    def keys(self):
        """Return the keys of the ``asdict`` view."""
        return asdict(self).keys()

    def values(self):
        """Return the values of the ``asdict`` view."""
        return asdict(self).values()

    def items(self):
        """Return the items of the ``asdict`` view."""
        return asdict(self).items()

    def copy(self) -> "MessageSegment":
        """Return a deep copy of this segment."""
        return deepcopy(self)

    @classmethod
    def get_message_class(cls) -> Type["Message"]:
        """Return the message class that holds segments of this type."""
        return Message

    def __str__(self) -> str:
        """Render the segment: escaped text, or ``[CQ:type,k=v,...]``."""
        if self.type == "text":
            return escape(self.data.get("text", ""), escape_comma=False)

        params = ",".join(
            f"{key}={escape(str(value))}"
            for key, value in self.data.items()
            if value is not None
        )
        return f"[CQ:{self.type}{',' if params else ''}{params}]"

    def is_text(self) -> bool:
        """Return True when this is a plain-text segment."""
        return self.type == "text"

    @staticmethod
    def _file_to_uri(file: Union[str, bytes, BytesIO, Path]) -> str:
        """Normalise a file argument to the string stored in ``data["file"]``.

        Bytes and ``BytesIO`` content become a ``base64://`` string, a
        ``Path`` becomes a ``file://`` URI, and a ``str`` is returned as is.
        """
        if isinstance(file, BytesIO):
            file = file.getvalue()
        if isinstance(file, bytes):
            return f"base64://{b64encode(file).decode()}"
        if isinstance(file, Path):
            # as_uri() avoids the doubled slash that "file:///" + an absolute
            # POSIX path produced, and percent-encodes special characters.
            return file.resolve().as_uri()
        return file

    @staticmethod
    def anonymous(ignore_failure: Optional[bool] = None) -> "MessageSegment":
        """Build an ``anonymous`` segment with ``ignore`` from ``bool_to_str``."""
        return MessageSegment("anonymous", {"ignore": bool_to_str(ignore_failure)})

    @staticmethod
    def at(user_id: Union[int, str]) -> "MessageSegment":
        """Build an ``at`` segment whose ``qq`` is the stringified user id."""
        return MessageSegment("at", {"qq": str(user_id)})

    @staticmethod
    def contact(type_: str, id: int) -> "MessageSegment":
        """Build a ``contact`` segment of the given type and id."""
        return MessageSegment("contact", {"type": type_, "id": str(id)})

    @staticmethod
    def contact_group(group_id: int) -> "MessageSegment":
        """Build a ``contact`` segment of type ``group``."""
        return MessageSegment("contact", {"type": "group", "id": str(group_id)})

    @staticmethod
    def contact_user(user_id: int) -> "MessageSegment":
        """Build a ``contact`` segment of type ``qq``."""
        return MessageSegment("contact", {"type": "qq", "id": str(user_id)})

    @staticmethod
    def dice() -> "MessageSegment":
        """Build a ``dice`` segment (no data)."""
        return MessageSegment("dice", {})

    @staticmethod
    def face(id_: int) -> "MessageSegment":
        """Build a ``face`` segment with the stringified id."""
        return MessageSegment("face", {"id": str(id_)})

    @staticmethod
    def forward(id_: str) -> "MessageSegment":
        """Build a ``forward`` segment; logs a warning on every call."""
        logger.warning("Forward Message only can be received!")
        return MessageSegment("forward", {"id": id_})

    @staticmethod
    def image(
        file: Union[str, bytes, BytesIO, Path],
        type_: Optional[str] = None,
        cache: bool = True,
        proxy: bool = True,
        timeout: Optional[int] = None,
    ) -> "MessageSegment":
        """Build an ``image`` segment; ``file`` is normalised to a string."""
        return MessageSegment(
            "image",
            {
                "file": MessageSegment._file_to_uri(file),
                "type": type_,
                "cache": bool_to_str(cache),
                "proxy": bool_to_str(proxy),
                "timeout": timeout,
            },
        )

    @staticmethod
    def json(data: str) -> "MessageSegment":
        """Build a ``json`` segment carrying ``data`` unchanged."""
        return MessageSegment("json", {"data": data})

    @staticmethod
    def location(
        latitude: float,
        longitude: float,
        title: Optional[str] = None,
        content: Optional[str] = None,
    ) -> "MessageSegment":
        """Build a ``location`` segment with stringified coordinates."""
        return MessageSegment(
            "location",
            {
                "lat": str(latitude),
                "lon": str(longitude),
                "title": title,
                "content": content,
            },
        )

    @staticmethod
    def music(type_: str, id_: int) -> "MessageSegment":
        """Build a ``music`` segment for the given type and id."""
        return MessageSegment("music", {"type": type_, "id": id_})

    @staticmethod
    def music_custom(
        url: str,
        audio: str,
        title: str,
        content: Optional[str] = None,
        img_url: Optional[str] = None,
    ) -> "MessageSegment":
        """Build a ``music`` segment of type ``custom``."""
        return MessageSegment(
            "music",
            {
                "type": "custom",
                "url": url,
                "audio": audio,
                "title": title,
                "content": content,
                "image": img_url,
            },
        )

    @staticmethod
    def node(id_: int) -> "MessageSegment":
        """Build a ``node`` segment that refers to an id."""
        return MessageSegment("node", {"id": str(id_)})

    @staticmethod
    def node_custom(
        user_id: int, nickname: str, content: Union[str, "Message"]
    ) -> "MessageSegment":
        """Build a ``node`` segment with explicit sender and content."""
        return MessageSegment(
            "node",
            {"user_id": str(user_id), "nickname": nickname, "content": content},
        )

    @staticmethod
    def poke(type_: str, id_: str) -> "MessageSegment":
        """Build a ``poke`` segment."""
        return MessageSegment("poke", {"type": type_, "id": id_})

    @staticmethod
    def record(
        file: Union[str, bytes, BytesIO, Path],
        magic: Optional[bool] = None,
        cache: Optional[bool] = None,
        proxy: Optional[bool] = None,
        timeout: Optional[int] = None,
    ) -> "MessageSegment":
        """Build a ``record`` segment; ``file`` is normalised to a string."""
        return MessageSegment(
            "record",
            {
                "file": MessageSegment._file_to_uri(file),
                "magic": bool_to_str(magic),
                "cache": bool_to_str(cache),
                "proxy": bool_to_str(proxy),
                "timeout": timeout,
            },
        )

    @staticmethod
    def reply(id_: int) -> "MessageSegment":
        """Build a ``reply`` segment with the stringified id."""
        return MessageSegment("reply", {"id": str(id_)})

    @staticmethod
    def rps() -> "MessageSegment":
        """Build an ``rps`` segment (no data)."""
        return MessageSegment("rps", {})

    @staticmethod
    def shake() -> "MessageSegment":
        """Build a ``shake`` segment (no data)."""
        return MessageSegment("shake", {})

    @staticmethod
    def share(
        url: str = "",
        title: str = "",
        content: Optional[str] = None,
        image: Optional[str] = None,
    ) -> "MessageSegment":
        """Build a ``share`` segment."""
        return MessageSegment(
            "share",
            {"url": url, "title": title, "content": content, "image": image},
        )

    @staticmethod
    def text(text: str) -> "MessageSegment":
        """Build a plain-text segment."""
        return MessageSegment("text", {"text": text})

    @staticmethod
    def video(
        file: Union[str, bytes, BytesIO, Path],
        cache: Optional[bool] = None,
        proxy: Optional[bool] = None,
        timeout: Optional[int] = None,
    ) -> "MessageSegment":
        """Build a ``video`` segment; ``file`` is normalised to a string."""
        return MessageSegment(
            "video",
            {
                "file": MessageSegment._file_to_uri(file),
                "cache": bool_to_str(cache),
                "proxy": bool_to_str(proxy),
                "timeout": timeout,
            },
        )

    @staticmethod
    def xml(data: str) -> "MessageSegment":
        """Build an ``xml`` segment carrying ``data`` unchanged."""
        return MessageSegment("xml", {"data": data})


class Message(List[MessageSegment]):
    """OneBot v11 message: a list of ``MessageSegment`` objects."""

    def __init__(
        self,
        message: Union[str, None, Iterable[MessageSegment],
                       MessageSegment] = None,
    ):
        """Build a message from a CQ-code string, a segment or an iterable."""
        super().__init__()
        if message is None:
            return
        elif isinstance(message, str):
            self.extend(self._construct(message))
        elif isinstance(message, MessageSegment):
            self.append(message)
        elif isinstance(message, Iterable):
            self.extend(message)
        else:
            self.extend(self._construct(message))  # pragma: no cover

    @classmethod
    def template(cls: Type["Message"], format_string: Union[str, "Message"]) -> MessageTemplate["Message"]:
        """Create a message template.

        Usage is broadly the same as ``str.format``, but the result is a
        message object and a ``Message`` may itself serve as the template.
        Extended format specifiers can create segments through the
        ``MessageSegment`` factory methods of this message type.

        Args:
            format_string: the format template.

        Returns:
            The message formatter.
        """
        return MessageTemplate(format_string, cls)

    def __str__(self) -> str:
        """Concatenate the string form of every segment."""
        return "".join(str(seg) for seg in self)

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables (pydantic custom-type hook)."""
        yield cls._validate

    @classmethod
    def _validate(cls, value):
        """Return ``value`` as an instance of ``cls``.

        Raises:
            ValueError: for a ``Message`` of another class, or for a value
                that is not a str, dict or iterable.
        """
        if isinstance(value, cls):
            return value
        elif isinstance(value, Message):
            raise ValueError(
                f"Type {type(value)} can not be converted to {cls}")
        elif isinstance(value, str):
            pass
        elif isinstance(value, dict):
            value = parse_obj_as(cls.get_segment_class(), value)
        elif isinstance(value, Iterable):
            value = [parse_obj_as(cls.get_segment_class(), v) for v in value]
        else:
            raise ValueError(
                f"Expected str, dict or iterable for Message, got {type(value)}"
            )
        return cls(value)

    def __add__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message: a deep copy of this one plus ``other``.

        A ``str`` operand is appended as one plain-text segment rather than
        parsed for CQ codes.
        """
        result = self.copy()
        result += MessageSegment.text(other) if isinstance(other, str) else other
        return result

    def __radd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message: ``other`` followed by a copy of this one.

        A ``str`` operand becomes one plain-text segment.
        """
        result = self.__class__(
            MessageSegment.text(other) if isinstance(other, str) else other
        )
        return result + self

    def __iadd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Append ``other`` in place; a ``str`` is parsed as CQ-code text.

        Raises:
            ValueError: if ``other`` is not a str, segment or iterable.
        """
        if isinstance(other, str):
            self.extend(self._construct(other))
        elif isinstance(other, MessageSegment):
            self.append(other)
        elif isinstance(other, Iterable):
            self.extend(other)
        else:
            raise ValueError(
                f"Unsupported type: {type(other)}")  # pragma: no cover
        return self

    @overload
    def __getitem__(self, __args: str) -> "Message":
        """Return every segment whose type is ``__args``."""

    @overload
    def __getitem__(self, __args: Tuple[str, int]) -> MessageSegment:
        """Return the ``__args[1]``-th segment of type ``__args[0]``."""

    @overload
    def __getitem__(self, __args: Tuple[str, slice]) -> "Message":
        """Return slice ``__args[1]`` of the segments of type ``__args[0]``."""

    @overload
    def __getitem__(self, __args: int) -> MessageSegment:
        """Return the segment at index ``__args``."""

    @overload
    def __getitem__(self, __args: slice) -> "Message":
        """Return the message slice ``__args``."""

    def __getitem__(
        self,
        args: Union[
            str,
            Tuple[str, int],
            Tuple[str, slice],
            int,
            slice,
        ],
    ) -> Union[MessageSegment, "Message"]:
        """Index by position, slice, segment type, or ``(type, index/slice)``.

        Raises:
            ValueError: if the argument combination is not supported.
        """
        arg1, arg2 = args if isinstance(args, tuple) else (args, None)
        if isinstance(arg1, int) and arg2 is None:
            return super().__getitem__(arg1)
        elif isinstance(arg1, slice) and arg2 is None:
            return self.__class__(super().__getitem__(arg1))
        elif isinstance(arg1, str) and arg2 is None:
            return self.__class__(seg for seg in self if seg.type == arg1)
        elif isinstance(arg1, str) and isinstance(arg2, int):
            return [seg for seg in self if seg.type == arg1][arg2]
        elif isinstance(arg1, str) and isinstance(arg2, slice):
            return self.__class__([seg for seg in self if seg.type == arg1][arg2])
        else:
            raise ValueError(
                "Incorrect arguments to slice")  # pragma: no cover

    def index(self, value: Union[MessageSegment, str], *args) -> int:
        """Return the index of a segment, or of the first segment of a type.

        Raises:
            ValueError: if no segment matches.
        """
        if isinstance(value, str):
            first_segment = next(
                (seg for seg in self if seg.type == value), None)
            if first_segment is None:
                raise ValueError(
                    f"Segment with type {value} is not in message")
            return super().index(first_segment, *args)
        return super().index(value, *args)

    def get(self, type_: str, count: Optional[int] = None) -> "Message":
        """Return the segments of ``type_``, at most ``count`` when given."""
        if count is None:
            return self[type_]

        matching = (seg for seg in self if seg.type == type_)
        filtered = self.__class__()
        for _ in range(count):
            seg = next(matching, None)
            if seg is None:
                break
            filtered.append(seg)
        return filtered

    def count(self, value: Union[MessageSegment, str]) -> int:
        """Count segments equal to ``value``, or of type ``value`` if a str."""
        return len(self[value]) if isinstance(value, str) else super().count(value)

    def append(self, obj: Union[str, MessageSegment]) -> "Message":
        """Append one segment, or the segments parsed from a string.

        Args:
            obj: the segment (or CQ-code string) to add.

        Raises:
            ValueError: if ``obj`` is neither a segment nor a str.
        """
        if isinstance(obj, MessageSegment):
            super().append(obj)
        elif isinstance(obj, str):
            self.extend(self._construct(obj))
        else:
            raise ValueError(
                f"Unexpected type: {type(obj)} {obj}")  # pragma: no cover
        return self

    def extend(self, obj: Union["Message", Iterable[MessageSegment]]) -> "Message":
        """Append every item of ``obj`` via ``append`` and return self.

        Args:
            obj: the message or iterable of segments to add.
        """
        for segment in obj:
            self.append(segment)
        return self

    def copy(self) -> "Message":
        """Return a deep copy of this message."""
        return deepcopy(self)

    def extract_plain_text(self) -> str:
        """Return the concatenated raw text of all text segments."""
        return "".join(seg.data["text"] for seg in self if seg.is_text())

    @classmethod
    def get_segment_class(cls) -> Type[MessageSegment]:
        """Return the segment class stored in this message type."""
        return MessageSegment

    @staticmethod
    def _construct(
        msg: Union[str, Mapping, Iterable[Mapping]]
    ) -> Iterable[MessageSegment]:
        """Yield segments built from a mapping, mappings, or a CQ-code string."""
        if isinstance(msg, Mapping):
            msg = cast(Mapping[str, Any], msg)
            yield MessageSegment(msg["type"], msg.get("data") or {})
            return
        elif isinstance(msg, Iterable) and not isinstance(msg, str):
            for seg in msg:
                yield MessageSegment(seg["type"], seg.get("data") or {})
            return
        elif isinstance(msg, str):

            def _iter_message(msg: str) -> Iterable[Tuple[str, str]]:
                # Alternate between the text before each CQ code and the
                # code itself, then emit whatever text trails the last code.
                text_begin = 0
                for cqcode in re.finditer(
                    r"\[CQ:(?P<type>[a-zA-Z0-9-_.]+)"
                    r"(?P<params>"
                    r"(?:,[a-zA-Z0-9-_.]+=[^,\]]+)*"
                    r"),?\]",
                    msg,
                ):
                    yield "text", msg[text_begin: cqcode.pos + cqcode.start()]
                    text_begin = cqcode.pos + cqcode.end()
                    yield cqcode.group("type"), cqcode.group("params").lstrip(",")
                yield "text", msg[text_begin:]

            for type_, data in _iter_message(msg):
                if type_ == "text":
                    if data:
                        # only yield non-empty text segment
                        yield MessageSegment(type_, {"text": unescape(data)})
                else:
                    pairs = (
                        param.split("=", maxsplit=1)
                        for param in (raw.lstrip() for raw in data.split(","))
                        if param
                    )
                    yield MessageSegment(
                        type_, {key: unescape(value) for key, value in pairs}
                    )
class
MessageSegment:
@dataclass
class MessageSegment:
    """OneBot v11 message segment: a ``type`` tag plus a ``data`` payload.

    ``str(segment)`` renders the CQ-code form, and the static factory
    methods build the individual segment types.
    """

    # Segment type, e.g. "text", "at" or "image".
    type: str
    # Segment payload; entries whose value is None are skipped by __str__.
    data: Dict[str, Any] = field(default_factory=dict)

    def __len__(self) -> int:
        """Return the length of the rendered string form."""
        return len(str(self))

    def __ne__(self, other: "MessageSegment") -> bool:
        """Return the negation of the dataclass-generated equality."""
        return not self == other

    def __add__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
        """Return a new message made of this segment followed by ``other``.

        A ``str`` operand is appended as one plain-text segment (it is not
        parsed for CQ codes); an iterable contributes each of its segments.
        """
        # Copy so that mutating the result never touches this segment.
        result = self.get_message_class()(self.copy())
        if isinstance(other, str):
            result.append(MessageSegment.text(other))
        elif isinstance(other, MessageSegment):
            result.append(other)
        else:
            result.extend(other)
        return result

    def __radd__(self, other: Union[str, "MessageSegment", Iterable["MessageSegment"]]) -> "Message":
        """Return a new message made of ``other`` followed by this segment.

        A ``str`` operand becomes one plain-text segment.
        """
        message_cls = self.get_message_class()
        if isinstance(other, str):
            result = message_cls(MessageSegment.text(other))
        else:
            # Deep-copy so the left operand's segments are not shared.
            result = message_cls(other).copy()
        result.append(self)
        return result

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables (pydantic custom-type hook)."""
        yield cls._validate

    @classmethod
    def _validate(cls, value):
        """Return ``value`` as a segment.

        Instances of ``cls`` pass through unchanged and dicts are expanded
        into the constructor.

        Raises:
            ValueError: if ``value`` is neither an instance nor a dict.
        """
        if isinstance(value, cls):
            return value
        if not isinstance(value, dict):
            raise ValueError(
                f"Expected dict for MessageSegment, got {type(value)}")
        return cls(**value)

    def get(self, key: str, default: Any = None):
        """Look up ``key`` in the ``asdict`` view (fields ``type``/``data``)."""
        return asdict(self).get(key, default)

    def keys(self):
        """Return the keys of the ``asdict`` view."""
        return asdict(self).keys()

    def values(self):
        """Return the values of the ``asdict`` view."""
        return asdict(self).values()

    def items(self):
        """Return the items of the ``asdict`` view."""
        return asdict(self).items()

    def copy(self) -> "MessageSegment":
        """Return a deep copy of this segment."""
        return deepcopy(self)

    @classmethod
    def get_message_class(cls) -> Type["Message"]:
        """Return the message class that holds segments of this type."""
        return Message

    def __str__(self) -> str:
        """Render the segment: escaped text, or ``[CQ:type,k=v,...]``."""
        if self.type == "text":
            return escape(self.data.get("text", ""), escape_comma=False)

        params = ",".join(
            f"{key}={escape(str(value))}"
            for key, value in self.data.items()
            if value is not None
        )
        return f"[CQ:{self.type}{',' if params else ''}{params}]"

    def is_text(self) -> bool:
        """Return True when this is a plain-text segment."""
        return self.type == "text"

    @staticmethod
    def _file_to_uri(file: Union[str, bytes, BytesIO, Path]) -> str:
        """Normalise a file argument to the string stored in ``data["file"]``.

        Bytes and ``BytesIO`` content become a ``base64://`` string, a
        ``Path`` becomes a ``file://`` URI, and a ``str`` is returned as is.
        """
        if isinstance(file, BytesIO):
            file = file.getvalue()
        if isinstance(file, bytes):
            return f"base64://{b64encode(file).decode()}"
        if isinstance(file, Path):
            # as_uri() avoids the doubled slash that "file:///" + an absolute
            # POSIX path produced, and percent-encodes special characters.
            return file.resolve().as_uri()
        return file

    @staticmethod
    def anonymous(ignore_failure: Optional[bool] = None) -> "MessageSegment":
        """Build an ``anonymous`` segment with ``ignore`` from ``bool_to_str``."""
        return MessageSegment("anonymous", {"ignore": bool_to_str(ignore_failure)})

    @staticmethod
    def at(user_id: Union[int, str]) -> "MessageSegment":
        """Build an ``at`` segment whose ``qq`` is the stringified user id."""
        return MessageSegment("at", {"qq": str(user_id)})

    @staticmethod
    def contact(type_: str, id: int) -> "MessageSegment":
        """Build a ``contact`` segment of the given type and id."""
        return MessageSegment("contact", {"type": type_, "id": str(id)})

    @staticmethod
    def contact_group(group_id: int) -> "MessageSegment":
        """Build a ``contact`` segment of type ``group``."""
        return MessageSegment("contact", {"type": "group", "id": str(group_id)})

    @staticmethod
    def contact_user(user_id: int) -> "MessageSegment":
        """Build a ``contact`` segment of type ``qq``."""
        return MessageSegment("contact", {"type": "qq", "id": str(user_id)})

    @staticmethod
    def dice() -> "MessageSegment":
        """Build a ``dice`` segment (no data)."""
        return MessageSegment("dice", {})

    @staticmethod
    def face(id_: int) -> "MessageSegment":
        """Build a ``face`` segment with the stringified id."""
        return MessageSegment("face", {"id": str(id_)})

    @staticmethod
    def forward(id_: str) -> "MessageSegment":
        """Build a ``forward`` segment; logs a warning on every call."""
        logger.warning("Forward Message only can be received!")
        return MessageSegment("forward", {"id": id_})

    @staticmethod
    def image(
        file: Union[str, bytes, BytesIO, Path],
        type_: Optional[str] = None,
        cache: bool = True,
        proxy: bool = True,
        timeout: Optional[int] = None,
    ) -> "MessageSegment":
        """Build an ``image`` segment; ``file`` is normalised to a string."""
        return MessageSegment(
            "image",
            {
                "file": MessageSegment._file_to_uri(file),
                "type": type_,
                "cache": bool_to_str(cache),
                "proxy": bool_to_str(proxy),
                "timeout": timeout,
            },
        )

    @staticmethod
    def json(data: str) -> "MessageSegment":
        """Build a ``json`` segment carrying ``data`` unchanged."""
        return MessageSegment("json", {"data": data})

    @staticmethod
    def location(
        latitude: float,
        longitude: float,
        title: Optional[str] = None,
        content: Optional[str] = None,
    ) -> "MessageSegment":
        """Build a ``location`` segment with stringified coordinates."""
        return MessageSegment(
            "location",
            {
                "lat": str(latitude),
                "lon": str(longitude),
                "title": title,
                "content": content,
            },
        )

    @staticmethod
    def music(type_: str, id_: int) -> "MessageSegment":
        """Build a ``music`` segment for the given type and id."""
        return MessageSegment("music", {"type": type_, "id": id_})

    @staticmethod
    def music_custom(
        url: str,
        audio: str,
        title: str,
        content: Optional[str] = None,
        img_url: Optional[str] = None,
    ) -> "MessageSegment":
        """Build a ``music`` segment of type ``custom``."""
        return MessageSegment(
            "music",
            {
                "type": "custom",
                "url": url,
                "audio": audio,
                "title": title,
                "content": content,
                "image": img_url,
            },
        )

    @staticmethod
    def node(id_: int) -> "MessageSegment":
        """Build a ``node`` segment that refers to an id."""
        return MessageSegment("node", {"id": str(id_)})

    @staticmethod
    def node_custom(
        user_id: int, nickname: str, content: Union[str, "Message"]
    ) -> "MessageSegment":
        """Build a ``node`` segment with explicit sender and content."""
        return MessageSegment(
            "node",
            {"user_id": str(user_id), "nickname": nickname, "content": content},
        )

    @staticmethod
    def poke(type_: str, id_: str) -> "MessageSegment":
        """Build a ``poke`` segment."""
        return MessageSegment("poke", {"type": type_, "id": id_})

    @staticmethod
    def record(
        file: Union[str, bytes, BytesIO, Path],
        magic: Optional[bool] = None,
        cache: Optional[bool] = None,
        proxy: Optional[bool] = None,
        timeout: Optional[int] = None,
    ) -> "MessageSegment":
        """Build a ``record`` segment; ``file`` is normalised to a string."""
        return MessageSegment(
            "record",
            {
                "file": MessageSegment._file_to_uri(file),
                "magic": bool_to_str(magic),
                "cache": bool_to_str(cache),
                "proxy": bool_to_str(proxy),
                "timeout": timeout,
            },
        )

    @staticmethod
    def reply(id_: int) -> "MessageSegment":
        """Build a ``reply`` segment with the stringified id."""
        return MessageSegment("reply", {"id": str(id_)})

    @staticmethod
    def rps() -> "MessageSegment":
        """Build an ``rps`` segment (no data)."""
        return MessageSegment("rps", {})

    @staticmethod
    def shake() -> "MessageSegment":
        """Build a ``shake`` segment (no data)."""
        return MessageSegment("shake", {})

    @staticmethod
    def share(
        url: str = "",
        title: str = "",
        content: Optional[str] = None,
        image: Optional[str] = None,
    ) -> "MessageSegment":
        """Build a ``share`` segment."""
        return MessageSegment(
            "share",
            {"url": url, "title": title, "content": content, "image": image},
        )

    @staticmethod
    def text(text: str) -> "MessageSegment":
        """Build a plain-text segment."""
        return MessageSegment("text", {"text": text})

    @staticmethod
    def video(
        file: Union[str, bytes, BytesIO, Path],
        cache: Optional[bool] = None,
        proxy: Optional[bool] = None,
        timeout: Optional[int] = None,
    ) -> "MessageSegment":
        """Build a ``video`` segment; ``file`` is normalised to a string."""
        return MessageSegment(
            "video",
            {
                "file": MessageSegment._file_to_uri(file),
                "cache": bool_to_str(cache),
                "proxy": bool_to_str(proxy),
                "timeout": timeout,
            },
        )

    @staticmethod
    def xml(data: str) -> "MessageSegment":
        """Build an ``xml`` segment carrying ``data`` unchanged."""
        return MessageSegment("xml", {"data": data})
OneBot v11 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
@staticmethod
def
anonymous( ignore_failure: Union[bool, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
at( user_id: Union[int, str]) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
contact( type_: str, id: int) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
contact_group(group_id: int) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
contact_user(user_id: int) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
image( file: Union[str, bytes, _io.BytesIO, pathlib.Path], type_: Union[str, NoneType] = None, cache: bool = True, proxy: bool = True, timeout: Union[int, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def image(
    file: Union[str, bytes, BytesIO, Path],
    type_: Optional[str] = None,
    cache: bool = True,
    proxy: bool = True,
    timeout: Optional[int] = None,
) -> "MessageSegment":
    """Build an ``image`` segment.

    Args:
        file: a string used as is, raw bytes / ``BytesIO`` content (sent as
            a ``base64://`` string), or a ``Path`` (sent as a ``file://``
            URI of the resolved path).
        type_: stored unchanged under ``type``.
        cache: passed through ``bool_to_str`` and stored under ``cache``.
        proxy: passed through ``bool_to_str`` and stored under ``proxy``.
        timeout: stored unchanged under ``timeout``.

    Returns:
        The new segment.
    """
    if isinstance(file, BytesIO):
        file = file.getvalue()
    if isinstance(file, bytes):
        file = f"base64://{b64encode(file).decode()}"
    elif isinstance(file, Path):
        # as_uri() avoids the doubled slash that "file:///" + an absolute
        # POSIX path produced, and percent-encodes special characters.
        file = file.resolve().as_uri()
    return MessageSegment(
        "image",
        {
            "file": file,
            "type": type_,
            "cache": bool_to_str(cache),
            "proxy": bool_to_str(proxy),
            "timeout": timeout,
        },
    )
@staticmethod
def
location( latitude: float, longitude: float, title: Union[str, NoneType] = None, content: Union[str, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
162 @staticmethod 163 def location( 164 latitude: float, 165 longitude: float, 166 title: Optional[str] = None, 167 content: Optional[str] = None, 168 ) -> "MessageSegment": 169 return MessageSegment( 170 "location", 171 { 172 "lat": str(latitude), 173 "lon": str(longitude), 174 "title": title, 175 "content": content, 176 }, 177 )
@staticmethod
def
music( type_: str, id_: int) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
music_custom( url: str, audio: str, title: str, content: Union[str, NoneType] = None, img_url: Union[str, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
183 @staticmethod 184 def music_custom( 185 url: str, 186 audio: str, 187 title: str, 188 content: Optional[str] = None, 189 img_url: Optional[str] = None, 190 ) -> "MessageSegment": 191 return MessageSegment( 192 "music", 193 { 194 "type": "custom", 195 "url": url, 196 "audio": audio, 197 "title": title, 198 "content": content, 199 "image": img_url, 200 }, 201 )
@staticmethod
def
node_custom( user_id: int, nickname: str, content: Union[str, ayaka.driver.ayakabot_driver.message.Message]) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
poke( type_: str, id_: str) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def
record( file: Union[str, bytes, _io.BytesIO, pathlib.Path], magic: Union[bool, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def record(
    file: Union[str, bytes, BytesIO, Path],
    magic: Optional[bool] = None,
    cache: Optional[bool] = None,
    proxy: Optional[bool] = None,
    timeout: Optional[int] = None,
) -> "MessageSegment":
    """Build a ``record`` segment.

    Args:
        file: a string used as is, raw bytes / ``BytesIO`` content (sent as
            a ``base64://`` string), or a ``Path`` (sent as a ``file://``
            URI of the resolved path).
        magic: passed through ``bool_to_str`` and stored under ``magic``.
        cache: passed through ``bool_to_str`` and stored under ``cache``.
        proxy: passed through ``bool_to_str`` and stored under ``proxy``.
        timeout: stored unchanged under ``timeout``.

    Returns:
        The new segment.
    """
    if isinstance(file, BytesIO):
        file = file.getvalue()
    if isinstance(file, bytes):
        file = f"base64://{b64encode(file).decode()}"
    elif isinstance(file, Path):
        # as_uri() avoids the doubled slash that "file:///" + an absolute
        # POSIX path produced, and percent-encodes special characters.
        file = file.resolve().as_uri()
    return MessageSegment(
        "record",
        {
            "file": file,
            "magic": bool_to_str(magic),
            "cache": bool_to_str(cache),
            "proxy": bool_to_str(proxy),
            "timeout": timeout,
        },
    )
@staticmethod
def
video( file: Union[str, bytes, _io.BytesIO, pathlib.Path], cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.MessageSegment:
@staticmethod
def video(
    file: Union[str, bytes, BytesIO, Path],
    cache: Optional[bool] = None,
    proxy: Optional[bool] = None,
    timeout: Optional[int] = None,
) -> "MessageSegment":
    """Build a ``video`` segment.

    Args:
        file: a string used as is, raw bytes / ``BytesIO`` content (sent as
            a ``base64://`` string), or a ``Path`` (sent as a ``file://``
            URI of the resolved path).
        cache: passed through ``bool_to_str`` and stored under ``cache``.
        proxy: passed through ``bool_to_str`` and stored under ``proxy``.
        timeout: stored unchanged under ``timeout``.

    Returns:
        The new segment.
    """
    if isinstance(file, BytesIO):
        file = file.getvalue()
    if isinstance(file, bytes):
        file = f"base64://{b64encode(file).decode()}"
    elif isinstance(file, Path):
        # as_uri() avoids the doubled slash that "file:///" + an absolute
        # POSIX path produced, and percent-encodes special characters.
        file = file.resolve().as_uri()
    return MessageSegment(
        "video",
        {
            "file": file,
            "cache": bool_to_str(cache),
            "proxy": bool_to_str(proxy),
            "timeout": timeout,
        },
    )
class
Message(typing.List[ayaka.driver.ayakabot_driver.message.MessageSegment]):
class Message(List[MessageSegment]):
    """OneBot v11 message: a list of ``MessageSegment`` objects."""

    def __init__(
        self,
        message: Union[str, None, Iterable[MessageSegment],
                       MessageSegment] = None,
    ):
        """Build a message from a CQ-code string, a segment or an iterable."""
        super().__init__()
        if message is None:
            return
        elif isinstance(message, str):
            self.extend(self._construct(message))
        elif isinstance(message, MessageSegment):
            self.append(message)
        elif isinstance(message, Iterable):
            self.extend(message)
        else:
            self.extend(self._construct(message))  # pragma: no cover

    @classmethod
    def template(cls: Type["Message"], format_string: Union[str, "Message"]) -> MessageTemplate["Message"]:
        """Create a message template.

        Usage is broadly the same as ``str.format``, but the result is a
        message object and a ``Message`` may itself serve as the template.
        Extended format specifiers can create segments through the
        ``MessageSegment`` factory methods of this message type.

        Args:
            format_string: the format template.

        Returns:
            The message formatter.
        """
        return MessageTemplate(format_string, cls)

    def __str__(self) -> str:
        """Concatenate the string form of every segment."""
        return "".join(str(seg) for seg in self)

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables (pydantic custom-type hook)."""
        yield cls._validate

    @classmethod
    def _validate(cls, value):
        """Return ``value`` as an instance of ``cls``.

        Raises:
            ValueError: for a ``Message`` of another class, or for a value
                that is not a str, dict or iterable.
        """
        if isinstance(value, cls):
            return value
        elif isinstance(value, Message):
            raise ValueError(
                f"Type {type(value)} can not be converted to {cls}")
        elif isinstance(value, str):
            pass
        elif isinstance(value, dict):
            value = parse_obj_as(cls.get_segment_class(), value)
        elif isinstance(value, Iterable):
            value = [parse_obj_as(cls.get_segment_class(), v) for v in value]
        else:
            raise ValueError(
                f"Expected str, dict or iterable for Message, got {type(value)}"
            )
        return cls(value)

    def __add__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message: a deep copy of this one plus ``other``.

        A ``str`` operand is appended as one plain-text segment rather than
        parsed for CQ codes.
        """
        result = self.copy()
        result += MessageSegment.text(other) if isinstance(other, str) else other
        return result

    def __radd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Return a new message: ``other`` followed by a copy of this one.

        A ``str`` operand becomes one plain-text segment.
        """
        result = self.__class__(
            MessageSegment.text(other) if isinstance(other, str) else other
        )
        return result + self

    def __iadd__(self, other: Union[str, MessageSegment, Iterable[MessageSegment]]) -> "Message":
        """Append ``other`` in place; a ``str`` is parsed as CQ-code text.

        Raises:
            ValueError: if ``other`` is not a str, segment or iterable.
        """
        if isinstance(other, str):
            self.extend(self._construct(other))
        elif isinstance(other, MessageSegment):
            self.append(other)
        elif isinstance(other, Iterable):
            self.extend(other)
        else:
            raise ValueError(
                f"Unsupported type: {type(other)}")  # pragma: no cover
        return self

    @overload
    def __getitem__(self, __args: str) -> "Message":
        """Return every segment whose type is ``__args``."""

    @overload
    def __getitem__(self, __args: Tuple[str, int]) -> MessageSegment:
        """Return the ``__args[1]``-th segment of type ``__args[0]``."""

    @overload
    def __getitem__(self, __args: Tuple[str, slice]) -> "Message":
        """Return slice ``__args[1]`` of the segments of type ``__args[0]``."""

    @overload
    def __getitem__(self, __args: int) -> MessageSegment:
        """Return the segment at index ``__args``."""

    @overload
    def __getitem__(self, __args: slice) -> "Message":
        """Return the message slice ``__args``."""

    def __getitem__(
        self,
        args: Union[
            str,
            Tuple[str, int],
            Tuple[str, slice],
            int,
            slice,
        ],
    ) -> Union[MessageSegment, "Message"]:
        """Index by position, slice, segment type, or ``(type, index/slice)``.

        Raises:
            ValueError: if the argument combination is not supported.
        """
        arg1, arg2 = args if isinstance(args, tuple) else (args, None)
        if isinstance(arg1, int) and arg2 is None:
            return super().__getitem__(arg1)
        elif isinstance(arg1, slice) and arg2 is None:
            return self.__class__(super().__getitem__(arg1))
        elif isinstance(arg1, str) and arg2 is None:
            return self.__class__(seg for seg in self if seg.type == arg1)
        elif isinstance(arg1, str) and isinstance(arg2, int):
            return [seg for seg in self if seg.type == arg1][arg2]
        elif isinstance(arg1, str) and isinstance(arg2, slice):
            return self.__class__([seg for seg in self if seg.type == arg1][arg2])
        else:
            raise ValueError(
                "Incorrect arguments to slice")  # pragma: no cover

    def index(self, value: Union[MessageSegment, str], *args) -> int:
        """Return the index of a segment, or of the first segment of a type.

        Raises:
            ValueError: if no segment matches.
        """
        if isinstance(value, str):
            first_segment = next(
                (seg for seg in self if seg.type == value), None)
            if first_segment is None:
                raise ValueError(
                    f"Segment with type {value} is not in message")
            return super().index(first_segment, *args)
        return super().index(value, *args)

    def get(self, type_: str, count: Optional[int] = None) -> "Message":
        """Return the segments of ``type_``, at most ``count`` when given."""
        if count is None:
            return self[type_]

        matching = (seg for seg in self if seg.type == type_)
        filtered = self.__class__()
        for _ in range(count):
            seg = next(matching, None)
            if seg is None:
                break
            filtered.append(seg)
        return filtered

    def count(self, value: Union[MessageSegment, str]) -> int:
        """Count segments equal to ``value``, or of type ``value`` if a str."""
        return len(self[value]) if isinstance(value, str) else super().count(value)

    def append(self, obj: Union[str, MessageSegment]) -> "Message":
        """Append one segment, or the segments parsed from a string.

        Args:
            obj: the segment (or CQ-code string) to add.

        Raises:
            ValueError: if ``obj`` is neither a segment nor a str.
        """
        if isinstance(obj, MessageSegment):
            super().append(obj)
        elif isinstance(obj, str):
            self.extend(self._construct(obj))
        else:
            raise ValueError(
                f"Unexpected type: {type(obj)} {obj}")  # pragma: no cover
        return self

    def extend(self, obj: Union["Message", Iterable[MessageSegment]]) -> "Message":
        """Append every item of ``obj`` via ``append`` and return self.

        Args:
            obj: the message or iterable of segments to add.
        """
        for segment in obj:
            self.append(segment)
        return self

    def copy(self) -> "Message":
        """Return a deep copy of this message."""
        return deepcopy(self)

    def extract_plain_text(self) -> str:
        """Return the concatenated raw text of all text segments."""
        return "".join(seg.data["text"] for seg in self if seg.is_text())

    @classmethod
    def get_segment_class(cls) -> Type[MessageSegment]:
        """Return the segment class stored in this message type."""
        return MessageSegment

    @staticmethod
    def _construct(
        msg: Union[str, Mapping, Iterable[Mapping]]
    ) -> Iterable[MessageSegment]:
        """Yield segments built from a mapping, mappings, or a CQ-code string."""
        if isinstance(msg, Mapping):
            msg = cast(Mapping[str, Any], msg)
            yield MessageSegment(msg["type"], msg.get("data") or {})
            return
        elif isinstance(msg, Iterable) and not isinstance(msg, str):
            for seg in msg:
                yield MessageSegment(seg["type"], seg.get("data") or {})
            return
        elif isinstance(msg, str):

            def _iter_message(msg: str) -> Iterable[Tuple[str, str]]:
                # Alternate between the text before each CQ code and the
                # code itself, then emit whatever text trails the last code.
                text_begin = 0
                for cqcode in re.finditer(
                    r"\[CQ:(?P<type>[a-zA-Z0-9-_.]+)"
                    r"(?P<params>"
                    r"(?:,[a-zA-Z0-9-_.]+=[^,\]]+)*"
                    r"),?\]",
                    msg,
                ):
                    yield "text", msg[text_begin: cqcode.pos + cqcode.start()]
                    text_begin = cqcode.pos + cqcode.end()
                    yield cqcode.group("type"), cqcode.group("params").lstrip(",")
                yield "text", msg[text_begin:]

            for type_, data in _iter_message(msg):
                if type_ == "text":
                    if data:
                        # only yield non-empty text segment
                        yield MessageSegment(type_, {"text": unescape(data)})
                else:
                    pairs = (
                        param.split("=", maxsplit=1)
                        for param in (raw.lstrip() for raw in data.split(","))
                        if param
                    )
                    yield MessageSegment(
                        type_, {key: unescape(value) for key, value in pairs}
                    )
OneBot v11 协议 Message 适配。
Message( message: Union[str, NoneType, Iterable[ayaka.driver.ayakabot_driver.message.MessageSegment], ayaka.driver.ayakabot_driver.message.MessageSegment] = None)
def __init__(
    self,
    message: Union[str, None, Iterable[MessageSegment],
                   MessageSegment] = None,
):
    """Create a message from text, a single segment, or several segments.

    Args:
        message: Initial content. ``None`` leaves the message empty, a
            ``MessageSegment`` is appended as-is, and any other non-string
            iterable is handed to ``extend``. A string, or any remaining
            kind of value, is first converted by ``_construct``.
    """
    super().__init__()
    if message is None:
        return
    if isinstance(message, MessageSegment):
        self.append(message)
        return
    if isinstance(message, str) or not isinstance(message, Iterable):
        # Strings must be parsed into segments; anything that is neither a
        # segment nor iterable takes the same route.
        self.extend(self._construct(message))
        return
    self.extend(message)
@classmethod
def
template( cls: type[ayaka.driver.ayakabot_driver.message.Message], format_string: Union[str, ayaka.driver.ayakabot_driver.message.Message]) -> ayaka.driver.ayakabot_driver.template.MessageTemplate[ayaka.driver.ayakabot_driver.message.Message]:
@classmethod
def template(cls: Type["Message"], format_string: Union[str, "Message"]) -> MessageTemplate["Message"]:
    """Create a message template.

    Usage is broadly the same as ``str.format``, but the result is a message
    object, and a ``Message`` object may itself be used as the template.

    Extended format specifiers are also provided, so that messages can be
    created with the factory methods of the ``MessageSegment`` that suits
    this message type.

    Args:
        format_string: The format template.

    Returns:
        The message formatter.
    """
    formatter = MessageTemplate(format_string, cls)
    return formatter
创建消息模板。
用法和 `str.format` 大致相同, 但是可以输出消息对象, 并且支持以 `Message` 对象作为消息模板。
并且提供了拓展的格式化控制符, 可以用适用于该消息类型的 `MessageSegment` 的工厂方法创建消息。
参数: format_string: 格式化模板
返回: 消息格式化器
def
index( self, value: Union[ayaka.driver.ayakabot_driver.message.MessageSegment, str], *args) -> int:
def index(self, value: Union[MessageSegment, str], *args) -> int:
    """Return the index of a segment, or of the first segment of a type.

    Args:
        value: A segment to look up, or a segment type name. A string is
            always treated as a type name.
        *args: Optional ``start`` and ``stop`` bounds. Negative values count
            from the end and out-of-range values are clamped.

    Returns:
        The index of the first match inside the requested range.

    Raises:
        ValueError: If no matching segment lies inside the range.
        TypeError: If more than two bounds are given for a type lookup.
    """
    if not isinstance(value, str):
        return super().index(value, *args)

    if len(args) > 2:
        raise TypeError(
            f"index expected at most 3 arguments, got {len(args) + 1}")

    # Resolve the bounds before searching. Picking the first segment of the
    # type from the whole message and only then applying ``start``/``stop``
    # would miss a later segment of that type inside the range.
    start, stop = (*args, None, None)[:2]
    lower, upper, _ = slice(start, stop).indices(len(self))
    for position in range(lower, upper):
        if self[position].type == value:
            return position
    raise ValueError(
        f"Segment with type {value} is not in message")
Return first index of value. When value is a str, return the index of the first segment whose type equals it.
Raises ValueError if the value is not present.
def
get( self, type_: str, count: Union[int, NoneType] = None) -> ayaka.driver.ayakabot_driver.message.Message:
def get(self, type_: str, count: Optional[int] = None) -> "Message":
    """Collect segments of one type, optionally limited to the first few.

    Args:
        type_: Segment type to select.
        count: Maximum number of segments to return. ``None`` returns every
            segment of that type.

    Returns:
        A new message of the same class holding the selected segments in
        their original order.
    """
    if count is None:
        return self[type_]

    matches = (seg for seg in self if seg.type == type_)
    picked = self.__class__()
    # zip stops as soon as either the budget or the matches run out.
    for _, seg in zip(range(count), matches):
        picked.append(seg)
    return picked
def count(self, value: Union[MessageSegment, str]) -> int:
    """Count occurrences of a segment, or segments of a given type.

    Args:
        value: A segment to count, or a segment type name. A string is
            always treated as a type name.

    Returns:
        The number of matching segments.
    """
    if isinstance(value, str):
        # Indexing by type name yields the segments of that type.
        return len(self[value])
    return super().count(value)
Return number of occurrences of value. When value is a str, return the number of segments whose type equals it.
def
append( self, obj: Union[str, ayaka.driver.ayakabot_driver.message.MessageSegment]) -> ayaka.driver.ayakabot_driver.message.Message:
def append(self, obj: Union[str, MessageSegment]) -> "Message":
    """Add one message segment to the end of the message.

    Args:
        obj: The segment to add. A string is converted by ``_construct``
            and every resulting segment is added.

    Returns:
        This message.

    Raises:
        ValueError: If ``obj`` is neither a segment nor a string.
    """
    if isinstance(obj, MessageSegment):
        super().append(obj)
        return self
    if not isinstance(obj, str):
        raise ValueError(
            f"Unexpected type: {type(obj)} {obj}")  # pragma: no cover
    self.extend(self._construct(obj))
    return self
添加一个消息段到消息数组末尾。
参数: obj: 要添加的消息段
def
extend( self, obj: Union[ayaka.driver.ayakabot_driver.message.Message, Iterable[ayaka.driver.ayakabot_driver.message.MessageSegment]]) -> ayaka.driver.ayakabot_driver.message.Message:
def extend(self, obj: Union["Message", Iterable[MessageSegment]]) -> "Message":
    """Append a message, or several segments, to the end of this message.

    Every item is added through ``append``.

    Args:
        obj: The message or iterable of segments to add. Passing the
            message itself appends a copy of its current contents.

    Returns:
        This message.
    """
    # Extending a message with itself would iterate a list that grows on
    # every step and never finish, so iterate a snapshot in that case.
    segments = tuple(obj) if obj is self else obj
    for segment in segments:
        self.append(segment)
    return self
拼接一个消息数组或多个消息段到消息数组末尾。
参数: obj: 要添加的消息数组
def
extract_plain_text(self) -> str:
def extract_plain_text(self) -> str:
    """Extract the plain text contained in the message.

    Returns:
        The ``"text"`` data of every text segment, concatenated in order.
        Segments for which ``is_text()`` is false are skipped.
    """
    pieces = []
    for seg in self:
        if seg.is_text():
            pieces.append(seg.data["text"])
    return "".join(pieces)
提取消息内纯文本消息
@classmethod
def
get_segment_class(cls) -> Type[ayaka.driver.ayakabot_driver.message.MessageSegment]:
Inherited Members
- builtins.list
- clear
- insert
- pop
- remove
- reverse
- sort