Skip to content
Merged
4 changes: 4 additions & 0 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private authType?: string;

private useProxy?: boolean;

private telemetryClient?: TelemetryClient;

private telemetryEmitter?: TelemetryEventEmitter;
Expand Down Expand Up @@ -415,6 +417,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
// Connection parameters
httpPath: this.httpPath,
enableMetricViewMetadata: this.config.enableMetricViewMetadata,
useProxy: this.useProxy,
};
}

Expand Down Expand Up @@ -594,6 +597,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
this.host = options.host;
this.httpPath = options.path;
this.authType = this.mapAuthType(options);
this.useProxy = Boolean(options.proxy);

// Store enableMetricViewMetadata configuration
if (options.enableMetricViewMetadata !== undefined) {
Expand Down
54 changes: 53 additions & 1 deletion lib/telemetry/DatabricksTelemetryExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,24 @@ interface DatabricksTelemetryLog {
char_set_encoding?: string;
process_name?: string;
};
auth_type?: string;
driver_connection_params?: {
host_info?: { host_url?: string };
http_path?: string;
mode?: string;
use_proxy?: boolean;
enable_arrow?: boolean;
enable_direct_results?: boolean;
socket_timeout?: number;
enable_metric_view_metadata?: boolean;
};
operation_latency_ms?: number;
sql_operation?: {
execution_result?: string;
is_compressed?: boolean;
operation_detail?: {
operation_type?: string;
};
chunk_details?: {
total_chunks_present?: number;
total_chunks_iterated?: number;
Expand Down Expand Up @@ -368,6 +383,11 @@ export default class DatabricksTelemetryExporter {
if (metric.latencyMs !== undefined) {
log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;
}
if (metric.operationType) {
log.entry.sql_driver_log.sql_operation = {
operation_detail: { operation_type: metric.operationType },
};
}
if (metric.driverConfig && includeCorrelation) {
// system_configuration is a high-entropy client fingerprint (OS, arch,
// locale, process, runtime). Only ship on the authenticated path.
Expand All @@ -384,15 +404,47 @@ export default class DatabricksTelemetryExporter {
char_set_encoding: metric.driverConfig.charSetEncoding,
process_name: sanitizeProcessName(metric.driverConfig.processName) || undefined,
};

// auth_type and host/http-path are workspace-correlated, so they ride
// the same auth-only path as system_configuration.
if (metric.driverConfig.authType) {
log.entry.sql_driver_log.auth_type = metric.driverConfig.authType;
}
log.entry.sql_driver_log.driver_connection_params = {
host_info: { host_url: this.host },
http_path: metric.driverConfig.httpPath,
mode: 'THRIFT',
use_proxy: metric.driverConfig.useProxy,
enable_arrow: metric.driverConfig.arrowEnabled,
enable_direct_results: metric.driverConfig.directResultsEnabled,
// The proto `socket_timeout` field is defined in seconds, but the driver
// tracks socketTimeout in milliseconds — convert so the receiver records
// the correct unit (e.g. 900000ms -> 900s) instead of treating ms as seconds.
socket_timeout:
typeof metric.driverConfig.socketTimeout === 'number'
? Math.round(metric.driverConfig.socketTimeout / 1000)
: metric.driverConfig.socketTimeout,
enable_metric_view_metadata: metric.driverConfig.enableMetricViewMetadata,
};
}
} else if (metric.metricType === 'statement') {
log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;

if (metric.resultFormat || metric.chunkCount) {
if (metric.resultFormat || metric.chunkCount || metric.operationType || metric.compressed !== undefined) {
log.entry.sql_driver_log.sql_operation = {
execution_result: metric.resultFormat,
};

if (metric.compressed !== undefined) {
log.entry.sql_driver_log.sql_operation.is_compressed = metric.compressed;
}

if (metric.operationType) {
log.entry.sql_driver_log.sql_operation.operation_detail = {
operation_type: metric.operationType,
};
}

if ((metric.chunkCount ?? 0) > 0) {
log.entry.sql_driver_log.sql_operation.chunk_details = {
total_chunks_present: metric.chunkCount,
Expand Down
4 changes: 2 additions & 2 deletions lib/telemetry/telemetryTypeMappers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ export function mapOperationTypeToTelemetryType(operationType?: TOperationType):
/**
* Map Thrift TSparkRowSetType to telemetry ExecutionResult.Format enum string.
*/
export function mapResultFormatToTelemetryType(resultFormat?: TSparkRowSetType): string | undefined {
export function mapResultFormatToTelemetryType(resultFormat?: TSparkRowSetType): string {
if (resultFormat === undefined) {
return undefined;
return 'FORMAT_UNSPECIFIED';
}

switch (resultFormat) {
Expand Down
3 changes: 3 additions & 0 deletions lib/telemetry/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ export interface DriverConfiguration {

/** Whether metric view metadata is enabled */
enableMetricViewMetadata?: boolean;

/** Whether an HTTP/SOCKS proxy is configured on the connection */
useProxy?: boolean;
}

/**
Expand Down
45 changes: 45 additions & 0 deletions tests/unit/telemetry/DatabricksTelemetryExporter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,51 @@ describe('DatabricksTelemetryExporter', () => {
});
});

describe('export() - driver_connection_params', () => {
// The driver tracks socketTimeout in milliseconds, but the receiver proto
// defines `socket_timeout` in seconds. Lock in the ms -> s conversion so a
// 15-minute (900000ms) timeout is reported as 900s, not 900000s.
function getConnectionParams(sendRequestStub: sinon.SinonStub): any {
const init = sendRequestStub.firstCall.args[1] as { body: string };
const body = JSON.parse(init.body);
const log = JSON.parse(body.protoLogs[0]);
return log.entry.sql_driver_log.driver_connection_params;
}

function makeConnectionMetric(socketTimeout: number): TelemetryMetric {
return makeMetric({
metricType: 'connection',
driverConfig: {
driverName: 'nodejs-sql-driver',
driverVersion: '1.14.0',
socketTimeout,
} as any,
});
}

it('converts socketTimeout from milliseconds to seconds', async () => {
const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any);
const registry = new CircuitBreakerRegistry(context);
const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());

await exporter.export([makeConnectionMetric(900000)]);

expect(getConnectionParams(sendRequestStub).socket_timeout).to.equal(900);
});

it('rounds sub-second socketTimeout values', async () => {
const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any);
const registry = new CircuitBreakerRegistry(context);
const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());

await exporter.export([makeConnectionMetric(1500)]);

expect(getConnectionParams(sendRequestStub).socket_timeout).to.equal(2);
});
});

describe('export() - retry logic', () => {
it('should retry on retryable HTTP errors (503)', async () => {
const context = new ClientContextStub({ telemetryMaxRetries: 2 } as any);
Expand Down
Loading