ayaka.state

  1import inspect
  2import re
  3from typing import TYPE_CHECKING, List, Literal, Union, Awaitable, Callable
  4from typing_extensions import Self
  5import asyncio
  6import datetime
  7from loguru import logger
  8
  9from .config import ayaka_root_config
 10from .constant import _enter_exit_during
 11from .depend import AyakaDepend, AyakaInput
 12
 13
 14if TYPE_CHECKING:
 15    from .ayaka import AyakaApp
 16
 17
 18def add_flag():
 19    _enter_exit_during.set(_enter_exit_during.get()+1)
 20
 21
 22def sub_flag():
 23    _enter_exit_during.set(_enter_exit_during.get()-1)
 24
 25
 26def ensure_regex(data: Union[str, re.Pattern], escape=True):
 27    if isinstance(data, str):
 28        if escape:
 29            data = re.escape(data)
 30        data = re.compile(data)
 31    return data
 32
 33
 34def ensure_regex_list(cmds: List[Union[str, re.Pattern]], escape=True):
 35    _cmds: List[re.Pattern] = []
 36    for cmd in cmds:
 37        _cmds.append(ensure_regex(cmd, escape))
 38    return _cmds
 39
 40
 41class AyakaState:
 42    def __init__(self, key="root", parent: Self = None):
 43        self.key = key
 44        self.parent = parent
 45        if not parent:
 46            self.keys = [key]
 47        else:
 48            self.keys = [*parent.keys, key]
 49        self.children: List[Self] = []
 50
 51        self.enter_funcs = []
 52        self.exit_funcs = []
 53        self.triggers: List[AyakaTrigger] = []
 54
 55    def __getitem__(self, k):
 56        if isinstance(k, slice):
 57            s = AyakaState("")
 58            s.keys = self.keys[k]
 59            if s.keys:
 60                s.key = s.keys[-1]
 61            return s
 62
 63        if isinstance(k, int):
 64            s = AyakaState(self.keys[k])
 65            return s
 66
 67        for node in self.children:
 68            if node.key == k:
 69                return node
 70        node = self.__class__(k, self)
 71        self.children.append(node)
 72        return node
 73
 74    def __getattr__(self, k: str):
 75        return self[k]
 76
 77    def __iter__(self):
 78        return iter(self.children)
 79
 80    def __str__(self) -> str:
 81        return ayaka_root_config.state_separate.join(self.keys)
 82
 83    def __repr__(self) -> str:
 84        return f"{self.__class__.__name__}({self})"
 85
 86    def join(self, *keys: str) -> Self:
 87        node = self
 88        for key in keys:
 89            node = node[key]
 90        return node
 91
 92    def belong(self, node: Self):
 93        if len(self.keys) < len(node.keys):
 94            return False
 95
 96        for i in range(len(node.keys)):
 97            if node.keys[i] != self.keys[i]:
 98                return False
 99
