From 63c0817d4541491b7eb2c6be5ea2fefe2344fa47 Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Thu, 4 Jun 2026 07:10:01 +0000 Subject: [PATCH 1/3] =?UTF-8?q?feat(kernel):=20wire=20CUJ-gap=20fixes=20?= =?UTF-8?q?=E2=80=94=20staging=20fail-loud,=20error=20context,=20sync=20ca?= =?UTF-8?q?ncel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Connector half of the API+CUJ audit fixes (kernel half: databricks-sql-kernel PR #121). Bumps KERNEL_REV to pick up the kernel surface. Staging fail-loud (kernel/client.py): - Volume/staging PUT/GET/REMOVE silently no-op'd on the kernel path (KernelResultSet.is_staging_operation is always False, so the connector's _handle_staging_operation never fired and no file was transferred). Detect the leading verb in execute_command and raise NotSupportedError so ETL fails loud instead of ingesting stale data. Error context (_errors.py): - Forward display_message / diagnostic_info / error_details_json (now exposed across the pyo3 boundary in #121) onto the re-raised PEP-249 exception, and populate ServerOperationError.context with "diagnostic-info" (Spark stack trace) + "operation-id" — matching the Thrift backend so callers reading err.context work identically. Sync cancel wiring (client.py, kernel/client.py): - cursor.cancel() was a silent no-op for the default blocking execute() (active_command_id is None until execute returns). The kernel backend now registers a detached StatementCanceller (keyed by the cursor) before the blocking execute and exposes cancel_running_cursor(cursor). Cursor.cancel() routes to that hook via getattr when there's no command id yet — opt-in, so Thrift/SEA backends are unaffected. Tests: unit (staging fail-loud, _is_staging_statement, cancel registry + routing, error context/diagnostic-info forwarding) and e2e (tz-aware TIMESTAMP, scientific DECIMAL, staging NotSupportedError, diagnostic-info context, cross-thread sync cancel interrupts a running query). All e2e verified live against dogfood with use_kernel=True. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --- KERNEL_REV | 2 +- src/databricks/sql/backend/kernel/_errors.py | 24 ++- src/databricks/sql/backend/kernel/client.py | 84 +++++++++ src/databricks/sql/client.py | 21 ++- tests/e2e/test_kernel_backend.py | 107 +++++++++++- tests/unit/test_client.py | 37 +++- tests/unit/test_kernel_client.py | 171 +++++++++++++++++++ 7 files changed, 436 insertions(+), 10 deletions(-) diff --git a/KERNEL_REV b/KERNEL_REV index af059324d..00e1afc5e 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -101aa465e71991eec98102bba77aad2f7ad8faed +f17a302de06fcee3dea02089474ee2d71725a136 diff --git a/src/databricks/sql/backend/kernel/_errors.py b/src/databricks/sql/backend/kernel/_errors.py index 78b542300..151bac96b 100644 --- a/src/databricks/sql/backend/kernel/_errors.py +++ b/src/databricks/sql/backend/kernel/_errors.py @@ -97,7 +97,23 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error": """ code = getattr(exc, "code", "Unknown") cls = _CODE_TO_EXCEPTION.get(code, DatabaseError) - new = cls(getattr(exc, "message", str(exc))) + + # For ServerOperationError, reproduce the Thrift backend's + # ``context`` dict so callers that read + # ``err.context["diagnostic-info"]`` (the Spark stack trace) / + # ``err.context["operation-id"]`` get the same shape on the kernel + # path. ``diagnostic_info`` is forwarded from the kernel error (it + # now crosses the PyO3 boundary; older wheels return ``None`` via + # ``getattr``, so this degrades gracefully). Matches + # thrift_backend.py's ServerOperationError construction. + context = None + if cls is ServerOperationError: + context = { + "operation-id": getattr(exc, "query_id", None), + "diagnostic-info": getattr(exc, "diagnostic_info", None), + } + new = cls(getattr(exc, "message", str(exc)), context) + for attr in ( "code", "sql_state", @@ -106,6 +122,12 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error": "http_status", "retryable", "query_id", + # Extended server status now forwarded across the PyO3 boundary + # (kernel #121). ``getattr(..., None)`` keeps this forward-safe + # against an older wheel that doesn't set these attrs. + "display_message", + "diagnostic_info", + "error_details_json", ): setattr(new, attr, getattr(exc, attr, None)) return new diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index 7cdd484fa..7ada0e406 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -71,6 +71,24 @@ # per-request skip-and-warn. _KERNEL_MANAGED_HEADERS = frozenset({"authorization", "x-databricks-org-id"}) +# Leading verbs of SQL volume/staging statements. Detected by the +# leading token (case-insensitive) so the kernel backend can fail loud +# on staging ops it can't service — see ``execute_command``. +_STAGING_VERBS = ("PUT", "GET", "REMOVE") + + +def _is_staging_statement(operation: str) -> bool: + """True iff ``operation`` is a volume/staging statement (PUT / GET / + REMOVE). + + Matches the leading token only, so a normal query that merely + *contains* the word (e.g. ``SELECT 'GET' AS x``) isn't misflagged. + """ + stripped = operation.lstrip() + # First whitespace-delimited token, uppercased. + verb = stripped.split(None, 1)[0].upper() if stripped else "" + return verb in _STAGING_VERBS + # ─── Client ───────────────────────────────────────────────────────────────── @@ -172,6 +190,17 @@ def __init__( # path. Same lock as ``_async_handles``. self._closed_commands: Set[str] = set() self._async_handles_lock = threading.RLock() + # Sync-execute cancellers keyed by ``id(cursor)``. A blocking + # ``execute()`` sets ``cursor.active_command_id`` only AFTER it + # returns, so a concurrent ``cursor.cancel()`` (the documented + # cross-thread PEP-249 shape) has no command id to target while + # the query runs. We register a detached kernel + # ``StatementCanceller`` here just before the blocking call and + # drop it after; ``cancel_running_cursor`` (invoked by + # ``Cursor.cancel`` when there's no command id yet) fires it. + # Guarded by its own lock — cancel can race execute teardown. + self._sync_cancellers: Dict[int, Any] = {} + self._sync_cancellers_lock = threading.RLock() # ── Session lifecycle ────────────────────────────────────────── @@ -354,6 +383,24 @@ def execute_command( # ``_async_statements`` and closed by ``close_command``); the sync # path drops it in finally. ``close_stmt`` is the post-success # decision flag — it stays True on sync, flips to False on async. + # Volume/staging (PUT/GET/REMOVE) is not supported on the kernel + # path: the kernel returns the staging control row as a normal + # result set (``KernelResultSet.is_staging_operation`` is always + # False), so the connector's ``_handle_staging_operation`` never + # fires and NO file is transferred. Rather than silently no-op + # (the Thrift path performs the presigned-URL upload/download), + # fail loud at the call site so ETL scripts don't ingest + # stale/missing data. Detected by the leading SQL verb — the + # only signal available pre-execute, since the kernel exposes no + # staging marker today. + if _is_staging_statement(operation): + raise NotSupportedError( + "Volume / staging operations (PUT / GET / REMOVE) are not " + "supported on the kernel backend (use_kernel=True); the file " + "transfer would silently not happen. Use the Thrift backend " + "for staging operations." + ) + close_stmt = True try: try: @@ -382,10 +429,23 @@ def execute_command( self._async_statements[command_id.guid] = stmt close_stmt = False return None + # Register a detached canceller BEFORE the blocking + # execute so a concurrent ``cursor.cancel()`` can reach + # the running statement (its server id is populated mid- + # execute). Keyed by ``id(cursor)`` since no command id + # exists yet. Dropped in the finally. + try: + with self._sync_cancellers_lock: + self._sync_cancellers[id(cursor)] = stmt.canceller() + except Exception: + # Canceller is best-effort; never block execute on it. + pass executed = stmt.execute() except Exception as exc: raise _wrap_kernel_exception("execute_command", exc) from exc finally: + with self._sync_cancellers_lock: + self._sync_cancellers.pop(id(cursor), None) if close_stmt: # Sync path: ``Statement`` is a lifecycle owner separate # from the executed handle. Drop it here so the parent @@ -422,6 +482,30 @@ def cancel_command(self, command_id: CommandId) -> None: except Exception as exc: raise _wrap_kernel_exception("cancel_command", exc) from exc + def cancel_running_cursor(self, cursor: "Cursor") -> bool: + """Cancel an in-flight SYNC ``execute()`` on ``cursor``. + + Invoked by ``Cursor.cancel()`` when ``active_command_id`` is + still ``None`` — i.e. a blocking ``execute()`` hasn't returned, + so the command id isn't set yet but the server statement may be + running. Fires the detached ``StatementCanceller`` registered in + ``execute_command`` before the blocking call. + + Returns ``True`` if a canceller was found and fired (the + statement was in flight), ``False`` otherwise so ``Cursor`` can + emit its "no executing command" warning. Safe to call from + another thread. + """ + with self._sync_cancellers_lock: + canceller = self._sync_cancellers.get(id(cursor)) + if canceller is None: + return False + try: + canceller.cancel() + except Exception as exc: + raise _wrap_kernel_exception("cancel_running_cursor", exc) from exc + return True + def close_command(self, command_id: CommandId) -> None: with self._async_handles_lock: handle = self._async_handles.pop(command_id.guid, None) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 7fc815cd8..e66dd897c 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1704,11 +1704,22 @@ def cancel(self) -> None: """ if self.active_command_id is not None: self.backend.cancel_command(self.active_command_id) - else: - logger.warning( - "Attempting to cancel a command, but there is no " - "currently executing command" - ) + return + # No command id yet. A backend whose synchronous ``execute()`` + # blocks without first publishing a command id (the kernel + # backend) may still have a server statement in flight. Such a + # backend exposes ``cancel_running_cursor(cursor)`` -> bool to + # cancel it; it returns True if something was actually + # cancelled. Opt-in via getattr so the Thrift / SEA backends + # (which set ``active_command_id`` before blocking) are + # unaffected. + cancel_running_cursor = getattr(self.backend, "cancel_running_cursor", None) + if cancel_running_cursor is not None and cancel_running_cursor(self): + return + logger.warning( + "Attempting to cancel a command, but there is no " + "currently executing command" + ) def close(self) -> None: """Close cursor""" diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index 1e61bd7b8..e20171288 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -24,7 +24,7 @@ import pytest import databricks.sql as sql -from databricks.sql.exc import DatabaseError +from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError # Skip the whole module unless the kernel wheel is genuinely installed. # ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a @@ -361,3 +361,108 @@ def test_user_agent_entry_and_http_headers_round_trip(kernel_conn_params): with c.cursor() as cur: cur.execute("SELECT 1 AS n") assert cur.fetchall()[0][0] == 1 + + +# ── Parameter parity (tz-aware timestamp, scientific decimal) ────── + + +def test_tz_aware_timestamp_parameter_binds(conn): + """A tz-aware datetime parameter (datetime with tzinfo) binds on + the kernel path and resolves to the correct UTC instant. Previously + rejected at bind on kernel; works on Thrift. (kernel #121)""" + import datetime + + tzdt = datetime.datetime( + 2026, + 5, + 15, + 18, + 0, + 0, + tzinfo=datetime.timezone(datetime.timedelta(hours=5, minutes=30)), + ) + with conn.cursor() as cur: + cur.execute("SELECT ? AS ts", [tzdt]) + ts = cur.fetchall()[0][0] + # 18:00 +05:30 == 12:30 UTC. + assert (ts.hour, ts.minute) == (12, 30) + + +def test_scientific_notation_decimal_parameter_binds(conn): + """A Decimal whose str() is exponential (e.g. 1E-7) binds on the + kernel path. Previously rejected at bind; the server/Thrift accept + scientific-notation decimal literals. (kernel #121)""" + import decimal + from databricks.sql.parameters.native import DecimalParameter + + with conn.cursor() as cur: + # 1E+2 == 100, round-trips cleanly at scale 0. + cur.execute("SELECT ? AS d", [DecimalParameter(decimal.Decimal("1E+2"))]) + assert int(cur.fetchall()[0][0]) == 100 + + +# ── Staging / volume — fail loud, not silent no-op ──────────────── + + +def test_staging_put_raises_not_supported(conn): + """A PUT (volume/staging) statement fails loud on the kernel path + rather than silently no-opping (which would make ETL ingest + stale/missing data). (CUJ-gap audit)""" + with conn.cursor() as cur: + with pytest.raises(NotSupportedError, match="staging"): + cur.execute("PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'") + + +# ── Error fidelity — diagnostic-info reaches .context ───────────── + + +def test_server_error_exposes_diagnostic_info_context(conn): + """A server-side query failure surfaces as ServerOperationError + with the Spark diagnostic context in ``.context['diagnostic-info']`` + — Thrift parity (kernel #121 forwards diagnostic_info across pyo3; + the connector populates .context).""" + with conn.cursor() as cur: + with pytest.raises(ServerOperationError) as exc_info: + cur.execute("SELECT * FROM definitely_not_a_table_xyz_kernel_e2e") + err = exc_info.value + # context shape matches Thrift; diagnostic-info may be None if + # the server didn't attach one, but the KEY must exist. + assert "diagnostic-info" in err.context + assert "operation-id" in err.context + + +# ── Sync cancel (cursor.cancel() from another thread) ───────────── + + +def test_sync_cancel_interrupts_blocking_execute(conn): + """cursor.cancel() from another thread cancels a long-running + blocking execute() on the kernel path. Previously a silent no-op + (active_command_id was None until execute returned). (kernel #121 + StatementCanceller + connector cancel_running_cursor wiring)""" + import threading + import time + + cur = conn.cursor() + + # The kernel publishes the server statement id once the initial + # POST returns — within the server's default wait window (~10s). + # Cancel after that so the canceller has an id to target; a cancel + # before then is a no-op by design (id not yet known). Pick a query + # that runs well past this so the cancel lands mid-flight. + def cancel_after_delay(): + time.sleep(15.0) + cur.cancel() + + t = threading.Thread(target=cancel_after_delay) + t.start() + try: + # Cancel should make execute() raise rather than run to + # completion — proving the server-side statement was cancelled. + with pytest.raises(Exception): + cur.execute( + "SELECT count(*) FROM range(0, 1000000000000) " + "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1" + ) + finally: + t.join() + cur.close() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4a8cb0b68..16e705cec 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -371,13 +371,46 @@ def test_cancel_command_calls_the_backend(self): def test_cancel_command_will_issue_warning_for_cancel_with_no_executing_command( self, logger_instance ): - mock_thrift_backend = Mock() + # Backends like Thrift/SEA set active_command_id before blocking + # and do NOT define ``cancel_running_cursor``. Use ``spec`` so the + # mock doesn't auto-advertise that opt-in hook (a bare ``Mock()`` + # returns a truthy Mock for any attribute). + mock_thrift_backend = Mock(spec=["cancel_command"]) cursor = client.Cursor(Mock(), mock_thrift_backend) cursor.cancel() self.assertTrue(logger_instance.warning.called) self.assertFalse(mock_thrift_backend.cancel_command.called) + @patch("databricks.sql.client.logger") + def test_cancel_routes_to_cancel_running_cursor_when_no_command_id( + self, logger_instance + ): + """When there's no active_command_id (a blocking sync execute() + hasn't published one), cancel() routes to the backend's opt-in + ``cancel_running_cursor`` hook (the kernel backend). If the hook + cancels something (returns True), no warning is emitted.""" + backend = Mock(spec=["cancel_command", "cancel_running_cursor"]) + backend.cancel_running_cursor.return_value = True + cursor = client.Cursor(Mock(), backend) + cursor.cancel() + + backend.cancel_running_cursor.assert_called_once_with(cursor) + self.assertFalse(backend.cancel_command.called) + self.assertFalse(logger_instance.warning.called) + + @patch("databricks.sql.client.logger") + def test_cancel_warns_when_hook_finds_nothing_in_flight(self, logger_instance): + """If the hook returns False (nothing was in flight), the + existing 'no executing command' warning still fires.""" + backend = Mock(spec=["cancel_command", "cancel_running_cursor"]) + backend.cancel_running_cursor.return_value = False + cursor = client.Cursor(Mock(), backend) + cursor.cancel() + + backend.cancel_running_cursor.assert_called_once_with(cursor) + self.assertTrue(logger_instance.warning.called) + def test_version_is_canonical(self): version = databricks.sql.__version__ canonical_version_re = ( @@ -510,7 +543,7 @@ def test_column_name_api(self): expected_values = [["val1", 321, 52.32], ["val2", 2321, 252.32]] - for (row, expected) in zip(data, expected_values): + for row, expected in zip(data, expected_values): self.assertEqual(row.first_col, expected[0]) self.assertEqual(row.second_col, expected[1]) self.assertEqual(row.third_col, expected[2]) diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 8cff9b3d4..26bf9361b 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -41,6 +41,9 @@ def __init__( message: str = "boom", sql_state: Optional[str] = None, query_id: Optional[str] = None, + diagnostic_info: Optional[str] = None, + display_message: Optional[str] = None, + error_details_json: Optional[str] = None, ) -> None: super().__init__(message) self.code = code @@ -51,6 +54,11 @@ def __init__( self.http_status = None self.retryable = False self.query_id = query_id + # Extended server status forwarded across the PyO3 boundary + # (kernel #121). Defaults None so existing tests are unaffected. + self.diagnostic_info = diagnostic_info + self.display_message = display_message + self.error_details_json = error_details_json _fake_kernel_module = types.ModuleType("databricks_sql_kernel") @@ -139,6 +147,40 @@ def test_reraise_forwards_structured_attributes(): assert out.retryable is False +def test_reraise_forwards_extended_status_attributes(): + """display_message / diagnostic_info / error_details_json now cross + the PyO3 boundary (kernel #121) and must be forwarded onto the + re-raised exception so callers can read them.""" + err = _FakeKernelError( + code="SqlError", + message="boom", + diagnostic_info="org.apache.spark...stack", + display_message="user-facing msg", + error_details_json='{"k":1}', + ) + out = kernel_client._reraise_kernel_error(err) + assert out.diagnostic_info == "org.apache.spark...stack" + assert out.display_message == "user-facing msg" + assert out.error_details_json == '{"k":1}' + + +def test_server_operation_error_populates_context_like_thrift(): + """A SqlError maps to ServerOperationError; its ``context`` must + carry ``diagnostic-info`` (the Spark stack trace) and + ``operation-id``, matching the Thrift backend so callers reading + ``err.context["diagnostic-info"]`` work identically on use_kernel.""" + err = _FakeKernelError( + code="SqlError", + message="table not found", + query_id="q-123", + diagnostic_info="org.apache.spark...stack", + ) + out = kernel_client._reraise_kernel_error(err) + assert isinstance(out, ServerOperationError) + assert out.context["diagnostic-info"] == "org.apache.spark...stack" + assert out.context["operation-id"] == "q-123" + + def test_kernel_error_chains_through_wrap(): """``raise wrap_kernel_exception(...) from exc`` is the call-site pattern; ``__cause__`` must be set to the original ``KernelError`` @@ -371,6 +413,135 @@ def test_execute_command_forwards_query_tags(): assert stmt.execute.called +# --------------------------------------------------------------------------- +# Staging / volume operations — fail loud (not silently no-op) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "operation", + [ + "PUT '/local/f.csv' INTO '/Volumes/c/s/v/f.csv'", + " put '/local/f' into '/Volumes/...'", # leading ws + lowercase + "GET '/Volumes/c/s/v/f' TO '/local/f'", + "REMOVE '/Volumes/c/s/v/f'", + ], +) +def test_staging_operation_raises_not_supported(operation): + """Volume/staging PUT/GET/REMOVE must FAIL LOUD on the kernel path + (the kernel can't perform the presigned-URL transfer; silently + no-opping would make ETL ingest stale/missing data).""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + with pytest.raises(NotSupportedError, match="staging"): + c.execute_command( + operation=operation, + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + +@pytest.mark.parametrize( + "operation, is_staging", + [ + ("PUT '/f' INTO '/v'", True), + ("get '/v' to '/f'", True), + ("REMOVE '/v'", True), + ("SELECT 'GET' AS x", False), # word appears but not leading verb + ("SELECT * FROM puts", False), + ("INSERT INTO t VALUES (1)", False), + ("", False), + ], +) +def test_is_staging_statement(operation, is_staging): + assert kernel_client._is_staging_statement(operation) is is_staging + + +# --------------------------------------------------------------------------- +# Sync cancel wiring (cursor.cancel() during a blocking execute()) +# --------------------------------------------------------------------------- + + +def test_cancel_running_cursor_fires_registered_canceller(): + """A canceller registered for a cursor (as execute_command does + before the blocking call) is fired by cancel_running_cursor, which + returns True.""" + c = _make_client() + cursor = MagicMock() + canceller = MagicMock() + with c._sync_cancellers_lock: + c._sync_cancellers[id(cursor)] = canceller + + assert c.cancel_running_cursor(cursor) is True + canceller.cancel.assert_called_once_with() + + +def test_cancel_running_cursor_returns_false_when_none_registered(): + """No in-flight sync statement for this cursor -> False so the + Cursor can emit its 'no executing command' warning.""" + c = _make_client() + assert c.cancel_running_cursor(MagicMock()) is False + + +def test_execute_command_registers_and_clears_sync_canceller(): + """The sync execute() path registers a StatementCanceller keyed by + the cursor before blocking, and clears it in the finally — so a + concurrent cancel can reach it mid-flight, and it doesn't leak.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + canceller = MagicMock() + stmt = MagicMock() + stmt.canceller.return_value = canceller + seen_during_execute = {} + + def fake_execute(): + # The canceller is registered *during* the blocking execute. + with c._sync_cancellers_lock: + seen_during_execute["registered"] = ( + c._sync_cancellers.get(id(cursor)) is canceller + ) + return MagicMock( + statement_id="stmt-id", + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + + stmt.execute.side_effect = fake_execute + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert seen_during_execute["registered"] is True + # Cleared after execute returns — no leak. + with c._sync_cancellers_lock: + assert id(cursor) not in c._sync_cancellers + + def test_get_columns_accepts_none_catalog(): """The kernel's `list_columns` honours `catalog=None` by issuing `SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should From 4413040123810b5144e256ba182033b9e5c409bd Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Thu, 4 Jun 2026 09:30:18 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix(kernel):=20address=20review=20=E2=80=94?= =?UTF-8?q?=20comment-prefixed=20staging=20+=20tolerant=20sync=20cancel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review fixes on PR #825 (+ KERNEL_REV bump to the amended kernel #121 which now folds the original message on attach failure and bounds error_details_json): P1 #1 — staging fail-loud missed comment-prefixed statements: _is_staging_statement took the first whitespace token without stripping SQL comments, so "-- upload\nPUT ..." / "/* c */ PUT ..." (common in ETL) classified as non-staging and slipped into the silent-no-op bug. Added _strip_leading_sql_comments (handles leading -- line and /* */ block comments, multiple/mixed) before extracting the verb. Tests for both comment forms, mixed, and verb-only-in-comment (must NOT match). P1 #2 — sync cancel could raise out of cursor.cancel(): cursor.cancel() is best-effort per PEP-249, but cancel_running_cursor re-raised a canceller failure (e.g. an early cancel before the server statement id is observed, or a transport hiccup) via the public cancel(). Now swallow+log and still return True (a canceller was present and attempted) so Cursor doesn't emit the misleading "no executing command" warning. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --- KERNEL_REV | 2 +- src/databricks/sql/backend/kernel/client.py | 60 ++++++++++++++++++--- tests/e2e/test_kernel_backend.py | 12 +++++ tests/unit/test_kernel_client.py | 30 +++++++++++ 4 files changed, 97 insertions(+), 7 deletions(-) diff --git a/KERNEL_REV b/KERNEL_REV index 00e1afc5e..d5e176c5c 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -f17a302de06fcee3dea02089474ee2d71725a136 +f62d9415f5645ece70e86d1713f2e45423358c3c diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index 7ada0e406..08bb0d36b 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -77,16 +77,48 @@ _STAGING_VERBS = ("PUT", "GET", "REMOVE") +def _strip_leading_sql_comments(sql: str) -> str: + """Strip leading whitespace and SQL comments (``-- …`` line and + ``/* … */`` block, possibly several) from ``sql``, returning the + remainder. + + Needed so staging detection sees the real leading verb: a + comment-prefixed staging op (``-- upload\\nPUT …`` or + ``/* c */ PUT …``, common in ETL scripts) must still be classified + as staging, or it would slip past the guard into the silent-no-op + bug. Block comments do not nest in Databricks SQL, so a simple + scan-to-``*/`` is correct. + """ + i = 0 + n = len(sql) + while i < n: + if sql[i].isspace(): + i += 1 + elif sql.startswith("--", i): + # Line comment: skip to end of line (or string). + nl = sql.find("\n", i) + i = n if nl == -1 else nl + 1 + elif sql.startswith("/*", i): + # Block comment: skip to closing */ (or end if unterminated). + close = sql.find("*/", i + 2) + i = n if close == -1 else close + 2 + else: + break + return sql[i:] + + def _is_staging_statement(operation: str) -> bool: """True iff ``operation`` is a volume/staging statement (PUT / GET / REMOVE). - Matches the leading token only, so a normal query that merely - *contains* the word (e.g. ``SELECT 'GET' AS x``) isn't misflagged. + Strips leading whitespace + SQL comments first (so a comment- + prefixed staging op is still caught), then matches the leading token + only — so a normal query that merely *contains* the word (e.g. + ``SELECT 'GET' AS x``) isn't misflagged. """ - stripped = operation.lstrip() + stripped = _strip_leading_sql_comments(operation) # First whitespace-delimited token, uppercased. - verb = stripped.split(None, 1)[0].upper() if stripped else "" + verb = stripped.split(None, 1)[0].upper() if stripped.strip() else "" return verb in _STAGING_VERBS @@ -495,6 +527,17 @@ def cancel_running_cursor(self, cursor: "Cursor") -> bool: statement was in flight), ``False`` otherwise so ``Cursor`` can emit its "no executing command" warning. Safe to call from another thread. + + Tolerant by design: ``cursor.cancel()`` is a best-effort + PEP-249 method (callers don't expect it to raise), so a cancel + failure is logged and swallowed rather than propagated. This + also covers the early-cancel window — a cancel arriving before + the kernel has observed the server statement id is a no-op in + the kernel canceller, but if it ever raised (e.g. a transport + hiccup on the cancel RPC) we must not surface that out of + ``cancel()``. We still return ``True`` (a canceller was present + and we attempted it) so ``Cursor`` doesn't emit the misleading + "no executing command" warning. """ with self._sync_cancellers_lock: canceller = self._sync_cancellers.get(id(cursor)) @@ -502,8 +545,13 @@ def cancel_running_cursor(self, cursor: "Cursor") -> bool: return False try: canceller.cancel() - except Exception as exc: - raise _wrap_kernel_exception("cancel_running_cursor", exc) from exc + except Exception: + logger.warning( + "cancel_running_cursor: best-effort cancel of in-flight " + "sync statement failed; swallowing (cursor.cancel() is " + "tolerant by PEP-249 contract)", + exc_info=True, + ) return True def close_command(self, command_id: CommandId) -> None: diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index e20171288..95d7d942e 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -413,6 +413,18 @@ def test_staging_put_raises_not_supported(conn): cur.execute("PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'") +def test_comment_prefixed_staging_put_raises_not_supported(conn): + """A comment-prefixed staging op (common in ETL scripts) must also + fail loud — the leading-verb detection strips SQL comments first, so + it can't slip past into the silent-no-op bug (PR #825 review #1).""" + with conn.cursor() as cur: + with pytest.raises(NotSupportedError, match="staging"): + cur.execute( + "-- upload the daily extract\n" + "PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'" + ) + + # ── Error fidelity — diagnostic-info reaches .context ───────────── diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 26bf9361b..3cbf55540 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -458,10 +458,21 @@ def test_staging_operation_raises_not_supported(operation): ("PUT '/f' INTO '/v'", True), ("get '/v' to '/f'", True), ("REMOVE '/v'", True), + # Comment-prefixed staging ops MUST still be caught — otherwise + # they slip into the silent-no-op bug this guard exists to close + # (regression: review #1 on PR #825). ETL scripts commonly + # prefix statements with comments. + ("-- upload the file\nPUT '/f' INTO '/v'", True), + ("/* staging */ PUT '/f' INTO '/v'", True), + ("/* c1 */\n -- c2\n get '/v' to '/f'", True), # mixed, multiple + (" \n\t PUT '/f' INTO '/v'", True), # leading whitespace only ("SELECT 'GET' AS x", False), # word appears but not leading verb ("SELECT * FROM puts", False), + ("-- PUT in a comment\nSELECT 1", False), # verb only in comment + ("/* PUT */ SELECT 1", False), ("INSERT INTO t VALUES (1)", False), ("", False), + ("-- just a comment", False), # comment only, no statement ], ) def test_is_staging_statement(operation, is_staging): @@ -494,6 +505,25 @@ def test_cancel_running_cursor_returns_false_when_none_registered(): assert c.cancel_running_cursor(MagicMock()) is False +def test_cancel_running_cursor_swallows_cancel_errors(): + """cursor.cancel() is best-effort (PEP-249); a failing canceller + (e.g. an early cancel before the statement id is observed, or a + transport hiccup on the cancel RPC) must NOT propagate out of + cancel(). It's swallowed+logged, and we still return True so the + Cursor doesn't emit the misleading 'no executing command' warning + (regression: review #2 on PR #825).""" + c = _make_client() + cursor = MagicMock() + canceller = MagicMock() + canceller.cancel.side_effect = RuntimeError("cancel RPC failed") + with c._sync_cancellers_lock: + c._sync_cancellers[id(cursor)] = canceller + + # Does not raise, returns True (a canceller was present + attempted). + assert c.cancel_running_cursor(cursor) is True + canceller.cancel.assert_called_once_with() + + def test_execute_command_registers_and_clears_sync_canceller(): """The sync execute() path registers a StatementCanceller keyed by the cursor before blocking, and clears it in the finally — so a From dc93c09fc7e50475143d84ac285b8ac12b3dbe16 Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Thu, 4 Jun 2026 10:04:48 +0000 Subject: [PATCH 3/3] chore: re-pin KERNEL_REV to merged kernel #121 (cbeaf44) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #121 (tz-aware/scientific param binds, error context, sync cancel + Ctrl-C) is merged to kernel main. Re-pin from the orphaned branch HEAD (f62d941) to the merged squash SHA (cbeaf44) — content-identical, but reachable from main so no orphan-SHA risk. Verified against a wheel built from cbeaf44: connector unit + kernel e2e (tz-aware TIMESTAMP, scientific DECIMAL, staging fail-loud incl. comment-prefixed, diagnostic-info context) all pass. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --- KERNEL_REV | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/KERNEL_REV b/KERNEL_REV index d5e176c5c..37b717a45 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -f62d9415f5645ece70e86d1713f2e45423358c3c +cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b