feat: set up modularized version of project with testing

This commit is contained in:
Jef Roosens 2026-05-22 10:09:59 +02:00
commit 7bea08ddac
Signed by: Jef Roosens
GPG key ID: 119385BCAA005C21
19 changed files with 1138 additions and 0 deletions

View file

@ -0,0 +1,3 @@
from .cli import main
# Declare `main` (imported from .cli above) as this module's only public name.
__all__ = ["main"]

85
src/timesheets/cli.py Normal file
View file

@ -0,0 +1,85 @@
import argparse
import os
import sys
from datetime import date
from .output import print_summary, write_csv
from .parser import aggregate_rows, detect_has_duration_column, parse_table
from .projects import load_project_map
from .utils import format_date
def build_parser() -> argparse.ArgumentParser:
    """Create the argument parser for the timesheet command-line tool.

    Returns:
        A parser with one positional ``input`` argument, the value options
        ``-o/--output``, ``--date`` and ``--map`` (each defaulting to None),
        and the boolean ``--summary`` switch.
    """
    cli = argparse.ArgumentParser(
        description="Parse a markdown timesheet table and output a CSV file."
    )
    cli.add_argument(
        "input",
        help="Path to the markdown file containing the timesheet table, or '-' to read from stdin.",
    )

    map_help = (
        "Path to a JSON file mapping project keys to Project+Task pairs. "
        "Defaults to project_map.json in the current working directory if it exists."
    )
    # Options that take a value; registered in this order so --help lists
    # them the same way every time.
    value_options = (
        (("-o", "--output"), "Path to the output CSV file. Defaults to stdout."),
        (("--date",), "Date to use in the output (DD/MM/YY). Defaults to today."),
        (("--map",), map_help),
    )
    for flags, help_text in value_options:
        cli.add_argument(*flags, help=help_text, default=None)

    cli.add_argument(
        "--summary",
        action="store_true",
        help="Print a human-readable summary instead of writing CSV.",
    )
    return cli
def main() -> None:
    """Run the command-line tool: read a timesheet, then emit CSV or a summary.

    The markdown input comes from the given path, or from stdin when the
    path is "-". Its rows are parsed and aggregated, the project map is
    loaded, and the result is either printed as a summary, written as CSV
    to the ``--output`` file, or written as CSV to stdout.

    Exits with status 1 when the input file does not exist.
    """
    options = build_parser().parse_args()
    date_str = options.date or format_date(date.today())

    if options.input != "-":
        try:
            with open(options.input, "r", encoding="utf-8") as source:
                text = source.read()
        except FileNotFoundError:
            print(f"Error: file not found: {options.input}", file=sys.stderr)
            sys.exit(1)
    else:
        text = sys.stdin.read()

    table_lines = text.splitlines()
    has_duration = detect_has_duration_column(table_lines)
    rows = parse_table(table_lines, has_duration_col=has_duration)
    if not rows:
        # Not fatal: an empty result is still written out below.
        print("Warning: no timesheet rows found in input.", file=sys.stderr)
    aggregated = aggregate_rows(rows)

    # Resolve project map: explicit --map flag, else project_map.json in cwd
    map_path = options.map
    if map_path is None:
        candidate = os.path.join(os.getcwd(), "project_map.json")
        map_path = candidate if os.path.exists(candidate) else None
    project_map = load_project_map(map_path)

    if options.summary:
        print_summary(aggregated, project_map)
        return
    if not options.output:
        write_csv(aggregated, sys.stdout, date_str, project_map)
        return
    with open(options.output, "w", newline="", encoding="utf-8") as sink:
        write_csv(aggregated, sink, date_str, project_map)
    print(f"Written to {options.output}", file=sys.stderr)

54
src/timesheets/output.py Normal file
View file

@ -0,0 +1,54 @@
import csv
import sys
from collections import OrderedDict
from typing import IO
from .projects import resolve_project_task
from .utils import decimal_to_hhmm
def write_csv(
    aggregated: list[dict],
    output: IO[str],
    date_str: str,
    project_map: dict,
) -> None:
    """Emit the aggregated entries as CSV on ``output``.

    A header row is written first, then one row per entry holding the
    given date, the project and task resolved through ``project_map``,
    the entry's description, and its quantity formatted to two decimals.

    Args:
        aggregated: Entries with "project", "description" and "quantity" keys.
        output: Text stream that receives the CSV.
        date_str: Date value placed in the first column of every row.
        project_map: Mapping handed to ``resolve_project_task``.
    """
    header = ["Date*", "Project*", "Task", "Description", "Quantity"]
    csv_out = csv.writer(output)
    csv_out.writerow(header)
    for item in aggregated:
        project_name, task_name = resolve_project_task(item["project"], project_map)
        row = [date_str, project_name, task_name, item["description"]]
        row.append(f"{item['quantity']:.2f}")
        csv_out.writerow(row)
