ayaka.ayaka_master

ayaka综合管理模块

  1'''ayaka综合管理模块'''
  2from pydantic import Field
  3from .ayaka import app_list, AyakaApp, get_app
  4from .config import ayaka_root_config, save
  5from .state import root_state
  6from .driver import get_driver
  7from .depend import commit, AyakaInput
  8
  9driver = get_driver()
 10
 11app = AyakaApp("ayaka_master")
 12app.help = '''ayaka综合管理模块'''
 13
 14
 15class UidInput(AyakaInput):
 16    uid: int = Field(description="QQ号", gt=0)
 17
 18
 19class AppnameInput(AyakaInput):
 20    name: str = Field("", description="应用名")
 21
 22
 23@app.on.idle(super=True)
 24@app.on.command("插件", "plugin", "plugins")
 25async def show_plugins():
 26    '''展示所有应用'''
 27    items = []
 28    for _app in app_list:
 29        info = f"[{_app.name}]"
 30        items.append(info)
 31    await app.send("\n".join(items))
 32
 33
 34@app.on.idle(super=True)
 35@app.on.command("状态", "state")
 36async def show_state():
 37    state = app.group.state
 38    if state <= root_state:
 39        await app.send("当前设备处于闲置状态")
 40        return
 41    await app.send(f"当前状态:{state}")
 42
 43
 44@app.on.idle(super=True)
 45@app.on.command("帮助", "help")
 46async def show_help(data: AppnameInput):
 47    # 展示当前应用当前状态的帮助
 48    if app.state > root_state:
 49        app_name = app.state.keys[1]
 50        _app = get_app(app_name)
 51        await app.send(_app.help)
 52        return
 53
 54    # 没有应用正在运行
 55    # 查询指定应用的详细帮助
 56    name = data.name
 57    if name:
 58        _app = get_app(name)
 59        if not _app:
 60            await app.send(f"应用不存在 [{name}]")
 61            return
 62
 63        # 详细帮助
 64        info = f"[{_app.name}]\n{_app.all_help}"
 65        await app.send(info)
 66        return
 67
 68    # 展示所有应用介绍
 69    items = ["所有应用介绍一览"]
 70    for _app in app_list:
 71        info = f"[{_app.name}]\n{_app.intro}"
 72        items.append(info)
 73
 74    await app.send_many(items)
 75    await app.send("使用 帮助 <插件名> 可以进一步展示指定插件的详细帮助信息")
 76
 77
 78@app.on.idle(super=True)
 79@app.on.command("强制退出", "force_exit")
 80async def force_exit():
 81    await app.group.goto(root_state)
 82    await app.send("已强制退出应用")
 83
 84
 85@app.on.idle(super=True)
 86@app.on.command("查看缓存")
 87async def show_cache():
 88    '''查看当前群组的所有缓存'''
 89    print(app.group.cache_dict)
 90
 91
 92@app.on.idle(super=True)
 93@app.on.command("add_admin")
 94async def add_admin(data: UidInput):
 95    '''增加ayaka管理者'''
 96    if app.user_id not in ayaka_root_config.owners:
 97        await app.send("仅有ayaka所有者有权执行此命令")
 98        return
 99
100    uid = data.uid
101
102    if uid not in ayaka_root_config.admins:
103        ayaka_root_config.admins.append(uid)
104        ayaka_root_config.force_update()
105
106    await app.send("设置成功")
107
108
109@app.on.idle(super=True)
110@app.on.command("remove_admin")
111async def remove_admin(data: UidInput):
112    '''移除ayaka管理者'''
113    if app.user_id not in ayaka_root_config.owners:
114        await app.send("仅有ayaka所有者有权执行此命令")
115        return
116
117    uid = data.uid
118
119    if uid in ayaka_root_config.admins:
120        ayaka_root_config.admins.remove(uid)
121        ayaka_root_config.force_update()
122
123    await app.send("设置成功")
124
125
126# 定时提交db、保存setting
127@app.on.interval(60, show=False)
128async def update_data():
129    commit()
130    save()
131
132
133# 退出提交
134@driver.on_shutdown
135async def update_data():
136    commit()
137    save()
class UidInput(ayaka.depend.input.AyakaInput):
16class UidInput(AyakaInput):
17    uid: int = Field(description="QQ号", gt=0)

解析命令行参数为对应成员属性

注意:每个成员属性都必须标注类型,否则可能解析顺序出错

