feat(parser): resolve overlapping timesheet entries

Parallel work is logged as overlapping entries. resolve_overlaps()
splits the shared time equally using the midpoint of the overlap region:

- Partial overlap: the midpoint becomes the boundary between the two
  entries (earlier entry trimmed, later entry delayed).
- Full containment: the containing entry is split into two pieces
  surrounding the contained one, with the midpoint rule applied to
  the overlap region.

Open entries (no end time) are passed through unchanged.

resolve_overlaps() is called automatically in filter_rows_by_date,
filter_week_sections, and the --input single-day path in cli.py, so
all subcommands benefit without further changes.
This commit is contained in:
Jef Roosens 2026-05-28 12:53:56 +02:00
parent f99e114770
commit 9f0a6e2027
Signed by: Jef Roosens
GPG key ID: 119385BCAA005C21
3 changed files with 302 additions and 3 deletions

View file

@ -26,6 +26,7 @@ from .parser import (
filter_rows_by_date,
filter_week_sections,
parse_document,
resolve_overlaps,
)
from .projects import load_project_map
from .utils import AmbiguousDateError, format_date, parse_date_arg
@ -211,7 +212,7 @@ def _resolve_rows(
except FileNotFoundError:
print(f"Error: file not found: {args.input}", file=sys.stderr)
sys.exit(1)
return parse_document(content.splitlines())
return resolve_overlaps(parse_document(content.splitlines()))
def _resolve_week_sections(

View file

@ -192,7 +192,7 @@ def filter_rows_by_date(lines: list[str], target: date) -> list[dict]:
for section_date, section_lines in sections:
if section_date == target_str:
rows.extend(parse_document(section_lines))
return rows
return resolve_overlaps(rows)
def filter_week_sections(
@ -236,12 +236,124 @@ def filter_week_sections(
if date_str in week_strs:
rows = parse_document(sections[date_str])
if rows:
result.append((week_strs[date_str], rows))
result.append((week_strs[date_str], resolve_overlaps(rows)))
result.sort(key=lambda x: x[0])
return result
# ---------------------------------------------------------------------------
# Overlap resolution
# ---------------------------------------------------------------------------
def _time_to_minutes(t: str) -> int:
"""Convert an HH:MM time string to integer minutes since midnight."""
h, m = t.split(":")
return int(h) * 60 + int(m)
def _minutes_to_time(minutes: int) -> str:
"""Convert integer minutes since midnight to an HH:MM string."""
return f"{minutes // 60:02d}:{minutes % 60:02d}"
def _make_closed_row(template: dict, start_m: int, end_m: int) -> dict | None:
"""
Return a copy of *template* with updated start, end, and duration_hours.
Returns None if start_m >= end_m (zero- or negative-duration entry).
"""
if start_m >= end_m:
return None
row = dict(template)
row["start"] = _minutes_to_time(start_m)
row["end"] = _minutes_to_time(end_m)
row["duration_hours"] = (end_m - start_m) / 60.0
return row
def resolve_overlaps(rows: list[dict]) -> list[dict]:
    """
    Resolve overlapping closed timesheet entries by splitting the overlap equally.

    For a partial overlap, the midpoint of the overlap region becomes the
    boundary: the earlier entry is trimmed to end at the midpoint, and the
    later entry is delayed to start at the midpoint.

    For full containment (one entry completely inside another), the containing
    entry is split into two entries surrounding the contained one, with the
    midpoint rule applied to the overlap region.

    Entries that start at the same time are ordered longest-first before
    resolution, so the longer one is always treated as the containing entry
    and the outcome does not depend on the order of *rows*.

    Open entries (``end`` is ``None``) are passed through unchanged.

    Args:
        rows: Row dicts with at least ``start`` and ``end`` keys holding
            ``HH:MM`` strings (``end`` may be ``None`` for an open entry).

    Returns:
        If *rows* holds at most one closed entry, *rows* itself. Otherwise a
        new list with the open entries first (in input order) followed by
        copies of the closed entries sorted by start time. The input row
        dicts are never mutated.
    """
    closed = [dict(r) for r in rows if r["end"] is not None]
    open_entries = [r for r in rows if r["end"] is None]

    if len(closed) <= 1:
        return rows

    def sort_key(row: dict) -> tuple[int, int]:
        # Earliest start first. On equal starts the entry that ends later
        # comes first, so a containing entry is always visited as ``a`` and
        # the containment branch below is taken regardless of input order.
        return _time_to_minutes(row["start"]), -_time_to_minutes(row["end"])

    # Each pass resolves at most one overlapping pair and then restarts from a
    # freshly sorted list; the pass count is capped as a safety bound.
    initial_n = len(closed)
    for _ in range(initial_n * initial_n + 1):
        closed.sort(key=sort_key)
        resolved_any = False

        for i, a in enumerate(closed):
            a_start_m = _time_to_minutes(a["start"])
            a_end_m = _time_to_minutes(a["end"])

            for j in range(i + 1, len(closed)):
                b = closed[j]
                b_start_m = _time_to_minutes(b["start"])
                if b_start_m >= a_end_m:
                    break  # sorted order: no later b can overlap a

                b_end_m = _time_to_minutes(b["end"])
                overlap_end_m = min(a_end_m, b_end_m)
                if overlap_end_m <= b_start_m:
                    # Empty overlap region (e.g. zero-length b): nothing to split.
                    continue

                midpoint_m = (b_start_m + overlap_end_m) // 2

                # Both cases share the first two pieces:
                # a keeps [a_start, midpoint], b keeps [midpoint, b_end].
                pieces = [
                    _make_closed_row(a, a_start_m, midpoint_m),
                    _make_closed_row(b, midpoint_m, b_end_m),
                ]
                if b_end_m <= a_end_m:
                    # Full containment: a also keeps the tail [b_end, a_end].
                    pieces.append(_make_closed_row(a, b_end_m, a_end_m))

                # Rebind to a new list (the enumerate above still walks the
                # old one, and we leave both loops immediately afterwards).
                closed = [r for k, r in enumerate(closed) if k not in (i, j)]
                closed.extend(p for p in pieces if p is not None)
                resolved_any = True
                break

            if resolved_any:
                break

        if not resolved_any:
            break

    closed.sort(key=sort_key)
    return open_entries + closed
def build_description(story: str, note: str) -> str:
"""Combine story and note into a single description string."""
parts = [p.strip() for p in [story, note] if p.strip()]

View file

@ -10,6 +10,7 @@ from timesheets.parser import (
filter_rows_by_date,
parse_document,
parse_table,
resolve_overlaps,
)
# ---------------------------------------------------------------------------
@ -370,3 +371,188 @@ class TestAggregateRows:
def test_empty_input(self):
assert aggregate_rows([]) == []
# ---------------------------------------------------------------------------
# resolve_overlaps
# ---------------------------------------------------------------------------
class TestResolveOverlaps:
    """Behavioural tests for ``resolve_overlaps``."""

    def _row(self, start, end, project="proj", story="s", note=""):
        """Build a minimal row dict; passing ``end=None`` yields an open entry."""
        duration = None
        if end is not None:
            start_h, start_min = map(int, start.split(":"))
            end_h, end_min = map(int, end.split(":"))
            duration = (end_h * 60 + end_min - start_h * 60 - start_min) / 60.0
        return {
            "start": start,
            "end": end,
            "duration_hours": duration,
            "project": project,
            "story": story,
            "story_raw": story,
            "note": note,
        }

    def _sorted(self, rows):
        """Return only the closed rows, ordered by their ``start`` string."""
        closed_rows = [row for row in rows if row["end"] is not None]
        closed_rows.sort(key=lambda row: row["start"])
        return closed_rows

    def _spans(self, rows):
        """Return the ``(start, end)`` pair of every row, in order."""
        return [(row["start"], row["end"]) for row in rows]

    # --- no-op cases ---

    def test_empty_input(self):
        assert resolve_overlaps([]) == []

    def test_single_entry_unchanged(self):
        rows = [self._row("09:00", "10:00")]
        assert resolve_overlaps(rows) == rows

    def test_no_overlap_unchanged(self):
        # Entries that merely touch at 10:00 do not overlap.
        rows = [self._row("09:00", "10:00"), self._row("10:00", "11:00")]
        result = resolve_overlaps(rows)
        assert len(result) == 2
        assert self._spans(self._sorted(result)) == [
            ("09:00", "10:00"),
            ("10:00", "11:00"),
        ]

    def test_only_open_entries_unchanged(self):
        rows = [self._row("09:00", None), self._row("10:00", None)]
        assert resolve_overlaps(rows) == rows

    # --- partial overlap ---

    def test_partial_overlap_spec_example(self):
        """Spec example: 9:00-10:00 vs 9:30-10:30 → boundary at 9:45."""
        rows = [self._row("09:00", "10:00", "a"), self._row("09:30", "10:30", "b")]
        result = self._sorted(resolve_overlaps(rows))
        assert len(result) == 2
        assert self._spans(result) == [("09:00", "09:45"), ("09:45", "10:30")]
        assert [row["project"] for row in result] == ["a", "b"]

    def test_partial_overlap_duration_recalculated(self):
        rows = [self._row("09:00", "10:00"), self._row("09:30", "10:30")]
        result = self._sorted(resolve_overlaps(rows))
        earlier, later = result[0], result[1]
        assert earlier["duration_hours"] == pytest.approx(0.75)  # 45 min
        assert later["duration_hours"] == pytest.approx(0.75)  # 45 min

    def test_partial_overlap_total_hours(self):
        """Total logged time after resolution equals the spanned wall-clock time."""
        rows = [self._row("09:00", "10:00"), self._row("09:30", "10:30")]
        total = sum(row["duration_hours"] for row in resolve_overlaps(rows))
        assert total == pytest.approx(1.5)  # 09:00-10:30 = 90 min

    def test_partial_overlap_input_order_independent(self):
        """Result should be the same regardless of input order."""
        first = self._row("09:00", "10:00", "a")
        second = self._row("09:30", "10:30", "b")
        forward = self._sorted(resolve_overlaps([first, second]))
        backward = self._sorted(resolve_overlaps([second, first]))
        assert self._spans(forward) == self._spans(backward)

    # --- full containment ---

    def test_containment_spec_example(self):
        """Spec example: 9:00-10:00 contains 9:15-9:45 → A1: 9:00-9:30, B: 9:30-9:45, A2: 9:45-10:00."""
        rows = [self._row("09:00", "10:00", "a"), self._row("09:15", "09:45", "b")]
        result = self._sorted(resolve_overlaps(rows))
        assert len(result) == 3
        assert self._spans(result) == [
            ("09:00", "09:30"),
            ("09:30", "09:45"),
            ("09:45", "10:00"),
        ]
        assert [row["project"] for row in result] == ["a", "b", "a"]

    def test_containment_total_hours(self):
        """Total after containment resolution equals the outer entry's original duration."""
        rows = [self._row("09:00", "10:00"), self._row("09:15", "09:45")]
        total = sum(row["duration_hours"] for row in resolve_overlaps(rows))
        assert total == pytest.approx(1.0)  # 09:00-10:00 = 60 min

    def test_containment_same_start(self):
        """Smaller entry starts at the same time as the larger one."""
        rows = [self._row("09:00", "10:00", "a"), self._row("09:00", "09:30", "b")]
        result = self._sorted(resolve_overlaps(rows))
        assert len(result) == 3
        assert self._spans(result) == [
            ("09:00", "09:15"),
            ("09:15", "09:30"),
            ("09:30", "10:00"),
        ]
        assert [row["project"] for row in result] == ["a", "b", "a"]

    def test_containment_same_end(self):
        """Smaller entry ends at the same time as the larger one."""
        rows = [self._row("09:00", "10:00", "a"), self._row("09:30", "10:00", "b")]
        result = self._sorted(resolve_overlaps(rows))
        # No tail piece is produced for the outer entry, hence two rows only.
        assert len(result) == 2
        assert self._spans(result) == [("09:00", "09:45"), ("09:45", "10:00")]
        assert [row["project"] for row in result] == ["a", "b"]

    # --- open entries ---

    def test_open_entry_passed_through(self):
        open_row = self._row("09:00", None, "open")
        closed_row = self._row("09:30", "10:30", "closed")
        ends = [row["end"] for row in resolve_overlaps([open_row, closed_row])]
        assert None in ends
        assert "10:30" in ends

    # --- metadata preservation ---

    def test_project_and_story_preserved(self):
        rows = [
            self._row("09:00", "10:00", project="p1", story="s1", note="n1"),
            self._row("09:30", "10:30", project="p2", story="s2", note="n2"),
        ]
        result = self._sorted(resolve_overlaps(rows))
        earlier, later = result[0], result[1]
        assert (earlier["project"], earlier["story"], earlier["note"]) == (
            "p1",
            "s1",
            "n1",
        )
        assert (later["project"], later["story"], later["note"]) == (
            "p2",
            "s2",
            "n2",
        )

    # --- no remaining overlaps ---

    def test_result_has_no_overlaps(self):
        """After resolution, no two entries in the result should overlap."""
        rows = [
            self._row("09:00", "11:00", "a"),
            self._row("09:30", "10:30", "b"),
            self._row("10:00", "12:00", "c"),
        ]
        result = self._sorted(resolve_overlaps(rows))
        # Zero-padded HH:MM strings compare in chronological order.
        for idx, (earlier, later) in enumerate(zip(result, result[1:])):
            assert earlier["end"] <= later["start"], (
                f"Overlap between entry {idx} ({earlier}) and {idx + 1} ({later})"
            )