ayaka.ayaka

ayaka核心

  1'''ayaka核心'''
  2import asyncio
  3import datetime
  4import inspect
  5import json
  6from math import ceil
  7from pathlib import Path
  8import re
  9from loguru import logger
 10from typing import List, Dict, Literal, Tuple, Union
 11from .depend import AyakaCache
 12from .config import ayaka_root_config, ayaka_data_path
 13from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, _cmd_regex, _enter_exit_during, app_list, group_list, bot_list, private_listener_dict
 14from .driver import on_message, get_driver, Message, MessageSegment, Bot, MessageEvent, GroupMessageEvent
 15from .state import AyakaState, AyakaTrigger, AyakaTimer, root_state, ensure_regex_list
 16from .on import AyakaOn
 17
 18
 19class AyakaGroup:
 20    def __repr__(self) -> str:
 21        return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})"
 22
 23    def __init__(self, bot_id: int, group_id: int) -> None:
 24        self.bot_id = bot_id
 25        self.group_id = group_id
 26        self.state = root_state
 27
 28        # 添加app,并分配独立数据空间
 29        self.apps: List["AyakaApp"] = []
 30        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
 31        for app in app_list:
 32            # if app.name not in forbid_names:
 33            self.apps.append(app)
 34            self.cache_dict[app.name] = {}
 35
 36        group_list.append(self)
 37
 38        if ayaka_root_config.debug:
 39            print(self)
 40
 41    async def enter(self, state: Union[str, List[str], AyakaState]):
 42        if isinstance(state, list):
 43            next_state = self.state.join(*state)
 44        elif isinstance(state, str):
 45            next_state = self.state.join(state)
 46        else:
 47            next_state = self.state.join(*state.keys)
 48        return await self.goto(next_state)
 49
 50    async def back(self):
 51        if self.state.parent:
 52            await self.state.exit()
 53            self.state = self.state.parent
 54            return self.state
 55
 56    async def goto(self, state: AyakaState):
 57        if _enter_exit_during.get() > 0:
 58            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
 59
 60        keys = state.keys
 61
 62        # 找到第一个不同的结点
 63        n0 = len(keys)
 64        n1 = len(self.state.keys)
 65        n = min(n0, n1)
 66        for i in range(n):
 67            if keys[i] != self.state.keys[i]:
 68                break
 69        else:
 70            i += 1
 71
 72        # 回退
 73        for j in range(i, n1):
 74            await self.back()
 75        keys = keys[i:]
 76
 77        # 重新出发
 78        for key in keys:
 79            self.state = self.state[key]
 80            await self.state.enter()
 81        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
 82        return self.state
 83
 84    def get_app(self, name: str):
 85        '''根据app名获取该group所启用的app,不存在则返回None'''
 86        for app in self.apps:
 87            if app.name == name:
 88                return app
 89
 90
 91class AyakaApp:
 92    def __repr__(self) -> str:
 93        return f"{self.__class__.__name__}({self.name})"
 94
 95    def __init__(self, name: str) -> None:
 96        self.path = Path(inspect.stack()[1].filename)
 97        logger.opt(colors=True).debug(f"加载应用 \"<c>{name}</c>\"")
 98
 99        for app in app_list:
