diff --git a/KERNEL_REV b/KERNEL_REV index af059324d..37b717a45 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -101aa465e71991eec98102bba77aad2f7ad8faed +cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b diff --git a/src/databricks/sql/backend/kernel/_errors.py b/src/databricks/sql/backend/kernel/_errors.py index 78b542300..151bac96b 100644 --- a/src/databricks/sql/backend/kernel/_errors.py +++ b/src/databricks/sql/backend/kernel/_errors.py @@ -97,7 +97,23 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error": """ code = getattr(exc, "code", "Unknown") cls = _CODE_TO_EXCEPTION.get(code, DatabaseError) - new = cls(getattr(exc, "message", str(exc))) + + # For ServerOperationError, reproduce the Thrift backend's + # ``context`` dict so callers that read + # ``err.context["diagnostic-info"]`` (the Spark stack trace) / + # ``err.context["operation-id"]`` get the same shape on the kernel + # path. ``diagnostic_info`` is forwarded from the kernel error (it + # now crosses the PyO3 boundary; older wheels return ``None`` via + # ``getattr``, so this degrades gracefully). Matches + # thrift_backend.py's ServerOperationError construction. + context = None + if cls is ServerOperationError: + context = { + "operation-id": getattr(exc, "query_id", None), + "diagnostic-info": getattr(exc, "diagnostic_info", None), + } + new = cls(getattr(exc, "message", str(exc)), context) + for attr in ( "code", "sql_state", @@ -106,6 +122,12 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error": "http_status", "retryable", "query_id", + # Extended server status now forwarded across the PyO3 boundary + # (kernel #121). ``getattr(..., None)`` keeps this forward-safe + # against an older wheel that doesn't set these attrs. + "display_message", + "diagnostic_info", + "error_details_json", ): setattr(new, attr, getattr(exc, attr, None)) return new diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index 7cdd484fa..08bb0d36b 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -71,6 +71,56 @@ # per-request skip-and-warn. _KERNEL_MANAGED_HEADERS = frozenset({"authorization", "x-databricks-org-id"}) +# Leading verbs of SQL volume/staging statements. Detected by the +# leading token (case-insensitive) so the kernel backend can fail loud +# on staging ops it can't service — see ``execute_command``. +_STAGING_VERBS = ("PUT", "GET", "REMOVE") + + +def _strip_leading_sql_comments(sql: str) -> str: + """Strip leading whitespace and SQL comments (``-- …`` line and + ``/* … */`` block, possibly several) from ``sql``, returning the + remainder. + + Needed so staging detection sees the real leading verb: a + comment-prefixed staging op (``-- upload\\nPUT …`` or + ``/* c */ PUT …``, common in ETL scripts) must still be classified + as staging, or it would slip past the guard into the silent-no-op + bug. Block comments do not nest in Databricks SQL, so a simple + scan-to-``*/`` is correct. + """ + i = 0 + n = len(sql) + while i < n: + if sql[i].isspace(): + i += 1 + elif sql.startswith("--", i): + # Line comment: skip to end of line (or string). + nl = sql.find("\n", i) + i = n if nl == -1 else nl + 1 + elif sql.startswith("/*", i): + # Block comment: skip to closing */ (or end if unterminated). + close = sql.find("*/", i + 2) + i = n if close == -1 else close + 2 + else: + break + return sql[i:] + + +def _is_staging_statement(operation: str) -> bool: + """True iff ``operation`` is a volume/staging statement (PUT / GET / + REMOVE). + + Strips leading whitespace + SQL comments first (so a comment- + prefixed staging op is still caught), then matches the leading token + only — so a normal query that merely *contains* the word (e.g. + ``SELECT 'GET' AS x``) isn't misflagged. + """ + stripped = _strip_leading_sql_comments(operation) + # First whitespace-delimited token, uppercased. + verb = stripped.split(None, 1)[0].upper() if stripped.strip() else "" + return verb in _STAGING_VERBS + # ─── Client ───────────────────────────────────────────────────────────────── @@ -172,6 +222,17 @@ def __init__( # path. Same lock as ``_async_handles``. self._closed_commands: Set[str] = set() self._async_handles_lock = threading.RLock() + # Sync-execute cancellers keyed by ``id(cursor)``. A blocking + # ``execute()`` sets ``cursor.active_command_id`` only AFTER it + # returns, so a concurrent ``cursor.cancel()`` (the documented + # cross-thread PEP-249 shape) has no command id to target while + # the query runs. We register a detached kernel + # ``StatementCanceller`` here just before the blocking call and + # drop it after; ``cancel_running_cursor`` (invoked by + # ``Cursor.cancel`` when there's no command id yet) fires it. + # Guarded by its own lock — cancel can race execute teardown. + self._sync_cancellers: Dict[int, Any] = {} + self._sync_cancellers_lock = threading.RLock() # ── Session lifecycle ────────────────────────────────────────── @@ -354,6 +415,24 @@ def execute_command( # ``_async_statements`` and closed by ``close_command``); the sync # path drops it in finally. ``close_stmt`` is the post-success # decision flag — it stays True on sync, flips to False on async. + # Volume/staging (PUT/GET/REMOVE) is not supported on the kernel + # path: the kernel returns the staging control row as a normal + # result set (``KernelResultSet.is_staging_operation`` is always + # False), so the connector's ``_handle_staging_operation`` never + # fires and NO file is transferred. Rather than silently no-op + # (the Thrift path performs the presigned-URL upload/download), + # fail loud at the call site so ETL scripts don't ingest + # stale/missing data. Detected by the leading SQL verb — the + # only signal available pre-execute, since the kernel exposes no + # staging marker today. + if _is_staging_statement(operation): + raise NotSupportedError( + "Volume / staging operations (PUT / GET / REMOVE) are not " + "supported on the kernel backend (use_kernel=True); the file " + "transfer would silently not happen. Use the Thrift backend " + "for staging operations." + ) + close_stmt = True try: try: @@ -382,10 +461,23 @@ def execute_command( self._async_statements[command_id.guid] = stmt close_stmt = False return None + # Register a detached canceller BEFORE the blocking + # execute so a concurrent ``cursor.cancel()`` can reach + # the running statement (its server id is populated mid- + # execute). Keyed by ``id(cursor)`` since no command id + # exists yet. Dropped in the finally. + try: + with self._sync_cancellers_lock: + self._sync_cancellers[id(cursor)] = stmt.canceller() + except Exception: + # Canceller is best-effort; never block execute on it. + pass executed = stmt.execute() except Exception as exc: raise _wrap_kernel_exception("execute_command", exc) from exc finally: + with self._sync_cancellers_lock: + self._sync_cancellers.pop(id(cursor), None) if close_stmt: # Sync path: ``Statement`` is a lifecycle owner separate # from the executed handle. Drop it here so the parent @@ -422,6 +514,46 @@ def cancel_command(self, command_id: CommandId) -> None: except Exception as exc: raise _wrap_kernel_exception("cancel_command", exc) from exc + def cancel_running_cursor(self, cursor: "Cursor") -> bool: + """Cancel an in-flight SYNC ``execute()`` on ``cursor``. + + Invoked by ``Cursor.cancel()`` when ``active_command_id`` is + still ``None`` — i.e. a blocking ``execute()`` hasn't returned, + so the command id isn't set yet but the server statement may be + running. Fires the detached ``StatementCanceller`` registered in + ``execute_command`` before the blocking call. + + Returns ``True`` if a canceller was found and fired (the + statement was in flight), ``False`` otherwise so ``Cursor`` can + emit its "no executing command" warning. Safe to call from + another thread. + + Tolerant by design: ``cursor.cancel()`` is a best-effort + PEP-249 method (callers don't expect it to raise), so a cancel + failure is logged and swallowed rather than propagated. This + also covers the early-cancel window — a cancel arriving before + the kernel has observed the server statement id is a no-op in + the kernel canceller, but if it ever raised (e.g. a transport + hiccup on the cancel RPC) we must not surface that out of + ``cancel()``. We still return ``True`` (a canceller was present + and we attempted it) so ``Cursor`` doesn't emit the misleading + "no executing command" warning. + """ + with self._sync_cancellers_lock: + canceller = self._sync_cancellers.get(id(cursor)) + if canceller is None: + return False + try: + canceller.cancel() + except Exception: + logger.warning( + "cancel_running_cursor: best-effort cancel of in-flight " + "sync statement failed; swallowing (cursor.cancel() is " + "tolerant by PEP-249 contract)", + exc_info=True, + ) + return True + def close_command(self, command_id: CommandId) -> None: with self._async_handles_lock: handle = self._async_handles.pop(command_id.guid, None) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 7fc815cd8..e66dd897c 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1704,11 +1704,22 @@ def cancel(self) -> None: """ if self.active_command_id is not None: self.backend.cancel_command(self.active_command_id) - else: - logger.warning( - "Attempting to cancel a command, but there is no " - "currently executing command" - ) + return + # No command id yet. A backend whose synchronous ``execute()`` + # blocks without first publishing a command id (the kernel + # backend) may still have a server statement in flight. Such a + # backend exposes ``cancel_running_cursor(cursor)`` -> bool to + # cancel it; it returns True if something was actually + # cancelled. Opt-in via getattr so the Thrift / SEA backends + # (which set ``active_command_id`` before blocking) are + # unaffected. + cancel_running_cursor = getattr(self.backend, "cancel_running_cursor", None) + if cancel_running_cursor is not None and cancel_running_cursor(self): + return + logger.warning( + "Attempting to cancel a command, but there is no " + "currently executing command" + ) def close(self) -> None: """Close cursor""" diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index 1e61bd7b8..95d7d942e 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -24,7 +24,7 @@ import pytest import databricks.sql as sql -from databricks.sql.exc import DatabaseError +from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError # Skip the whole module unless the kernel wheel is genuinely installed. # ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a @@ -361,3 +361,120 @@ def test_user_agent_entry_and_http_headers_round_trip(kernel_conn_params): with c.cursor() as cur: cur.execute("SELECT 1 AS n") assert cur.fetchall()[0][0] == 1 + + +# ── Parameter parity (tz-aware timestamp, scientific decimal) ────── + + +def test_tz_aware_timestamp_parameter_binds(conn): + """A tz-aware datetime parameter (datetime with tzinfo) binds on + the kernel path and resolves to the correct UTC instant. Previously + rejected at bind on kernel; works on Thrift. (kernel #121)""" + import datetime + + tzdt = datetime.datetime( + 2026, + 5, + 15, + 18, + 0, + 0, + tzinfo=datetime.timezone(datetime.timedelta(hours=5, minutes=30)), + ) + with conn.cursor() as cur: + cur.execute("SELECT ? AS ts", [tzdt]) + ts = cur.fetchall()[0][0] + # 18:00 +05:30 == 12:30 UTC. + assert (ts.hour, ts.minute) == (12, 30) + + +def test_scientific_notation_decimal_parameter_binds(conn): + """A Decimal whose str() is exponential (e.g. 1E-7) binds on the + kernel path. Previously rejected at bind; the server/Thrift accept + scientific-notation decimal literals. (kernel #121)""" + import decimal + from databricks.sql.parameters.native import DecimalParameter + + with conn.cursor() as cur: + # 1E+2 == 100, round-trips cleanly at scale 0. + cur.execute("SELECT ? AS d", [DecimalParameter(decimal.Decimal("1E+2"))]) + assert int(cur.fetchall()[0][0]) == 100 + + +# ── Staging / volume — fail loud, not silent no-op ──────────────── + + +def test_staging_put_raises_not_supported(conn): + """A PUT (volume/staging) statement fails loud on the kernel path + rather than silently no-opping (which would make ETL ingest + stale/missing data). (CUJ-gap audit)""" + with conn.cursor() as cur: + with pytest.raises(NotSupportedError, match="staging"): + cur.execute("PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'") + + +def test_comment_prefixed_staging_put_raises_not_supported(conn): + """A comment-prefixed staging op (common in ETL scripts) must also + fail loud — the leading-verb detection strips SQL comments first, so + it can't slip past into the silent-no-op bug (PR #825 review #1).""" + with conn.cursor() as cur: + with pytest.raises(NotSupportedError, match="staging"): + cur.execute( + "-- upload the daily extract\n" + "PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'" + ) + + +# ── Error fidelity — diagnostic-info reaches .context ───────────── + + +def test_server_error_exposes_diagnostic_info_context(conn): + """A server-side query failure surfaces as ServerOperationError + with the Spark diagnostic context in ``.context['diagnostic-info']`` + — Thrift parity (kernel #121 forwards diagnostic_info across pyo3; + the connector populates .context).""" + with conn.cursor() as cur: + with pytest.raises(ServerOperationError) as exc_info: + cur.execute("SELECT * FROM definitely_not_a_table_xyz_kernel_e2e") + err = exc_info.value + # context shape matches Thrift; diagnostic-info may be None if + # the server didn't attach one, but the KEY must exist. + assert "diagnostic-info" in err.context + assert "operation-id" in err.context + + +# ── Sync cancel (cursor.cancel() from another thread) ───────────── + + +def test_sync_cancel_interrupts_blocking_execute(conn): + """cursor.cancel() from another thread cancels a long-running + blocking execute() on the kernel path. Previously a silent no-op + (active_command_id was None until execute returned). (kernel #121 + StatementCanceller + connector cancel_running_cursor wiring)""" + import threading + import time + + cur = conn.cursor() + + # The kernel publishes the server statement id once the initial + # POST returns — within the server's default wait window (~10s). + # Cancel after that so the canceller has an id to target; a cancel + # before then is a no-op by design (id not yet known). Pick a query + # that runs well past this so the cancel lands mid-flight. + def cancel_after_delay(): + time.sleep(15.0) + cur.cancel() + + t = threading.Thread(target=cancel_after_delay) + t.start() + try: + # Cancel should make execute() raise rather than run to + # completion — proving the server-side statement was cancelled. + with pytest.raises(Exception): + cur.execute( + "SELECT count(*) FROM range(0, 1000000000000) " + "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1" + ) + finally: + t.join() + cur.close() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4a8cb0b68..16e705cec 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -371,13 +371,46 @@ def test_cancel_command_calls_the_backend(self): def test_cancel_command_will_issue_warning_for_cancel_with_no_executing_command( self, logger_instance ): - mock_thrift_backend = Mock() + # Backends like Thrift/SEA set active_command_id before blocking + # and do NOT define ``cancel_running_cursor``. Use ``spec`` so the + # mock doesn't auto-advertise that opt-in hook (a bare ``Mock()`` + # returns a truthy Mock for any attribute). + mock_thrift_backend = Mock(spec=["cancel_command"]) cursor = client.Cursor(Mock(), mock_thrift_backend) cursor.cancel() self.assertTrue(logger_instance.warning.called) self.assertFalse(mock_thrift_backend.cancel_command.called) + @patch("databricks.sql.client.logger") + def test_cancel_routes_to_cancel_running_cursor_when_no_command_id( + self, logger_instance + ): + """When there's no active_command_id (a blocking sync execute() + hasn't published one), cancel() routes to the backend's opt-in + ``cancel_running_cursor`` hook (the kernel backend). If the hook + cancels something (returns True), no warning is emitted.""" + backend = Mock(spec=["cancel_command", "cancel_running_cursor"]) + backend.cancel_running_cursor.return_value = True + cursor = client.Cursor(Mock(), backend) + cursor.cancel() + + backend.cancel_running_cursor.assert_called_once_with(cursor) + self.assertFalse(backend.cancel_command.called) + self.assertFalse(logger_instance.warning.called) + + @patch("databricks.sql.client.logger") + def test_cancel_warns_when_hook_finds_nothing_in_flight(self, logger_instance): + """If the hook returns False (nothing was in flight), the + existing 'no executing command' warning still fires.""" + backend = Mock(spec=["cancel_command", "cancel_running_cursor"]) + backend.cancel_running_cursor.return_value = False + cursor = client.Cursor(Mock(), backend) + cursor.cancel() + + backend.cancel_running_cursor.assert_called_once_with(cursor) + self.assertTrue(logger_instance.warning.called) + def test_version_is_canonical(self): version = databricks.sql.__version__ canonical_version_re = ( @@ -510,7 +543,7 @@ def test_column_name_api(self): expected_values = [["val1", 321, 52.32], ["val2", 2321, 252.32]] - for (row, expected) in zip(data, expected_values): + for row, expected in zip(data, expected_values): self.assertEqual(row.first_col, expected[0]) self.assertEqual(row.second_col, expected[1]) self.assertEqual(row.third_col, expected[2]) diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 8cff9b3d4..3cbf55540 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -41,6 +41,9 @@ def __init__( message: str = "boom", sql_state: Optional[str] = None, query_id: Optional[str] = None, + diagnostic_info: Optional[str] = None, + display_message: Optional[str] = None, + error_details_json: Optional[str] = None, ) -> None: super().__init__(message) self.code = code @@ -51,6 +54,11 @@ def __init__( self.http_status = None self.retryable = False self.query_id = query_id + # Extended server status forwarded across the PyO3 boundary + # (kernel #121). Defaults None so existing tests are unaffected. + self.diagnostic_info = diagnostic_info + self.display_message = display_message + self.error_details_json = error_details_json _fake_kernel_module = types.ModuleType("databricks_sql_kernel") @@ -139,6 +147,40 @@ def test_reraise_forwards_structured_attributes(): assert out.retryable is False +def test_reraise_forwards_extended_status_attributes(): + """display_message / diagnostic_info / error_details_json now cross + the PyO3 boundary (kernel #121) and must be forwarded onto the + re-raised exception so callers can read them.""" + err = _FakeKernelError( + code="SqlError", + message="boom", + diagnostic_info="org.apache.spark...stack", + display_message="user-facing msg", + error_details_json='{"k":1}', + ) + out = kernel_client._reraise_kernel_error(err) + assert out.diagnostic_info == "org.apache.spark...stack" + assert out.display_message == "user-facing msg" + assert out.error_details_json == '{"k":1}' + + +def test_server_operation_error_populates_context_like_thrift(): + """A SqlError maps to ServerOperationError; its ``context`` must + carry ``diagnostic-info`` (the Spark stack trace) and + ``operation-id``, matching the Thrift backend so callers reading + ``err.context["diagnostic-info"]`` work identically on use_kernel.""" + err = _FakeKernelError( + code="SqlError", + message="table not found", + query_id="q-123", + diagnostic_info="org.apache.spark...stack", + ) + out = kernel_client._reraise_kernel_error(err) + assert isinstance(out, ServerOperationError) + assert out.context["diagnostic-info"] == "org.apache.spark...stack" + assert out.context["operation-id"] == "q-123" + + def test_kernel_error_chains_through_wrap(): """``raise wrap_kernel_exception(...) from exc`` is the call-site pattern; ``__cause__`` must be set to the original ``KernelError`` @@ -371,6 +413,165 @@ def test_execute_command_forwards_query_tags(): assert stmt.execute.called +# --------------------------------------------------------------------------- +# Staging / volume operations — fail loud (not silently no-op) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "operation", + [ + "PUT '/local/f.csv' INTO '/Volumes/c/s/v/f.csv'", + " put '/local/f' into '/Volumes/...'", # leading ws + lowercase + "GET '/Volumes/c/s/v/f' TO '/local/f'", + "REMOVE '/Volumes/c/s/v/f'", + ], +) +def test_staging_operation_raises_not_supported(operation): + """Volume/staging PUT/GET/REMOVE must FAIL LOUD on the kernel path + (the kernel can't perform the presigned-URL transfer; silently + no-opping would make ETL ingest stale/missing data).""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + with pytest.raises(NotSupportedError, match="staging"): + c.execute_command( + operation=operation, + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + +@pytest.mark.parametrize( + "operation, is_staging", + [ + ("PUT '/f' INTO '/v'", True), + ("get '/v' to '/f'", True), + ("REMOVE '/v'", True), + # Comment-prefixed staging ops MUST still be caught — otherwise + # they slip into the silent-no-op bug this guard exists to close + # (regression: review #1 on PR #825). ETL scripts commonly + # prefix statements with comments. + ("-- upload the file\nPUT '/f' INTO '/v'", True), + ("/* staging */ PUT '/f' INTO '/v'", True), + ("/* c1 */\n -- c2\n get '/v' to '/f'", True), # mixed, multiple + (" \n\t PUT '/f' INTO '/v'", True), # leading whitespace only + ("SELECT 'GET' AS x", False), # word appears but not leading verb + ("SELECT * FROM puts", False), + ("-- PUT in a comment\nSELECT 1", False), # verb only in comment + ("/* PUT */ SELECT 1", False), + ("INSERT INTO t VALUES (1)", False), + ("", False), + ("-- just a comment", False), # comment only, no statement + ], +) +def test_is_staging_statement(operation, is_staging): + assert kernel_client._is_staging_statement(operation) is is_staging + + +# --------------------------------------------------------------------------- +# Sync cancel wiring (cursor.cancel() during a blocking execute()) +# --------------------------------------------------------------------------- + + +def test_cancel_running_cursor_fires_registered_canceller(): + """A canceller registered for a cursor (as execute_command does + before the blocking call) is fired by cancel_running_cursor, which + returns True.""" + c = _make_client() + cursor = MagicMock() + canceller = MagicMock() + with c._sync_cancellers_lock: + c._sync_cancellers[id(cursor)] = canceller + + assert c.cancel_running_cursor(cursor) is True + canceller.cancel.assert_called_once_with() + + +def test_cancel_running_cursor_returns_false_when_none_registered(): + """No in-flight sync statement for this cursor -> False so the + Cursor can emit its 'no executing command' warning.""" + c = _make_client() + assert c.cancel_running_cursor(MagicMock()) is False + + +def test_cancel_running_cursor_swallows_cancel_errors(): + """cursor.cancel() is best-effort (PEP-249); a failing canceller + (e.g. an early cancel before the statement id is observed, or a + transport hiccup on the cancel RPC) must NOT propagate out of + cancel(). It's swallowed+logged, and we still return True so the + Cursor doesn't emit the misleading 'no executing command' warning + (regression: review #2 on PR #825).""" + c = _make_client() + cursor = MagicMock() + canceller = MagicMock() + canceller.cancel.side_effect = RuntimeError("cancel RPC failed") + with c._sync_cancellers_lock: + c._sync_cancellers[id(cursor)] = canceller + + # Does not raise, returns True (a canceller was present + attempted). + assert c.cancel_running_cursor(cursor) is True + canceller.cancel.assert_called_once_with() + + +def test_execute_command_registers_and_clears_sync_canceller(): + """The sync execute() path registers a StatementCanceller keyed by + the cursor before blocking, and clears it in the finally — so a + concurrent cancel can reach it mid-flight, and it doesn't leak.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + canceller = MagicMock() + stmt = MagicMock() + stmt.canceller.return_value = canceller + seen_during_execute = {} + + def fake_execute(): + # The canceller is registered *during* the blocking execute. + with c._sync_cancellers_lock: + seen_during_execute["registered"] = ( + c._sync_cancellers.get(id(cursor)) is canceller + ) + return MagicMock( + statement_id="stmt-id", + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + + stmt.execute.side_effect = fake_execute + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert seen_during_execute["registered"] is True + # Cleared after execute returns — no leak. + with c._sync_cancellers_lock: + assert id(cursor) not in c._sync_cancellers + + def test_get_columns_accepts_none_catalog(): """The kernel's `list_columns` honours `catalog=None` by issuing `SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should