ayaka.depend.sql

  1import os
  2import sqlite3
  3from typing import TYPE_CHECKING, List, Type
  4
  5from loguru import logger
  6from ..config import ayaka_data_path, ayaka_root_config
  7
  8if TYPE_CHECKING:
  9    from .db import AyakaDB
 10
 11PrimaryKey = {"primary": True}
 12JsonKey = {"json": True}
 13
 14path = ayaka_data_path / "ayaka.db"
 15journal_path = ayaka_data_path / "ayaka.db-journal"
 16old_journal_path = ayaka_data_path / "ayaka.db-journal-old"
 17db = sqlite3.connect(path)
 18
 19
 20def execute(query, values=None):
 21    if ayaka_root_config.debug:
 22        print(query)
 23        if values:
 24            print(values)
 25    if values is None:
 26        cursor = db.execute(query)
 27    else:
 28        cursor = db.execute(query, values)
 29    cursor.close()
 30
 31
 32def executemany(query, values=None):
 33    if ayaka_root_config.debug:
 34        print(query)
 35        if values:
 36            print(values)
 37    if values is None:
 38        cursor = db.executemany(query)
 39    else:
 40        cursor = db.executemany(query, values)
 41    cursor.close()
 42
 43
 44def fetchall(query: str):
 45    cursor = db.execute(query)
 46    values = cursor.fetchall()
 47    cursor.close()
 48    if ayaka_root_config.debug:
 49        print(query)
 50        print(values)
 51    return values
 52
 53
 54def create_table(name: str, cls: Type["AyakaDB"]):
 55    props = cls.props()
 56    args = []
 57    primarys = []
 58
 59    for k, v in props.items():
 60        extra: dict = v.get("extra", {})
 61        if extra.get("primary"):
 62            primarys.append(k)
 63        if extra.get("json"):
 64            args.append(f"{k} text")
 65        else:
 66            args.append(f"{k} {v['type']}")
 67
 68    if primarys:
 69        primarys_str = ",".join(f"\"{k}\"" for k in primarys)
 70        args.append(f"PRIMARY KEY({primarys_str})")
 71
 72    args_str = ",\n".join(args)
 73    query = f"create table if not exists \"{name}\" ({args_str})"
 74
 75    execute(query)
 76
 77
 78def insert_or_replace(name: str, data: "AyakaDB", action: str):
 79    keys = list(data.dict().keys())
 80    values = list(data.dict().values())
 81    keys_str = ",".join(keys)
 82    values_str = ("(?),"*len(keys))[:-1]
 83    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
 84    execute(query, values)
 85
 86
 87def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
 88    data = datas[0]
 89    keys = list(data.dict().keys())
 90    values = [[getattr(data, k) for k in keys] for data in datas]
 91    keys_str = ",".join(keys)
 92    values_str = ("(?),"*len(keys))[:-1]
 93    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
 94    executemany(query, values)
 95
 96
 97def select_many(name: str, cls: Type["AyakaDB"], where: str = ""):
 98    props = cls.props()
 99    keys = list(props.keys())
100    if where:
101        where = f"where {where}"
102
103    keys_str = ",".join(keys)
104    query = f"select {keys_str} from \"{name}\" {where}"
105    values = fetchall(query)
106
107    # 组装为字典
108    datas = [
109        cls._create_by_db_data({k: v for k, v in zip(keys, vs)})
110        for vs in values
111    ]
112    return datas
113
114
def drop_table(name: str):
    """Drop table ``name`` if it exists.

    Args:
        name: Table name; interpolated into the statement inside double quotes.
    """
    execute(f"drop table if exists \"{name}\"")
118
119
def commit():
    """Commit the module-level connection when a journal file is present.

    Steps, in order:
      1. Delete a leftover ``ayaka.db-journal-old`` file, if any.
      2. If no ``ayaka.db-journal`` file exists, do nothing further.
      3. Otherwise call ``db.commit()``.
      4. If the journal file is still there afterwards, the commit is treated
         as failed and the journal is renamed to ``ayaka.db-journal-old``.

    NOTE(review): using the journal file as the "pending changes" signal
    presumably relies on SQLite's rollback-journal mode -- confirm.
    """
    # Clear the remnant of an earlier failed commit first.
    if old_journal_path.exists():
        os.remove(old_journal_path)
        logger.debug("已删除旧db-journal-old文件")

    # No journal file: skip the commit entirely.
    if not journal_path.exists():
        return

    if ayaka_root_config.debug:
        print("commit")
    logger.debug("更新数据库")
    db.commit()

    # Commit failed (journal still present): move the journal aside.
    if journal_path.exists():
        logger.debug("更新数据库失败,已将journal文件更名为db-journal-old")
        journal_path.rename(old_journal_path)