100            if app.name == name:
101                raise Exception(
102                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
103
104        self.name = name
105        self.ayaka_root_config = ayaka_root_config
106        self.funcs = []
107        self.on = AyakaOn(self)
108        self.timers: List[AyakaTimer] = []
109
110        self.root_state = root_state
111        self.plugin_state = self.get_state()
112
113        self._intro = "没有介绍"
114        self.state_helps: Dict[str, List[str]] = {}
115        self.idle_helps: List[str] = []
116
117        app_list.append(self)
118        if ayaka_root_config.debug:
119            print(self)
120
121    @property
122    def intro(self):
123        help = self._intro
124        for h in self.idle_helps:
125            help += "\n" + h
126        return help
127
128    @property
129    def all_help(self):
130        help = self.intro
131        for s, hs in self.state_helps.items():
132            help += f"\n[{s}]"
133            for h in hs:
134                help += "\n" + h
135        return help
136
137    @property
138    def help(self):
139        '''获取当前状态下的帮助,没有找到则返回介绍'''
140        total_triggers = get_cascade_triggers(self.state)
141
142        helps = []
143        cmds = []
144        for ts in total_triggers:
145            flag = 1
146            for t in ts:
147                if t.app == self:
148                    for c in t.raw_cmds:
149                        if c in cmds:
150                            break
151                    else:
152                        if flag:
153                            helps.append(f"[{t.state[1:]}]")
154                            flag = 0
155                        cmds.extend(t.raw_cmds)
156                        helps.append(t.help)
157
158        if not helps:
159            return self.intro
160        return "\n".join(helps)
161
162    @help.setter
163    def help(self, help: str):
164        self._intro = help
165
166    @property
167    def user_name(self):
168        '''*timer触发时不可用*
169
170        当前消息的发送人的群名片或昵称
171        '''
172        s = self.event.sender
173        name = s.card or s.nickname
174        return name
175
176    @property
177    def user_id(self):
178        '''*timer触发时不可用*
179
180        当前消息的发送人的uid
181        '''
182        return self.event.user_id
183
184    @property
185    def bot(self):
186        '''*timer触发时不可用*
187
188        当前bot
189        '''
190        return _bot.get()
191
192    @property
193    def event(self):
194        '''*timer触发时不可用*
195
196        当前消息
197        '''
198        return _event.get()
199
200    @property
201    def cache(self):
202        '''*timer触发时不可用*
203
204        当前群组的缓存空间'''
205        return self.group.cache_dict[self.name]
206
207    @property
208    def group_id(self):
209        '''*timer触发时不可用*
210
211        当前群组的id
212
213        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id
214        '''
215        return self.group.group_id
216
217    @property
218    def bot_id(self):
219        '''*timer触发时不可用*
220
221        当前bot的id
222        '''
223        return self.group.bot_id
224
225    @property
226    def group(self):
227        '''*timer触发时不可用*
228
229        当前群组
230
231        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A
232        '''
233        return _group.get()
234
235    @property
236    def arg(self):
237        '''*timer触发时不可用*
238
239        当前消息在移除了命令后的剩余部分
240        '''
241        return _arg.get()
242
243    @property
244    def args(self):
245        '''*timer触发时不可用*
246
247        当前消息在移除了命令后,剩余部分按照空格分割后的数组
248
249        注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素
250        '''
251        return _args.get()
252
253    @property
254    def cmd(self):
255        '''*timer触发时不可用*
256
257        当前消息的命令头
258        '''
259        return _cmd.get()
260
261    @property
262    def cmd_regex(self):
263        '''*timer触发时不可用*
264
265        当前消息的命令头
266        '''
267        return _cmd_regex.get()
268
269    @property
270    def message(self):
271        '''*timer触发时不可用*
272
273        当前消息
274        '''
275        return _message.get()
276
277    @property
278    def state(self):
279        return self.group.state
280
281    def get_state(self, key: Union[str, List[str]] = [], *_keys: str):
282        # _keys为兼容旧API(0.5.3及以前
283        '''
284            假设当前app.name为test
285
286            >>> get_state(key1) -> [root.test].key1
287
288            >>> get_state([key1, key2]) -> [root.test].key1.key2
289
290            特别的,参数可以为空,例如:
291
292            >>> get_state() -> [root.test]
293        '''
294        if isinstance(key, list):
295            _keys = [self.name, *key]
296        else:
297            _keys = [self.name, key, *_keys]
298
299        keys = []
300        for k in _keys:
301            keys.extend(k.split(ayaka_root_config.state_separate))
302
303        return root_state.join(*keys)
304
305    async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str):
306        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
307        return await self.goto(state, *keys)
308
309    async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str):
310        # keys为兼容旧API(0.5.2及以前
311        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
312        if isinstance(state, str):
313            state = self.get_state(state, *keys)
314        elif isinstance(state, list):
315            state = self.get_state(*state)
316        return await self.group.goto(state)
317
318    async def enter(self, state: Union[str, List[str], AyakaState]):
319        '''进入子状态'''
320        return await self.group.enter(state)
321
322    async def back(self):
323        '''回退当前群组的状态'''
324        return await self.group.back()
325
326    def _add_func(self, func):
327        '''如果不存在就加入self.funcs'''
328        if func not in self.funcs:
329            self.funcs.append(func)
330
331    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
332        '''注册深度监听'''
333        def decorator(func):
334            func.deep = deep
335            self._add_func(func)
336            return func
337        return decorator
338
339    def on_no_block(self, block: bool = False):
340        '''注册非阻断'''
341        def decorator(func):
342            func.block = block
343            self._add_func(func)
344            return func
345        return decorator
346
347    def on_cmd(self, *cmds: Union[str, re.Pattern]):
348        '''注册命令触发,不填写命令则视为文本消息'''
349        def decorator(func):
350            func.cmds = getattr(func, "cmds", [])
351            func.cmds.extend(ensure_regex_list(cmds))
352            self._add_func(func)
353            return func
354        return decorator
355
356    def on_cmd_regex(self, *cmds: Union[str, re.Pattern]):
357        '''注册命令触发,不填写命令则视为文本消息'''
358        def decorator(func):
359            func.cmds = getattr(func, "cmds", [])
360            func.cmds.extend(ensure_regex_list(cmds, False))
361            self._add_func(func)
362            return func
363        return decorator
364
365    def on_text(self):
366        '''注册消息触发'''
367        def decorator(func):
368            func = self.on_no_block()(func)
369            self._add_func(func)
370            return func
371        return decorator
372
373    def on_state(self, *states: Union[AyakaState, str, List[str]]):
374        '''注册有状态响应,不填写states则为plugin_state'''
375        _states = []
376
377        if not states:
378            _states = [self.plugin_state]
379
380        else:
381            for s in states:
382                if isinstance(s, str):
383                    s = self.get_state(s)
384                elif isinstance(s, list):
385                    s = self.get_state(*s)
386                _states.append(s)
387
388        def decorator(func):
389            func.states = _states
390            self._add_func(func)
391            return func
392        return decorator
393
394    def on_idle(self):
395        '''注册根结点回调'''
396        return self.on_state(self.root_state)
397
398    def on_everyday(self, h: int, m: int, s: int):
399        '''每日定时触发'''
400        return self.on_interval(86400, h, m, s)
401
402    def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True):
403        '''在指定的时间点后循环触发'''
404        def decorator(func):
405            t = AyakaTimer(self, gap, h, m, s, func, show)
406            self.timers.append(t)
407            return func
408        return decorator
409
410    def on_start_cmds(self, *cmds: Union[str, re.Pattern]):
411        def decorator(func):
412            func = self.on_idle()(func)
413            func = self.on_cmd(*cmds)(func)
414            return func
415        return decorator
416
417    def on_close_cmds(self, *cmds: Union[str, re.Pattern]):
418        def decorator(func):
419            func = self.on_state()(func)
420            func = self.on_deep_all()(func)
421            func = self.on_cmd(*cmds)(func)
422            return func
423        return decorator
424
425    def set_start_cmds(self, *cmds: Union[str, re.Pattern]):
426        '''设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式'''
427        @self.on_start_cmds(*cmds)
428        async def start():
429            '''打开应用'''
430            await self.start()
431
432    def set_close_cmds(self, *cmds: Union[str, re.Pattern]):
433        '''设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式'''
434        @self.on_close_cmds(*cmds)
435        async def close():
436            '''关闭应用'''
437            await self.close()
438
439    async def start(self, state: str = ""):
440        '''*timer触发时不可用*
441
442        启动应用,并发送提示
443
444        state参数为兼容旧API'''
445        if not state:
446            state = self.get_state()
447        await self.goto(state)
448        await self.send(f"已打开应用 [{self.name}]")
449
450    async def close(self):
451        '''*timer触发时不可用*
452
453        关闭应用,并发送提示'''
454        await self.goto(root_state)
455        await self.send(f"已关闭应用 [{self.name}]")
456
457    def add_listener(self, user_id: int):
458        '''为该群组添加对指定私聊的监听'''
459        private_listener_dict[user_id].append(self.group_id)
460
461    def remove_listener(self, user_id: int = 0):
462        '''默认移除该群组对其他私聊的所有监听'''
463        id = self.group_id
464
465        if user_id == 0:
466            for ids in private_listener_dict.values():
467                if id in ids:
468                    ids.remove(id)
469            return
470
471        if id in private_listener_dict[user_id]:
472            private_listener_dict[user_id].remove(self.group_id)
473
474    async def send(self, message):
475        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
476        # 这里不使用event,因为一些event可能来自其他设备的监听传递
477        await self.bot.send_group_msg(group_id=self.group_id, message=message)
478
479    def pack_messages(self, bot_id, messages):
480        '''转换为cqhttp node格式'''
481        data: List[MessageSegment] = []
482        for m in messages:
483            if isinstance(m, MessageSegment) and m.type == "node":
484                data.append(m)
485            else:
486                m = MessageSegment.node_custom(
487                    user_id=bot_id,
488                    nickname="Ayaka Bot",
489                    content=str(m)
490                )
491                data.append(m)
492        return data
493
494    async def send_many(self, messages):
495        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
496        # 分割长消息组(不可超过100条
497        div_len = 100
498        div_cnt = ceil(len(messages) / div_len)
499        for i in range(div_cnt):
500            msgs = self.pack_messages(
501                self.bot_id,
502                messages[i*div_len: (i+1)*div_len]
503            )
504            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
505
506    def t_check(self, bot_id: int):
507        # 未连接
508        bot = get_bot(bot_id)
509        if not bot:
510            logger.warning(f"BOT({bot_id}) 未连接")
511            return
512
513        return bot
514
515    async def t_send(self, bot_id: int, group_id: int, message):
516        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
517        bot = self.t_check(bot_id)
518        if not bot:
519            return
520
521        await bot.send_group_msg(group_id=group_id, message=message)
522
523    async def t_send_many(self, bot_id: int, group_id: int, messages):
524        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
525        bot = self.t_check(bot_id)
526        if not bot:
527            return
528
529        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
530        div_len = 80
531        div_cnt = ceil(len(messages) / div_len)
532        for i in range(div_cnt):
533            msgs = self.pack_messages(
534                bot_id,
535                messages[i*div_len: (i+1)*div_len]
536            )
537            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
538
539
540def get_bot(bot_id: int):
541    '''获取已连接的bot'''
542    bot_id = str(bot_id)
543    for bot in bot_list:
544        if bot.self_id == bot_id:
545            return bot
546
547
548def get_group(bot_id: int, group_id: int):
549    '''获取对应的AyakaGroup对象,自动增加'''
550    for group in group_list:
551        if group.bot_id == bot_id and group.group_id == group_id:
552            break
553    else:
554        group = AyakaGroup(bot_id, group_id)
555    return group
556
557
558def get_app(app_name: str):
559    for app in app_list:
560        if app.name == app_name:
561            return app
562
563
564async def deal_event(bot: Bot, event: MessageEvent):
565    '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组'''
566    if ayaka_root_config.exclude_old_msg:
567        time_i = int(datetime.datetime.now().timestamp())
568        if event.time < time_i - 60:
569            return
570
571    _bot.set(bot)
572    _event.set(event)
573
574    bot_id = int(bot.self_id)
575
576    if isinstance(event, GroupMessageEvent):
577        group_id = event.group_id
578        await deal_group(bot_id, group_id)
579
580    else:
581        id = event.user_id
582        group_ids = private_listener_dict.get(id, [])
583        ts = [asyncio.create_task(deal_group(bot_id, group_id))
584              for group_id in group_ids]
585        await asyncio.gather(*ts)
586
587
588async def deal_group(bot_id: int, group_id: int):
589    prefix = ayaka_root_config.prefix
590
591    # 群组
592    group = get_group(bot_id, group_id)
593    _group.set(group)
594
595    # 消息
596    message = _event.get().message
597    _message.set(message)
598
599    # 从子状态开始向上查找可用的触发
600    state = group.state
601    cascade_triggers = get_cascade_triggers(state, 0)
602
603    # 命令
604    # 消息前缀文本
605    first = get_first(message)
606    if first.startswith(prefix):
607        first = first[len(prefix):]
608        for ts in cascade_triggers:
609            if await deal_cmd_triggers(ts, message, first, state):
610                return
611
612    # 命令退化成消息
613    for ts in cascade_triggers:
614        if await deal_text_triggers(ts, message, state):
615            return
616
617
618async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState):
619    sep = ayaka_root_config.separate
620
621    # 找到触发命令
622    cmd_ts: List[Tuple[re.Pattern, str, AyakaTrigger]] = []
623    for t in triggers:
624        for cmd in t.cmds:
625            r = cmd.match(first)
626            if r:
627                cmd_ts.append([r, r.group(), t])
628                break
629
630    # 根据命令长度排序,长命令优先级更高
631    cmd_ts.sort(key=lambda x: len(x[1]), reverse=1)
632
633    for r, c, t in cmd_ts:
634        # 设置上下文
635        # 设置命令
636        _cmd.set(c)
637        _cmd_regex.set(r)
638        # 设置参数
639        left = first[len(c):].lstrip(sep)
640        if left:
641            arg = Message([MessageSegment.text(left), *message[1:]])
642        else:
643            arg = Message(message[1:])
644        _arg.set(arg)
645        _args.set(divide_message(arg))
646
647        # 记录触发
648        log_trigger(c, t.app.name, state, t.func.__name__)
649        f = await t.run()
650
651        # 阻断后续
652        if f and t.block:
653            return True
654
655
656async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState):
657    # 消息
658    text_ts = [t for t in triggers if not t.cmds]
659
660    # 设置上下文
661    # 设置参数
662    _arg.set(message)
663    _args.set(divide_message(message))
664
665    for t in text_ts:
666        # 记录触发
667        log_trigger("", t.app.name, state, t.func.__name__)
668        f = await t.run()
669
670        # 阻断后续
671        if f and t.block:
672            return True
673
674
675def get_cascade_triggers(state: AyakaState, deep: int = 0):
676    # 根据深度筛选funcs
677    ts = [
678        t for t in state.triggers
679        if t.deep == "all" or t.deep >= deep
680    ]
681    cascade_triggers = [ts]
682
683    # 获取父状态的方法
684    if state.parent:
685        cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1))
686
687    # 排除空项
688    cascade_triggers = [ts for ts in cascade_triggers if ts]
689    return cascade_triggers
690
691
692def log_trigger(cmd, app_name, state, func_name):
693    '''日志记录'''
694    items = []
695    items.append(f"状态:<c>{state}</c>")
696    items.append(f"应用:<y>{app_name}</y>")
697    if cmd:
698        items.append(f"命令:<y>{cmd}</y>")
699    else:
700        items.append("命令:<g>无</g>")
701    items.append(f"回调:<c>{func_name}</c>")
702    info = " | ".join(items)
703    logger.opt(colors=True).debug(info)
704
705
706def get_first(message: Message):
707    first = ""
708    for m in message:
709        if m.type == "text":
710            first += str(m)
711        else:
712            break
713    return first
714
715
716def divide_message(message: Message) -> List[MessageSegment]:
717    args = []
718    sep = ayaka_root_config.separate
719
720    for m in message:
721        if m.is_text():
722            ss = str(m).split(sep)
723            args.extend(MessageSegment.text(s) for s in ss if s)
724        else:
725            args.append(m)
726
727    return args
728
729
730def regist_func(app: AyakaApp, func):
731    '''注册回调'''
732    # 默认是无状态应用,从root开始触发
733    states: List[AyakaState] = getattr(func, "states", [root_state])
734    # 默认是消息响应
735    cmds: List[re.Pattern] = getattr(func, "cmds", [])
736    # 默认监听深度为0
737    deep: int = getattr(func, "deep", 0)
738    # 默认阻断
739    block: bool = getattr(func, "block", True)
740
741    # 注册
742    for s in states:
743        s.on_cmd(cmds, app, deep, block)(func)
744
745    return func
746
747
748driver = get_driver()
749
750
751@driver.on_startup
752async def startup():
753    # 注册所有回调
754    for app in app_list:
755        for func in app.funcs:
756            regist_func(app, func)
757
758    if ayaka_root_config.debug:
759        s = json.dumps(
760            root_state.dict(), ensure_ascii=0,
761            indent=4, default=repr
762        )
763        path = ayaka_data_path / "all_state.json"
764        with path.open("w+", encoding="utf8") as f:
765            f.write(s)
766
767on_message(priority=20, block=False, handlers=[deal_event])
class AyakaGroup:
20class AyakaGroup:
21    def __repr__(self) -> str:
22        return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})"
23
24    def __init__(self, bot_id: int, group_id: int) -> None:
25        self.bot_id = bot_id
26        self.group_id = group_id
27        self.state = root_state
28
29        # 添加app,并分配独立数据空间
30        self.apps: List["AyakaApp"] = []
31        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
32        for app in app_list:
33            # if app.name not in forbid_names:
34            self.apps.append(app)
35            self.cache_dict[app.name] = {}
36
37        group_list.append(self)
38
39        if ayaka_root_config.debug:
40            print(self)
41
42    async def enter(self, state: Union[str, List[str], AyakaState]):
43        if isinstance(state, list):
44            next_state = self.state.join(*state)
45        elif isinstance(state, str):
46            next_state = self.state.join(state)
47        else:
48            next_state = self.state.join(*state.keys)
49        return await self.goto(next_state)
50
51    async def back(self):
52        if self.state.parent:
53            await self.state.exit()
54            self.state = self.state.parent
55            return self.state
56
57    async def goto(self, state: AyakaState):
58        if _enter_exit_during.get() > 0:
59            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
60
61        keys = state.keys
62
63        # 找到第一个不同的结点
64        n0 = len(keys)
65        n1 = len(self.state.keys)
66        n = min(n0, n1)
67        for i in range(n):
68            if keys[i] != self.state.keys[i]:
69                break
70        else:
71            i += 1
72
73        # 回退
74        for j in range(i, n1):
75            await self.back()
76        keys = keys[i:]
77
78        # 重新出发
79        for key in keys:
80            self.state = self.state[key]
81            await self.state.enter()
82        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
83        return self.state
84
85    def get_app(self, name: str):
86        '''根据app名获取该group所启用的app,不存在则返回None'''
87        for app in self.apps:
88            if app.name == name:
89                return app
AyakaGroup(bot_id: int, group_id: int)
24    def __init__(self, bot_id: int, group_id: int) -> None:
25        self.bot_id = bot_id
26        self.group_id = group_id
27        self.state = root_state
28
29        # 添加app,并分配独立数据空间
30        self.apps: List["AyakaApp"] = []
31        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
32        for app in app_list:
33            # if app.name not in forbid_names:
34            self.apps.append(app)
35            self.cache_dict[app.name] = {}
36
37        group_list.append(self)
38
39        if ayaka_root_config.debug:
40            print(self)
async def enter(self, state: Union[str, List[str], ayaka.state.AyakaState]):
42    async def enter(self, state: Union[str, List[str], AyakaState]):
43        if isinstance(state, list):
44            next_state = self.state.join(*state)
45        elif isinstance(state, str):
46            next_state = self.state.join(state)
47        else:
48            next_state = self.state.join(*state.keys)
49        return await self.goto(next_state)
async def back(self):
51    async def back(self):
52        if self.state.parent:
53            await self.state.exit()
54            self.state = self.state.parent
55            return self.state
async def goto(self, state: ayaka.state.AyakaState):
57    async def goto(self, state: AyakaState):
58        if _enter_exit_during.get() > 0:
59            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
60
61        keys = state.keys
62
63        # 找到第一个不同的结点
64        n0 = len(keys)
65        n1 = len(self.state.keys)
66        n = min(n0, n1)
67        for i in range(n):
68            if keys[i] != self.state.keys[i]:
69                break
70        else:
71            i += 1
72
73        # 回退
74        for j in range(i, n1):
75            await self.back()
76        keys = keys[i:]
77
78        # 重新出发
79        for key in keys:
80            self.state = self.state[key]
81            await self.state.enter()
82        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
83        return self.state
def get_app(self, name: str):
85    def get_app(self, name: str):
86        '''根据app名获取该group所启用的app,不存在则返回None'''
87        for app in self.apps:
88            if app.name == name:
89                return app

