ayaka.depend.sql

  1import sqlite3
  2from typing import TYPE_CHECKING, List, Type
  3from ..config import ayaka_data_path, ayaka_root_config
  4
  5if TYPE_CHECKING:
  6    from .db import AyakaDB
  7
  8PrimaryKey = {"primary": True}
  9JsonKey = {"json": True}
 10
 11path = ayaka_data_path / "ayaka.db"
 12db = sqlite3.connect(path)
 13
 14
 15def execute(query, values=None):
 16    if ayaka_root_config.debug:
 17        print(query)
 18        if values:
 19            print(values)
 20    if values is None:
 21        cursor = db.execute(query)
 22    else:
 23        cursor = db.execute(query, values)
 24    cursor.close()
 25
 26
 27def executemany(query, values=None):
 28    if ayaka_root_config.debug:
 29        print(query)
 30        if values:
 31            print(values)
 32    if values is None:
 33        cursor = db.executemany(query)
 34    else:
 35        cursor = db.executemany(query, values)
 36    cursor.close()
 37
 38
 39def fetchall(query: str):
 40    cursor = db.execute(query)
 41    values = cursor.fetchall()
 42    cursor.close()
 43    if ayaka_root_config.debug:
 44        print(query)
 45        print(values)
 46    return values
 47
 48
 49def create_table(name: str, cls: Type["AyakaDB"]):
 50    props = cls.props()
 51    args = []
 52    primarys = []
 53
 54    for k, v in props.items():
 55        extra: dict = v.get("extra", {})
 56        if extra.get("primary"):
 57            primarys.append(k)
 58        if extra.get("json"):
 59            args.append(f"{k} text")
 60        else:
 61            args.append(f"{k} {v['type']}")
 62
 63    if primarys:
 64        primarys_str = ",".join(f"\"{k}\"" for k in primarys)
 65        args.append(f"PRIMARY KEY({primarys_str})")
 66
 67    args_str = ",\n".join(args)
 68    query = f"create table if not exists \"{name}\" ({args_str})"
 69
 70    execute(query)
 71
 72
 73def insert_or_replace(name: str, data: "AyakaDB", action: str):
 74    keys = list(data.dict().keys())
 75    values = list(data.dict().values())
 76    keys_str = ",".join(keys)
 77    values_str = ("(?),"*len(keys))[:-1]
 78    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
 79    execute(query, values)
 80
 81
 82def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
 83    data = datas[0]
 84    keys = list(data.dict().keys())
 85    values = [[getattr(data, k) for k in keys] for data in datas]
 86    keys_str = ",".join(keys)
 87    values_str = ("(?),"*len(keys))[:-1]
 88    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
 89    executemany(query, values)
 90
 91
 92def select_many(name: str, cls: Type["AyakaDB"], where: str = "1"):
 93    props = cls.props()
 94    keys = list(props.keys())
 95
 96    # 本来想用*,不过为了保险起见(后续更新的兼容性),还是老老实实写key吧
 97    keys_str = ",".join(keys)
 98    query = f"select {keys_str} from \"{name}\" where {where}"
 99    values = fetchall(query)
100
101    # 组装为字典
102    datas = [
103        cls._create_by_db_data({k: v for k, v in zip(keys, vs)})
104        for vs in values
105    ]
106    return datas
107
108
109def drop_table(name: str):
110    query = f"drop table if exists \"{name}\""
111    execute(query)
112
113
114def commit():
115    if ayaka_root_config.debug:
116        print("commit")
117    db.commit()
118
119
120def wrap(v):
121    if isinstance(v, str):
122        return f"\"{v}\""
123    return str(v)
def execute(query, values=None):
16def execute(query, values=None):
17    if ayaka_root_config.debug:
18        print(query)
19        if values:
20            print(values)
21    if values is None:
22        cursor = db.execute(query)
23    else:
24        cursor = db.execute(query, values)
25    cursor.close()
def executemany(query, values=None):
28def executemany(query, values=None):
29    if ayaka_root_config.debug:
30        print(query)
31        if values:
32            print(values)
33    if values is None:
34        cursor = db.executemany(query)
35    else:
36        cursor = db.executemany(query, values)
37    cursor.close()
def fetchall(query: str):
40def fetchall(query: str):
41    cursor = db.execute(query)
42    values = cursor.fetchall()
43    cursor.close()
44    if ayaka_root_config.debug:
45        print(query)
46        print(values)
47    return values
def create_table(name: str, cls: type[ayaka.depend.db.AyakaDB]):
50def create_table(name: str, cls: Type["AyakaDB"]):
51    props = cls.props()
52    args = []
53    primarys = []
54
55    for k, v in props.items():
56        extra: dict = v.get("extra", {})
57        if extra.get("primary"):
58            primarys.append(k)
59        if extra.get("json"):
60            args.append(f"{k} text")
61        else:
62            args.append(f"{k} {v['type']}")
63
64    if primarys:
65        primarys_str = ",".join(f"\"{k}\"" for k in primarys)
66        args.append(f"PRIMARY KEY({primarys_str})")
67
68    args_str = ",\n".join(args)
69    query = f"create table if not exists \"{name}\" ({args_str})"
70
71    execute(query)
def insert_or_replace(name: str, data: ayaka.depend.db.AyakaDB, action: str):
74def insert_or_replace(name: str, data: "AyakaDB", action: str):
75    keys = list(data.dict().keys())
76    values = list(data.dict().values())
77    keys_str = ",".join(keys)
78    values_str = ("(?),"*len(keys))[:-1]
79    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
80    execute(query, values)
def insert_or_replace_many(name: str, datas: list[ayaka.depend.db.AyakaDB], action: str):
83def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
84    data = datas[0]
85    keys = list(data.dict().keys())
86    values = [[getattr(data, k) for k in keys] for data in datas]
87    keys_str = ",".join(keys)
88    values_str = ("(?),"*len(keys))[:-1]
89    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
90    executemany(query, values)
def select_many(name: str, cls: type[ayaka.depend.db.AyakaDB], where: str = '1'):
 93def select_many(name: str, cls: Type["AyakaDB"], where: str = "1"):
 94    props = cls.props()
 95    keys = list(props.keys())
 96
 97    # 本来想用*,不过为了保险起见(后续更新的兼容性),还是老老实实写key吧
 98    keys_str = ",".join(keys)
 99    query = f"select {keys_str} from \"{name}\" where {where}"
100    values = fetchall(query)
101
102    # 组装为字典
103    datas = [
104        cls._create_by_db_data({k: v for k, v in zip(keys, vs)})
105        for vs in values
106    ]
107    return datas
def drop_table(name: str):
110def drop_table(name: str):
111    query = f"drop table if exists \"{name}\""
112    execute(query)
def commit():
115def commit():
116    if ayaka_root_config.debug:
117        print("commit")
118    db.commit()
def wrap(v):
121def wrap(v):
122    if isinstance(v, str):
123        return f"\"{v}\""
124    return str(v)