feat(summary): add --weekly/-w and --short/-s flags
--weekly (-w): show the summary for the entire week containing the given day, fetching from Joplin or parsing all tables in the file --short (-s, repeatable): -s alone: one line per project label + total -s --weekly: per-day project totals with day subtotals -ss --weekly: one line per day with right-aligned date + week total Add filter_week_sections() to parser.py to split a document into (date, rows) pairs for a given ISO week. Add print_summary_short(), print_summary_weekly(), print_summary_weekly_short(), and print_summary_weekly_totals() to output.py.
This commit is contained in:
parent
6915d8d764
commit
ac1e9f959a
4 changed files with 304 additions and 23 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -11,3 +11,5 @@ wheels/
|
|||
|
||||
# Local config (contains secrets)
|
||||
timesheets.toml
|
||||
|
||||
timesheets.csv
|
||||
|
|
|
|||
|
|
@ -4,8 +4,20 @@ import sys
|
|||
from datetime import date
|
||||
|
||||
from .config import find_default_config, get_map_path, get_token, load_config
|
||||
from .output import print_summary, write_csv
|
||||
from .parser import aggregate_rows, filter_rows_by_date, parse_document
|
||||
from .output import (
|
||||
print_summary,
|
||||
print_summary_short,
|
||||
print_summary_weekly,
|
||||
print_summary_weekly_short,
|
||||
print_summary_weekly_totals,
|
||||
write_csv,
|
||||
)
|
||||
from .parser import (
|
||||
aggregate_rows,
|
||||
filter_rows_by_date,
|
||||
filter_week_sections,
|
||||
parse_document,
|
||||
)
|
||||
from .projects import load_project_map
|
||||
from .utils import AmbiguousDateError, format_date, parse_date_arg
|
||||
|
||||
|
|
@ -85,6 +97,23 @@ def build_parser() -> argparse.ArgumentParser:
|
|||
help="Print a human-readable summary of time spent per project.",
|
||||
)
|
||||
_add_shared_args(summary_parser)
|
||||
summary_parser.add_argument(
|
||||
"--weekly",
|
||||
"-w",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Show the summary for the entire week containing the given day.",
|
||||
)
|
||||
summary_parser.add_argument(
|
||||
"--short",
|
||||
"-s",
|
||||
action="count",
|
||||
default=0,
|
||||
help=(
|
||||
"Compact output. Once (-s): one line per project with total hours. "
|
||||
"Twice (-ss): one line per day (only with --weekly)."
|
||||
),
|
||||
)
|
||||
|
||||
csv_parser = subparsers.add_parser(
|
||||
"csv",
|
||||
|
|
@ -132,7 +161,7 @@ def _resolve_date(args: argparse.Namespace) -> tuple[date, str]:
|
|||
def _resolve_rows(
|
||||
args: argparse.Namespace, config: dict, target_date: date
|
||||
) -> list[dict]:
|
||||
"""Fetch and parse rows from the configured source."""
|
||||
"""Fetch and parse rows from the configured source (single day)."""
|
||||
if args.joplin:
|
||||
from .joplin import fetch_week_note
|
||||
|
||||
|
|
@ -157,6 +186,37 @@ def _resolve_rows(
|
|||
return parse_document(content.splitlines())
|
||||
|
||||
|
||||
def _resolve_week_sections(
|
||||
args: argparse.Namespace, config: dict, target_date: date
|
||||
) -> list[tuple[date, list[dict]]]:
|
||||
"""Fetch and parse rows from the configured source, grouped by day for the week."""
|
||||
from datetime import timedelta
|
||||
|
||||
week_start = target_date - timedelta(days=target_date.weekday())
|
||||
|
||||
if args.joplin:
|
||||
from .joplin import fetch_week_note
|
||||
|
||||
token = _resolve_token(args, config)
|
||||
try:
|
||||
content = fetch_week_note(token, target_date)
|
||||
except RuntimeError as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return filter_week_sections(content.splitlines(), week_start)
|
||||
else:
|
||||
if args.input == "-":
|
||||
content = sys.stdin.read()
|
||||
else:
|
||||
try:
|
||||
with open(args.input, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except FileNotFoundError:
|
||||
print(f"Error: file not found: {args.input}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return filter_week_sections(content.splitlines(), week_start)
|
||||
|
||||
|
||||
def _resolve_project_map(args: argparse.Namespace, config: dict) -> dict:
|
||||
"""Resolve the project map from CLI flag, config, or cwd default."""
|
||||
map_path = args.map or get_map_path(config)
|
||||
|
|
@ -174,24 +234,42 @@ def _resolve_project_map(args: argparse.Namespace, config: dict) -> dict:
|
|||
|
||||
def _cmd_summary(args: argparse.Namespace, config: dict) -> None:
|
||||
target_date, _ = _resolve_date(args)
|
||||
rows = _resolve_rows(args, config, target_date)
|
||||
if not rows:
|
||||
print("Warning: no timesheet rows found in input.", file=sys.stderr)
|
||||
aggregated = aggregate_rows(rows)
|
||||
project_map = _resolve_project_map(args, config)
|
||||
short = args.short # 0, 1, or 2
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w", encoding="utf-8") as f:
|
||||
# Redirect stdout temporarily so print_summary writes to the file
|
||||
old_stdout = sys.stdout
|
||||
sys.stdout = f
|
||||
try:
|
||||
print_summary(aggregated, project_map)
|
||||
finally:
|
||||
sys.stdout = old_stdout
|
||||
print(f"Written to {args.output}", file=sys.stderr)
|
||||
def _write(fn, *fn_args):
|
||||
if args.output:
|
||||
with open(args.output, "w", encoding="utf-8") as f:
|
||||
old_stdout = sys.stdout
|
||||
sys.stdout = f
|
||||
try:
|
||||
fn(*fn_args)
|
||||
finally:
|
||||
sys.stdout = old_stdout
|
||||
print(f"Written to {args.output}", file=sys.stderr)
|
||||
else:
|
||||
fn(*fn_args)
|
||||
|
||||
if args.weekly:
|
||||
day_sections = _resolve_week_sections(args, config, target_date)
|
||||
if not day_sections:
|
||||
print("Warning: no timesheet rows found for this week.", file=sys.stderr)
|
||||
return
|
||||
if short >= 2:
|
||||
_write(print_summary_weekly_totals, day_sections, project_map)
|
||||
elif short == 1:
|
||||
_write(print_summary_weekly_short, day_sections, project_map)
|
||||
else:
|
||||
_write(print_summary_weekly, day_sections, project_map)
|
||||
else:
|
||||
print_summary(aggregated, project_map)
|
||||
rows = _resolve_rows(args, config, target_date)
|
||||
if not rows:
|
||||
print("Warning: no timesheet rows found in input.", file=sys.stderr)
|
||||
aggregated = aggregate_rows(rows)
|
||||
if short >= 1:
|
||||
_write(print_summary_short, aggregated, project_map)
|
||||
else:
|
||||
_write(print_summary, aggregated, project_map)
|
||||
|
||||
|
||||
def _cmd_csv(args: argparse.Namespace, config: dict) -> None:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import csv
|
||||
import sys
|
||||
from collections import OrderedDict
|
||||
from datetime import date
|
||||
from typing import IO
|
||||
|
||||
from .projects import resolve_project_task
|
||||
|
|
@ -29,14 +30,43 @@ def write_csv(
|
|||
)
|
||||
|
||||
|
||||
def print_summary(aggregated: list[dict], project_map: dict) -> None:
|
||||
"""Print a human-readable summary of time blocks to stdout."""
|
||||
grouped: dict[str, list[dict]] = OrderedDict()
|
||||
# ---------------------------------------------------------------------------
|
||||
# Summary helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _group_by_project_label(
|
||||
aggregated: list[dict], project_map: dict
|
||||
) -> "OrderedDict[str, list[dict]]":
|
||||
"""Group aggregated entries by resolved project/task label."""
|
||||
grouped: OrderedDict[str, list[dict]] = OrderedDict()
|
||||
for entry in aggregated:
|
||||
project, task = resolve_project_task(entry["project"], project_map)
|
||||
label = f"{project} / {task}" if task else project
|
||||
grouped.setdefault(label, []).append(entry)
|
||||
return grouped
|
||||
|
||||
|
||||
def _sum_by_project_label(
|
||||
aggregated: list[dict], project_map: dict
|
||||
) -> "OrderedDict[str, float]":
|
||||
"""Return total hours per project label."""
|
||||
totals: OrderedDict[str, float] = OrderedDict()
|
||||
for entry in aggregated:
|
||||
project, task = resolve_project_task(entry["project"], project_map)
|
||||
label = f"{project} / {task}" if task else project
|
||||
totals[label] = totals.get(label, 0.0) + entry["quantity"]
|
||||
return totals
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Full detail summary (default)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_summary(aggregated: list[dict], project_map: dict) -> None:
|
||||
"""Print a full human-readable summary with per-entry descriptions."""
|
||||
grouped = _group_by_project_label(aggregated, project_map)
|
||||
total_all = sum(e["quantity"] for e in aggregated)
|
||||
|
||||
all_descs = [e["description"] for e in aggregated]
|
||||
|
|
@ -48,7 +78,131 @@ def print_summary(aggregated: list[dict], project_map: dict) -> None:
|
|||
print(f"\n {label} ({decimal_to_hhmm(project_total)})")
|
||||
print(" " + separator)
|
||||
for entry in entries:
|
||||
print(f" {entry['description']:<{desc_width}} {decimal_to_hhmm(entry['quantity'])}")
|
||||
print(
|
||||
f" {entry['description']:<{desc_width}} {decimal_to_hhmm(entry['quantity'])}"
|
||||
)
|
||||
print(" " + separator)
|
||||
|
||||
print(f"\n {'TOTAL':<{desc_width + 2}} {decimal_to_hhmm(total_all)}\n")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Short summary (-s): one line per project label
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_summary_short(aggregated: list[dict], project_map: dict) -> None:
|
||||
"""Print one line per project label with total hours."""
|
||||
totals = _sum_by_project_label(aggregated, project_map)
|
||||
total_all = sum(totals.values())
|
||||
|
||||
label_width = max((len(l) for l in totals), default=20)
|
||||
label_width = max(label_width, 20)
|
||||
|
||||
for label, hours in totals.items():
|
||||
print(f" {label:<{label_width}} {decimal_to_hhmm(hours)}")
|
||||
|
||||
print(f" {'TOTAL':<{label_width}} {decimal_to_hhmm(total_all)}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Weekly summary (--weekly)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_summary_weekly(
|
||||
day_sections: list[tuple[date, list[dict]]], project_map: dict
|
||||
) -> None:
|
||||
"""Print a full summary for each day of the week."""
|
||||
week_total = sum(e["duration_hours"] for _, rows in day_sections for e in rows)
|
||||
for day, rows in day_sections:
|
||||
aggregated = _aggregate(rows)
|
||||
day_total = sum(e["quantity"] for e in aggregated)
|
||||
print(f"\n{'═' * 60}")
|
||||
print(f" {day.strftime('%A, %Y-%m-%d')} ({decimal_to_hhmm(day_total)})")
|
||||
print(f"{'═' * 60}")
|
||||
print_summary(aggregated, project_map)
|
||||
|
||||
print(f"\n {'WEEK TOTAL':<42} {decimal_to_hhmm(week_total)}\n")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Weekly short (-s --weekly): per-day project totals
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_summary_weekly_short(
|
||||
day_sections: list[tuple[date, list[dict]]], project_map: dict
|
||||
) -> None:
|
||||
"""Print one line per project per day, plus day and week totals."""
|
||||
week_total = 0.0
|
||||
all_labels: list[str] = []
|
||||
day_data: list[tuple[date, OrderedDict[str, float]]] = []
|
||||
|
||||
for day, rows in day_sections:
|
||||
aggregated = _aggregate(rows)
|
||||
totals = _sum_by_project_label(aggregated, project_map)
|
||||
day_data.append((day, totals))
|
||||
all_labels.extend(totals.keys())
|
||||
|
||||
label_width = max((len(l) for l in all_labels), default=20)
|
||||
label_width = max(label_width, 20)
|
||||
|
||||
for day, totals in day_data:
|
||||
day_total = sum(totals.values())
|
||||
week_total += day_total
|
||||
print(f"\n {day.strftime('%A, %Y-%m-%d')}")
|
||||
for label, hours in totals.items():
|
||||
print(f" {label:<{label_width}} {decimal_to_hhmm(hours)}")
|
||||
print(f" {'day total':<{label_width}} {decimal_to_hhmm(day_total)}")
|
||||
|
||||
print(f"\n {'WEEK TOTAL':<{label_width + 2}} {decimal_to_hhmm(week_total)}\n")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Weekly day-total-only (-ss --weekly): one line per day
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_summary_weekly_totals(
|
||||
day_sections: list[tuple[date, list[dict]]], project_map: dict
|
||||
) -> None:
|
||||
"""Print only the total hours per day plus a week total."""
|
||||
week_total = 0.0
|
||||
rows_out = []
|
||||
|
||||
for day, rows in day_sections:
|
||||
day_total = sum(e["duration_hours"] for e in rows)
|
||||
week_total += day_total
|
||||
rows_out.append((day.strftime("%A"), day.strftime("%Y-%m-%d"), day_total))
|
||||
|
||||
day_width = max((len(r[0]) for r in rows_out), default=9)
|
||||
for day_name, date_str, hours in rows_out:
|
||||
print(f" {day_name:<{day_width}}, {date_str} {decimal_to_hhmm(hours)}")
|
||||
print(f" {'WEEK TOTAL':<{day_width + 13}} {decimal_to_hhmm(week_total)}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _aggregate(rows: list[dict]) -> list[dict]:
|
||||
"""Aggregate raw rows — thin wrapper to avoid importing parser in output."""
|
||||
from collections import defaultdict
|
||||
|
||||
from .parser import build_description
|
||||
|
||||
key_order: list[tuple] = []
|
||||
totals: dict[tuple, float] = defaultdict(float)
|
||||
for row in rows:
|
||||
description = build_description(row["story"], row["note"])
|
||||
key = (row["project"].strip(), description)
|
||||
if key not in totals:
|
||||
key_order.append(key)
|
||||
totals[key] += row["duration_hours"]
|
||||
|
||||
return [
|
||||
{"project": p, "description": d, "quantity": totals[(p, d)]}
|
||||
for p, d in key_order
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import re
|
||||
from collections import defaultdict
|
||||
from datetime import date
|
||||
from datetime import date, timedelta
|
||||
|
||||
from .utils import duration_from_start_end, parse_duration, strip_markdown_link
|
||||
|
||||
|
|
@ -176,6 +176,53 @@ def filter_rows_by_date(lines: list[str], target: date) -> list[dict]:
|
|||
return rows
|
||||
|
||||
|
||||
def filter_week_sections(
|
||||
lines: list[str], week_start: date
|
||||
) -> list[tuple[date, list[dict]]]:
|
||||
"""
|
||||
Parse a document and return rows grouped by day for the ISO week containing
|
||||
week_start. Returns a list of (day_date, aggregated_rows) pairs, one per
|
||||
day that has data, in chronological order.
|
||||
"""
|
||||
week_dates = {week_start + timedelta(days=i) for i in range(7)}
|
||||
week_strs = {d.strftime("%Y-%m-%d"): d for d in week_dates}
|
||||
|
||||
# Split into sections keyed by date string
|
||||
sections: dict[str, list[str]] = {}
|
||||
section_order: list[str] = []
|
||||
current_date_str: str | None = None
|
||||
current_lines: list[str] = []
|
||||
|
||||
for line in lines:
|
||||
m = _DATE_HEADING_RE.match(line)
|
||||
if m:
|
||||
if current_lines and current_date_str is not None:
|
||||
if current_date_str not in sections:
|
||||
sections[current_date_str] = []
|
||||
section_order.append(current_date_str)
|
||||
sections[current_date_str].extend(current_lines)
|
||||
current_date_str = m.group(1)
|
||||
current_lines = []
|
||||
else:
|
||||
current_lines.append(line)
|
||||
|
||||
if current_lines and current_date_str is not None:
|
||||
if current_date_str not in sections:
|
||||
sections[current_date_str] = []
|
||||
section_order.append(current_date_str)
|
||||
sections[current_date_str].extend(current_lines)
|
||||
|
||||
result = []
|
||||
for date_str in section_order:
|
||||
if date_str in week_strs:
|
||||
rows = parse_document(sections[date_str])
|
||||
if rows:
|
||||
result.append((week_strs[date_str], rows))
|
||||
|
||||
result.sort(key=lambda x: x[0])
|
||||
return result
|
||||
|
||||
|
||||
def build_description(story: str, note: str) -> str:
|
||||
"""Combine story and note into a single description string."""
|
||||
parts = [p.strip() for p in [story, note] if p.strip()]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue