Skip to content

feat(retention): scheduled task-retention cleanup workflow#272

Merged
smoreinis merged 27 commits into
mainfrom
feat/retention-scheduled-cleanup
Jun 8, 2026
Merged

feat(retention): scheduled task-retention cleanup workflow#272
smoreinis merged 27 commits into
mainfrom
feat/retention-scheduled-cleanup

Conversation

@smoreinis

@smoreinis smoreinis commented Jun 3, 2026

Copy link
Copy Markdown
Collaborator

Summary

Adds a regularly scheduled sweep that cleans up long-lived task/chat data for idle tasks, to support project-data-isolation retention requirements. A Temporal Schedule fires a RetentionCleanupSweepWorkflow that discovers idle tasks belonging to an allowlisted set of agents and runs the existing, idempotent TaskRetentionUseCase.clean_task path against each one.

  • Discovery (cheap pre-filter): cleaned_at IS NULL AND updated_at < cutoff, joined to an agent-name allowlist, keyset-paginated. Deliberately no status filter — status is race-prone, so the authoritative RUNNING/idle/unprocessed-events checks stay inside clean_task at clean-time. updated_at < cutoff is a correct superset of truly-idle tasks (never under-includes).
  • Fan-out: one RetentionCleanupTaskWorkflow child per task (bounded by max_in_flight), aggregated into cleaned/skipped/failed totals; continue_as_new per page bounds history.
  • Error handling: the clean activity maps clean_task's ClientError refusals to a skipped outcome (so one task never aborts the sweep); genuine transient errors propagate and are retried by Temporal.
  • Policy read at runtime: the allowlist / idle-days / paging are read from env by a load_cleanup_config activity at sweep run time (carried across pages), so changing the allowlist is an env edit + worker restart — no schedule recreation. Cron remains a schedule property.
  • Safety: gated by RETENTION_CLEANUP_ENABLED (off by default); fail-closed empty allowlist cleans nothing; clean-only (no export sink in this iteration).

Reuses the already-merged export/clean/rehydrate path — no duplicated deletion logic.

Config (env vars)

RETENTION_CLEANUP_ENABLED (default false), RETENTION_CLEANUP_AGENT_ALLOWLIST (CSV of agent names, empty ⇒ no-op), RETENTION_CLEANUP_IDLE_DAYS (7), RETENTION_CLEANUP_CRON (0 4 * * *), RETENTION_CLEANUP_PAGE_SIZE (200), RETENTION_CLEANUP_MAX_IN_FLIGHT (20).