根据app名获取该group所启用的app,不存在则返回None

class AyakaApp:
 92class AyakaApp:
 93    def __repr__(self) -> str:
 94        return f"{self.__class__.__name__}({self.name})"
 95
 96    def __init__(self, name: str) -> None:
 97        self.path = Path(inspect.stack()[1].filename)
 98        logger.opt(colors=True).debug(f"加载应用 \"<c>{name}</c>\"")
 99
100        for app in app_list:
101            if app.name == name:
102                raise Exception(
103                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
104
105        self.name = name
106        self.ayaka_root_config = ayaka_root_config
107        self.funcs = []
108        self.on = AyakaOn(self)
109        self.timers: List[AyakaTimer] = []
110
111        self.root_state = root_state
112        self.plugin_state = self.get_state()
113
114        self._intro = "没有介绍"
115        self.state_helps: Dict[str, List[str]] = {}
116        self.idle_helps: List[str] = []
117
118        app_list.append(self)
119        if ayaka_root_config.debug:
120            print(self)
121
122    @property
123    def intro(self):
124        help = self._intro
125        for h in self.idle_helps:
126            help += "\n" + h
127        return help
128
129    @property
130    def all_help(self):
131        help = self.intro
132        for s, hs in self.state_helps.items():
133            help += f"\n[{s}]"
134            for h in hs:
135                help += "\n" + h
136        return help
137
138    @property
139    def help(self):
140        '''获取当前状态下的帮助,没有找到则返回介绍'''
141        total_triggers = get_cascade_triggers(self.state)
142
143        helps = []
144        cmds = []
145        for ts in total_triggers:
146            flag = 1
147            for t in ts:
148                if t.app == self:
149                    for c in t.raw_cmds:
150                        if c in cmds:
151                            break
152                    else:
153                        if flag:
154                            helps.append(f"[{t.state[1:]}]")
155                            flag = 0
156                        cmds.extend(t.raw_cmds)
157                        helps.append(t.help)
158
159        if not helps:
160            return self.intro
161        return "\n".join(helps)
162
163    @help.setter
164    def help(self, help: str):
165        self._intro = help
166
167    @property
168    def user_name(self):
169        '''*timer触发时不可用*
170
171        当前消息的发送人的群名片或昵称
172        '''
173        s = self.event.sender
174        name = s.card or s.nickname
175        return name
176
177    @property
178    def user_id(self):
179        '''*timer触发时不可用*
180
181        当前消息的发送人的uid
182        '''
183        return self.event.user_id
184
185    @property
186    def bot(self):
187        '''*timer触发时不可用*
188
189        当前bot
190        '''
191        return _bot.get()
192
193    @property
194    def event(self):
195        '''*timer触发时不可用*
196
197        当前消息
198        '''
199        return _event.get()
200
201    @property
202    def cache(self):
203        '''*timer触发时不可用*
204
205        当前群组的缓存空间'''
206        return self.group.cache_dict[self.name]
207
208    @property
209    def group_id(self):
210        '''*timer触发时不可用*
211
212        当前群组的id
213
214        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id
215        '''
216        return self.group.group_id
217
218    @property
219    def bot_id(self):
220        '''*timer触发时不可用*
221
222        当前bot的id
223        '''
224        return self.group.bot_id
225
226    @property
227    def group(self):
228        '''*timer触发时不可用*
229
230        当前群组
231
232        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A
233        '''
234        return _group.get()
235
236    @property
237    def arg(self):
238        '''*timer触发时不可用*
239
240        当前消息在移除了命令后的剩余部分
241        '''
242        return _arg.get()
243
244    @property
245    def args(self):
246        '''*timer触发时不可用*
247
248        当前消息在移除了命令后,剩余部分按照空格分割后的数组
249
250        注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素
251        '''
252        return _args.get()
253
254    @property
255    def cmd(self):
256        '''*timer触发时不可用*
257
258        当前消息的命令头
259        '''
260        return _cmd.get()
261
262    @property
263    def cmd_regex(self):
264        '''*timer触发时不可用*
265
266        当前消息的命令头
267        '''
268        return _cmd_regex.get()
269
270    @property
271    def message(self):
272        '''*timer触发时不可用*
273
274        当前消息
275        '''
276        return _message.get()
277
278    @property
279    def state(self):
280        return self.group.state
281
282    def get_state(self, key: Union[str, List[str]] = [], *_keys: str):
283        # _keys为兼容旧API(0.5.3及以前
284        '''
285            假设当前app.name为test
286
287            >>> get_state(key1) -> [root.test].key1
288
289            >>> get_state([key1, key2]) -> [root.test].key1.key2
290
291            特别的,参数可以为空,例如:
292
293            >>> get_state() -> [root.test]
294        '''
295        if isinstance(key, list):
296            _keys = [self.name, *key]
297        else:
298            _keys = [self.name, key, *_keys]
299
300        keys = []
301        for k in _keys:
302            keys.extend(k.split(ayaka_root_config.state_separate))
303
304        return root_state.join(*keys)
305
306    async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str):
307        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
308        return await self.goto(state, *keys)
309
310    async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str):
311        # keys为兼容旧API(0.5.2及以前
312        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
313        if isinstance(state, str):
314            state = self.get_state(state, *keys)
315        elif isinstance(state, list):
316            state = self.get_state(*state)
317        return await self.group.goto(state)
318
319    async def enter(self, state: Union[str, List[str], AyakaState]):
320        '''进入子状态'''
321        return await self.group.enter(state)
322
323    async def back(self):
324        '''回退当前群组的状态'''
325        return await self.group.back()
326
327    def _add_func(self, func):
328        '''如果不存在就加入self.funcs'''
329        if func not in self.funcs:
330            self.funcs.append(func)
331
332    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
333        '''注册深度监听'''
334        def decorator(func):
335            func.deep = deep
336            self._add_func(func)
337            return func
338        return decorator
339
340    def on_no_block(self, block: bool = False):
341        '''注册非阻断'''
342        def decorator(func):
343            func.block = block
344            self._add_func(func)
345            return func
346        return decorator
347
348    def on_cmd(self, *cmds: Union[str, re.Pattern]):
349        '''注册命令触发,不填写命令则视为文本消息'''
350        def decorator(func):
351            func.cmds = getattr(func, "cmds", [])
352            func.cmds.extend(ensure_regex_list(cmds))
353            self._add_func(func)
354            return func
355        return decorator
356
357    def on_cmd_regex(self, *cmds: Union[str, re.Pattern]):
358        '''注册命令触发,不填写命令则视为文本消息'''
359        def decorator(func):
360            func.cmds = getattr(func, "cmds", [])
361            func.cmds.extend(ensure_regex_list(cmds, False))
362            self._add_func(func)
363            return func
364        return decorator
365
366    def on_text(self):
367        '''注册消息触发'''
368        def decorator(func):
369            func = self.on_no_block()(func)
370            self._add_func(func)
371            return func
372        return decorator
373
374    def on_state(self, *states: Union[AyakaState, str, List[str]]):
375        '''注册有状态响应,不填写states则为plugin_state'''
376        _states = []
377
378        if not states:
379            _states = [self.plugin_state]
380
381        else:
382            for s in states:
383                if isinstance(s, str):
384                    s = self.get_state(s)
385                elif isinstance(s, list):
386                    s = self.get_state(*s)
387                _states.append(s)
388
389        def decorator(func):
390            func.states = _states
391            self._add_func(func)
392            return func
393        return decorator
394
395    def on_idle(self):
396        '''注册根结点回调'''
397        return self.on_state(self.root_state)
398
399    def on_everyday(self, h: int, m: int, s: int):
400        '''每日定时触发'''
401        return self.on_interval(86400, h, m, s)
402
403    def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True):
404        '''在指定的时间点后循环触发'''
405        def decorator(func):
406            t = AyakaTimer(self, gap, h, m, s, func, show)
407            self.timers.append(t)
408            return func
409        return decorator
410
411    def on_start_cmds(self, *cmds: Union[str, re.Pattern]):
412        def decorator(func):
413            func = self.on_idle()(func)
414            func = self.on_cmd(*cmds)(func)
415            return func
416        return decorator
417
418    def on_close_cmds(self, *cmds: Union[str, re.Pattern]):
419        def decorator(func):
420            func = self.on_state()(func)
421            func = self.on_deep_all()(func)
422            func = self.on_cmd(*cmds)(func)
423            return func
424        return decorator
425
426    def set_start_cmds(self, *cmds: Union[str, re.Pattern]):
427        '''设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式'''
428        @self.on_start_cmds(*cmds)
429        async def start():
430            '''打开应用'''
431            await self.start()
432
433    def set_close_cmds(self, *cmds: Union[str, re.Pattern]):
434        '''设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式'''
435        @self.on_close_cmds(*cmds)
436        async def close():
437            '''关闭应用'''
438            await self.close()
439
440    async def start(self, state: str = ""):
441        '''*timer触发时不可用*
442
443        启动应用,并发送提示
444
445        state参数为兼容旧API'''
446        if not state:
447            state = self.get_state()
448        await self.goto(state)
449        await self.send(f"已打开应用 [{self.name}]")
450
451    async def close(self):
452        '''*timer触发时不可用*
453
454        关闭应用,并发送提示'''
455        await self.goto(root_state)
456        await self.send(f"已关闭应用 [{self.name}]")
457
458    def add_listener(self, user_id: int):
459        '''为该群组添加对指定私聊的监听'''
460        private_listener_dict[user_id].append(self.group_id)
461
462    def remove_listener(self, user_id: int = 0):
463        '''默认移除该群组对其他私聊的所有监听'''
464        id = self.group_id
465
466        if user_id == 0:
467            for ids in private_listener_dict.values():
468                if id in ids:
469                    ids.remove(id)
470            return
471
472        if id in private_listener_dict[user_id]:
473            private_listener_dict[user_id].remove(self.group_id)
474
475    async def send(self, message):
476        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
477        # 这里不使用event,因为一些event可能来自其他设备的监听传递
478        await self.bot.send_group_msg(group_id=self.group_id, message=message)
479
480    def pack_messages(self, bot_id, messages):
481        '''转换为cqhttp node格式'''
482        data: List[MessageSegment] = []
483        for m in messages:
484            if isinstance(m, MessageSegment) and m.type == "node":
485                data.append(m)
486            else:
487                m = MessageSegment.node_custom(
488                    user_id=bot_id,
489                    nickname="Ayaka Bot",
490                    content=str(m)
491                )
492                data.append(m)
493        return data
494
495    async def send_many(self, messages):
496        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
497        # 分割长消息组(不可超过100条
498        div_len = 100
499        div_cnt = ceil(len(messages) / div_len)
500        for i in range(div_cnt):
501            msgs = self.pack_messages(
502                self.bot_id,
503                messages[i*div_len: (i+1)*div_len]
504            )
505            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
506
507    def t_check(self, bot_id: int):
508        # 未连接
509        bot = get_bot(bot_id)
510        if not bot:
511            logger.warning(f"BOT({bot_id}) 未连接")
512            return
513
514        return bot
515
516    async def t_send(self, bot_id: int, group_id: int, message):
517        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
518        bot = self.t_check(bot_id)
519        if not bot:
520            return
521
522        await bot.send_group_msg(group_id=group_id, message=message)
523
524    async def t_send_many(self, bot_id: int, group_id: int, messages):
525        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
526        bot = self.t_check(bot_id)
527        if not bot:
528            return
529
530        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
531        div_len = 80
532        div_cnt = ceil(len(messages) / div_len)
533        for i in range(div_cnt):
534            msgs = self.pack_messages(
535                bot_id,
536                messages[i*div_len: (i+1)*div_len]
537            )
538            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
 96    def __init__(self, name: str) -> None:
 97        self.path = Path(inspect.stack()[1].filename)
 98        logger.opt(colors=True).debug(f"加载应用 \"<c>{name}</c>\"")
 99
