"""
SQL help functions.
"""
from __future__ import annotations
from collections.abc import Collection
from typing import overload, Any, TypeVar, TYPE_CHECKING
from . import expr
if TYPE_CHECKING:
from .stat import SqlStat, SqlSelectStat
__all__ = [
'TRUE', 'FALSE', 'NULL', 'ROWID',
'literal', 'wrap', 'alias', 'cast', 'case', 'exists', 'asc', 'desc', 'nulls_first', 'nulls_last', 'concat',
'and_', 'or_',
'like', 'not_like', 'glob', 'contains', 'not_contains', 'between', 'not_between', 'is_null', 'is_not_null',
'excluded', 'with_common_table', 'fields'
]
T = TypeVar('T')
TRUE = expr.SqlLiteral.SQL_TRUE
FALSE = expr.SqlLiteral.SQL_FALSE
NULL = expr.SqlLiteral.SQL_NULL
ROWID = expr.SqlLiteral('rowid')
[docs]
def literal(x: str) -> expr.SqlExpr:
return expr.SqlLiteral(x)
[docs]
def wrap(x) -> expr.SqlExpr:
"""wrap *x* as an SQL expression."""
return expr.wrap(x)
@overload
def alias(x: str, name: str) -> Any:
pass
@overload
def alias(x: type[T], name: str) -> type[T]:
pass
@overload
def alias(x: SqlStat[T], name: str) -> type[T]:
pass
[docs]
def alias(x, name: str):
"""
.. code-block::SQL
:x AS :name
"""
if isinstance(x, type):
return expr.SqlAlias(x, name)
return expr.SqlAlias(expr.wrap(x), name)
[docs]
def cast(t: type[T], x) -> T:
"""
.. code-block::SQL
CAST(:x AS :t)
"""
return expr.SqlCastOper(t.__name__, expr.wrap(x))
[docs]
def case(x=None) -> expr.SqlCaseExpr:
"""
https://www.sqlite.org/lang_expr.html#the_case_expression
.. code-block::SQL
CASE :x
...
:param x:
:return:
"""
return expr.SqlCaseExpr(None if x is None else expr.wrap(x))
@overload
def exists(x: SqlStat) -> expr.SqlExpr:
pass
@overload
def exists(x: type, *where: bool | expr.SqlExpr) -> expr.SqlExpr:
pass
[docs]
def exists(x, *where: bool | expr.SqlExpr) -> expr.SqlExpr:
"""
https://www.sqlite.org/lang_expr.html#the_exists_operator
>>> exists(A, A.a == 1) # equivalent below
>>> exists(select_from(1, from_table=A).where(A.a == 1))
:param x:
:param where:
:return:
"""
if isinstance(x, type):
from .stat_start import select_from
x = select_from(1, from_table=x).where(*where)
return expr.SqlExistsOper('EXISTS', x)
[docs]
def asc(x) -> expr.SqlExpr:
"""
ascending ordering Used by **ORDER BY**.
.. code-block::SQL
:x ASC
"""
return expr.SqlOrderOper('ASC', expr.wrap(x))
[docs]
def desc(x) -> expr.SqlExpr:
"""
descending ordering used by **ORDER BY**.
.. code-block::SQL
:x DESC
"""
return expr.SqlOrderOper('DESC', expr.wrap(x))
[docs]
def nulls_first(x) -> expr.SqlExpr:
"""
order null first used by **ORDER BY**.
.. code-block::SQL
:x NULLS FIRST
"""
return expr.SqlOrderOper('NULLS FIRST', expr.wrap(x))
[docs]
def nulls_last(x) -> expr.SqlExpr:
"""
order null last used by **ORDER BY**.
.. code-block::SQL
:x NULLS LAST
"""
return expr.SqlOrderOper('NULLS LAST', expr.wrap(x))
[docs]
def concat(*x) -> expr.SqlExpr:
"""
concatenate strings.
.. code-block::SQL
:x[0] || :x[1] || ...
"""
return expr.SqlConcatOper(expr.wrap_seq(*x))
[docs]
def and_(*other) -> expr.SqlExpr:
"""
"AND" SQL expressions.
.. code-block::SQL
other[0] AND other[1] AND ...
"""
if len(other) == 0:
raise RuntimeError()
elif len(other) == 1:
return expr.wrap(other[0])
return expr.SqlVarArgOper('AND', expr.wrap_seq(*other))
[docs]
def or_(*other) -> expr.SqlExpr:
"""
"OR" SQL expressions.
.. code-block::SQL
other[0] OR other[1] OR ...
"""
if len(other) == 0:
raise RuntimeError()
elif len(other) == 1:
return expr.wrap(other[0])
return expr.SqlVarArgOper('OR', expr.wrap_seq(*other))
[docs]
def like(x, s) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x LIKE :s
:param x:
:param s:
:return:
"""
return expr.wrap(x).like(s)
[docs]
def not_like(x, s) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x NOT LIKE :s
:param x:
:param s:
:return:
"""
return expr.wrap(x).not_like(s)
[docs]
def glob(x, s) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x GLOB :s
:param x:
:param s:
:return:
"""
return expr.wrap(x).glob(s)
[docs]
def contains(x, coll) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x IN :COLL
:param x:
:param coll: a sequence or a select statement.
:return:
"""
from .stat import SqlStat
if not isinstance(coll, (tuple, list, SqlStat)):
coll = [coll]
return expr.wrap(x).contains(coll)
[docs]
def not_contains(x, coll) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x NOT IN :COLL
:param x:
:param coll: a sequence or a select statement.
:return:
"""
from .stat import SqlStat
if not isinstance(coll, (tuple, list, SqlStat)):
coll = [coll]
return expr.wrap(x).not_contains(coll)
[docs]
def between(x, *value) -> expr.SqlCompareOper:
"""
https://www.sqlite.org/lang_expr.html#the_between_operator
.. code-block::SQL
:x BETWEEN :value[0] AND :value[1]
:param x:
:param value: two value, or a range, a slice.
:return:
"""
return expr.wrap(x).between(*value)
[docs]
def not_between(x, *value) -> expr.SqlCompareOper:
"""
https://www.sqlite.org/lang_expr.html#the_between_operator
.. code-block::SQL
:x NOT BETWEEN :value[0] AND :value[1]
:param x:
:param value: two value, or a range, a slice.
:return:
"""
return expr.wrap(x).not_between(*value)
[docs]
def is_null(x) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x IS NULL
"""
return expr.wrap(x).is_null()
[docs]
def is_not_null(x) -> expr.SqlCompareOper:
"""
.. code-block::SQL
:x IS NOT NULL
"""
return expr.wrap(x).is_not_null()
[docs]
def excluded(t: type[T]) -> type[T]:
"""https://www.sqlite.org/lang_upsert.html"""
return expr.SqlAlias(t, 'excluded')
@overload
def with_common_table(name: str, select: SqlSelectStat) -> expr.SqlCteExpr:
pass
@overload
def with_common_table(name: type[T], select: SqlSelectStat) -> type[T]:
pass
[docs]
def with_common_table(name: str, select: SqlSelectStat):
if isinstance(name, type):
name = name.__name__
return expr.SqlCteExpr(name, select)
# noinspection PyShadowingNames
[docs]
def fields(table: type[T], *,
primary: bool = None,
has_default: bool = None,
excluded: Collection[str] = None) -> tuple[expr.SqlField, ...]:
from .table import table_fields
fields = table_fields(table)
if primary is not None:
fields = [it for it in fields if it.is_primary == primary]
if has_default is not None:
fields = [it for it in fields if it.has_default == has_default]
if excluded is not None:
fields = [it for it in fields if it.name not in excluded]
return tuple([it(table) for it in fields])