From 9f0a6e2027a61f457db58da1378d76135813b5ca Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 28 May 2026 12:53:56 +0200 Subject: [PATCH] feat(parser): resolve overlapping timesheet entries Parallel work is logged as overlapping entries. resolve_overlaps() splits the shared time equally using the midpoint of the overlap region: - Partial overlap: the midpoint becomes the boundary between the two entries (earlier entry trimmed, later entry delayed). - Full containment: the containing entry is split into two pieces surrounding the contained one, with the midpoint rule applied to the overlap region. Open entries (no end time) are passed through unchanged. resolve_overlaps() is called automatically in filter_rows_by_date, filter_week_sections, and the --input single-day path in cli.py, so all subcommands benefit without further changes. --- src/timesheets/cli.py | 3 +- src/timesheets/parser.py | 116 +++++++++++++++++++++++- tests/test_parser.py | 186 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 302 insertions(+), 3 deletions(-) diff --git a/src/timesheets/cli.py b/src/timesheets/cli.py index 025162d..b44290e 100644 --- a/src/timesheets/cli.py +++ b/src/timesheets/cli.py @@ -26,6 +26,7 @@ from .parser import ( filter_rows_by_date, filter_week_sections, parse_document, + resolve_overlaps, ) from .projects import load_project_map from .utils import AmbiguousDateError, format_date, parse_date_arg @@ -211,7 +212,7 @@ def _resolve_rows( except FileNotFoundError: print(f"Error: file not found: {args.input}", file=sys.stderr) sys.exit(1) - return parse_document(content.splitlines()) + return resolve_overlaps(parse_document(content.splitlines())) def _resolve_week_sections( diff --git a/src/timesheets/parser.py b/src/timesheets/parser.py index dc9319a..35c472d 100644 --- a/src/timesheets/parser.py +++ b/src/timesheets/parser.py @@ -192,7 +192,7 @@ def filter_rows_by_date(lines: list[str], target: date) -> list[dict]: for section_date, section_lines in sections: if section_date == target_str: rows.extend(parse_document(section_lines)) - return rows + return resolve_overlaps(rows) def filter_week_sections( @@ -236,12 +236,124 @@ def filter_week_sections( if date_str in week_strs: rows = parse_document(sections[date_str]) if rows: - result.append((week_strs[date_str], rows)) + result.append((week_strs[date_str], resolve_overlaps(rows))) result.sort(key=lambda x: x[0]) return result +# --------------------------------------------------------------------------- +# Overlap resolution +# --------------------------------------------------------------------------- + + +def _time_to_minutes(t: str) -> int: + """Convert an HH:MM time string to integer minutes since midnight.""" + h, m = t.split(":") + return int(h) * 60 + int(m) + + +def _minutes_to_time(minutes: int) -> str: + """Convert integer minutes since midnight to an HH:MM string.""" + return f"{minutes // 60:02d}:{minutes % 60:02d}" + + +def _make_closed_row(template: dict, start_m: int, end_m: int) -> dict | None: + """ + Return a copy of *template* with updated start, end, and duration_hours. + Returns None if start_m >= end_m (zero- or negative-duration entry). + """ + if start_m >= end_m: + return None + row = dict(template) + row["start"] = _minutes_to_time(start_m) + row["end"] = _minutes_to_time(end_m) + row["duration_hours"] = (end_m - start_m) / 60.0 + return row + + +def resolve_overlaps(rows: list[dict]) -> list[dict]: + """ + Resolve overlapping closed timesheet entries by splitting the overlap equally. + + For a partial overlap, the midpoint of the overlap region becomes the + boundary: the earlier entry is trimmed to end at the midpoint, and the + later entry is delayed to start at the midpoint. + + For full containment (one entry completely inside another), the containing + entry is split into two entries surrounding the contained one, with the + midpoint rule applied to the overlap region. + + Open entries (``end`` is ``None``) are passed through unchanged. + """ + closed = [dict(r) for r in rows if r["end"] is not None] + open_entries = [r for r in rows if r["end"] is None] + + if len(closed) <= 1: + return rows + + initial_n = len(closed) + for _ in range(initial_n * initial_n + 1): + closed.sort(key=lambda r: _time_to_minutes(r["start"])) + resolved_any = False + + for i in range(len(closed)): + a = closed[i] + a_start_m = _time_to_minutes(a["start"]) + a_end_m = _time_to_minutes(a["end"]) + + for j in range(i + 1, len(closed)): + b = closed[j] + b_start_m = _time_to_minutes(b["start"]) + + if b_start_m >= a_end_m: + break # sorted order: no later b can overlap a + + b_end_m = _time_to_minutes(b["end"]) + overlap_end_m = min(a_end_m, b_end_m) + + if overlap_end_m <= b_start_m: + continue + + midpoint_m = (b_start_m + overlap_end_m) // 2 + replacements: list[dict] = [] + + if b_end_m <= a_end_m: + # Full containment: a contains b. + # a gets [a_start, midpoint] and [b_end, a_end]. + # b gets [midpoint, b_end]. + for entry in ( + _make_closed_row(a, a_start_m, midpoint_m), + _make_closed_row(b, midpoint_m, b_end_m), + _make_closed_row(a, b_end_m, a_end_m), + ): + if entry is not None: + replacements.append(entry) + else: + # Partial overlap: a starts first, b ends after a. + # a gets [a_start, midpoint], b gets [midpoint, b_end]. + for entry in ( + _make_closed_row(a, a_start_m, midpoint_m), + _make_closed_row(b, midpoint_m, b_end_m), + ): + if entry is not None: + replacements.append(entry) + + closed = [closed[k] for k in range(len(closed)) if k != i and k != j] + closed.extend(replacements) + resolved_any = True + break + + if resolved_any: + break + + if not resolved_any: + break + + closed.sort(key=lambda r: _time_to_minutes(r["start"])) + return open_entries + closed + + def build_description(story: str, note: str) -> str: """Combine story and note into a single description string.""" parts = [p.strip() for p in [story, note] if p.strip()] diff --git a/tests/test_parser.py b/tests/test_parser.py index 8778fc1..d3fa738 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -10,6 +10,7 @@ from timesheets.parser import ( filter_rows_by_date, parse_document, parse_table, + resolve_overlaps, ) # --------------------------------------------------------------------------- @@ -370,3 +371,188 @@ class TestAggregateRows: def test_empty_input(self): assert aggregate_rows([]) == [] + + +# --------------------------------------------------------------------------- +# resolve_overlaps +# --------------------------------------------------------------------------- + + +class TestResolveOverlaps: + """Tests for overlap resolution between timesheet entries.""" + + def _row(self, start, end, project="proj", story="s", note=""): + """Build a minimal closed row dict.""" + if end is None: + return { + "start": start, + "end": None, + "duration_hours": None, + "project": project, + "story": story, + "story_raw": story, + "note": note, + } + h1, m1 = map(int, start.split(":")) + h2, m2 = map(int, end.split(":")) + duration = (h2 * 60 + m2 - h1 * 60 - m1) / 60.0 + return { + "start": start, + "end": end, + "duration_hours": duration, + "project": project, + "story": story, + "story_raw": story, + "note": note, + } + + def _sorted(self, rows): + return sorted( + [r for r in rows if r["end"] is not None], + key=lambda r: r["start"], + ) + + # --- no-op cases --- + + def test_empty_input(self): + assert resolve_overlaps([]) == [] + + def test_single_entry_unchanged(self): + rows = [self._row("09:00", "10:00")] + assert resolve_overlaps(rows) == rows + + def test_no_overlap_unchanged(self): + rows = [self._row("09:00", "10:00"), self._row("10:00", "11:00")] + result = resolve_overlaps(rows) + assert len(result) == 2 + s = self._sorted(result) + assert (s[0]["start"], s[0]["end"]) == ("09:00", "10:00") + assert (s[1]["start"], s[1]["end"]) == ("10:00", "11:00") + + def test_only_open_entries_unchanged(self): + rows = [self._row("09:00", None), self._row("10:00", None)] + result = resolve_overlaps(rows) + assert result == rows + + # --- partial overlap --- + + def test_partial_overlap_spec_example(self): + """Spec example: 9:00-10:00 vs 9:30-10:30 → boundary at 9:45.""" + rows = [self._row("09:00", "10:00", "a"), self._row("09:30", "10:30", "b")] + result = self._sorted(resolve_overlaps(rows)) + assert len(result) == 2 + assert (result[0]["start"], result[0]["end"]) == ("09:00", "09:45") + assert result[0]["project"] == "a" + assert (result[1]["start"], result[1]["end"]) == ("09:45", "10:30") + assert result[1]["project"] == "b" + + def test_partial_overlap_duration_recalculated(self): + rows = [self._row("09:00", "10:00"), self._row("09:30", "10:30")] + result = self._sorted(resolve_overlaps(rows)) + assert result[0]["duration_hours"] == pytest.approx(0.75) # 45 min + assert result[1]["duration_hours"] == pytest.approx(0.75) # 45 min + + def test_partial_overlap_total_hours(self): + """Total logged time after resolution equals the spanned wall-clock time.""" + rows = [self._row("09:00", "10:00"), self._row("09:30", "10:30")] + result = resolve_overlaps(rows) + total = sum(r["duration_hours"] for r in result) + assert total == pytest.approx(1.5) # 9:00–10:30 = 90 min + + def test_partial_overlap_input_order_independent(self): + """Result should be the same regardless of input order.""" + rows_forward = [ + self._row("09:00", "10:00", "a"), + self._row("09:30", "10:30", "b"), + ] + rows_reverse = [ + self._row("09:30", "10:30", "b"), + self._row("09:00", "10:00", "a"), + ] + r1 = self._sorted(resolve_overlaps(rows_forward)) + r2 = self._sorted(resolve_overlaps(rows_reverse)) + assert [(r["start"], r["end"]) for r in r1] == [ + (r["start"], r["end"]) for r in r2 + ] + + # --- full containment --- + + def test_containment_spec_example(self): + """Spec example: 9:00-10:00 contains 9:15-9:45 → A1: 9:00-9:30, B: 9:30-9:45, A2: 9:45-10:00.""" + rows = [self._row("09:00", "10:00", "a"), self._row("09:15", "09:45", "b")] + result = self._sorted(resolve_overlaps(rows)) + assert len(result) == 3 + assert (result[0]["start"], result[0]["end"]) == ("09:00", "09:30") + assert result[0]["project"] == "a" + assert (result[1]["start"], result[1]["end"]) == ("09:30", "09:45") + assert result[1]["project"] == "b" + assert (result[2]["start"], result[2]["end"]) == ("09:45", "10:00") + assert result[2]["project"] == "a" + + def test_containment_total_hours(self): + """Total after containment resolution equals the outer entry's original duration.""" + rows = [self._row("09:00", "10:00"), self._row("09:15", "09:45")] + result = resolve_overlaps(rows) + total = sum(r["duration_hours"] for r in result) + assert total == pytest.approx(1.0) # 9:00–10:00 = 60 min + + def test_containment_same_start(self): + """Smaller entry starts at the same time as the larger one.""" + rows = [self._row("09:00", "10:00", "a"), self._row("09:00", "09:30", "b")] + result = self._sorted(resolve_overlaps(rows)) + assert len(result) == 3 + assert (result[0]["start"], result[0]["end"]) == ("09:00", "09:15") + assert result[0]["project"] == "a" + assert (result[1]["start"], result[1]["end"]) == ("09:15", "09:30") + assert result[1]["project"] == "b" + assert (result[2]["start"], result[2]["end"]) == ("09:30", "10:00") + assert result[2]["project"] == "a" + + def test_containment_same_end(self): + """Smaller entry ends at the same time as the larger one.""" + rows = [self._row("09:00", "10:00", "a"), self._row("09:30", "10:00", "b")] + result = self._sorted(resolve_overlaps(rows)) + assert len(result) == 2 + assert (result[0]["start"], result[0]["end"]) == ("09:00", "09:45") + assert result[0]["project"] == "a" + assert (result[1]["start"], result[1]["end"]) == ("09:45", "10:00") + assert result[1]["project"] == "b" + + # --- open entries --- + + def test_open_entry_passed_through(self): + open_row = self._row("09:00", None, "open") + closed_row = self._row("09:30", "10:30", "closed") + result = resolve_overlaps([open_row, closed_row]) + assert any(r["end"] is None for r in result) + assert any(r["end"] == "10:30" for r in result) + + # --- metadata preservation --- + + def test_project_and_story_preserved(self): + rows = [ + self._row("09:00", "10:00", project="p1", story="s1", note="n1"), + self._row("09:30", "10:30", project="p2", story="s2", note="n2"), + ] + result = self._sorted(resolve_overlaps(rows)) + assert result[0]["project"] == "p1" + assert result[0]["story"] == "s1" + assert result[0]["note"] == "n1" + assert result[1]["project"] == "p2" + assert result[1]["story"] == "s2" + assert result[1]["note"] == "n2" + + # --- no remaining overlaps --- + + def test_result_has_no_overlaps(self): + """After resolution, no two entries in the result should overlap.""" + rows = [ + self._row("09:00", "11:00", "a"), + self._row("09:30", "10:30", "b"), + self._row("10:00", "12:00", "c"), + ] + result = self._sorted(resolve_overlaps(rows)) + for i in range(len(result) - 1): + assert result[i]["end"] <= result[i + 1]["start"], ( + f"Overlap between entry {i} ({result[i]}) and {i + 1} ({result[i + 1]})" + )