Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions lib/contracts/IDBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,33 @@ export type ConnectionOptions = {
* @internal Not stable; M0 stub only.
*/
useSEA?: boolean;
/**
* Whether to verify the server's TLS certificate (SEA backend only).
*
* Defaults to `true` — **secure by default**: strict validation against
* the system trust store (full chain + expiry + hostname), matching the
* JDBC/ODBC drivers and every modern HTTPS client.
*
* Set to `false` to disable verification: self-signed, untrusted, and
* expired certificates are accepted and the hostname-vs-certificate check
* is skipped. This is **insecure** — it provides no protection against
* active man-in-the-middle attacks — and exists only as an opt-out for
* parity with the legacy NodeJS Thrift driver, which hard-codes
* `rejectUnauthorized: false`.
*
* For corporate TLS-inspecting proxies or on-prem deployments with an
* internal CA, prefer the default `checkServerCertificate: true` together
* with `customCaCert` over disabling verification entirely.
*/
checkServerCertificate?: boolean;
/**
* PEM-encoded CA certificate to add to the trust store on top of the
* system roots (SEA backend only). Accepts a PEM string or its raw
* `Buffer` bytes. Use this for a corporate proxy that re-signs TLS or an
* on-prem Databricks deployment that uses an internal CA. Honoured
* regardless of `checkServerCertificate`.
*/
customCaCert?: Buffer | string;
} & AuthOptions;

export interface OpenSessionRequest {
Expand Down
28 changes: 15 additions & 13 deletions lib/result/ArrowResultConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
* thrift-path which has no SEA awareness.
*/
const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit';
const ZERO_BIGINT = BigInt(0);
const NS_PER_MICRO = BigInt(1_000);
const NS_PER_MILLI = BigInt(1_000_000);
const NS_PER_SEC = BigInt(1_000_000_000);
const MS_PER_DAY = BigInt(86_400_000);
const NS_PER_MIN = NS_PER_SEC * BigInt(60);
const NS_PER_HOUR = NS_PER_MIN * BigInt(60);
const NS_PER_DAY = NS_PER_HOUR * BigInt(24);

/**
* Format an Arrow `Interval[YearMonth]` or `Interval[DayTime]` value
Expand Down Expand Up @@ -63,8 +71,8 @@ function formatArrowInterval(value: any, valueType: any): string {
// We re-normalise: total milliseconds = a * 86_400_000 + b, then split into
// days, hours, minutes, seconds, nanoseconds (nanoseconds is always 0
// because the legacy IntervalDayTime carries only millisecond precision).
const totalMs = BigInt(a) * BigInt(86_400_000) + BigInt(b);
return formatDayTimeFromTotal(totalMs * BigInt(1_000_000) /* → ns */, 'NANOSECOND');
const totalMs = BigInt(a) * MS_PER_DAY + BigInt(b);
return formatDayTimeFromTotal(totalMs * NS_PER_MILLI /* → ns */, 'NANOSECOND');
}