100        for app in app_list:
101            if app.name == name:
102                raise Exception(
103                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
104
105        self.name = name
106        self.ayaka_root_config = ayaka_root_config
107        self.funcs = []
108        self.on = AyakaOn(self)
109        self.timers: List[AyakaTimer] = []
110
111        self.root_state = root_state
112        self.plugin_state = self.get_state()
113
114        self._intro = "没有介绍"
115        self.state_helps: Dict[str, List[str]] = {}
116        self.idle_helps: List[str] = []
117
118        app_list.append(self)
119        if ayaka_root_config.debug:
120            print(self)
help

获取当前状态下的帮助,没有找到则返回介绍

user_name

timer触发时不可用

当前消息的发送人的群名片或昵称

user_id

timer触发时不可用

当前消息的发送人的uid

bot

timer触发时不可用

当前bot

event

timer触发时不可用

当前消息

cache

timer触发时不可用

当前群组的缓存空间

group_id

timer触发时不可用

当前群组的id

注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id

bot_id

timer触发时不可用

当前bot的id

group

timer触发时不可用

当前群组

注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A

arg

timer触发时不可用

当前消息在移除了命令后的剩余部分

args

timer触发时不可用

当前消息在移除了命令后,剩余部分按照空格分割后的数组

注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素

cmd

timer触发时不可用

当前消息的命令头

cmd_regex

timer触发时不可用

当前消息的命令头

message

timer触发时不可用

当前消息

def get_state(self, key: Union[str, List[str]] = [], *_keys: str):
282    def get_state(self, key: Union[str, List[str]] = [], *_keys: str):
283        # _keys为兼容旧API(0.5.3及以前
284        '''
285            假设当前app.name为test
286
287            >>> get_state(key1) -> [root.test].key1
288
289            >>> get_state([key1, key2]) -> [root.test].key1.key2
290
291            特别的,参数可以为空,例如:
292
293            >>> get_state() -> [root.test]
294        '''
295        if isinstance(key, list):
296            _keys = [self.name, *key]
297        else:
298            _keys = [self.name, key, *_keys]
299
300        keys = []
301        for k in _keys:
302            keys.extend(k.split(ayaka_root_config.state_separate))
303
304        return root_state.join(*keys)

假设当前app.name为test

>>> get_state(key1) -> [root.test].key1
>>> get_state([key1, key2]) -> [root.test].key1.key2

特别的,参数可以为空,例如:

>>> get_state() -> [root.test]
async def set_state( self, state: Union[ayaka.state.AyakaState, str, List[str]], *keys: str):
306    async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str):
307        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
308        return await self.goto(state, *keys)

