feat(status): add status subcommand with day and week metrics

- Add status.py with compute_day_status() and compute_week_status()
- Add projected_hours_for_day(): for each entry, use its duration if
  closed, or the next entry's start time as the close time if open
- Open entries (start present, end absent) are preserved by the parser
  instead of being skipped; aggregate_rows() skips them for summaries
- Expected end is computed by filling remaining hours into available
  time slots from the open entry's start; clamped to latest_end if a
  pre-logged entry ends later, with a note explaining why
- Projected week total sums projected_hours_for_day() across all days
- Add status subcommand to cli.py with shared source/day arguments
- Add [work] daily_hours / weekly_hours config keys (default 8 / 40)
- Add timesheets.example.toml [work] section
- Add tests for projected_hours_for_day, compute_day_status,
  compute_week_status and all DayStatus/WeekStatus fields
This commit is contained in:
Jef Roosens 2026-05-22 13:35:22 +02:00
parent d5dbe8791b
commit f372a691d4
Signed by: Jef Roosens
GPG key ID: 119385BCAA005C21
8 changed files with 799 additions and 7 deletions

View file

@ -3,8 +3,16 @@ import os
import sys
from datetime import date
from .config import find_default_config, get_map_path, get_token, load_config
from .config import (
find_default_config,
get_daily_target,
get_map_path,
get_token,
get_weekly_target,
load_config,
)
from .output import (
print_status,
print_summary,
print_summary_short,
print_summary_weekly,
@ -121,6 +129,12 @@ def build_parser() -> argparse.ArgumentParser:
)
_add_shared_args(csv_parser)
status_parser = subparsers.add_parser(
"status",
help="Show how many hours remain today and this week.",
)
_add_shared_args(status_parser)
return parser
@ -272,6 +286,49 @@ def _cmd_summary(args: argparse.Namespace, config: dict) -> None:
_write(print_summary, aggregated, project_map)
def _cmd_status(args: argparse.Namespace, config: dict) -> None:
from datetime import timedelta
from .status import compute_day_status, compute_week_status
target_date, _ = _resolve_date(args)
daily_target = get_daily_target(config)
weekly_target = get_weekly_target(config)
week_start = target_date - timedelta(days=target_date.weekday())
# Fetch the full week document once for both day and week status
if args.joplin:
from .joplin import fetch_week_note
token = _resolve_token(args, config)
try:
content = fetch_week_note(token, target_date)
except RuntimeError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
lines = content.splitlines()
else:
if args.input == "-":
lines = sys.stdin.read().splitlines()
else:
try:
with open(args.input, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
except FileNotFoundError:
print(f"Error: file not found: {args.input}", file=sys.stderr)
sys.exit(1)
from .parser import filter_rows_by_date, filter_week_sections
day_rows = filter_rows_by_date(lines, target_date)
day_sections = filter_week_sections(lines, week_start)
day_status = compute_day_status(day_rows, target_date, daily_target)
week_status = compute_week_status(day_sections, target_date, weekly_target)
print_status(day_status, week_status)
def _cmd_csv(args: argparse.Namespace, config: dict) -> None:
target_date, date_str = _resolve_date(args)
rows = _resolve_rows(args, config, target_date)
@ -303,3 +360,5 @@ def main() -> None:
_cmd_summary(args, config)
elif args.command == "csv":
_cmd_csv(args, config)
elif args.command == "status":
_cmd_status(args, config)

View file

@ -52,3 +52,13 @@ def get_token(config: dict) -> str | None:
def get_map_path(config: dict) -> str | None:
"""Extract projects.map from a loaded config dict, or None."""
return config.get("projects", {}).get("map")
def get_daily_target(config: dict) -> float:
"""Extract work.daily_hours from config, defaulting to 8.0."""
return float(config.get("work", {}).get("daily_hours", 8.0))
def get_weekly_target(config: dict) -> float:
"""Extract work.weekly_hours from config, defaulting to 40.0."""
return float(config.get("work", {}).get("weekly_hours", 40.0))

View file

@ -2,7 +2,10 @@ import csv
import sys
from collections import OrderedDict
from datetime import date
from typing import IO
from typing import IO, TYPE_CHECKING
if TYPE_CHECKING:
from .status import DayStatus, WeekStatus
from .projects import resolve_project_task
from .utils import decimal_to_hhmm
@ -206,3 +209,87 @@ def _aggregate(rows: list[dict]) -> list[dict]:
{"project": p, "description": d, "quantity": totals[(p, d)]}
for p, d in key_order
]
# ---------------------------------------------------------------------------
# Status output
# ---------------------------------------------------------------------------
def print_status(day: "DayStatus", week: "WeekStatus") -> None:
"""Print the status dashboard to stdout."""
hh = decimal_to_hhmm
W = 42 # label column width
def row(label: str, value: str, note: str = "") -> None:
suffix = f" ({note})" if note else ""
print(f" {label:<{W - 2}} {value}{suffix}")
def sep(char: str = "-", width: int = W + 12) -> None:
print(" " + char * width)
# ---- Today ----
print(f"\n Today {day.target_date.strftime('%A, %Y-%m-%d')}")
sep()
start_str = day.day_start.strftime("%H:%M") if day.day_start else "?"
row("Started", start_str)
logged_note = ""
if day.has_open_entry:
open_str = (
day.open_entry_start.strftime("%H:%M") if day.open_entry_start else "?"
)
logged_note = f"open entry since {open_str}"
if day.future_hours > 0:
logged_note += f", {hh(day.future_hours)} pre-logged ahead"
elif day.future_hours > 0:
logged_note = f"{hh(day.future_hours)} pre-logged ahead"
row("Logged", f"{hh(day.logged_hours)} of {hh(day.daily_target)}", logged_note)
row("Remaining today", hh(day.remaining_hours))
if day.expected_end is not None:
if day.remaining_hours == 0:
note = "done"
elif day.latest_end_exceeds_expected:
note = "pre-logged entry ends later than needed"
else:
note = ""
row("Expected end", day.expected_end.strftime("%H:%M"), note)
else:
row("Expected end", "unknown")
# ---- Week ----
print(
f"\n Week W{day.target_date.isocalendar()[1]:02d} "
f"({week.week_start.strftime('%d/%m')} \u2013 {week.week_end.strftime('%d/%m')})"
)
sep()
row(
"Logged this week",
f"{hh(week.total_logged)} of {hh(week.weekly_target)}",
)
row("Remaining this week", hh(week.remaining_hours))
if week.daily_average is not None:
row(
"Daily average",
hh(week.daily_average),
f"{week.days_worked} day{'s' if week.days_worked != 1 else ''} worked",
)
if week.projected_total is not None:
deficit = week.weekly_target - week.projected_total
if deficit > 0.05:
note = f"{hh(deficit)} short"
elif deficit < -0.05:
note = f"{hh(abs(deficit))} over"
else:
note = "exact"
row("Projected week total", hh(week.projected_total), note)
if week.on_track is not None:
row("On track", "yes \u2713" if week.on_track else "no \u2717")
print()