Inherited Members
pydantic.main.BaseModel
BaseModel
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
ayaka.depend.input.AyakaInput
help
ayaka.depend.depend.AyakaDepend
props
class AppnameInput(ayaka.depend.input.AyakaInput):
20class AppnameInput(AyakaInput):
21    name: str = Field("", description="应用名")

解析命令行参数为对应成员属性

注意:每个成员属性都必须标注类型,否则可能解析顺序出错

Inherited Members
pydantic.main.BaseModel
BaseModel
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
ayaka.depend.input.AyakaInput
help
ayaka.depend.depend.AyakaDepend
props
@app.on.idle(super=True)
@app.on.command('插件', 'plugin', 'plugins')
async def show_plugins():
24@app.on.idle(super=True)
25@app.on.command("插件", "plugin", "plugins")
26async def show_plugins():
27    '''展示所有应用'''
28    items = []
29    for _app in app_list:
30        info = f"[{_app.name}]"
31        items.append(info)
32    await app.send("\n".join(items))

展示所有应用

@app.on.idle(super=True)
@app.on.command('状态', 'state')
async def show_state():
35@app.on.idle(super=True)
36@app.on.command("状态", "state")
37async def show_state():
38    state = app.group.state
39    if state <= root_state:
40        await app.send("当前设备处于闲置状态")
41        return
42    await app.send(f"当前状态:{state}")
@app.on.idle(super=True)
@app.on.command('帮助', 'help')
async def show_help(data: ayaka.ayaka_master.AppnameInput):
45@app.on.idle(super=True)
46@app.on.command("帮助", "help")
47async def show_help(data: AppnameInput):
48    # 展示当前应用当前状态的帮助
49    if app.state > root_state:
50        app_name = app.state.keys[1]
51        _app = get_app(app_name)
52        await app.send(_app.help)
53        return
54
55    # 没有应用正在运行
56    # 查询指定应用的详细帮助
57    name = data.name
58    if name:
59        _app = get_app(name)
60        if not _app:
61            await app.send(f"应用不存在 [{name}]")
62            return
63
64        # 详细帮助
65        info = f"[{_app.name}]\n{_app.all_help}"
66        await app.send(info)
67        return
68
69    # 展示所有应用介绍
70    items = ["所有应用介绍一览"]
71    for _app in app_list:
72        info = f"[{_app.name}]\n{_app.intro}"
73        items.append(info)
74
75    await app.send_many(items)
76    await app.send("使用 帮助 <插件名> 可以进一步展示指定插件的详细帮助信息")
@app.on.idle(super=True)
@app.on.command('强制退出', 'force_exit')
async def force_exit():
79@app.on.idle(super=True)
80@app.on.command("强制退出", "force_exit")
81async def force_exit():
82    await app.group.goto(root_state)
83    await app.send("已强制退出应用")
@app.on.idle(super=True)
@app.on.command('查看缓存')
async def show_cache():
86@app.on.idle(super=True)
87@app.on.command("查看缓存")
88async def show_cache():
89    '''查看当前群组的所有缓存'''
90    print(app.group.cache_dict)

查看当前群组的所有缓存

@app.on.idle(super=True)
@app.on.command('add_admin')
async def add_admin(data: ayaka.ayaka_master.UidInput):
 93@app.on.idle(super=True)
 94@app.on.command("add_admin")
 95async def add_admin(data: UidInput):
 96    '''增加ayaka管理者'''
 97    if app.user_id not in ayaka_root_config.owners:
 98        await app.send("仅有ayaka所有者有权执行此命令")
 99        return
100
101    uid = data.uid
102
103    if uid not in ayaka_root_config.admins:
104        ayaka_root_config.admins.append(uid)
105        ayaka_root_config.force_update()
106
107    await app.send("设置成功")

增加ayaka管理者

@app.on.idle(super=True)
@app.on.command('remove_admin')
async def remove_admin(data: ayaka.ayaka_master.UidInput):
110@app.on.idle(super=True)
111@app.on.command("remove_admin")
112async def remove_admin(data: UidInput):
113    '''移除ayaka管理者'''
114    if app.user_id not in ayaka_root_config.owners:
115        await app.send("仅有ayaka所有者有权执行此命令")
116        return
117
118    uid = data.uid
119
120    if uid in ayaka_root_config.admins:
121        ayaka_root_config.admins.remove(uid)
122        ayaka_root_config.force_update()
123
124    await app.send("设置成功")

移除ayaka管理者

@driver.on_shutdown
async def update_data():
135@driver.on_shutdown
136async def update_data():
137    commit()
138    save()