ayaka.depend.sql

  1import sqlite3
  2from typing import TYPE_CHECKING, List, Type
  3
  4from loguru import logger
  5from ..config import ayaka_data_path, ayaka_root_config
  6
  7if TYPE_CHECKING:
  8    from .db import AyakaDB
  9
# Marker dicts using the same "primary" / "json" keys that create_table()
# reads from a column's "extra" mapping.
PrimaryKey = {"primary": True}
JsonKey = {"json": True}

# Location of the SQLite database file inside the ayaka data directory.
path = ayaka_data_path / "ayaka.db"
# Path of the companion "ayaka.db-journal" file next to the database.
journal_path = ayaka_data_path / "ayaka.db-journal"
# Module-level connection shared by the helpers in this module; it is opened
# as soon as the module is imported.
db = sqlite3.connect(path)
 16
 17
 18def execute(query, values=None):
 19    if ayaka_root_config.debug:
 20        print(query)
 21        if values:
 22            print(values)
 23    if values is None:
 24        cursor = db.execute(query)
 25    else:
 26        cursor = db.execute(query, values)
 27    cursor.close()
 28
 29
 30def executemany(query, values=None):
 31    if ayaka_root_config.debug:
 32        print(query)
 33        if values:
 34            print(values)
 35    if values is None:
 36        cursor = db.executemany(query)
 37    else:
 38        cursor = db.executemany(query, values)
 39    cursor.close()
 40
 41
 42def fetchall(query: str):
 43    cursor = db.execute(query)
 44    values = cursor.fetchall()
 45    cursor.close()
 46    if ayaka_root_config.debug:
 47        print(query)
 48        print(values)
 49    return values
 50
 51
 52def create_table(name: str, cls: Type["AyakaDB"]):
 53    props = cls.props()
 54    args = []
 55    primarys = []
 56
 57    for k, v in props.items():
 58        extra: dict = v.get("extra", {})
 59        if extra.get("primary"):
 60            primarys.append(k)
 61        if extra.get("json"):
 62            args.append(f"{k} text")
 63        else:
 64            args.append(f"{k} {v['type']}")
 65
 66    if primarys:
 67        primarys_str = ",".join(f"\"{k}\"" for k in primarys)
 68        args.append(f"PRIMARY KEY({primarys_str})")
 69
 70    args_str = ",\n".join(args)
 71    query = f"create table if not exists \"{name}\" ({args_str})"
 72
 73    execute(query)
 74
 75
 76def insert_or_replace(name: str, data: "AyakaDB", action: str):
 77    keys = list(data.dict().keys())
 78    values = list(data.dict().values())
 79    keys_str = ",".join(keys)
 80    values_str = ("(?),"*len(keys))[:-1]
 81    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
 82    execute(query, values)
 83
 84
 85def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
 86    data = datas[0]
 87    keys = list(data.dict().keys())
 88    values = [[getattr(data, k) for k in keys] for data in datas]
 89    keys_str = ",".join(keys)
 90    values_str = ("(?),"*len(keys))[:-1]
 91    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
 92    executemany(query, values)
 93
 94
 95def select_many(name: str, cls: Type["AyakaDB"], where: str = "1"):
 96    props = cls.props()
 97    keys = list(props.keys())
 98
 99    # 本来想用*,不过为了保险起见(后续更新的兼容性),还是老老实实写key吧
100    keys_str = ",".join(keys)
101    query = f"select {keys_str} from \"{name}\" where {where}"
102    values = fetchall(query)
103
104    # 组装为字典
105    datas = [
106        cls._create_by_db_data({k: v for k, v in zip(keys, vs)})
107        for vs in values
108    ]
109    return datas
110
111
def drop_table(name: str):
    """Drop table *name* if it exists.

    Args:
        name: Table name; it is double-quoted in the generated statement.
    """
    execute(f'drop table if exists "{name}"')
115
116
def commit():
    """Commit the shared connection's open transaction, if there is one.

    Does nothing (and logs nothing) when no transaction is open.
    """
    # Ask the connection itself whether a transaction is open instead of
    # probing for the journal file on disk: the file check depends on the
    # journal mode in use and costs a filesystem call on every invocation.
    if not db.in_transaction:
        return

    if ayaka_root_config.debug:
        print("commit")
    logger.debug("更新数据库")
    db.commit()
123
124
def wrap(v):
    """Render *v* as a text fragment for embedding in an SQL statement.

    Args:
        v: Value to render.

    Returns:
        For ``str`` input, the text wrapped in double quotes with every
        embedded double quote doubled, so the value cannot end the quoted
        token early. Any other value is converted with ``str()``.
    """
    if isinstance(v, str):
        # A doubled quote ("") is the SQL escape for a literal quote inside
        # a double-quoted token.
        escaped = v.replace('"', '""')
        return f'"{escaped}"'
    return str(v)
def execute(query, values=None):
    """Run a single SQL statement on the shared connection.

    Args:
        query: SQL text to execute.
        values: Optional bound parameters for the statement. When ``None``
            the statement is executed without parameters.

    The cursor is closed immediately; no result is returned.
    """
    if ayaka_root_config.debug:
        print(query)
        if values:
            print(values)

    # Forward the parameters only when the caller actually supplied some.
    params = () if values is None else (values,)
    db.execute(query, *params).close()
def executemany(query, values=None):
    """Run one SQL statement once per parameter set on the shared connection.

    Args:
        query: SQL text to execute.
        values: Iterable of parameter sets. ``None`` is treated as an empty
            iterable, so the statement is not executed for any row.

    The cursor is closed immediately; no result is returned.
    """
    if ayaka_root_config.debug:
        print(query)
        if values:
            print(values)

    # sqlite3's Connection.executemany() requires the parameter iterable;
    # calling it with the query alone raises TypeError, so substitute an
    # empty list when the caller passed nothing.
    param_sets = [] if values is None else values
    db.executemany(query, param_sets).close()
def fetchall(query: str):
    """Execute *query* on the shared connection and return every result row.

    Args:
        query: SQL text to execute (no bound parameters).

    Returns:
        The list produced by the cursor's ``fetchall()``.
    """
    cur = db.execute(query)
    rows = cur.fetchall()
    cur.close()

    # Debug output is emitted after the query ran so the rows can be shown.
    if ayaka_root_config.debug:
        print(query)
        print(rows)
    return rows
def create_table(name: str, cls: Type["AyakaDB"]):
    """Create table *name* (if it does not exist) from the fields of *cls*.

    Args:
        name: Table name; it is double-quoted in the generated statement.
        cls: Model class whose ``props()`` maps each column name to a dict
            holding a ``"type"`` entry and an optional ``"extra"`` dict.

    Columns whose ``extra`` has a truthy ``"json"`` entry are declared as
    ``text``; all others use the ``"type"`` value verbatim. Columns whose
    ``extra`` has a truthy ``"primary"`` entry form the PRIMARY KEY clause.
    """
    column_defs = []
    primary_keys = []

    for column, spec in cls.props().items():
        flags: dict = spec.get("extra", {})
        if flags.get("primary"):
            primary_keys.append(column)
        sql_type = "text" if flags.get("json") else spec["type"]
        # NOTE(review): the column name is interpolated unquoted here, while
        # the PRIMARY KEY clause below quotes it.
        column_defs.append(f"{column} {sql_type}")

    if primary_keys:
        quoted = ",".join(f'"{key}"' for key in primary_keys)
        column_defs.append(f"PRIMARY KEY({quoted})")

    body = ",\n".join(column_defs)
    execute(f'create table if not exists "{name}" ({body})')
def insert_or_replace(name: str, data: "AyakaDB", action: str):
    """Write one object into table *name*.

    Args:
        name: Table name; it is double-quoted in the generated statement.
        data: Object whose ``dict()`` supplies the column names and values.
        action: Leading SQL keyword(s) placed before ``into`` in the
            statement (interpolated verbatim).
    """
    # Take a single snapshot so the column list and the bound values are
    # guaranteed to come from the same dict, in the same order.
    row = data.dict()
    keys_str = ",".join(row)
    values_str = ",".join(["(?)"] * len(row))
    query = f'{action} into "{name}" ({keys_str}) values ({values_str})'
    execute(query, list(row.values()))
def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
    """Write several objects into table *name* with one batched statement.

    Args:
        name: Table name; it is double-quoted in the generated statement.
        datas: Objects to write. The column list is taken from the first
            element's ``dict()`` keys. An empty list is a no-op.
        action: Leading SQL keyword(s) placed before ``into`` in the
            statement (interpolated verbatim).
    """
    # Nothing to write; this also avoids an IndexError on datas[0] below.
    if not datas:
        return

    keys = list(datas[0].dict().keys())
    # NOTE(review): values are read with getattr() here, whereas
    # insert_or_replace() binds the values of data.dict() - confirm the two
    # are meant to differ.
    values = [[getattr(data, k) for k in keys] for data in datas]
    keys_str = ",".join(keys)
    values_str = ",".join(["(?)"] * len(keys))
    query = f'{action} into "{name}" ({keys_str}) values ({values_str})'
    executemany(query, values)
def select_many(name: str, cls: Type["AyakaDB"], where: str = "1"):
    """Load rows from table *name* and turn each one into a *cls* object.

    Args:
        name: Table name; it is double-quoted in the generated statement.
        cls: Model class; ``props()`` supplies the column names and
            ``_create_by_db_data`` builds an object from a column->value dict.
        where: SQL condition interpolated verbatim after ``where``.

    Returns:
        A list with one object per selected row.
    """
    columns = list(cls.props().keys())

    # Columns are listed explicitly rather than using "*", to stay on the
    # safe side with respect to compatibility with later updates.
    column_list = ",".join(columns)
    rows = fetchall(f'select {column_list} from "{name}" where {where}')

    # Pair each row's values with the column names to assemble a dict.
    return [cls._create_by_db_data(dict(zip(columns, row))) for row in rows]
def drop_table(name: str):
    """Drop table *name* if it exists.

    Args:
        name: Table name; it is double-quoted in the generated statement.
    """
    execute(f'drop table if exists "{name}"')
def commit():
    """Commit the shared connection's open transaction, if there is one.

    Does nothing (and logs nothing) when no transaction is open.
    """
    # Ask the connection itself whether a transaction is open instead of
    # probing for the journal file on disk: the file check depends on the
    # journal mode in use and costs a filesystem call on every invocation.
    if not db.in_transaction:
        return

    if ayaka_root_config.debug:
        print("commit")
    logger.debug("更新数据库")
    db.commit()
def wrap(v):
    """Render *v* as a text fragment for embedding in an SQL statement.

    Args:
        v: Value to render.

    Returns:
        For ``str`` input, the text wrapped in double quotes with every
        embedded double quote doubled, so the value cannot end the quoted
        token early. Any other value is converted with ``str()``.
    """
    if isinstance(v, str):
        # A doubled quote ("") is the SQL escape for a literal quote inside
        # a double-quoted token.
        escaped = v.replace('"', '""')
        return f'"{escaped}"'
    return str(v)