ayaka.ayaka
ayaka核心
1'''ayaka核心''' 2import asyncio 3import datetime 4import inspect 5import json 6from math import ceil 7from pathlib import Path 8from loguru import logger 9from typing import List, Dict, Literal, Union 10from .depend import AyakaCache 11from .config import ayaka_root_config, ayaka_data_path 12from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, _enter_exit_during, app_list, group_list, bot_list, private_listener_dict 13from .driver import on_message, get_driver, Message, MessageSegment, Bot, MessageEvent, GroupMessageEvent 14from .state import AyakaState, AyakaTrigger, root_state 15from .on import AyakaOn, AyakaTimer 16 17 18class AyakaGroup: 19 def __repr__(self) -> str: 20 return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})" 21 22 def __init__(self, bot_id: int, group_id: int) -> None: 23 self.bot_id = bot_id 24 self.group_id = group_id 25 self.state = root_state 26 27 # 添加app,并分配独立数据空间 28 self.apps: List["AyakaApp"] = [] 29 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 30 for app in app_list: 31 # if app.name not in forbid_names: 32 self.apps.append(app) 33 self.cache_dict[app.name] = {} 34 35 group_list.append(self) 36 37 if ayaka_root_config.debug: 38 print(self) 39 40 async def back(self): 41 if self.state.parent: 42 await self.state.exit() 43 self.state = self.state.parent 44 return self.state 45 46 async def goto(self, state: AyakaState): 47 if _enter_exit_during.get() > 0: 48 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 49 50 keys = state.keys 51 52 # 找到第一个不同的结点 53 n0 = len(keys) 54 n1 = len(self.state.keys) 55 n = min(n0, n1) 56 for i in range(n): 57 if keys[i] != self.state.keys[i]: 58 break 59 else: 60 i += 1 61 62 # 回退 63 for j in range(i, n1): 64 await self.back() 65 keys = keys[i:] 66 67 # 重新出发 68 for key in keys: 69 self.state = self.state[key] 70 await self.state.enter() 71 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 72 return self.state 73 74 def get_app(self, name: str): 75 '''根据app名获取该group所启用的app,不存在则返回None''' 76 for app in self.apps: 77 if app.name == name: 78 return app 79 80 81class AyakaApp: 82 def __repr__(self) -> str: 83 return f"{self.__class__.__name__}({self.name})" 84 85 def __init__(self, name: str) -> None: 86 self.path = Path(inspect.stack()[1].filename) 87 88 for app in app_list: 89 if app.name == name: 90 raise Exception( 91 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 92 93 self.name = name 94 self.ayaka_root_config = ayaka_root_config 95 self.funcs = [] 96 self.on = AyakaOn(self) 97 self.timers: List[AyakaTimer] = [] 98 self.root_state = root_state 99 self._intro = "没有介绍" 100 self.state_helps: Dict[str, List[str]] = {} 101 self.idle_helps: List[str] = [] 102 103 logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"") 104 app_list.append(self) 105 if ayaka_root_config.debug: 106 print(self) 107 108 @property 109 def intro(self): 110 help = self._intro 111 for h in self.idle_helps: 112 help += "\n" + h 113 return help 114 115 @property 116 def all_help(self): 117 help = self.intro 118 for s, hs in self.state_helps.items(): 119 help += f"\n[{s}]" 120 for h in hs: 121 help += "\n" + h 122 return help 123 124 @property 125 def help(self): 126 '''获取当前状态下的帮助,没有找到则返回介绍''' 127 total_triggers = get_cascade_triggers(self.state) 128 129 helps = [] 130 cmds = [] 131 for ts in total_triggers: 132 flag = 1 133 for t in ts: 134 if t.app == self: 135 for c in t.cmds: 136 if c in cmds: 137 break 138 else: 139 if flag: 140 helps.append(f"[{t.state[1:]}]") 141 flag = 0 142 cmds.extend(t.cmds) 143 helps.append(t.help) 144 145 if not helps: 146 return self.intro 147 return "\n".join(helps) 148 149 @help.setter 150 def help(self, help: str): 151 self._intro = help 152 153 @property 154 def user_name(self): 155 '''*timer触发时不可用* 156 157 当前消息的发送人的群名片或昵称 158 ''' 159 s = self.event.sender 160 name = s.card or s.nickname 161 return name 162 163 @property 164 def user_id(self): 165 '''*timer触发时不可用* 166 167 当前消息的发送人的uid 168 ''' 169 return self.event.user_id 170 171 @property 172 def bot(self): 173 '''*timer触发时不可用* 174 175 当前bot 176 ''' 177 return _bot.get() 178 179 @property 180 def event(self): 181 '''*timer触发时不可用* 182 183 当前消息 184 ''' 185 return _event.get() 186 187 @property 188 def cache(self): 189 '''*timer触发时不可用* 190 191 当前群组的缓存空间''' 192 return self.group.cache_dict[self.name] 193 194 @property 195 def group_id(self): 196 '''*timer触发时不可用* 197 198 当前群组的id 199 200 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 201 ''' 202 return self.group.group_id 203 204 @property 205 def bot_id(self): 206 '''*timer触发时不可用* 207 208 当前bot的id 209 ''' 210 return self.group.bot_id 211 212 @property 213 def group(self): 214 '''*timer触发时不可用* 215 216 当前群组 217 218 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 219 ''' 220 return _group.get() 221 222 @property 223 def arg(self): 224 '''*timer触发时不可用* 225 226 当前消息在移除了命令后的剩余部分 227 ''' 228 return _arg.get() 229 230 @property 231 def args(self): 232 '''*timer触发时不可用* 233 234 当前消息在移除了命令后,剩余部分按照空格分割后的数组 235 236 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 237 ''' 238 return _args.get() 239 240 @property 241 def cmd(self): 242 '''*timer触发时不可用* 243 244 当前消息的命令头 245 ''' 246 return _cmd.get() 247 248 @property 249 def message(self): 250 '''*timer触发时不可用* 251 252 当前消息 253 ''' 254 return _message.get() 255 256 @property 257 def state(self): 258 return self.group.state 259 260 def get_state(self, *keys: str): 261 ''' 262 假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项` 263 264 >>> get_state(key1, key2) -> [root.test].key1.key2 265 266 特别的,keys可以为空,例如: 267 268 >>> get_state() -> [root.test] 269 ''' 270 keys = [self.name, *keys] 271 return root_state.join(*keys) 272 273 async def set_state(self, state_or_key: Union[AyakaState, str], *keys: str): 274 return await self.goto(state_or_key, *keys) 275 276 async def goto(self, state_or_key: Union[AyakaState, str], *keys: str): 277 '''当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填''' 278 if isinstance(state_or_key, AyakaState): 279 state = state_or_key 280 else: 281 state = self.get_state(*state_or_key.split("."), *keys) 282 return await self.group.goto(state) 283 284 async def back(self): 285 return await self.group.back() 286 287 def _add_func(self, func): 288 if func not in self.funcs: 289 self.funcs.append(func) 290 291 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 292 '''注册深度监听''' 293 def decorator(func): 294 func.deep = deep 295 self._add_func(func) 296 return func 297 return decorator 298 299 def on_no_block(self, block: bool = False): 300 '''注册非阻断''' 301 def decorator(func): 302 func.block = block 303 self._add_func(func) 304 return func 305 return decorator 306 307 def on_cmd(self, *cmds: str): 308 '''注册命令触发,不填写命令则视为文本消息''' 309 def decorator(func): 310 func.cmds = cmds 311 self._add_func(func) 312 return func 313 return decorator 314 315 def on_state(self, *states: Union[AyakaState, str, List[str]]): 316 '''注册有状态响应,不填写states则为root.插件名状态''' 317 _states = [] 318 if not states: 319 states = [[]] 320 for s in states: 321 if isinstance(s, str): 322 s = self.get_state(s) 323 elif isinstance(s, list): 324 s = self.get_state(*s) 325 _states.append(s) 326 327 def decorator(func): 328 func.states = _states 329 self._add_func(func) 330 return func 331 return decorator 332 333 def set_start_cmds(self, *cmds: str): 334 '''设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式''' 335 async def func(): 336 '''打开应用''' 337 await self.start() 338 func.cmds = cmds 339 self.funcs.append(func) 340 341 async def start(self, state: str = None): 342 '''*timer触发时不可用* 343 344 启动应用,并发送提示 345 346 state参数为兼容旧API''' 347 if not state: 348 state = self.get_state() 349 await self.goto(state) 350 else: 351 states = state.split(".") 352 await self.goto(*states) 353 await self.send(f"已打开应用 [{self.name}]") 354 355 async def close(self): 356 '''*timer触发时不可用* 357 358 关闭应用,并发送提示''' 359 await self.goto(root_state) 360 await self.send(f"已关闭应用 [{self.name}]") 361 362 def add_listener(self, user_id: int): 363 '''为该群组添加对指定私聊的监听''' 364 private_listener_dict[user_id].append(self.group_id) 365 366 def remove_listener(self, user_id: int = 0): 367 '''默认移除该群组对其他私聊的所有监听''' 368 id = self.group_id 369 370 if user_id == 0: 371 for ids in private_listener_dict.values(): 372 if id in ids: 373 ids.remove(id) 374 return 375 376 if id in private_listener_dict[user_id]: 377 private_listener_dict[user_id].remove(self.group_id) 378 379 async def send(self, message): 380 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 381 # 这里不使用event,因为一些event可能来自其他设备的监听传递 382 await self.bot.send_group_msg(group_id=self.group_id, message=message) 383 384 def pack_messages(self, bot_id, messages): 385 '''转换为cqhttp node格式''' 386 data: List[MessageSegment] = [] 387 for m in messages: 388 if isinstance(m, MessageSegment) and m.type == "node": 389 data.append(m) 390 else: 391 m = MessageSegment.node_custom( 392 user_id=bot_id, 393 nickname="Ayaka Bot", 394 content=str(m) 395 ) 396 data.append(m) 397 return data 398 399 async def send_many(self, messages): 400 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 401 # 分割长消息组(不可超过100条 402 div_len = 100 403 div_cnt = ceil(len(messages) / div_len) 404 for i in range(div_cnt): 405 msgs = self.pack_messages( 406 self.bot_id, 407 messages[i*div_len: (i+1)*div_len] 408 ) 409 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 410 411 def t_check(self, bot_id: int): 412 # 未连接 413 bot = get_bot(bot_id) 414 if not bot: 415 logger.warning(f"BOT({bot_id}) 未连接") 416 return 417 418 return bot 419 420 async def t_send(self, bot_id: int, group_id: int, message): 421 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 422 bot = self.t_check(bot_id) 423 if not bot: 424 return 425 426 await bot.send_group_msg(group_id=group_id, message=message) 427 428 async def t_send_many(self, bot_id: int, group_id: int, messages): 429 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 430 bot = self.t_check(bot_id) 431 if not bot: 432 return 433 434 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 435 div_len = 80 436 div_cnt = ceil(len(messages) / div_len) 437 for i in range(div_cnt): 438 msgs = self.pack_messages( 439 bot_id, 440 messages[i*div_len: (i+1)*div_len] 441 ) 442 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs) 443 444 445def get_bot(bot_id: int): 446 '''获取已连接的bot''' 447 bot_id = str(bot_id) 448 for bot in bot_list: 449 if bot.self_id == bot_id: 450 return bot 451 452 453def get_group(bot_id: int, group_id: int): 454 '''获取对应的AyakaGroup对象,自动增加''' 455 for group in group_list: 456 if group.bot_id == bot_id and group.group_id == group_id: 457 break 458 else: 459 group = AyakaGroup(bot_id, group_id) 460 return group 461 462 463def get_app(app_name: str): 464 for app in app_list: 465 if app.name == app_name: 466 return app 467 468 469async def deal_event(bot: Bot, event: MessageEvent): 470 '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组''' 471 if ayaka_root_config.exclude_old_msg: 472 time_i = int(datetime.datetime.now().timestamp()) 473 if event.time < time_i - 60: 474 return 475 476 _bot.set(bot) 477 _event.set(event) 478 479 bot_id = int(bot.self_id) 480 481 if isinstance(event, GroupMessageEvent): 482 group_id = event.group_id 483 await deal_group(bot_id, group_id) 484 485 else: 486 id = event.user_id 487 group_ids = private_listener_dict.get(id, []) 488 ts = [asyncio.create_task(deal_group(bot_id, group_id)) 489 for group_id in group_ids] 490 await asyncio.gather(*ts) 491 492 493async def deal_group(bot_id: int, group_id: int): 494 prefix = ayaka_root_config.prefix 495 496 # 群组 497 group = get_group(bot_id, group_id) 498 _group.set(group) 499 500 # 消息 501 message = _event.get().message 502 _message.set(message) 503 504 # 从子状态开始向上查找可用的触发 505 state = group.state 506 cascade_triggers = get_cascade_triggers(state, 0) 507 508 # 命令 509 # 消息前缀文本 510 first = get_first(message) 511 if first.startswith(prefix): 512 first = first[len(prefix):] 513 for ts in cascade_triggers: 514 if await deal_cmd_triggers(ts, message, first, state): 515 return 516 517 # 命令退化成消息 518 for ts in cascade_triggers: 519 if await deal_text_triggers(ts, message, state): 520 return 521 522 523async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState): 524 sep = ayaka_root_config.separate 525 526 # 命令 527 temp = [t for t in triggers if t.cmds] 528 # 根据命令长度排序,长命令优先级更高 529 cmd_ts = [(c, t) for t in temp for c in t.cmds] 530 cmd_ts.sort(key=lambda x: len(x[0]), reverse=1) 531 532 for c, t in cmd_ts: 533 if first.startswith(c): 534 # 设置上下文 535 # 设置命令 536 _cmd.set(c) 537 # 设置参数 538 left = first[len(c):].lstrip(sep) 539 if left: 540 arg = Message([MessageSegment.text(left), *message[1:]]) 541 else: 542 arg = Message(message[1:]) 543 _arg.set(arg) 544 _args.set(divide_message(arg)) 545 546 # 触发 547 log_trigger(c, t.app.name, state, t.func.__name__) 548 await t.run() 549 550 # 阻断后续 551 if t.block: 552 return True 553 554 555async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState): 556 # 消息 557 text_ts = [t for t in triggers if not t.cmds] 558 559 # 设置上下文 560 # 设置命令 561 _cmd.set("") 562 # 设置参数 563 _arg.set(message) 564 _args.set(divide_message(message)) 565 566 for t in text_ts: 567 # 触发 568 log_trigger("", t.app.name, state, t.func.__name__) 569 await t.run() 570 571 # 阻断后续 572 if t.block: 573 return True 574 575 576def get_cascade_triggers(state: AyakaState, deep: int = 0): 577 # 根据深度筛选funcs 578 ts = [ 579 t for t in state.triggers 580 if t.deep == "all" or t.deep >= deep 581 ] 582 cascade_triggers = [ts] 583 584 # 获取父状态的方法 585 if state.parent: 586 cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1)) 587 588 # 排除空项 589 cascade_triggers = [ts for ts in cascade_triggers if ts] 590 return cascade_triggers 591 592 593def log_trigger(cmd, app_name, state, func_name): 594 '''日志记录''' 595 items = [] 596 items.append(f"状态:<c>{state}</c>") 597 items.append(f"应用:<y>{app_name}</y>") 598 if cmd: 599 items.append(f"命令:<y>{cmd}</y>") 600 else: 601 items.append("命令:<g>无</g>") 602 items.append(f"回调:<c>{func_name}</c>") 603 info = " | ".join(items) 604 logger.opt(colors=True).debug(info) 605 606 607def get_first(message: Message): 608 first = "" 609 for m in message: 610 if m.type == "text": 611 first += str(m) 612 else: 613 break 614 return first 615 616 617def divide_message(message: Message) -> List[MessageSegment]: 618 args = [] 619 sep = ayaka_root_config.separate 620 621 for m in message: 622 if m.is_text(): 623 ss = str(m).split(sep) 624 args.extend(MessageSegment.text(s) for s in ss if s) 625 else: 626 args.append(m) 627 628 return args 629 630 631def regist_func(app: AyakaApp, func): 632 '''注册回调''' 633 # 默认是无状态应用,从root开始触发 634 states: List[AyakaState] = getattr(func, "states", [root_state]) 635 # 默认是消息响应 636 cmds: List[str] = getattr(func, "cmds", []) 637 # 默认监听深度为0 638 deep: int = getattr(func, "deep", 0) 639 # 默认阻断 640 block: bool = getattr(func, "block", True) 641 642 # 注册 643 for s in states: 644 s.on_cmd(cmds, app, deep, block)(func) 645 646 return func 647 648 649driver = get_driver() 650 651 652@driver.on_startup 653async def startup(): 654 # 注册所有回调 655 for app in app_list: 656 for func in app.funcs: 657 regist_func(app, func) 658 659 if ayaka_root_config.debug: 660 s = json.dumps( 661 root_state.dict(), ensure_ascii=0, 662 indent=4, default=repr 663 ) 664 path = ayaka_data_path / "all_state.json" 665 with path.open("w+", encoding="utf8") as f: 666 f.write(s) 667 668on_message(priority=20, block=False, handlers=[deal_event])
class
AyakaGroup:
19class AyakaGroup: 20 def __repr__(self) -> str: 21 return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})" 22 23 def __init__(self, bot_id: int, group_id: int) -> None: 24 self.bot_id = bot_id 25 self.group_id = group_id 26 self.state = root_state 27 28 # 添加app,并分配独立数据空间 29 self.apps: List["AyakaApp"] = [] 30 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 31 for app in app_list: 32 # if app.name not in forbid_names: 33 self.apps.append(app) 34 self.cache_dict[app.name] = {} 35 36 group_list.append(self) 37 38 if ayaka_root_config.debug: 39 print(self) 40 41 async def back(self): 42 if self.state.parent: 43 await self.state.exit() 44 self.state = self.state.parent 45 return self.state 46 47 async def goto(self, state: AyakaState): 48 if _enter_exit_during.get() > 0: 49 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 50 51 keys = state.keys 52 53 # 找到第一个不同的结点 54 n0 = len(keys) 55 n1 = len(self.state.keys) 56 n = min(n0, n1) 57 for i in range(n): 58 if keys[i] != self.state.keys[i]: 59 break 60 else: 61 i += 1 62 63 # 回退 64 for j in range(i, n1): 65 await self.back() 66 keys = keys[i:] 67 68 # 重新出发 69 for key in keys: 70 self.state = self.state[key] 71 await self.state.enter() 72 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 73 return self.state 74 75 def get_app(self, name: str): 76 '''根据app名获取该group所启用的app,不存在则返回None''' 77 for app in self.apps: 78 if app.name == name: 79 return app
AyakaGroup(bot_id: int, group_id: int)
23 def __init__(self, bot_id: int, group_id: int) -> None: 24 self.bot_id = bot_id 25 self.group_id = group_id 26 self.state = root_state 27 28 # 添加app,并分配独立数据空间 29 self.apps: List["AyakaApp"] = [] 30 self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {} 31 for app in app_list: 32 # if app.name not in forbid_names: 33 self.apps.append(app) 34 self.cache_dict[app.name] = {} 35 36 group_list.append(self) 37 38 if ayaka_root_config.debug: 39 print(self)
47 async def goto(self, state: AyakaState): 48 if _enter_exit_during.get() > 0: 49 logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误") 50 51 keys = state.keys 52 53 # 找到第一个不同的结点 54 n0 = len(keys) 55 n1 = len(self.state.keys) 56 n = min(n0, n1) 57 for i in range(n): 58 if keys[i] != self.state.keys[i]: 59 break 60 else: 61 i += 1 62 63 # 回退 64 for j in range(i, n1): 65 await self.back() 66 keys = keys[i:] 67 68 # 重新出发 69 for key in keys: 70 self.state = self.state[key] 71 await self.state.enter() 72 logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>") 73 return self.state
class
AyakaApp:
82class AyakaApp: 83 def __repr__(self) -> str: 84 return f"{self.__class__.__name__}({self.name})" 85 86 def __init__(self, name: str) -> None: 87 self.path = Path(inspect.stack()[1].filename) 88 89 for app in app_list: 90 if app.name == name: 91 raise Exception( 92 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 93 94 self.name = name 95 self.ayaka_root_config = ayaka_root_config 96 self.funcs = [] 97 self.on = AyakaOn(self) 98 self.timers: List[AyakaTimer] = [] 99 self.root_state = root_state 100 self._intro = "没有介绍" 101 self.state_helps: Dict[str, List[str]] = {} 102 self.idle_helps: List[str] = [] 103 104 logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"") 105 app_list.append(self) 106 if ayaka_root_config.debug: 107 print(self) 108 109 @property 110 def intro(self): 111 help = self._intro 112 for h in self.idle_helps: 113 help += "\n" + h 114 return help 115 116 @property 117 def all_help(self): 118 help = self.intro 119 for s, hs in self.state_helps.items(): 120 help += f"\n[{s}]" 121 for h in hs: 122 help += "\n" + h 123 return help 124 125 @property 126 def help(self): 127 '''获取当前状态下的帮助,没有找到则返回介绍''' 128 total_triggers = get_cascade_triggers(self.state) 129 130 helps = [] 131 cmds = [] 132 for ts in total_triggers: 133 flag = 1 134 for t in ts: 135 if t.app == self: 136 for c in t.cmds: 137 if c in cmds: 138 break 139 else: 140 if flag: 141 helps.append(f"[{t.state[1:]}]") 142 flag = 0 143 cmds.extend(t.cmds) 144 helps.append(t.help) 145 146 if not helps: 147 return self.intro 148 return "\n".join(helps) 149 150 @help.setter 151 def help(self, help: str): 152 self._intro = help 153 154 @property 155 def user_name(self): 156 '''*timer触发时不可用* 157 158 当前消息的发送人的群名片或昵称 159 ''' 160 s = self.event.sender 161 name = s.card or s.nickname 162 return name 163 164 @property 165 def user_id(self): 166 '''*timer触发时不可用* 167 168 当前消息的发送人的uid 169 ''' 170 return self.event.user_id 171 172 @property 173 def bot(self): 174 '''*timer触发时不可用* 175 176 当前bot 177 ''' 178 return _bot.get() 179 180 @property 181 def event(self): 182 '''*timer触发时不可用* 183 184 当前消息 185 ''' 186 return _event.get() 187 188 @property 189 def cache(self): 190 '''*timer触发时不可用* 191 192 当前群组的缓存空间''' 193 return self.group.cache_dict[self.name] 194 195 @property 196 def group_id(self): 197 '''*timer触发时不可用* 198 199 当前群组的id 200 201 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 202 ''' 203 return self.group.group_id 204 205 @property 206 def bot_id(self): 207 '''*timer触发时不可用* 208 209 当前bot的id 210 ''' 211 return self.group.bot_id 212 213 @property 214 def group(self): 215 '''*timer触发时不可用* 216 217 当前群组 218 219 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 220 ''' 221 return _group.get() 222 223 @property 224 def arg(self): 225 '''*timer触发时不可用* 226 227 当前消息在移除了命令后的剩余部分 228 ''' 229 return _arg.get() 230 231 @property 232 def args(self): 233 '''*timer触发时不可用* 234 235 当前消息在移除了命令后,剩余部分按照空格分割后的数组 236 237 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 238 ''' 239 return _args.get() 240 241 @property 242 def cmd(self): 243 '''*timer触发时不可用* 244 245 当前消息的命令头 246 ''' 247 return _cmd.get() 248 249 @property 250 def message(self): 251 '''*timer触发时不可用* 252 253 当前消息 254 ''' 255 return _message.get() 256 257 @property 258 def state(self): 259 return self.group.state 260 261 def get_state(self, *keys: str): 262 ''' 263 假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项` 264 265 >>> get_state(key1, key2) -> [root.test].key1.key2 266 267 特别的,keys可以为空,例如: 268 269 >>> get_state() -> [root.test] 270 ''' 271 keys = [self.name, *keys] 272 return root_state.join(*keys) 273 274 async def set_state(self, state_or_key: Union[AyakaState, str], *keys: str): 275 return await self.goto(state_or_key, *keys) 276 277 async def goto(self, state_or_key: Union[AyakaState, str], *keys: str): 278 '''当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填''' 279 if isinstance(state_or_key, AyakaState): 280 state = state_or_key 281 else: 282 state = self.get_state(*state_or_key.split("."), *keys) 283 return await self.group.goto(state) 284 285 async def back(self): 286 return await self.group.back() 287 288 def _add_func(self, func): 289 if func not in self.funcs: 290 self.funcs.append(func) 291 292 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 293 '''注册深度监听''' 294 def decorator(func): 295 func.deep = deep 296 self._add_func(func) 297 return func 298 return decorator 299 300 def on_no_block(self, block: bool = False): 301 '''注册非阻断''' 302 def decorator(func): 303 func.block = block 304 self._add_func(func) 305 return func 306 return decorator 307 308 def on_cmd(self, *cmds: str): 309 '''注册命令触发,不填写命令则视为文本消息''' 310 def decorator(func): 311 func.cmds = cmds 312 self._add_func(func) 313 return func 314 return decorator 315 316 def on_state(self, *states: Union[AyakaState, str, List[str]]): 317 '''注册有状态响应,不填写states则为root.插件名状态''' 318 _states = [] 319 if not states: 320 states = [[]] 321 for s in states: 322 if isinstance(s, str): 323 s = self.get_state(s) 324 elif isinstance(s, list): 325 s = self.get_state(*s) 326 _states.append(s) 327 328 def decorator(func): 329 func.states = _states 330 self._add_func(func) 331 return func 332 return decorator 333 334 def set_start_cmds(self, *cmds: str): 335 '''设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式''' 336 async def func(): 337 '''打开应用''' 338 await self.start() 339 func.cmds = cmds 340 self.funcs.append(func) 341 342 async def start(self, state: str = None): 343 '''*timer触发时不可用* 344 345 启动应用,并发送提示 346 347 state参数为兼容旧API''' 348 if not state: 349 state = self.get_state() 350 await self.goto(state) 351 else: 352 states = state.split(".") 353 await self.goto(*states) 354 await self.send(f"已打开应用 [{self.name}]") 355 356 async def close(self): 357 '''*timer触发时不可用* 358 359 关闭应用,并发送提示''' 360 await self.goto(root_state) 361 await self.send(f"已关闭应用 [{self.name}]") 362 363 def add_listener(self, user_id: int): 364 '''为该群组添加对指定私聊的监听''' 365 private_listener_dict[user_id].append(self.group_id) 366 367 def remove_listener(self, user_id: int = 0): 368 '''默认移除该群组对其他私聊的所有监听''' 369 id = self.group_id 370 371 if user_id == 0: 372 for ids in private_listener_dict.values(): 373 if id in ids: 374 ids.remove(id) 375 return 376 377 if id in private_listener_dict[user_id]: 378 private_listener_dict[user_id].remove(self.group_id) 379 380 async def send(self, message): 381 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 382 # 这里不使用event,因为一些event可能来自其他设备的监听传递 383 await self.bot.send_group_msg(group_id=self.group_id, message=message) 384 385 def pack_messages(self, bot_id, messages): 386 '''转换为cqhttp node格式''' 387 data: List[MessageSegment] = [] 388 for m in messages: 389 if isinstance(m, MessageSegment) and m.type == "node": 390 data.append(m) 391 else: 392 m = MessageSegment.node_custom( 393 user_id=bot_id, 394 nickname="Ayaka Bot", 395 content=str(m) 396 ) 397 data.append(m) 398 return data 399 400 async def send_many(self, messages): 401 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 402 # 分割长消息组(不可超过100条 403 div_len = 100 404 div_cnt = ceil(len(messages) / div_len) 405 for i in range(div_cnt): 406 msgs = self.pack_messages( 407 self.bot_id, 408 messages[i*div_len: (i+1)*div_len] 409 ) 410 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 411 412 def t_check(self, bot_id: int): 413 # 未连接 414 bot = get_bot(bot_id) 415 if not bot: 416 logger.warning(f"BOT({bot_id}) 未连接") 417 return 418 419 return bot 420 421 async def t_send(self, bot_id: int, group_id: int, message): 422 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 423 bot = self.t_check(bot_id) 424 if not bot: 425 return 426 427 await bot.send_group_msg(group_id=group_id, message=message) 428 429 async def t_send_many(self, bot_id: int, group_id: int, messages): 430 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 431 bot = self.t_check(bot_id) 432 if not bot: 433 return 434 435 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 436 div_len = 80 437 div_cnt = ceil(len(messages) / div_len) 438 for i in range(div_cnt): 439 msgs = self.pack_messages( 440 bot_id, 441 messages[i*div_len: (i+1)*div_len] 442 ) 443 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
86 def __init__(self, name: str) -> None: 87 self.path = Path(inspect.stack()[1].filename) 88 89 for app in app_list: 90 if app.name == name: 91 raise Exception( 92 f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 93 94 self.name = name 95 self.ayaka_root_config = ayaka_root_config 96 self.funcs = [] 97 self.on = AyakaOn(self) 98 self.timers: List[AyakaTimer] = [] 99 self.root_state = root_state 100 self._intro = "没有介绍" 101 self.state_helps: Dict[str, List[str]] = {} 102 self.idle_helps: List[str] = [] 103 104 logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"") 105 app_list.append(self) 106 if ayaka_root_config.debug: 107 print(self)
def
get_state(self, *keys: str):
261 def get_state(self, *keys: str): 262 ''' 263 假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项` 264 265 >>> get_state(key1, key2) -> [root.test].key1.key2 266 267 特别的,keys可以为空,例如: 268 269 >>> get_state() -> [root.test] 270 ''' 271 keys = [self.name, *keys] 272 return root_state.join(*keys)
假设当前状态为 root.test.a
即 根.插件名.一级菜单项
>>> get_state(key1, key2) -> [root.test].key1.key2
特别的,keys可以为空,例如:
>>> get_state() -> [root.test]
277 async def goto(self, state_or_key: Union[AyakaState, str], *keys: str): 278 '''当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填''' 279 if isinstance(state_or_key, AyakaState): 280 state = state_or_key 281 else: 282 state = self.get_state(*state_or_key.split("."), *keys) 283 return await self.group.goto(state)
当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填
def
on_deep_all(self, deep: Union[int, Literal['all']] = 'all'):
292 def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"): 293 '''注册深度监听''' 294 def decorator(func): 295 func.deep = deep 296 self._add_func(func) 297 return func 298 return decorator
注册深度监听
def
on_no_block(self, block: bool = False):
300 def on_no_block(self, block: bool = False): 301 '''注册非阻断''' 302 def decorator(func): 303 func.block = block 304 self._add_func(func) 305 return func 306 return decorator
注册非阻断
def
on_cmd(self, *cmds: str):
308 def on_cmd(self, *cmds: str): 309 '''注册命令触发,不填写命令则视为文本消息''' 310 def decorator(func): 311 func.cmds = cmds 312 self._add_func(func) 313 return func 314 return decorator
注册命令触发,不填写命令则视为文本消息
316 def on_state(self, *states: Union[AyakaState, str, List[str]]): 317 '''注册有状态响应,不填写states则为root.插件名状态''' 318 _states = [] 319 if not states: 320 states = [[]] 321 for s in states: 322 if isinstance(s, str): 323 s = self.get_state(s) 324 elif isinstance(s, list): 325 s = self.get_state(*s) 326 _states.append(s) 327 328 def decorator(func): 329 func.states = _states 330 self._add_func(func) 331 return func 332 return decorator
注册有状态响应,不填写states则为root.插件名状态
def
set_start_cmds(self, *cmds: str):
334 def set_start_cmds(self, *cmds: str): 335 '''设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式''' 336 async def func(): 337 '''打开应用''' 338 await self.start() 339 func.cmds = cmds 340 self.funcs.append(func)
设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式
async def
start(self, state: str = None):
342 async def start(self, state: str = None): 343 '''*timer触发时不可用* 344 345 启动应用,并发送提示 346 347 state参数为兼容旧API''' 348 if not state: 349 state = self.get_state() 350 await self.goto(state) 351 else: 352 states = state.split(".") 353 await self.goto(*states) 354 await self.send(f"已打开应用 [{self.name}]")
timer触发时不可用
启动应用,并发送提示
state参数为兼容旧API
async def
close(self):
356 async def close(self): 357 '''*timer触发时不可用* 358 359 关闭应用,并发送提示''' 360 await self.goto(root_state) 361 await self.send(f"已关闭应用 [{self.name}]")
timer触发时不可用
关闭应用,并发送提示
def
add_listener(self, user_id: int):
363 def add_listener(self, user_id: int): 364 '''为该群组添加对指定私聊的监听''' 365 private_listener_dict[user_id].append(self.group_id)
为该群组添加对指定私聊的监听
def
remove_listener(self, user_id: int = 0):
367 def remove_listener(self, user_id: int = 0): 368 '''默认移除该群组对其他私聊的所有监听''' 369 id = self.group_id 370 371 if user_id == 0: 372 for ids in private_listener_dict.values(): 373 if id in ids: 374 ids.remove(id) 375 return 376 377 if id in private_listener_dict[user_id]: 378 private_listener_dict[user_id].remove(self.group_id)
默认移除该群组对其他私聊的所有监听
async def
send(self, message):
380 async def send(self, message): 381 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 382 # 这里不使用event,因为一些event可能来自其他设备的监听传递 383 await self.bot.send_group_msg(group_id=self.group_id, message=message)
发送消息,消息的类型可以是 Message | MessageSegment | str
def
pack_messages(self, bot_id, messages):
385 def pack_messages(self, bot_id, messages): 386 '''转换为cqhttp node格式''' 387 data: List[MessageSegment] = [] 388 for m in messages: 389 if isinstance(m, MessageSegment) and m.type == "node": 390 data.append(m) 391 else: 392 m = MessageSegment.node_custom( 393 user_id=bot_id, 394 nickname="Ayaka Bot", 395 content=str(m) 396 ) 397 data.append(m) 398 return data
转换为cqhttp node格式
async def
send_many(self, messages):
400 async def send_many(self, messages): 401 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 402 # 分割长消息组(不可超过100条 403 div_len = 100 404 div_cnt = ceil(len(messages) / div_len) 405 for i in range(div_cnt): 406 msgs = self.pack_messages( 407 self.bot_id, 408 messages[i*div_len: (i+1)*div_len] 409 ) 410 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]
async def
t_send(self, bot_id: int, group_id: int, message):
421 async def t_send(self, bot_id: int, group_id: int, message): 422 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 423 bot = self.t_check(bot_id) 424 if not bot: 425 return 426 427 await bot.send_group_msg(group_id=group_id, message=message)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
async def
t_send_many(self, bot_id: int, group_id: int, messages):
429 async def t_send_many(self, bot_id: int, group_id: int, messages): 430 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 431 bot = self.t_check(bot_id) 432 if not bot: 433 return 434 435 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 436 div_len = 80 437 div_cnt = ceil(len(messages) / div_len) 438 for i in range(div_cnt): 439 msgs = self.pack_messages( 440 bot_id, 441 messages[i*div_len: (i+1)*div_len] 442 ) 443 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
def
get_bot(bot_id: int):
446def get_bot(bot_id: int): 447 '''获取已连接的bot''' 448 bot_id = str(bot_id) 449 for bot in bot_list: 450 if bot.self_id == bot_id: 451 return bot
获取已连接的bot
def
get_group(bot_id: int, group_id: int):
454def get_group(bot_id: int, group_id: int): 455 '''获取对应的AyakaGroup对象,自动增加''' 456 for group in group_list: 457 if group.bot_id == bot_id and group.group_id == group_id: 458 break 459 else: 460 group = AyakaGroup(bot_id, group_id) 461 return group
获取对应的AyakaGroup对象,自动增加
def
get_app(app_name: str):
async def
deal_event( bot: nonebot.adapters.onebot.v11.bot.Bot, event: nonebot.adapters.onebot.v11.event.MessageEvent):
470async def deal_event(bot: Bot, event: MessageEvent): 471 '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组''' 472 if ayaka_root_config.exclude_old_msg: 473 time_i = int(datetime.datetime.now().timestamp()) 474 if event.time < time_i - 60: 475 return 476 477 _bot.set(bot) 478 _event.set(event) 479 480 bot_id = int(bot.self_id) 481 482 if isinstance(event, GroupMessageEvent): 483 group_id = event.group_id 484 await deal_group(bot_id, group_id) 485 486 else: 487 id = event.user_id 488 group_ids = private_listener_dict.get(id, []) 489 ts = [asyncio.create_task(deal_group(bot_id, group_id)) 490 for group_id in group_ids] 491 await asyncio.gather(*ts)
处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组
async def
deal_group(bot_id: int, group_id: int):
494async def deal_group(bot_id: int, group_id: int): 495 prefix = ayaka_root_config.prefix 496 497 # 群组 498 group = get_group(bot_id, group_id) 499 _group.set(group) 500 501 # 消息 502 message = _event.get().message 503 _message.set(message) 504 505 # 从子状态开始向上查找可用的触发 506 state = group.state 507 cascade_triggers = get_cascade_triggers(state, 0) 508 509 # 命令 510 # 消息前缀文本 511 first = get_first(message) 512 if first.startswith(prefix): 513 first = first[len(prefix):] 514 for ts in cascade_triggers: 515 if await deal_cmd_triggers(ts, message, first, state): 516 return 517 518 # 命令退化成消息 519 for ts in cascade_triggers: 520 if await deal_text_triggers(ts, message, state): 521 return
async def
deal_cmd_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, first: str, state: ayaka.state.AyakaState):
524async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState): 525 sep = ayaka_root_config.separate 526 527 # 命令 528 temp = [t for t in triggers if t.cmds] 529 # 根据命令长度排序,长命令优先级更高 530 cmd_ts = [(c, t) for t in temp for c in t.cmds] 531 cmd_ts.sort(key=lambda x: len(x[0]), reverse=1) 532 533 for c, t in cmd_ts: 534 if first.startswith(c): 535 # 设置上下文 536 # 设置命令 537 _cmd.set(c) 538 # 设置参数 539 left = first[len(c):].lstrip(sep) 540 if left: 541 arg = Message([MessageSegment.text(left), *message[1:]]) 542 else: 543 arg = Message(message[1:]) 544 _arg.set(arg) 545 _args.set(divide_message(arg)) 546 547 # 触发 548 log_trigger(c, t.app.name, state, t.func.__name__) 549 await t.run() 550 551 # 阻断后续 552 if t.block: 553 return True
async def
deal_text_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, state: ayaka.state.AyakaState):
556async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState): 557 # 消息 558 text_ts = [t for t in triggers if not t.cmds] 559 560 # 设置上下文 561 # 设置命令 562 _cmd.set("") 563 # 设置参数 564 _arg.set(message) 565 _args.set(divide_message(message)) 566 567 for t in text_ts: 568 # 触发 569 log_trigger("", t.app.name, state, t.func.__name__) 570 await t.run() 571 572 # 阻断后续 573 if t.block: 574 return True
577def get_cascade_triggers(state: AyakaState, deep: int = 0): 578 # 根据深度筛选funcs 579 ts = [ 580 t for t in state.triggers 581 if t.deep == "all" or t.deep >= deep 582 ] 583 cascade_triggers = [ts] 584 585 # 获取父状态的方法 586 if state.parent: 587 cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1)) 588 589 # 排除空项 590 cascade_triggers = [ts for ts in cascade_triggers if ts] 591 return cascade_triggers
def
log_trigger(cmd, app_name, state, func_name):
594def log_trigger(cmd, app_name, state, func_name): 595 '''日志记录''' 596 items = [] 597 items.append(f"状态:<c>{state}</c>") 598 items.append(f"应用:<y>{app_name}</y>") 599 if cmd: 600 items.append(f"命令:<y>{cmd}</y>") 601 else: 602 items.append("命令:<g>无</g>") 603 items.append(f"回调:<c>{func_name}</c>") 604 info = " | ".join(items) 605 logger.opt(colors=True).debug(info)
日志记录
def
get_first(message: nonebot.adapters.onebot.v11.message.Message):
def
divide_message( message: nonebot.adapters.onebot.v11.message.Message) -> List[nonebot.adapters.onebot.v11.message.MessageSegment]:
632def regist_func(app: AyakaApp, func): 633 '''注册回调''' 634 # 默认是无状态应用,从root开始触发 635 states: List[AyakaState] = getattr(func, "states", [root_state]) 636 # 默认是消息响应 637 cmds: List[str] = getattr(func, "cmds", []) 638 # 默认监听深度为0 639 deep: int = getattr(func, "deep", 0) 640 # 默认阻断 641 block: bool = getattr(func, "block", True) 642 643 # 注册 644 for s in states: 645 s.on_cmd(cmds, app, deep, block)(func) 646 647 return func
注册回调
@driver.on_startup
async def
startup():
653@driver.on_startup 654async def startup(): 655 # 注册所有回调 656 for app in app_list: 657 for func in app.funcs: 658 regist_func(app, func) 659 660 if ayaka_root_config.debug: 661 s = json.dumps( 662 root_state.dict(), ensure_ascii=0, 663 indent=4, default=repr 664 ) 665 path = ayaka_data_path / "all_state.json" 666 with path.open("w+", encoding="utf8") as f: 667 f.write(s)