View file

@ -114,7 +114,21 @@ def parse_table(lines: list[str], has_duration_col: bool = True) -> list[dict]:
continue
if not re.match(r"^\d+:\d{2}$", start):
continue
# Open entry: has a start time but no end time yet
if not re.match(r"^\d+:\d{2}$", end):
if duration is None:
# No end and no duration — preserve as an open entry
rows.append(
{
"start": start,
"end": None,
"duration_hours": None,
"project": project,
"story": story,
"note": note,
}
)
continue
if duration is not None:
@ -249,6 +263,8 @@ def aggregate_rows(rows: list[dict]) -> list[dict]:
totals: dict[tuple, float] = defaultdict(float)
for row in rows:
if row["duration_hours"] is None:
continue # skip open entries
description = build_description(row["story"], row["note"])
key = (row["project"].strip(), description)
if key not in totals:

320
src/timesheets/status.py Normal file
View file

@ -0,0 +1,320 @@
"""
Status calculations for the timesheets status command.
All time arithmetic is done in decimal hours. The status is computed
purely from the timesheet data no wall-clock dependency.
Open entries (start time present, end time absent) act as the anchor
for expected-end calculations:
- If an open entry is the LAST entry of the day:
remaining = daily_target - logged_so_far
expected_end = open_start + (logged_before_open + remaining)
- If entries FOLLOW the open entry:
the gap between open_start and the next entry's start is treated
as fillable time; expected_end is computed from the last known
end time + any remaining hours after all closed entries.
- No open entry:
expected_end = last_end + remaining
"""
import re
from dataclasses import dataclass
from datetime import date, time, timedelta
def _parse_hhmm(s: str) -> time | None:
"""Parse a HH:MM string into a time object, returning None on failure."""
if s is None:
return None
m = re.match(r"^(\d{1,2}):(\d{2})$", s.strip())
if not m:
return None
try:
return time(int(m.group(1)), int(m.group(2)))
except ValueError:
return None
def _time_to_hours(t: time) -> float:
return t.hour + t.minute / 60.0
def _hours_to_time(hours: float) -> time:
"""Convert decimal hours to a time object (clamps to 23:59)."""
total_minutes = round(hours * 60)
h, m = divmod(total_minutes, 60)
h = min(h, 23)
m = min(m, 59)
return time(h, m)
@dataclass
class DayStatus:
target_date: date
# Hours
logged_hours: float # sum of all closed entries
future_hours: float # sum of closed entries that start after the open entry
daily_target: float
# Open entry
has_open_entry: bool # True if there is an entry with no end time
open_entry_start: time | None
# Times
day_start: time | None # start time of first entry
latest_end: (
time | None
) # end time of last closed entry (including future closed ones)
# Derived
remaining_hours: float # daily_target - logged_hours (floored to 0)
expected_end: time | None
latest_end_exceeds_expected: (
bool # True when a pre-logged entry ends later than expected_end
)
hours_to_log_today: float # logged_hours + (latest_end - open_start) if open entry, else logged_hours
@dataclass
class WeekStatus:
week_start: date
week_end: date
weekly_target: float
logged_hours_by_day: dict[date, float]
total_logged: float
remaining_hours: float
days_worked: int
daily_average: float | None
projected_total: float | None
on_track: bool | None
def compute_day_status(
rows: list[dict],
target_date: date,
daily_target: float = 8.0,
) -> DayStatus:
"""
Compute per-day status metrics from raw (unaggregated) rows.
Rows may include open entries (duration_hours is None, end is None).
The wall clock is not used; open entries serve as the work-in-progress
anchor instead.
"""
if not rows:
return DayStatus(
target_date=target_date,
logged_hours=0.0,
future_hours=0.0,
daily_target=daily_target,
has_open_entry=False,
open_entry_start=None,
day_start=None,
latest_end=None,
remaining_hours=daily_target,
expected_end=None,
latest_end_exceeds_expected=False,
hours_to_log_today=0.0,
)
# Sort rows by start time so we can reason about ordering
def _sort_key(r: dict) -> float:
t = _parse_hhmm(r["start"])
return _time_to_hours(t) if t else 0.0
sorted_rows = sorted(rows, key=_sort_key)
# Find the open entry (if any) — the one with no end time
open_idx: int | None = None
for i, row in enumerate(sorted_rows):
if row["duration_hours"] is None:
open_idx = i
break # only one open entry expected
# Closed entries only
closed_rows = [r for r in sorted_rows if r["duration_hours"] is not None]
logged_hours = sum(r["duration_hours"] for r in closed_rows)
day_start: time | None = None
latest_end: time | None = None
latest_end_hours = 0.0
for row in closed_rows:
start_t = _parse_hhmm(row["start"])
end_t = _parse_hhmm(row["end"])
if start_t is not None:
if day_start is None or start_t < day_start:
day_start = start_t
if end_t is not None:
end_h = _time_to_hours(end_t)
if end_h > latest_end_hours:
latest_end_hours = end_h
latest_end = end_t
# Entries after the open entry are "future closed" (pre-logged)
future_hours = 0.0
if open_idx is not None:
for row in sorted_rows[open_idx + 1 :]:
if row["duration_hours"] is not None:
future_hours += row["duration_hours"]
remaining_hours = max(0.0, daily_target - logged_hours)
has_open_entry = open_idx is not None
open_entry_start = (
_parse_hhmm(sorted_rows[open_idx]["start"]) if has_open_entry else None
)
# --- Expected end calculation ---
expected_end: time | None = None
if has_open_entry:
open_start_hours = _time_to_hours(open_entry_start) if open_entry_start else 0.0
entries_after_open = [
r for r in sorted_rows[open_idx + 1 :] if r["duration_hours"] is not None
]
if not entries_after_open:
# Open entry is last: work continuously from open_start
expected_end = _hours_to_time(open_start_hours + remaining_hours)
elif remaining_hours == 0:
# Already done — expected end is the last known entry's end
expected_end = latest_end
else:
# Fill remaining_hours into available slots starting from open_start.
# Slots: [open_start → next_entry_start] then each subsequent entry.
# Any remaining hours after all known slots extend beyond latest_end.
budget = remaining_hours
cursor = open_start_hours
for i, entry in enumerate(entries_after_open):
entry_start_h = _time_to_hours(_parse_hhmm(entry["start"]) or time(0))
entry_end_h = _time_to_hours(_parse_hhmm(entry["end"]) or time(0))
# Gap before this entry is available filling time
gap = max(0.0, entry_start_h - cursor)
if budget <= gap:
expected_end = _hours_to_time(cursor + budget)
budget = 0.0
break
budget -= gap
# The entry itself is already logged — advance cursor past it
cursor = entry_end_h
if budget > 0:
# Still hours left after all known entries — extend beyond latest end
expected_end = _hours_to_time(cursor + budget)
elif expected_end is None:
expected_end = latest_end
else:
# No open entry: purely data-driven
if latest_end is not None:
if remaining_hours > 0:
expected_end = _hours_to_time(latest_end_hours + remaining_hours)
else:
expected_end = latest_end
# If any pre-logged entry ends later than the computed expected_end, use
# that as the effective end and flag it so the caller can explain why.
latest_end_exceeds_expected = False
if (
expected_end is not None
and latest_end is not None
and latest_end > expected_end
):
latest_end_exceeds_expected = True
expected_end = latest_end
elif expected_end is None and latest_end is not None:
expected_end = latest_end
# hours_to_log_today is unused in DayStatus now; kept at 0 for compatibility
hours_to_log_today = 0.0
return DayStatus(
target_date=target_date,
logged_hours=logged_hours,
future_hours=future_hours,
daily_target=daily_target,
has_open_entry=has_open_entry,
open_entry_start=open_entry_start,
day_start=day_start,
latest_end=latest_end,
remaining_hours=remaining_hours,
expected_end=expected_end,
latest_end_exceeds_expected=latest_end_exceeds_expected,
hours_to_log_today=hours_to_log_today,
)
def projected_hours_for_day(rows: list[dict]) -> float:
"""
Calculate the total hours that will be logged for a day.
For each entry:
- Closed (duration_hours is not None): add its duration.
- Open (duration_hours is None): use the next entry's start time as the
close time. If there is no next entry the open entry contributes 0.
"""
sorted_rows = sorted(
rows,
key=lambda r: _time_to_hours(_parse_hhmm(r["start"]) or time(0)),
)
total = 0.0
for i, row in enumerate(sorted_rows):
if row["duration_hours"] is not None:
total += row["duration_hours"]
else:
# Open entry: close at the next entry's start, if one exists
if i + 1 < len(sorted_rows):
open_start = _parse_hhmm(row["start"])
next_start = _parse_hhmm(sorted_rows[i + 1]["start"])
if open_start is not None and next_start is not None:
total += max(
0.0, _time_to_hours(next_start) - _time_to_hours(open_start)
)
return total
def compute_week_status(
day_sections: list[tuple[date, list[dict]]],
target_date: date,
weekly_target: float = 40.0,
) -> WeekStatus:
"""
Compute per-week status metrics from all days in the week.
day_sections is a list of (date, raw_rows) pairs for the week.
Open entries are closed at the next entry's start time for projection purposes.
"""
week_start = target_date - timedelta(days=target_date.weekday())
week_end = week_start + timedelta(days=4)
logged_by_day: dict[date, float] = {}
for day, rows in day_sections:
total = sum(
r["duration_hours"] for r in rows if r["duration_hours"] is not None
)
logged_by_day[day] = total
total_logged = sum(logged_by_day.values())
remaining = max(0.0, weekly_target - total_logged)
days_worked = len([h for h in logged_by_day.values() if h > 0])
daily_average = total_logged / days_worked if days_worked > 0 else None
# Projected total: sum projected hours for each day in the week
projected_total = sum(projected_hours_for_day(rows) for _, rows in day_sections)
on_track = projected_total >= weekly_target
return WeekStatus(
week_start=week_start,
week_end=week_end,
weekly_target=weekly_target,
logged_hours_by_day=logged_by_day,
total_logged=total_logged,
remaining_hours=remaining,
days_worked=days_worked,
daily_average=daily_average,
projected_total=projected_total,
on_track=on_track,
)