[improve](streaming-job) add from-to cdc WAL-search timeout and stale-reader release#64013
[improve](streaming-job) add from-to cdc WAL-search timeout and stale-reader release#64013JNSimba wants to merge 9 commits into
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
b647f18 to
139ec20
Compare
|
/review |
|
run buildall |
There was a problem hiding this comment.
Pull request overview
This PR improves reliability of from-to (at-least-once) CDC streaming tasks by (1) adding a setup-phase timeout for WAL/binlog search during startup to prevent indefinite hangs when upstream is idle, and (2) introducing a best-effort mechanism for FE to ask the previously scheduled backend to release a stale reader on failure/cancel so reschedules don’t leave two readers competing for the same upstream state (e.g., a Postgres replication slot).
Changes:
- Add
taskTimeoutMstoWriteRecordRequestand enforce a setup-phase “WAL-search/idle” timeout incdc_clientpolling (snapshot splits excluded). - Add a
/api/releaseReaderendpoint oncdc_clientand an FE best-effort fire-and-forget RPC fromStreamingMultiTblTaskon fail/cancel to release the previous backend’s reader while keeping upstream slot state. - Add targeted tests to ensure
release()remains the base implementation (to avoid dropping slots) and thatEnv.getReaderIfPresent()is a non-creating lookup.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java | Adds a reflection-based test ensuring release() is not overridden by specific readers (slot-preserving behavior). |
| fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/common/EnvTest.java | Adds a test for the new non-creating reader lookup method. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java | Extends the reader contract with release(JobBaseConfig) for slot-preserving shutdown. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java | Implements a default release() that stops reading without invoking close-time cleanup that could drop upstream artifacts. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java | Adds a setup-phase timeout (based on FE task timeout) to avoid indefinite startup hangs in streaming splits. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java | Adds /api/releaseReader endpoint to stop a reader engine while keeping upstream slot state. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java | Adds getReaderIfPresent(jobId) to avoid accidentally creating readers during release attempts. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Passes taskTimeoutMs to BE; adds best-effort stale-reader release RPC on fail/cancel. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java | Adds taskTimeoutMs field used by BE for setup timeout decisions. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Review result: request changes. I found one blocking lifecycle issue in the new stale-reader release path.
Critical checkpoint conclusions:
- Goal/test: The PR targets CDC from-to startup timeout, stale reader release, and stream-load first error surfacing. Tests cover some helper behavior and JSON parsing, but the stale-release race is not covered.
- Scope/focus: The change is mostly focused, but the new release endpoint lacks task/generation identity and can affect a different task instance.
- Concurrency: This PR explicitly involves asynchronous FE failure/cancel and CDC-client async writer threads. The new release RPC can race with a newer writer for the same job on the same backend; see inline comment.
- Lifecycle: Reader lifecycle is non-intuitive because Env stores readers by jobId while tasks are recreated. The release lifecycle is therefore not safe without task identity.
- Config: No new dynamic config was added; taskTimeoutMs is derived from existing FE config.
- Compatibility: Adding taskTimeoutMs to the JSON request is backward-compatible for default zero behavior.
- Parallel paths: Snapshot splits are intentionally excluded from setup timeout. The release path affects both MySQL and PostgreSQL readers through the shared base implementation.
- Conditional checks: The setup timeout condition is understandable; the release endpoint needs an additional stale-task guard.
- Test coverage: Existing tests do not cover delayed release arriving after a newer task starts, which is the blocking scenario.
- Observability: Logs exist for release attempts, but they do not include a task/generation id, which also contributes to diagnosing the race.
- Transactions/persistence/data writes: Offset commit still relies on task id validation in FE, but the release race can interrupt the active reader/streamload before commit.
- FE-BE variables: The release request does not pass task identity, which is the root of the issue.
User focus: No additional user-provided review focus was specified.
|
/review |
There was a problem hiding this comment.
I found one blocking race in the task ownership guard. The PR goal is clear and mostly focused: add setup timeout, best-effort stale reader release, and better stream-load error detail. The FirstErrorMsg plumbing and timeout path are covered by tests, but the stale-release ownership protection still has a concurrency gap.
Critical checkpoint conclusions:
- Goal/test coverage: The intended behavior is covered partially, but the replacement-reader race below is not covered and can still interrupt a valid replacement task.
- Scope/focus: The change is reasonably small and scoped to streaming CDC.
- Concurrency: The PR explicitly changes concurrent FE timeout/cancel RPCs and async cdc_client write tasks; the owner update is not atomic with reader acquisition, leaving a race.
- Lifecycle: Reader and stream-load instances are shared per job and closed by releaseReader; this lifecycle must be protected by the same generation/ownership transition.
- Config: No new config items. Existing dynamic timeout multiplier is still read live on FE; the per-request value is fixed for cdc_client execution.
- Compatibility: Adding an optional request field is compatible with default 0 behavior; the new release API path is only used by the new FE.
- Parallel paths: Snapshot path is intentionally excluded from the setup timeout; close path is unchanged.
- Conditional checks: The owner check is necessary but currently not sufficient because owner assignment is separate from getReader().
- Observability: Logs include job/task/backend identifiers for new release/timeout paths.
- Data/transaction correctness: No Doris table transaction path changes, but CDC offset progress depends on not killing the replacement reader; the race can cause repeated task failure and slot contention.
User focus: No additional user-provided review focus was specified.
|
/review |
|
run buildall |
There was a problem hiding this comment.
Review result: request changes.
Critical checkpoint conclusions:
- Goal/test: The PR improves CDC reader release on failure/cancel, avoids stale releases by task owner, adds a setup-phase timeout, and surfaces FirstErrorMsg. The stale-release owner checks now address the already-known jobId-only issue, and tests cover parsing and some Env no-op cases, but the core handoff behavior still has an uncovered lifecycle race.
- Scope/focus: The change is mostly focused on streaming CDC task failure/cancel handling and error reporting.
- Concurrency: There is a remaining concurrency/lifecycle issue: FE sends releaseReader asynchronously and immediately continues failure/cancel processing, so a restarted/resumed task can start before the old backend has actually stopped the CDC engine. This defeats the stated guarantee that reschedule never leaves two readers competing for the same source.
- Lifecycle: Reader ownership is now tracked per task on the backend, but valid-owner release completion is not synchronized with FE task lifecycle.
- Configuration: No new config item was added; existing timeout multiplier remains dynamically read.
- Compatibility: The new request field is additive and defaults to 0 on older senders, disabling only the new backend setup timeout.
- Parallel paths: Close/release and writeRecords paths were reviewed; the issue is in failure/cancel release ordering, not the already-commented stale release guard.
- Tests: Added unit/regression coverage does not exercise the asynchronous release vs new task start race.
- Observability: Logs were added and are useful, but they do not prevent the race.
- Transaction/persistence/data writes: No direct storage-format or EditLog change was found in this PR.
- Performance: No blocking hot-path performance issue found, but the non-blocking release is currently used where correctness depends on the release having completed.
User focus: No additional user-provided focus was specified.
TPC-H: Total hot run time: 29286 ms |
TPC-DS: Total hot run time: 169509 ms |
TPC-H: Total hot run time: 29222 ms |
TPC-DS: Total hot run time: 169585 ms |
|
/review |
|
run buildall |
There was a problem hiding this comment.
I found one additional correctness issue distinct from the existing reader-ownership and fire-and-forget threads. The PR goal is clear, but the current implementation still does not fully guarantee safe handoff between CDC tasks because release cleanup can close a replacement task's job-level stream-load state.
Critical checkpoint conclusions:
- Goal/test: The startup timeout and FirstErrorMsg surfacing are covered by targeted tests, but stale-reader release handoff remains insufficiently tested for concurrent release vs replacement task startup.
- Scope/focus: The code is mostly focused on from-to CDC streaming, but release now affects shared per-job stream-load lifecycle.
- Concurrency: This PR is concurrency-sensitive. Reader ownership is guarded, but
batchStreamLoadMapremains keyed only by job id and is closed without task/generation validation. Existing inline threads already cover the fire-and-forget release and acquire/claim reader races; the new inline comment covers a separate stream-load race after reader detach. - Lifecycle: Reader release removes the reader context but leaves stream-load cleanup to a separate job-id-only map, creating a lifecycle mismatch between old and replacement tasks.
- Compatibility/config:
taskTimeoutMsis additive in the JSON request path; no storage-format compatibility concern found. - Parallel paths: Snapshot/TVF paths appear intentionally excluded from the new release timeout behavior.
- Tests: Unit/regression coverage exists for error-message parsing and simple Env no-op cases, but there is no test for release racing a replacement task or for ownership of stream-load cleanup.
- Observability: Logs include job/task identifiers for new release paths.
- Transaction/persistence/data correctness: The stream-load cleanup race can interrupt active writes for the replacement task, so correctness is not yet acceptable.
User focus: No additional user-provided review focus was specified.
…g replacement loader
|
run buildall |
|
/review |
There was a problem hiding this comment.
I found no additional distinct inline comments beyond the existing review threads, but I cannot approve the PR while the previously raised release-handoff correctness issue remains unresolved. In the current head, StreamingMultiTblTask.releaseRemoteReader() still sends /api/releaseReader/{taskId} fire-and-forget and immediately continues onFail()/reschedule handling, so the next task can start on another backend before the old backend has actually stopped its CDC reader. The PR body calls this a known limitation, but for the from-to CDC path it still allows competing readers for the same source/replication slot until retry/self-heal, which is the failure mode this change is intended to prevent. Please resolve the existing inline thread at fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java rather than adding a duplicate comment.
Review focus: no additional user-provided focus points.
Critical checkpoint conclusions:
- Goal/test: The PR addresses setup timeout, stale-reader release, and surfacing
FirstErrorMsg; tests cover response parsing and some Env no-op cases, but the release handoff correctness remains unproven and currently incomplete. - Scope: The code is mostly focused on the streaming CDC path.
- Concurrency: This PR is concurrency-sensitive. The reader claim/detach race and stream-load cleanup race from prior threads appear addressed in the latest head, but the asynchronous FE-to-BE release before reschedule is still a correctness gap.
- Lifecycle: Reader release intentionally keeps source-side slot state and detaches the backend context; normal
/api/closeremains responsible for final cleanup. No additional lifecycle issue found beyond the handoff timing. - Configuration: No new configuration items.
taskTimeoutMsis derived from existing config. - Compatibility/protocol: FE sends a new
taskTimeoutMsrequest field and a new cdc_client endpoint path; no persisted format change found. - Parallel paths: Scoped to
StreamingMultiTblTask/from-to CDC as described; snapshot timeout is explicitly excluded. - Conditional checks: The setup-timeout condition is limited to binlog/stream split before first record; offset extraction uses the starting offset, so I did not find a data-loss issue there.
- Test coverage/results: Unit/regression coverage was added for
FirstErrorMsgand limited Env behavior, but there is no test/mechanism proving that reschedule cannot overlap with the previous reader. - Observability: Added logs are sufficient for the new paths.
- Transactions/persistence/data writes: No direct transaction metadata change; late commit remains guarded by task id, but overlapping source readers are still possible until the release completes.
- FE/BE variables:
taskTimeoutMsis passed on the existing request object and used by cdc_client. - Performance: No new obvious hot-path performance issue found in the reviewed scope.
FE UT Coverage ReportIncrement line coverage |
|
run nonConcurrent |
FE Regression Coverage ReportIncrement line coverage |
TPC-H: Total hot run time: 29391 ms |
TPC-DS: Total hot run time: 170169 ms |
TPC-H: Total hot run time: 29177 ms |
TPC-DS: Total hot run time: 169396 ms |
FE Regression Coverage ReportIncrement line coverage |
Proposed changes
Three reliability/observability fixes for from-to (at-least-once) CDC streaming tasks.
Startup timeout. A from-to binlog task whose upstream is idle could block
indefinitely in the replication startup/locate phase (no first message arrives,
so the poll loop never times out). This adds a setup-phase timeout — half of the
FE task timeout, passed down via
WriteRecordRequest.taskTimeoutMs— so the taskexits and commits the current offset gracefully instead of hanging. Snapshot
splits are explicitly excluded so an incomplete watermark is never committed.
Release a stale reader on failure, guarded by task ownership. On task
onFail/cancel, FE makes a best-effort request (/api/releaseReader/{taskId})asking the previous backend to stop its reader while keeping the replication slot,
so a reschedule to another backend does not leave two readers competing for the
same slot. The reader cache is keyed by job id and reused across tasks, so the
release carries the failing task id and the backend releases only if that task
still owns the reader; a stale/late RPC becomes a no-op and cannot interrupt a
replacement task that reused the same reader. The RPC is fire-and-forget so it
never blocks while the job lock is held.
Surface the first rejected-row detail in the task error. When a stream load
fails with a data-quality error, the cdc_client now parses the
FirstErrorMsgfield from the response and appends it to the task error reported to FE, so the
job
errorMsgshows the actual offending row instead of only anErrorURL.Known limitation: the release is best-effort, so a reschedule may briefly observe
"replication slot is active"; this self-heals via task retry or the source-side
sender timeout.
Further comments
Scoped to the from-to streaming path; snapshot and TVF paths are unaffected.