ayaka.ayaka
ayaka核心
1'''ayaka核心''' 2import asyncio 3import datetime 4import inspect 5import json 6from math import ceil 7from pathlib import Path 8import re 9from loguru import logger 10from typing import List, Dict, Literal, Tuple, Union 11from .depend import AyakaCache 12from .config import ayaka_root_config, ayaka_data_path 13from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, _cmd_regex, _enter_exit_during, app_list, group_list, bot_list, private_listener_dict 14from .driver import on_message, get_driver, Message, MessageSegment, Bot, MessageEvent, GroupMessageEvent 15from .state import AyakaState, AyakaTrigger, AyakaTimer, root_state, ensure_regex_list 16from .on import AyakaOn 17 18 19class AyakaGroup: 20 def __repr__(self) -> str: 21 return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})" 22 23 def __init__(self, bot_id: int, group_id: int) -> None: 24 self.bot_id = bot_id 25 self.group_id = group_id 26 self.state = root_state 27 28 # 添加app,并分配独立数据空间 29 self.apps: List["AyakaApp"] = [] 30 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 31 for app in app_list: 32 # if app.name not in forbid_names: 33 self.apps.append(app) 34 self.cache_dict[app.name] = {} 35 36 group_list.append(self) 37 38 if ayaka_root_config.debug: 39 print(self) 40 41 async def enter(self, state: Union[str, List[str], AyakaState]): 42 if isinstance(state, list): 43 next_state = self.state.join(*state) 44 elif isinstance(state, str): 45 next_state = self.state.join(state) 46 else: 47 next_state = self.state.join(*state.keys) 48 return await self.goto(next_state) 49 50 async def back(self): 51 if self.state.parent: 52 await self.state.exit() 53 self.state = self.state.parent 54 return self.state 55 56 async def goto(self, state: AyakaState): 57 if _enter_exit_during.get() > 0: 58 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 59 60 keys = state.keys 61 62 # 找到第一个不同的结点 63 n0 = len(keys) 64 n1 = len(self.state.keys) 65 n = min(n0, n1) 66 for i in range(n): 67 if keys[i] != self.state.keys[i]: 68 break 69 else: 70 i += 1 71 72 # 回退 73 for j in range(i, n1): 74 await self.back() 75 keys = keys[i:] 76 77 # 重新出发 78 for key in keys: 79 self.state = self.state[key] 80 await self.state.enter() 81 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 82 return self.state 83 84 def get_app(self, name: str): 85 '''根据app名获取该group所启用的app,不存在则返回None''' 86 for app in self.apps: 87 if app.name == name: 88 return app 89 90 91class AyakaApp: 92 def __repr__(self) -> str: 93 return f"{self.__class__.__name__}({self.name})" 94 95 def __init__(self, name: str) -> None: 96 self.path = Path(inspect.stack()[1].filename) 97 logger.opt(colors=True).debug(f"加载应用 \"<c>{name}</c>\"") 98 99 for app in app_list: 100 if app.name == name: 101 raise Exception( 102 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 103 104 self.name = name 105 self.ayaka_root_config = ayaka_root_config 106 self.funcs = [] 107 self.on = AyakaOn(self) 108 self.timers: List[AyakaTimer] = [] 109 110 self.root_state = root_state 111 self.plugin_state = self.get_state() 112 113 self._intro = "没有介绍" 114 self.state_helps: Dict[str, List[str]] = {} 115 self.idle_helps: List[str] = [] 116 117 app_list.append(self) 118 if ayaka_root_config.debug: 119 print(self) 120 121 @property 122 def intro(self): 123 help = self._intro 124 for h in self.idle_helps: 125 help += "\n" + h 126 return help 127 128 @property 129 def all_help(self): 130 help = self.intro 131 for s, hs in self.state_helps.items(): 132 help += f"\n[{s}]" 133 for h in hs: 134 help += "\n" + h 135 return help 136 137 @property 138 def help(self): 139 '''获取当前状态下的帮助,没有找到则返回介绍''' 140 total_triggers = get_cascade_triggers(self.state) 141 142 helps = [] 143 cmds = [] 144 for ts in total_triggers: 145 flag = 1 146 for t in ts: 147 if t.app == self: 148 for c in t.raw_cmds: 149 if c in cmds: 150 break 151 else: 152 if flag: 153 helps.append(f"[{t.state[1:]}]") 154 flag = 0 155 cmds.extend(t.raw_cmds) 156 helps.append(t.help) 157 158 if not helps: 159 return self.intro 160 return "\n".join(helps) 161 162 @help.setter 163 def help(self, help: str): 164 self._intro = help 165 166 @property 167 def user_name(self): 168 '''*timer触发时不可用* 169 170 当前消息的发送人的群名片或昵称 171 ''' 172 s = self.event.sender 173 name = s.card or s.nickname 174 return name 175 176 @property 177 def user_id(self): 178 '''*timer触发时不可用* 179 180 当前消息的发送人的uid 181 ''' 182 return self.event.user_id 183 184 @property 185 def bot(self): 186 '''*timer触发时不可用* 187 188 当前bot 189 ''' 190 return _bot.get() 191 192 @property 193 def event(self): 194 '''*timer触发时不可用* 195 196 当前消息 197 ''' 198 return _event.get() 199 200 @property 201 def cache(self): 202 '''*timer触发时不可用* 203 204 当前群组的缓存空间''' 205 return self.group.cache_dict[self.name] 206 207 @property 208 def group_id(self): 209 '''*timer触发时不可用* 210 211 当前群组的id 212 213 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 214 ''' 215 return self.group.group_id 216 217 @property 218 def bot_id(self): 219 '''*timer触发时不可用* 220 221 当前bot的id 222 ''' 223 return self.group.bot_id 224 225 @property 226 def group(self): 227 '''*timer触发时不可用* 228 229 当前群组 230 231 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 232 ''' 233 return _group.get() 234 235 @property 236 def arg(self): 237 '''*timer触发时不可用* 238 239 当前消息在移除了命令后的剩余部分 240 ''' 241 return _arg.get() 242 243 @property 244 def args(self): 245 '''*timer触发时不可用* 246 247 当前消息在移除了命令后,剩余部分按照空格分割后的数组 248 249 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 250 ''' 251 return _args.get() 252 253 @property 254 def cmd(self): 255 '''*timer触发时不可用* 256 257 当前消息的命令头 258 ''' 259 return _cmd.get() 260 261 @property 262 def cmd_regex(self): 263 '''*timer触发时不可用* 264 265 当前消息的命令头 266 ''' 267 return _cmd_regex.get() 268 269 @property 270 def message(self): 271 '''*timer触发时不可用* 272 273 当前消息 274 ''' 275 return _message.get() 276 277 @property 278 def state(self): 279 return self.group.state 280 281 def get_state(self, key: Union[str, List[str]] = [], *_keys: str): 282 # _keys为兼容旧API(0.5.3及以前 283 ''' 284 假设当前app.name为test 285 286 >>> get_state(key1) -> [root.test].key1 287 288 >>> get_state([key1, key2]) -> [root.test].key1.key2 289 290 特别的,参数可以为空,例如: 291 292 >>> get_state() -> [root.test] 293 ''' 294 if isinstance(key, list): 295 _keys = [self.name, *key] 296 else: 297 _keys = [self.name, key, *_keys] 298 299 keys = [] 300 for k in _keys: 301 keys.extend(k.split(ayaka_root_config.state_separate)) 302 303 return root_state.join(*keys) 304 305 async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str): 306 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 307 return await self.goto(state, *keys) 308 309 async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str): 310 # keys为兼容旧API(0.5.2及以前 311 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 312 if isinstance(state, str): 313 state = self.get_state(state, *keys) 314 elif isinstance(state, list): 315 state = self.get_state(*state) 316 return await self.group.goto(state) 317 318 async def enter(self, state: Union[str, List[str], AyakaState]): 319 '''进入子状态''' 320 return await self.group.enter(state) 321 322 async def back(self): 323 '''回退当前群组的状态''' 324 return await self.group.back() 325 326 def _add_func(self, func): 327 '''如果不存在就加入self.funcs''' 328 if func not in self.funcs: 329 self.funcs.append(func) 330 331 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 332 '''注册深度监听''' 333 def decorator(func): 334 func.deep = deep 335 self._add_func(func) 336 return func 337 return decorator 338 339 def on_no_block(self, block: bool = False): 340 '''注册非阻断''' 341 def decorator(func): 342 func.block = block 343 self._add_func(func) 344 return func 345 return decorator 346 347 def on_cmd(self, *cmds: Union[str, re.Pattern]): 348 '''注册命令触发,不填写命令则视为文本消息''' 349 def decorator(func): 350 func.cmds = getattr(func, "cmds", []) 351 func.cmds.extend(ensure_regex_list(cmds)) 352 self._add_func(func) 353 return func 354 return decorator 355 356 def on_cmd_regex(self, *cmds: Union[str, re.Pattern]): 357 '''注册命令触发,不填写命令则视为文本消息''' 358 def decorator(func): 359 func.cmds = getattr(func, "cmds", []) 360 func.cmds.extend(ensure_regex_list(cmds, False)) 361 self._add_func(func) 362 return func 363 return decorator 364 365 def on_text(self): 366 '''注册消息触发''' 367 def decorator(func): 368 func = self.on_no_block()(func) 369 self._add_func(func) 370 return func 371 return decorator 372 373 def on_state(self, *states: Union[AyakaState, str, List[str]]): 374 '''注册有状态响应,不填写states则为plugin_state''' 375 _states = [] 376 377 if not states: 378 _states = [self.plugin_state] 379 380 else: 381 for s in states: 382 if isinstance(s, str): 383 s = self.get_state(s) 384 elif isinstance(s, list): 385 s = self.get_state(*s) 386 _states.append(s) 387 388 def decorator(func): 389 func.states = _states 390 self._add_func(func) 391 return func 392 return decorator 393 394 def on_idle(self): 395 '''注册根结点回调''' 396 return self.on_state(self.root_state) 397 398 def on_everyday(self, h: int, m: int, s: int): 399 '''每日定时触发''' 400 return self.on_interval(86400, h, m, s) 401 402 def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True): 403 '''在指定的时间点后循环触发''' 404 def decorator(func): 405 t = AyakaTimer(self, gap, h, m, s, func, show) 406 self.timers.append(t) 407 return func 408 return decorator 409 410 def on_start_cmds(self, *cmds: Union[str, re.Pattern]): 411 def decorator(func): 412 func = self.on_idle()(func) 413 func = self.on_cmd(*cmds)(func) 414 return func 415 return decorator 416 417 def on_close_cmds(self, *cmds: Union[str, re.Pattern]): 418 def decorator(func): 419 func = self.on_state()(func) 420 func = self.on_deep_all()(func) 421 func = self.on_cmd(*cmds)(func) 422 return func 423 return decorator 424 425 def set_start_cmds(self, *cmds: Union[str, re.Pattern]): 426 '''设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式''' 427 @self.on_start_cmds(*cmds) 428 async def start(): 429 '''打开应用''' 430 await self.start() 431 432 def set_close_cmds(self, *cmds: Union[str, re.Pattern]): 433 '''设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式''' 434 @self.on_close_cmds(*cmds) 435 async def close(): 436 '''关闭应用''' 437 await self.close() 438 439 async def start(self, state: str = ""): 440 '''*timer触发时不可用* 441 442 启动应用,并发送提示 443 444 state参数为兼容旧API''' 445 if not state: 446 state = self.get_state() 447 await self.goto(state) 448 await self.send(f"已打开应用 [{self.name}]") 449 450 async def close(self): 451 '''*timer触发时不可用* 452 453 关闭应用,并发送提示''' 454 await self.goto(root_state) 455 await self.send(f"已关闭应用 [{self.name}]") 456 457 def add_listener(self, user_id: int): 458 '''为该群组添加对指定私聊的监听''' 459 private_listener_dict[user_id].append(self.group_id) 460 461 def remove_listener(self, user_id: int = 0): 462 '''默认移除该群组对其他私聊的所有监听''' 463 id = self.group_id 464 465 if user_id == 0: 466 for ids in private_listener_dict.values(): 467 if id in ids: 468 ids.remove(id) 469 return 470 471 if id in private_listener_dict[user_id]: 472 private_listener_dict[user_id].remove(self.group_id) 473 474 async def send(self, message): 475 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 476 # 这里不使用event,因为一些event可能来自其他设备的监听传递 477 await self.bot.send_group_msg(group_id=self.group_id, message=message) 478 479 def pack_messages(self, bot_id, messages): 480 '''转换为cqhttp node格式''' 481 data: List[MessageSegment] = [] 482 for m in messages: 483 if isinstance(m, MessageSegment) and m.type == "node": 484 data.append(m) 485 else: 486 m = MessageSegment.node_custom( 487 user_id=bot_id, 488 nickname="Ayaka Bot", 489 content=str(m) 490 ) 491 data.append(m) 492 return data 493 494 async def send_many(self, messages): 495 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 496 # 分割长消息组(不可超过100条 497 div_len = 100 498 div_cnt = ceil(len(messages) / div_len) 499 for i in range(div_cnt): 500 msgs = self.pack_messages( 501 self.bot_id, 502 messages[i*div_len: (i+1)*div_len] 503 ) 504 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 505 506 def t_check(self, bot_id: int): 507 # 未连接 508 bot = get_bot(bot_id) 509 if not bot: 510 logger.warning(f"BOT({bot_id}) 未连接") 511 return 512 513 return bot 514 515 async def t_send(self, bot_id: int, group_id: int, message): 516 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 517 bot = self.t_check(bot_id) 518 if not bot: 519 return 520 521 await bot.send_group_msg(group_id=group_id, message=message) 522 523 async def t_send_many(self, bot_id: int, group_id: int, messages): 524 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 525 bot = self.t_check(bot_id) 526 if not bot: 527 return 528 529 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 530 div_len = 80 531 div_cnt = ceil(len(messages) / div_len) 532 for i in range(div_cnt): 533 msgs = self.pack_messages( 534 bot_id, 535 messages[i*div_len: (i+1)*div_len] 536 ) 537 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs) 538 539 540def get_bot(bot_id: int): 541 '''获取已连接的bot''' 542 bot_id = str(bot_id) 543 for bot in bot_list: 544 if bot.self_id == bot_id: 545 return bot 546 547 548def get_group(bot_id: int, group_id: int): 549 '''获取对应的AyakaGroup对象,自动增加''' 550 for group in group_list: 551 if group.bot_id == bot_id and group.group_id == group_id: 552 break 553 else: 554 group = AyakaGroup(bot_id, group_id) 555 return group 556 557 558def get_app(app_name: str): 559 for app in app_list: 560 if app.name == app_name: 561 return app 562 563 564async def deal_event(bot: Bot, event: MessageEvent): 565 '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组''' 566 if ayaka_root_config.exclude_old_msg: 567 time_i = int(datetime.datetime.now().timestamp()) 568 if event.time < time_i - 60: 569 return 570 571 _bot.set(bot) 572 _event.set(event) 573 574 bot_id = int(bot.self_id) 575 576 if isinstance(event, GroupMessageEvent): 577 group_id = event.group_id 578 await deal_group(bot_id, group_id) 579 580 else: 581 id = event.user_id 582 group_ids = private_listener_dict.get(id, []) 583 ts = [asyncio.create_task(deal_group(bot_id, group_id)) 584 for group_id in group_ids] 585 await asyncio.gather(*ts) 586 587 588async def deal_group(bot_id: int, group_id: int): 589 prefix = ayaka_root_config.prefix 590 591 # 群组 592 group = get_group(bot_id, group_id) 593 _group.set(group) 594 595 # 消息 596 message = _event.get().message 597 _message.set(message) 598 599 # 从子状态开始向上查找可用的触发 600 state = group.state 601 cascade_triggers = get_cascade_triggers(state, 0) 602 603 # 命令 604 # 消息前缀文本 605 first = get_first(message) 606 if first.startswith(prefix): 607 first = first[len(prefix):] 608 for ts in cascade_triggers: 609 if await deal_cmd_triggers(ts, message, first, state): 610 return 611 612 # 命令退化成消息 613 for ts in cascade_triggers: 614 if await deal_text_triggers(ts, message, state): 615 return 616 617 618async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState): 619 sep = ayaka_root_config.separate 620 621 # 找到触发命令 622 cmd_ts: List[Tuple[re.Pattern, str, AyakaTrigger]] = [] 623 for t in triggers: 624 for cmd in t.cmds: 625 r = cmd.match(first) 626 if r: 627 cmd_ts.append([r, r.group(), t]) 628 break 629 630 # 根据命令长度排序,长命令优先级更高 631 cmd_ts.sort(key=lambda x: len(x[1]), reverse=1) 632 633 for r, c, t in cmd_ts: 634 # 设置上下文 635 # 设置命令 636 _cmd.set(c) 637 _cmd_regex.set(r) 638 # 设置参数 639 left = first[len(c):].lstrip(sep) 640 if left: 641 arg = Message([MessageSegment.text(left), *message[1:]]) 642 else: 643 arg = Message(message[1:]) 644 _arg.set(arg) 645 _args.set(divide_message(arg)) 646 647 # 记录触发 648 log_trigger(c, t.app.name, state, t.func.__name__) 649 f = await t.run() 650 651 # 阻断后续 652 if f and t.block: 653 return True 654 655 656async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState): 657 # 消息 658 text_ts = [t for t in triggers if not t.cmds] 659 660 # 设置上下文 661 # 设置参数 662 _arg.set(message) 663 _args.set(divide_message(message)) 664 665 for t in text_ts: 666 # 记录触发 667 log_trigger("", t.app.name, state, t.func.__name__) 668 f = await t.run() 669 670 # 阻断后续 671 if f and t.block: 672 return True 673 674 675def get_cascade_triggers(state: AyakaState, deep: int = 0): 676 # 根据深度筛选funcs 677 ts = [ 678 t for t in state.triggers 679 if t.deep == "all" or t.deep >= deep 680 ] 681 cascade_triggers = [ts] 682 683 # 获取父状态的方法 684 if state.parent: 685 cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1)) 686 687 # 排除空项 688 cascade_triggers = [ts for ts in cascade_triggers if ts] 689 return cascade_triggers 690 691 692def log_trigger(cmd, app_name, state, func_name): 693 '''日志记录''' 694 items = [] 695 items.append(f"状态:<c>{state}</c>") 696 items.append(f"应用:<y>{app_name}</y>") 697 if cmd: 698 items.append(f"命令:<y>{cmd}</y>") 699 else: 700 items.append("命令:<g>无</g>") 701 items.append(f"回调:<c>{func_name}</c>") 702 info = " | ".join(items) 703 logger.opt(colors=True).debug(info) 704 705 706def get_first(message: Message): 707 first = "" 708 for m in message: 709 if m.type == "text": 710 first += str(m) 711 else: 712 break 713 return first 714 715 716def divide_message(message: Message) -> List[MessageSegment]: 717 args = [] 718 sep = ayaka_root_config.separate 719 720 for m in message: 721 if m.is_text(): 722 ss = str(m).split(sep) 723 args.extend(MessageSegment.text(s) for s in ss if s) 724 else: 725 args.append(m) 726 727 return args 728 729 730def regist_func(app: AyakaApp, func): 731 '''注册回调''' 732 # 默认是无状态应用,从root开始触发 733 states: List[AyakaState] = getattr(func, "states", [root_state]) 734 # 默认是消息响应 735 cmds: List[re.Pattern] = getattr(func, "cmds", []) 736 # 默认监听深度为0 737 deep: int = getattr(func, "deep", 0) 738 # 默认阻断 739 block: bool = getattr(func, "block", True) 740 741 # 注册 742 for s in states: 743 s.on_cmd(cmds, app, deep, block)(func) 744 745 return func 746 747 748driver = get_driver() 749 750 751@driver.on_startup 752async def startup(): 753 # 注册所有回调 754 for app in app_list: 755 for func in app.funcs: 756 regist_func(app, func) 757 758 if ayaka_root_config.debug: 759 s = json.dumps( 760 root_state.dict(), ensure_ascii=0, 761 indent=4, default=repr 762 ) 763 path = ayaka_data_path / "all_state.json" 764 with path.open("w+", encoding="utf8") as f: 765 f.write(s) 766 767on_message(priority=20, block=False, handlers=[deal_event])
class
AyakaGroup:
20class AyakaGroup: 21 def __repr__(self) -> str: 22 return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})" 23 24 def __init__(self, bot_id: int, group_id: int) -> None: 25 self.bot_id = bot_id 26 self.group_id = group_id 27 self.state = root_state 28 29 # 添加app,并分配独立数据空间 30 self.apps: List["AyakaApp"] = [] 31 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 32 for app in app_list: 33 # if app.name not in forbid_names: 34 self.apps.append(app) 35 self.cache_dict[app.name] = {} 36 37 group_list.append(self) 38 39 if ayaka_root_config.debug: 40 print(self) 41 42 async def enter(self, state: Union[str, List[str], AyakaState]): 43 if isinstance(state, list): 44 next_state = self.state.join(*state) 45 elif isinstance(state, str): 46 next_state = self.state.join(state) 47 else: 48 next_state = self.state.join(*state.keys) 49 return await self.goto(next_state) 50 51 async def back(self): 52 if self.state.parent: 53 await self.state.exit() 54 self.state = self.state.parent 55 return self.state 56 57 async def goto(self, state: AyakaState): 58 if _enter_exit_during.get() > 0: 59 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 60 61 keys = state.keys 62 63 # 找到第一个不同的结点 64 n0 = len(keys) 65 n1 = len(self.state.keys) 66 n = min(n0, n1) 67 for i in range(n): 68 if keys[i] != self.state.keys[i]: 69 break 70 else: 71 i += 1 72 73 # 回退 74 for j in range(i, n1): 75 await self.back() 76 keys = keys[i:] 77 78 # 重新出发 79 for key in keys: 80 self.state = self.state[key] 81 await self.state.enter() 82 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 83 return self.state 84 85 def get_app(self, name: str): 86 '''根据app名获取该group所启用的app,不存在则返回None''' 87 for app in self.apps: 88 if app.name == name: 89 return app
AyakaGroup(bot_id: int, group_id: int)
24 def __init__(self, bot_id: int, group_id: int) -> None: 25 self.bot_id = bot_id 26 self.group_id = group_id 27 self.state = root_state 28 29 # 添加app,并分配独立数据空间 30 self.apps: List["AyakaApp"] = [] 31 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 32 for app in app_list: 33 # if app.name not in forbid_names: 34 self.apps.append(app) 35 self.cache_dict[app.name] = {} 36 37 group_list.append(self) 38 39 if ayaka_root_config.debug: 40 print(self)
42 async def enter(self, state: Union[str, List[str], AyakaState]): 43 if isinstance(state, list): 44 next_state = self.state.join(*state) 45 elif isinstance(state, str): 46 next_state = self.state.join(state) 47 else: 48 next_state = self.state.join(*state.keys) 49 return await self.goto(next_state)
57 async def goto(self, state: AyakaState): 58 if _enter_exit_during.get() > 0: 59 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 60 61 keys = state.keys 62 63 # 找到第一个不同的结点 64 n0 = len(keys) 65 n1 = len(self.state.keys) 66 n = min(n0, n1) 67 for i in range(n): 68 if keys[i] != self.state.keys[i]: 69 break 70 else: 71 i += 1 72 73 # 回退 74 for j in range(i, n1): 75 await self.back() 76 keys = keys[i:] 77 78 # 重新出发 79 for key in keys: 80 self.state = self.state[key] 81 await self.state.enter() 82 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 83 return self.state
class
AyakaApp:
92class AyakaApp: 93 def __repr__(self) -> str: 94 return f"{self.__class__.__name__}({self.name})" 95 96 def __init__(self, name: str) -> None: 97 self.path = Path(inspect.stack()[1].filename) 98 logger.opt(colors=True).debug(f"加载应用 \"<c>{name}</c>\"") 99 100 for app in app_list: 101 if app.name == name: 102 raise Exception( 103 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 104 105 self.name = name 106 self.ayaka_root_config = ayaka_root_config 107 self.funcs = [] 108 self.on = AyakaOn(self) 109 self.timers: List[AyakaTimer] = [] 110 111 self.root_state = root_state 112 self.plugin_state = self.get_state() 113 114 self._intro = "没有介绍" 115 self.state_helps: Dict[str, List[str]] = {} 116 self.idle_helps: List[str] = [] 117 118 app_list.append(self) 119 if ayaka_root_config.debug: 120 print(self) 121 122 @property 123 def intro(self): 124 help = self._intro 125 for h in self.idle_helps: 126 help += "\n" + h 127 return help 128 129 @property 130 def all_help(self): 131 help = self.intro 132 for s, hs in self.state_helps.items(): 133 help += f"\n[{s}]" 134 for h in hs: 135 help += "\n" + h 136 return help 137 138 @property 139 def help(self): 140 '''获取当前状态下的帮助,没有找到则返回介绍''' 141 total_triggers = get_cascade_triggers(self.state) 142 143 helps = [] 144 cmds = [] 145 for ts in total_triggers: 146 flag = 1 147 for t in ts: 148 if t.app == self: 149 for c in t.raw_cmds: 150 if c in cmds: 151 break 152 else: 153 if flag: 154 helps.append(f"[{t.state[1:]}]") 155 flag = 0 156 cmds.extend(t.raw_cmds) 157 helps.append(t.help) 158 159 if not helps: 160 return self.intro 161 return "\n".join(helps) 162 163 @help.setter 164 def help(self, help: str): 165 self._intro = help 166 167 @property 168 def user_name(self): 169 '''*timer触发时不可用* 170 171 当前消息的发送人的群名片或昵称 172 ''' 173 s = self.event.sender 174 name = s.card or s.nickname 175 return name 176 177 @property 178 def user_id(self): 179 '''*timer触发时不可用* 180 181 当前消息的发送人的uid 182 ''' 183 return self.event.user_id 184 185 @property 186 def bot(self): 187 '''*timer触发时不可用* 188 189 当前bot 190 ''' 191 return _bot.get() 192 193 @property 194 def event(self): 195 '''*timer触发时不可用* 196 197 当前消息 198 ''' 199 return _event.get() 200 201 @property 202 def cache(self): 203 '''*timer触发时不可用* 204 205 当前群组的缓存空间''' 206 return self.group.cache_dict[self.name] 207 208 @property 209 def group_id(self): 210 '''*timer触发时不可用* 211 212 当前群组的id 213 214 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 215 ''' 216 return self.group.group_id 217 218 @property 219 def bot_id(self): 220 '''*timer触发时不可用* 221 222 当前bot的id 223 ''' 224 return self.group.bot_id 225 226 @property 227 def group(self): 228 '''*timer触发时不可用* 229 230 当前群组 231 232 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 233 ''' 234 return _group.get() 235 236 @property 237 def arg(self): 238 '''*timer触发时不可用* 239 240 当前消息在移除了命令后的剩余部分 241 ''' 242 return _arg.get() 243 244 @property 245 def args(self): 246 '''*timer触发时不可用* 247 248 当前消息在移除了命令后,剩余部分按照空格分割后的数组 249 250 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 251 ''' 252 return _args.get() 253 254 @property 255 def cmd(self): 256 '''*timer触发时不可用* 257 258 当前消息的命令头 259 ''' 260 return _cmd.get() 261 262 @property 263 def cmd_regex(self): 264 '''*timer触发时不可用* 265 266 当前消息的命令头 267 ''' 268 return _cmd_regex.get() 269 270 @property 271 def message(self): 272 '''*timer触发时不可用* 273 274 当前消息 275 ''' 276 return _message.get() 277 278 @property 279 def state(self): 280 return self.group.state 281 282 def get_state(self, key: Union[str, List[str]] = [], *_keys: str): 283 # _keys为兼容旧API(0.5.3及以前 284 ''' 285 假设当前app.name为test 286 287 >>> get_state(key1) -> [root.test].key1 288 289 >>> get_state([key1, key2]) -> [root.test].key1.key2 290 291 特别的,参数可以为空,例如: 292 293 >>> get_state() -> [root.test] 294 ''' 295 if isinstance(key, list): 296 _keys = [self.name, *key] 297 else: 298 _keys = [self.name, key, *_keys] 299 300 keys = [] 301 for k in _keys: 302 keys.extend(k.split(ayaka_root_config.state_separate)) 303 304 return root_state.join(*keys) 305 306 async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str): 307 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 308 return await self.goto(state, *keys) 309 310 async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str): 311 # keys为兼容旧API(0.5.2及以前 312 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 313 if isinstance(state, str): 314 state = self.get_state(state, *keys) 315 elif isinstance(state, list): 316 state = self.get_state(*state) 317 return await self.group.goto(state) 318 319 async def enter(self, state: Union[str, List[str], AyakaState]): 320 '''进入子状态''' 321 return await self.group.enter(state) 322 323 async def back(self): 324 '''回退当前群组的状态''' 325 return await self.group.back() 326 327 def _add_func(self, func): 328 '''如果不存在就加入self.funcs''' 329 if func not in self.funcs: 330 self.funcs.append(func) 331 332 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 333 '''注册深度监听''' 334 def decorator(func): 335 func.deep = deep 336 self._add_func(func) 337 return func 338 return decorator 339 340 def on_no_block(self, block: bool = False): 341 '''注册非阻断''' 342 def decorator(func): 343 func.block = block 344 self._add_func(func) 345 return func 346 return decorator 347 348 def on_cmd(self, *cmds: Union[str, re.Pattern]): 349 '''注册命令触发,不填写命令则视为文本消息''' 350 def decorator(func): 351 func.cmds = getattr(func, "cmds", []) 352 func.cmds.extend(ensure_regex_list(cmds)) 353 self._add_func(func) 354 return func 355 return decorator 356 357 def on_cmd_regex(self, *cmds: Union[str, re.Pattern]): 358 '''注册命令触发,不填写命令则视为文本消息''' 359 def decorator(func): 360 func.cmds = getattr(func, "cmds", []) 361 func.cmds.extend(ensure_regex_list(cmds, False)) 362 self._add_func(func) 363 return func 364 return decorator 365 366 def on_text(self): 367 '''注册消息触发''' 368 def decorator(func): 369 func = self.on_no_block()(func) 370 self._add_func(func) 371 return func 372 return decorator 373 374 def on_state(self, *states: Union[AyakaState, str, List[str]]): 375 '''注册有状态响应,不填写states则为plugin_state''' 376 _states = [] 377 378 if not states: 379 _states = [self.plugin_state] 380 381 else: 382 for s in states: 383 if isinstance(s, str): 384 s = self.get_state(s) 385 elif isinstance(s, list): 386 s = self.get_state(*s) 387 _states.append(s) 388 389 def decorator(func): 390 func.states = _states 391 self._add_func(func) 392 return func 393 return decorator 394 395 def on_idle(self): 396 '''注册根结点回调''' 397 return self.on_state(self.root_state) 398 399 def on_everyday(self, h: int, m: int, s: int): 400 '''每日定时触发''' 401 return self.on_interval(86400, h, m, s) 402 403 def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True): 404 '''在指定的时间点后循环触发''' 405 def decorator(func): 406 t = AyakaTimer(self, gap, h, m, s, func, show) 407 self.timers.append(t) 408 return func 409 return decorator 410 411 def on_start_cmds(self, *cmds: Union[str, re.Pattern]): 412 def decorator(func): 413 func = self.on_idle()(func) 414 func = self.on_cmd(*cmds)(func) 415 return func 416 return decorator 417 418 def on_close_cmds(self, *cmds: Union[str, re.Pattern]): 419 def decorator(func): 420 func = self.on_state()(func) 421 func = self.on_deep_all()(func) 422 func = self.on_cmd(*cmds)(func) 423 return func 424 return decorator 425 426 def set_start_cmds(self, *cmds: Union[str, re.Pattern]): 427 '''设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式''' 428 @self.on_start_cmds(*cmds) 429 async def start(): 430 '''打开应用''' 431 await self.start() 432 433 def set_close_cmds(self, *cmds: Union[str, re.Pattern]): 434 '''设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式''' 435 @self.on_close_cmds(*cmds) 436 async def close(): 437 '''关闭应用''' 438 await self.close() 439 440 async def start(self, state: str = ""): 441 '''*timer触发时不可用* 442 443 启动应用,并发送提示 444 445 state参数为兼容旧API''' 446 if not state: 447 state = self.get_state() 448 await self.goto(state) 449 await self.send(f"已打开应用 [{self.name}]") 450 451 async def close(self): 452 '''*timer触发时不可用* 453 454 关闭应用,并发送提示''' 455 await self.goto(root_state) 456 await self.send(f"已关闭应用 [{self.name}]") 457 458 def add_listener(self, user_id: int): 459 '''为该群组添加对指定私聊的监听''' 460 private_listener_dict[user_id].append(self.group_id) 461 462 def remove_listener(self, user_id: int = 0): 463 '''默认移除该群组对其他私聊的所有监听''' 464 id = self.group_id 465 466 if user_id == 0: 467 for ids in private_listener_dict.values(): 468 if id in ids: 469 ids.remove(id) 470 return 471 472 if id in private_listener_dict[user_id]: 473 private_listener_dict[user_id].remove(self.group_id) 474 475 async def send(self, message): 476 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 477 # 这里不使用event,因为一些event可能来自其他设备的监听传递 478 await self.bot.send_group_msg(group_id=self.group_id, message=message) 479 480 def pack_messages(self, bot_id, messages): 481 '''转换为cqhttp node格式''' 482 data: List[MessageSegment] = [] 483 for m in messages: 484 if isinstance(m, MessageSegment) and m.type == "node": 485 data.append(m) 486 else: 487 m = MessageSegment.node_custom( 488 user_id=bot_id, 489 nickname="Ayaka Bot", 490 content=str(m) 491 ) 492 data.append(m) 493 return data 494 495 async def send_many(self, messages): 496 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 497 # 分割长消息组(不可超过100条 498 div_len = 100 499 div_cnt = ceil(len(messages) / div_len) 500 for i in range(div_cnt): 501 msgs = self.pack_messages( 502 self.bot_id, 503 messages[i*div_len: (i+1)*div_len] 504 ) 505 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 506 507 def t_check(self, bot_id: int): 508 # 未连接 509 bot = get_bot(bot_id) 510 if not bot: 511 logger.warning(f"BOT({bot_id}) 未连接") 512 return 513 514 return bot 515 516 async def t_send(self, bot_id: int, group_id: int, message): 517 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 518 bot = self.t_check(bot_id) 519 if not bot: 520 return 521 522 await bot.send_group_msg(group_id=group_id, message=message) 523 524 async def t_send_many(self, bot_id: int, group_id: int, messages): 525 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 526 bot = self.t_check(bot_id) 527 if not bot: 528 return 529 530 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 531 div_len = 80 532 div_cnt = ceil(len(messages) / div_len) 533 for i in range(div_cnt): 534 msgs = self.pack_messages( 535 bot_id, 536 messages[i*div_len: (i+1)*div_len] 537 ) 538 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
96 def __init__(self, name: str) -> None: 97 self.path = Path(inspect.stack()[1].filename) 98 logger.opt(colors=True).debug(f"加载应用 \"<c>{name}</c>\"") 99 100 for app in app_list: 101 if app.name == name: 102 raise Exception( 103 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 104 105 self.name = name 106 self.ayaka_root_config = ayaka_root_config 107 self.funcs = [] 108 self.on = AyakaOn(self) 109 self.timers: List[AyakaTimer] = [] 110 111 self.root_state = root_state 112 self.plugin_state = self.get_state() 113 114 self._intro = "没有介绍" 115 self.state_helps: Dict[str, List[str]] = {} 116 self.idle_helps: List[str] = [] 117 118 app_list.append(self) 119 if ayaka_root_config.debug: 120 print(self)
def
get_state(self, key: Union[str, List[str]] = [], *_keys: str):
282 def get_state(self, key: Union[str, List[str]] = [], *_keys: str): 283 # _keys为兼容旧API(0.5.3及以前 284 ''' 285 假设当前app.name为test 286 287 >>> get_state(key1) -> [root.test].key1 288 289 >>> get_state([key1, key2]) -> [root.test].key1.key2 290 291 特别的,参数可以为空,例如: 292 293 >>> get_state() -> [root.test] 294 ''' 295 if isinstance(key, list): 296 _keys = [self.name, *key] 297 else: 298 _keys = [self.name, key, *_keys] 299 300 keys = [] 301 for k in _keys: 302 keys.extend(k.split(ayaka_root_config.state_separate)) 303 304 return root_state.join(*keys)
假设当前app.name为test
>>> get_state(key1) -> [root.test].key1
>>> get_state([key1, key2]) -> [root.test].key1.key2
特别的,参数可以为空,例如:
>>> get_state() -> [root.test]
306 async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str): 307 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 308 return await self.goto(state, *keys)
变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割
310 async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str): 311 # keys为兼容旧API(0.5.2及以前 312 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 313 if isinstance(state, str): 314 state = self.get_state(state, *keys) 315 elif isinstance(state, list): 316 state = self.get_state(*state) 317 return await self.group.goto(state)
变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割
319 async def enter(self, state: Union[str, List[str], AyakaState]): 320 '''进入子状态''' 321 return await self.group.enter(state)
进入子状态
def
on_deep_all(self, deep: Union[int, Literal['all']] = 'all'):
332 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 333 '''注册深度监听''' 334 def decorator(func): 335 func.deep = deep 336 self._add_func(func) 337 return func 338 return decorator
注册深度监听
def
on_no_block(self, block: bool = False):
340 def on_no_block(self, block: bool = False): 341 '''注册非阻断''' 342 def decorator(func): 343 func.block = block 344 self._add_func(func) 345 return func 346 return decorator
注册非阻断
def
on_cmd(self, *cmds: Union[str, re.Pattern]):
348 def on_cmd(self, *cmds: Union[str, re.Pattern]): 349 '''注册命令触发,不填写命令则视为文本消息''' 350 def decorator(func): 351 func.cmds = getattr(func, "cmds", []) 352 func.cmds.extend(ensure_regex_list(cmds)) 353 self._add_func(func) 354 return func 355 return decorator
注册命令触发,不填写命令则视为文本消息
def
on_cmd_regex(self, *cmds: Union[str, re.Pattern]):
357 def on_cmd_regex(self, *cmds: Union[str, re.Pattern]): 358 '''注册命令触发,不填写命令则视为文本消息''' 359 def decorator(func): 360 func.cmds = getattr(func, "cmds", []) 361 func.cmds.extend(ensure_regex_list(cmds, False)) 362 self._add_func(func) 363 return func 364 return decorator
注册命令触发,不填写命令则视为文本消息
def
on_text(self):
366 def on_text(self): 367 '''注册消息触发''' 368 def decorator(func): 369 func = self.on_no_block()(func) 370 self._add_func(func) 371 return func 372 return decorator
注册消息触发
374 def on_state(self, *states: Union[AyakaState, str, List[str]]): 375 '''注册有状态响应,不填写states则为plugin_state''' 376 _states = [] 377 378 if not states: 379 _states = [self.plugin_state] 380 381 else: 382 for s in states: 383 if isinstance(s, str): 384 s = self.get_state(s) 385 elif isinstance(s, list): 386 s = self.get_state(*s) 387 _states.append(s) 388 389 def decorator(func): 390 func.states = _states 391 self._add_func(func) 392 return func 393 return decorator
注册有状态响应,不填写states则为plugin_state
def
on_everyday(self, h: int, m: int, s: int):
399 def on_everyday(self, h: int, m: int, s: int): 400 '''每日定时触发''' 401 return self.on_interval(86400, h, m, s)
每日定时触发
def
on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True):
403 def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True): 404 '''在指定的时间点后循环触发''' 405 def decorator(func): 406 t = AyakaTimer(self, gap, h, m, s, func, show) 407 self.timers.append(t) 408 return func 409 return decorator
在指定的时间点后循环触发
def
set_start_cmds(self, *cmds: Union[str, re.Pattern]):
426 def set_start_cmds(self, *cmds: Union[str, re.Pattern]): 427 '''设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式''' 428 @self.on_start_cmds(*cmds) 429 async def start(): 430 '''打开应用''' 431 await self.start()
设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式
def
set_close_cmds(self, *cmds: Union[str, re.Pattern]):
433 def set_close_cmds(self, *cmds: Union[str, re.Pattern]): 434 '''设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式''' 435 @self.on_close_cmds(*cmds) 436 async def close(): 437 '''关闭应用''' 438 await self.close()
设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式
async def
start(self, state: str = ''):
440 async def start(self, state: str = ""): 441 '''*timer触发时不可用* 442 443 启动应用,并发送提示 444 445 state参数为兼容旧API''' 446 if not state: 447 state = self.get_state() 448 await self.goto(state) 449 await self.send(f"已打开应用 [{self.name}]")
timer触发时不可用
启动应用,并发送提示
state参数为兼容旧API
async def
close(self):
451 async def close(self): 452 '''*timer触发时不可用* 453 454 关闭应用,并发送提示''' 455 await self.goto(root_state) 456 await self.send(f"已关闭应用 [{self.name}]")
timer触发时不可用
关闭应用,并发送提示
def
add_listener(self, user_id: int):
458 def add_listener(self, user_id: int): 459 '''为该群组添加对指定私聊的监听''' 460 private_listener_dict[user_id].append(self.group_id)
为该群组添加对指定私聊的监听
def
remove_listener(self, user_id: int = 0):
462 def remove_listener(self, user_id: int = 0): 463 '''默认移除该群组对其他私聊的所有监听''' 464 id = self.group_id 465 466 if user_id == 0: 467 for ids in private_listener_dict.values(): 468 if id in ids: 469 ids.remove(id) 470 return 471 472 if id in private_listener_dict[user_id]: 473 private_listener_dict[user_id].remove(self.group_id)
默认移除该群组对其他私聊的所有监听
async def
send(self, message):
475 async def send(self, message): 476 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 477 # 这里不使用event,因为一些event可能来自其他设备的监听传递 478 await self.bot.send_group_msg(group_id=self.group_id, message=message)
发送消息,消息的类型可以是 Message | MessageSegment | str
def
pack_messages(self, bot_id, messages):
480 def pack_messages(self, bot_id, messages): 481 '''转换为cqhttp node格式''' 482 data: List[MessageSegment] = [] 483 for m in messages: 484 if isinstance(m, MessageSegment) and m.type == "node": 485 data.append(m) 486 else: 487 m = MessageSegment.node_custom( 488 user_id=bot_id, 489 nickname="Ayaka Bot", 490 content=str(m) 491 ) 492 data.append(m) 493 return data
转换为cqhttp node格式
async def
send_many(self, messages):
495 async def send_many(self, messages): 496 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 497 # 分割长消息组(不可超过100条 498 div_len = 100 499 div_cnt = ceil(len(messages) / div_len) 500 for i in range(div_cnt): 501 msgs = self.pack_messages( 502 self.bot_id, 503 messages[i*div_len: (i+1)*div_len] 504 ) 505 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]
async def
t_send(self, bot_id: int, group_id: int, message):
516 async def t_send(self, bot_id: int, group_id: int, message): 517 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 518 bot = self.t_check(bot_id) 519 if not bot: 520 return 521 522 await bot.send_group_msg(group_id=group_id, message=message)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
async def
t_send_many(self, bot_id: int, group_id: int, messages):
524 async def t_send_many(self, bot_id: int, group_id: int, messages): 525 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 526 bot = self.t_check(bot_id) 527 if not bot: 528 return 529 530 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 531 div_len = 80 532 div_cnt = ceil(len(messages) / div_len) 533 for i in range(div_cnt): 534 msgs = self.pack_messages( 535 bot_id, 536 messages[i*div_len: (i+1)*div_len] 537 ) 538 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
def
get_bot(bot_id: int):
541def get_bot(bot_id: int): 542 '''获取已连接的bot''' 543 bot_id = str(bot_id) 544 for bot in bot_list: 545 if bot.self_id == bot_id: 546 return bot
获取已连接的bot
def
get_group(bot_id: int, group_id: int):
549def get_group(bot_id: int, group_id: int): 550 '''获取对应的AyakaGroup对象,自动增加''' 551 for group in group_list: 552 if group.bot_id == bot_id and group.group_id == group_id: 553 break 554 else: 555 group = AyakaGroup(bot_id, group_id) 556 return group
获取对应的AyakaGroup对象,自动增加
def
get_app(app_name: str):
async def
deal_event( bot: nonebot.adapters.onebot.v11.bot.Bot, event: nonebot.adapters.onebot.v11.event.MessageEvent):
565async def deal_event(bot: Bot, event: MessageEvent): 566 '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组''' 567 if ayaka_root_config.exclude_old_msg: 568 time_i = int(datetime.datetime.now().timestamp()) 569 if event.time < time_i - 60: 570 return 571 572 _bot.set(bot) 573 _event.set(event) 574 575 bot_id = int(bot.self_id) 576 577 if isinstance(event, GroupMessageEvent): 578 group_id = event.group_id 579 await deal_group(bot_id, group_id) 580 581 else: 582 id = event.user_id 583 group_ids = private_listener_dict.get(id, []) 584 ts = [asyncio.create_task(deal_group(bot_id, group_id)) 585 for group_id in group_ids] 586 await asyncio.gather(*ts)
处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组
async def
deal_group(bot_id: int, group_id: int):
589async def deal_group(bot_id: int, group_id: int): 590 prefix = ayaka_root_config.prefix 591 592 # 群组 593 group = get_group(bot_id, group_id) 594 _group.set(group) 595 596 # 消息 597 message = _event.get().message 598 _message.set(message) 599 600 # 从子状态开始向上查找可用的触发 601 state = group.state 602 cascade_triggers = get_cascade_triggers(state, 0) 603 604 # 命令 605 # 消息前缀文本 606 first = get_first(message) 607 if first.startswith(prefix): 608 first = first[len(prefix):] 609 for ts in cascade_triggers: 610 if await deal_cmd_triggers(ts, message, first, state): 611 return 612 613 # 命令退化成消息 614 for ts in cascade_triggers: 615 if await deal_text_triggers(ts, message, state): 616 return
async def
deal_cmd_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, first: str, state: ayaka.state.AyakaState):
619async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState): 620 sep = ayaka_root_config.separate 621 622 # 找到触发命令 623 cmd_ts: List[Tuple[re.Pattern, str, AyakaTrigger]] = [] 624 for t in triggers: 625 for cmd in t.cmds: 626 r = cmd.match(first) 627 if r: 628 cmd_ts.append([r, r.group(), t]) 629 break 630 631 # 根据命令长度排序,长命令优先级更高 632 cmd_ts.sort(key=lambda x: len(x[1]), reverse=1) 633 634 for r, c, t in cmd_ts: 635 # 设置上下文 636 # 设置命令 637 _cmd.set(c) 638 _cmd_regex.set(r) 639 # 设置参数 640 left = first[len(c):].lstrip(sep) 641 if left: 642 arg = Message([MessageSegment.text(left), *message[1:]]) 643 else: 644 arg = Message(message[1:]) 645 _arg.set(arg) 646 _args.set(divide_message(arg)) 647 648 # 记录触发 649 log_trigger(c, t.app.name, state, t.func.__name__) 650 f = await t.run() 651 652 # 阻断后续 653 if f and t.block: 654 return True
async def
deal_text_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, state: ayaka.state.AyakaState):
657async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState): 658 # 消息 659 text_ts = [t for t in triggers if not t.cmds] 660 661 # 设置上下文 662 # 设置参数 663 _arg.set(message) 664 _args.set(divide_message(message)) 665 666 for t in text_ts: 667 # 记录触发 668 log_trigger("", t.app.name, state, t.func.__name__) 669 f = await t.run() 670 671 # 阻断后续 672 if f and t.block: 673 return True
676def get_cascade_triggers(state: AyakaState, deep: int = 0): 677 # 根据深度筛选funcs 678 ts = [ 679 t for t in state.triggers 680 if t.deep == "all" or t.deep >= deep 681 ] 682 cascade_triggers = [ts] 683 684 # 获取父状态的方法 685 if state.parent: 686 cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1)) 687 688 # 排除空项 689 cascade_triggers = [ts for ts in cascade_triggers if ts] 690 return cascade_triggers
def
log_trigger(cmd, app_name, state, func_name):
693def log_trigger(cmd, app_name, state, func_name): 694 '''日志记录''' 695 items = [] 696 items.append(f"状态:<c>{state}</c>") 697 items.append(f"应用:<y>{app_name}</y>") 698 if cmd: 699 items.append(f"命令:<y>{cmd}</y>") 700 else: 701 items.append("命令:<g>无</g>") 702 items.append(f"回调:<c>{func_name}</c>") 703 info = " | ".join(items) 704 logger.opt(colors=True).debug(info)
日志记录
def
get_first(message: nonebot.adapters.onebot.v11.message.Message):
def
divide_message( message: nonebot.adapters.onebot.v11.message.Message) -> List[nonebot.adapters.onebot.v11.message.MessageSegment]:
731def regist_func(app: AyakaApp, func): 732 '''注册回调''' 733 # 默认是无状态应用,从root开始触发 734 states: List[AyakaState] = getattr(func, "states", [root_state]) 735 # 默认是消息响应 736 cmds: List[re.Pattern] = getattr(func, "cmds", []) 737 # 默认监听深度为0 738 deep: int = getattr(func, "deep", 0) 739 # 默认阻断 740 block: bool = getattr(func, "block", True) 741 742 # 注册 743 for s in states: 744 s.on_cmd(cmds, app, deep, block)(func) 745 746 return func
注册回调
@driver.on_startup
async def
startup():
752@driver.on_startup 753async def startup(): 754 # 注册所有回调 755 for app in app_list: 756 for func in app.funcs: 757 regist_func(app, func) 758 759 if ayaka_root_config.debug: 760 s = json.dumps( 761 root_state.dict(), ensure_ascii=0, 762 indent=4, default=repr 763 ) 764 path = ayaka_data_path / "all_state.json" 765 with path.open("w+", encoding="utf8") as f: 766 f.write(s)