ayaka.ayaka
ayaka核心
1'''ayaka核心''' 2import asyncio 3import datetime 4import inspect 5import json 6from math import ceil 7from pathlib import Path 8from loguru import logger 9from typing import List, Dict, Literal, Union 10from .depend import AyakaCache 11from .config import ayaka_root_config, ayaka_data_path 12from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, _enter_exit_during, app_list, group_list, bot_list, private_listener_dict 13from .driver import on_message, get_driver, Message, MessageSegment, Bot, MessageEvent, GroupMessageEvent 14from .state import AyakaState, AyakaTrigger, root_state 15from .on import AyakaOn, AyakaTimer 16 17 18class AyakaGroup: 19 def __repr__(self) -> str: 20 return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})" 21 22 def __init__(self, bot_id: int, group_id: int) -> None: 23 self.bot_id = bot_id 24 self.group_id = group_id 25 self.state = root_state 26 27 # 添加app,并分配独立数据空间 28 self.apps: List["AyakaApp"] = [] 29 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 30 for app in app_list: 31 # if app.name not in forbid_names: 32 self.apps.append(app) 33 self.cache_dict[app.name] = {} 34 35 group_list.append(self) 36 37 if ayaka_root_config.debug: 38 print(self) 39 40 async def back(self): 41 if self.state.parent: 42 await self.state.exit() 43 self.state = self.state.parent 44 return self.state 45 46 async def goto(self, state: AyakaState): 47 if _enter_exit_during.get() > 0: 48 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 49 50 keys = state.keys 51 52 # 找到第一个不同的结点 53 n0 = len(keys) 54 n1 = len(self.state.keys) 55 n = min(n0, n1) 56 for i in range(n): 57 if keys[i] != self.state.keys[i]: 58 break 59 else: 60 i += 1 61 62 # 回退 63 for j in range(i, n1): 64 await self.back() 65 keys = keys[i:] 66 67 # 重新出发 68 for key in keys: 69 self.state = self.state[key] 70 await self.state.enter() 71 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 72 return self.state 73 74 def get_app(self, name: str): 75 '''根据app名获取该group所启用的app,不存在则返回None''' 76 for app in self.apps: 77 if app.name == name: 78 return app 79 80 81class AyakaApp: 82 def __repr__(self) -> str: 83 return f"{self.__class__.__name__}({self.name})" 84 85 def __init__(self, name: str) -> None: 86 self.path = Path(inspect.stack()[1].filename) 87 88 for app in app_list: 89 if app.name == name: 90 raise Exception( 91 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 92 93 self.name = name 94 self.ayaka_root_config = ayaka_root_config 95 self.funcs = [] 96 self.on = AyakaOn(self) 97 self.timers: List[AyakaTimer] = [] 98 self.root_state = root_state 99 self._intro = "没有介绍" 100 self.state_helps: Dict[str, List[str]] = {} 101 self.idle_helps: List[str] = [] 102 103 logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"") 104 app_list.append(self) 105 if ayaka_root_config.debug: 106 print(self) 107 108 @property 109 def intro(self): 110 help = self._intro 111 for h in self.idle_helps: 112 help += "\n" + h 113 return help 114 115 @property 116 def all_help(self): 117 help = self.intro 118 for s, hs in self.state_helps.items(): 119 help += f"\n[{s}]" 120 for h in hs: 121 help += "\n" + h 122 return help 123 124 @property 125 def help(self): 126 '''获取当前状态下的帮助,没有找到则返回介绍''' 127 total_triggers = get_cascade_triggers(self.state) 128 129 helps = [] 130 cmds = [] 131 for ts in total_triggers: 132 flag = 1 133 for t in ts: 134 if t.app == self: 135 for c in t.cmds: 136 if c in cmds: 137 break 138 else: 139 if flag: 140 helps.append(f"[{t.state[1:]}]") 141 flag = 0 142 cmds.extend(t.cmds) 143 helps.append(t.help) 144 145 if not helps: 146 return self.intro 147 return "\n".join(helps) 148 149 @help.setter 150 def help(self, help: str): 151 self._intro = help 152 153 @property 154 def user_name(self): 155 '''*timer触发时不可用* 156 157 当前消息的发送人的群名片或昵称 158 ''' 159 s = self.event.sender 160 name = s.card or s.nickname 161 return name 162 163 @property 164 def user_id(self): 165 '''*timer触发时不可用* 166 167 当前消息的发送人的uid 168 ''' 169 return self.event.user_id 170 171 @property 172 def bot(self): 173 '''*timer触发时不可用* 174 175 当前bot 176 ''' 177 return _bot.get() 178 179 @property 180 def event(self): 181 '''*timer触发时不可用* 182 183 当前消息 184 ''' 185 return _event.get() 186 187 @property 188 def cache(self): 189 '''*timer触发时不可用* 190 191 当前群组的缓存空间''' 192 return self.group.cache_dict[self.name] 193 194 @property 195 def group_id(self): 196 '''*timer触发时不可用* 197 198 当前群组的id 199 200 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 201 ''' 202 return self.group.group_id 203 204 @property 205 def bot_id(self): 206 '''*timer触发时不可用* 207 208 当前bot的id 209 ''' 210 return self.group.bot_id 211 212 @property 213 def group(self): 214 '''*timer触发时不可用* 215 216 当前群组 217 218 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 219 ''' 220 return _group.get() 221 222 @property 223 def arg(self): 224 '''*timer触发时不可用* 225 226 当前消息在移除了命令后的剩余部分 227 ''' 228 return _arg.get() 229 230 @property 231 def args(self): 232 '''*timer触发时不可用* 233 234 当前消息在移除了命令后,剩余部分按照空格分割后的数组 235 236 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 237 ''' 238 return _args.get() 239 240 @property 241 def cmd(self): 242 '''*timer触发时不可用* 243 244 当前消息的命令头 245 ''' 246 return _cmd.get() 247 248 @property 249 def message(self): 250 '''*timer触发时不可用* 251 252 当前消息 253 ''' 254 return _message.get() 255 256 @property 257 def state(self): 258 return self.group.state 259 260 def get_state(self, *keys: str): 261 ''' 262 假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项` 263 264 >>> get_state(key1, key2) -> [root.test].key1.key2 265 266 特别的,keys可以为空,例如: 267 268 >>> get_state() -> [root.test] 269 ''' 270 keys = [self.name, *keys] 271 return root_state.join(*keys) 272 273 async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str): 274 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 275 return await self.goto(state, *keys) 276 277 async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str): 278 # keys为兼容旧API(0.5.2及以前 279 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 280 if isinstance(state, str): 281 if "." in state: 282 state = self.get_state(*state.split(".")) 283 else: 284 state = self.get_state(state, *keys) 285 elif isinstance(state, list): 286 state = self.get_state(*state) 287 return await self.group.goto(state) 288 289 async def back(self): 290 '''回退当前群组的状态''' 291 return await self.group.back() 292 293 def _add_func(self, func): 294 '''如果不存在就加入self.funcs''' 295 if func not in self.funcs: 296 self.funcs.append(func) 297 298 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 299 '''注册深度监听''' 300 def decorator(func): 301 func.deep = deep 302 self._add_func(func) 303 return func 304 return decorator 305 306 def on_no_block(self, block: bool = False): 307 '''注册非阻断''' 308 def decorator(func): 309 func.block = block 310 self._add_func(func) 311 return func 312 return decorator 313 314 def on_cmd(self, *cmds: str): 315 '''注册命令触发,不填写命令则视为文本消息''' 316 def decorator(func): 317 func.cmds = cmds 318 self._add_func(func) 319 return func 320 return decorator 321 322 def on_text(self): 323 '''注册消息触发''' 324 def decorator(func): 325 self._add_func(func) 326 return func 327 return decorator 328 329 def on_state(self, *states: Union[AyakaState, str, List[str]]): 330 '''注册有状态响应,不填写states则为root.插件名状态''' 331 _states = [] 332 if not states: 333 states = [[]] 334 for s in states: 335 if isinstance(s, str): 336 s = self.get_state(s) 337 elif isinstance(s, list): 338 s = self.get_state(*s) 339 _states.append(s) 340 341 def decorator(func): 342 func.states = _states 343 self._add_func(func) 344 return func 345 return decorator 346 347 def on_idle(self): 348 '''注册根结点回调''' 349 return self.on_state(self.root_state) 350 351 def set_start_cmds(self, *cmds: str): 352 '''设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式''' 353 @self.on_idle() 354 @self.on_cmd(*cmds) 355 async def start(): 356 '''打开应用''' 357 await self.start() 358 359 def set_close_cmds(self, *cmds: str): 360 '''设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式''' 361 @self.on_state() 362 @self.on_deep_all() 363 @self.on_cmd(*cmds) 364 async def close(): 365 '''关闭应用''' 366 await self.close() 367 368 async def start(self, state: str = None): 369 '''*timer触发时不可用* 370 371 启动应用,并发送提示 372 373 state参数为兼容旧API''' 374 if not state: 375 state = self.get_state() 376 await self.goto(state) 377 else: 378 states = state.split(".") 379 await self.goto(*states) 380 await self.send(f"已打开应用 [{self.name}]") 381 382 async def close(self): 383 '''*timer触发时不可用* 384 385 关闭应用,并发送提示''' 386 await self.goto(root_state) 387 await self.send(f"已关闭应用 [{self.name}]") 388 389 def add_listener(self, user_id: int): 390 '''为该群组添加对指定私聊的监听''' 391 private_listener_dict[user_id].append(self.group_id) 392 393 def remove_listener(self, user_id: int = 0): 394 '''默认移除该群组对其他私聊的所有监听''' 395 id = self.group_id 396 397 if user_id == 0: 398 for ids in private_listener_dict.values(): 399 if id in ids: 400 ids.remove(id) 401 return 402 403 if id in private_listener_dict[user_id]: 404 private_listener_dict[user_id].remove(self.group_id) 405 406 async def send(self, message): 407 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 408 # 这里不使用event,因为一些event可能来自其他设备的监听传递 409 await self.bot.send_group_msg(group_id=self.group_id, message=message) 410 411 def pack_messages(self, bot_id, messages): 412 '''转换为cqhttp node格式''' 413 data: List[MessageSegment] = [] 414 for m in messages: 415 if isinstance(m, MessageSegment) and m.type == "node": 416 data.append(m) 417 else: 418 m = MessageSegment.node_custom( 419 user_id=bot_id, 420 nickname="Ayaka Bot", 421 content=str(m) 422 ) 423 data.append(m) 424 return data 425 426 async def send_many(self, messages): 427 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 428 # 分割长消息组(不可超过100条 429 div_len = 100 430 div_cnt = ceil(len(messages) / div_len) 431 for i in range(div_cnt): 432 msgs = self.pack_messages( 433 self.bot_id, 434 messages[i*div_len: (i+1)*div_len] 435 ) 436 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 437 438 def t_check(self, bot_id: int): 439 # 未连接 440 bot = get_bot(bot_id) 441 if not bot: 442 logger.warning(f"BOT({bot_id}) 未连接") 443 return 444 445 return bot 446 447 async def t_send(self, bot_id: int, group_id: int, message): 448 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 449 bot = self.t_check(bot_id) 450 if not bot: 451 return 452 453 await bot.send_group_msg(group_id=group_id, message=message) 454 455 async def t_send_many(self, bot_id: int, group_id: int, messages): 456 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 457 bot = self.t_check(bot_id) 458 if not bot: 459 return 460 461 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 462 div_len = 80 463 div_cnt = ceil(len(messages) / div_len) 464 for i in range(div_cnt): 465 msgs = self.pack_messages( 466 bot_id, 467 messages[i*div_len: (i+1)*div_len] 468 ) 469 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs) 470 471 472def get_bot(bot_id: int): 473 '''获取已连接的bot''' 474 bot_id = str(bot_id) 475 for bot in bot_list: 476 if bot.self_id == bot_id: 477 return bot 478 479 480def get_group(bot_id: int, group_id: int): 481 '''获取对应的AyakaGroup对象,自动增加''' 482 for group in group_list: 483 if group.bot_id == bot_id and group.group_id == group_id: 484 break 485 else: 486 group = AyakaGroup(bot_id, group_id) 487 return group 488 489 490def get_app(app_name: str): 491 for app in app_list: 492 if app.name == app_name: 493 return app 494 495 496async def deal_event(bot: Bot, event: MessageEvent): 497 '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组''' 498 if ayaka_root_config.exclude_old_msg: 499 time_i = int(datetime.datetime.now().timestamp()) 500 if event.time < time_i - 60: 501 return 502 503 _bot.set(bot) 504 _event.set(event) 505 506 bot_id = int(bot.self_id) 507 508 if isinstance(event, GroupMessageEvent): 509 group_id = event.group_id 510 await deal_group(bot_id, group_id) 511 512 else: 513 id = event.user_id 514 group_ids = private_listener_dict.get(id, []) 515 ts = [asyncio.create_task(deal_group(bot_id, group_id)) 516 for group_id in group_ids] 517 await asyncio.gather(*ts) 518 519 520async def deal_group(bot_id: int, group_id: int): 521 prefix = ayaka_root_config.prefix 522 523 # 群组 524 group = get_group(bot_id, group_id) 525 _group.set(group) 526 527 # 消息 528 message = _event.get().message 529 _message.set(message) 530 531 # 从子状态开始向上查找可用的触发 532 state = group.state 533 cascade_triggers = get_cascade_triggers(state, 0) 534 535 # 命令 536 # 消息前缀文本 537 first = get_first(message) 538 if first.startswith(prefix): 539 first = first[len(prefix):] 540 for ts in cascade_triggers: 541 if await deal_cmd_triggers(ts, message, first, state): 542 return 543 544 # 命令退化成消息 545 for ts in cascade_triggers: 546 if await deal_text_triggers(ts, message, state): 547 return 548 549 550async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState): 551 sep = ayaka_root_config.separate 552 553 # 命令 554 temp = [t for t in triggers if t.cmds] 555 # 根据命令长度排序,长命令优先级更高 556 cmd_ts = [(c, t) for t in temp for c in t.cmds] 557 cmd_ts.sort(key=lambda x: len(x[0]), reverse=1) 558 559 for c, t in cmd_ts: 560 if first.startswith(c): 561 # 设置上下文 562 # 设置命令 563 _cmd.set(c) 564 # 设置参数 565 left = first[len(c):].lstrip(sep) 566 if left: 567 arg = Message([MessageSegment.text(left), *message[1:]]) 568 else: 569 arg = Message(message[1:]) 570 _arg.set(arg) 571 _args.set(divide_message(arg)) 572 573 # 触发 574 log_trigger(c, t.app.name, state, t.func.__name__) 575 await t.run() 576 577 # 阻断后续 578 if t.block: 579 return True 580 581 582async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState): 583 # 消息 584 text_ts = [t for t in triggers if not t.cmds] 585 586 # 设置上下文 587 # 设置命令 588 _cmd.set("") 589 # 设置参数 590 _arg.set(message) 591 _args.set(divide_message(message)) 592 593 for t in text_ts: 594 # 触发 595 log_trigger("", t.app.name, state, t.func.__name__) 596 await t.run() 597 598 # 阻断后续 599 if t.block: 600 return True 601 602 603def get_cascade_triggers(state: AyakaState, deep: int = 0): 604 # 根据深度筛选funcs 605 ts = [ 606 t for t in state.triggers 607 if t.deep == "all" or t.deep >= deep 608 ] 609 cascade_triggers = [ts] 610 611 # 获取父状态的方法 612 if state.parent: 613 cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1)) 614 615 # 排除空项 616 cascade_triggers = [ts for ts in cascade_triggers if ts] 617 return cascade_triggers 618 619 620def log_trigger(cmd, app_name, state, func_name): 621 '''日志记录''' 622 items = [] 623 items.append(f"状态:<c>{state}</c>") 624 items.append(f"应用:<y>{app_name}</y>") 625 if cmd: 626 items.append(f"命令:<y>{cmd}</y>") 627 else: 628 items.append("命令:<g>无</g>") 629 items.append(f"回调:<c>{func_name}</c>") 630 info = " | ".join(items) 631 logger.opt(colors=True).debug(info) 632 633 634def get_first(message: Message): 635 first = "" 636 for m in message: 637 if m.type == "text": 638 first += str(m) 639 else: 640 break 641 return first 642 643 644def divide_message(message: Message) -> List[MessageSegment]: 645 args = [] 646 sep = ayaka_root_config.separate 647 648 for m in message: 649 if m.is_text(): 650 ss = str(m).split(sep) 651 args.extend(MessageSegment.text(s) for s in ss if s) 652 else: 653 args.append(m) 654 655 return args 656 657 658def regist_func(app: AyakaApp, func): 659 '''注册回调''' 660 # 默认是无状态应用,从root开始触发 661 states: List[AyakaState] = getattr(func, "states", [root_state]) 662 # 默认是消息响应 663 cmds: List[str] = getattr(func, "cmds", []) 664 # 默认监听深度为0 665 deep: int = getattr(func, "deep", 0) 666 # 默认阻断 667 block: bool = getattr(func, "block", True) 668 669 # 注册 670 for s in states: 671 s.on_cmd(cmds, app, deep, block)(func) 672 673 return func 674 675 676driver = get_driver() 677 678 679@driver.on_startup 680async def startup(): 681 # 注册所有回调 682 for app in app_list: 683 for func in app.funcs: 684 regist_func(app, func) 685 686 if ayaka_root_config.debug: 687 s = json.dumps( 688 root_state.dict(), ensure_ascii=0, 689 indent=4, default=repr 690 ) 691 path = ayaka_data_path / "all_state.json" 692 with path.open("w+", encoding="utf8") as f: 693 f.write(s) 694 695on_message(priority=20, block=False, handlers=[deal_event])
class
AyakaGroup:
19class AyakaGroup: 20 def __repr__(self) -> str: 21 return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})" 22 23 def __init__(self, bot_id: int, group_id: int) -> None: 24 self.bot_id = bot_id 25 self.group_id = group_id 26 self.state = root_state 27 28 # 添加app,并分配独立数据空间 29 self.apps: List["AyakaApp"] = [] 30 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 31 for app in app_list: 32 # if app.name not in forbid_names: 33 self.apps.append(app) 34 self.cache_dict[app.name] = {} 35 36 group_list.append(self) 37 38 if ayaka_root_config.debug: 39 print(self) 40 41 async def back(self): 42 if self.state.parent: 43 await self.state.exit() 44 self.state = self.state.parent 45 return self.state 46 47 async def goto(self, state: AyakaState): 48 if _enter_exit_during.get() > 0: 49 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 50 51 keys = state.keys 52 53 # 找到第一个不同的结点 54 n0 = len(keys) 55 n1 = len(self.state.keys) 56 n = min(n0, n1) 57 for i in range(n): 58 if keys[i] != self.state.keys[i]: 59 break 60 else: 61 i += 1 62 63 # 回退 64 for j in range(i, n1): 65 await self.back() 66 keys = keys[i:] 67 68 # 重新出发 69 for key in keys: 70 self.state = self.state[key] 71 await self.state.enter() 72 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 73 return self.state 74 75 def get_app(self, name: str): 76 '''根据app名获取该group所启用的app,不存在则返回None''' 77 for app in self.apps: 78 if app.name == name: 79 return app
AyakaGroup(bot_id: int, group_id: int)
23 def __init__(self, bot_id: int, group_id: int) -> None: 24 self.bot_id = bot_id 25 self.group_id = group_id 26 self.state = root_state 27 28 # 添加app,并分配独立数据空间 29 self.apps: List["AyakaApp"] = [] 30 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 31 for app in app_list: 32 # if app.name not in forbid_names: 33 self.apps.append(app) 34 self.cache_dict[app.name] = {} 35 36 group_list.append(self) 37 38 if ayaka_root_config.debug: 39 print(self)
47 async def goto(self, state: AyakaState): 48 if _enter_exit_during.get() > 0: 49 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 50 51 keys = state.keys 52 53 # 找到第一个不同的结点 54 n0 = len(keys) 55 n1 = len(self.state.keys) 56 n = min(n0, n1) 57 for i in range(n): 58 if keys[i] != self.state.keys[i]: 59 break 60 else: 61 i += 1 62 63 # 回退 64 for j in range(i, n1): 65 await self.back() 66 keys = keys[i:] 67 68 # 重新出发 69 for key in keys: 70 self.state = self.state[key] 71 await self.state.enter() 72 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 73 return self.state
class
AyakaApp:
82class AyakaApp: 83 def __repr__(self) -> str: 84 return f"{self.__class__.__name__}({self.name})" 85 86 def __init__(self, name: str) -> None: 87 self.path = Path(inspect.stack()[1].filename) 88 89 for app in app_list: 90 if app.name == name: 91 raise Exception( 92 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 93 94 self.name = name 95 self.ayaka_root_config = ayaka_root_config 96 self.funcs = [] 97 self.on = AyakaOn(self) 98 self.timers: List[AyakaTimer] = [] 99 self.root_state = root_state 100 self._intro = "没有介绍" 101 self.state_helps: Dict[str, List[str]] = {} 102 self.idle_helps: List[str] = [] 103 104 logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"") 105 app_list.append(self) 106 if ayaka_root_config.debug: 107 print(self) 108 109 @property 110 def intro(self): 111 help = self._intro 112 for h in self.idle_helps: 113 help += "\n" + h 114 return help 115 116 @property 117 def all_help(self): 118 help = self.intro 119 for s, hs in self.state_helps.items(): 120 help += f"\n[{s}]" 121 for h in hs: 122 help += "\n" + h 123 return help 124 125 @property 126 def help(self): 127 '''获取当前状态下的帮助,没有找到则返回介绍''' 128 total_triggers = get_cascade_triggers(self.state) 129 130 helps = [] 131 cmds = [] 132 for ts in total_triggers: 133 flag = 1 134 for t in ts: 135 if t.app == self: 136 for c in t.cmds: 137 if c in cmds: 138 break 139 else: 140 if flag: 141 helps.append(f"[{t.state[1:]}]") 142 flag = 0 143 cmds.extend(t.cmds) 144 helps.append(t.help) 145 146 if not helps: 147 return self.intro 148 return "\n".join(helps) 149 150 @help.setter 151 def help(self, help: str): 152 self._intro = help 153 154 @property 155 def user_name(self): 156 '''*timer触发时不可用* 157 158 当前消息的发送人的群名片或昵称 159 ''' 160 s = self.event.sender 161 name = s.card or s.nickname 162 return name 163 164 @property 165 def user_id(self): 166 '''*timer触发时不可用* 167 168 当前消息的发送人的uid 169 ''' 170 return self.event.user_id 171 172 @property 173 def bot(self): 174 '''*timer触发时不可用* 175 176 当前bot 177 ''' 178 return _bot.get() 179 180 @property 181 def event(self): 182 '''*timer触发时不可用* 183 184 当前消息 185 ''' 186 return _event.get() 187 188 @property 189 def cache(self): 190 '''*timer触发时不可用* 191 192 当前群组的缓存空间''' 193 return self.group.cache_dict[self.name] 194 195 @property 196 def group_id(self): 197 '''*timer触发时不可用* 198 199 当前群组的id 200 201 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 202 ''' 203 return self.group.group_id 204 205 @property 206 def bot_id(self): 207 '''*timer触发时不可用* 208 209 当前bot的id 210 ''' 211 return self.group.bot_id 212 213 @property 214 def group(self): 215 '''*timer触发时不可用* 216 217 当前群组 218 219 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 220 ''' 221 return _group.get() 222 223 @property 224 def arg(self): 225 '''*timer触发时不可用* 226 227 当前消息在移除了命令后的剩余部分 228 ''' 229 return _arg.get() 230 231 @property 232 def args(self): 233 '''*timer触发时不可用* 234 235 当前消息在移除了命令后,剩余部分按照空格分割后的数组 236 237 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 238 ''' 239 return _args.get() 240 241 @property 242 def cmd(self): 243 '''*timer触发时不可用* 244 245 当前消息的命令头 246 ''' 247 return _cmd.get() 248 249 @property 250 def message(self): 251 '''*timer触发时不可用* 252 253 当前消息 254 ''' 255 return _message.get() 256 257 @property 258 def state(self): 259 return self.group.state 260 261 def get_state(self, *keys: str): 262 ''' 263 假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项` 264 265 >>> get_state(key1, key2) -> [root.test].key1.key2 266 267 特别的,keys可以为空,例如: 268 269 >>> get_state() -> [root.test] 270 ''' 271 keys = [self.name, *keys] 272 return root_state.join(*keys) 273 274 async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str): 275 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 276 return await self.goto(state, *keys) 277 278 async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str): 279 # keys为兼容旧API(0.5.2及以前 280 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 281 if isinstance(state, str): 282 if "." in state: 283 state = self.get_state(*state.split(".")) 284 else: 285 state = self.get_state(state, *keys) 286 elif isinstance(state, list): 287 state = self.get_state(*state) 288 return await self.group.goto(state) 289 290 async def back(self): 291 '''回退当前群组的状态''' 292 return await self.group.back() 293 294 def _add_func(self, func): 295 '''如果不存在就加入self.funcs''' 296 if func not in self.funcs: 297 self.funcs.append(func) 298 299 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 300 '''注册深度监听''' 301 def decorator(func): 302 func.deep = deep 303 self._add_func(func) 304 return func 305 return decorator 306 307 def on_no_block(self, block: bool = False): 308 '''注册非阻断''' 309 def decorator(func): 310 func.block = block 311 self._add_func(func) 312 return func 313 return decorator 314 315 def on_cmd(self, *cmds: str): 316 '''注册命令触发,不填写命令则视为文本消息''' 317 def decorator(func): 318 func.cmds = cmds 319 self._add_func(func) 320 return func 321 return decorator 322 323 def on_text(self): 324 '''注册消息触发''' 325 def decorator(func): 326 self._add_func(func) 327 return func 328 return decorator 329 330 def on_state(self, *states: Union[AyakaState, str, List[str]]): 331 '''注册有状态响应,不填写states则为root.插件名状态''' 332 _states = [] 333 if not states: 334 states = [[]] 335 for s in states: 336 if isinstance(s, str): 337 s = self.get_state(s) 338 elif isinstance(s, list): 339 s = self.get_state(*s) 340 _states.append(s) 341 342 def decorator(func): 343 func.states = _states 344 self._add_func(func) 345 return func 346 return decorator 347 348 def on_idle(self): 349 '''注册根结点回调''' 350 return self.on_state(self.root_state) 351 352 def set_start_cmds(self, *cmds: str): 353 '''设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式''' 354 @self.on_idle() 355 @self.on_cmd(*cmds) 356 async def start(): 357 '''打开应用''' 358 await self.start() 359 360 def set_close_cmds(self, *cmds: str): 361 '''设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式''' 362 @self.on_state() 363 @self.on_deep_all() 364 @self.on_cmd(*cmds) 365 async def close(): 366 '''关闭应用''' 367 await self.close() 368 369 async def start(self, state: str = None): 370 '''*timer触发时不可用* 371 372 启动应用,并发送提示 373 374 state参数为兼容旧API''' 375 if not state: 376 state = self.get_state() 377 await self.goto(state) 378 else: 379 states = state.split(".") 380 await self.goto(*states) 381 await self.send(f"已打开应用 [{self.name}]") 382 383 async def close(self): 384 '''*timer触发时不可用* 385 386 关闭应用,并发送提示''' 387 await self.goto(root_state) 388 await self.send(f"已关闭应用 [{self.name}]") 389 390 def add_listener(self, user_id: int): 391 '''为该群组添加对指定私聊的监听''' 392 private_listener_dict[user_id].append(self.group_id) 393 394 def remove_listener(self, user_id: int = 0): 395 '''默认移除该群组对其他私聊的所有监听''' 396 id = self.group_id 397 398 if user_id == 0: 399 for ids in private_listener_dict.values(): 400 if id in ids: 401 ids.remove(id) 402 return 403 404 if id in private_listener_dict[user_id]: 405 private_listener_dict[user_id].remove(self.group_id) 406 407 async def send(self, message): 408 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 409 # 这里不使用event,因为一些event可能来自其他设备的监听传递 410 await self.bot.send_group_msg(group_id=self.group_id, message=message) 411 412 def pack_messages(self, bot_id, messages): 413 '''转换为cqhttp node格式''' 414 data: List[MessageSegment] = [] 415 for m in messages: 416 if isinstance(m, MessageSegment) and m.type == "node": 417 data.append(m) 418 else: 419 m = MessageSegment.node_custom( 420 user_id=bot_id, 421 nickname="Ayaka Bot", 422 content=str(m) 423 ) 424 data.append(m) 425 return data 426 427 async def send_many(self, messages): 428 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 429 # 分割长消息组(不可超过100条 430 div_len = 100 431 div_cnt = ceil(len(messages) / div_len) 432 for i in range(div_cnt): 433 msgs = self.pack_messages( 434 self.bot_id, 435 messages[i*div_len: (i+1)*div_len] 436 ) 437 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 438 439 def t_check(self, bot_id: int): 440 # 未连接 441 bot = get_bot(bot_id) 442 if not bot: 443 logger.warning(f"BOT({bot_id}) 未连接") 444 return 445 446 return bot 447 448 async def t_send(self, bot_id: int, group_id: int, message): 449 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 450 bot = self.t_check(bot_id) 451 if not bot: 452 return 453 454 await bot.send_group_msg(group_id=group_id, message=message) 455 456 async def t_send_many(self, bot_id: int, group_id: int, messages): 457 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 458 bot = self.t_check(bot_id) 459 if not bot: 460 return 461 462 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 463 div_len = 80 464 div_cnt = ceil(len(messages) / div_len) 465 for i in range(div_cnt): 466 msgs = self.pack_messages( 467 bot_id, 468 messages[i*div_len: (i+1)*div_len] 469 ) 470 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
86 def __init__(self, name: str) -> None: 87 self.path = Path(inspect.stack()[1].filename) 88 89 for app in app_list: 90 if app.name == name: 91 raise Exception( 92 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 93 94 self.name = name 95 self.ayaka_root_config = ayaka_root_config 96 self.funcs = [] 97 self.on = AyakaOn(self) 98 self.timers: List[AyakaTimer] = [] 99 self.root_state = root_state 100 self._intro = "没有介绍" 101 self.state_helps: Dict[str, List[str]] = {} 102 self.idle_helps: List[str] = [] 103 104 logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"") 105 app_list.append(self) 106 if ayaka_root_config.debug: 107 print(self)
def
get_state(self, *keys: str):
261 def get_state(self, *keys: str): 262 ''' 263 假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项` 264 265 >>> get_state(key1, key2) -> [root.test].key1.key2 266 267 特别的,keys可以为空,例如: 268 269 >>> get_state() -> [root.test] 270 ''' 271 keys = [self.name, *keys] 272 return root_state.join(*keys)
假设当前状态为 root.test.a
即 根.插件名.一级菜单项
>>> get_state(key1, key2) -> [root.test].key1.key2
特别的,keys可以为空,例如:
>>> get_state() -> [root.test]
274 async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str): 275 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 276 return await self.goto(state, *keys)
变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割
278 async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str): 279 # keys为兼容旧API(0.5.2及以前 280 '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割''' 281 if isinstance(state, str): 282 if "." in state: 283 state = self.get_state(*state.split(".")) 284 else: 285 state = self.get_state(state, *keys) 286 elif isinstance(state, list): 287 state = self.get_state(*state) 288 return await self.group.goto(state)
变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割
def
on_deep_all(self, deep: Union[int, Literal['all']] = 'all'):
299 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 300 '''注册深度监听''' 301 def decorator(func): 302 func.deep = deep 303 self._add_func(func) 304 return func 305 return decorator
注册深度监听
def
on_no_block(self, block: bool = False):
307 def on_no_block(self, block: bool = False): 308 '''注册非阻断''' 309 def decorator(func): 310 func.block = block 311 self._add_func(func) 312 return func 313 return decorator
注册非阻断
def
on_cmd(self, *cmds: str):
315 def on_cmd(self, *cmds: str): 316 '''注册命令触发,不填写命令则视为文本消息''' 317 def decorator(func): 318 func.cmds = cmds 319 self._add_func(func) 320 return func 321 return decorator
注册命令触发,不填写命令则视为文本消息
def
on_text(self):
323 def on_text(self): 324 '''注册消息触发''' 325 def decorator(func): 326 self._add_func(func) 327 return func 328 return decorator
注册消息触发
330 def on_state(self, *states: Union[AyakaState, str, List[str]]): 331 '''注册有状态响应,不填写states则为root.插件名状态''' 332 _states = [] 333 if not states: 334 states = [[]] 335 for s in states: 336 if isinstance(s, str): 337 s = self.get_state(s) 338 elif isinstance(s, list): 339 s = self.get_state(*s) 340 _states.append(s) 341 342 def decorator(func): 343 func.states = _states 344 self._add_func(func) 345 return func 346 return decorator
注册有状态响应,不填写states则为root.插件名状态
def
set_start_cmds(self, *cmds: str):
352 def set_start_cmds(self, *cmds: str): 353 '''设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式''' 354 @self.on_idle() 355 @self.on_cmd(*cmds) 356 async def start(): 357 '''打开应用''' 358 await self.start()
设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式
def
set_close_cmds(self, *cmds: str):
360 def set_close_cmds(self, *cmds: str): 361 '''设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式''' 362 @self.on_state() 363 @self.on_deep_all() 364 @self.on_cmd(*cmds) 365 async def close(): 366 '''关闭应用''' 367 await self.close()
设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式
async def
start(self, state: str = None):
369 async def start(self, state: str = None): 370 '''*timer触发时不可用* 371 372 启动应用,并发送提示 373 374 state参数为兼容旧API''' 375 if not state: 376 state = self.get_state() 377 await self.goto(state) 378 else: 379 states = state.split(".") 380 await self.goto(*states) 381 await self.send(f"已打开应用 [{self.name}]")
timer触发时不可用
启动应用,并发送提示
state参数为兼容旧API
async def
close(self):
383 async def close(self): 384 '''*timer触发时不可用* 385 386 关闭应用,并发送提示''' 387 await self.goto(root_state) 388 await self.send(f"已关闭应用 [{self.name}]")
timer触发时不可用
关闭应用,并发送提示
def
add_listener(self, user_id: int):
390 def add_listener(self, user_id: int): 391 '''为该群组添加对指定私聊的监听''' 392 private_listener_dict[user_id].append(self.group_id)
为该群组添加对指定私聊的监听
def
remove_listener(self, user_id: int = 0):
394 def remove_listener(self, user_id: int = 0): 395 '''默认移除该群组对其他私聊的所有监听''' 396 id = self.group_id 397 398 if user_id == 0: 399 for ids in private_listener_dict.values(): 400 if id in ids: 401 ids.remove(id) 402 return 403 404 if id in private_listener_dict[user_id]: 405 private_listener_dict[user_id].remove(self.group_id)
默认移除该群组对其他私聊的所有监听
async def
send(self, message):
407 async def send(self, message): 408 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 409 # 这里不使用event,因为一些event可能来自其他设备的监听传递 410 await self.bot.send_group_msg(group_id=self.group_id, message=message)
发送消息,消息的类型可以是 Message | MessageSegment | str
def
pack_messages(self, bot_id, messages):
412 def pack_messages(self, bot_id, messages): 413 '''转换为cqhttp node格式''' 414 data: List[MessageSegment] = [] 415 for m in messages: 416 if isinstance(m, MessageSegment) and m.type == "node": 417 data.append(m) 418 else: 419 m = MessageSegment.node_custom( 420 user_id=bot_id, 421 nickname="Ayaka Bot", 422 content=str(m) 423 ) 424 data.append(m) 425 return data
转换为cqhttp node格式
async def
send_many(self, messages):
427 async def send_many(self, messages): 428 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 429 # 分割长消息组(不可超过100条 430 div_len = 100 431 div_cnt = ceil(len(messages) / div_len) 432 for i in range(div_cnt): 433 msgs = self.pack_messages( 434 self.bot_id, 435 messages[i*div_len: (i+1)*div_len] 436 ) 437 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]
async def
t_send(self, bot_id: int, group_id: int, message):
448 async def t_send(self, bot_id: int, group_id: int, message): 449 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 450 bot = self.t_check(bot_id) 451 if not bot: 452 return 453 454 await bot.send_group_msg(group_id=group_id, message=message)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
async def
t_send_many(self, bot_id: int, group_id: int, messages):
456 async def t_send_many(self, bot_id: int, group_id: int, messages): 457 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 458 bot = self.t_check(bot_id) 459 if not bot: 460 return 461 462 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 463 div_len = 80 464 div_cnt = ceil(len(messages) / div_len) 465 for i in range(div_cnt): 466 msgs = self.pack_messages( 467 bot_id, 468 messages[i*div_len: (i+1)*div_len] 469 ) 470 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
def
get_bot(bot_id: int):
473def get_bot(bot_id: int): 474 '''获取已连接的bot''' 475 bot_id = str(bot_id) 476 for bot in bot_list: 477 if bot.self_id == bot_id: 478 return bot
获取已连接的bot
def
get_group(bot_id: int, group_id: int):
481def get_group(bot_id: int, group_id: int): 482 '''获取对应的AyakaGroup对象,自动增加''' 483 for group in group_list: 484 if group.bot_id == bot_id and group.group_id == group_id: 485 break 486 else: 487 group = AyakaGroup(bot_id, group_id) 488 return group
获取对应的AyakaGroup对象,自动增加
def
get_app(app_name: str):
async def
deal_event( bot: nonebot.adapters.onebot.v11.bot.Bot, event: nonebot.adapters.onebot.v11.event.MessageEvent):
497async def deal_event(bot: Bot, event: MessageEvent): 498 '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组''' 499 if ayaka_root_config.exclude_old_msg: 500 time_i = int(datetime.datetime.now().timestamp()) 501 if event.time < time_i - 60: 502 return 503 504 _bot.set(bot) 505 _event.set(event) 506 507 bot_id = int(bot.self_id) 508 509 if isinstance(event, GroupMessageEvent): 510 group_id = event.group_id 511 await deal_group(bot_id, group_id) 512 513 else: 514 id = event.user_id 515 group_ids = private_listener_dict.get(id, []) 516 ts = [asyncio.create_task(deal_group(bot_id, group_id)) 517 for group_id in group_ids] 518 await asyncio.gather(*ts)
处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组
async def
deal_group(bot_id: int, group_id: int):
521async def deal_group(bot_id: int, group_id: int): 522 prefix = ayaka_root_config.prefix 523 524 # 群组 525 group = get_group(bot_id, group_id) 526 _group.set(group) 527 528 # 消息 529 message = _event.get().message 530 _message.set(message) 531 532 # 从子状态开始向上查找可用的触发 533 state = group.state 534 cascade_triggers = get_cascade_triggers(state, 0) 535 536 # 命令 537 # 消息前缀文本 538 first = get_first(message) 539 if first.startswith(prefix): 540 first = first[len(prefix):] 541 for ts in cascade_triggers: 542 if await deal_cmd_triggers(ts, message, first, state): 543 return 544 545 # 命令退化成消息 546 for ts in cascade_triggers: 547 if await deal_text_triggers(ts, message, state): 548 return
async def
deal_cmd_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, first: str, state: ayaka.state.AyakaState):
551async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState): 552 sep = ayaka_root_config.separate 553 554 # 命令 555 temp = [t for t in triggers if t.cmds] 556 # 根据命令长度排序,长命令优先级更高 557 cmd_ts = [(c, t) for t in temp for c in t.cmds] 558 cmd_ts.sort(key=lambda x: len(x[0]), reverse=1) 559 560 for c, t in cmd_ts: 561 if first.startswith(c): 562 # 设置上下文 563 # 设置命令 564 _cmd.set(c) 565 # 设置参数 566 left = first[len(c):].lstrip(sep) 567 if left: 568 arg = Message([MessageSegment.text(left), *message[1:]]) 569 else: 570 arg = Message(message[1:]) 571 _arg.set(arg) 572 _args.set(divide_message(arg)) 573 574 # 触发 575 log_trigger(c, t.app.name, state, t.func.__name__) 576 await t.run() 577 578 # 阻断后续 579 if t.block: 580 return True
async def
deal_text_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, state: ayaka.state.AyakaState):
583async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState): 584 # 消息 585 text_ts = [t for t in triggers if not t.cmds] 586 587 # 设置上下文 588 # 设置命令 589 _cmd.set("") 590 # 设置参数 591 _arg.set(message) 592 _args.set(divide_message(message)) 593 594 for t in text_ts: 595 # 触发 596 log_trigger("", t.app.name, state, t.func.__name__) 597 await t.run() 598 599 # 阻断后续 600 if t.block: 601 return True
604def get_cascade_triggers(state: AyakaState, deep: int = 0): 605 # 根据深度筛选funcs 606 ts = [ 607 t for t in state.triggers 608 if t.deep == "all" or t.deep >= deep 609 ] 610 cascade_triggers = [ts] 611 612 # 获取父状态的方法 613 if state.parent: 614 cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1)) 615 616 # 排除空项 617 cascade_triggers = [ts for ts in cascade_triggers if ts] 618 return cascade_triggers
def
log_trigger(cmd, app_name, state, func_name):
621def log_trigger(cmd, app_name, state, func_name): 622 '''日志记录''' 623 items = [] 624 items.append(f"状态:<c>{state}</c>") 625 items.append(f"应用:<y>{app_name}</y>") 626 if cmd: 627 items.append(f"命令:<y>{cmd}</y>") 628 else: 629 items.append("命令:<g>无</g>") 630 items.append(f"回调:<c>{func_name}</c>") 631 info = " | ".join(items) 632 logger.opt(colors=True).debug(info)
日志记录
def
get_first(message: nonebot.adapters.onebot.v11.message.Message):
def
divide_message( message: nonebot.adapters.onebot.v11.message.Message) -> List[nonebot.adapters.onebot.v11.message.MessageSegment]:
659def regist_func(app: AyakaApp, func): 660 '''注册回调''' 661 # 默认是无状态应用,从root开始触发 662 states: List[AyakaState] = getattr(func, "states", [root_state]) 663 # 默认是消息响应 664 cmds: List[str] = getattr(func, "cmds", []) 665 # 默认监听深度为0 666 deep: int = getattr(func, "deep", 0) 667 # 默认阻断 668 block: bool = getattr(func, "block", True) 669 670 # 注册 671 for s in states: 672 s.on_cmd(cmds, app, deep, block)(func) 673 674 return func
注册回调
@driver.on_startup
async def
startup():
680@driver.on_startup 681async def startup(): 682 # 注册所有回调 683 for app in app_list: 684 for func in app.funcs: 685 regist_func(app, func) 686 687 if ayaka_root_config.debug: 688 s = json.dumps( 689 root_state.dict(), ensure_ascii=0, 690 indent=4, default=repr 691 ) 692 path = ayaka_data_path / "all_state.json" 693 with path.open("w+", encoding="utf8") as f: 694 f.write(s)