134
135
def wrap(v):
    """Render ``v`` as text suitable for splicing into a statement.

    Strings are enclosed in double quotes, with every embedded double quote
    doubled so the value cannot terminate the quoted token early. Any other
    value is returned as ``str(v)``.

    NOTE(review): SQLite resolves double-quoted tokens as identifiers first
    and only falls back to a string literal; the double quotes are kept here
    so output for existing callers is unchanged.

    Args:
        v: Value to render.

    Returns:
        The quoted/escaped string for ``str`` input, else ``str(v)``.
    """
    if isinstance(v, str):
        escaped = v.replace("\"", "\"\"")
        return f"\"{escaped}\""
    return str(v)
def execute(query, values=None):
    """Run a single SQL statement on the module-level connection ``db``.

    When ``ayaka_root_config.debug`` is set, the statement (and its
    parameters, if truthy) are printed before execution.

    Args:
        query: SQL text handed to ``db.execute``.
        values: Optional bound parameters; left out of the call when None.

    Returns:
        None. The cursor is closed right away; this function does not commit.
    """
    debug = ayaka_root_config.debug
    if debug:
        print(query)
    if debug and values:
        print(values)

    # Forward the parameters only when the caller actually supplied them.
    bound = () if values is None else (values,)
    db.execute(query, *bound).close()
def executemany(query, values=None):
    """Run one SQL statement once per parameter set via ``db.executemany``.

    When ``ayaka_root_config.debug`` is set, the statement (and the
    parameter sets, if truthy) are printed before execution.

    Args:
        query: SQL text handed to ``db.executemany``.
        values: Iterable of parameter sequences. ``None`` is treated as an
            empty iterable, i.e. no parameter sets are bound.

    Returns:
        None. The cursor is closed right away; this function does not commit.
    """
    if ayaka_root_config.debug:
        print(query)
        if values:
            print(values)

    # Connection.executemany() requires the parameter iterable; calling it
    # with the SQL alone always raises TypeError, so fall back to "no rows".
    rows = () if values is None else values
    cursor = db.executemany(query, rows)
    cursor.close()
def fetchall(query: str):
    """Run ``query`` on the module-level connection and return all rows.

    Args:
        query: Complete SQL text; no parameters are bound.

    Returns:
        The list returned by ``cursor.fetchall()``.
    """
    debug = ayaka_root_config.debug
    # Echo the statement before running it, as execute() does, so that a
    # failing query still shows up in the debug output.
    if debug:
        print(query)

    cursor = db.execute(query)
    try:
        values = cursor.fetchall()
    finally:
        # Release the cursor even if fetching the rows raises.
        cursor.close()

    if debug:
        print(values)
    return values
def create_table(name: str, cls: Type["AyakaDB"]):
    """Create table ``name`` if it does not exist, one column per property.

    ``cls.props()`` maps each column name to a dict. That dict's optional
    ``"extra"`` entry may set ``"primary"`` (the column joins the PRIMARY KEY
    clause) and/or ``"json"`` (the column is declared as ``text``). Columns
    without the ``"json"`` flag are declared with the dict's ``"type"`` value.

    Args:
        name: Table name; interpolated into the statement inside double quotes.
        cls: Model class providing ``props()``.

    Raises:
        KeyError: If a non-json property dict has no ``"type"`` entry.
    """
    columns = []
    primary_keys = []

    for key, spec in cls.props().items():
        extra: dict = spec.get("extra", {})
        if extra.get("primary"):
            primary_keys.append(key)
        sql_type = "text" if extra.get("json") else spec["type"]
        # Quote the column name the same way the table name and the
        # PRIMARY KEY list are quoted, so one statement never mixes quoted
        # and unquoted spellings of the same column.
        columns.append(f"\"{key}\" {sql_type}")

    if primary_keys:
        primary_str = ",".join(f"\"{key}\"" for key in primary_keys)
        columns.append(f"PRIMARY KEY({primary_str})")

    columns_str = ",\n".join(columns)
    execute(f"create table if not exists \"{name}\" ({columns_str})")
