Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 191 additions & 12 deletions lib/result/ArrowResultConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
TypeMap,
DataType,
Type,
Interval,
IntervalUnit,
StructRow,
MapRow,
Vector,
Expand All @@ -15,6 +17,7 @@ import {
} from 'apache-arrow';
import { TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import HiveDriverError from '../errors/HiveDriverError';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils';

Expand All @@ -23,6 +26,137 @@ const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
type ArrowSchema = Schema<TypeMap>;
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;

/**
 * Field-metadata key under which the SEA IPC pre-processor
 * (`lib/sea/SeaArrowIpcDurationFix.ts`) records the original Arrow `Duration`
 * time unit of a field it rewrote to `Int64`. The key is re-declared here
 * instead of imported so that this generic `lib/result` converter carries no
 * compile-time dependency on `lib/sea`.
 *
 * **SEA-only by construction — NOT shared with Thrift.** The Thrift backend
 * sets `intervalTypesAsArrow: false` and maps both INTERVAL `TTypeId`s to
 * `ArrowString` (`lib/result/utils.ts`), so intervals arrive pre-formatted as
 * strings and neither this key nor the interval/duration branches of the
 * converter are exercised on that path.
 *
 * Exported so that `SeaIntervalParity.test` can assert it equals the SEA-side
 * declaration; a rename or typo on either side would otherwise silently turn
 * the consumer into a no-op.
 */
export const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit';

// BigInt scale factors used when decomposing a nanosecond total. Each unit is
// built from the next smaller one so the ratios stay visible.
const ZERO_BIGINT = BigInt(0);
const NS_PER_MICRO = BigInt(1000);
const NS_PER_MILLI = NS_PER_MICRO * BigInt(1000);
const NS_PER_SEC = NS_PER_MILLI * BigInt(1000);
const NS_PER_MIN = BigInt(60) * NS_PER_SEC;
const NS_PER_HOUR = BigInt(60) * NS_PER_MIN;
const NS_PER_DAY = BigInt(24) * NS_PER_HOUR;

/**
 * Render a native Arrow `Interval[YearMonth]` value as the canonical thrift
 * string `"Y-M"` (1 year 2 months → `"1-2"`, -1 month → `"-0-1"`).
 *
 * Arrow's `GetVisitor` surfaces a YEAR-MONTH interval as an `Int32Array(2)`
 * holding `[years, months]`, both derived from one int32 of total months.
 *
 * **YEAR_MONTH is the only unit accepted.** The kernel delivers INTERVAL
 * DAY-TIME as an Arrow `Duration` (rewritten to `Int64`), which is formatted
 * by `formatDurationToIntervalDayTime`, never as a native `Interval[DayTime]`.
 * Every other unit (DAY_TIME, MONTH_DAY_NANO, or a missing type) is therefore
 * unexpected and is rejected outright instead of being misread as
 * `[days, ms]` and rendered as a plausible but wrong string.
 *
 * @param value two-element array `[years, months]`
 * @param valueType Arrow type of the column; its `unit` must be YEAR_MONTH
 * @returns the interval formatted by `formatYearMonth`
 * @throws HiveDriverError when the unit is anything other than YEAR_MONTH
 */
function formatArrowInterval(value: Int32Array, valueType: Interval): string {
  const unit = valueType?.unit;
  if (unit === IntervalUnit.YEAR_MONTH) {
    const years = Number(value[0]);
    const months = Number(value[1]);
    return formatYearMonth(years, months);
  }
  throw new HiveDriverError(
    `SEA result converter: unsupported Arrow Interval unit ${unit}. The kernel emits only ` +
      `YEAR_MONTH as a native Arrow Interval (DAY-TIME arrives as Duration); MONTH_DAY_NANO is unsupported.`,
  );
}

/**
 * Format a (years, months) pair as `"Y-M"`, or `"-Y-M"` for a negative
 * interval, matching the Spark display format used on the thrift path.
 *
 * Arrow's `getIntervalYearMonth` decomposes a signed total-months int32 by
 * integer truncation, so the two components normally share a sign and
 * `|months| < 12`. The output is always re-derived from the total number of
 * months, so the result is well-formed even if a caller passes a pair that
 * breaks that invariant (e.g. `(1, -1)` renders as `"0-11"`, not `"1--1"`).
 * For every pair Arrow actually produces the result is unchanged.
 *
 * @param years signed year component
 * @param months signed month component
 * @returns absolute years and months joined by `-`, with a single leading
 *          `-` when the total is negative
 */
function formatYearMonth(years: number, months: number): string {
  const totalMonths = years * 12 + months;
  const sign = totalMonths < 0 ? '-' : '';
  const magnitude = Math.abs(totalMonths);
  const wholeYears = Math.trunc(magnitude / 12);
  const leftoverMonths = magnitude % 12;
  return `${sign}${wholeYears}-${leftoverMonths}`;
}

/**
 * Turn an Arrow `Duration` value (already rewritten to `Int64` by the SEA IPC
 * pre-processor) into the thrift INTERVAL DAY-TIME string.
 *
 * The value is widened to `bigint` if needed, scaled to nanoseconds with
 * `toNanoseconds`, and rendered with `formatDayTimeFromTotal`.
 *
 * @param value signed duration count, expressed in `unit`; a `number` is
 *              converted with `BigInt(value)`
 * @param unit the original Arrow time unit captured by
 *             `SeaArrowIpcDurationFix.ts`: `SECOND`, `MILLISECOND`,
 *             `MICROSECOND` or `NANOSECOND`
 * @returns the formatted day-time interval string
 */
function formatDurationToIntervalDayTime(value: bigint | number, unit: string): string {
  let count: bigint;
  if (typeof value === 'bigint') {
    count = value;
  } else {
    count = BigInt(value);
  }
  return formatDayTimeFromTotal(toNanoseconds(count, unit));
}

/**
 * Scale a duration count to nanoseconds according to its unit.
 *
 *   SECOND      → × 1_000_000_000
 *   MILLISECOND → ×     1_000_000
 *   MICROSECOND → ×         1_000
 *   NANOSECOND  → ×             1
 *
 * An unrecognised unit is rejected instead of being treated as nanoseconds:
 * guessing the scale would produce a well-formed interval string that is off
 * by orders of magnitude. This mirrors `formatArrowInterval`, which also
 * throws on an unexpected unit.
 *
 * @param value signed duration count expressed in `unit`
 * @param unit one of `SECOND`, `MILLISECOND`, `MICROSECOND`, `NANOSECOND`
 *             (matched case-sensitively)
 * @returns the same duration expressed in nanoseconds
 * @throws HiveDriverError when `unit` is not one of the four known units
 */
function toNanoseconds(value: bigint, unit: string): bigint {
  switch (unit) {
    case 'SECOND':
      return value * NS_PER_SEC;
    case 'MILLISECOND':
      return value * NS_PER_MILLI;
    case 'MICROSECOND':
      return value * NS_PER_MICRO;
    case 'NANOSECOND':
      return value;
    default:
      throw new HiveDriverError(
        `SEA result converter: unsupported Arrow Duration unit ${unit}. ` +
          `Expected one of SECOND, MILLISECOND, MICROSECOND, NANOSECOND.`,
      );
  }
}

/**
 * Render a signed nanosecond total as `"D HH:mm:ss.fffffffff"`.
 *
 * The fraction is always nine digits wide, matching the thrift driver's wire
 * format (`"1 02:03:04.000000000"`) whatever precision the server stores.
 * Hours, minutes and seconds are zero-padded to two digits; days are not
 * padded. A negative total gets one leading `-` and is otherwise formatted
 * from its absolute value. The caller must already have scaled the value to
 * nanoseconds.
 *
 * @param totalNanos signed duration in nanoseconds
 * @returns the formatted day-time interval string
 */
function formatDayTimeFromTotal(totalNanos: bigint): string {
  const negative = totalNanos < ZERO_BIGINT;
  const magnitude = negative ? -totalNanos : totalNanos;

  // Every unit divides the next larger one exactly, so each field can be read
  // straight off the magnitude without threading a running remainder.
  const days = magnitude / NS_PER_DAY;
  const hours = (magnitude % NS_PER_DAY) / NS_PER_HOUR;
  const minutes = (magnitude % NS_PER_HOUR) / NS_PER_MIN;
  const seconds = (magnitude % NS_PER_MIN) / NS_PER_SEC;
  const nanos = magnitude % NS_PER_SEC;

  const zeroPad = (n: bigint, width: number): string => n.toString().padStart(width, '0');
  const clock = [hours, minutes, seconds].map((part) => zeroPad(part, 2)).join(':');

  return `${negative ? '-' : ''}${days.toString()} ${clock}.${zeroPad(nanos, 9)}`;
}

export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
private readonly context: IClientContext;

Expand Down Expand Up @@ -147,37 +281,52 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
return rows.map((row) => {
// First, convert native Arrow values to corresponding plain JS objects
const record = this.convertArrowTypes(row, undefined, schema.fields);
const record = this.convertArrowTypes(row, undefined, schema.fields, undefined);
// Second, cast all the values to original Thrift types
return this.convertThriftTypes(record);
});
}

private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
private convertArrowTypes(
value: any,
valueType: DataType | undefined,
fields: Array<ArrowSchemaField> = [],
field?: ArrowSchemaField,
): any {
if (value === null) {
return value;
}

const fieldsMap: Record<string, ArrowSchemaField> = {};
for (const field of fields) {
fieldsMap[field.name] = field;
for (const f of fields) {
fieldsMap[f.name] = f;
}

// Convert structures to plain JS object and process all its fields recursively
if (value instanceof StructRow) {
const result = value.toJSON();
for (const key of Object.keys(result)) {
const field: ArrowSchemaField | undefined = fieldsMap[key];
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
const childField: ArrowSchemaField | undefined = fieldsMap[key];
result[key] = this.convertArrowTypes(
result[key],
childField?.type,
childField?.type.children || [],
childField,
);
}
return result;
}
if (value instanceof MapRow) {
const result = value.toJSON();
// Map type consists of its key and value types. We need only value type here, key will be cast to string anyway
const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
const valueField = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
for (const key of Object.keys(result)) {
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
result[key] = this.convertArrowTypes(
result[key],
valueField?.type,
valueField?.type.children || [],
valueField,
);
}
return result;
}
Expand All @@ -186,31 +335,61 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
if (value instanceof Vector) {
const result = value.toJSON();
// Array type contains the only child which defines a type of each array's element
const field = fieldsMap.element;
return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
const elementField = fieldsMap.element;
return result.map((item) =>
this.convertArrowTypes(item, elementField?.type, elementField?.type.children || [], elementField),
);
}

if (DataType.isTimestamp(valueType)) {
return new Date(value);
}

// INTERVAL — Spark/Databricks SEA emits two flavours: native Arrow
// `Interval[YearMonth]` (handled here; `formatArrowInterval` throws on any
// other native Interval unit, including `Interval[DayTime]`) and `Duration`
// (transparently rewritten to `Int64` upstream by
// `SeaArrowIpcDurationFix.ts`; handled in the bigint branch below). In
// both cases we coerce to the canonical thrift string form so the SEA
// path is byte-identical with the thrift path:
// YEAR-MONTH → `"Y-M"`
// DAY-TIME → `"D HH:mm:ss.fffffffff"`
if (DataType.isInterval(valueType)) {
return formatArrowInterval(value, valueType);
}

// Convert big number values to BigInt
// Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
if (value instanceof Object && value[isArrowBigNumSymbol]) {
const result = bigNumToBigInt(value);
if (DataType.isDecimal(valueType)) {
return Number(result) / 10 ** valueType.scale;
}
// A rewritten Duration Int64 surfaces as a raw `bigint`, not a BigNum
// wrapper, so it is handled in the bigint branch below — not here.
return result;
}

// Convert binary data to Buffer
// Convert binary data to Buffer.
if (value instanceof Uint8Array) {
// Note: Arrow `Int32Array` / `BigInt64Array` are NOT `instanceof
// Uint8Array` (they are sibling TypedArrays), so an interval value never
// reaches this branch — intervals are handled by the `isInterval` branch
// above and the bigint branch below. This is purely the binary-column →
// Buffer path.
return Buffer.from(value);
}

// Bigint fallback — for raw bigints (not BigNum wrappers), the
// duration_unit metadata also gates the INTERVAL DAY-TIME format.
if (typeof value === 'bigint') {
const durationUnit = field?.metadata.get(DURATION_UNIT_METADATA_KEY);
if (durationUnit) {
return formatDurationToIntervalDayTime(value, durationUnit);
}
return Number(value);
}

// Return other values as is
return typeof value === 'bigint' ? Number(value) : value;
return value;
}

private convertThriftTypes(record: Record<string, any>): any {
Expand Down
88 changes: 88 additions & 0 deletions tests/e2e/sea/interval-edge-e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2026 Databricks, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/* eslint-disable no-console */

import { expect } from 'chai';
import { DBSQLClient } from '../../../lib';
import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient';
import { InternalConnectionOptions } from '../../../lib/contracts/InternalConnectionOptions';
import { getSeaNative } from '../../../lib/sea/SeaNativeLoader';

// INTERVAL edge cases the unit suite can't easily build (null, multi-row).
// Verified byte-identical to the Thrift backend against a live warehouse.
// Requires the pecotesting secrets AND the native binding — skips otherwise.

/** Connection details for the pecotesting warehouse, taken from the environment. */
interface PecoSecrets {
  host: string;
  path: string;
  token: string;
}

/**
 * Read the pecotesting connection secrets from the process environment.
 *
 * @returns the three secrets, or `null` when any of the
 *          `DATABRICKS_PECOTESTING_*` variables is unset or empty
 */
function readSecrets(): PecoSecrets | null {
  const {
    DATABRICKS_PECOTESTING_SERVER_HOSTNAME: host,
    DATABRICKS_PECOTESTING_HTTP_PATH: path,
    DATABRICKS_PECOTESTING_TOKEN_PERSONAL: token,
  } = process.env;

  return host && path && token ? { host, path, token } : null;
}

/**
 * Run `sql` through the SEA backend and return the `v` column of every row.
 *
 * A fresh client is connected with `useSEA: true`; the operation and session
 * are closed after a successful fetch, and the client is closed in all cases
 * once the connection has been established.
 *
 * @param sql statement to execute; it is expected to project a column named `v`
 * @param secrets host / path / token used to connect
 * @returns the `v` value of each fetched row, in fetch order
 */
async function seaValues(sql: string, secrets: PecoSecrets): Promise<unknown[]> {
  const connectionOptions = { ...secrets, useSEA: true } as ConnectionOptions & InternalConnectionOptions;
  const client = new DBSQLClient();
  await client.connect(connectionOptions);

  try {
    const session = await client.openSession();
    const operation = await session.executeStatement(sql);
    const fetched = (await operation.fetchAll()) as Array<Record<string, unknown>>;
    await operation.close();
    await session.close();

    const column: unknown[] = [];
    for (const row of fetched) {
      column.push(row.v);
    }
    return column;
  } finally {
    await client.close();
  }
}

// End-to-end INTERVAL DAY-TIME checks against a live warehouse over SEA. The
// `before` hook skips when the secrets or the native binding are missing.
describe('SEA INTERVAL edge cases end-to-end', function suite() {
  this.timeout(120000);

  const secrets = readSecrets();
  // The tests cast `secrets` to non-null; the `before` gate below skips when it is null.
  const liveSecrets = (): PecoSecrets => secrets as PecoSecrets;

  before(function gate() {
    // eslint-disable-next-line no-invalid-this
    const hook = this;

    let runnable = secrets !== null;
    if (runnable) {
      // A missing native binding means "skip", not "fail".
      try {
        getSeaNative();
      } catch {
        runnable = false;
      }
    }

    if (!runnable) {
      hook.skip();
    }
  });

  it('NULL INTERVAL DAY-TIME → null', async () => {
    const query = 'SELECT CAST(NULL AS INTERVAL DAY TO SECOND) AS v';
    const values = await seaValues(query, liveSecrets());
    expect(values).to.deep.equal([null]);
  });

  it('multi-row INTERVAL DAY-TIME batch formats every row', async () => {
    const query =
      "SELECT * FROM VALUES (INTERVAL '1' DAY), (INTERVAL '2 03:00:00' DAY TO SECOND), (INTERVAL '0' DAY) AS t(v)";
    const expected = ['1 00:00:00.000000000', '2 03:00:00.000000000', '0 00:00:00.000000000'];
    const values = await seaValues(query, liveSecrets());
    expect(values).to.deep.equal(expected);
  });
});
Loading
Loading