ayaka.state
"""State tree nodes, command triggers and periodic timers."""
import asyncio
import datetime
import inspect
import re
from typing import TYPE_CHECKING, Awaitable, Callable, List, Literal, Union

from loguru import logger
from typing_extensions import Self

from .config import ayaka_root_config
from .constant import _enter_exit_during
from .depend import AyakaDepend, AyakaInput


if TYPE_CHECKING:
    from .ayaka import AyakaApp


def add_flag():
    """Increase the ``_enter_exit_during`` counter by one."""
    _enter_exit_during.set(_enter_exit_during.get() + 1)


def sub_flag():
    """Decrease the ``_enter_exit_during`` counter by one."""
    _enter_exit_during.set(_enter_exit_during.get() - 1)


def ensure_regex(data: Union[str, re.Pattern], escape=True):
    """Return ``data`` as a compiled pattern.

    Strings are compiled, after ``re.escape`` when ``escape`` is true;
    anything that is not a ``str`` is returned unchanged.
    """
    if isinstance(data, str):
        if escape:
            data = re.escape(data)
        data = re.compile(data)
    return data


def ensure_regex_list(cmds: List[Union[str, re.Pattern]], escape=True):
    """Apply :func:`ensure_regex` to every item and return a new list."""
    return [ensure_regex(cmd, escape) for cmd in cmds]