/**
Expand Down Expand Up @@ -113,11 +121,11 @@ function formatDurationToIntervalDayTime(value: bigint | number, unit: string):
function toNanoseconds(value: bigint, unit: string): bigint {
switch (unit) {
case 'SECOND':
return value * BigInt(1_000_000_000);
return value * NS_PER_SEC;
case 'MILLISECOND':
return value * BigInt(1_000_000);
return value * NS_PER_MILLI;
case 'MICROSECOND':
return value * BigInt(1_000);
return value * NS_PER_MICRO;
case 'NANOSECOND':
default:
return value;
Expand All @@ -136,14 +144,8 @@ function toNanoseconds(value: bigint, unit: string): bigint {
* for future use if a unit-aware precision is ever needed.
*/
function formatDayTimeFromTotal(totalNanos: bigint, _unit: string): string {
const ZERO = BigInt(0);
const sign = totalNanos < ZERO ? '-' : '';
const abs = totalNanos < ZERO ? -totalNanos : totalNanos;

const NS_PER_SEC = BigInt(1_000_000_000);
const NS_PER_MIN = NS_PER_SEC * BigInt(60);
const NS_PER_HOUR = NS_PER_MIN * BigInt(60);
const NS_PER_DAY = NS_PER_HOUR * BigInt(24);
const sign = totalNanos < ZERO_BIGINT ? '-' : '';
const abs = totalNanos < ZERO_BIGINT ? -totalNanos : totalNanos;

const days = abs / NS_PER_DAY;
let rem = abs % NS_PER_DAY;
Expand Down
13 changes: 8 additions & 5 deletions lib/sea/SeaArrowIpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ const DATABRICKS_TYPE_NAME = 'databricks.type_name';
*
* Re-parsing inside the converter is unavoidable because `RecordBatch`
* instances created here cannot be passed across the converter's
* `Buffer[]` boundary without rewriting the converter. The IPC bytes
* themselves are small enough (one record batch per call) that the
* double-parse cost is negligible for M0.
* `Buffer[]` boundary without rewriting the converter. Callers that already
* patched the IPC bytes can set `alreadyPatched` to avoid running the
* FlatBuffer rewrite twice on the hot fetch path.
*/
export function decodeIpcBatch(ipcBytes: Buffer): { schema: Schema<TypeMap>; rowCount: number } {
const patched = rewriteDurationToInt64(ipcBytes);
export function decodeIpcBatch(
ipcBytes: Buffer,
options: { alreadyPatched?: boolean } = {},
): { schema: Schema<TypeMap>; rowCount: number } {
const patched = options.alreadyPatched ? ipcBytes : rewriteDurationToInt64(ipcBytes);
const reader = RecordBatchReader.from<TypeMap>(patched);
// Eagerly open so `schema` is populated.
reader.open();
Expand Down
58 changes: 2 additions & 56 deletions lib/sea/SeaArrowIpcDurationFix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,62 +251,8 @@ function maybeRewriteSchemaMessage(schemaMessageBytes: Buffer): Buffer | null {
return null;
}

// Snapshot the (name, originalTypeType, durationUnit, originalCustomMetadata)
// for every field, then rebuild the schema using the flatbuffer builder.
type FieldSnapshot = {
name: string;
nullable: boolean;
isDuration: boolean;
durationUnit?: number; // FbTimeUnit
/** Preserved metadata key→value pairs (we add ours on top for Duration). */
metadata: Array<[string, string]>;
/** Raw bytes for the original field if no rewrite needed; we'll re-encode it. */
typeType: number;
/** Pre-decoded type sub-table bytes for non-Duration fields. */
// For M0 we only rewrite Duration; other fields we re-create with the
// same primitive type. To keep the rewriter narrow, we only support
// schemas where non-Duration fields use type sub-tables that can be
// round-tripped via Field.decode → re-encode through flatbuffers'
// SizedByteArray serialization. That's complex, so instead we use
// a different approach: copy the raw FlatBuffer field offset
// directly when no rewrite is needed (handled by the
// copy-field-by-reference path below).
};
// We can't simply "copy field by reference" across FlatBuffer
// builders, so we have to re-encode every field. For non-Duration
// fields, we re-encode using the apache-arrow `fb/*` accessors.
// That requires touching every existing supported type.
//
// To keep this rewriter narrow and DRY, we take a different
// approach: in-place patch. We do NOT rebuild the FlatBuffer.
// Instead, we mutate the field's `type_type` byte from Duration(18)
// to Int(2), and we point its `type` offset at a freshly-appended
// Int sub-table that we splice into the message bytes. Then we
// append a fresh `KeyValue` for `databricks.arrow.duration_unit`
// into the field's `custom_metadata` vector. This avoids re-encoding
// every other field.
//
// FlatBuffer in-place mutation is tricky because tables have vtables
// and offsets are 32-bit relative pointers. The fields we need to
// change are:
// 1. Field.type_type (1-byte enum at vtable slot for field #2):
// mutate the byte from 18 → 2. Same width, safe to overwrite.
// 2. Field.type (4-byte relative offset to the type sub-table):
// change the offset to point at our appended Int sub-table.
// Same width, safe to overwrite.
// 3. Field.custom_metadata (4-byte relative offset to vector):
// either rewrite the existing vector to add our entry, or
// append a new vector and update the offset.
//
// Because relative offsets are forward-only in FlatBuffers (offset is
// distance from the storage location to the target), and our
// appended sub-tables live AFTER the storage location, the math
// works out. We append to a growing byte buffer and patch the
// existing offset fields to point at the new tail.

// Bail back to the full rebuild approach; in-place patching of
// arbitrary vtable layouts is fragile (vtables may share storage
// across fields). Re-encode the whole schema.
// Re-encode the whole schema. This is more verbose than an in-place
// FlatBuffer patch, but it avoids relying on vtable layout details.
return rebuildSchemaWithDurationRewritten(message, fbSchema);
}

Expand Down
77 changes: 76 additions & 1 deletion lib/sea/SeaAuth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,29 @@ export interface SeaSessionDefaults {
complexTypesAsJson?: boolean;
}

/**
* TLS options shared across all auth-mode variants. Mirror the napi
* binding's `ConnectionOptions.checkServerCertificate` / `.customCaCert`
* (kernel `Session::builder().tls(TlsConfig)`).
*
* The napi shape takes `customCaCert` as a `Buffer` only; the public
* `ConnectionOptions` additionally accepts a PEM string, which
* `buildSeaConnectionOptions` normalises to a `Buffer` before crossing
* the FFI boundary.
*/
export interface SeaTlsOptions {
/**
* Verify the server cert. Omitted ⇒ napi default (secure: strict chain
* + hostname validation). Set `false` to disable (thrift-parity
* insecure). See `ConnectionOptions.checkServerCertificate`.
*/
checkServerCertificate?: boolean;
/** PEM-encoded CA bytes to add to the trust store. */
customCaCert?: Buffer;
}

export type SeaNativeConnectionOptions = SeaSessionDefaults &
SeaTlsOptions &
(
| {
hostName: string;
Expand Down Expand Up @@ -132,6 +154,55 @@ export function isBlankOrReserved(s: string): boolean {
return normalized.length === 0 || normalized === 'undefined' || normalized === 'null';
}

/**
* Normalise the public TLS options (`checkServerCertificate` /
* `customCaCert`) into the napi shape.
*
* - `checkServerCertificate` passes through verbatim (only when set; an
* absent value leaves the napi default, which is secure — strict chain +
* hostname validation. Set `false` to opt out for thrift parity).
* - `customCaCert` accepts a PEM string or `Buffer` on the public
* surface; we convert a string to a `Buffer` here and do a light PEM
* sanity check. The bytes are NOT parsed in JS — the kernel returns a
* meaningful error if the PEM is malformed.
*
* Throws `HiveDriverError` when `customCaCert` is supplied but empty or
* (for strings) lacks a PEM certificate header.
*/
export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions {
const { checkServerCertificate, customCaCert } = options as {
checkServerCertificate?: boolean;
customCaCert?: Buffer | string;
};

const tls: SeaTlsOptions = {};

if (checkServerCertificate !== undefined) {
tls.checkServerCertificate = checkServerCertificate;
}

if (customCaCert !== undefined) {
if (typeof customCaCert === 'string') {
if (!customCaCert.includes('-----BEGIN CERTIFICATE-----')) {
throw new HiveDriverError(
'SEA backend: `customCaCert` string does not look like a PEM certificate ' +
"(missing '-----BEGIN CERTIFICATE-----'). Pass PEM text or a Buffer of PEM bytes.",
);
}
tls.customCaCert = Buffer.from(customCaCert, 'utf8');
} else if (Buffer.isBuffer(customCaCert)) {
if (customCaCert.length === 0) {
throw new HiveDriverError('SEA backend: `customCaCert` Buffer is empty.');
}
tls.customCaCert = customCaCert;
} else {
throw new HiveDriverError('SEA backend: `customCaCert` must be a PEM string or a Buffer.');
}
}

return tls;
}

/**
* Validate the user-supplied `ConnectionOptions` and build the
* napi-binding's connection-options shape.
Expand Down Expand Up @@ -186,6 +257,10 @@ export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNative
// the kernel default (native Arrow) — they already decode identically
// to Thrift via the shared Arrow converter.
intervalsAsString: true,
// TLS knobs (server-cert verification toggle + custom CA). Validated
// and normalised (string PEM → Buffer) here so the napi shape only
// ever sees a Buffer.
...buildSeaTlsOptions(options),
};

const oauth = options as {
Expand All @@ -200,7 +275,7 @@ export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNative
const { token } = options as { token?: string };
if (typeof token !== 'string' || isBlankOrReserved(token)) {
throw new AuthenticationError(
'SEA backend: a non-empty PAT must be supplied via `token` when using `authType: \'access-token\'`.',
"SEA backend: a non-empty PAT must be supplied via `token` when using `authType: 'access-token'`.",
);
}
if (oauth.oauthClientId !== undefined || oauth.oauthClientSecret !== undefined) {
Expand Down
25 changes: 20 additions & 5 deletions lib/sea/SeaBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ import IBackend from '../contracts/IBackend';
import ISessionBackend from '../contracts/ISessionBackend';
import IClientContext from '../contracts/IClientContext';
import { ConnectionOptions, OpenSessionRequest } from '../contracts/IDBSQLClient';
import { LogLevel } from '../contracts/IDBSQLLogger';
import HiveDriverError from '../errors/HiveDriverError';
import {
getSeaNative,
SeaNativeBinding,
SeaNativeConnection,
} from './SeaNativeLoader';
import { getSeaNative, SeaNativeBinding, SeaNativeConnection } from './SeaNativeLoader';
import { decodeNapiKernelError } from './SeaErrorMapping';
import { buildSeaConnectionOptions, SeaNativeConnectionOptions } from './SeaAuth';
import SeaSessionBackend from './SeaSessionBackend';
Expand Down Expand Up @@ -82,6 +79,23 @@ export default class SeaBackend implements IBackend {
// Any non-PAT mode (or a missing/empty token) throws here, before
// we ever touch the native binding.
this.nativeOptions = buildSeaConnectionOptions(options);

// Server-cert verification is ON by default. Only an explicit
// `checkServerCertificate: false` disables it — a deliberate insecure
// opt-out for thrift parity — so warn (not debug) when that happens.
if (this.nativeOptions.checkServerCertificate === false) {
this.context
?.getLogger()
.log(
LogLevel.warn,
'SEA backend: TLS server-certificate verification is DISABLED ' +
'(checkServerCertificate: false). The connection accepts ' +
'self-signed/untrusted/expired certs and skips the hostname check — this ' +
'matches the legacy Thrift driver but offers no protection against active ' +
'man-in-the-middle attacks. Remove the override (defaults to strict ' +
'validation), optionally with `customCaCert` for corporate/on-prem CAs.',
);
}
}

public async openSession(request: OpenSessionRequest): Promise<ISessionBackend> {
Expand Down Expand Up @@ -119,6 +133,7 @@ export default class SeaBackend implements IBackend {
return new SeaSessionBackend({
connection: nativeConnection!,
context: this.context,
id: nativeConnection!.sessionId,
});
}

Expand Down
2 changes: 2 additions & 0 deletions lib/sea/SeaNativeLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export interface SeaArrowSchema {
* `await` them without `any` casts.
*/
export interface SeaNativeStatement {
readonly statementId: string;
fetchNextBatch(): Promise<SeaArrowBatch | null>;
schema(): Promise<SeaArrowSchema>;
cancel(): Promise<void>;
Expand Down Expand Up @@ -153,6 +154,7 @@ export interface SeaNativeExecuteOptions {
}

export interface SeaNativeConnection {
readonly sessionId: string;
/**
* Execute a SQL statement. Catalog / schema / sessionConf are
* session-level — set on `openSession`. Per-statement options (bound
Expand Down
2 changes: 1 addition & 1 deletion lib/sea/SeaOperationBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ export default class SeaOperationBackend implements IOperationBackend {
this.blockingStatement = statement;
this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle;
this.context = context;
this._id = id ?? asyncStatement?.statementId ?? uuidv4();
this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4();
}

public get id(): string {
Expand Down
Loading
Loading