- Add `to_csv_entries()` to output.py: converts raw rows to write_csv entries one-for-one, without merging by (project, description) - Add `--raw` flag to the csv subparser; _cmd_csv branches on it - Add TestToCsvEntries with 6 tests - Update README with --raw usage example - Add .coverage and htmlcov/ to .gitignore
431 lines
14 KiB
Python
431 lines
14 KiB
Python
import argparse
|
|
import os
|
|
import sys
|
|
from datetime import date
|
|
|
|
from .config import (
|
|
find_default_config,
|
|
get_daily_target,
|
|
get_map_path,
|
|
get_token,
|
|
get_weekly_target,
|
|
load_config,
|
|
)
|
|
from .output import (
|
|
print_status,
|
|
print_stories,
|
|
print_summary,
|
|
print_summary_short,
|
|
print_summary_weekly,
|
|
print_summary_weekly_short,
|
|
print_summary_weekly_totals,
|
|
to_csv_entries,
|
|
write_csv,
|
|
)
|
|
from .parser import (
|
|
aggregate_rows,
|
|
filter_rows_by_date,
|
|
filter_week_sections,
|
|
parse_document,
|
|
resolve_overlaps,
|
|
)
|
|
from .projects import load_project_map
|
|
from .utils import AmbiguousDateError, format_date, parse_date_arg
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared argument helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _add_shared_args(parser: argparse.ArgumentParser) -> None:
|
|
"""Add arguments common to both subcommands."""
|
|
source = parser.add_mutually_exclusive_group(required=True)
|
|
source.add_argument(
|
|
"--input",
|
|
"-i",
|
|
help="Path to a markdown file, or '-' to read from stdin.",
|
|
default=None,
|
|
)
|
|
source.add_argument(
|
|
"--joplin",
|
|
action="store_true",
|
|
default=False,
|
|
help=(
|
|
"Fetch the weekly timesheet note from Joplin. "
|
|
"Requires a Joplin API token via --token or JOPLIN_TOKEN."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"day",
|
|
nargs="*",
|
|
help=(
|
|
"Day to extract timesheets for. Accepts YYYY-MM-DD, MM-DD, DD-MM, "
|
|
"or a natural language expression like 'yesterday' or '3 days ago'. "
|
|
"Defaults to today."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--config",
|
|
help="Path to a TOML config file. Defaults to timesheets.toml in the cwd if it exists.",
|
|
default=None,
|
|
)
|
|
parser.add_argument(
|
|
"--token",
|
|
help="Joplin API token. Falls back to the JOPLIN_TOKEN environment variable.",
|
|
default=None,
|
|
)
|
|
parser.add_argument(
|
|
"--map",
|
|
help=(
|
|
"Path to a JSON file mapping project keys to Project+Task pairs. "
|
|
"Defaults to project_map.json in the cwd if it exists."
|
|
),
|
|
default=None,
|
|
)
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output",
|
|
help="Write output to this file instead of stdout.",
|
|
default=None,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parser construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="Parse markdown timesheet tables and export them.",
|
|
prog="timesheets",
|
|
)
|
|
subparsers = parser.add_subparsers(dest="command", metavar="command")
|
|
subparsers.required = True
|
|
|
|
summary_parser = subparsers.add_parser(
|
|
"summary",
|
|
help="Print a human-readable summary of time spent per project.",
|
|
)
|
|
_add_shared_args(summary_parser)
|
|
summary_parser.add_argument(
|
|
"--weekly",
|
|
"-w",
|
|
action="store_true",
|
|
default=False,
|
|
help="Show the summary for the entire week containing the given day.",
|
|
)
|
|
summary_parser.add_argument(
|
|
"--short",
|
|
"-s",
|
|
action="count",
|
|
default=0,
|
|
help=(
|
|
"Compact output. Once (-s): one line per project with total hours. "
|
|
"Twice (-ss): one line per day (only with --weekly)."
|
|
),
|
|
)
|
|
|
|
csv_parser = subparsers.add_parser(
|
|
"csv",
|
|
help="Export timesheet entries as CSV.",
|
|
)
|
|
_add_shared_args(csv_parser)
|
|
csv_parser.add_argument(
|
|
"--raw",
|
|
action="store_true",
|
|
default=False,
|
|
help=(
|
|
"Skip aggregation: output one CSV row per timesheet entry instead of "
|
|
"combining entries that share the same project and story."
|
|
),
|
|
)
|
|
|
|
status_parser = subparsers.add_parser(
|
|
"status",
|
|
help="Show how many hours remain today and this week.",
|
|
)
|
|
_add_shared_args(status_parser)
|
|
|
|
stories_parser = subparsers.add_parser(
|
|
"stories",
|
|
help="List stories worked on, grouped by project.",
|
|
)
|
|
_add_shared_args(stories_parser)
|
|
stories_parser.add_argument(
|
|
"--weekly",
|
|
"-w",
|
|
action="store_true",
|
|
default=False,
|
|
help="Show stories for the entire week containing the given day.",
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared resolution helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _resolve_token(args: argparse.Namespace, config: dict) -> str:
|
|
token = args.token or get_token(config) or os.environ.get("JOPLIN_TOKEN")
|
|
if not token:
|
|
print(
|
|
"Error: Joplin API token required. "
|
|
"Provide --token or set the JOPLIN_TOKEN environment variable.",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
return token
|
|
|
|
|
|
def _resolve_date(args: argparse.Namespace) -> tuple[date, str]:
|
|
"""Parse the day positional argument and return (date, formatted string)."""
|
|
day_input = " ".join(args.day) if args.day else None
|
|
if day_input is None:
|
|
target_date = date.today()
|
|
else:
|
|
try:
|
|
target_date = parse_date_arg(day_input)
|
|
except AmbiguousDateError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
except ValueError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
return target_date, format_date(target_date)
|
|
|
|
|
|
def _resolve_rows(
|
|
args: argparse.Namespace, config: dict, target_date: date
|
|
) -> list[dict]:
|
|
"""Fetch and parse rows from the configured source (single day)."""
|
|
if args.joplin:
|
|
from .joplin import fetch_week_note
|
|
|
|
token = _resolve_token(args, config)
|
|
try:
|
|
content = fetch_week_note(token, target_date)
|
|
except RuntimeError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
lines = content.splitlines()
|
|
return filter_rows_by_date(lines, target_date)
|
|
else:
|
|
if args.input == "-":
|
|
content = sys.stdin.read()
|
|
else:
|
|
try:
|
|
with open(args.input, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
except FileNotFoundError:
|
|
print(f"Error: file not found: {args.input}", file=sys.stderr)
|
|
sys.exit(1)
|
|
return resolve_overlaps(parse_document(content.splitlines()))
|
|
|
|
|
|
def _resolve_week_sections(
|
|
args: argparse.Namespace, config: dict, target_date: date
|
|
) -> list[tuple[date, list[dict]]]:
|
|
"""Fetch and parse rows from the configured source, grouped by day for the week."""
|
|
from datetime import timedelta
|
|
|
|
week_start = target_date - timedelta(days=target_date.weekday())
|
|
|
|
if args.joplin:
|
|
from .joplin import fetch_week_note
|
|
|
|
token = _resolve_token(args, config)
|
|
try:
|
|
content = fetch_week_note(token, target_date)
|
|
except RuntimeError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
return filter_week_sections(content.splitlines(), week_start)
|
|
else:
|
|
if args.input == "-":
|
|
content = sys.stdin.read()
|
|
else:
|
|
try:
|
|
with open(args.input, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
except FileNotFoundError:
|
|
print(f"Error: file not found: {args.input}", file=sys.stderr)
|
|
sys.exit(1)
|
|
return filter_week_sections(content.splitlines(), week_start)
|
|
|
|
|
|
def _resolve_project_map(args: argparse.Namespace, config: dict) -> dict:
|
|
"""Resolve the project map from CLI flag, config, or cwd default."""
|
|
map_path = args.map or get_map_path(config)
|
|
if map_path is None:
|
|
default = os.path.join(os.getcwd(), "project_map.json")
|
|
if os.path.exists(default):
|
|
map_path = default
|
|
return load_project_map(map_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Subcommand handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _cmd_summary(args: argparse.Namespace, config: dict) -> None:
|
|
target_date, _ = _resolve_date(args)
|
|
project_map = _resolve_project_map(args, config)
|
|
short = args.short # 0, 1, or 2
|
|
|
|
def _write(fn, *fn_args):
|
|
if args.output:
|
|
with open(args.output, "w", encoding="utf-8") as f:
|
|
old_stdout = sys.stdout
|
|
sys.stdout = f
|
|
try:
|
|
fn(*fn_args)
|
|
finally:
|
|
sys.stdout = old_stdout
|
|
print(f"Written to {args.output}", file=sys.stderr)
|
|
else:
|
|
fn(*fn_args)
|
|
|
|
if args.weekly:
|
|
day_sections = _resolve_week_sections(args, config, target_date)
|
|
if not day_sections:
|
|
print("Warning: no timesheet rows found for this week.", file=sys.stderr)
|
|
return
|
|
if short >= 2:
|
|
_write(print_summary_weekly_totals, day_sections, project_map)
|
|
elif short == 1:
|
|
_write(print_summary_weekly_short, day_sections, project_map)
|
|
else:
|
|
_write(print_summary_weekly, day_sections, project_map)
|
|
else:
|
|
rows = _resolve_rows(args, config, target_date)
|
|
if not rows:
|
|
print("Warning: no timesheet rows found in input.", file=sys.stderr)
|
|
aggregated = aggregate_rows(rows)
|
|
if short >= 1:
|
|
_write(print_summary_short, aggregated, project_map)
|
|
else:
|
|
_write(print_summary, aggregated, project_map)
|
|
|
|
|
|
def _cmd_status(args: argparse.Namespace, config: dict) -> None:
|
|
from datetime import timedelta
|
|
|
|
from .status import compute_day_status, compute_week_status
|
|
|
|
target_date, _ = _resolve_date(args)
|
|
daily_target = get_daily_target(config)
|
|
weekly_target = get_weekly_target(config)
|
|
week_start = target_date - timedelta(days=target_date.weekday())
|
|
|
|
# Fetch the full week document once for both day and week status
|
|
if args.joplin:
|
|
from .joplin import fetch_week_note
|
|
|
|
token = _resolve_token(args, config)
|
|
try:
|
|
content = fetch_week_note(token, target_date)
|
|
except RuntimeError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
lines = content.splitlines()
|
|
else:
|
|
if args.input == "-":
|
|
lines = sys.stdin.read().splitlines()
|
|
else:
|
|
try:
|
|
with open(args.input, "r", encoding="utf-8") as f:
|
|
lines = f.read().splitlines()
|
|
except FileNotFoundError:
|
|
print(f"Error: file not found: {args.input}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
from .parser import filter_rows_by_date, filter_week_sections
|
|
from .status import projected_hours_for_day
|
|
|
|
day_rows = filter_rows_by_date(lines, target_date)
|
|
day_sections = filter_week_sections(lines, week_start)
|
|
|
|
# Adjust daily target to account for over/under time earlier this week.
|
|
# Only applies when the target date is within the same week as today.
|
|
# days_remaining includes today (weekday 0=Mon, so Mon has 5 days remaining).
|
|
from datetime import date as _date
|
|
|
|
days_remaining = 5 - target_date.weekday() # 1 on Friday, 5 on Monday
|
|
if week_start <= _date.today() and target_date.weekday() > 0:
|
|
# Sum projected hours for days before today in the week
|
|
hours_before_today = sum(
|
|
projected_hours_for_day(rows)
|
|
for day, rows in day_sections
|
|
if day < target_date
|
|
)
|
|
effective_daily_target = max(
|
|
0.0, (weekly_target - hours_before_today) / days_remaining
|
|
)
|
|
else:
|
|
effective_daily_target = daily_target
|
|
|
|
day_status = compute_day_status(day_rows, target_date, effective_daily_target)
|
|
week_status = compute_week_status(day_sections, target_date, weekly_target)
|
|
|
|
print_status(day_status, week_status)
|
|
|
|
|
|
def _cmd_stories(args: argparse.Namespace, config: dict) -> None:
|
|
from datetime import timedelta
|
|
|
|
target_date, _ = _resolve_date(args)
|
|
|
|
if args.weekly:
|
|
week_start = target_date - timedelta(days=target_date.weekday())
|
|
day_sections = _resolve_week_sections(args, config, target_date)
|
|
rows = [row for _, day_rows in day_sections for row in day_rows]
|
|
else:
|
|
rows = _resolve_rows(args, config, target_date)
|
|
|
|
if not rows:
|
|
print("Warning: no timesheet rows found in input.", file=sys.stderr)
|
|
return
|
|
|
|
project_map = _resolve_project_map(args, config)
|
|
print_stories(rows, project_map)
|
|
|
|
|
|
def _cmd_csv(args: argparse.Namespace, config: dict) -> None:
|
|
target_date, date_str = _resolve_date(args)
|
|
rows = _resolve_rows(args, config, target_date)
|
|
if not rows:
|
|
print("Warning: no timesheet rows found in input.", file=sys.stderr)
|
|
entries = to_csv_entries(rows) if args.raw else aggregate_rows(rows)
|
|
project_map = _resolve_project_map(args, config)
|
|
|
|
if args.output:
|
|
with open(args.output, "w", newline="", encoding="utf-8") as f:
|
|
write_csv(entries, f, date_str, project_map)
|
|
print(f"Written to {args.output}", file=sys.stderr)
|
|
else:
|
|
write_csv(entries, sys.stdout, date_str, project_map)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main() -> None:
|
|
args = build_parser().parse_args()
|
|
|
|
config_path = args.config if args.config is not None else find_default_config()
|
|
config = load_config(config_path)
|
|
|
|
if args.command == "summary":
|
|
_cmd_summary(args, config)
|
|
elif args.command == "csv":
|
|
_cmd_csv(args, config)
|
|
elif args.command == "status":
|
|
_cmd_status(args, config)
|
|
elif args.command == "stories":
|
|
_cmd_stories(args, config)
|