def insert_or_replace(name: str, data: "AyakaDB", action: str):
    """Write one row built from ``data.dict()`` into table ``name``.

    Args:
        name: Table name; interpolated into the statement inside double quotes.
        data: Model instance; the keys of ``data.dict()`` become the column
            list and its values the bound parameters.
        action: Leading SQL verb placed before ``into`` (presumably
            ``insert`` or ``replace`` -- confirm at call sites). It is
            interpolated, not bound, so it must come from trusted code.
    """
    # Serialize once so the column list and the bound values are guaranteed
    # to come from the same snapshot, in the same order.
    row = data.dict()
    keys_str = ",".join(row)
    values_str = ",".join(["(?)"] * len(row))
    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
    execute(query, list(row.values()))
def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
    """Write one row per item of ``datas`` into table ``name``.

    The column list is taken from the keys of the first item's ``dict()``;
    every item contributes the attributes with those names.

    Args:
        name: Table name; interpolated into the statement inside double quotes.
        datas: Model instances to write. An empty list is a no-op.
        action: Leading SQL verb placed before ``into`` (presumably
            ``insert`` or ``replace`` -- confirm at call sites). It is
            interpolated, not bound, so it must come from trusted code.
    """
    # Nothing to write; indexing datas[0] below would raise IndexError.
    if not datas:
        return

    keys = list(datas[0].dict().keys())
    # NOTE(review): rows are built with getattr(), whereas insert_or_replace()
    # binds data.dict().values(); the two differ if dict() transforms values
    # -- confirm which is intended.
    rows = [[getattr(item, key) for key in keys] for item in datas]
    keys_str = ",".join(keys)
    values_str = ",".join(["(?)"] * len(keys))
    query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})"
    executemany(query, rows)
def select_many(name: str, cls: Type["AyakaDB"], where: str = ""):
    """Select rows from table ``name`` and turn each into a ``cls`` object.

    Args:
        name: Table name; interpolated into the statement inside double quotes.
        cls: Model class; the keys of ``cls.props()`` are the selected columns
            and ``cls._create_by_db_data`` builds each result object.
        where: Optional condition text placed after ``where``. It is
            interpolated verbatim, so it must already be valid, trusted SQL.

    Returns:
        A list with one ``cls._create_by_db_data(...)`` result per row.
    """
    columns = list(cls.props().keys())
    clause = f"where {where}" if where else where
    columns_str = ",".join(columns)
    rows = fetchall(f"select {columns_str} from \"{name}\" {clause}")

    # Pair every row with the column names to get a dict per record.
    return [cls._create_by_db_data(dict(zip(columns, row))) for row in rows]
def drop_table(name: str):
    """Drop table ``name`` if it exists.

    Args:
        name: Table name; interpolated into the statement inside double quotes.
    """
    execute(f"drop table if exists \"{name}\"")
def commit():
    """Commit the module-level connection when a journal file is present.

    Steps, in order:
      1. Delete a leftover ``ayaka.db-journal-old`` file, if any.
      2. If no ``ayaka.db-journal`` file exists, do nothing further.
      3. Otherwise call ``db.commit()``.
      4. If the journal file is still there afterwards, the commit is treated
         as failed and the journal is renamed to ``ayaka.db-journal-old``.

    NOTE(review): using the journal file as the "pending changes" signal
    presumably relies on SQLite's rollback-journal mode -- confirm.
    """
    # Clear the remnant of an earlier failed commit first.
    if old_journal_path.exists():
        os.remove(old_journal_path)
        logger.debug("已删除旧db-journal-old文件")

    # No journal file: skip the commit entirely.
    if not journal_path.exists():
        return

    if ayaka_root_config.debug:
        print("commit")
    logger.debug("更新数据库")
    db.commit()

    # Commit failed (journal still present): move the journal aside.
    if journal_path.exists():
        logger.debug("更新数据库失败,已将journal文件更名为db-journal-old")
        journal_path.rename(old_journal_path)
def wrap(v):
    """Render ``v`` as text suitable for splicing into a statement.

    Strings are enclosed in double quotes, with every embedded double quote
    doubled so the value cannot terminate the quoted token early. Any other
    value is returned as ``str(v)``.

    NOTE(review): SQLite resolves double-quoted tokens as identifiers first
    and only falls back to a string literal; the double quotes are kept here
    so output for existing callers is unchanged.

    Args:
        v: Value to render.

    Returns:
        The quoted/escaped string for ``str`` input, else ``str(v)``.
    """
    if isinstance(v, str):
        escaped = v.replace("\"", "\"\"")
        return f"\"{escaped}\""
    return str(v)