ayaka.state
1import inspect 2from typing import TYPE_CHECKING, List, Literal, Union, Awaitable, Callable 3from typing_extensions import Self 4 5from .config import ayaka_root_config 6from .constant import _enter_exit_during 7from .depend import AyakaDepend, AyakaInput 8 9 10if TYPE_CHECKING: 11 from .ayaka import AyakaApp 12 13 14def add_flag(): 15 _enter_exit_during.set(_enter_exit_during.get()+1) 16 17 18def sub_flag(): 19 _enter_exit_during.set(_enter_exit_during.get()-1) 20 21 22class AyakaState: 23 def __init__(self, key="root", parent: Self = None): 24 self.key = key 25 self.parent = parent 26 if not parent: 27 self.keys = [key] 28 else: 29 self.keys = [*parent.keys, key] 30 self.children: List[Self] = [] 31 32 self.enter_funcs = [] 33 self.exit_funcs = [] 34 self.triggers: List[AyakaTrigger] = [] 35 36 def __getitem__(self, k): 37 if isinstance(k, slice): 38 s = AyakaState("") 39 s.keys = self.keys[k] 40 if s.keys: 41 s.key = s.keys[-1] 42 return s 43 44 if isinstance(k, int): 45 s = AyakaState(self.keys[k]) 46 return s 47 48 for node in self.children: 49 if node.key == k: 50 return node 51 node = self.__class__(k, self) 52 self.children.append(node) 53 return node 54 55 def __getattr__(self, k: str): 56 return self[k] 57 58 def __iter__(self): 59 return iter(self.children) 60 61 def __str__(self) -> str: 62 return ".".join(self.keys) 63 64 def __repr__(self) -> str: 65 return f"{self.__class__.__name__}({self})" 66 67 def join(self, *keys: str) -> Self: 68 node = self 69 for key in keys: 70 node = node[key] 71 return node 72 73 def belong(self, node: Self): 74 if len(self.keys) < len(node.keys): 75 return False 76 77 for i in range(len(node.keys)): 78 if node.keys[i] != self.keys[i]: 79 return False 80 81 return True 82 83 def __ge__(self, node: Self): 84 return self.belong(node) 85 86 def __le__(self, node: Self): 87 return node.belong(self) 88 89 def __gt__(self, node: Self): 90 return self >= node and len(self.keys) > len(node.keys) 91 92 def __lt__(self, node: Self): 93 return self <= node and len(self.keys) < len(node.keys) 94 95 def __eq__(self, node: Self): 96 return self <= node and len(self.keys) == len(node.keys) 97 98 def dict(self): 99 data = [child.dict() for child in self.children] 100 return { 101 "name": self.key, 102 "triggers": self.triggers, 103 "children": data, 104 } 105 106 async def enter(self): 107 if ayaka_root_config.debug: 108 print(">>>", self.key) 109 add_flag() 110 for func in self.enter_funcs: 111 await func() 112 sub_flag() 113 114 async def exit(self): 115 if ayaka_root_config.debug: 116 print("<<<", self.key) 117 add_flag() 118 for func in self.exit_funcs: 119 await func() 120 sub_flag() 121 122 def on_enter(self): 123 def decorator(func): 124 self.enter_funcs.append(func) 125 return func 126 return decorator 127 128 def on_exit(self): 129 def decorator(func): 130 self.exit_funcs.append(func) 131 return func 132 return decorator 133 134 def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True): 135 def decorator(func): 136 t = AyakaTrigger(func, cmds, deep, app, block, self) 137 self.triggers.append(t) 138 return func 139 return decorator 140 141 def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True): 142 return self.on_cmd([], app, deep, block) 143 144 145class AyakaTrigger: 146 def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState): 147 self.func = func 148 self.cmds = cmds 149 self.deep = deep 150 self.app = app 151 self.block = block 152 self.state = state 153 154 # 默认没有解析模型 155 model = None 156 sig = inspect.signature(func) 157 for k, v in sig.parameters.items(): 158 cls = v.annotation 159 if issubclass(cls, AyakaInput): 160 model = cls 161 break 162 self.model = model 163 164 # 生成帮助 165 doc = "" if not func.__doc__ else f"| {func.__doc__}" 166 cmd_str = '/'.join(cmds) if cmds else "<任意文字>" 167 168 if not model: 169 help = f"- {cmd_str} {doc}" 170 else: 171 data = model.help() 172 keys_str = " ".join(f"<{k}>" for k in data.keys()) 173 data_str = "\n".join(f" <{k}> {v}" for k, v in data.items()) 174 help = f"- {cmd_str} {keys_str} {doc}\n{data_str}" 175 176 self.help = help 177 if len(state.keys) > 1: 178 s = str(state[1:]) 179 if s not in self.app.state_helps: 180 self.app.state_helps[s] = [] 181 self.app.state_helps[s].append(help) 182 else: 183 self.app.idle_helps.append(help) 184 185 if ayaka_root_config.debug: 186 print(repr(self)) 187 188 async def run(self): 189 params = {} 190 sig = inspect.signature(self.func) 191 192 for k, v in sig.parameters.items(): 193 cls = v.annotation 194 if issubclass(cls, AyakaDepend): 195 d = await cls._create_by_app(self.app) 196 if not d: 197 return 198 params[k] = d 199 200 await self.func(**params) 201 202 def __repr__(self) -> str: 203 return f"{self.__class__.__name__}({self})" 204 205 def __str__(self) -> str: 206 data = { 207 "func": self.func.__name__, 208 "app_name": self.app.name, 209 "cmds": self.cmds, 210 "deep": self.deep, 211 "block": self.block 212 } 213 return " ".join(f"{k}={v}" for k, v in data.items()) 214 215 216root_state = AyakaState()
def
add_flag():
def
sub_flag():
class
AyakaState:
23class AyakaState: 24 def __init__(self, key="root", parent: Self = None): 25 self.key = key 26 self.parent = parent 27 if not parent: 28 self.keys = [key] 29 else: 30 self.keys = [*parent.keys, key] 31 self.children: List[Self] = [] 32 33 self.enter_funcs = [] 34 self.exit_funcs = [] 35 self.triggers: List[AyakaTrigger] = [] 36 37 def __getitem__(self, k): 38 if isinstance(k, slice): 39 s = AyakaState("") 40 s.keys = self.keys[k] 41 if s.keys: 42 s.key = s.keys[-1] 43 return s 44 45 if isinstance(k, int): 46 s = AyakaState(self.keys[k]) 47 return s 48 49 for node in self.children: 50 if node.key == k: 51 return node 52 node = self.__class__(k, self) 53 self.children.append(node) 54 return node 55 56 def __getattr__(self, k: str): 57 return self[k] 58 59 def __iter__(self): 60 return iter(self.children) 61 62 def __str__(self) -> str: 63 return ".".join(self.keys) 64 65 def __repr__(self) -> str: 66 return f"{self.__class__.__name__}({self})" 67 68 def join(self, *keys: str) -> Self: 69 node = self 70 for key in keys: 71 node = node[key] 72 return node 73 74 def belong(self, node: Self): 75 if len(self.keys) < len(node.keys): 76 return False 77 78 for i in range(len(node.keys)): 79 if node.keys[i] != self.keys[i]: 80 return False 81 82 return True 83 84 def __ge__(self, node: Self): 85 return self.belong(node) 86 87 def __le__(self, node: Self): 88 return node.belong(self) 89 90 def __gt__(self, node: Self): 91 return self >= node and len(self.keys) > len(node.keys) 92 93 def __lt__(self, node: Self): 94 return self <= node and len(self.keys) < len(node.keys) 95 96 def __eq__(self, node: Self): 97 return self <= node and len(self.keys) == len(node.keys) 98 99 def dict(self): 100 data = [child.dict() for child in self.children] 101 return { 102 "name": self.key, 103 "triggers": self.triggers, 104 "children": data, 105 } 106 107 async def enter(self): 108 if ayaka_root_config.debug: 109 print(">>>", self.key) 110 add_flag() 111 for func in self.enter_funcs: 112 await func() 113 sub_flag() 114 115 async def exit(self): 116 if ayaka_root_config.debug: 117 print("<<<", self.key) 118 add_flag() 119 for func in self.exit_funcs: 120 await func() 121 sub_flag() 122 123 def on_enter(self): 124 def decorator(func): 125 self.enter_funcs.append(func) 126 return func 127 return decorator 128 129 def on_exit(self): 130 def decorator(func): 131 self.exit_funcs.append(func) 132 return func 133 return decorator 134 135 def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True): 136 def decorator(func): 137 t = AyakaTrigger(func, cmds, deep, app, block, self) 138 self.triggers.append(t) 139 return func 140 return decorator 141 142 def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True): 143 return self.on_cmd([], app, deep, block)
AyakaState(key='root', parent: typing_extensions.Self = None)
24 def __init__(self, key="root", parent: Self = None): 25 self.key = key 26 self.parent = parent 27 if not parent: 28 self.keys = [key] 29 else: 30 self.keys = [*parent.keys, key] 31 self.children: List[Self] = [] 32 33 self.enter_funcs = [] 34 self.exit_funcs = [] 35 self.triggers: List[AyakaTrigger] = []
def
on_cmd( self, cmds: List[str], app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
class
AyakaTrigger:
146class AyakaTrigger: 147 def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState): 148 self.func = func 149 self.cmds = cmds 150 self.deep = deep 151 self.app = app 152 self.block = block 153 self.state = state 154 155 # 默认没有解析模型 156 model = None 157 sig = inspect.signature(func) 158 for k, v in sig.parameters.items(): 159 cls = v.annotation 160 if issubclass(cls, AyakaInput): 161 model = cls 162 break 163 self.model = model 164 165 # 生成帮助 166 doc = "" if not func.__doc__ else f"| {func.__doc__}" 167 cmd_str = '/'.join(cmds) if cmds else "<任意文字>" 168 169 if not model: 170 help = f"- {cmd_str} {doc}" 171 else: 172 data = model.help() 173 keys_str = " ".join(f"<{k}>" for k in data.keys()) 174 data_str = "\n".join(f" <{k}> {v}" for k, v in data.items()) 175 help = f"- {cmd_str} {keys_str} {doc}\n{data_str}" 176 177 self.help = help 178 if len(state.keys) > 1: 179 s = str(state[1:]) 180 if s not in self.app.state_helps: 181 self.app.state_helps[s] = [] 182 self.app.state_helps[s].append(help) 183 else: 184 self.app.idle_helps.append(help) 185 186 if ayaka_root_config.debug: 187 print(repr(self)) 188 189 async def run(self): 190 params = {} 191 sig = inspect.signature(self.func) 192 193 for k, v in sig.parameters.items(): 194 cls = v.annotation 195 if issubclass(cls, AyakaDepend): 196 d = await cls._create_by_app(self.app) 197 if not d: 198 return 199 params[k] = d 200 201 await self.func(**params) 202 203 def __repr__(self) -> str: 204 return f"{self.__class__.__name__}({self})" 205 206 def __str__(self) -> str: 207 data = { 208 "func": self.func.__name__, 209 "app_name": self.app.name, 210 "cmds": self.cmds, 211 "deep": self.deep, 212 "block": self.block 213 } 214 return " ".join(f"{k}={v}" for k, v in data.items())
AyakaTrigger( func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal['all']], app: ayaka.ayaka.AyakaApp, block: bool, state: ayaka.state.AyakaState)
147 def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState): 148 self.func = func 149 self.cmds = cmds 150 self.deep = deep 151 self.app = app 152 self.block = block 153 self.state = state 154 155 # 默认没有解析模型 156 model = None 157 sig = inspect.signature(func) 158 for k, v in sig.parameters.items(): 159 cls = v.annotation 160 if issubclass(cls, AyakaInput): 161 model = cls 162 break 163 self.model = model 164 165 # 生成帮助 166 doc = "" if not func.__doc__ else f"| {func.__doc__}" 167 cmd_str = '/'.join(cmds) if cmds else "<任意文字>" 168 169 if not model: 170 help = f"- {cmd_str} {doc}" 171 else: 172 data = model.help() 173 keys_str = " ".join(f"<{k}>" for k in data.keys()) 174 data_str = "\n".join(f" <{k}> {v}" for k, v in data.items()) 175 help = f"- {cmd_str} {keys_str} {doc}\n{data_str}" 176 177 self.help = help 178 if len(state.keys) > 1: 179 s = str(state[1:]) 180 if s not in self.app.state_helps: 181 self.app.state_helps[s] = [] 182 self.app.state_helps[s].append(help) 183 else: 184 self.app.idle_helps.append(help) 185 186 if ayaka_root_config.debug: 187 print(repr(self))
async def
run(self):
189 async def run(self): 190 params = {} 191 sig = inspect.signature(self.func) 192 193 for k, v in sig.parameters.items(): 194 cls = v.annotation 195 if issubclass(cls, AyakaDepend): 196 d = await cls._create_by_app(self.app) 197 if not d: 198 return 199 params[k] = d 200 201 await self.func(**params)