变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割

async def goto( self, state: Union[ayaka.state.AyakaState, str, List[str]], *keys: str):
310    async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str):
311        # keys为兼容旧API(0.5.2及以前
312        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
313        if isinstance(state, str):
314            state = self.get_state(state, *keys)
315        elif isinstance(state, list):
316            state = self.get_state(*state)
317        return await self.group.goto(state)

变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割

async def enter(self, state: Union[str, List[str], ayaka.state.AyakaState]):
319    async def enter(self, state: Union[str, List[str], AyakaState]):
320        '''进入子状态'''
321        return await self.group.enter(state)

进入子状态

async def back(self):
323    async def back(self):
324        '''回退当前群组的状态'''
325        return await self.group.back()

回退当前群组的状态

def on_deep_all(self, deep: Union[int, Literal['all']] = 'all'):
332    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
333        '''注册深度监听'''
334        def decorator(func):
335            func.deep = deep
336            self._add_func(func)
337            return func
338        return decorator

注册深度监听

def on_no_block(self, block: bool = False):
340    def on_no_block(self, block: bool = False):
341        '''注册非阻断'''
342        def decorator(func):
343            func.block = block
344            self._add_func(func)
345            return func
346        return decorator

注册非阻断

def on_cmd(self, *cmds: Union[str, re.Pattern]):
348    def on_cmd(self, *cmds: Union[str, re.Pattern]):
349        '''注册命令触发,不填写命令则视为文本消息'''
350        def decorator(func):
351            func.cmds = getattr(func, "cmds", [])
352            func.cmds.extend(ensure_regex_list(cmds))
353            self._add_func(func)
354            return func
355        return decorator

