"""
Subcommand to create a new receipt YAML file and import it.
"""
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import ClassVar, cast, final
from sqlalchemy import Row, select
from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import max as max_, min as min_
from typing_extensions import override
from ...database import Database
from ...matcher.product import ProductMatcher
from ...models.receipt import Discount, ProductItem, Receipt
from ...models.shop import Shop
from ..base import Base, SubparserArguments, SubparserKeywords
from .input import InputSource, Prompt
from .step import (
Discounts,
Edit,
Help,
Menu,
ProductMeta,
Products,
Quit,
Read,
ReturnToMenu,
Step,
View,
Write,
)
class _DateRow(Row[tuple[date, date]]):
    """
    Typed shape of the row produced by the receipt date range query.

    The attribute names mirror the `min` and `max` column labels of the
    select statement in `New._load_date_suggestions`, where this class is
    only used as the target of a `cast`.
    """

    # Earliest receipt date; annotated as optional because the caller checks
    # it against `None` before using the row.
    min: date | None
    # Latest receipt date.
    max: date

    @override
    def __len__(self) -> int:  # pragma: no cover
        # The row always has exactly the two labelled columns.
        return 2
@final
@Base.register("new")
class New(Base):
    """
    Create a YAML file for a receipt and import it to the database.

    After asking for the receipt date and shop, the menu steps are run in
    order. When a step hands control back to the menu before a final step
    has run, an interactive menu prompt takes over until a final step is
    chosen and completes.
    """

    subparser_keywords: ClassVar[SubparserKeywords] = {
        "help": "Create receipt file and import",
        "description": (
            "Interactively fill in a YAML file for a receipt and "
            "import it to the database."
        ),
    }
    subparser_arguments: ClassVar[SubparserArguments] = [
        (
            ("-c", "--confirm"),
            {
                "action": "store_true",
                "default": False,
                "help": "Confirm before updating database files or exiting",
            },
        ),
        (
            ("-m", "--more"),
            {
                "action": "store_true",
                "default": False,
                "help": "Allow more discounts and metadata than product items",
            },
        ),
    ]

    def __init__(self) -> None:
        super().__init__()
        # Same defaults as the `--confirm` and `--more` arguments above.
        self.confirm: bool = False
        self.more: bool = False

    def _get_menu_step(self, menu: Menu, input_source: InputSource) -> Step:
        """
        Ask for a menu choice until it names one of the steps in `menu`.

        Non-empty input that is not a menu keyword is handed to the input
        source for completion; the result is validated by the loop condition
        and the prompt is repeated if it still is not a menu keyword.
        """

        choice: str | None = None
        while choice not in menu:
            choice = input_source.get_input(
                "Menu (help or ? for usage)", str, options="menu"
            )
            if choice != "" and choice not in menu:
                # Autocomplete with completion number 0 (presumably the
                # first match for the partial keyword).
                choice = input_source.get_completion(choice, 0)

        return menu[choice]

    def _show_menu_step(
        self, menu: Menu, step: Step, reason: ReturnToMenu
    ) -> Step:
        """
        Handle a `ReturnToMenu` raised while working on `step`.

        The reason's message, if any, is logged as a warning. An interrupted
        final step is replaced by the "view" step, which is then run. The
        returned step is the one the caller should continue from.
        """

        if reason.msg:
            self.logger.warning("%s", reason.msg)
        if step.final:
            # NOTE(review): nesting reconstructed from flattened source; the
            # view step is assumed to run only when it replaces a final step.
            step = menu["view"]
            _ = step.run()

        return step

    def _confirm_final(self, step: Step, input_source: InputSource) -> None:
        """
        Ask for confirmation before a final step when `confirm` is enabled.

        Raises `ReturnToMenu` if the answer is anything other than "y".
        Non-final steps and a disabled `confirm` flag never prompt.
        """

        if self.confirm and step.final:
            prompt = f"Confirm that you want to {step.description.lower()} (y)"
            if input_source.get_input(prompt, str) != "y":
                raise ReturnToMenu("Confirmation canceled")

    def _get_path(self, receipt_date: datetime, shop: str) -> Path:
        """
        Build the receipt file path for a date and shop.

        The file name is the "format" setting of the "data" section formatted
        with `date` and `shop` fields, placed below the "path" setting of the
        same section.
        """

        data_path = Path(self.settings.get("data", "path"))
        data_format = self.settings.get("data", "format")
        return data_path / data_format.format(date=receipt_date, shop=shop)

    def _load_date_suggestions(
        self, session: Session, input_source: InputSource
    ) -> None:
        """
        Suggest the days from the latest receipt date up to today.

        When the latest receipt date is today or later, only that date is
        suggested. Nothing is suggested when the query yields no dates.
        """

        dates = cast(
            _DateRow | None,
            session.execute(
                select(
                    min_(Receipt.date).label("min"),
                    max_(Receipt.date).label("max"),
                )
            ).first(),
        )
        # The aggregate columns are empty when there is nothing to aggregate.
        if dates is None or dates.min is None:
            return

        today = date.today()
        input_source.update_suggestions(
            {
                "days": [
                    str(dates.max + timedelta(days=day))
                    for day in range(max(0, (today - dates.max).days) + 1)
                ],
            }
        )

    def _load_suggestions(
        self, session: Session, input_source: InputSource
    ) -> None:
        """
        Load the date suggestions and the sorted shop keys as "shops".
        """

        self._load_date_suggestions(session, input_source)
        input_source.update_suggestions(
            {
                "shops": list(
                    session.scalars(select(Shop.key).order_by(Shop.key))
                ),
            }
        )

    def _load_shop_suggestions(
        self, session: Session, input_source: InputSource, shop: str
    ) -> None:
        """
        Load distinct, sorted product and discount labels of one shop.

        Only labels that belong to receipts of `shop` are suggested, under
        the "products" and "discounts" keys respectively.
        """

        input_source.update_suggestions(
            {
                "products": list(
                    session.scalars(
                        select(ProductItem.label)
                        .distinct()
                        .join(Receipt)
                        .filter(Receipt.shop == shop)
                        .order_by(ProductItem.label)
                    )
                ),
                "discounts": list(
                    session.scalars(
                        select(Discount.label)
                        .distinct()
                        .join(Receipt)
                        .filter(Receipt.shop == shop)
                        .order_by(Discount.label)
                    )
                ),
            }
        )

    @override
    def run(self) -> None:
        """
        Interactively create a receipt.

        Asks for the date and shop, builds the menu of steps around a new
        `Receipt`, runs the steps in order and falls back to a menu prompt
        until a final step has run.
        """

        input_source: InputSource = Prompt()
        matcher = ProductMatcher()
        matcher.discounts = False

        # The session is only used to load suggestions.
        # NOTE(review): extent of this block reconstructed from flattened
        # source; it is closed after the last use of `session` -- confirm.
        with Database() as session:
            self._load_suggestions(session, input_source)
            receipt_date = input_source.get_date(
                datetime.combine(date.today(), time.min)
            )
            shop = input_source.get_input("Shop", str, options="shops")
            self._load_shop_suggestions(session, input_source, shop)

        path = self._get_path(receipt_date, shop)
        receipt = Receipt(
            filename=path.name,
            updated=datetime.now(),
            date=receipt_date.date(),
            shop=shop,
        )
        write = Write(receipt, input_source, matcher=matcher)
        write.path = path
        usage = Help(receipt, input_source)
        # The insertion order is the order of the sequential run.
        menu: Menu = {
            "read": Read(receipt, input_source, matcher=matcher),
            "products": Products(receipt, input_source, matcher=matcher),
            "discounts": Discounts(
                receipt, input_source, matcher=matcher, more=self.more
            ),
            "meta": ProductMeta(
                receipt, input_source, matcher=matcher, more=self.more
            ),
            "view": View(receipt, input_source),
            "write": write,
            "edit": Edit(
                receipt,
                input_source,
                matcher=matcher,
                editor=self.settings.get("data", "editor"),
            ),
            "quit": Quit(receipt, input_source),
            "help": usage,
            "?": usage,
        }
        usage.menu = menu

        step = self._run_sequential(menu, input_source)
        if step.final:
            return

        # Sequential run did not lead to a final step, so ask for menu choice
        input_source.update_suggestions({"menu": list(menu.keys())})
        while not step.final:
            step = self._get_menu_step(menu, input_source)
            try:
                self._confirm_final(step, input_source)
                result = step.run()
                # Edit might change receipt metadata
                if result.get("receipt_path", False):
                    # Only reset the time of day when the date itself changed.
                    if receipt.date != receipt_date.date():
                        receipt_date = datetime.combine(receipt.date, time.min)
                    # NOTE(review): nesting reconstructed; the path is
                    # refreshed for every flagged result so that a changed
                    # shop is picked up as well -- confirm.
                    write.path = self._get_path(receipt_date, receipt.shop)
                    receipt.filename = write.path.name
            except ReturnToMenu as reason:
                step = self._show_menu_step(menu, step, reason)

    def _run_sequential(self, menu: Menu, input_source: InputSource) -> Step:
        """
        Run the menu steps in order until a final step completes.

        Returns the final step that ran, or the step to continue from when a
        step raised `ReturnToMenu`. If the steps run out without either, the
        last step is returned. Raises `ValueError` for an empty menu.
        """

        if not menu:  # pragma: no cover
            raise ValueError("Menu must have defined steps")

        steps = list(menu.values())
        # Bind `step` before the loop so that it is always defined on return.
        step = steps[0]
        for step in steps:  # pragma: no branch
            try:
                self._confirm_final(step, input_source)
                _ = step.run()
                if step.final:
                    return step
            except ReturnToMenu as reason:
                step = self._show_menu_step(menu, step, reason)
                break

        return step