class AyakaState:
    """Node of a tree of named states.

    A node stores its own ``key``, the path ``keys`` from the root down to
    itself, its children, enter/exit callbacks and registered triggers.
    """

    def __init__(self, key="root", parent: Self = None):
        self.key = key
        self.parent = parent
        if parent is None:
            self.keys = [key]
        else:
            self.keys = [*parent.keys, key]
        self.children: List[Self] = []

        self.enter_funcs = []
        self.exit_funcs = []
        self.triggers: List[AyakaTrigger] = []

    def __getitem__(self, k):
        """Index by slice, int or child key.

        A slice or int returns a detached node built from ``self.keys``;
        any other value returns the child with that key, creating it first
        if it does not exist yet.
        """
        if isinstance(k, slice):
            s = AyakaState("")
            s.keys = self.keys[k]
            if s.keys:
                s.key = s.keys[-1]
            return s

        if isinstance(k, int):
            return AyakaState(self.keys[k])

        for node in self.children:
            if node.key == k:
                return node
        node = self.__class__(k, self)
        self.children.append(node)
        return node

    def __getattr__(self, k: str):
        """Treat unknown attributes as child keys (``state.a`` == ``state["a"]``)."""
        # Protocol probes such as hasattr(obj, "__setstate__") must fail
        # normally; otherwise they create junk children, and on an instance
        # without ``children`` they recurse without end.
        if k.startswith("__") and k.endswith("__"):
            raise AttributeError(k)
        return self[k]

    def __iter__(self):
        return iter(self.children)

    def __str__(self) -> str:
        return ayaka_root_config.state_separate.join(self.keys)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def join(self, *keys: str) -> Self:
        """Descend through ``keys`` one by one, creating missing children."""
        node = self
        for key in keys:
            node = node[key]
        return node

    def belong(self, node: Self):
        """Return True when ``node.keys`` is a prefix of ``self.keys``."""
        return self.keys[:len(node.keys)] == node.keys

    def __ge__(self, node: Self):
        return self.belong(node)

    def __le__(self, node: Self):
        return node.belong(self)

    def __gt__(self, node: Self):
        return self >= node and len(self.keys) > len(node.keys)

    def __lt__(self, node: Self):
        return self <= node and len(self.keys) < len(node.keys)

    def __eq__(self, node: Self):
        # Comparing with a foreign object must not raise AttributeError.
        if not isinstance(node, AyakaState):
            return NotImplemented
        return self.keys == node.keys

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable.
        return hash(tuple(self.keys))

    def dict(self):
        """Return this node and its subtree as nested dicts."""
        data = [child.dict() for child in self.children]
        return {
            "name": self.key,
            "triggers": self.triggers,
            "children": data,
        }

    async def enter(self):
        """Await every enter callback while the enter/exit counter is raised."""
        if ayaka_root_config.debug:
            print(">>>", self.key)
        add_flag()
        try:
            for func in self.enter_funcs:
                await func()
        finally:
            # Restore the counter even when a callback raises.
            sub_flag()

    async def exit(self):
        """Await every exit callback while the enter/exit counter is raised."""
        if ayaka_root_config.debug:
            print("<<<", self.key)
        add_flag()
        try:
            for func in self.exit_funcs:
                await func()
        finally:
            # Restore the counter even when a callback raises.
            sub_flag()

    def on_enter(self):
        """Decorator registering ``func`` as an enter callback."""
        def decorator(func):
            self.enter_funcs.append(func)
            return func
        return decorator

    def on_exit(self):
        """Decorator registering ``func`` as an exit callback."""
        def decorator(func):
            self.exit_funcs.append(func)
            return func
        return decorator

    def on_cmd(self, cmds: List[Union[str, re.Pattern]], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
        """Decorator registering ``func`` as a trigger for ``cmds`` on this state."""
        _cmds = ensure_regex_list(cmds)

        def decorator(func):
            t = AyakaTrigger(func, _cmds, deep, app, block, self)
            self.triggers.append(t)
            return func
        return decorator

    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
        """Same as :meth:`on_cmd` with an empty command list."""
        return self.on_cmd([], app, deep, block)


class AyakaTrigger:
    """A callback bound to a state, with its command patterns and help text."""

    def __init__(self, func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
        raw_cmds: List[str] = [cmd.pattern for cmd in cmds]
        self.func = func
        self.cmds = cmds
        self.raw_cmds = raw_cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing models by default. Annotations that are not classes
        # (strings, typing generics) would make issubclass() raise.
        models = [
            param.annotation
            for param in inspect.signature(func).parameters.values()
            if inspect.isclass(param.annotation)
            and issubclass(param.annotation, AyakaInput)
        ]

        # Build the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(raw_cmds) if raw_cmds else "<任意文字>"

        if not models:
            help_text = f"- {cmd_str} {doc}"
        else:
            helps = []
            for model in models:
                data = model.help()
                keys_str = " ".join(f"<{k}>" for k in data.keys())
                data_str = "\n".join(f" <{k}> {v}" for k, v in data.items())
                helps.append(f"- {cmd_str} {keys_str} {doc}\n{data_str}")
            help_text = "\n".join(helps)

        self.help = help_text
        if len(state.keys) > 1:
            # Keyed by the state path without its first element.
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    async def run(self):
        """Resolve ``AyakaDepend`` parameters and await the callback.

        Returns False, without calling the callback, as soon as one
        dependency resolves to a falsy value; True otherwise.
        """
        params = {}
        for name, param in inspect.signature(self.func).parameters.items():
            cls = param.annotation
            # Skip annotations that are not classes; issubclass() would raise.
            if not (inspect.isclass(cls) and issubclass(cls, AyakaDepend)):
                continue
            d = await cls.create_by_app(self.app)
            if not d:
                return False
            params[name] = d

        await self.func(**params)
        return True

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.raw_cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())


class AyakaTimer:
    """Calls ``func`` every ``gap`` seconds, optionally aligned to a clock time."""

    def __repr__(self) -> str:
        return f"AyakaTimer({self.app.name}, {self.gap}, {self.func.__name__})"

    def __init__(self, app: "AyakaApp", gap: int, h: int, m: int, s: int, func, show=True) -> None:
        self.app = app
        self.h = h
        self.m = m
        self.s = s
        self.gap = gap
        self.func = func
        self.show = show
        self._task = None
        if ayaka_root_config.debug:
            print(self)

    def start(self):
        """Schedule :meth:`run_forever` on the running event loop."""
        # Keep a reference: the loop only holds weak references to tasks.
        self._task = asyncio.create_task(self.run_forever())

    async def run_forever(self):
        """Wait for the first aligned start time, then fire every ``gap`` seconds.

        The first non-negative value among ``h``, ``m``, ``s`` selects
        alignment to the day, hour or minute. If all are negative the first
        call happens immediately.
        """
        # Seconds since local midnight; wall-clock time needs no fixed
        # time-zone correction.
        now = datetime.datetime.now()
        time_i = now.hour * 3600 + now.minute * 60 + now.second

        # A start time point is required.
        if self.h >= 0:
            _time_i = self.h*3600+self.m*60+self.s
            await asyncio.sleep(86400 - (time_i - _time_i) % 86400)
        elif self.m >= 0:
            _time_i = self.m*60+self.s
            await asyncio.sleep(3600 - (time_i - _time_i) % 3600)
        elif self.s >= 0:
            _time_i = self.s
            await asyncio.sleep(60 - (time_i - _time_i) % 60)

        # Strong references to in-flight callbacks so they are not
        # garbage-collected before finishing.
        running = set()
        while True:
            if self.show:
                logger.opt(colors=True).debug(
                    f"定时任务 | 插件:<y>{self.app.name}</y> | 回调:<c>{self.func.__name__}</c>")
            task = asyncio.create_task(self.func())
            running.add(task)
            task.add_done_callback(running.discard)
            await asyncio.sleep(self.gap)


root_state = AyakaState()
def
add_flag():
def
sub_flag():
def
ensure_regex(data: Union[str, re.Pattern], escape=True):
def
ensure_regex_list(cmds: List[Union[str, re.Pattern]], escape=True):
class
AyakaState:
class AyakaState:
    """Node of a tree of named states.

    A node stores its own ``key``, the path ``keys`` from the root down to
    itself, its children, enter/exit callbacks and registered triggers.
    """

    def __init__(self, key="root", parent: Self = None):
        self.key = key
        self.parent = parent
        if parent is None:
            self.keys = [key]
        else:
            self.keys = [*parent.keys, key]
        self.children: List[Self] = []

        self.enter_funcs = []
        self.exit_funcs = []
        self.triggers: List[AyakaTrigger] = []

    def __getitem__(self, k):
        """Index by slice, int or child key.

        A slice or int returns a detached node built from ``self.keys``;
        any other value returns the child with that key, creating it first
        if it does not exist yet.
        """
        if isinstance(k, slice):
            s = AyakaState("")
            s.keys = self.keys[k]
            if s.keys:
                s.key = s.keys[-1]
            return s

        if isinstance(k, int):
            return AyakaState(self.keys[k])

        for node in self.children:
            if node.key == k:
                return node
        node = self.__class__(k, self)
        self.children.append(node)
        return node

    def __getattr__(self, k: str):
        """Treat unknown attributes as child keys (``state.a`` == ``state["a"]``)."""
        # Protocol probes such as hasattr(obj, "__setstate__") must fail
        # normally; otherwise they create junk children, and on an instance
        # without ``children`` they recurse without end.
        if k.startswith("__") and k.endswith("__"):
            raise AttributeError(k)
        return self[k]

    def __iter__(self):
        return iter(self.children)

    def __str__(self) -> str:
        return ayaka_root_config.state_separate.join(self.keys)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def join(self, *keys: str) -> Self:
        """Descend through ``keys`` one by one, creating missing children."""
        node = self
        for key in keys:
            node = node[key]
        return node

    def belong(self, node: Self):
        """Return True when ``node.keys`` is a prefix of ``self.keys``."""
        return self.keys[:len(node.keys)] == node.keys

    def __ge__(self, node: Self):
        return self.belong(node)

    def __le__(self, node: Self):
        return node.belong(self)

    def __gt__(self, node: Self):
        return self >= node and len(self.keys) > len(node.keys)

    def __lt__(self, node: Self):
        return self <= node and len(self.keys) < len(node.keys)

    def __eq__(self, node: Self):
        # Comparing with a foreign object must not raise AttributeError.
        if not isinstance(node, AyakaState):
            return NotImplemented
        return self.keys == node.keys

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable.
        return hash(tuple(self.keys))

    def dict(self):
        """Return this node and its subtree as nested dicts."""
        data = [child.dict() for child in self.children]
        return {
            "name": self.key,
            "triggers": self.triggers,
            "children": data,
        }

    async def enter(self):
        """Await every enter callback while the enter/exit counter is raised."""
        if ayaka_root_config.debug:
            print(">>>", self.key)
        add_flag()
        try:
            for func in self.enter_funcs:
                await func()
        finally:
            # Restore the counter even when a callback raises.
            sub_flag()

    async def exit(self):
        """Await every exit callback while the enter/exit counter is raised."""
        if ayaka_root_config.debug:
            print("<<<", self.key)
        add_flag()
        try:
            for func in self.exit_funcs:
                await func()
        finally:
            # Restore the counter even when a callback raises.
            sub_flag()

    def on_enter(self):
        """Decorator registering ``func`` as an enter callback."""
        def decorator(func):
            self.enter_funcs.append(func)
            return func
        return decorator

    def on_exit(self):
        """Decorator registering ``func`` as an exit callback."""
        def decorator(func):
            self.exit_funcs.append(func)
            return func
        return decorator

    def on_cmd(self, cmds: List[Union[str, re.Pattern]], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
        """Decorator registering ``func`` as a trigger for ``cmds`` on this state."""
        _cmds = ensure_regex_list(cmds)

        def decorator(func):
            t = AyakaTrigger(func, _cmds, deep, app, block, self)
            self.triggers.append(t)
            return func
        return decorator

    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
        """Same as :meth:`on_cmd` with an empty command list."""
        return self.on_cmd([], app, deep, block)
AyakaState(key='root', parent: typing_extensions.Self = None)
def __init__(self, key="root", parent: Self = None):
    """Create a node named ``key``, optionally below ``parent``.

    ``keys`` is the path from the root to this node: the parent's path plus
    ``key``, or just ``[key]`` when no parent is given.
    """
    # Path first, so the rest of the setup is plain bookkeeping.
    path = [*parent.keys, key] if parent else [key]

    self.key = key
    self.parent = parent
    self.keys = path
    self.children: List[Self] = []

    # Callbacks and triggers registered later through the decorators.
    self.enter_funcs = []
    self.exit_funcs = []
    self.triggers: List[AyakaTrigger] = []
def
on_cmd( self, cmds: List[Union[str, re.Pattern]], app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
def on_cmd(self, cmds: List[Union[str, re.Pattern]], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
    """Return a decorator that registers a function as a command trigger.

    ``cmds`` is converted once, up front, with ``ensure_regex_list``. The
    decorator wraps the function in an ``AyakaTrigger`` bound to this state,
    appends it to ``self.triggers`` and returns the function unchanged.
    """
    patterns = ensure_regex_list(cmds)

    def register(func):
        self.triggers.append(
            AyakaTrigger(func, patterns, deep, app, block, self))
        return func

    return register
class
AyakaTrigger:
class AyakaTrigger:
    """A callback bound to a state, with its command patterns and help text."""

    def __init__(self, func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
        raw_cmds: List[str] = [cmd.pattern for cmd in cmds]
        self.func = func
        self.cmds = cmds
        self.raw_cmds = raw_cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing models by default. Annotations that are not classes
        # (strings, typing generics) would make issubclass() raise.
        models = [
            param.annotation
            for param in inspect.signature(func).parameters.values()
            if inspect.isclass(param.annotation)
            and issubclass(param.annotation, AyakaInput)
        ]

        # Build the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(raw_cmds) if raw_cmds else "<任意文字>"

        if not models:
            help_text = f"- {cmd_str} {doc}"
        else:
            helps = []
            for model in models:
                data = model.help()
                keys_str = " ".join(f"<{k}>" for k in data.keys())
                data_str = "\n".join(f" <{k}> {v}" for k, v in data.items())
                helps.append(f"- {cmd_str} {keys_str} {doc}\n{data_str}")
            help_text = "\n".join(helps)

        self.help = help_text
        if len(state.keys) > 1:
            # Keyed by the state path without its first element.
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    async def run(self):
        """Resolve ``AyakaDepend`` parameters and await the callback.

        Returns False, without calling the callback, as soon as one
        dependency resolves to a falsy value; True otherwise.
        """
        params = {}
        for name, param in inspect.signature(self.func).parameters.items():
            cls = param.annotation
            # Skip annotations that are not classes; issubclass() would raise.
            if not (inspect.isclass(cls) and issubclass(cls, AyakaDepend)):
                continue
            d = await cls.create_by_app(self.app)
            if not d:
                return False
            params[name] = d

        await self.func(**params)
        return True

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.raw_cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())
AyakaTrigger( func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal['all']], app: ayaka.ayaka.AyakaApp, block: bool, state: ayaka.state.AyakaState)
def __init__(self, func: Callable[..., Awaitable], cmds: List[re.Pattern], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
    """Store the trigger settings and register its help text on ``app``.

    The help text goes to ``app.state_helps`` (keyed by the state path
    without its first element) when the state path has more than one key,
    and to ``app.idle_helps`` otherwise.
    """
    raw_cmds: List[str] = [cmd.pattern for cmd in cmds]
    self.func = func
    self.cmds = cmds
    self.raw_cmds = raw_cmds
    self.deep = deep
    self.app = app
    self.block = block
    self.state = state

    # No parsing models by default. Annotations that are not classes
    # (strings, typing generics) would make issubclass() raise.
    models = [
        param.annotation
        for param in inspect.signature(func).parameters.values()
        if inspect.isclass(param.annotation)
        and issubclass(param.annotation, AyakaInput)
    ]

    # Build the help text.
    doc = "" if not func.__doc__ else f"| {func.__doc__}"
    cmd_str = '/'.join(raw_cmds) if raw_cmds else "<任意文字>"

    if not models:
        help_text = f"- {cmd_str} {doc}"
    else:
        helps = []
        for model in models:
            data = model.help()
            keys_str = " ".join(f"<{k}>" for k in data.keys())
            data_str = "\n".join(f" <{k}> {v}" for k, v in data.items())
            helps.append(f"- {cmd_str} {keys_str} {doc}\n{data_str}")
        help_text = "\n".join(helps)

    self.help = help_text
    if len(state.keys) > 1:
        s = str(state[1:])
        if s not in self.app.state_helps:
            self.app.state_helps[s] = []
        self.app.state_helps[s].append(help_text)
    else:
        self.app.idle_helps.append(help_text)

    if ayaka_root_config.debug:
        print(repr(self))
async def
run(self):
async def run(self):
    """Resolve ``AyakaDepend`` parameters and await the callback.

    Each parameter of ``self.func`` annotated with an ``AyakaDepend``
    subclass is built with ``cls.create_by_app(self.app)`` and passed by
    keyword. Returns False, without calling the callback, as soon as one
    dependency resolves to a falsy value; True otherwise.
    """
    params = {}
    for name, param in inspect.signature(self.func).parameters.items():
        cls = param.annotation
        # String annotations and typing generics are not classes;
        # issubclass() would raise TypeError on them, so skip them.
        if not (inspect.isclass(cls) and issubclass(cls, AyakaDepend)):
            continue
        d = await cls.create_by_app(self.app)
        if not d:
            return False
        params[name] = d

    await self.func(**params)
    return True
class
AyakaTimer:
class AyakaTimer:
    """Calls ``func`` every ``gap`` seconds, optionally aligned to a clock time."""

    def __repr__(self) -> str:
        return f"AyakaTimer({self.app.name}, {self.gap}, {self.func.__name__})"

    def __init__(self, app: "AyakaApp", gap: int, h: int, m: int, s: int, func, show=True) -> None:
        self.app = app
        self.h = h
        self.m = m
        self.s = s
        self.gap = gap
        self.func = func
        self.show = show
        self._task = None
        if ayaka_root_config.debug:
            print(self)

    def start(self):
        """Schedule :meth:`run_forever` on the running event loop."""
        # Keep a reference: the loop only holds weak references to tasks.
        self._task = asyncio.create_task(self.run_forever())

    async def run_forever(self):
        """Wait for the first aligned start time, then fire every ``gap`` seconds.

        The first non-negative value among ``h``, ``m``, ``s`` selects
        alignment to the day, hour or minute. If all are negative the first
        call happens immediately.
        """
        # Seconds since local midnight; wall-clock time needs no fixed
        # time-zone correction.
        now = datetime.datetime.now()
        time_i = now.hour * 3600 + now.minute * 60 + now.second

        # A start time point is required.
        if self.h >= 0:
            _time_i = self.h*3600+self.m*60+self.s
            await asyncio.sleep(86400 - (time_i - _time_i) % 86400)
        elif self.m >= 0:
            _time_i = self.m*60+self.s
            await asyncio.sleep(3600 - (time_i - _time_i) % 3600)
        elif self.s >= 0:
            _time_i = self.s
            await asyncio.sleep(60 - (time_i - _time_i) % 60)

        # Strong references to in-flight callbacks so they are not
        # garbage-collected before finishing.
        running = set()
        while True:
            if self.show:
                logger.opt(colors=True).debug(
                    f"定时任务 | 插件:<y>{self.app.name}</y> | 回调:<c>{self.func.__name__}</c>")
            task = asyncio.create_task(self.func())
            running.add(task)
            task.add_done_callback(running.discard)
            await asyncio.sleep(self.gap)
AyakaTimer( app: ayaka.ayaka.AyakaApp, gap: int, h: int, m: int, s: int, func, show=True)
async def
run_forever(self):
async def run_forever(self):
    """Wait for the first aligned start time, then fire every ``gap`` seconds.

    The first non-negative value among ``self.h``, ``self.m``, ``self.s``
    selects alignment to the day, hour or minute. If all are negative the
    first call happens immediately. Each call of ``self.func`` runs as its
    own task, so a slow callback does not delay the next tick.
    """
    # Seconds since local midnight. Wall-clock time replaces the former
    # fixed 57600-second correction, which was only right on UTC+8 hosts.
    now = datetime.datetime.now()
    time_i = now.hour * 3600 + now.minute * 60 + now.second

    # A start time point is required.
    if self.h >= 0:
        _time_i = self.h*3600+self.m*60+self.s
        await asyncio.sleep(86400 - (time_i - _time_i) % 86400)
    elif self.m >= 0:
        _time_i = self.m*60+self.s
        await asyncio.sleep(3600 - (time_i - _time_i) % 3600)
    elif self.s >= 0:
        _time_i = self.s
        await asyncio.sleep(60 - (time_i - _time_i) % 60)

    # Strong references to in-flight callbacks: the event loop only keeps
    # weak references, so an unreferenced task may be collected mid-run.
    running = set()
    while True:
        if self.show:
            logger.opt(colors=True).debug(
                f"定时任务 | 插件:<y>{self.app.name}</y> | 回调:<c>{self.func.__name__}</c>")
        task = asyncio.create_task(self.func())
        running.add(task)
        task.add_done_callback(running.discard)
        await asyncio.sleep(self.gap)