注册命令触发,不填写命令则视为文本消息

def on_cmd_regex(self, *cmds: Union[str, re.Pattern]):
357    def on_cmd_regex(self, *cmds: Union[str, re.Pattern]):
358        '''注册命令触发,不填写命令则视为文本消息'''
359        def decorator(func):
360            func.cmds = getattr(func, "cmds", [])
361            func.cmds.extend(ensure_regex_list(cmds, False))
362            self._add_func(func)
363            return func
364        return decorator

注册命令触发,不填写命令则视为文本消息

def on_text(self):
366    def on_text(self):
367        '''注册消息触发'''
368        def decorator(func):
369            func = self.on_no_block()(func)
370            self._add_func(func)
371            return func
372        return decorator

注册消息触发

def on_state(self, *states: Union[ayaka.state.AyakaState, str, List[str]]):
374    def on_state(self, *states: Union[AyakaState, str, List[str]]):
375        '''注册有状态响应,不填写states则为plugin_state'''
376        _states = []
377
378        if not states:
379            _states = [self.plugin_state]
380
381        else:
382            for s in states:
383                if isinstance(s, str):
384                    s = self.get_state(s)
385                elif isinstance(s, list):
386                    s = self.get_state(*s)
387                _states.append(s)
388
389        def decorator(func):
390            func.states = _states
391            self._add_func(func)
392            return func
393        return decorator

