From 1b0e8c6ae347787c97024bfd19232cc91cad9c97 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Tue, 2 Jun 2026 18:51:27 +0000 Subject: [PATCH 1/6] [SEA-NodeJS] SEA connection & statement options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the SEA connection-level and per-statement option surfaces onto the merged-kernel napi binding (thin forwarding — the kernel owns the behaviour): Connection options (SeaAuth.buildSeaConnectionOptions): - `maxConnections` → kernel pool size, validated as a positive integer within the napi u32 range. - TLS: `checkServerCertificate` (secure-by-default — omit to keep the kernel's verify-on default; `false` opts into insecure) and `customCaCert` (PEM string or Buffer; strings are PEM-sanity-checked and normalised to a Buffer before the FFI boundary), via the new `buildSeaTlsOptions`. - `intervalsAsString: true` is always set so SEA interval/duration columns render as strings — a byte-compatible drop-in for the Thrift backend. `complexTypesAsJson` is intentionally left at the kernel default (native Arrow), which already decodes identically to Thrift via the shared converter. Statement options (SeaSessionBackend.executeStatement, via buildExecuteOptions): - `queryTimeout` → `queryTimeoutSecs`; `rowLimit` → `rowLimit` (SEA-only cap). - `queryTags` serialised JS-side (reusing Thrift's `serializeQueryTags`) into the reserved `query_tags` conf key, merged with any explicit `statementConf` — the napi `queryTags` field can't carry null-valued tags, and the kernel rejects setting both. `queryTags` / `queryTimeout` are no longer rejected. - Still rejected (genuinely unsupported on SEA): `useCloudFetch`, `useLZ4Compression`, `stagingAllowedLocalPath`. `rowLimit` / `statementConf` added to the public `ExecuteStatementOptions`; SEA-only knobs (`maxConnections` / `checkServerCertificate` / `customCaCert`) added to the internal `InternalConnectionOptions`. 
Validated against a live warehouse: secure-by-default connect, maxConnections, checkServerCertificate, rowLimit (caps rows), queryTimeout, queryTags, statementConf, and non-PEM customCaCert rejection. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/contracts/IDBSQLSession.ts | 12 ++ lib/contracts/InternalConnectionOptions.ts | 23 ++++ lib/sea/SeaAuth.ts | 136 ++++++++++++++++++++- lib/sea/SeaSessionBackend.ts | 110 ++++++++++------- tests/unit/sea/auth-m2m.test.ts | 2 + tests/unit/sea/auth-pat.test.ts | 1 + tests/unit/sea/auth-u2m.test.ts | 2 + tests/unit/sea/connectionOptions.test.ts | 113 +++++++++++++++++ tests/unit/sea/execution.test.ts | 51 ++++++-- 9 files changed, 394 insertions(+), 56 deletions(-) create mode 100644 tests/unit/sea/connectionOptions.test.ts diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 392f3108..fd0dda16 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -27,6 +27,18 @@ export type ExecuteStatementOptions = { * These tags apply only to this statement and do not persist across queries. */ queryTags?: Record; + /** + * SEA-only: server-side row cap for this statement (kernel `row_limit`). The + * Thrift backend has no execute-time server cap, so this is a no-op there; + * use `maxRows` for the cross-backend client-side fetch limit. + */ + rowLimit?: number; + /** + * SEA-only: per-statement Spark conf overlay (kernel `statement_conf`). + * Merged with the serialized `queryTags` (which land under the reserved + * `query_tags` key). Ignored by the Thrift backend. + */ + statementConf?: Record; }; export type TypeInfoRequest = { diff --git a/lib/contracts/InternalConnectionOptions.ts b/lib/contracts/InternalConnectionOptions.ts index a115aa47..24575984 100644 --- a/lib/contracts/InternalConnectionOptions.ts +++ b/lib/contracts/InternalConnectionOptions.ts @@ -18,4 +18,27 @@ export interface InternalConnectionOptions { * @internal Not stable; M0 stub only. 
*/ useSEA?: boolean; + + /** + * SEA-only: kernel connection-pool size (`ConnectionOptions.max_connections`). + * Validated as a positive integer within the napi `u32` range. + * @internal SEA path only. + */ + maxConnections?: number; + + /** + * SEA-only: verify the server's TLS certificate. Secure-by-default — omit + * to keep full chain + hostname verification; set `false` only to opt into + * the insecure accept-anything mode. + * @internal SEA path only. + */ + checkServerCertificate?: boolean; + + /** + * SEA-only: PEM-encoded CA certificate (string or `Buffer`) added to the + * trust store on top of the system roots — for TLS-inspecting proxies or + * on-prem internal CAs. Honoured regardless of `checkServerCertificate`. + * @internal SEA path only. + */ + customCaCert?: Buffer | string; } diff --git a/lib/sea/SeaAuth.ts b/lib/sea/SeaAuth.ts index bdfabf3d..00fe2b5f 100644 --- a/lib/sea/SeaAuth.ts +++ b/lib/sea/SeaAuth.ts @@ -66,9 +66,58 @@ export interface SeaSessionDefaults { catalog?: string; schema?: string; sessionConf?: Record; + /** + * Render `INTERVAL` / `DURATION` result columns as strings + * (kernel `ResultConfig.intervals_as_string`). The kernel default is + * native Arrow `month_interval` / `duration[us]`, but the NodeJS + * Thrift driver surfaces intervals as strings — so the SEA path sets + * this `true` so its result shape is a byte-compatible drop-in for the + * Thrift backend. Omitting it falls back to the kernel's native types. + */ + intervalsAsString?: boolean; + /** + * Render complex (`ARRAY` / `MAP` / `STRUCT` / `VARIANT`) result + * columns as JSON strings (kernel `ResultConfig.complex_types_as_json`). + * Left unset on the SEA path: native Arrow nested types already decode + * identically to the Thrift backend through the shared Arrow converter, + * so forcing JSON here would *introduce* a divergence rather than + * remove one. 
+ */ + complexTypesAsJson?: boolean; + /** + * Per-session kernel connection-pool size + * (kernel `ConnectionOptions.max_connections`). Validated as a positive + * integer within the napi `u32` range by `buildSeaConnectionOptions`. + */ + maxConnections?: number; +} + +/** + * TLS options shared across all auth-mode variants. Mirror the napi + * binding's `ConnectionOptions.checkServerCertificate` / `.customCaCert` + * (kernel `Session::builder().tls(TlsConfig)`). + * + * The napi shape takes `customCaCert` as a `Buffer` only; the public + * `ConnectionOptions` additionally accepts a PEM string, which + * `buildSeaConnectionOptions` normalises to a `Buffer` before crossing + * the FFI boundary. + */ +export interface SeaTlsOptions { + /** + * Verify the server's TLS certificate. The SEA backend is + * **secure-by-default**: omitting this leaves the kernel default of + * `true` (full chain + hostname verification). Set `false` only to opt + * into the insecure, accept-anything mode (analogous to Thrift's + * `rejectUnauthorized: false`); prefer pairing strict checking with + * `customCaCert` over disabling verification entirely. + */ + checkServerCertificate?: boolean; + /** PEM-encoded CA bytes to add to the trust store. */ + customCaCert?: Buffer; } export type SeaNativeConnectionOptions = SeaSessionDefaults & + SeaTlsOptions & ( | { hostName: string; @@ -114,6 +163,57 @@ export function isBlankOrReserved(s: string): boolean { return normalized.length === 0 || normalized === 'undefined' || normalized === 'null'; } +/** napi-rs marshals `maxConnections` as a `u32`; reject values it can't hold. */ +const MAX_U32 = 0xffffffff; + +/** + * Normalise the public TLS options (`checkServerCertificate` / + * `customCaCert`) into the napi shape. + * + * - `checkServerCertificate` passes through verbatim (only when set; an + * absent value leaves the kernel default, which is secure — verify on). 
+ * - `customCaCert` accepts a PEM string or `Buffer` on the public + * surface; we convert a string to a `Buffer` here and do a light PEM + * sanity check. The bytes are NOT parsed in JS — the kernel returns a + * meaningful error if the PEM is malformed. + * + * Throws `HiveDriverError` when `customCaCert` is supplied but empty or + * (for strings) lacks a PEM certificate header. + */ +export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions { + const { checkServerCertificate, customCaCert } = options as { + checkServerCertificate?: boolean; + customCaCert?: Buffer | string; + }; + + const tls: SeaTlsOptions = {}; + + if (checkServerCertificate !== undefined) { + tls.checkServerCertificate = checkServerCertificate; + } + + if (customCaCert !== undefined) { + if (typeof customCaCert === 'string') { + if (!customCaCert.includes('-----BEGIN CERTIFICATE-----')) { + throw new HiveDriverError( + 'SEA backend: `customCaCert` string does not look like a PEM certificate ' + + "(missing '-----BEGIN CERTIFICATE-----'). Pass PEM text or a Buffer of PEM bytes.", + ); + } + tls.customCaCert = Buffer.from(customCaCert, 'utf8'); + } else if (Buffer.isBuffer(customCaCert)) { + if (customCaCert.length === 0) { + throw new HiveDriverError('SEA backend: `customCaCert` Buffer is empty.'); + } + tls.customCaCert = customCaCert; + } else { + throw new HiveDriverError('SEA backend: `customCaCert` must be a PEM string or a Buffer.'); + } + } + + return tls; +} + /** * Validate the user-supplied `ConnectionOptions` and build the * napi-binding's connection-options shape. 
@@ -170,11 +270,45 @@ export function isBlankOrReserved(s: string): boolean { export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNativeConnectionOptions { const { authType } = options as { authType?: string }; - const base = { + const base: { + hostName: string; + httpPath: string; + intervalsAsString: boolean; + maxConnections?: number; + } & SeaTlsOptions = { hostName: options.host, httpPath: prependSlash(options.path), + // Match the NodeJS Thrift driver, which surfaces INTERVAL columns as + // strings. The kernel defaults to native Arrow interval/duration types; + // forcing the string rendering here keeps the SEA path a byte-compatible + // drop-in. Complex types are intentionally left at the kernel default + // (native Arrow) — they already decode identically to Thrift via the + // shared Arrow converter, so `complexTypesAsJson` is not forced on. + intervalsAsString: true, + // TLS knobs (server-cert verification toggle + custom CA). Validated and + // normalised (string PEM → Buffer) here so the napi shape only sees a Buffer. + ...buildSeaTlsOptions(options), }; + // SEA-only pool sizing; read via cast to match how this function reads the + // other SEA-specific options (TLS) — they live on the internal options + // surface, not the published public `ConnectionOptions` `.d.ts`. + const { maxConnections } = options as { maxConnections?: number }; + if (maxConnections !== undefined) { + if (!Number.isInteger(maxConnections) || maxConnections < 1) { + throw new HiveDriverError( + `SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`, + ); + } + if (maxConnections > MAX_U32) { + throw new HiveDriverError( + `SEA backend: \`maxConnections\` exceeds the napi u32 limit (${MAX_U32}); got ${maxConnections}. 
` + + 'Typical pool sizes are 10-500.', + ); + } + base.maxConnections = maxConnections; + } + const oauth = options as { oauthClientId?: string; oauthClientSecret?: string; diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index e12e1d60..55c08dc7 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -38,6 +38,7 @@ import { decodeNapiKernelError } from './SeaErrorMapping'; import SeaOperationBackend from './SeaOperationBackend'; import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams'; import { seaServerInfoValue } from './SeaServerInfo'; +import { serializeQueryTags } from '../utils'; export interface SeaSessionBackendOptions { /** The opaque napi `Connection` handle returned by `openSession`. */ @@ -116,51 +117,30 @@ export default class SeaSessionBackend implements ISessionBackend { /** * Execute a SQL statement through the napi binding. * - * Catalog / schema / sessionConf were applied at session open, so - * there are no per-statement options to thread through. + * Catalog / schema / sessionConf are session-level (applied at open). + * Per-statement options forwarded to the kernel `ExecuteOptions`: + * - `ordinalParameters` / `namedParameters` → bound params (mutually + * exclusive — the kernel binds one placeholder style per statement); + * - `queryTimeout` → `queryTimeoutSecs` (SEA server wait timeout); + * - `rowLimit` → `rowLimit` (SEA-only server-side row cap); + * - `queryTags` → serialised into the conf overlay's reserved + * `query_tags` key (the same wire shape Thrift's `serializeQueryTags` + * produces), merged with any explicit `statementConf`. * - * M0 intentionally rejects `queryTimeout`, `namedParameters`, and - * `ordinalParameters` with explicit deferred-to-M1 errors. 
`useCloudFetch` - * is a no-op on the SEA path — the kernel hardcodes the SEA - * `disposition` to `INLINE_OR_EXTERNAL_LINKS`, and per-statement - * conf overrides have no reader on the kernel; cloud-fetch behaviour - * is governed entirely by the kernel's `ResultConfig` (M1 binding - * surface). - * - * The Thrift backend remains the path for consumers that need any - * of those today. + * Still rejected (genuinely unsupported on SEA, rather than silently + * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a + * per-statement knob), `useLZ4Compression` (kernel owns result compression), + * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by + * the facade at fetch time, so it is intentionally not handled here. */ public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { this.failIfClosed(); - // Positional (`?`) and named (`:name`) parameters are mutually exclusive — - // the kernel param codec binds exactly one placeholder style per statement. - // Use the SAME error type and message as the Thrift backend - // (`ThriftSessionBackend.getQueryParameters`) so a caller catching - // `ParameterError` for this case behaves identically across backends. 
- const positionalParams = buildSeaPositionalParams(options.ordinalParameters); - const namedParams = buildSeaNamedParams(options.namedParameters); - if (positionalParams !== undefined && namedParams !== undefined) { - throw new ParameterError('Driver does not support both ordinal and named parameters.'); - } - - if (options.queryTimeout !== undefined) { - throw new HiveDriverError('SEA executeStatement: queryTimeout is not supported in M0 (deferred to M1)'); - } if (options.useCloudFetch !== undefined) { throw new HiveDriverError( 'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA', ); } - // Reject — rather than silently ignore — the remaining Thrift-path - // options the SEA M0 backend does not honor. Silently dropping them - // is the worst failure mode for an agent/caller: passing e.g. - // `queryTags` or `useLZ4Compression` would no-op with zero signal. - // (`maxRows` is intentionally NOT here — the facade applies it at - // fetch time.) - if (options.queryTags !== undefined) { - throw new HiveDriverError('SEA executeStatement: queryTags is not supported in M0 (deferred to M1)'); - } if (options.useLZ4Compression !== undefined) { throw new HiveDriverError( 'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)', @@ -168,16 +148,11 @@ export default class SeaSessionBackend implements ISessionBackend { } if (options.stagingAllowedLocalPath !== undefined) { throw new HiveDriverError( - 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported in M0 (deferred to M1)', + 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA', ); } - // Only build the napi options object when there is something to send — - // the no-params path keeps the minimal call shape (`executeStatement(sql)`). 
- let execOptions: SeaNativeExecuteOptions | undefined; - if (positionalParams !== undefined || namedParams !== undefined) { - execOptions = { positionalParams, namedParams }; - } + const execOptions = this.buildExecuteOptions(options); let nativeStatement: SeaStatement; try { @@ -191,6 +166,57 @@ export default class SeaSessionBackend implements ISessionBackend { return this.wrapStatement(nativeStatement); } + /** + * Translate the public `ExecuteStatementOptions` into the kernel napi + * `ExecuteOptions`, returning `undefined` when nothing is set so the + * no-options call shape (`executeStatement(sql)`) is preserved. + */ + private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined { + // Positional (`?`) and named (`:name`) parameters are mutually exclusive — + // the kernel binds one placeholder style per statement. Use the SAME error + // type and message as the Thrift backend (`ThriftSessionBackend`) so a + // caller catching `ParameterError` behaves identically across backends. + const positionalParams = buildSeaPositionalParams(options.ordinalParameters); + const namedParams = buildSeaNamedParams(options.namedParameters); + if (positionalParams !== undefined && namedParams !== undefined) { + throw new ParameterError('Driver does not support both ordinal and named parameters.'); + } + + const execOptions: SeaNativeExecuteOptions = {}; + if (positionalParams !== undefined) { + execOptions.positionalParams = positionalParams; + } + if (namedParams !== undefined) { + execOptions.namedParams = namedParams; + } + // JDBC `setQueryTimeout` is whole seconds; the kernel's `queryTimeoutSecs` + // (SEA wait timeout) is the native equivalent. The SEA wire caps it at 50s. + if (options.queryTimeout !== undefined) { + execOptions.queryTimeoutSecs = Number(options.queryTimeout); + } + if (options.rowLimit !== undefined) { + execOptions.rowLimit = Number(options.rowLimit); + } + // Per-statement conf overlay plus query tags. 
Tags are serialised JS-side + // into the reserved `query_tags` key (the same wire shape the Thrift + // backend produces via `serializeQueryTags` → `confOverlay`), rather than + // via the napi `queryTags` field: napi's `HashMap` can't + // represent a null-valued tag, and the kernel rejects setting both the + // `queryTags` field and a `query_tags` conf key. + const serializedQueryTags = serializeQueryTags(options.queryTags); + if (options.statementConf !== undefined || serializedQueryTags !== undefined) { + const statementConf: Record = { ...(options.statementConf ?? {}) }; + if (serializedQueryTags !== undefined) { + statementConf.query_tags = serializedQueryTags; + } + if (Object.keys(statementConf).length > 0) { + execOptions.statementConf = statementConf; + } + } + + return Object.keys(execOptions).length > 0 ? execOptions : undefined; + } + /** Wrap a napi `Statement` (from execute or a metadata call) as an operation backend. */ private wrapStatement(nativeStatement: SeaStatement): IOperationBackend { return new SeaOperationBackend({ diff --git a/tests/unit/sea/auth-m2m.test.ts b/tests/unit/sea/auth-m2m.test.ts index a4f90ed5..159afe1d 100644 --- a/tests/unit/sea/auth-m2m.test.ts +++ b/tests/unit/sea/auth-m2m.test.ts @@ -35,6 +35,7 @@ describe('SeaAuth + SeaBackend — OAuth M2M auth flow', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthM2m', oauthClientId: 'client-uuid', oauthClientSecret: 'dose-fake-secret', @@ -165,6 +166,7 @@ describe('SeaAuth + SeaBackend — OAuth M2M auth flow', () => { expect(calls[0].args[0]).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthM2m', oauthClientId: 'client-uuid', oauthClientSecret: 'dose-fake-secret', diff --git a/tests/unit/sea/auth-pat.test.ts b/tests/unit/sea/auth-pat.test.ts index f59b445c..bd82eb87 100644 --- 
a/tests/unit/sea/auth-pat.test.ts +++ b/tests/unit/sea/auth-pat.test.ts @@ -31,6 +31,7 @@ describe('SeaAuth — PAT auth options builder', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'Pat', token: 'dapi-fake-pat', }); diff --git a/tests/unit/sea/auth-u2m.test.ts b/tests/unit/sea/auth-u2m.test.ts index c8f63fef..828ca961 100644 --- a/tests/unit/sea/auth-u2m.test.ts +++ b/tests/unit/sea/auth-u2m.test.ts @@ -33,6 +33,7 @@ describe('SeaAuth + SeaBackend — OAuth U2M auth flow', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthU2m', oauthRedirectPort: 8030, }); @@ -132,6 +133,7 @@ describe('SeaAuth + SeaBackend — OAuth U2M auth flow', () => { expect(calls[0].args[0]).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthU2m', oauthRedirectPort: 8030, }); diff --git a/tests/unit/sea/connectionOptions.test.ts b/tests/unit/sea/connectionOptions.test.ts new file mode 100644 index 00000000..8b065697 --- /dev/null +++ b/tests/unit/sea/connectionOptions.test.ts @@ -0,0 +1,113 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +import { expect } from 'chai'; +import { buildSeaConnectionOptions, buildSeaTlsOptions } from '../../../lib/sea/SeaAuth'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; + +const PAT = { host: 'h.databricks.com', path: '/sql/1.0/warehouses/abc', token: 'dapi-x' }; + +// Cast helper: the SEA connection-tuning/TLS options live on the internal +// surface, so tests build untyped option literals. +const opts = (extra: Record<string, unknown>) => ({ ...PAT, ...extra }) as unknown as ConnectionOptions; + +describe('SeaAuth connection options — intervalsAsString default', () => { + it('always sets intervalsAsString:true (thrift-compatible interval rendering)', () => { + const native = buildSeaConnectionOptions(opts({})) as { intervalsAsString?: boolean }; + expect(native.intervalsAsString).to.equal(true); + }); + + it('does NOT force complexTypesAsJson (native Arrow nested types match Thrift)', () => { + const native = buildSeaConnectionOptions(opts({})) as { complexTypesAsJson?: boolean }; + expect(native.complexTypesAsJson).to.equal(undefined); + }); +}); + +describe('SeaAuth connection options — maxConnections', () => { + it('forwards a valid positive integer', () => { + const native = buildSeaConnectionOptions(opts({ maxConnections: 10 })) as { maxConnections?: number }; + expect(native.maxConnections).to.equal(10); + }); + + it('omits maxConnections when unset', () => { + const native = buildSeaConnectionOptions(opts({})) as { maxConnections?: number }; + expect(native.maxConnections).to.equal(undefined); + }); + + for (const bad of [0, -1, 1.5]) { + it(`rejects non-positive-integer maxConnections (${bad})`, () => { + expect(() => buildSeaConnectionOptions(opts({ maxConnections: bad }))).to.throw( + HiveDriverError, + /positive integer/, + ); + }); + } + + it('rejects maxConnections beyond the u32 limit', () => { + expect(() => buildSeaConnectionOptions(opts({ maxConnections: 
0x1_0000_0000 }))).to.throw( + HiveDriverError, + /u32 limit/, + ); + }); +}); + +describe('SeaAuth TLS options (buildSeaTlsOptions)', () => { + it('is empty by default (secure-by-default — kernel default verify-on)', () => { + expect(buildSeaTlsOptions(opts({}))).to.deep.equal({}); + }); + + it('passes checkServerCertificate through verbatim (including false)', () => { + expect(buildSeaTlsOptions(opts({ checkServerCertificate: false }))).to.deep.equal({ + checkServerCertificate: false, + }); + expect(buildSeaTlsOptions(opts({ checkServerCertificate: true }))).to.deep.equal({ + checkServerCertificate: true, + }); + }); + + it('normalises a PEM string to a Buffer', () => { + const pem = '-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n'; + const tls = buildSeaTlsOptions(opts({ customCaCert: pem })); + expect(Buffer.isBuffer(tls.customCaCert)).to.equal(true); + expect(tls.customCaCert?.toString('utf8')).to.equal(pem); + }); + + it('passes a Buffer customCaCert through unchanged', () => { + const buf = Buffer.from('-----BEGIN CERTIFICATE-----\nx\n-----END CERTIFICATE-----'); + expect(buildSeaTlsOptions(opts({ customCaCert: buf })).customCaCert).to.equal(buf); + }); + + it('rejects a non-PEM string', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: 'not-a-pem' }))).to.throw( + HiveDriverError, + /PEM certificate/, + ); + }); + + it('rejects an empty Buffer', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: Buffer.alloc(0) }))).to.throw(HiveDriverError, /empty/); + }); + + it('rejects a non-string, non-Buffer customCaCert', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: 123 }))).to.throw(HiveDriverError, /PEM string or a Buffer/); + }); + + it('folds TLS options into the full connection options', () => { + const native = buildSeaConnectionOptions(opts({ checkServerCertificate: false })) as { + checkServerCertificate?: boolean; + }; + expect(native.checkServerCertificate).to.equal(false); + }); +}); diff --git 
a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index fefc79f7..6fd7308b 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -312,11 +312,14 @@ describe('SeaBackend', () => { const args = binding.openSessionStub.firstCall.args[0]; // sea-auth-u2m introduced the discriminated SeaNativeConnectionOptions // shape with a leading `authMode` tag — `'Pat'` for the PAT branch. + // `intervalsAsString: true` is always set so the SEA result shape is a + // byte-compatible drop-in for the Thrift backend (interval-as-string). expect(args).to.deep.equal({ hostName: 'workspace.example', httpPath: '/sql/1.0/warehouses/xyz', authMode: 'Pat', token: 'dapi-token', + intervalsAsString: true, }); }); @@ -452,24 +455,46 @@ describe('SeaSessionBackend', () => { expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.'); }); - it('executeStatement rejects queryTimeout (M1)', async () => { + it('executeStatement forwards queryTimeout as queryTimeoutSecs', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); - let thrown: unknown; - try { - await session.executeStatement('SELECT 1', { queryTimeout: 30 }); - } catch (err) { - thrown = err; - } - expect(thrown).to.be.instanceOf(HiveDriverError); - expect((thrown as Error).message).to.match(/queryTimeout/); + await session.executeStatement('SELECT 1', { queryTimeout: 30 }); + expect((connection.lastOptions as { queryTimeoutSecs?: number }).queryTimeoutSecs).to.equal(30); + }); + + it('executeStatement forwards rowLimit', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { rowLimit: 100 }); + expect((connection.lastOptions as { rowLimit?: number }).rowLimit).to.equal(100); + }); + + it('executeStatement serialises queryTags into statementConf.query_tags', async () => { + const connection = new 
FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { queryTags: { team: 'x', env: 'prod' } }); + const conf = (connection.lastOptions as { statementConf?: Record<string, string> }).statementConf; + expect(conf).to.have.property('query_tags'); + expect(conf?.query_tags).to.contain('team:x').and.to.contain('env:prod'); + }); + + it('executeStatement merges explicit statementConf with serialised queryTags', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { + statementConf: { 'spark.sql.ansi.enabled': 'true' }, + queryTags: { team: 'x' }, + }); + const conf = (connection.lastOptions as { statementConf?: Record<string, string> }).statementConf; + expect(conf?.['spark.sql.ansi.enabled']).to.equal('true'); + expect(conf?.query_tags).to.contain('team:x'); }); - // These Thrift-path options are not honored on SEA M0. Rejecting them - // (rather than silently ignoring) is the contract a caller/agent needs: - // a silent no-op gives zero signal to debug. + // Genuinely unsupported on SEA — rejected (rather than silently ignored) so + // a caller/agent gets signal instead of a no-op. queryTags / queryTimeout / + // rowLimit are NOT here — they are forwarded (asserted above). 
for (const { name, options, re } of [ - { name: 'queryTags', options: { queryTags: { team: 'x' } }, re: /queryTags/ }, + { name: 'useCloudFetch', options: { useCloudFetch: true }, re: /useCloudFetch/ }, { name: 'useLZ4Compression', options: { useLZ4Compression: true }, re: /useLZ4Compression/ }, { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' }, re: /stagingAllowedLocalPath/ }, ] as const) { From 6ab9e27f0527bd34e46ac3e22755fbedc328f96d Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Tue, 2 Jun 2026 19:02:26 +0000 Subject: [PATCH 2/6] [SEA-NodeJS] SEA async execute (submit / poll / awaitResult) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the SEA query path from the blocking `executeStatement` to the kernel's async `submitStatement`, matching the Thrift backend's always-async (`runAsync: true`) model. `submitStatement` returns immediately with a pending `AsyncStatement` (kernel `wait_timeout=0s`) while the query runs server-side. SeaOperationBackend becomes dual-mode (exactly one of): - `asyncStatement` (query path): `waitUntilReady()` polls `status()` to a terminal state on a 100ms cadence (matching Thrift), firing the progress callback each tick. Polling `status()` rather than blocking on `awaitResult()` keeps `cancel()` responsive — a blocking awaitResult would hold the kernel statement mutex for the whole query and queue cancel behind it. On Succeeded it materialises the result handle (first fetch is free); on Failed it drives `awaitResult()` to surface the kernel's typed SQL-error envelope; on a server-side Cancelled/Closed/Unknown it throws a clear error. `status()` reports the real Pending/Running/Succeeded state. - `statement` (metadata path): the kernel `list*`/`get*` statement is already terminal, so `waitUntilReady()` stays the one-shot completion tick. 
The fetch pipeline is shared: `awaitResult()`'s `AsyncResultHandle` and the metadata `Statement` expose the same `fetchNextBatch()` / `schema()` surface, so `SeaResultsProvider` → `ArrowResultConverter` → `ResultSlicer` consume either interchangeably via a single memoised fetch handle. cancel()/close() route through a `lifecycleHandle` abstraction over whichever handle backs the op. Re-exports the kernel `AsyncStatement` / `AsyncResultHandle` types from `SeaNativeLoader`. Validated against a live warehouse: async fetchAll correctness, multi-row drain (5000 rows), long-running aggregate (count over 20M), kernel SQL-error surfacing, and cancellation mid-execution. PR1's params/metadata/getInfo all still pass through the new async path. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/sea/SeaNativeLoader.ts | 10 ++ lib/sea/SeaOperationBackend.ts | 232 +++++++++++++++++++++++++------ lib/sea/SeaSessionBackend.ts | 18 ++- tests/unit/sea/execution.test.ts | 171 ++++++++++++++++++++++- 4 files changed, 377 insertions(+), 54 deletions(-) diff --git a/lib/sea/SeaNativeLoader.ts b/lib/sea/SeaNativeLoader.ts index 8eb36f6a..75004e51 100644 --- a/lib/sea/SeaNativeLoader.ts +++ b/lib/sea/SeaNativeLoader.ts @@ -36,6 +36,8 @@ import type { ExecuteOptions as NativeExecuteOptions, TypedValueInput as NativeTypedValueInput, NamedTypedValueInput as NativeNamedTypedValueInput, + AsyncStatement as NativeAsyncStatement, + AsyncResultHandle as NativeAsyncResultHandle, } from '../../native/sea'; // SEA-prefixed re-exports. 
The kernel-generated `.d.ts` keeps the @@ -59,6 +61,14 @@ export type SeaNativeExecuteOptions = NativeExecuteOptions; export type SeaNativeTypedValueInput = NativeTypedValueInput; export type SeaNativeNamedTypedValueInput = NativeNamedTypedValueInput; +// Async-submit surface: `Connection.submitStatement` returns an +// `AsyncStatement` (status / awaitResult / cancel / close); `awaitResult` +// yields an `AsyncResultHandle` whose `fetchNextBatch` / `schema` match the +// blocking `Statement`'s fetch surface, so the results pipeline consumes +// either interchangeably. +export type SeaNativeAsyncStatement = NativeAsyncStatement; +export type SeaNativeAsyncResultHandle = NativeAsyncResultHandle; + /** * The full native binding surface, derived from the generated module * so it can never drift from the `.d.ts` contract: when the kernel diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index eb002870..ad187d3d 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -50,7 +50,7 @@ import ResultSlicer from '../result/ResultSlicer'; import SeaResultsProvider from './SeaResultsProvider'; import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './SeaArrowIpc'; import { decodeNapiKernelError } from './SeaErrorMapping'; -import { SeaStatement } from './SeaNativeLoader'; +import { SeaStatement, SeaNativeAsyncStatement, SeaNativeAsyncResultHandle } from './SeaNativeLoader'; import { SeaStatementHandle, SeaOperationLifecycleState, @@ -71,23 +71,69 @@ import { export type SeaOperationStatement = SeaStatementHandle & Partial; /** - * Constructor options for `SeaOperationBackend`. + * The fetch surface shared by the blocking metadata `Statement` and the async + * query path's `AsyncResultHandle` (from `awaitResult()`): both expose + * `fetchNextBatch()` + a synchronous `schema()`, so the results pipeline + * (`SeaResultsProvider` → `ArrowResultConverter` → `ResultSlicer`) consumes + * either interchangeably. 
+ */ +type SeaFetchHandle = Pick; + +/** Poll cadence for the async `status()` loop — matches the Thrift backend's 100ms. */ +const STATUS_POLL_INTERVAL_MS = 100; + +function delay(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +/** + * Map a kernel `AsyncStatement.status()` string to the backend-neutral + * `OperationState`. The kernel variant names (`Pending` / `Running` / + * `Succeeded` / `Failed` / `Cancelled` / `Closed` / `Unknown`) line up 1:1 + * with the enum; `Canceled` (one-L spelling) is mapped defensively, and any + * unrecognised value collapses to `Unknown`. + */ +function statusStringToOperationState(state: string): OperationState { + if (state === 'Canceled') { + return OperationState.Cancelled; + } + if ((Object.values(OperationState) as string[]).includes(state)) { + return state as OperationState; + } + return OperationState.Unknown; +} + +/** + * Constructor options for `SeaOperationBackend`. Exactly one of + * `asyncStatement` (query path — `Connection.submitStatement`) or `statement` + * (metadata path — `Connection.list*` / `get*`, already terminal) must be set. */ export interface SeaOperationBackendOptions { - /** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */ - statement: SeaOperationStatement; + /** The pending napi `AsyncStatement` from `Connection.submitStatement(...)`. */ + asyncStatement?: SeaNativeAsyncStatement; + /** The terminal napi `Statement` from a metadata call. */ + statement?: SeaOperationStatement; context: IClientContext; /** - * Optional override for `id`. When not provided a fresh UUIDv4 is - * generated upstream (in `SeaSessionBackend.executeStatement`); the - * kernel does not yet surface its internal statement-id at the napi - * boundary. Once it does, the JS layer can thread it through here. + * Optional override for `id`. Defaults to the napi statement-id when the + * handle exposes one, else a fresh UUIDv4. 
*/ id?: string; } export default class SeaOperationBackend implements IOperationBackend { - private readonly statement: SeaOperationStatement; + // Query path: pending async statement we poll to terminal. Undefined on the + // metadata path. + private readonly asyncStatement?: SeaNativeAsyncStatement; + + // Metadata path: terminal statement. Undefined on the query path. + private readonly blockingStatement?: SeaOperationStatement; + + // The cancel/close surface — whichever handle backs this operation. Both + // `AsyncStatement` and `Statement` expose `cancel()` / `close()`. + private readonly lifecycleHandle: SeaStatementHandle; private readonly context: IClientContext; @@ -103,10 +149,22 @@ export default class SeaOperationBackend implements IOperationBackend { private metadataPromise?: Promise; - constructor({ statement, context, id }: SeaOperationBackendOptions) { - this.statement = statement; + // Memoised fetch handle: on the async path it is `awaitResult()`'s result + // (resolved once the statement is terminal); on the metadata path it is the + // already-terminal statement. Drives both fetch and result-metadata. + private fetchHandlePromise?: Promise; + + constructor({ asyncStatement, statement, context, id }: SeaOperationBackendOptions) { + if ((asyncStatement === undefined) === (statement === undefined)) { + throw new HiveDriverError( + 'SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided', + ); + } + this.asyncStatement = asyncStatement; + this.blockingStatement = statement; + this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle; this.context = context; - this._id = id ?? uuidv4(); + this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4(); } public get id(): string { @@ -162,7 +220,7 @@ export default class SeaOperationBackend implements IOperationBackend { // wedged, so nothing downstream forces another close). 
We still don't // mask the original fetch error, but log the close failure at warn so // the leak is diagnosable rather than completely invisible. - await seaClose(this.lifecycle, this.statement, this.context, this._id).catch((closeErr) => { + await seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id).catch((closeErr) => { const cause = closeErr instanceof Error ? closeErr.message : String(closeErr); this.context .getLogger() @@ -191,12 +249,16 @@ export default class SeaOperationBackend implements IOperationBackend { return this.metadataPromise; } this.metadataPromise = (async () => { - if (!this.statement.schema) { - throw new HiveDriverError('SeaOperationBackend: statement.schema() is not available on this handle'); + // The schema lives on the fetch handle: the metadata `Statement` + // directly, or the async path's `AsyncResultHandle` (materialised by + // `getFetchHandle()` once the statement is terminal). + const handle = await this.getFetchHandle(); + if (!handle.schema) { + throw new HiveDriverError('SeaOperationBackend: schema() is not available on this handle'); } // `schema()` is a synchronous napi getter (returns `ArrowSchema`, not a // Promise) — no `await` needed. - const arrowSchemaIpc = this.statement.schema(); + const arrowSchemaIpc = handle.schema(); const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes); // `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for // back-compat with the public `IOperation.getSchema()` surface. @@ -229,60 +291,146 @@ export default class SeaOperationBackend implements IOperationBackend { // --------------------------------------------------------------------------- public async status(_progress: boolean): Promise { - // Synthesised — the kernel resolves `Statement::execute().await` before - // it hands back a Statement handle, so by the time a SeaOperationBackend - // exists the statement is terminal. 
Note there is intentionally no - // `Failed` arm: a failed execution rejects inside `executeStatement` - // (the kernel surfaces the error at submit), so a `Failed` statement - // never becomes a SeaOperationBackend — `status()` only ever observes - // Succeeded, or Cancelled/Closed from a client-side lifecycle call. - // Report Cancelled/Closed if the lifecycle flag is set, else Succeeded. - // Returns the backend-neutral OperationStatus the IOperationBackend - // contract expects, so the DBSQLOperation facade switches on `state` - // identically across backends. + // A client-side cancel/close wins over any server state. if (this.lifecycle.isCancelled) { return { state: OperationState.Cancelled, hasResultSet: true }; } if (this.lifecycle.isClosed) { return { state: OperationState.Closed, hasResultSet: true }; } + if (this.asyncStatement) { + // Query path: report the real kernel state (single GetStatementStatus + // RPC — no polling here; `waitUntilReady` owns the poll loop). + const state = statusStringToOperationState(await this.asyncStatement.status()); + return { state, hasResultSet: true }; + } + // Metadata path: the kernel statement is already terminal. return { state: OperationState.Succeeded, hasResultSet: true }; } public async waitUntilReady(options?: IOperationBackendWaitOptions): Promise { - // Kernel's `Statement::execute().await` has already resolved by the - // time we hold a Statement handle — there is no pending/running - // state to poll for M0. seaFinished fires the progress callback - // once with a synthesised FINISHED response so progress-UI callers - // see the same one-shot completion tick the Thrift path emits at - // the end of its polling loop. + if (this.asyncStatement) { + return this.waitUntilReadyAsync(options); + } + // Metadata path: the kernel statement has already resolved, so there is + // nothing to poll. 
seaFinished fires the progress callback once with a + // synthesised completion tick, matching the Thrift path's final tick. return seaFinished(this.lifecycle, options); } public async cancel(): Promise { - return seaCancel(this.lifecycle, this.statement, this.context, this._id); + return seaCancel(this.lifecycle, this.lifecycleHandle, this.context, this._id); } public async close(): Promise { - return seaClose(this.lifecycle, this.statement, this.context, this._id); + return seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id); } // --------------------------------------------------------------------------- // Internals. // --------------------------------------------------------------------------- + /** + * Poll the kernel `AsyncStatement` to a terminal state, mirroring the Thrift + * backend's `waitUntilReady` loop (100ms cadence). Polling `status()` rather + * than awaiting `awaitResult()` directly is deliberate: a blocking + * `awaitResult()` holds the kernel statement mutex for the whole query and + * would queue a concurrent `cancel()` behind it, whereas the poll loop + * releases the mutex between ticks so `cancel()` stays responsive. On + * success it materialises the result handle (so the first fetch is free); + * on a bad terminal state it surfaces the real kernel error. + */ + private async waitUntilReadyAsync(options?: IOperationBackendWaitOptions): Promise { + // Already materialised → terminal-and-ready, nothing to wait for. + if (this.fetchHandlePromise) { + return; + } + for (;;) { + // A JS-initiated cancel/close short-circuits before the next poll. 
+ failIfNotActive(this.lifecycle); + + // eslint-disable-next-line no-await-in-loop + const state = statusStringToOperationState(await this.asyncStatement!.status()); + + if (options?.callback) { + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(options.callback({ state, hasResultSet: true })); + } + + switch (state) { + case OperationState.Pending: + case OperationState.Running: + break; + case OperationState.Succeeded: + // Materialise the result stream now so the first fetch/metadata call + // doesn't pay an extra await_result round-trip. + // eslint-disable-next-line no-await-in-loop + await this.getFetchHandle(); + return; + case OperationState.Failed: + // `status()` collapses Failed to the variant name only; the real + // SQL-error envelope (sql_state / error_code / query_id) rides on + // `awaitResult()`'s rejection — drive it to surface the typed error. + // eslint-disable-next-line no-await-in-loop + await this.throwAsyncError(); + break; + case OperationState.Cancelled: + throw new HiveDriverError(`SEA operation ${this._id} was cancelled server-side.`); + case OperationState.Closed: + throw new HiveDriverError(`SEA operation ${this._id} was closed before it produced a result.`); + default: + throw new HiveDriverError(`SEA operation ${this._id} reached an unexpected state: ${state}.`); + } + + // eslint-disable-next-line no-await-in-loop + await delay(STATUS_POLL_INTERVAL_MS); + } + } + + /** + * Drive `awaitResult()` on a Failed statement to surface the kernel's typed + * SQL-error envelope. Falls back to a generic error if `awaitResult()` + * unexpectedly resolves instead of rejecting. 
+ */ + private async throwAsyncError(): Promise { + try { + await this.asyncStatement!.awaitResult(); + } catch (err) { + throw decodeNapiKernelError(err); + } + throw new HiveDriverError(`SEA operation ${this._id} reported Failed but produced a result.`); + } + + /** + * Resolve (and memoise) the fetch handle: `awaitResult()`'s `AsyncResultHandle` + * on the query path, or the already-terminal `Statement` on the metadata path. + */ + private getFetchHandle(): Promise { + if (!this.fetchHandlePromise) { + if (this.asyncStatement) { + this.fetchHandlePromise = this.asyncStatement.awaitResult().catch((err) => { + throw decodeNapiKernelError(err); + }) as Promise; + } else { + const stmt = this.blockingStatement!; + if (!stmt.fetchNextBatch) { + throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); + } + this.fetchHandlePromise = Promise.resolve(stmt as unknown as SeaFetchHandle); + } + } + return this.fetchHandlePromise; + } + private async getResultSlicer(): Promise> { if (this.resultSlicer) { return this.resultSlicer; } - if (!this.statement.fetchNextBatch) { - throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); - } const metadata = await this.getResultMetadata(); - // The lifecycle subset has cancel/close only; fetch methods exist on - // the full napi Statement. Cast is safe here because we've just - // verified `fetchNextBatch` is callable. - this.resultsProvider = new SeaResultsProvider(this.statement as SeaStatement); + const handle = await this.getFetchHandle(); + // SeaResultsProvider consumes only `fetchNextBatch`; both the async result + // handle and the blocking statement satisfy that surface. 
+ this.resultsProvider = new SeaResultsProvider(handle as unknown as SeaStatement); const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata); this.resultSlicer = new ResultSlicer(this.context, converter); return this.resultSlicer; diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index 55c08dc7..17f74821 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -154,16 +154,22 @@ export default class SeaSessionBackend implements ISessionBackend { const execOptions = this.buildExecuteOptions(options); - let nativeStatement: SeaStatement; + // Submit asynchronously (kernel `wait_timeout=0s`): the server returns a + // pending `AsyncStatement` immediately while the query runs, matching the + // Thrift backend's always-async (`runAsync: true`) path. The operation + // backend polls `status()` to terminal in `waitUntilReady()` and + // materialises results via `awaitResult()`, so a long-running query stays + // cancellable mid-flight and `status()` reports real Pending/Running states. + let asyncStatement; try { - nativeStatement = + asyncStatement = execOptions === undefined - ? await this.connection.executeStatement(statement) - : await this.connection.executeStatement(statement, execOptions); + ? await this.connection.submitStatement(statement) + : await this.connection.submitStatement(statement, execOptions); } catch (err) { throw this.logAndMapError('executeStatement', err); } - return this.wrapStatement(nativeStatement); + return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context }); } /** @@ -217,7 +223,7 @@ export default class SeaSessionBackend implements ISessionBackend { return Object.keys(execOptions).length > 0 ? execOptions : undefined; } - /** Wrap a napi `Statement` (from execute or a metadata call) as an operation backend. */ + /** Wrap a napi metadata `Statement` (already terminal) as an operation backend. 
*/ private wrapStatement(nativeStatement: SeaStatement): IOperationBackend { return new SeaOperationBackend({ statement: nativeStatement, diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index 6fd7308b..716d213b 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -23,6 +23,7 @@ import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; import HiveDriverError from '../../../lib/errors/HiveDriverError'; import ParameterError from '../../../lib/errors/ParameterError'; import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import { OperationState } from '../../../lib/contracts/OperationStatus'; // ----------------------------------------------------------------------------- // Fakes — minimal stand-ins for the napi-rs generated surface and the @@ -73,6 +74,53 @@ class FakeNativeStatement implements SeaStatement { } } +/** + * Fake `AsyncStatement` (the `submitStatement` return). `status()` reports a + * configurable state (default Succeeded); `awaitResult()` yields a fetch handle + * (reuses `FakeNativeStatement`'s fetchNextBatch/schema surface). + */ +class FakeAsyncStatement { + public cancelled = false; + + public closed = false; + + public statusCalls = 0; + + public awaitResultError: Error | null = null; + + // Successive status() returns drain this queue; the last value sticks. + private readonly states: string[]; + + public readonly statementId = '01ef-fake-async-id'; + + constructor( + statusValue: string | string[] = 'Succeeded', + public readonly resultHandle: FakeNativeStatement = new FakeNativeStatement(), + ) { + this.states = Array.isArray(statusValue) ? [...statusValue] : [statusValue]; + } + + public async status(): Promise { + this.statusCalls += 1; + return this.states.length > 1 ? 
(this.states.shift() as string) : this.states[0]; + } + + public async awaitResult(): Promise { + if (this.awaitResultError) { + throw this.awaitResultError; + } + return this.resultHandle; + } + + public async cancel(): Promise { + this.cancelled = true; + } + + public async close(): Promise { + this.closed = true; + } +} + class FakeNativeConnection implements SeaConnection { public closed = false; @@ -93,8 +141,14 @@ class FakeNativeConnection implements SeaConnection { // Mirrors the kernel `Connection.sessionId` getter. public readonly sessionId = '01ef-fake-session-id'; - // The merged kernel binding takes an optional per-statement `ExecuteOptions` - // (positional/named params, statementConf, …). Record it for assertions. + // Last AsyncStatement handed out by submitStatement (the query path). + public lastAsyncStatement?: FakeAsyncStatement; + + // The async submit state(s) the next FakeAsyncStatement should report. + public submitStatusValue: string | string[] = 'Succeeded'; + + // The blocking executeStatement path is no longer used by the SEA backend + // (queries go through submitStatement), but the binding still exposes it. public async executeStatement(sql: string, options?: unknown): Promise { if (this.throwOnExecute) { throw this.throwOnExecute; @@ -104,10 +158,16 @@ class FakeNativeConnection implements SeaConnection { return this.statementToReturn; } - // Async-submit path (PR 2 territory); present only so the fake satisfies - // the full `Connection` surface. Not exercised by these tests. - public submitStatement(): Promise { - throw new Error('submitStatement not used in this test'); + // Async-submit path: records sql + per-statement options (for forwarding + // assertions) and returns a pending AsyncStatement. 
+ public async submitStatement(sql: string, options?: unknown): Promise { + if (this.throwOnExecute) { + throw this.throwOnExecute; + } + this.lastSql = sql; + this.lastOptions = options; + this.lastAsyncStatement = new FakeAsyncStatement(this.submitStatusValue); + return this.lastAsyncStatement; } private recordMetadata(method: string, args: unknown[]): Promise { @@ -672,3 +732,102 @@ describe('SeaOperationBackend', () => { // tests/unit/sea/SeaOperationBackend.test.ts and the parity-gate e2e // at tests/e2e/sea/results-e2e.test.ts. }); + +describe('SeaOperationBackend — async (submitStatement) path', () => { + const makeAsyncOp = (asyncStatement: FakeAsyncStatement) => + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext() }); + + it('rejects when neither asyncStatement nor statement is provided', () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect(() => new SeaOperationBackend({ context: makeContext() } as any)).to.throw( + HiveDriverError, + /exactly one/, + ); + }); + + it('rejects when BOTH asyncStatement and statement are provided', () => { + expect( + () => + new SeaOperationBackend({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + asyncStatement: new FakeAsyncStatement() as any, + statement: new FakeNativeStatement(), + context: makeContext(), + }), + ).to.throw(HiveDriverError, /exactly one/); + }); + + it('id defaults to the async statement id', () => { + const op = makeAsyncOp(new FakeAsyncStatement()); + expect(op.id).to.equal('01ef-fake-async-id'); + }); + + it('status() reports the real kernel state', async () => { + const running = makeAsyncOp(new FakeAsyncStatement('Running')); + expect((await running.status(false)).state).to.equal(OperationState.Running); + const ok = makeAsyncOp(new FakeAsyncStatement('Succeeded')); + expect((await ok.status(false)).state).to.equal(OperationState.Succeeded); + }); + 
+ it('waitUntilReady() polls status() until terminal, firing the progress callback each tick', async () => { + const stmt = new FakeAsyncStatement(['Pending', 'Running', 'Succeeded']); + const op = makeAsyncOp(stmt); + const states: OperationState[] = []; + await op.waitUntilReady({ callback: (s) => states.push(s.state) }); + expect(stmt.statusCalls).to.equal(3); + expect(states).to.deep.equal([OperationState.Pending, OperationState.Running, OperationState.Succeeded]); + }); + + it('waitUntilReady() surfaces the kernel error envelope on a Failed statement', async () => { + const stmt = new FakeAsyncStatement('Failed'); + // The kernel rejects awaitResult() with a sentinel-framed structured error; + // decodeNapiKernelError turns it into a typed HiveDriverError. + stmt.awaitResultError = new Error( + `__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'TABLE_OR_VIEW_NOT_FOUND' })}`, + ); + const op = makeAsyncOp(stmt); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/); + }); + + it('waitUntilReady() throws on a server-side Cancelled statement', async () => { + const op = makeAsyncOp(new FakeAsyncStatement('Cancelled')); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/cancelled/i); + }); + + it('cancel() forwards to the async statement and short-circuits a subsequent poll', async () => { + const stmt = new FakeAsyncStatement(['Running', 'Running', 'Succeeded']); + const op = makeAsyncOp(stmt); + await op.cancel(); + expect(stmt.cancelled).to.equal(true); + // A JS-side cancel makes waitUntilReady fail fast without further polling. 
+ let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.an('error'); + }); + + it('close() forwards to the async statement', async () => { + const stmt = new FakeAsyncStatement(); + const op = makeAsyncOp(stmt); + await op.close(); + expect(stmt.closed).to.equal(true); + }); +}); From de97ac993f727b401ab85dce36a81134beaa434a Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Wed, 3 Jun 2026 06:44:38 +0000 Subject: [PATCH 3/6] [SEA-NodeJS] prettier-format SEA connection/options + async files (CI code-style) The CI "Check code style" step runs `prettier . --check` (whole repo); these files were committed without prettier formatting. Formatting-only. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/sea/SeaAuth.ts | 4 +--- lib/sea/SeaOperationBackend.ts | 4 +--- tests/unit/sea/connectionOptions.test.ts | 7 ++----- tests/unit/sea/execution.test.ts | 5 +---- 4 files changed, 5 insertions(+), 15 deletions(-) diff --git a/lib/sea/SeaAuth.ts b/lib/sea/SeaAuth.ts index 00fe2b5f..aaadf3b7 100644 --- a/lib/sea/SeaAuth.ts +++ b/lib/sea/SeaAuth.ts @@ -296,9 +296,7 @@ export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNative const { maxConnections } = options as { maxConnections?: number }; if (maxConnections !== undefined) { if (!Number.isInteger(maxConnections) || maxConnections < 1) { - throw new HiveDriverError( - `SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`, - ); + throw new HiveDriverError(`SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`); } if (maxConnections > MAX_U32) { throw new HiveDriverError( diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index ad187d3d..1f5899e6 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -156,9 +156,7 @@ export default class SeaOperationBackend implements IOperationBackend { constructor({ 
asyncStatement, statement, context, id }: SeaOperationBackendOptions) { if ((asyncStatement === undefined) === (statement === undefined)) { - throw new HiveDriverError( - 'SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided', - ); + throw new HiveDriverError('SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided'); } this.asyncStatement = asyncStatement; this.blockingStatement = statement; diff --git a/tests/unit/sea/connectionOptions.test.ts b/tests/unit/sea/connectionOptions.test.ts index 8b065697..96e3f87a 100644 --- a/tests/unit/sea/connectionOptions.test.ts +++ b/tests/unit/sea/connectionOptions.test.ts @@ -21,7 +21,7 @@ const PAT = { host: 'h.databricks.com', path: '/sql/1.0/warehouses/abc', token: // Cast helper: the SEA connection-tuning/TLS options live on the internal // surface, so tests build untyped option literals. -const opts = (extra: Record) => ({ ...PAT, ...extra }) as unknown as ConnectionOptions; +const opts = (extra: Record) => ({ ...PAT, ...extra } as unknown as ConnectionOptions); describe('SeaAuth connection options — intervalsAsString default', () => { it('always sets intervalsAsString:true (thrift-compatible interval rendering)', () => { @@ -90,10 +90,7 @@ describe('SeaAuth TLS options (buildSeaTlsOptions)', () => { }); it('rejects a non-PEM string', () => { - expect(() => buildSeaTlsOptions(opts({ customCaCert: 'not-a-pem' }))).to.throw( - HiveDriverError, - /PEM certificate/, - ); + expect(() => buildSeaTlsOptions(opts({ customCaCert: 'not-a-pem' }))).to.throw(HiveDriverError, /PEM certificate/); }); it('rejects an empty Buffer', () => { diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index 716d213b..10301889 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -740,10 +740,7 @@ describe('SeaOperationBackend — async (submitStatement) path', () => { it('rejects when neither asyncStatement nor statement is 
provided', () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any - expect(() => new SeaOperationBackend({ context: makeContext() } as any)).to.throw( - HiveDriverError, - /exactly one/, - ); + expect(() => new SeaOperationBackend({ context: makeContext() } as any)).to.throw(HiveDriverError, /exactly one/); }); it('rejects when BOTH asyncStatement and statement are provided', () => { From 147006fd05680ccacab241af26c117e12fd8d604 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Wed, 3 Jun 2026 07:06:42 +0000 Subject: [PATCH 4/6] [SEA-NodeJS] Address code-review findings on async/TLS/options (#413) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code-review #413 (81/100). Validated each against the code + a live warehouse: - F1 (HIGH): the async poll loop threw plain HiveDriverError for server-driven Cancelled/Closed/Unknown. The DBSQLOperation facade only mirrors its cancelled/closed flags when `err instanceof OperationStateError` (and OperationStateError extends HiveDriverError, not the reverse), so a server-side cancel/close/admin-kill left the facade desynced. Now throws OperationStateError(Canceled/Closed/Unknown) — matching the Thrift backend. The Failed branch still surfaces the kernel SQL-error envelope via awaitResult. - F2 (MED): the server-Cancelled test asserted only instanceOf(HiveDriverError), which passes for both the correct and incorrect type — it couldn't catch F1. Now asserts instanceOf(OperationStateError) + errorCode, plus a new Closed test. - F3 (MED): queryTimeout was forwarded to submitStatement but the kernel ignores queryTimeoutSecs on submit (always wait_timeout=0s), so the documented public option was a silent no-op, and the poll loop had no client-side deadline (a stalled Running statement polled forever). 
Now enforced client-side: the poll loop tracks a deadline, best-effort cancels the statement on expiry, and throws OperationStateError(Timeout) — matching Thrift's server TIMEDOUT outcome. Stopped forwarding the ignored queryTimeoutSecs to the napi options. Validated live: a 2s timeout interrupts a slow cross-join with TIMEOUT. - F4 (LOW): customCaCert PEM string check now requires the END marker too (a truncated/headerless cert no longer passes), consistent with the Buffer path. - F5 (LOW): SeaAuth reads the SEA-only fields (checkServerCertificate / customCaCert / maxConnections) through `InternalConnectionOptions` instead of ad-hoc inline casts, so a typo'd key fails to compile. - F6 (LOW): corrected the poll-loop comment — the prior text justified polling by an incorrect "blocking awaitResult holds the mutex and queues cancel" claim; cancel() is documented lock-free. The real rationale (real-time status to the progress callback + cancel observed between ticks) is now stated. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/sea/SeaAuth.ts | 22 ++++++++---- lib/sea/SeaOperationBackend.ts | 59 +++++++++++++++++++++++++------- lib/sea/SeaSessionBackend.ts | 18 ++++++---- tests/unit/sea/execution.test.ts | 53 ++++++++++++++++++++++++---- 4 files changed, 120 insertions(+), 32 deletions(-) diff --git a/lib/sea/SeaAuth.ts b/lib/sea/SeaAuth.ts index aaadf3b7..3dd62cef 100644 --- a/lib/sea/SeaAuth.ts +++ b/lib/sea/SeaAuth.ts @@ -13,6 +13,7 @@ // limitations under the License. import { ConnectionOptions } from '../contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../contracts/InternalConnectionOptions'; import AuthenticationError from '../errors/AuthenticationError'; import HiveDriverError from '../errors/HiveDriverError'; @@ -181,10 +182,10 @@ const MAX_U32 = 0xffffffff; * (for strings) lacks a PEM certificate header. 
*/ export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions { - const { checkServerCertificate, customCaCert } = options as { - checkServerCertificate?: boolean; - customCaCert?: Buffer | string; - }; + // Read the SEA-only fields through the purpose-built internal options type + // rather than an ad-hoc inline cast, so the shape can't silently drift from + // its declaration and a typo'd key fails to compile. + const { checkServerCertificate, customCaCert } = options as ConnectionOptions & InternalConnectionOptions; const tls: SeaTlsOptions = {}; @@ -194,10 +195,17 @@ export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions { if (customCaCert !== undefined) { if (typeof customCaCert === 'string') { - if (!customCaCert.includes('-----BEGIN CERTIFICATE-----')) { + // Light PEM sanity check — require both the BEGIN and END markers so a + // truncated/headerless cert is rejected here rather than surfacing as an + // opaque kernel TLS error. Full parsing is deferred to the kernel. + if ( + !customCaCert.includes('-----BEGIN CERTIFICATE-----') || + !customCaCert.includes('-----END CERTIFICATE-----') + ) { throw new HiveDriverError( 'SEA backend: `customCaCert` string does not look like a PEM certificate ' + - "(missing '-----BEGIN CERTIFICATE-----'). Pass PEM text or a Buffer of PEM bytes.", + "(missing the '-----BEGIN CERTIFICATE-----' / '-----END CERTIFICATE-----' markers). " + + 'Pass PEM text or a Buffer of PEM bytes.', ); } tls.customCaCert = Buffer.from(customCaCert, 'utf8'); @@ -293,7 +301,7 @@ export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNative // SEA-only pool sizing; read via cast to match how this function reads the // other SEA-specific options (TLS) — they live on the internal options // surface, not the published public `ConnectionOptions` `.d.ts`. 
- const { maxConnections } = options as { maxConnections?: number }; + const { maxConnections } = options as ConnectionOptions & InternalConnectionOptions; if (maxConnections !== undefined) { if (!Number.isInteger(maxConnections) || maxConnections < 1) { throw new HiveDriverError(`SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`); diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index 1f5899e6..0831d9ea 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -45,6 +45,7 @@ import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import Status from '../dto/Status'; import HiveDriverError from '../errors/HiveDriverError'; +import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; import ArrowResultConverter from '../result/ArrowResultConverter'; import ResultSlicer from '../result/ResultSlicer'; import SeaResultsProvider from './SeaResultsProvider'; @@ -121,6 +122,15 @@ export interface SeaOperationBackendOptions { * handle exposes one, else a fresh UUIDv4. */ id?: string; + /** + * Client-side query timeout in whole seconds (the public `queryTimeout`). + * The kernel ignores `queryTimeoutSecs` on the async submit path + * (`submitStatement` always sends `wait_timeout=0s`), so the JS poll loop + * enforces it as a deadline — on expiry it best-effort cancels the statement + * and throws `OperationStateError(Timeout)`, matching the Thrift path's + * server-side TIMEDOUT outcome. Omitted ⇒ no client-side deadline. + */ + queryTimeoutSecs?: number; } export default class SeaOperationBackend implements IOperationBackend { @@ -154,7 +164,11 @@ export default class SeaOperationBackend implements IOperationBackend { // already-terminal statement. Drives both fetch and result-metadata. 
private fetchHandlePromise?: Promise; - constructor({ asyncStatement, statement, context, id }: SeaOperationBackendOptions) { + // Client-side query-timeout deadline in ms (the public `queryTimeout`), + // undefined when unset. Enforced in the async poll loop. + private readonly queryTimeoutMs?: number; + + constructor({ asyncStatement, statement, context, id, queryTimeoutSecs }: SeaOperationBackendOptions) { if ((asyncStatement === undefined) === (statement === undefined)) { throw new HiveDriverError('SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided'); } @@ -163,6 +177,7 @@ export default class SeaOperationBackend implements IOperationBackend { this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle; this.context = context; this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4(); + this.queryTimeoutMs = queryTimeoutSecs !== undefined && queryTimeoutSecs > 0 ? queryTimeoutSecs * 1000 : undefined; } public get id(): string { @@ -329,20 +344,31 @@ export default class SeaOperationBackend implements IOperationBackend { // --------------------------------------------------------------------------- /** - * Poll the kernel `AsyncStatement` to a terminal state, mirroring the Thrift - * backend's `waitUntilReady` loop (100ms cadence). Polling `status()` rather - * than awaiting `awaitResult()` directly is deliberate: a blocking - * `awaitResult()` holds the kernel statement mutex for the whole query and - * would queue a concurrent `cancel()` behind it, whereas the poll loop - * releases the mutex between ticks so `cancel()` stays responsive. On - * success it materialises the result handle (so the first fetch is free); - * on a bad terminal state it surfaces the real kernel error. + * Poll the kernel `AsyncStatement` to a terminal state on a fixed 100ms + * cadence, mirroring the Thrift backend's `waitUntilReady` loop. 
We poll + * `status()` (a cheap GetStatementStatus RPC) rather than awaiting + * `awaitResult()` directly so that `status()` reports the real + * Pending/Running/Succeeded state to a progress callback each tick, and so a + * JS-initiated `cancel()`/`close()` is observed between ticks via + * `failIfNotActive`. On success it materialises the result handle (so the + * first fetch is free); on a server-driven terminal state it throws the typed + * error the `IOperationBackend` contract requires. + * + * Terminal errors are thrown as `OperationStateError` (NOT plain + * `HiveDriverError`) for Cancelled/Closed/Unknown, because the DBSQLOperation + * facade only mirrors its `cancelled`/`closed` flags when + * `err instanceof OperationStateError` — exactly as the Thrift backend does. + * The Failed branch surfaces the kernel's typed SQL-error envelope via + * `awaitResult()`. */ private async waitUntilReadyAsync(options?: IOperationBackendWaitOptions): Promise { // Already materialised → terminal-and-ready, nothing to wait for. if (this.fetchHandlePromise) { return; } + // Client-side timeout deadline: the kernel ignores queryTimeoutSecs on the + // async submit path, so we enforce the public `queryTimeout` here. + const deadline = this.queryTimeoutMs !== undefined ? Date.now() + this.queryTimeoutMs : undefined; for (;;) { // A JS-initiated cancel/close short-circuits before the next poll. 
failIfNotActive(this.lifecycle); @@ -373,11 +399,20 @@ export default class SeaOperationBackend implements IOperationBackend { await this.throwAsyncError(); break; case OperationState.Cancelled: - throw new HiveDriverError(`SEA operation ${this._id} was cancelled server-side.`); + throw new OperationStateError(OperationStateErrorCode.Canceled); case OperationState.Closed: - throw new HiveDriverError(`SEA operation ${this._id} was closed before it produced a result.`); + throw new OperationStateError(OperationStateErrorCode.Closed); default: - throw new HiveDriverError(`SEA operation ${this._id} reached an unexpected state: ${state}.`); + throw new OperationStateError(OperationStateErrorCode.Unknown); + } + + // Still Pending/Running — enforce the client-side timeout before sleeping. + if (deadline !== undefined && Date.now() >= deadline) { + // Best-effort server-side cancel so the statement doesn't keep running + // after we stop waiting; never mask the timeout with a cancel failure. + // eslint-disable-next-line no-await-in-loop + await this.cancel().catch(() => undefined); + throw new OperationStateError(OperationStateErrorCode.Timeout); } // eslint-disable-next-line no-await-in-loop diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index 17f74821..ad95ec11 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -169,7 +169,15 @@ export default class SeaSessionBackend implements ISessionBackend { } catch (err) { throw this.logAndMapError('executeStatement', err); } - return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context }); + // `queryTimeout` is enforced client-side by the operation backend's poll + // loop: the kernel ignores `queryTimeoutSecs` on the async submit path + // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward + // it to the napi options — passing it there would be a silent no-op. 
+ return new SeaOperationBackend({ + asyncStatement: asyncStatement!, + context: this.context, + queryTimeoutSecs: options.queryTimeout !== undefined ? Number(options.queryTimeout) : undefined, + }); } /** @@ -195,11 +203,9 @@ export default class SeaSessionBackend implements ISessionBackend { if (namedParams !== undefined) { execOptions.namedParams = namedParams; } - // JDBC `setQueryTimeout` is whole seconds; the kernel's `queryTimeoutSecs` - // (SEA wait timeout) is the native equivalent. The SEA wire caps it at 50s. - if (options.queryTimeout !== undefined) { - execOptions.queryTimeoutSecs = Number(options.queryTimeout); - } + // `queryTimeout` is intentionally NOT forwarded here — the kernel ignores + // `queryTimeoutSecs` on `submitStatement`, so it is enforced client-side by + // the operation backend's poll-loop deadline instead (see executeStatement). if (options.rowLimit !== undefined) { execOptions.rowLimit = Number(options.rowLimit); } diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index 10301889..5b34e419 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -22,6 +22,7 @@ import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientCont import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; import HiveDriverError from '../../../lib/errors/HiveDriverError'; import ParameterError from '../../../lib/errors/ParameterError'; +import OperationStateError, { OperationStateErrorCode } from '../../../lib/errors/OperationStateError'; import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; import { OperationState } from '../../../lib/contracts/OperationStatus'; @@ -515,11 +516,14 @@ describe('SeaSessionBackend', () => { expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.'); }); - it('executeStatement forwards queryTimeout as queryTimeoutSecs', async () => { + it('executeStatement does NOT 
forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); await session.executeStatement('SELECT 1', { queryTimeout: 30 }); - expect((connection.lastOptions as { queryTimeoutSecs?: number }).queryTimeoutSecs).to.equal(30); + // queryTimeout alone must not produce napi submit options — the kernel + // ignores queryTimeoutSecs on submitStatement, so it's enforced client-side + // by the operation backend's poll deadline instead (covered below). + expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); }); it('executeStatement forwards rowLimit', async () => { @@ -734,9 +738,9 @@ describe('SeaOperationBackend', () => { }); describe('SeaOperationBackend — async (submitStatement) path', () => { - const makeAsyncOp = (asyncStatement: FakeAsyncStatement) => + const makeAsyncOp = (asyncStatement: FakeAsyncStatement, queryTimeoutSecs?: number) => // eslint-disable-next-line @typescript-eslint/no-explicit-any - new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext() }); + new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext(), queryTimeoutSecs }); it('rejects when neither asyncStatement nor statement is provided', () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -794,7 +798,13 @@ describe('SeaOperationBackend — async (submitStatement) path', () => { expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/); }); - it('waitUntilReady() throws on a server-side Cancelled statement', async () => { + // A server-driven terminal state MUST throw OperationStateError (not a plain + // HiveDriverError) so the DBSQLOperation facade — which only mirrors its + // cancelled/closed flags when `err instanceof OperationStateError` — stays in + // sync. 
Asserting the subclass + errorCode is what catches a regression to + // the bare HiveDriverError (which would pass an `instanceOf HiveDriverError` + // check since OperationStateError extends it). + it('waitUntilReady() throws OperationStateError(Canceled) on a server-side Cancelled statement', async () => { const op = makeAsyncOp(new FakeAsyncStatement('Cancelled')); let thrown: unknown; try { @@ -802,8 +812,37 @@ describe('SeaOperationBackend — async (submitStatement) path', () => { } catch (err) { thrown = err; } - expect(thrown).to.be.instanceOf(HiveDriverError); - expect((thrown as Error).message).to.match(/cancelled/i); + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + }); + + it('waitUntilReady() throws OperationStateError(Closed) on a server-side Closed statement', async () => { + const op = makeAsyncOp(new FakeAsyncStatement('Closed')); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Closed); + }); + + it('waitUntilReady() enforces queryTimeout client-side: throws Timeout and cancels a stuck Running statement', async function timeoutTest() { + // eslint-disable-next-line no-invalid-this + this.timeout(5000); + const stmt = new FakeAsyncStatement('Running'); // never reaches a terminal state + const op = makeAsyncOp(stmt, 0.05); // 50ms client-side deadline + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + // Best-effort server-side cancel fired so the statement doesn't keep running. 
+ expect(stmt.cancelled).to.equal(true); }); it('cancel() forwards to the async statement and short-circuits a subsequent poll', async () => { From 38eea67bd6af79ea4dd735de0a87c2b83c8965a7 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Wed, 3 Jun 2026 08:30:30 +0000 Subject: [PATCH 5/6] [SEA-NodeJS] add TIMESTAMP_NTZ / TIMESTAMP_LTZ bound-param types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidates the last net-new bit of the superseded #408: two SEA-path DBSQLParameterType variants for binding timezone-explicit timestamps. The type name flows through the existing param codec (toSparkParameter → sqlType), which the kernel accepts — validated live (SELECT ? with TIMESTAMP_NTZ/LTZ returns the bound values). On the Thrift backend they degrade to a plain TIMESTAMP bind. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/DBSQLParameter.ts | 8 ++++++++ tests/unit/sea/positionalParams.test.ts | 12 ++++++++++++ 2 files changed, 20 insertions(+) diff --git a/lib/DBSQLParameter.ts b/lib/DBSQLParameter.ts index 5e7f0abc..786db676 100644 --- a/lib/DBSQLParameter.ts +++ b/lib/DBSQLParameter.ts @@ -8,6 +8,14 @@ export enum DBSQLParameterType { STRING = 'STRING', DATE = 'DATE', TIMESTAMP = 'TIMESTAMP', + // Timezone-explicit timestamp variants. A bare `Date` value defaults to + // `TIMESTAMP`; set one of these explicitly to bind a TIMESTAMP_NTZ (no + // timezone, wall-clock) or TIMESTAMP_LTZ (local timezone) parameter. These + // are SEA-path types the kernel param codec accepts; the Thrift wire only + // has `TIMESTAMP`, so on the Thrift backend they degrade to a plain + // TIMESTAMP bind. 
+ TIMESTAMP_NTZ = 'TIMESTAMP_NTZ', + TIMESTAMP_LTZ = 'TIMESTAMP_LTZ', FLOAT = 'FLOAT', DECIMAL = 'DECIMAL', DOUBLE = 'DOUBLE', diff --git a/tests/unit/sea/positionalParams.test.ts b/tests/unit/sea/positionalParams.test.ts index ab0f065c..a3389fcc 100644 --- a/tests/unit/sea/positionalParams.test.ts +++ b/tests/unit/sea/positionalParams.test.ts @@ -90,6 +90,18 @@ describe('SeaPositionalParams.buildSeaPositionalParams', () => { { sqlType: 'TIMESTAMP', value: '2024-01-15 10:30:00' }, ]); }); + + it('honours explicit TIMESTAMP_NTZ / TIMESTAMP_LTZ types (kernel param codec)', () => { + expect( + buildSeaPositionalParams([ + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_NTZ, value: '2024-01-15 10:30:00' }), + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_LTZ, value: '2024-01-15 10:30:00' }), + ]), + ).to.deep.equal([ + { sqlType: 'TIMESTAMP_NTZ', value: '2024-01-15 10:30:00' }, + { sqlType: 'TIMESTAMP_LTZ', value: '2024-01-15 10:30:00' }, + ]); + }); }); describe('SeaPositionalParams.buildSeaNamedParams', () => { From 4584657636cada86e1768342acc71c9c57a20dc8 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Thu, 4 Jun 2026 06:43:00 +0000 Subject: [PATCH 6/6] [SEA-NodeJS] Address #413 review: TIMESTAMP_LTZ wire type + Int64 queryTimeout coercion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code-review #413 (gopalldb). Two P1s: - TIMESTAMP_LTZ was sent verbatim on the wire, but Spark has no distinct TIMESTAMP_LTZ type (TIMESTAMP already carries LTZ semantics) — so a Thrift caller got an opaque server bind error, and the enum comment falsely claimed NTZ/LTZ "degrade to a plain TIMESTAMP bind" (there was no such logic). `toSparkParameter` now maps TIMESTAMP_LTZ → `TIMESTAMP` (valid on both Thrift and kernel); TIMESTAMP_NTZ stays native (a real Spark type). Comment corrected. 
Added DBSQLParameter tests for both wire types (the Thrift behaviour the review flagged as untested) and updated the kernel positional-params test. - queryTimeout (`number | bigint | Int64`) was coerced with `Number(...)`, which yields NaN for an Int64 (node-int64 has no valueOf) → the client-side deadline was silently disabled for Int64 inputs. Now uses `numberToInt64(...).toNumber()`, matching the Thrift backend. Added a regression test that an `Int64(1)` queryTimeout actually fires the deadline (OperationStateError(Timeout)) rather than polling forever. (P1 "queryTimeout silently dropped on submit" and the unbounded-poll note were already resolved earlier by the client-side deadline fix; doc comment updated to match. P2 polarity/Date-NTZ items noted for the public-surface follow-up.) Validated live: NTZ binds natively and LTZ binds as TIMESTAMP on the kernel path. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/DBSQLParameter.ts | 29 +++++++++++++++---------- lib/sea/SeaSessionBackend.ts | 9 ++++++-- tests/unit/DBSQLParameter.test.ts | 23 ++++++++++++++++++++ tests/unit/sea/execution.test.ts | 21 ++++++++++++++++++ tests/unit/sea/positionalParams.test.ts | 4 ++-- 5 files changed, 71 insertions(+), 15 deletions(-) diff --git a/lib/DBSQLParameter.ts b/lib/DBSQLParameter.ts index 786db676..63c2465b 100644 --- a/lib/DBSQLParameter.ts +++ b/lib/DBSQLParameter.ts @@ -8,13 +8,14 @@ export enum DBSQLParameterType { STRING = 'STRING', DATE = 'DATE', TIMESTAMP = 'TIMESTAMP', - // Timezone-explicit timestamp variants. A bare `Date` value defaults to - // `TIMESTAMP`; set one of these explicitly to bind a TIMESTAMP_NTZ (no - // timezone, wall-clock) or TIMESTAMP_LTZ (local timezone) parameter. These - // are SEA-path types the kernel param codec accepts; the Thrift wire only - // has `TIMESTAMP`, so on the Thrift backend they degrade to a plain - // TIMESTAMP bind. + // `TIMESTAMP_NTZ` binds a timezone-free (wall-clock) timestamp. 
It is a real + // Spark type, bound natively on both the Thrift and kernel backends (requires + // a server that supports TIMESTAMP_NTZ; Spark 3.4+ / recent DBR). TIMESTAMP_NTZ = 'TIMESTAMP_NTZ', + // `TIMESTAMP_LTZ` is an alias for `TIMESTAMP`: Spark has no distinct + // TIMESTAMP_LTZ type — `TIMESTAMP` already carries local/instant (LTZ) + // semantics. `toSparkParameter` therefore binds it as `TIMESTAMP` on the wire + // (valid on both backends); it exists only as a self-documenting alias. TIMESTAMP_LTZ = 'TIMESTAMP_LTZ', FLOAT = 'FLOAT', DECIMAL = 'DECIMAL', @@ -58,10 +59,16 @@ export class DBSQLParameter { return new TSparkParameter({ name }); // for NULL neither `type` nor `value` should be set } + // Map timezone-explicit timestamp aliases to their Spark wire type. Spark + // has no distinct TIMESTAMP_LTZ type (TIMESTAMP carries LTZ semantics), so + // bind it as TIMESTAMP — valid on both the Thrift and kernel backends. + // TIMESTAMP_NTZ is a real Spark type and is bound natively. + const wireType = this.type === DBSQLParameterType.TIMESTAMP_LTZ ? DBSQLParameterType.TIMESTAMP : this.type; + if (typeof this.value === 'boolean') { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.BOOLEAN, + type: wireType ?? DBSQLParameterType.BOOLEAN, value: new TSparkParameterValue({ stringValue: this.value ? 'TRUE' : 'FALSE', }), @@ -71,7 +78,7 @@ export class DBSQLParameter { if (typeof this.value === 'number') { return new TSparkParameter({ name, - type: this.type ?? (Number.isInteger(this.value) ? DBSQLParameterType.INTEGER : DBSQLParameterType.DOUBLE), + type: wireType ?? (Number.isInteger(this.value) ? DBSQLParameterType.INTEGER : DBSQLParameterType.DOUBLE), value: new TSparkParameterValue({ stringValue: Number(this.value).toString(), }), @@ -81,7 +88,7 @@ export class DBSQLParameter { if (this.value instanceof Int64 || typeof this.value === 'bigint') { return new TSparkParameter({ name, - type: this.type ?? 
DBSQLParameterType.BIGINT, + type: wireType ?? DBSQLParameterType.BIGINT, value: new TSparkParameterValue({ stringValue: this.value.toString(), }), @@ -91,7 +98,7 @@ export class DBSQLParameter { if (this.value instanceof Date) { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.TIMESTAMP, + type: wireType ?? DBSQLParameterType.TIMESTAMP, value: new TSparkParameterValue({ stringValue: this.value.toISOString(), }), @@ -100,7 +107,7 @@ export class DBSQLParameter { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.STRING, + type: wireType ?? DBSQLParameterType.STRING, value: new TSparkParameterValue({ stringValue: this.value, }), diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index ad95ec11..bbd84cd4 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -35,6 +35,7 @@ import ParameterError from '../errors/ParameterError'; import { LogLevel } from '../contracts/IDBSQLLogger'; import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader'; import { decodeNapiKernelError } from './SeaErrorMapping'; +import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend'; import SeaOperationBackend from './SeaOperationBackend'; import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams'; import { seaServerInfoValue } from './SeaServerInfo'; @@ -121,7 +122,9 @@ export default class SeaSessionBackend implements ISessionBackend { * Per-statement options forwarded to the kernel `ExecuteOptions`: * - `ordinalParameters` / `namedParameters` → bound params (mutually * exclusive — the kernel binds one placeholder style per statement); - * - `queryTimeout` → `queryTimeoutSecs` (SEA server wait timeout); + * - `queryTimeout` → enforced client-side by the operation backend's poll + * deadline (the kernel ignores `queryTimeoutSecs` on the async submit + * path), NOT forwarded to the napi options; * - `rowLimit` → `rowLimit` 
(SEA-only server-side row cap); * - `queryTags` → serialised into the conf overlay's reserved * `query_tags` key (the same wire shape Thrift's `serializeQueryTags` @@ -176,7 +179,9 @@ export default class SeaSessionBackend implements ISessionBackend { return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context, - queryTimeoutSecs: options.queryTimeout !== undefined ? Number(options.queryTimeout) : undefined, + // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()` + // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf). + queryTimeoutSecs: options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined, }); } diff --git a/tests/unit/DBSQLParameter.test.ts b/tests/unit/DBSQLParameter.test.ts index a3f7659e..deefb13e 100644 --- a/tests/unit/DBSQLParameter.test.ts +++ b/tests/unit/DBSQLParameter.test.ts @@ -101,4 +101,27 @@ describe('DBSQLParameter', () => { expect(dbsqlParam.toSparkParameter()).to.deep.equal(expectedParam); } }); + + it('maps timezone-explicit timestamp types to valid Spark wire types', () => { + // TIMESTAMP_NTZ is a real Spark type → bound verbatim. + expect( + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_NTZ, value: '2024-01-15 10:30:00' }).toSparkParameter(), + ).to.deep.equal( + new TSparkParameter({ + type: DBSQLParameterType.TIMESTAMP_NTZ, + value: new TSparkParameterValue({ stringValue: '2024-01-15 10:30:00' }), + }), + ); + // TIMESTAMP_LTZ has no distinct Spark type → bound as TIMESTAMP (valid on + // both Thrift and kernel; the old verbatim 'TIMESTAMP_LTZ' was rejected by + // the Thrift server). 
+ expect( + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_LTZ, value: '2024-01-15 10:30:00' }).toSparkParameter(), + ).to.deep.equal( + new TSparkParameter({ + type: DBSQLParameterType.TIMESTAMP, + value: new TSparkParameterValue({ stringValue: '2024-01-15 10:30:00' }), + }), + ); + }); }); diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index 5b34e419..dcd9561b 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -14,6 +14,7 @@ import { expect } from 'chai'; import sinon from 'sinon'; +import Int64 from 'node-int64'; import SeaBackend from '../../../lib/sea/SeaBackend'; import SeaSessionBackend from '../../../lib/sea/SeaSessionBackend'; import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; @@ -526,6 +527,26 @@ describe('SeaSessionBackend', () => { expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); }); + it('coerces an Int64 queryTimeout into the client-side deadline (not NaN)', async function int64Timeout() { + // Regression: `Number(new Int64(...))` yields NaN (node-int64 has no valueOf), + // which would silently disable the deadline. The backend must coerce via + // numberToInt64(...).toNumber() so an Int64 queryTimeout still bounds the poll. 
+ // eslint-disable-next-line no-invalid-this + this.timeout(5000); + const connection = new FakeNativeConnection(); + connection.submitStatusValue = 'Running'; // never reaches a terminal state + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1) }); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown, 'Int64(1) timeout must fire — NaN would poll forever').to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + }); + it('executeStatement forwards rowLimit', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); diff --git a/tests/unit/sea/positionalParams.test.ts b/tests/unit/sea/positionalParams.test.ts index a3389fcc..d9902303 100644 --- a/tests/unit/sea/positionalParams.test.ts +++ b/tests/unit/sea/positionalParams.test.ts @@ -91,7 +91,7 @@ describe('SeaPositionalParams.buildSeaPositionalParams', () => { ]); }); - it('honours explicit TIMESTAMP_NTZ / TIMESTAMP_LTZ types (kernel param codec)', () => { + it('binds TIMESTAMP_NTZ natively and TIMESTAMP_LTZ as TIMESTAMP (Spark has no distinct LTZ type)', () => { expect( buildSeaPositionalParams([ new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_NTZ, value: '2024-01-15 10:30:00' }), @@ -99,7 +99,7 @@ describe('SeaPositionalParams.buildSeaPositionalParams', () => { ]), ).to.deep.equal([ { sqlType: 'TIMESTAMP_NTZ', value: '2024-01-15 10:30:00' }, - { sqlType: 'TIMESTAMP_LTZ', value: '2024-01-15 10:30:00' }, + { sqlType: 'TIMESTAMP', value: '2024-01-15 10:30:00' }, ]); }); });