Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion KERNEL_REV
Original file line number Diff line number Diff line change
@@ -1 +1 @@
101aa465e71991eec98102bba77aad2f7ad8faed
cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b
24 changes: 23 additions & 1 deletion src/databricks/sql/backend/kernel/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,23 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error":
"""
code = getattr(exc, "code", "Unknown")
cls = _CODE_TO_EXCEPTION.get(code, DatabaseError)
new = cls(getattr(exc, "message", str(exc)))

# For ServerOperationError, reproduce the Thrift backend's
# ``context`` dict so callers that read
# ``err.context["diagnostic-info"]`` (the Spark stack trace) /
# ``err.context["operation-id"]`` get the same shape on the kernel
# path. ``diagnostic_info`` is forwarded from the kernel error (it
# now crosses the PyO3 boundary; older wheels return ``None`` via
# ``getattr``, so this degrades gracefully). Matches
# thrift_backend.py's ServerOperationError construction.
context = None
if cls is ServerOperationError:
context = {
"operation-id": getattr(exc, "query_id", None),
"diagnostic-info": getattr(exc, "diagnostic_info", None),
}
new = cls(getattr(exc, "message", str(exc)), context)

for attr in (
"code",
"sql_state",
Expand All @@ -106,6 +122,12 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error":
"http_status",
"retryable",
"query_id",
# Extended server status now forwarded across the PyO3 boundary
# (kernel #121). ``getattr(..., None)`` keeps this forward-safe
# against an older wheel that doesn't set these attrs.
"display_message",
"diagnostic_info",
"error_details_json",
):
setattr(new, attr, getattr(exc, attr, None))
return new
Expand Down
132 changes: 132 additions & 0 deletions src/databricks/sql/backend/kernel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,56 @@
# per-request skip-and-warn.
_KERNEL_MANAGED_HEADERS = frozenset({"authorization", "x-databricks-org-id"})

# Leading verbs of SQL volume/staging statements. Detected by the
# leading token (case-insensitive) so the kernel backend can fail loud
# on staging ops it can't service — see ``execute_command``.
_STAGING_VERBS = ("PUT", "GET", "REMOVE")


def _strip_leading_sql_comments(sql: str) -> str:
"""Strip leading whitespace and SQL comments (``-- …`` line and
``/* … */`` block, possibly several) from ``sql``, returning the
remainder.

Needed so staging detection sees the real leading verb: a
comment-prefixed staging op (``-- upload\\nPUT …`` or
``/* c */ PUT …``, common in ETL scripts) must still be classified
as staging, or it would slip past the guard into the silent-no-op
bug. Block comments do not nest in Databricks SQL, so a simple
scan-to-``*/`` is correct.
"""
i = 0
n = len(sql)
while i < n:
if sql[i].isspace():
i += 1
elif sql.startswith("--", i):
# Line comment: skip to end of line (or string).
nl = sql.find("\n", i)
i = n if nl == -1 else nl + 1
elif sql.startswith("/*", i):
# Block comment: skip to closing */ (or end if unterminated).
close = sql.find("*/", i + 2)
i = n if close == -1 else close + 2
else:
break
return sql[i:]


def _is_staging_statement(operation: str) -> bool:
"""True iff ``operation`` is a volume/staging statement (PUT / GET /
REMOVE).

Strips leading whitespace + SQL comments first (so a comment-
prefixed staging op is still caught), then matches the leading token
only — so a normal query that merely *contains* the word (e.g.
``SELECT 'GET' AS x``) isn't misflagged.
"""
stripped = _strip_leading_sql_comments(operation)
# First whitespace-delimited token, uppercased.
verb = stripped.split(None, 1)[0].upper() if stripped.strip() else ""
return verb in _STAGING_VERBS


# ─── Client ─────────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -172,6 +222,17 @@ def __init__(
# path. Same lock as ``_async_handles``.
self._closed_commands: Set[str] = set()
self._async_handles_lock = threading.RLock()
# Sync-execute cancellers keyed by ``id(cursor)``. A blocking
# ``execute()`` sets ``cursor.active_command_id`` only AFTER it
# returns, so a concurrent ``cursor.cancel()`` (the documented
# cross-thread PEP-249 shape) has no command id to target while
# the query runs. We register a detached kernel
# ``StatementCanceller`` here just before the blocking call and
# drop it after; ``cancel_running_cursor`` (invoked by
# ``Cursor.cancel`` when there's no command id yet) fires it.
# Guarded by its own lock — cancel can race execute teardown.
self._sync_cancellers: Dict[int, Any] = {}
self._sync_cancellers_lock = threading.RLock()

# ── Session lifecycle ──────────────────────────────────────────

Expand Down Expand Up @@ -354,6 +415,24 @@ def execute_command(
# ``_async_statements`` and closed by ``close_command``); the sync
# path drops it in finally. ``close_stmt`` is the post-success
# decision flag — it stays True on sync, flips to False on async.
# Volume/staging (PUT/GET/REMOVE) is not supported on the kernel
# path: the kernel returns the staging control row as a normal
# result set (``KernelResultSet.is_staging_operation`` is always
# False), so the connector's ``_handle_staging_operation`` never
# fires and NO file is transferred. Rather than silently no-op
# (the Thrift path performs the presigned-URL upload/download),
# fail loud at the call site so ETL scripts don't ingest
# stale/missing data. Detected by the leading SQL verb — the
# only signal available pre-execute, since the kernel exposes no
# staging marker today.
if _is_staging_statement(operation):
raise NotSupportedError(
"Volume / staging operations (PUT / GET / REMOVE) are not "
"supported on the kernel backend (use_kernel=True); the file "
"transfer would silently not happen. Use the Thrift backend "
"for staging operations."
)

close_stmt = True
try:
try:
Expand Down Expand Up @@ -382,10 +461,23 @@ def execute_command(
self._async_statements[command_id.guid] = stmt
close_stmt = False
return None
# Register a detached canceller BEFORE the blocking
# execute so a concurrent ``cursor.cancel()`` can reach
# the running statement (its server id is populated mid-
# execute). Keyed by ``id(cursor)`` since no command id
# exists yet. Dropped in the finally.
try:
with self._sync_cancellers_lock:
self._sync_cancellers[id(cursor)] = stmt.canceller()
except Exception:
# Canceller is best-effort; never block execute on it.
pass
executed = stmt.execute()
except Exception as exc:
raise _wrap_kernel_exception("execute_command", exc) from exc
finally:
with self._sync_cancellers_lock:
self._sync_cancellers.pop(id(cursor), None)
if close_stmt:
# Sync path: ``Statement`` is a lifecycle owner separate
# from the executed handle. Drop it here so the parent
Expand Down Expand Up @@ -422,6 +514,46 @@ def cancel_command(self, command_id: CommandId) -> None:
except Exception as exc:
raise _wrap_kernel_exception("cancel_command", exc) from exc

def cancel_running_cursor(self, cursor: "Cursor") -> bool:
"""Cancel an in-flight SYNC ``execute()`` on ``cursor``.

Invoked by ``Cursor.cancel()`` when ``active_command_id`` is
still ``None`` — i.e. a blocking ``execute()`` hasn't returned,
so the command id isn't set yet but the server statement may be
running. Fires the detached ``StatementCanceller`` registered in
``execute_command`` before the blocking call.

Returns ``True`` if a canceller was found and fired (the
statement was in flight), ``False`` otherwise so ``Cursor`` can
emit its "no executing command" warning. Safe to call from
another thread.

Tolerant by design: ``cursor.cancel()`` is a best-effort
PEP-249 method (callers don't expect it to raise), so a cancel
failure is logged and swallowed rather than propagated. This
also covers the early-cancel window — a cancel arriving before
the kernel has observed the server statement id is a no-op in
the kernel canceller, but if it ever raised (e.g. a transport
hiccup on the cancel RPC) we must not surface that out of
``cancel()``. We still return ``True`` (a canceller was present
and we attempted it) so ``Cursor`` doesn't emit the misleading
"no executing command" warning.
"""
with self._sync_cancellers_lock:
canceller = self._sync_cancellers.get(id(cursor))
if canceller is None:
return False
try:
canceller.cancel()
except Exception:
logger.warning(
"cancel_running_cursor: best-effort cancel of in-flight "
"sync statement failed; swallowing (cursor.cancel() is "
"tolerant by PEP-249 contract)",
exc_info=True,
)
return True

def close_command(self, command_id: CommandId) -> None:
with self._async_handles_lock:
handle = self._async_handles.pop(command_id.guid, None)
Expand Down
21 changes: 16 additions & 5 deletions src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1704,11 +1704,22 @@ def cancel(self) -> None:
"""
if self.active_command_id is not None:
self.backend.cancel_command(self.active_command_id)
else:
logger.warning(
"Attempting to cancel a command, but there is no "
"currently executing command"
)
return
# No command id yet. A backend whose synchronous ``execute()``
# blocks without first publishing a command id (the kernel
# backend) may still have a server statement in flight. Such a
# backend exposes ``cancel_running_cursor(cursor)`` -> bool to
# cancel it; it returns True if something was actually
# cancelled. Opt-in via getattr so the Thrift / SEA backends
# (which set ``active_command_id`` before blocking) are
# unaffected.
cancel_running_cursor = getattr(self.backend, "cancel_running_cursor", None)
if cancel_running_cursor is not None and cancel_running_cursor(self):
return
logger.warning(
"Attempting to cancel a command, but there is no "
"currently executing command"
)

def close(self) -> None:
"""Close cursor"""
Expand Down
119 changes: 118 additions & 1 deletion tests/e2e/test_kernel_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import pytest

import databricks.sql as sql
from databricks.sql.exc import DatabaseError
from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError

# Skip the whole module unless the kernel wheel is genuinely installed.
# ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a
Expand Down Expand Up @@ -361,3 +361,120 @@ def test_user_agent_entry_and_http_headers_round_trip(kernel_conn_params):
with c.cursor() as cur:
cur.execute("SELECT 1 AS n")
assert cur.fetchall()[0][0] == 1


# ── Parameter parity (tz-aware timestamp, scientific decimal) ──────


def test_tz_aware_timestamp_parameter_binds(conn):
"""A tz-aware datetime parameter (datetime with tzinfo) binds on
the kernel path and resolves to the correct UTC instant. Previously
rejected at bind on kernel; works on Thrift. (kernel #121)"""
import datetime

tzdt = datetime.datetime(
2026,
5,
15,
18,
0,
0,
tzinfo=datetime.timezone(datetime.timedelta(hours=5, minutes=30)),
)
with conn.cursor() as cur:
cur.execute("SELECT ? AS ts", [tzdt])
ts = cur.fetchall()[0][0]
# 18:00 +05:30 == 12:30 UTC.
assert (ts.hour, ts.minute) == (12, 30)


def test_scientific_notation_decimal_parameter_binds(conn):
"""A Decimal whose str() is exponential (e.g. 1E-7) binds on the
kernel path. Previously rejected at bind; the server/Thrift accept
scientific-notation decimal literals. (kernel #121)"""
import decimal
from databricks.sql.parameters.native import DecimalParameter

with conn.cursor() as cur:
# 1E+2 == 100, round-trips cleanly at scale 0.
cur.execute("SELECT ? AS d", [DecimalParameter(decimal.Decimal("1E+2"))])
assert int(cur.fetchall()[0][0]) == 100


# ── Staging / volume — fail loud, not silent no-op ────────────────


def test_staging_put_raises_not_supported(conn):
"""A PUT (volume/staging) statement fails loud on the kernel path
rather than silently no-opping (which would make ETL ingest
stale/missing data). (CUJ-gap audit)"""
with conn.cursor() as cur:
with pytest.raises(NotSupportedError, match="staging"):
cur.execute("PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'")


def test_comment_prefixed_staging_put_raises_not_supported(conn):
"""A comment-prefixed staging op (common in ETL scripts) must also
fail loud — the leading-verb detection strips SQL comments first, so
it can't slip past into the silent-no-op bug (PR #825 review #1)."""
with conn.cursor() as cur:
with pytest.raises(NotSupportedError, match="staging"):
cur.execute(
"-- upload the daily extract\n"
"PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'"
)


# ── Error fidelity — diagnostic-info reaches .context ─────────────


def test_server_error_exposes_diagnostic_info_context(conn):
"""A server-side query failure surfaces as ServerOperationError
with the Spark diagnostic context in ``.context['diagnostic-info']``
— Thrift parity (kernel #121 forwards diagnostic_info across pyo3;
the connector populates .context)."""
with conn.cursor() as cur:
with pytest.raises(ServerOperationError) as exc_info:
cur.execute("SELECT * FROM definitely_not_a_table_xyz_kernel_e2e")
err = exc_info.value
# context shape matches Thrift; diagnostic-info may be None if
# the server didn't attach one, but the KEY must exist.
assert "diagnostic-info" in err.context
assert "operation-id" in err.context


# ── Sync cancel (cursor.cancel() from another thread) ─────────────


def test_sync_cancel_interrupts_blocking_execute(conn):
"""cursor.cancel() from another thread cancels a long-running
blocking execute() on the kernel path. Previously a silent no-op
(active_command_id was None until execute returned). (kernel #121
StatementCanceller + connector cancel_running_cursor wiring)"""
import threading
import time

cur = conn.cursor()

# The kernel publishes the server statement id once the initial
# POST returns — within the server's default wait window (~10s).
# Cancel after that so the canceller has an id to target; a cancel
# before then is a no-op by design (id not yet known). Pick a query
# that runs well past this so the cancel lands mid-flight.
def cancel_after_delay():
time.sleep(15.0)
cur.cancel()

t = threading.Thread(target=cancel_after_delay)
t.start()
try:
# Cancel should make execute() raise rather than run to
# completion — proving the server-side statement was cancelled.
with pytest.raises(Exception):
cur.execute(
"SELECT count(*) FROM range(0, 1000000000000) "
"WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1"
)
finally:
t.join()
cur.close()
Loading
Loading