注册有状态响应,不填写states则为plugin_state

def on_idle(self):
395    def on_idle(self):
396        '''注册根结点回调'''
397        return self.on_state(self.root_state)

注册根结点回调

def on_everyday(self, h: int, m: int, s: int):
399    def on_everyday(self, h: int, m: int, s: int):
400        '''每日定时触发'''
401        return self.on_interval(86400, h, m, s)

每日定时触发

def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True):
403    def on_interval(self, gap: int, h=-1, m=-1, s=-1, show=True):
404        '''在指定的时间点后循环触发'''
405        def decorator(func):
406            t = AyakaTimer(self, gap, h, m, s, func, show)
407            self.timers.append(t)
408            return func
409        return decorator

在指定的时间点后循环触发

def on_start_cmds(self, *cmds: Union[str, re.Pattern]):
411    def on_start_cmds(self, *cmds: Union[str, re.Pattern]):
412        def decorator(func):
413            func = self.on_idle()(func)
414            func = self.on_cmd(*cmds)(func)
415            return func
416        return decorator
def on_close_cmds(self, *cmds: Union[str, re.Pattern]):
418    def on_close_cmds(self, *cmds: Union[str, re.Pattern]):
419        def decorator(func):
420            func = self.on_state()(func)
421            func = self.on_deep_all()(func)
422            func = self.on_cmd(*cmds)(func)
423            return func
424        return decorator
def set_start_cmds(self, *cmds: Union[str, re.Pattern]):
426    def set_start_cmds(self, *cmds: Union[str, re.Pattern]):
427        '''设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式'''
428        @self.on_start_cmds(*cmds)
429        async def start():
430            '''打开应用'''
431            await self.start()

设置应用启动命令,当然,你也可以通过app.on_start_cmds自定义启动方式

def set_close_cmds(self, *cmds: Union[str, re.Pattern]):
433    def set_close_cmds(self, *cmds: Union[str, re.Pattern]):
434        '''设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式'''
435        @self.on_close_cmds(*cmds)
436        async def close():
437            '''关闭应用'''
438            await self.close()

设置应用关闭命令,当然,你也可以通过app.on_close_cmds自定义关闭方式

async def start(self, state: str = ''):
440    async def start(self, state: str = ""):
441        '''*timer触发时不可用*
442
443        启动应用,并发送提示
444
445        state参数为兼容旧API'''
446        if not state:
447            state = self.get_state()
448        await self.goto(state)
449        await self.send(f"已打开应用 [{self.name}]")

timer触发时不可用

启动应用,并发送提示

state参数为兼容旧API

async def close(self):
451    async def close(self):
452        '''*timer触发时不可用*
453
454        关闭应用,并发送提示'''
455        await self.goto(root_state)
456        await self.send(f"已关闭应用 [{self.name}]")

timer触发时不可用

关闭应用,并发送提示

def add_listener(self, user_id: int):
458    def add_listener(self, user_id: int):
459        '''为该群组添加对指定私聊的监听'''
460        private_listener_dict[user_id].append(self.group_id)

为该群组添加对指定私聊的监听

def remove_listener(self, user_id: int = 0):
462    def remove_listener(self, user_id: int = 0):
463        '''默认移除该群组对其他私聊的所有监听'''
464        id = self.group_id
465
466        if user_id == 0:
467            for ids in private_listener_dict.values():
468                if id in ids:
469                    ids.remove(id)
470            return
471
472        if id in private_listener_dict[user_id]:
473            private_listener_dict[user_id].remove(self.group_id)

默认移除该群组对其他私聊的所有监听

async def send(self, message):
475    async def send(self, message):
476        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
477        # 这里不使用event,因为一些event可能来自其他设备的监听传递
478        await self.bot.send_group_msg(group_id=self.group_id, message=message)

发送消息,消息的类型可以是 Message | MessageSegment | str

def pack_messages(self, bot_id, messages):
480    def pack_messages(self, bot_id, messages):
481        '''转换为cqhttp node格式'''
482        data: List[MessageSegment] = []
483        for m in messages:
484            if isinstance(m, MessageSegment) and m.type == "node":
485                data.append(m)
486            else:
487                m = MessageSegment.node_custom(
488                    user_id=bot_id,
489                    nickname="Ayaka Bot",
490                    content=str(m)
491                )
492                data.append(m)
493        return data

转换为cqhttp node格式

async def send_many(self, messages):
495    async def send_many(self, messages):
496        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
497        # 分割长消息组(不可超过100条
498        div_len = 100
499        div_cnt = ceil(len(messages) / div_len)
500        for i in range(div_cnt):
501            msgs = self.pack_messages(
502                self.bot_id,
503                messages[i*div_len: (i+1)*div_len]
504            )
505            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)

发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]

def t_check(self, bot_id: int):
507    def t_check(self, bot_id: int):
508        # 未连接
509        bot = get_bot(bot_id)
510        if not bot:
511            logger.warning(f"BOT({bot_id}) 未连接")
512            return
513
514        return bot
async def t_send(self, bot_id: int, group_id: int, message):
516    async def t_send(self, bot_id: int, group_id: int, message):
517        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
518        bot = self.t_check(bot_id)
519        if not bot:
520            return
521
522        await bot.send_group_msg(group_id=group_id, message=message)

timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用

async def t_send_many(self, bot_id: int, group_id: int, messages):
524    async def t_send_many(self, bot_id: int, group_id: int, messages):
525        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
526        bot = self.t_check(bot_id)
527        if not bot:
528            return
529
530        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
531        div_len = 80
532        div_cnt = ceil(len(messages) / div_len)
533        for i in range(div_cnt):
534            msgs = self.pack_messages(
535                bot_id,
536                messages[i*div_len: (i+1)*div_len]
537            )
538            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)

timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用

def get_bot(bot_id: int):
541def get_bot(bot_id: int):
542    '''获取已连接的bot'''
543    bot_id = str(bot_id)
544    for bot in bot_list:
545        if bot.self_id == bot_id:
546            return bot

获取已连接的bot

