diff --git a/KERNEL_REV b/KERNEL_REV index 37b717a45..682e6c2ec 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b +4f7fbe700050a363adc87ae1b94c217df23fe5c9 diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index 08bb0d36b..a57a7a821 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -473,16 +473,29 @@ def execute_command( # Canceller is best-effort; never block execute on it. pass executed = stmt.execute() + # Execute succeeded: the kernel now owns the statement + # lifecycle. It auto-closes the server statement when the + # result stream is fully drained (``ExecutedStatement:: + # next_batch`` end-of-stream), with the executed handle's + # ``Drop`` as the backstop for partial/abandoned reads. + # So we must NOT close ``stmt`` here: a premature + # ``CloseStatement`` at execute-return broke lazy + # CloudFetch chunk-link fetches (``get_result_chunks`` + # against the live statement) for large paginated-link + # results — the H4 gap. Closing here is left ONLY for the + # error path below, where no executed handle / result set + # was produced to reap it. + close_stmt = False except Exception as exc: raise _wrap_kernel_exception("execute_command", exc) from exc finally: with self._sync_cancellers_lock: self._sync_cancellers.pop(id(cursor), None) if close_stmt: - # Sync path: ``Statement`` is a lifecycle owner separate - # from the executed handle. Drop it here so the parent - # doesn't outlive its caller. Swallow close errors — - # they're not actionable. + # Reached only when ``stmt.execute()`` did not succeed + # (or async, which flipped the flag earlier): no executed + # handle owns the statement, so close it here to avoid a + # leak. Swallow close errors — not actionable. try: stmt.close() except Exception: diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index 95d7d942e..b0d289979 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -24,7 +24,12 @@ import pytest import databricks.sql as sql -from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError +from databricks.sql.exc import ( + DatabaseError, + NotSupportedError, + OperationalError, + ServerOperationError, +) # Skip the whole module unless the kernel wheel is genuinely installed. # ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a @@ -478,3 +483,54 @@ def cancel_after_delay(): finally: t.join() cur.close() + + +# ── Batch 2 ──────────────────────────────────────────────────────── + + +def test_large_result_drains_without_premature_close(conn): + """H4: a large multi-chunk result fully drains even though the + connector no longer closes the statement at execute-return — the + kernel auto-closes on drain. Guards the regression where a premature + CloseStatement broke lazy CloudFetch chunk-link fetches.""" + n = 5_000_000 + with conn.cursor() as cur: + cur.execute(f"SELECT id, cast(id AS string) s FROM range({n})") + rows = cur.fetchall() + assert len(rows) == n + # Cursor is reusable after the auto-close fired on the prior result. + cur.execute("SELECT 42 AS n") + assert cur.fetchall()[0][0] == 42 + + +def test_server_cancel_maps_to_operational_error(conn): + """A server-side cancel surfaces as OperationalError (cancelled + class), not ProgrammingError. We trigger it via a cross-thread + cancel of a running query; the raised exception must be in the + OperationalError family, not ProgrammingError.""" + import threading + import time + + from databricks.sql.exc import ProgrammingError + + cur = conn.cursor() + + def cancel_after_delay(): + time.sleep(15.0) + cur.cancel() + + t = threading.Thread(target=cancel_after_delay) + t.start() + try: + with pytest.raises(Exception) as exc_info: + cur.execute( + "SELECT count(*) FROM range(0, 1000000000000) " + "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1" + ) + # The cancellation must not masquerade as a caller-argument + # (ProgrammingError) error. It should be operational. + assert not isinstance(exc_info.value, ProgrammingError) + assert isinstance(exc_info.value, (OperationalError, DatabaseError)) + finally: + t.join() + cur.close() diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 3cbf55540..0553f6109 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -572,6 +572,74 @@ def fake_execute(): assert id(cursor) not in c._sync_cancellers +def test_sync_execute_does_not_close_statement_on_success(): + """H4: on a successful sync execute(), the connector must NOT close + the parent kernel Statement — the kernel now auto-closes the server + statement when the result stream drains (with the executed handle's + Drop as backstop). A premature close() here broke lazy CloudFetch + chunk-link fetches for large paginated-link results.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + stmt = MagicMock() + stmt.execute.return_value = MagicMock( + statement_id="stmt-id", + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + # The kernel owns the statement lifecycle post-execute; connector + # leaves it alone (kernel auto-close-on-drain + Drop backstop). + stmt.close.assert_not_called() + + +def test_sync_execute_closes_statement_on_failure(): + """On the error path (execute raised, no executed handle / result + set produced), the connector still closes the parent Statement so + it isn't leaked.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + stmt = MagicMock() + stmt.execute.side_effect = RuntimeError("boom") + c._kernel_session.statement.return_value = stmt + + with pytest.raises(Exception): + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + stmt.close.assert_called_once_with() + + def test_get_columns_accepts_none_catalog(): """The kernel's `list_columns` honours `catalog=None` by issuing `SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should