100        return True
101
102    def __ge__(self, node: Self):
103        return self.belong(node)
104
105    def __le__(self, node: Self):
106        return node.belong(self)
107
108    def __gt__(self, node: Self):
109        return self >= node and len(self.keys) > len(node.keys)
110
111    def __lt__(self, node: Self):
112        return self <= node and len(self.keys) < len(node.keys)
113
114    def __eq__(self, node: Self):
115        return self <= node and len(self.keys) == len(node.keys)
116
117    def dict(self):
118        data = [child.dict() for child in self.children]
119        return {
120            "name": self.key,
121            "triggers": self.triggers,
122            "children": data,
123        }
124
125    async def enter(self):
126        if ayaka_root_config.debug:
127            print(">>>", self.key)
128        add_flag()
129        for func in self.enter_funcs:
130            await func()
131        sub_flag()
132
133    async def exit(self):
134        if ayaka_root_config.debug:
135            print("<<<", self.key)
136        add_flag()
137        for func in self.exit_funcs:
138            await func()
139        sub_flag()
140
141    def on_enter(self):
142        def decorator(func):
143            self.enter_funcs.append(func)
144            return func
145        return decorator
146
147    def on_exit(self):
148        def decorator(func):
149            self.exit_funcs.append(func)
150            return func
151        return decorator
152
153    def on_cmd(self, cmds: List[Union[str, re.Pattern]], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
154        _cmds = ensure_regex_list(cmds)
155
156        def decorator(func):
157            t = AyakaTrigger(func, _cmds, deep, app, block, self)
158            self.triggers.append(t)
159            return func
160        return decorator
161
162    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
163        return self.on_cmd([], app, deep, block)
164
165
class AyakaTrigger:
    """Binds a callback to the commands, state and app it was registered with.

    On construction the trigger also builds a help text for the callback and
    appends it to the help collections of ``app``.
    """

    def __init__(self, func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
        """Store the arguments and register the generated help text on ``app``.

        Help goes to ``app.state_helps[str(state[1:])]`` when the state path
        has more than one key, otherwise to ``app.idle_helps``.
        """
        raw_cmds: List[str] = [cmd.pattern for cmd in cmds]
        self.func = func
        self.cmds = cmds
        self.raw_cmds = raw_cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing models by default. Only real classes are tested with
        # issubclass: annotations such as List[int] or strings would make
        # issubclass raise TypeError.
        models = [
            param.annotation
            for param in inspect.signature(func).parameters.values()
            if inspect.isclass(param.annotation)
            and issubclass(param.annotation, AyakaInput)
        ]

        # Generate the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(raw_cmds) if raw_cmds else "<任意文字>"

        if not models:
            help_text = f"- {cmd_str} {doc}"
        else:
            helps = []
            for model in models:
                data = model.help()
                keys_str = " ".join(f"<{k}>" for k in data.keys())
                data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
                helps.append(f"- {cmd_str} {keys_str} {doc}\n{data_str}")
            help_text = "\n".join(helps)

        self.help = help_text
        if len(state.keys) > 1:
            # Key the help by the state path without its first (root) key.
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    async def run(self):
        """Resolve ``AyakaDepend`` parameters and await the callback.

        Returns:
            False as soon as a dependency resolves to a falsy value (the
            callback is then not called), True after the callback ran.
        """
        params = {}
        sig = inspect.signature(self.func)

        for k, v in sig.parameters.items():
            cls = v.annotation
            # Skip non-class annotations; issubclass would raise TypeError.
            if not (inspect.isclass(cls) and issubclass(cls, AyakaDepend)):
                continue
            d = await cls.create_by_app(self.app)
            if not d:
                return False
            params[k] = d

        await self.func(**params)
        return True

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.raw_cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())
239
240
class AyakaTimer:
    """Periodically schedules ``func`` as an asyncio task.

    ``gap`` is the interval in seconds between two invocations. ``h``, ``m``
    and ``s`` optionally align the first invocation to a wall-clock point;
    a negative value disables the corresponding alignment level.
    """

    def __repr__(self) -> str:
        return f"AyakaTimer({self.app.name}, {self.gap}, {self.func.__name__})"

    def __init__(self,  app: "AyakaApp", gap: int, h: int, m: int, s: int, func, show=True) -> None:
        self.app = app
        self.h = h
        self.m = m
        self.s = s
        self.gap = gap
        self.func = func
        self.show = show
        if ayaka_root_config.debug:
            print(self)

    def start(self):
        """Schedule ``run_forever`` on the running event loop."""
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._runner = asyncio.create_task(self.run_forever())

    def _first_delay(self, now=None):
        """Return the seconds to wait before the first invocation.

        * ``h >= 0``: next occurrence of ``h:m:s`` local time (daily period).
        * ``m >= 0``: next occurrence of minute ``m``, second ``s`` (hourly).
        * ``s >= 0``: next occurrence of second ``s`` (every minute).
        * otherwise: 0, i.e. start immediately.

        ``now`` defaults to the current local time. The local wall clock is
        used instead of a fixed UTC+8 offset so the result is correct in any
        timezone.
        """
        if now is None:
            now = datetime.datetime.now()

        if self.h >= 0:
            period = 86400
            target = self.h*3600 + self.m*60 + self.s
            elapsed = now.hour*3600 + now.minute*60 + now.second
        elif self.m >= 0:
            period = 3600
            target = self.m*60 + self.s
            elapsed = now.minute*60 + now.second
        elif self.s >= 0:
            period = 60
            target = self.s
            elapsed = now.second
        else:
            return 0
        return period - (elapsed - target) % period

    async def run_forever(self):
        """Wait for the aligned start point, then run ``func`` every ``gap`` seconds."""
        # Timers with a required start time point wait for it first.
        delay = self._first_delay()
        if delay:
            await asyncio.sleep(delay)

        # Strong references to in-flight callback tasks (see start()).
        pending = set()
        while True:
            if self.show:
                logger.opt(colors=True).debug(
                    f"定时任务 | 插件:<y>{self.app.name}</y> | 回调:<c>{self.func.__name__}</c>")
            task = asyncio.create_task(self.func())
            pending.add(task)
            task.add_done_callback(pending.discard)
            await asyncio.sleep(self.gap)
283
284
# Module-level root of the state tree: an AyakaState with the default key
# "root" and no parent.
root_state = AyakaState()
def add_flag():
def add_flag():
    """Increase the enter/exit counter held in ``_enter_exit_during`` by one."""
    # NOTE(review): `_enter_exit_during` is imported from `.constant`; only its
    # `get`/`set` interface is visible here (looks like a ContextVar - confirm).
    current = _enter_exit_during.get()
    _enter_exit_during.set(current + 1)
def sub_flag():
def sub_flag():
    """Decrease the enter/exit counter held in ``_enter_exit_during`` by one."""
    # NOTE(review): `_enter_exit_during` is imported from `.constant`; only its
    # `get`/`set` interface is visible here (looks like a ContextVar - confirm).
    current = _enter_exit_during.get()
    _enter_exit_during.set(current - 1)
def ensure_regex(data: Union[str, re.Pattern], escape=True):
def ensure_regex(data: Union[str, re.Pattern], escape=True):
    """Return ``data`` as a compiled regular expression.

    Args:
        data: Either a string or an already compiled pattern.
        escape: When true, a string is passed through ``re.escape`` before
            compilation so that it matches literally.

    Returns:
        The compiled pattern for a string input; any non-string input is
        returned unchanged.
    """
    if not isinstance(data, str):
        return data
    source = re.escape(data) if escape else data
    return re.compile(source)
def ensure_regex_list(cmds: List[Union[str, re.Pattern]], escape=True):
def ensure_regex_list(cmds: List[Union[str, re.Pattern]], escape=True):
    """Convert every entry of ``cmds`` with ``ensure_regex``.

    Args:
        cmds: Strings and/or compiled patterns.
        escape: Forwarded to ``ensure_regex`` for each entry.

    Returns:
        A new list with one converted entry per input entry, in input order.
    """
    patterns: List[re.Pattern] = [ensure_regex(item, escape) for item in cmds]
    return patterns
class AyakaState:
 42class AyakaState:
 43    def __init__(self, key="root", parent: Self = None):
 44        self.key = key
 45        self.parent = parent
 46        if not parent:
 47            self.keys = [key]
 48        else:
 49            self.keys = [*parent.keys, key]
 50        self.children: List[Self] = []
 51
 52        self.enter_funcs = []
 53        self.exit_funcs = []
 54        self.triggers: List[AyakaTrigger] = []
 55
 56    def __getitem__(self, k):
 57        if isinstance(k, slice):
 58            s = AyakaState("")
 59            s.keys = self.keys[k]
 60            if s.keys:
 61                s.key = s.keys[-1]
 62            return s
 63
 64        if isinstance(k, int):
 65            s = AyakaState(self.keys[k])
 66            return s
 67
 68        for node in self.children:
 69            if node.key == k:
 70                return node
 71        node = self.__class__(k, self)
 72        self.children.append(node)
 73        return node
 74
 75    def __getattr__(self, k: str):
 76        return self[k]
 77
 78    def __iter__(self):
 79        return iter(self.children)
 80
 81    def __str__(self) -> str:
 82        return ayaka_root_config.state_separate.join(self.keys)
 83
 84    def __repr__(self) -> str:
 85        return f"{self.__class__.__name__}({self})"
 86
 87    def join(self, *keys: str) -> Self:
 88        node = self
 89        for key in keys:
 90            node = node[key]
 91        return node
 92
 93    def belong(self, node: Self):
 94        if len(self.keys) < len(node.keys):
 95            return False
 96
 97        for i in range(len(node.keys)):
 98            if node.keys[i] != self.keys[i]:
 99                return False
100
101        return True
102
103    def __ge__(self, node: Self):
104        return self.belong(node)
105
106    def __le__(self, node: Self):
107        return node.belong(self)
108
109    def __gt__(self, node: Self):
110        return self >= node and len(self.keys) > len(node.keys)
111
112    def __lt__(self, node: Self):
113        return self <= node and len(self.keys) < len(node.keys)
114
115    def __eq__(self, node: Self):
116        return self <= node and len(self.keys) == len(node.keys)
117
118    def dict(self):
119        data = [child.dict() for child in self.children]
120        return {
121            "name": self.key,
122            "triggers": self.triggers,
123            "children": data,
124        }
125
126    async def enter(self):
127        if ayaka_root_config.debug:
128            print(">>>", self.key)
129        add_flag()
130        for func in self.enter_funcs:
131            await func()
132        sub_flag()
133
134    async def exit(self):
135        if ayaka_root_config.debug:
136            print("<<<", self.key)
137        add_flag()
138        for func in self.exit_funcs:
139            await func()
140        sub_flag()
141
142    def on_enter(self):
143        def decorator(func):
144            self.enter_funcs.append(func)
145            return func
146        return decorator
147
148    def on_exit(self):
149        def decorator(func):
150            self.exit_funcs.append(func)
151            return func
152        return decorator
153
154    def on_cmd(self, cmds: List[Union[str, re.Pattern]], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
155        _cmds = ensure_regex_list(cmds)
156
157        def decorator(func):
158            t = AyakaTrigger(func, _cmds, deep, app, block, self)
159            self.triggers.append(t)
160            return func
161        return decorator
162
163    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
164        return self.on_cmd([], app, deep, block)
AyakaState(key='root', parent: typing_extensions.Self = None)
43    def __init__(self, key="root", parent: Self = None):
44        self.key = key
45        self.parent = parent
46        if not parent:
47            self.keys = [key]
48        else:
49            self.keys = [*parent.keys, key]
50        self.children: List[Self] = []
51
52        self.enter_funcs = []
53        self.exit_funcs = []
54        self.triggers: List[AyakaTrigger] = []
def join(self, *keys: str) -> typing_extensions.Self:
87    def join(self, *keys: str) -> Self:
88        node = self
89        for key in keys:
90            node = node[key]
91        return node
def belong(self, node: typing_extensions.Self):
 93    def belong(self, node: Self):
 94        if len(self.keys) < len(node.keys):
 95            return False
 96
 97        for i in range(len(node.keys)):
 98            if node.keys[i] != self.keys[i]:
 99                return False
100
101        return True
def dict(self):
118    def dict(self):
119        data = [child.dict() for child in self.children]
120        return {
121            "name": self.key,
122            "triggers": self.triggers,
123            "children": data,
124        }
async def enter(self):
126    async def enter(self):
127        if ayaka_root_config.debug:
128            print(">>>", self.key)
129        add_flag()
130        for func in self.enter_funcs:
131            await func()
132        sub_flag()
async def exit(self):
134    async def exit(self):
135        if ayaka_root_config.debug:
136            print("<<<", self.key)
137        add_flag()
138        for func in self.exit_funcs:
139            await func()
140        sub_flag()
def on_enter(self):
142    def on_enter(self):
143        def decorator(func):
144            self.enter_funcs.append(func)
145            return func
146        return decorator
def on_exit(self):
148    def on_exit(self):
149        def decorator(func):
150            self.exit_funcs.append(func)
151            return func
152        return decorator
def on_cmd( self, cmds: List[Union[str, re.Pattern]], app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
154    def on_cmd(self, cmds: List[Union[str, re.Pattern]], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
155        _cmds = ensure_regex_list(cmds)
156
157        def decorator(func):
158            t = AyakaTrigger(func, _cmds, deep, app, block, self)
159            self.triggers.append(t)
160            return func
161        return decorator
def on_text( self, app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
163    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
164        return self.on_cmd([], app, deep, block)
class AyakaTrigger:
class AyakaTrigger:
    """Binds a callback to the commands, state and app it was registered with.

    On construction the trigger also builds a help text for the callback and
    appends it to the help collections of ``app``.
    """

    def __init__(self, func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
        """Store the arguments and register the generated help text on ``app``.

        Help goes to ``app.state_helps[str(state[1:])]`` when the state path
        has more than one key, otherwise to ``app.idle_helps``.
        """
        raw_cmds: List[str] = [cmd.pattern for cmd in cmds]
        self.func = func
        self.cmds = cmds
        self.raw_cmds = raw_cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing models by default. Only real classes are tested with
        # issubclass: annotations such as List[int] or strings would make
        # issubclass raise TypeError.
        models = [
            param.annotation
            for param in inspect.signature(func).parameters.values()
            if inspect.isclass(param.annotation)
            and issubclass(param.annotation, AyakaInput)
        ]

        # Generate the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(raw_cmds) if raw_cmds else "<任意文字>"

        if not models:
            help_text = f"- {cmd_str} {doc}"
        else:
            helps = []
            for model in models:
                data = model.help()
                keys_str = " ".join(f"<{k}>" for k in data.keys())
                data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
                helps.append(f"- {cmd_str} {keys_str} {doc}\n{data_str}")
            help_text = "\n".join(helps)

        self.help = help_text
        if len(state.keys) > 1:
            # Key the help by the state path without its first (root) key.
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    async def run(self):
        """Resolve ``AyakaDepend`` parameters and await the callback.

        Returns:
            False as soon as a dependency resolves to a falsy value (the
            callback is then not called), True after the callback ran.
        """
        params = {}
        sig = inspect.signature(self.func)

        for k, v in sig.parameters.items():
            cls = v.annotation
            # Skip non-class annotations; issubclass would raise TypeError.
            if not (inspect.isclass(cls) and issubclass(cls, AyakaDepend)):
                continue
            d = await cls.create_by_app(self.app)
            if not d:
                return False
            params[k] = d

        await self.func(**params)
        return True

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.raw_cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())
AyakaTrigger( func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal['all']], app: ayaka.ayaka.AyakaApp, block: bool, state: ayaka.state.AyakaState)
168    def __init__(self, func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
169        raw_cmds: List[str] = [cmd.pattern for cmd in cmds]
170        self.func = func
171        self.cmds = cmds
172        self.raw_cmds = raw_cmds
173        self.deep = deep
174        self.app = app
175        self.block = block
176        self.state = state
177
178        # 默认没有解析模型
179        models: List[AyakaInput] = []
180        sig = inspect.signature(func)
181        for k, v in sig.parameters.items():
182            cls = v.annotation
183            if issubclass(cls, AyakaInput):
184                models.append(cls)
185
186        # 生成帮助
187        doc = "" if not func.__doc__ else f"| {func.__doc__}"
188        cmd_str = '/'.join(raw_cmds) if raw_cmds else "<任意文字>"
189
190        if not models:
191            help = f"- {cmd_str} {doc}"
192        else:
193            helps = []
194            for model in models:
195                data = model.help()
196                keys_str = " ".join(f"<{k}>" for k in data.keys())
197                data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
198                helps.append(f"- {cmd_str} {keys_str} {doc}\n{data_str}")
199            help = "\n".join(helps)
200
201        self.help = help
202        if len(state.keys) > 1:
203            s = str(state[1:])
204            if s not in self.app.state_helps:
205                self.app.state_helps[s] = []
206            self.app.state_helps[s].append(help)
207        else:
208            self.app.idle_helps.append(help)
209
210        if ayaka_root_config.debug:
211            print(repr(self))
async def run(self):
213    async def run(self):
214        params = {}
215        sig = inspect.signature(self.func)
216
217        for k, v in sig.parameters.items():
218            cls = v.annotation
219            if issubclass(cls, AyakaDepend):
220                d = await cls.create_by_app(self.app)
221                if not d:
222                    return False
223                params[k] = d
224
225        await self.func(**params)
226        return True
class AyakaTimer:
class AyakaTimer:
    """Periodically schedules ``func`` as an asyncio task.

    ``gap`` is the interval in seconds between two invocations. ``h``, ``m``
    and ``s`` optionally align the first invocation to a wall-clock point;
    a negative value disables the corresponding alignment level.
    """

    def __repr__(self) -> str:
        return f"AyakaTimer({self.app.name}, {self.gap}, {self.func.__name__})"

    def __init__(self,  app: "AyakaApp", gap: int, h: int, m: int, s: int, func, show=True) -> None:
        self.app = app
        self.h = h
        self.m = m
        self.s = s
        self.gap = gap
        self.func = func
        self.show = show
        if ayaka_root_config.debug:
            print(self)

    def start(self):
        """Schedule ``run_forever`` on the running event loop."""
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._runner = asyncio.create_task(self.run_forever())

    def _first_delay(self, now=None):
        """Return the seconds to wait before the first invocation.

        * ``h >= 0``: next occurrence of ``h:m:s`` local time (daily period).
        * ``m >= 0``: next occurrence of minute ``m``, second ``s`` (hourly).
        * ``s >= 0``: next occurrence of second ``s`` (every minute).
        * otherwise: 0, i.e. start immediately.

        ``now`` defaults to the current local time. The local wall clock is
        used instead of a fixed UTC+8 offset so the result is correct in any
        timezone.
        """
        if now is None:
            now = datetime.datetime.now()

        if self.h >= 0:
            period = 86400
            target = self.h*3600 + self.m*60 + self.s
            elapsed = now.hour*3600 + now.minute*60 + now.second
        elif self.m >= 0:
            period = 3600
            target = self.m*60 + self.s
            elapsed = now.minute*60 + now.second
        elif self.s >= 0:
            period = 60
            target = self.s
            elapsed = now.second
        else:
            return 0
        return period - (elapsed - target) % period

    async def run_forever(self):
        """Wait for the aligned start point, then run ``func`` every ``gap`` seconds."""
        # Timers with a required start time point wait for it first.
        delay = self._first_delay()
        if delay:
            await asyncio.sleep(delay)

        # Strong references to in-flight callback tasks (see start()).
        pending = set()
        while True:
            if self.show:
                logger.opt(colors=True).debug(
                    f"定时任务 | 插件:<y>{self.app.name}</y> | 回调:<c>{self.func.__name__}</c>")
            task = asyncio.create_task(self.func())
            pending.add(task)
            task.add_done_callback(pending.discard)
            await asyncio.sleep(self.gap)
AyakaTimer( app: ayaka.ayaka.AyakaApp, gap: int, h: int, m: int, s: int, func, show=True)
246    def __init__(self,  app: "AyakaApp", gap: int, h: int, m: int, s: int, func, show=True) -> None:
247        self.app = app
248        self.h = h
249        self.m = m
250        self.s = s
251        self.gap = gap
252        self.func = func
253        self.show = show
254        if ayaka_root_config.debug:
255            print(self)
def start(self):
257    def start(self):
258        asyncio.create_task(self.run_forever())
async def run_forever(self):
260    async def run_forever(self):
261        # 有启动时间点要求的
262        time_i = int(datetime.datetime.now().timestamp())
263        if self.h >= 0:
264            _time_i = self.h*3600+self.m*60+self.s
265            # 移除时区偏差
266            time_i -= 57600
267            gap = 86400 - (time_i - _time_i) % 86400
268            await asyncio.sleep(gap)
269        elif self.m >= 0:
270            _time_i = self.m*60+self.s
271            gap = 3600 - (time_i-_time_i) % 3600
272            await asyncio.sleep(gap)
273        elif self.s >= 0:
274            _time_i = self.s
275            gap = 60 - (time_i-_time_i) % 60
276            await asyncio.sleep(gap)
277
278        while True:
279            if self.show:
280                logger.opt(colors=True).debug(
281                    f"定时任务 | 插件:<y>{self.app.name}</y> | 回调:<c>{self.func.__name__}</c>")
282            asyncio.create_task(self.func())
283            await asyncio.sleep(self.gap)