Test Plan

  • Unit: env parsing/defaults; activities (delegation, cleaned/skipped mapping, transient-error propagation, runtime config load); workflow (fan-out, continue_as_new paging, failed-child-doesn't-abort, no-args config-load path) — all green.
  • Integration (testcontainers): discovery query filters + keyset paging + fail-closed empty allowlist — green locally.
  • Lint clean on all changed files.
  • Manual smoke in a target env with the schedule enabled (trigger the schedule, confirm the sweep summary).

Notes / follow-ups

  • Infra prerequisite (out of scope here): a backend Temporal worker must run on the agentex-server task queue in the deployed environment for the schedule to fire. This PR wires the worker for local docker-compose only.
  • Cron changes still require updating/recreating the schedule (only the allowlist/idle-days/paging are runtime-tunable).
  • No export-before-clean in this iteration (clean-only); export/rehydrate APIs remain available for the later full-restore path.

Greptile Summary

This PR introduces a scheduled Temporal sweep that discovers idle tasks belonging to an agent allowlist and runs the existing idempotent TaskRetentionUseCase.clean_task path against each one, with a safe dry-run default and a continue_as_new keyset-pagination pattern to keep workflow history bounded.

  • Discovery & fan-out: list_cleanup_candidate_ids uses a DISTINCT keyset-paginated query (no status filter, by design) to find candidates; RetentionCleanupSweepWorkflow fans out one RetentionCleanupTaskWorkflow child per task (bounded by max_in_flight), aggregates cleaned/skipped/failed totals across pages, and skips multi-agent tasks via a preflight query.
  • Policy at runtime: load_cleanup_config reads all RETENTION_CLEANUP_* env vars at sweep time so allowlist/idle-days changes take effect on the next run without schedule recreation; the cron expression is the only property baked into the Temporal Schedule.
  • Safety defaults: feature is off by default (RETENTION_CLEANUP_ENABLED=false), dry-run is on by default (RETENTION_CLEANUP_DRY_RUN=true), and an empty allowlist is fail-closed (returns no candidates).

Confidence Score: 4/5

Safe to merge after addressing the DRY_RUN boolean comparison; all deletion paths default to no-op and the feature is off by default.

The boolean parsing for RETENTION_CLEANUP_DRY_RUN is case-sensitive: an operator who sets the env var to 'True' (capital T — common from YAML tools and Python config generators) gets dry_run=False, enabling live deletions while believing they are in dry-run mode. Every other aspect of the PR is well-structured: the sweep logic is correct, the fail-closed defaults are right, the tests are thorough, and the keyset pagination is safe.

agentex/src/config/environment_variables.py — the RETENTION_CLEANUP_DRY_RUN boolean comparison; agentex/src/domain/services/task_retention_service.py — preview_clean_task duplicates the clean_task safety guards.

Important Files Changed

Filename Overview
agentex/src/config/environment_variables.py Adds 7 RETENTION_CLEANUP_* env vars; the DRY_RUN boolean uses a case-sensitive == 'true' comparison that silently disables the dry-run guard when set to 'True' (capitalized).
agentex/src/temporal/workflows/retention_cleanup_workflow.py New sweep + per-task workflows; correct continue_as_new pagination, enabled flag is carried in the payload, child workflow IDs are scoped to the sweep run_id to avoid REJECT_DUPLICATE collisions.
agentex/src/temporal/activities/retention_cleanup_activities.py Four activities covering config load, candidate discovery, multi-agent preflight, and clean; ClientError correctly mapped to skipped, transient errors propagate for Temporal retry.
agentex/src/domain/services/task_retention_service.py Adds preview_clean_task for dry-run support; duplicates the three safety guards from clean_task rather than sharing a helper, creating a maintenance drift risk.
agentex/src/domain/repositories/task_repository.py Adds list_cleanup_candidate_ids (keyset-paginated with DISTINCT) and list_multi_agent_task_ids; fail-closed on empty allowlist, correct WHERE/ORDER BY/LIMIT SQL semantics.
agentex/src/temporal/run_worker.py Merges health-check and retention-cleanup workers into a single agentex-server worker; registers all four retention activities alongside existing health-check activities.
agentex/src/temporal/run_retention_cleanup_schedule.py Idempotent schedule creator; skips creation when disabled or Temporal not configured; bakes only the cron expression into the schedule, not the policy args.
agentex/tests/unit/temporal/test_retention_cleanup_workflow.py Good workflow tests covering multi-page aggregation, disabled sweep, multi-agent skipping, dry-run propagation, and config-load path; uses Temporal's time-skipping test environment.
agentex/tests/integration/test_retention_cleanup_discovery.py Integration tests for discovery filters, keyset paging, empty-allowlist fail-closed, and multi-agent preflight detection; covers the critical query paths.

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
agentex/src/config/environment_variables.py:242-244
The `RETENTION_CLEANUP_DRY_RUN` check uses a case-sensitive `== "true"` comparison — consistent with the `ENABLE_HEALTH_CHECK_WORKFLOW` pattern, but the stakes are very different here. Setting `RETENTION_CLEANUP_DRY_RUN=True` (capital T, which is common in YAML and many env-management tools) evaluates `"True" == "true"``False`, silently switching from dry-run to live deletion. For a feature-enable flag that's a safe failure; for a deletion guard it's not. `.lower()` makes this robust to common capitalisation conventions.

```suggestion
            RETENTION_CLEANUP_DRY_RUN=(
                os.environ.get(EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, "true").lower()
                == "true"
            ),
```

### Issue 2 of 2
agentex/src/domain/services/task_retention_service.py:290-338
`preview_clean_task` hand-replicates the three safety checks from `clean_task` (`RUNNING`, `_is_task_idle`, `_has_unprocessed_events`) rather than delegating to a shared helper. If a new guard is added to `clean_task` in the future (e.g. an active-export check), `preview_clean_task` won't automatically get it, so the dry-run preview would report tasks as cleanable when the real path would refuse them — making the dry-run misleading right when correctness matters most. Consider extracting the validation block into a private `_assert_task_cleanable(task, ...)` method that both paths call.

Reviews (8): Last reviewed commit: "fix(retention): default cleanup dry-run ..." | Re-trigger Greptile

smoreinis added 17 commits June 3, 2026 11:15
Adds RETENTION_CLEANUP_ENABLED, RETENTION_CLEANUP_AGENT_ALLOWLIST,
RETENTION_CLEANUP_IDLE_DAYS, RETENTION_CLEANUP_CRON,
RETENTION_CLEANUP_PAGE_SIZE, and RETENTION_CLEANUP_MAX_IN_FLIGHT to
EnvVarKeys, EnvironmentVariables, and the refresh() parser, following
the existing ENABLE_HEALTH_CHECK_WORKFLOW pattern. Includes unit tests
covering both explicit values and defaults.
…o schedule

Add a load_cleanup_config activity that reads RETENTION_CLEANUP_* env vars at
sweep runtime so changing policy and restarting the worker takes effect on the
next scheduled run without recreating the Temporal Schedule. The schedule now
carries no policy args — only cron + workflow identity. The sweep loads config
once on the first page and carries it across continue_as_new pages for
consistency. The allowlist gate in the schedule bootstrap is removed; fail-closed
is preserved at runtime (empty allowlist → discovery returns no candidates).
@smoreinis smoreinis marked this pull request as ready for review June 4, 2026 00:08
@smoreinis smoreinis requested a review from a team as a code owner June 4, 2026 00:08
Merge the health-check and retention-cleanup Temporal workers into one
process/container. Both sets of workflows and activities are now registered
on the same `agentex-server` task queue worker, preventing tasks from
landing on a worker that has no handler for their type.

Remove the separate `agentex-retention-cleanup-worker` compose service and
`run_retention_cleanup_worker_main` entrypoint; add the four runtime-policy
env vars to the surviving `agentex-temporal-worker` service.
Comment thread agentex/src/temporal/workflows/retention_cleanup_workflow.py
Comment thread agentex/src/temporal/workflows/retention_cleanup_workflow.py Outdated
smoreinis added 2 commits June 3, 2026 17:54
- carry the enabled flag across continue_as_new pages so the runtime kill
  switch stays consistent with the other carried policy fields
- scope child workflow ids with the sweep run id to avoid collisions under a
  REJECT_DUPLICATE workflow-id-reuse policy
- add a task_repository property to TaskRetentionUseCase instead of reaching
  through retention_service internals in the worker
@smoreinis smoreinis force-pushed the feat/retention-scheduled-cleanup branch from 7bd0643 to 45802a7 Compare June 4, 2026 01:01
@smoreinis

Copy link
Copy Markdown
Collaborator Author

discussed and this is just going in as-is since the difference in having another agentex member stamp is negligible and this change should be a no-op on deployment as any clean up is gated behind an enabled as well as a dry run flag

@smoreinis smoreinis merged commit 5277fcf into main Jun 8, 2026
29 checks passed
@smoreinis smoreinis deleted the feat/retention-scheduled-cleanup branch June 8, 2026 23:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant