Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion KERNEL_REV
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b
4f7fbe700050a363adc87ae1b94c217df23fe5c9
21 changes: 17 additions & 4 deletions src/databricks/sql/backend/kernel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,29 @@ def execute_command(
# Canceller is best-effort; never block execute on it.
pass
executed = stmt.execute()
# Execute succeeded: the kernel now owns the statement
# lifecycle. It auto-closes the server statement when the
# result stream is fully drained (``ExecutedStatement::
# next_batch`` end-of-stream), with the executed handle's
# ``Drop`` as the backstop for partial/abandoned reads.
# So we must NOT close ``stmt`` here: a premature
# ``CloseStatement`` at execute-return broke lazy
# CloudFetch chunk-link fetches (``get_result_chunks``
# against the live statement) for large paginated-link
# results — the H4 gap. Closing here is left ONLY for the
# error path below, where no executed handle / result set
# was produced to reap it.
close_stmt = False
except Exception as exc:
raise _wrap_kernel_exception("execute_command", exc) from exc
finally:
with self._sync_cancellers_lock:
self._sync_cancellers.pop(id(cursor), None)
if close_stmt:
# Sync path: ``Statement`` is a lifecycle owner separate
# from the executed handle. Drop it here so the parent
# doesn't outlive its caller. Swallow close errors —
# they're not actionable.
# Reached only when ``stmt.execute()`` did not succeed
# (or async, which flipped the flag earlier): no executed
# handle owns the statement, so close it here to avoid a
# leak. Swallow close errors — not actionable.
try:
stmt.close()
except Exception:
Expand Down
58 changes: 57 additions & 1 deletion tests/e2e/test_kernel_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import pytest

import databricks.sql as sql
from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError
from databricks.sql.exc import (
DatabaseError,
NotSupportedError,
OperationalError,
ServerOperationError,
)

# Skip the whole module unless the kernel wheel is genuinely installed.
# ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a
Expand Down Expand Up @@ -478,3 +483,54 @@ def cancel_after_delay():
finally:
t.join()
cur.close()


# ── Batch 2 ────────────────────────────────────────────────────────


def test_large_result_drains_without_premature_close(conn):
    """H4 regression guard: drain a large multi-chunk result end to end.

    The connector no longer closes the statement when execute returns;
    the kernel auto-closes it once the stream is drained. An early
    CloseStatement used to break lazy CloudFetch chunk-link fetches, so
    this checks that every row arrives and that the cursor can run a
    further statement afterwards.
    """
    n = 5_000_000
    big_query = f"SELECT id, cast(id AS string) s FROM range({n})"
    with conn.cursor() as cur:
        cur.execute(big_query)
        fetched_count = len(cur.fetchall())
        assert fetched_count == n
        # The same cursor must stay usable once the first result is drained.
        cur.execute("SELECT 42 AS n")
        first_row = cur.fetchall()[0]
        assert first_row[0] == 42


def test_server_cancel_maps_to_operational_error(conn):
    """A cross-thread cancel of a running query must not raise ProgrammingError.

    A helper thread calls ``cur.cancel()`` 15 seconds after the query is
    submitted, unless ``execute`` has already finished by then. The
    exception raised by ``execute`` must not be a ``ProgrammingError``
    and must be an ``OperationalError`` or ``DatabaseError``.
    """
    import threading

    from databricks.sql.exc import ProgrammingError

    cur = conn.cursor()
    # Set once ``execute`` has returned or raised, so the helper thread
    # stops waiting instead of sleeping out the full delay and firing a
    # stray cancel during teardown.
    execute_finished = threading.Event()

    def cancel_after_delay():
        # ``wait`` returns False only when the timeout elapsed without
        # the event being set, i.e. ``execute`` is still in progress.
        if not execute_finished.wait(15.0):
            cur.cancel()

    t = threading.Thread(target=cancel_after_delay)
    t.start()
    try:
        with pytest.raises(Exception) as exc_info:
            cur.execute(
                "SELECT count(*) FROM range(0, 1000000000000) "
                "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1"
            )
        # The cancellation must not masquerade as a caller-argument
        # (ProgrammingError) error. It should be operational.
        assert not isinstance(exc_info.value, ProgrammingError)
        # NOTE(review): if OperationalError subclasses DatabaseError (as
        # in PEP 249), this tuple accepts any DatabaseError — consider
        # tightening to OperationalError once the mapping is confirmed.
        assert isinstance(exc_info.value, (OperationalError, DatabaseError))
    finally:
        # Release the helper thread first so ``join`` returns promptly
        # even when ``execute`` ended before the cancel delay elapsed.
        execute_finished.set()
        t.join()
        cur.close()
68 changes: 68 additions & 0 deletions tests/unit/test_kernel_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,74 @@ def fake_execute():
assert id(cursor) not in c._sync_cancellers


def test_sync_execute_does_not_close_statement_on_success():
    """H4: a successful sync execute() leaves the kernel Statement open.

    The kernel auto-closes the server statement when the result stream
    drains, with the executed handle's Drop as backstop. Closing it from
    the connector at execute-return broke lazy CloudFetch chunk-link
    fetches for large paginated-link results, so ``close()`` must not be
    called on the success path.
    """
    client = _make_client()
    client._kernel_session = MagicMock()

    fake_cursor = MagicMock()
    fake_cursor.arraysize = 100
    fake_cursor.buffer_size_bytes = 1024

    # Executed handle returned by a successful ``stmt.execute()``.
    schema = pa.schema([("x", pa.int64())])
    executed_handle = MagicMock(
        statement_id="stmt-id",
        arrow_schema=MagicMock(return_value=schema),
    )
    kernel_stmt = MagicMock()
    kernel_stmt.execute.return_value = executed_handle
    client._kernel_session.statement.return_value = kernel_stmt

    call_kwargs = dict(
        operation="SELECT 1",
        session_id=MagicMock(),
        max_rows=1,
        max_bytes=1,
        lz4_compression=False,
        cursor=fake_cursor,
        use_cloud_fetch=False,
        parameters=[],
        async_op=False,
        enforce_embedded_schema_correctness=False,
    )
    client.execute_command(**call_kwargs)

    # Post-execute the statement lifecycle belongs to the kernel; the
    # connector must leave the parent Statement untouched.
    kernel_stmt.close.assert_not_called()


def test_sync_execute_closes_statement_on_failure():
    """A failing sync execute() still closes the parent Statement.

    When ``stmt.execute()`` raises, no executed handle or result set is
    produced, so the connector has to close the Statement itself to
    avoid leaking it.
    """
    client = _make_client()
    client._kernel_session = MagicMock()

    fake_cursor = MagicMock()
    fake_cursor.arraysize = 100
    fake_cursor.buffer_size_bytes = 1024

    # Statement whose execute() always fails.
    kernel_stmt = MagicMock()
    kernel_stmt.execute.side_effect = RuntimeError("boom")
    client._kernel_session.statement.return_value = kernel_stmt

    call_kwargs = dict(
        operation="SELECT 1",
        session_id=MagicMock(),
        max_rows=1,
        max_bytes=1,
        lz4_compression=False,
        cursor=fake_cursor,
        use_cloud_fetch=False,
        parameters=[],
        async_op=False,
        enforce_embedded_schema_correctness=False,
    )
    with pytest.raises(Exception):
        client.execute_command(**call_kwargs)

    # Exactly one argument-less close() on the error path.
    kernel_stmt.close.assert_called_once_with()


def test_get_columns_accepts_none_catalog():
"""The kernel's `list_columns` honours `catalog=None` by issuing
`SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should
Expand Down
Loading