def get_group(bot_id: int, group_id: int):
549def get_group(bot_id: int, group_id: int):
550    '''获取对应的AyakaGroup对象,自动增加'''
551    for group in group_list:
552        if group.bot_id == bot_id and group.group_id == group_id:
553            break
554    else:
555        group = AyakaGroup(bot_id, group_id)
556    return group

获取对应的AyakaGroup对象,自动增加

def get_app(app_name: str):
559def get_app(app_name: str):
560    for app in app_list:
561        if app.name == app_name:
562            return app
async def deal_event( bot: nonebot.adapters.onebot.v11.bot.Bot, event: nonebot.adapters.onebot.v11.event.MessageEvent):
565async def deal_event(bot: Bot, event: MessageEvent):
566    '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组'''
567    if ayaka_root_config.exclude_old_msg:
568        time_i = int(datetime.datetime.now().timestamp())
569        if event.time < time_i - 60:
570            return
571
572    _bot.set(bot)
573    _event.set(event)
574
575    bot_id = int(bot.self_id)
576
577    if isinstance(event, GroupMessageEvent):
578        group_id = event.group_id
579        await deal_group(bot_id, group_id)
580
581    else:
582        id = event.user_id
583        group_ids = private_listener_dict.get(id, [])
584        ts = [asyncio.create_task(deal_group(bot_id, group_id))
585              for group_id in group_ids]
586        await asyncio.gather(*ts)

处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组

async def deal_group(bot_id: int, group_id: int):
589async def deal_group(bot_id: int, group_id: int):
590    prefix = ayaka_root_config.prefix
591
592    # 群组
593    group = get_group(bot_id, group_id)
594    _group.set(group)
595
596    # 消息
597    message = _event.get().message
598    _message.set(message)
599
600    # 从子状态开始向上查找可用的触发
601    state = group.state
602    cascade_triggers = get_cascade_triggers(state, 0)
603
604    # 命令
605    # 消息前缀文本
606    first = get_first(message)
607    if first.startswith(prefix):
608        first = first[len(prefix):]
609        for ts in cascade_triggers:
610            if await deal_cmd_triggers(ts, message, first, state):
611                return
612
613    # 命令退化成消息
614    for ts in cascade_triggers:
615        if await deal_text_triggers(ts, message, state):
616            return
async def deal_cmd_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, first: str, state: ayaka.state.AyakaState):
619async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState):
620    sep = ayaka_root_config.separate
621
622    # 找到触发命令
623    cmd_ts: List[Tuple[re.Pattern, str, AyakaTrigger]] = []
624    for t in triggers:
625        for cmd in t.cmds:
626            r = cmd.match(first)
627            if r:
628                cmd_ts.append([r, r.group(), t])
629                break
630
631    # 根据命令长度排序,长命令优先级更高
632    cmd_ts.sort(key=lambda x: len(x[1]), reverse=1)
633
634    for r, c, t in cmd_ts:
635        # 设置上下文
636        # 设置命令
637        _cmd.set(c)
638        _cmd_regex.set(r)
639        # 设置参数
640        left = first[len(c):].lstrip(sep)
641        if left:
642            arg = Message([MessageSegment.text(left), *message[1:]])
643        else:
644            arg = Message(message[1:])
645        _arg.set(arg)
646        _args.set(divide_message(arg))
647
648        # 记录触发
649        log_trigger(c, t.app.name, state, t.func.__name__)
650        f = await t.run()
651
652        # 阻断后续
653        if f and t.block:
654            return True
async def deal_text_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, state: ayaka.state.AyakaState):
657async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState):
658    # 消息
659    text_ts = [t for t in triggers if not t.cmds]
660
661    # 设置上下文
662    # 设置参数
663    _arg.set(message)
664    _args.set(divide_message(message))
665
666    for t in text_ts:
667        # 记录触发
668        log_trigger("", t.app.name, state, t.func.__name__)
669        f = await t.run()
670
671        # 阻断后续
672        if f and t.block:
673            return True
def get_cascade_triggers(state: ayaka.state.AyakaState, deep: int = 0):
676def get_cascade_triggers(state: AyakaState, deep: int = 0):
677    # 根据深度筛选funcs
678    ts = [
679        t for t in state.triggers
680        if t.deep == "all" or t.deep >= deep
681    ]
682    cascade_triggers = [ts]
683
684    # 获取父状态的方法
685    if state.parent:
686        cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1))
687
688    # 排除空项
689    cascade_triggers = [ts for ts in cascade_triggers if ts]
690    return cascade_triggers
def log_trigger(cmd, app_name, state, func_name):
693def log_trigger(cmd, app_name, state, func_name):
694    '''日志记录'''
695    items = []
696    items.append(f"状态:<c>{state}</c>")
697    items.append(f"应用:<y>{app_name}</y>")
698    if cmd:
699        items.append(f"命令:<y>{cmd}</y>")
700    else:
701        items.append("命令:<g>无</g>")
702    items.append(f"回调:<c>{func_name}</c>")
703    info = " | ".join(items)
704    logger.opt(colors=True).debug(info)

日志记录

def get_first(message: nonebot.adapters.onebot.v11.message.Message):
707def get_first(message: Message):
708    first = ""
709    for m in message:
710        if m.type == "text":
711            first += str(m)
712        else:
713            break
714    return first
def divide_message( message: nonebot.adapters.onebot.v11.message.Message) -> List[nonebot.adapters.onebot.v11.message.MessageSegment]:
717def divide_message(message: Message) -> List[MessageSegment]:
718    args = []
719    sep = ayaka_root_config.separate
720
721    for m in message:
722        if m.is_text():
723            ss = str(m).split(sep)
724            args.extend(MessageSegment.text(s) for s in ss if s)
725        else:
726            args.append(m)
727
728    return args
def regist_func(app: ayaka.ayaka.AyakaApp, func):
731def regist_func(app: AyakaApp, func):
732    '''注册回调'''
733    # 默认是无状态应用,从root开始触发
734    states: List[AyakaState] = getattr(func, "states", [root_state])
735    # 默认是消息响应
736    cmds: List[re.Pattern] = getattr(func, "cmds", [])
737    # 默认监听深度为0
738    deep: int = getattr(func, "deep", 0)
739    # 默认阻断
740    block: bool = getattr(func, "block", True)
741
742    # 注册
743    for s in states:
744        s.on_cmd(cmds, app, deep, block)(func)
745
746    return func

注册回调

@driver.on_startup
async def startup():
752@driver.on_startup
753async def startup():
754    # 注册所有回调
755    for app in app_list:
756        for func in app.funcs:
757            regist_func(app, func)
758
759    if ayaka_root_config.debug:
760        s = json.dumps(
761            root_state.dict(), ensure_ascii=0,
762            indent=4, default=repr
763        )
764        path = ayaka_data_path / "all_state.json"
765        with path.open("w+", encoding="utf8") as f:
766            f.write(s)