def print_summary(aggregated: list[dict], project_map: dict) -> None:
    """Print a grouped, human-readable overview of the entries to stdout.

    Entries are grouped under their resolved "Project / Task" label (just
    the project when the task is empty), in order of first appearance.
    Each group shows its own total and its entries; a grand total follows.

    Args:
        aggregated: Entries with "project", "description" and "quantity" keys.
        project_map: Mapping handed to ``resolve_project_task``.
    """
    buckets: dict[str, list[dict]] = OrderedDict()
    for item in aggregated:
        project, task = resolve_project_task(item["project"], project_map)
        if task:
            heading = f"{project} / {task}"
        else:
            heading = project
        if heading not in buckets:
            buckets[heading] = []
        buckets[heading].append(item)

    total_all = sum(item["quantity"] for item in aggregated)

    # The description column is at least 40 characters wide and grows to
    # fit the longest description.
    longest = max((len(item["description"]) for item in aggregated), default=40)
    desc_width = max(longest, 40)
    rule = "-" * (desc_width + 16)

    for label, entries in buckets.items():
        project_total = sum(item["quantity"] for item in entries)
        print(f"\n {label} ({decimal_to_hhmm(project_total)})")
        print(" " + rule)
        for entry in entries:
            print(f" {entry['description']:<{desc_width}} {decimal_to_hhmm(entry['quantity'])}")
        print(" " + rule)

    print(f"\n {'TOTAL':<{desc_width + 2}} {decimal_to_hhmm(total_all)}\n")

115
src/timesheets/parser.py Normal file
View file

@ -0,0 +1,115 @@
import re
from collections import defaultdict
from .utils import duration_from_start_end, parse_duration, strip_markdown_link
def detect_has_duration_column(lines: list[str]) -> bool:
    """Report whether the table header declares a Duration column.

    The header is taken to be the first pipe-delimited row that contains a
    "start" cell (cells are compared case-insensitively after trimming).

    Args:
        lines: Lines of the markdown document.

    Returns:
        True when that header row has a "duration" cell, False when it does
        not, and True when no header row is found at all.
    """
    for raw in lines:
        row = raw.strip()
        if not row.startswith("|") or not row.endswith("|"):
            continue
        headings = {cell.strip().lower() for cell in row.strip("|").split("|")}
        if "start" not in headings:
            continue
        return "duration" in headings
    return True
def parse_table(lines: list[str], has_duration_col: bool = True) -> list[dict]:
    """Turn the rows of a markdown timesheet table into row dicts.

    Expected column layouts:
        With duration:    Start | End | Duration | Project | Story | Note  (6 cols)
        Without duration: Start | End | Project | Story | Note             (5 cols)

    Lines that are blank, are separator rows, are not wrapped in pipes,
    have too few cells, or whose Start cell is not an H:MM time are
    skipped. With a Duration column, rows whose Duration is not H:MM are
    skipped; without one, the duration is derived from Start and End and
    rows for which that raises ValueError are skipped.

    Args:
        lines: Lines of the markdown document.
        has_duration_col: Whether the table carries a Duration column.

    Returns:
        One dict per accepted row with keys "start", "end",
        "duration_hours", "project", "story" and "note".
    """
    time_pattern = r"^\d+:\d{2}$"
    required_cells = 6 if has_duration_col else 5
    parsed: list[dict] = []

    for raw in lines:
        text = raw.strip()
        if not text:
            continue
        if re.match(r"^\|[-| :]+\|$", text):
            continue  # header/body separator row
        if not text.startswith("|") or not text.endswith("|"):
            continue
        cells = [cell.strip() for cell in text.strip("|").split("|")]
        if len(cells) < required_cells:
            continue

        start, end = cells[0], cells[1]
        if has_duration_col:
            duration = cells[2]
            project = cells[3]
            story = strip_markdown_link(cells[4])
            note = strip_markdown_link(cells[5])
        else:
            duration = None
            project = cells[2]
            story = strip_markdown_link(cells[3])
            note = strip_markdown_link(cells[4])

        # Drops the header row as well as any row without a valid start time.
        if start.lower() == "start" or not re.match(time_pattern, start):
            continue

        if duration is None:
            try:
                hours = duration_from_start_end(start, end)
            except ValueError:
                continue
        elif re.match(time_pattern, duration):
            hours = parse_duration(duration)
        else:
            continue

        parsed.append(
            {
                "start": start,
                "end": end,
                "duration_hours": hours,
                "project": project,
                "story": story,
                "note": note,
            }
        )
    return parsed
def build_description(story: str, note: str) -> str:
    """Merge a story and a note into one description.

    Both values are trimmed; the non-empty ones are joined with " - ".
    When both are empty the placeholder "/" is returned.
    """
    pieces = []
    for fragment in (story, note):
        cleaned = fragment.strip()
        if cleaned:
            pieces.append(cleaned)
    if not pieces:
        return "/"
    return " - ".join(pieces)
def aggregate_rows(rows: list[dict]) -> list[dict]:
    """Sum durations of rows that share a project and description.

    The description of each row is built from its "story" and "note"
    values; the project name is trimmed before grouping.

    Args:
        rows: Row dicts with "project", "story", "note" and
            "duration_hours" keys.

    Returns:
        A list of dicts with keys "project", "description" and "quantity",
        ordered by the first occurrence of each (project, description) pair.
    """
    # A plain dict keeps insertion order, which gives first-occurrence order.
    hours_by_key: dict[tuple, float] = {}
    for row in rows:
        description = build_description(row["story"], row["note"])
        key = (row["project"].strip(), description)
        hours_by_key[key] = hours_by_key.get(key, 0.0) + row["duration_hours"]

    return [
        {"project": project, "description": description, "quantity": hours}
        for (project, description), hours in hours_by_key.items()
    ]

View file

@ -0,0 +1,29 @@
import json
import sys
def load_project_map(path: str | None) -> dict:
"""Load a project map JSON file. Returns an empty dict if path is None or missing."""
if not path:
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
print(f"Warning: project map file not found: {path}", file=sys.stderr)
return {}
except json.JSONDecodeError as e:
print(f"Warning: could not parse project map file: {e}", file=sys.stderr)
return {}
def resolve_project_task(raw_project: str, project_map: dict) -> tuple[str, str]:
    """Translate a raw project key into a (Project, Task) pair.

    The trimmed, lower-cased key is looked up first, then the trimmed key
    as written.

    Args:
        raw_project: Project name as it appears in the timesheet.
        project_map: Mapping from project key to a dict that may hold
            "Project" and "Task" values.

    Returns:
        The mapped (Project, Task), with ``raw_project`` and "" standing in
        for missing values; (raw_project, "") when no entry matches.
    """
    trimmed = raw_project.strip()
    match = project_map.get(trimmed.lower())
    if not match:
        match = project_map.get(trimmed)
    if not match:
        return raw_project, ""
    return match.get("Project", raw_project), match.get("Task", "")

44
src/timesheets/utils.py Normal file
View file

@ -0,0 +1,44 @@
import re
from datetime import date
def parse_duration(duration_str: str) -> float:
    """Turn an H:MM duration string into decimal hours.

    Args:
        duration_str: Duration such as "1:30"; surrounding whitespace is ignored.

    Returns:
        The duration in hours, e.g. 1.5 for "1:30".

    Raises:
        ValueError: If the trimmed string is not digits, a colon, and two digits.
    """
    cleaned = duration_str.strip()
    parsed = re.match(r"^(\d+):(\d{2})$", cleaned)
    if parsed is None:
        raise ValueError(f"Invalid duration format: {cleaned!r}")
    hours, minutes = (int(part) for part in parsed.groups())
    return hours + minutes / 60.0
def duration_from_start_end(start_str: str, end_str: str) -> float:
    """Compute the decimal hours elapsed between two H:MM clock times.

    An end time earlier than the start time is treated as falling on the
    next day.

    Args:
        start_str: Start time such as "9:00".
        end_str: End time such as "17:30".

    Returns:
        The elapsed time in hours.

    Raises:
        ValueError: If either value is not digits, a colon, and two digits.
    """
    minute_marks = []
    for stamp in (start_str, end_str):
        found = re.match(r"^(\d+):(\d{2})$", stamp.strip())
        if not found:
            raise ValueError(f"Invalid time format: {stamp!r}")
        minute_marks.append(int(found.group(1)) * 60 + int(found.group(2)))

    begin, finish = minute_marks
    elapsed = finish - begin
    if elapsed < 0:
        elapsed += 24 * 60  # midnight rollover
    return elapsed / 60.0
def decimal_to_hhmm(hours: float) -> str:
    """Convert decimal hours to an HH:MM string.

    The value is rounded to a whole number of minutes. Hours are padded to
    at least two digits. Negative values are rendered as a minus sign
    followed by the magnitude, e.g. -0.5 gives "-00:30".

    Args:
        hours: Duration in decimal hours.

    Returns:
        The duration formatted as HH:MM.
    """
    total_minutes = round(hours * 60)
    # divmod floors toward negative infinity (divmod(-30, 60) == (-1, 30)),
    # which would print "-1:30" for half an hour; format the magnitude and
    # put the sign back in front instead.
    sign = "-" if total_minutes < 0 else ""
    h, m = divmod(abs(total_minutes), 60)
    return f"{sign}{h:02d}:{m:02d}"
def strip_markdown_link(text: str) -> str:
    """Replace every markdown link ``[label](url)`` in ``text`` with its label."""
    link = re.compile(r"\[([^\]]+)\]\([^)]*\)")
    return link.sub(lambda found: found.group(1), text)
def format_date(d: date) -> str:
    """Render ``d`` as DD/MM/YY, e.g. 22/05/26 for 22 May 2026."""
    return format(d, "%d/%m/%y")