diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index 25609efe..ee53e790 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -238,6 +238,12 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I this.config.enableMetricViewMetadata = options.enableMetricViewMetadata; } + // Persist userAgentEntry so telemetry and feature-flag call sites reuse + // the same value as the primary Thrift connection's User-Agent. + if (options.userAgentEntry !== undefined) { + this.config.userAgentEntry = options.userAgentEntry; + } + this.authProvider = this.createAuthProvider(options, authProvider); this.connectionProvider = this.createConnectionProvider(options); @@ -352,4 +358,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I public async getDriver(): Promise { return this.driver; } + + /** + * Returns the authentication provider associated with this client, if any. + * Intended for internal telemetry/feature-flag call sites that need to + * obtain auth headers directly without routing through `IClientContext`. + * + * @internal Not part of the public API. May change without notice. 
+ */ + public getAuthProvider(): IAuthentication | undefined { + return this.authProvider; + } } diff --git a/lib/contracts/IClientContext.ts b/lib/contracts/IClientContext.ts index e4a51274..c7274a1b 100644 --- a/lib/contracts/IClientContext.ts +++ b/lib/contracts/IClientContext.ts @@ -28,9 +28,16 @@ export interface ClientConfig { telemetryBatchSize?: number; telemetryFlushIntervalMs?: number; telemetryMaxRetries?: number; + telemetryBackoffBaseMs?: number; + telemetryBackoffMaxMs?: number; + telemetryBackoffJitterMs?: number; telemetryAuthenticatedExport?: boolean; telemetryCircuitBreakerThreshold?: number; telemetryCircuitBreakerTimeout?: number; + telemetryMaxPendingMetrics?: number; + telemetryMaxErrorsPerStatement?: number; + telemetryStatementTtlMs?: number; + userAgentEntry?: string; } export default interface IClientContext { diff --git a/lib/index.ts b/lib/index.ts index adf14f36..81e3aaae 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -23,6 +23,11 @@ import { LogLevel } from './contracts/IDBSQLLogger'; // Re-export types for TypeScript users export type { default as ITokenProvider } from './connection/auth/tokenProvider/ITokenProvider'; +// Re-export telemetry error classes so consumers can instanceof-check rather +// than string-matching error messages. +export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker'; +export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter'; + export const auth = { PlainHttpAuthentication, // Token provider classes for custom authentication diff --git a/lib/telemetry/CircuitBreaker.ts b/lib/telemetry/CircuitBreaker.ts new file mode 100644 index 00000000..33b1fb5b --- /dev/null +++ b/lib/telemetry/CircuitBreaker.ts @@ -0,0 +1,216 @@ +/** + * Copyright (c) 2025 Databricks Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import IClientContext from '../contracts/IClientContext';
+import { LogLevel } from '../contracts/IDBSQLLogger';
+
+/** States a circuit breaker can be in. */
+export enum CircuitBreakerState {
+  CLOSED = 'CLOSED',
+  OPEN = 'OPEN',
+  HALF_OPEN = 'HALF_OPEN',
+}
+
+/** Tunables for a single circuit breaker. */
+export interface CircuitBreakerConfig {
+  /** Failure count (reset by any success) at or above which a failed call opens the breaker. */
+  failureThreshold: number;
+  /** Milliseconds the breaker stays OPEN before the next call is admitted as a HALF_OPEN probe. */
+  timeout: number;
+  /** Number of successes required while HALF_OPEN before the breaker closes again. */
+  successThreshold: number;
+}
+
+/** Frozen defaults; merged under any caller-supplied partial config. */
+export const DEFAULT_CIRCUIT_BREAKER_CONFIG: Readonly<CircuitBreakerConfig> = Object.freeze({
+  failureThreshold: 5,
+  timeout: 60000,
+  successThreshold: 2,
+});
+
+/** Stable `code` value carried by `CircuitBreakerOpenError`. */
+export const CIRCUIT_BREAKER_OPEN_CODE = 'CIRCUIT_BREAKER_OPEN' as const;
+
+/**
+ * Thrown when execute() is called while the breaker is OPEN or a HALF_OPEN
+ * probe is already in flight. Callers identify the condition via
+ * `instanceof CircuitBreakerOpenError` or `err.code === CIRCUIT_BREAKER_OPEN_CODE`
+ * rather than string-matching the message.
+ */
+export class CircuitBreakerOpenError extends Error {
+  readonly code = CIRCUIT_BREAKER_OPEN_CODE;
+
+  constructor(message = 'Circuit breaker OPEN') {
+    super(message);
+    this.name = 'CircuitBreakerOpenError';
+  }
+}
+
+/**
+ * Three-state circuit breaker (CLOSED / OPEN / HALF_OPEN) wrapped around an
+ * async operation.
+ *
+ * - CLOSED: every call is admitted; a failure increments `failureCount`, a
+ *   success resets it to zero.
+ * - OPEN: calls are rejected with `CircuitBreakerOpenError` until
+ *   `config.timeout` ms have elapsed since the breaker opened.
+ * - HALF_OPEN: at most one probe is admitted at a time; any failure re-opens
+ *   the breaker, `config.successThreshold` successes close it.
+ */
+export class CircuitBreaker {
+  private state: CircuitBreakerState = CircuitBreakerState.CLOSED;
+
+  private failureCount = 0;
+
+  private successCount = 0;
+
+  /** Earliest time at which an OPEN breaker admits a probe. */
+  private nextAttempt?: Date;
+
+  /** Number of HALF_OPEN probes currently running (0 or 1). */
+  private halfOpenInflight = 0;
+
+  private readonly config: CircuitBreakerConfig;
+
+  /**
+   * @param context client context; only its logger is used here
+   * @param config overrides merged on top of `DEFAULT_CIRCUIT_BREAKER_CONFIG`
+   */
+  constructor(private context: IClientContext, config?: Partial<CircuitBreakerConfig>) {
+    this.config = {
+      ...DEFAULT_CIRCUIT_BREAKER_CONFIG,
+      ...config,
+    };
+  }
+
+  /**
+   * Runs `operation` if the breaker admits it and records the outcome.
+   *
+   * @param operation async work to guard
+   * @returns whatever `operation` resolves to
+   * @throws CircuitBreakerOpenError when the call is rejected without running
+   * @throws the error thrown by `operation`, after it was counted as a failure
+   */
+  async execute<T>(operation: () => Promise<T>): Promise<T> {
+    const admitted = this.tryAdmit();
+    if (!admitted) {
+      throw new CircuitBreakerOpenError();
+    }
+
+    const { wasHalfOpenProbe } = admitted;
+
+    try {
+      const result = await operation();
+      this.onSuccess();
+      return result;
+    } catch (error) {
+      this.onFailure();
+      throw error;
+    } finally {
+      // Release the probe slot. The `> 0` guard matters because tryAdmit()
+      // zeroes the counter whenever it re-enters HALF_OPEN.
+      if (wasHalfOpenProbe && this.halfOpenInflight > 0) {
+        this.halfOpenInflight -= 1;
+      }
+    }
+  }
+
+  /**
+   * Synchronous admission check. Returning `null` means "reject". Returning
+   * an object means the caller is admitted; `wasHalfOpenProbe` indicates
+   * whether this admission consumed the single HALF_OPEN probe slot so the
+   * caller can decrement it in `finally`.
+   *
+   * Running this as a single synchronous block is what prevents the
+   * concurrent-probe race that existed in the previous implementation.
+   */
+  private tryAdmit(): { wasHalfOpenProbe: boolean } | null {
+    const logger = this.context.getLogger();
+
+    if (this.state === CircuitBreakerState.OPEN) {
+      if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) {
+        return null;
+      }
+      // Cool-down elapsed: move to HALF_OPEN and fall through so this very
+      // call becomes the probe.
+      this.state = CircuitBreakerState.HALF_OPEN;
+      this.successCount = 0;
+      this.halfOpenInflight = 0;
+      logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
+    }
+
+    if (this.state === CircuitBreakerState.HALF_OPEN) {
+      if (this.halfOpenInflight > 0) {
+        return null;
+      }
+      this.halfOpenInflight += 1;
+      return { wasHalfOpenProbe: true };
+    }
+
+    return { wasHalfOpenProbe: false };
+  }
+
+  /** Current breaker state. */
+  getState(): CircuitBreakerState {
+    return this.state;
+  }
+
+  /** Failures counted since the last success. */
+  getFailureCount(): number {
+    return this.failureCount;
+  }
+
+  /** Successes counted in the current HALF_OPEN phase. */
+  getSuccessCount(): number {
+    return this.successCount;
+  }
+
+  /** Records a success; closes the breaker once enough HALF_OPEN successes accumulate. */
+  private onSuccess(): void {
+    const logger = this.context.getLogger();
+
+    this.failureCount = 0;
+
+    if (this.state === CircuitBreakerState.HALF_OPEN) {
+      this.successCount += 1;
+      logger.log(
+        LogLevel.debug,
+        `Circuit breaker success in HALF_OPEN (${this.successCount}/${this.config.successThreshold})`,
+      );
+
+      if (this.successCount >= this.config.successThreshold) {
+        this.state = CircuitBreakerState.CLOSED;
+        this.successCount = 0;
+        this.nextAttempt = undefined;
+        logger.log(LogLevel.debug, 'Circuit breaker transitioned to CLOSED');
+      }
+    }
+  }
+
+  /**
+   * Records a failure. Opens the breaker when the failure happened in
+   * HALF_OPEN or when the failure count reached the configured threshold.
+   */
+  private onFailure(): void {
+    const logger = this.context.getLogger();
+
+    this.failureCount += 1;
+    this.successCount = 0;
+
+    logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);
+
+    if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
+      this.state = CircuitBreakerState.OPEN;
+      this.nextAttempt = new Date(Date.now() + this.config.timeout);
+      logger.log(
+        LogLevel.warn,
+        `Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`,
+      );
+    }
+  }
+}
+
+/** Keeps one `CircuitBreaker` per host string. */
+export class CircuitBreakerRegistry {
+  private breakers: Map<string, CircuitBreaker>;
+
+  constructor(private context: IClientContext) {
+    this.breakers = new Map();
+  }
+
+  /**
+   * Returns the breaker registered for `host`, creating it on first use.
+   *
+   * @param host registry key
+   * @param config applied only when the breaker is created; ignored (and
+   *   logged at debug level) when a breaker for `host` already exists
+   */
+  getCircuitBreaker(host: string, config?: Partial<CircuitBreakerConfig>): CircuitBreaker {
+    let breaker = this.breakers.get(host);
+    if (!breaker) {
+      breaker = new CircuitBreaker(this.context, config);
+      this.breakers.set(host, breaker);
+      const logger = this.context.getLogger();
+      logger.log(LogLevel.debug, `Created circuit breaker for host: ${host}`);
+    } else if (config) {
+      const logger = this.context.getLogger();
+      logger.log(LogLevel.debug, `Circuit breaker for host ${host} already exists; provided config will be ignored`);
+    }
+    return breaker;
+  }
+
+  /** Returns a shallow copy of the host-to-breaker map; mutating it does not affect the registry. */
+  getAllBreakers(): Map<string, CircuitBreaker> {
+    return new Map(this.breakers);
+  }
+
+  /** Drops the breaker for `host`, if one is registered. */
+  removeCircuitBreaker(host: string): void {
+    // Map#delete reports whether an entry existed; only claim a removal in
+    // the log when one actually happened.
+    if (this.breakers.delete(host)) {
+      const logger = this.context.getLogger();
+      logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`);
+    }
+  }
+
+  /** Drops every registered breaker. */
+  clear(): void {
+    this.breakers.clear();
+  }
+}
diff --git a/lib/telemetry/DatabricksTelemetryExporter.ts b/lib/telemetry/DatabricksTelemetryExporter.ts
new file mode 100644
index 00000000..23234e62
--- /dev/null
+++ b/lib/telemetry/DatabricksTelemetryExporter.ts
@@ -0,0 +1,421 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { v4 as uuidv4 } from 'uuid';
+import fetch, { Headers, RequestInit, Response } from 'node-fetch';
+import IClientContext from '../contracts/IClientContext';
+import { LogLevel } from '../contracts/IDBSQLLogger';
+import IAuthentication from '../connection/contracts/IAuthentication';
+import AuthenticationError from '../errors/AuthenticationError';
+import HiveDriverError from '../errors/HiveDriverError';
+import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types';
+import { CircuitBreaker, CircuitBreakerOpenError, CircuitBreakerRegistry } from './CircuitBreaker';
+import ExceptionClassifier from './ExceptionClassifier';
+import { buildTelemetryUrl, redactSensitive, sanitizeProcessName } from './telemetryUtils';
+import buildUserAgentString from '../utils/buildUserAgentString';
+
+/** Shape of one log entry as serialized into the `protoLogs` array of the export request body. */
+interface DatabricksTelemetryLog {
+  workspace_id?: string;
+  frontend_log_event_id: string;
+  context: {
+    client_context: {
+      timestamp_millis: number;
+      user_agent: string;
+    };
+  };
+  entry: {
+    sql_driver_log: {
+      session_id?: string;
+      sql_statement_id?: string;
+      system_configuration?: {
+        driver_version?: string;
+        runtime_name?: string;
+        runtime_version?: string;
+        runtime_vendor?: string;
+        os_name?: string;
+        os_version?: string;
+        os_arch?: string;
+        driver_name?: string;
+        client_app_name?: string;
+        locale_name?: string;
+        char_set_encoding?: string;
+        process_name?: string;
+      };
+      operation_latency_ms?: number;
+      sql_operation?: {
+        execution_result?: string;
+        chunk_details?: {
+          total_chunks_present?: number;
+          total_chunks_iterated?: number;
+          initial_chunk_latency_millis?: number;
+          slowest_chunk_latency_millis?: number;
+          sum_chunks_download_time_millis?: number;
+        };
+      };
+      error_info?: {
+        error_name: string;
+        stack_trace: string;
+      };
+    };
+  };
+}
+
+/**
+ * Thrown for non-credential terminal telemetry failures (e.g. refusal to
+ * export to an invalid host). Separate from `AuthenticationError` so the
+ * classifier can keep the "short-circuit, don't retry, count as breaker
+ * failure" contract without muddying the auth taxonomy used by the rest of
+ * the driver.
+ */
+export class TelemetryTerminalError extends HiveDriverError {
+  readonly terminal = true as const;
+}
+
+/**
+ * Exports telemetry metrics to the Databricks telemetry service.
+ *
+ * CRITICAL: export() never throws — all errors are swallowed and logged at
+ * LogLevel.debug (the one exception is a single warn on the first observed
+ * auth-missing, re-armed on recovery).
+ */
+export default class DatabricksTelemetryExporter {
+  private readonly circuitBreaker: CircuitBreaker;
+
+  private readonly authenticatedUserAgent: string;
+
+  /** User-Agent used for the unauthenticated endpoint; strips any
+   *  caller-supplied `userAgentEntry` that could identify the customer. */
+  private readonly unauthenticatedUserAgent: string;
+
+  /** True once the "Authorization header missing" warning was logged; cleared by a successful export. */
+  private authMissingWarned = false;
+
+  /**
+   * @param context client context (config, logger, connection provider)
+   * @param host workspace host the telemetry endpoints are built from
+   * @param circuitBreakerRegistry registry the per-host breaker is taken from
+   * @param authProvider source of auth headers for the authenticated endpoint
+   */
+  constructor(
+    private context: IClientContext,
+    private host: string,
+    private circuitBreakerRegistry: CircuitBreakerRegistry,
+    private authProvider?: IAuthentication,
+  ) {
+    this.circuitBreaker = circuitBreakerRegistry.getCircuitBreaker(host);
+    const config = this.context.getConfig();
+    this.authenticatedUserAgent = buildUserAgentString(config.userAgentEntry);
+    this.unauthenticatedUserAgent = buildUserAgentString(undefined);
+  }
+
+  /**
+   * Release the per-host circuit breaker. Intended for the owning client's
+   * close() path.
+   *
+   * NOTE: `CircuitBreakerRegistry` currently shares one breaker per host
+   * across consumers; calling this while another consumer is active will
+   * reset their failure-count memory. The owning-client is expected to be
+   * the last consumer on its host; multi-consumer refcounting on the
+   * registry will land in the consumer-wiring PR.
+   */
+  dispose(): void {
+    this.circuitBreakerRegistry.removeCircuitBreaker(this.host);
+  }
+
+  /**
+   * Sends a batch of metrics through the circuit breaker. Every failure is
+   * caught here and logged at debug level; an empty batch is a no-op.
+   *
+   * @param metrics batch to export
+   */
+  async export(metrics: TelemetryMetric[]): Promise<void> {
+    if (!metrics || metrics.length === 0) {
+      return;
+    }
+
+    const logger = this.context.getLogger();
+
+    try {
+      await this.circuitBreaker.execute(() => this.exportWithRetry(metrics));
+    } catch (error: any) {
+      if (error instanceof CircuitBreakerOpenError) {
+        logger.log(LogLevel.debug, 'Circuit breaker OPEN - dropping telemetry');
+      } else if (error instanceof AuthenticationError) {
+        logger.log(LogLevel.debug, `Telemetry export auth failure: ${error.message}`);
+      } else if (error instanceof TelemetryTerminalError) {
+        logger.log(LogLevel.debug, `Telemetry export refused: ${error.message}`);
+      } else {
+        logger.log(LogLevel.debug, `Telemetry export error: ${error?.message ?? error}`);
+      }
+    }
+  }
+
+  /**
+   * Retry wrapper shaped after HttpRetryPolicy: retries only on errors
+   * classified as retryable by ExceptionClassifier, stops on terminal ones,
+   * surfaces the last error to the circuit breaker.
+   *
+   * `maxRetries` is the number of retries *after* the first attempt (i.e.
+   * attempts = maxRetries + 1), matching HttpRetryPolicy's semantics.
+   *
+   * All four tunables are validated the same way: a value that is not a
+   * finite, non-negative number falls back to the built-in default, and
+   * `maxRetries` is additionally floored so the attempt count is an integer.
+   */
+  private async exportWithRetry(metrics: TelemetryMetric[]): Promise<void> {
+    const config = this.context.getConfig();
+    const logger = this.context.getLogger();
+
+    const maxRetries = Math.floor(
+      this.nonNegativeOr(config.telemetryMaxRetries, DEFAULT_TELEMETRY_CONFIG.maxRetries),
+    );
+    const baseMs = this.nonNegativeOr(config.telemetryBackoffBaseMs, DEFAULT_TELEMETRY_CONFIG.backoffBaseMs);
+    const maxMs = this.nonNegativeOr(config.telemetryBackoffMaxMs, DEFAULT_TELEMETRY_CONFIG.backoffMaxMs);
+    const jitterMs = this.nonNegativeOr(config.telemetryBackoffJitterMs, DEFAULT_TELEMETRY_CONFIG.backoffJitterMs);
+
+    const totalAttempts = maxRetries + 1;
+
+    // Every iteration either returns or throws on its own, and there is
+    // always at least one iteration, so nothing is needed after the loop.
+    /* eslint-disable no-await-in-loop */
+    for (let attempt = 0; attempt < totalAttempts; attempt += 1) {
+      try {
+        await this.exportInternal(metrics);
+        return;
+      } catch (error: any) {
+        if (
+          error instanceof AuthenticationError ||
+          error instanceof TelemetryTerminalError ||
+          ExceptionClassifier.isTerminal(error)
+        ) {
+          throw error;
+        }
+        if (!ExceptionClassifier.isRetryable(error)) {
+          throw error;
+        }
+        if (attempt >= totalAttempts - 1) {
+          throw error;
+        }
+
+        // Exponential backoff plus random jitter, capped at maxMs both
+        // before and after the jitter is added.
+        const base = Math.min(baseMs * 2 ** attempt, maxMs);
+        const jitter = Math.random() * jitterMs;
+        const delay = Math.min(base + jitter, maxMs);
+
+        // Include the failing error so ops can see what's being retried,
+        // not just the cadence.
+        logger.log(
+          LogLevel.debug,
+          `Retrying telemetry export (attempt ${attempt + 1}/${totalAttempts}) after ${Math.round(delay)}ms: ${
+            error?.statusCode ?? ''
+          } ${redactSensitive(error?.message ?? '')}`,
+        );
+
+        await this.sleep(delay);
+      }
+    }
+    /* eslint-enable no-await-in-loop */
+  }
+
+  /** Returns `value` when it is a finite number >= 0, otherwise `fallback`. */
+  private nonNegativeOr(value: number | undefined, fallback: number): number {
+    return typeof value === 'number' && Number.isFinite(value) && value >= 0 ? value : fallback;
+  }
+
+  /**
+   * Performs one export attempt: builds the endpoint and headers, serializes
+   * the batch and POSTs it.
+   *
+   * @throws TelemetryTerminalError when the host fails URL validation
+   * @throws AuthenticationError when authenticated export has no Authorization header
+   * @throws Error carrying `statusCode` when the response status is not OK
+   */
+  private async exportInternal(metrics: TelemetryMetric[]): Promise<void> {
+    const config = this.context.getConfig();
+    const logger = this.context.getLogger();
+
+    const authenticatedExport = config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport;
+    const endpoint = buildTelemetryUrl(this.host, authenticatedExport ? '/telemetry-ext' : '/telemetry-unauth');
+    if (!endpoint) {
+      // Malformed / deny-listed host — drop the batch rather than letting
+      // it target an attacker-controlled destination.
+      throw new TelemetryTerminalError('Refusing telemetry export: host failed validation');
+    }
+
+    const userAgent = authenticatedExport ? this.authenticatedUserAgent : this.unauthenticatedUserAgent;
+    let headers: Record<string, string> = {
+      'Content-Type': 'application/json',
+      'User-Agent': userAgent,
+    };
+
+    if (authenticatedExport) {
+      headers = { ...headers, ...(await this.getAuthHeaders()) };
+      if (!this.hasAuthorization(headers)) {
+        if (!this.authMissingWarned) {
+          this.authMissingWarned = true;
+          logger.log(LogLevel.warn, 'Telemetry: Authorization header missing — metrics will be dropped');
+        }
+        throw new AuthenticationError('Telemetry export: missing Authorization header');
+      }
+    }
+
+    const protoLogs = metrics.map((m) => this.toTelemetryLog(m, authenticatedExport, userAgent));
+    // Each log is serialized on its own, so `protoLogs` is an array of JSON strings.
+    const body = JSON.stringify({
+      uploadTime: Date.now(),
+      items: [],
+      protoLogs: protoLogs.map((log) => JSON.stringify(log)),
+    });
+
+    logger.log(
+      LogLevel.debug,
+      `Exporting ${metrics.length} telemetry metrics to ${
+        authenticatedExport ? 'authenticated' : 'unauthenticated'
+      } endpoint`,
+    );
+
+    const response = await this.sendRequest(endpoint, {
+      method: 'POST',
+      headers,
+      body,
+      timeout: 10000,
+    });
+
+    if (!response.ok) {
+      // Read and discard the body; a failure to read it is ignored.
+      await response.text().catch(() => {});
+      const error: any = new Error(`Telemetry export failed: ${response.status} ${response.statusText}`);
+      error.statusCode = response.status;
+      throw error;
+    }
+
+    await response.text().catch(() => {});
+    // Successful round-trip re-arms the "auth missing" warn so operators see
+    // a fresh signal the next time auth breaks.
+    this.authMissingWarned = false;
+    logger.log(LogLevel.debug, `Successfully exported ${metrics.length} telemetry metrics`);
+  }
+
+  /**
+   * Asks the auth provider for headers and flattens them to a plain object.
+   * Returns `{}` when there is no provider or the provider throws.
+   */
+  private async getAuthHeaders(): Promise<Record<string, string>> {
+    if (!this.authProvider) {
+      return {};
+    }
+    const logger = this.context.getLogger();
+    try {
+      const raw = await this.authProvider.authenticate();
+      return this.normalizeHeaders(raw);
+    } catch (error: any) {
+      logger.log(LogLevel.debug, `Telemetry: auth provider threw: ${error?.message ?? error}`);
+      return {};
+    }
+  }
+
+  /**
+   * Converts any node-fetch `HeadersInit` form into a string-to-string
+   * record, dropping entries whose value is not a string.
+   */
+  private normalizeHeaders(raw: unknown): Record<string, string> {
+    if (!raw) {
+      return {};
+    }
+    // node-fetch HeadersInit = Headers | [string,string][] | Record<string, string>.
+    if (raw instanceof Headers) {
+      const out: Record<string, string> = {};
+      raw.forEach((value: string, key: string) => {
+        out[key] = value;
+      });
+      return out;
+    }
+    if (Array.isArray(raw)) {
+      const out: Record<string, string> = {};
+      for (const entry of raw as Array<[string, string]>) {
+        if (entry && entry.length === 2 && typeof entry[1] === 'string') {
+          const [key, value] = entry;
+          out[key] = value;
+        }
+      }
+      return out;
+    }
+    const out: Record<string, string> = {};
+    for (const [k, v] of Object.entries(raw as Record<string, unknown>)) {
+      if (typeof v === 'string') {
+        out[k] = v;
+      }
+    }
+    return out;
+  }
+
+  /** True when `headers` has a non-empty Authorization entry (name compared case-insensitively). */
+  private hasAuthorization(headers: Record<string, string>): boolean {
+    for (const key of Object.keys(headers)) {
+      if (key.toLowerCase() === 'authorization' && headers[key]) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Issues the request with the agent supplied by the context's connection provider. */
+  private async sendRequest(url: string, init: RequestInit): Promise<Response> {
+    const connectionProvider = await this.context.getConnectionProvider();
+    const agent = await connectionProvider.getAgent();
+    return fetch(url, { ...init, agent });
+  }
+
+  /**
+   * Maps one metric to its wire representation.
+   *
+   * @param metric metric to convert
+   * @param authenticatedExport whether the batch goes to the authenticated endpoint
+   * @param userAgent value placed in `client_context.user_agent`
+   */
+  private toTelemetryLog(
+    metric: TelemetryMetric,
+    authenticatedExport: boolean,
+    userAgent: string,
+  ): DatabricksTelemetryLog {
+    // Unauthenticated export must not ship correlation IDs, fingerprint
+    // data, or raw error detail — an on-path observer could otherwise link
+    // sessions → workspaces → user activity without any auth.
+    const includeCorrelation = authenticatedExport;
+
+    const log: DatabricksTelemetryLog = {
+      workspace_id: includeCorrelation ? metric.workspaceId : undefined,
+      frontend_log_event_id: uuidv4(),
+      context: {
+        client_context: {
+          timestamp_millis: metric.timestamp,
+          user_agent: userAgent,
+        },
+      },
+      entry: {
+        sql_driver_log: {
+          session_id: includeCorrelation ? metric.sessionId : undefined,
+          sql_statement_id: includeCorrelation ? metric.statementId : undefined,
+        },
+      },
+    };
+
+    if (metric.metricType === 'connection' && metric.driverConfig && includeCorrelation) {
+      // system_configuration is a high-entropy client fingerprint (OS, arch,
+      // locale, process, runtime). Only ship on the authenticated path.
+      log.entry.sql_driver_log.system_configuration = {
+        driver_version: metric.driverConfig.driverVersion,
+        driver_name: metric.driverConfig.driverName,
+        runtime_name: 'Node.js',
+        runtime_version: metric.driverConfig.nodeVersion,
+        runtime_vendor: metric.driverConfig.runtimeVendor,
+        os_name: metric.driverConfig.platform,
+        os_version: metric.driverConfig.osVersion,
+        os_arch: metric.driverConfig.osArch,
+        locale_name: metric.driverConfig.localeName,
+        char_set_encoding: metric.driverConfig.charSetEncoding,
+        process_name: sanitizeProcessName(metric.driverConfig.processName) || undefined,
+      };
+    } else if (metric.metricType === 'statement') {
+      log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;
+
+      if (metric.resultFormat || metric.chunkCount) {
+        log.entry.sql_driver_log.sql_operation = {
+          execution_result: metric.resultFormat,
+        };
+
+        if (metric.chunkCount && metric.chunkCount > 0) {
+          // The same count is reported for both "present" and "iterated".
+          log.entry.sql_driver_log.sql_operation.chunk_details = {
+            total_chunks_present: metric.chunkCount,
+            total_chunks_iterated: metric.chunkCount,
+          };
+        }
+      }
+    } else if (metric.metricType === 'error') {
+      const stackOrMessage = metric.errorStack ?? metric.errorMessage ?? '';
+      log.entry.sql_driver_log.error_info = {
+        error_name: metric.errorName || 'UnknownError',
+        // Redact common secret shapes and cap length. On the unauth path we
+        // keep only the error class — no message body.
+        stack_trace: includeCorrelation ? redactSensitive(stackOrMessage) : '',
+      };
+    }
+
+    return log;
+  }
+
+  /** Resolves after `ms` milliseconds. */
+  private sleep(ms: number): Promise<void> {
+    return new Promise((resolve) => {
+      setTimeout(resolve, ms);
+    });
+  }
+}
diff --git a/lib/telemetry/ExceptionClassifier.ts b/lib/telemetry/ExceptionClassifier.ts
index 72e8389c..d0e30d64 100644
--- a/lib/telemetry/ExceptionClassifier.ts
+++ b/lib/telemetry/ExceptionClassifier.ts
@@ -81,6 +81,17 @@ export default class ExceptionClassifier {
       return true;
     }
 
+    // Check for transient network errors (connection refused/reset, DNS failure, host unreachable)
+    const errorCode = (error as any).code;
+    if (
+      errorCode === 'ECONNREFUSED' ||
+      errorCode === 'ENOTFOUND' ||
+      errorCode === 'EHOSTUNREACH' ||
+      errorCode === 'ECONNRESET'
+    ) {
+      return true;
+    }
+
     // Check for HTTP status codes in error properties
     // Supporting both 'statusCode' and 'status' property names for flexibility
     const statusCode = (error as any).statusCode ?? (error as any).status;
diff --git a/lib/telemetry/FeatureFlagCache.ts b/lib/telemetry/FeatureFlagCache.ts
new file mode 100644
index 00000000..f0cf5099
--- /dev/null
+++ b/lib/telemetry/FeatureFlagCache.ts
@@ -0,0 +1,224 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import fetch, { Headers, RequestInit, Response } from 'node-fetch';
+import IClientContext from '../contracts/IClientContext';
+import { LogLevel } from '../contracts/IDBSQLLogger';
+import IAuthentication from '../connection/contracts/IAuthentication';
+import { buildTelemetryUrl } from './telemetryUtils';
+import buildUserAgentString from '../utils/buildUserAgentString';
+import driverVersion from '../version';
+
+/** Cached feature-flag state for one host. */
+export interface FeatureFlagContext {
+  /** Last flag value obtained from the server; undefined until a fetch succeeds. */
+  telemetryEnabled?: boolean;
+  /** When the last fetch attempt finished; undefined before the first attempt. */
+  lastFetched?: Date;
+  /** Number of consumers currently holding this context. */
+  refCount: number;
+  /** Milliseconds a fetched value is considered fresh. */
+  cacheDuration: number;
+}
+
+/**
+ * Per-host feature-flag cache used to gate telemetry emission. Responsibilities:
+ * - dedupe in-flight fetches (thundering-herd protection);
+ * - ref-count so context goes away when the last consumer closes;
+ * - clamp server-provided TTL into a safe band.
+ *
+ * Shares HTTP plumbing (agent, user agent) with DatabricksTelemetryExporter.
+ * Consumer wiring lands in a later PR in this stack (see PR description).
+ */
+export default class FeatureFlagCache {
+  private contexts: Map<string, FeatureFlagContext>;
+
+  /** In-flight fetch per host, so concurrent callers await the same request. */
+  private fetchPromises: Map<string, Promise<boolean>> = new Map();
+
+  private readonly userAgent: string;
+
+  /** Default freshness window (ms) used until the server supplies a TTL. */
+  private readonly CACHE_DURATION_MS = 15 * 60 * 1000;
+
+  /** Lower bound (seconds) applied to a server-supplied TTL. */
+  private readonly MIN_CACHE_DURATION_S = 60;
+
+  /** Upper bound (seconds) applied to a server-supplied TTL. */
+  private readonly MAX_CACHE_DURATION_S = 3600;
+
+  private readonly FEATURE_FLAG_NAME = 'databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForNodeJs';
+
+  /**
+   * @param context client context (config, logger, connection provider)
+   * @param authProvider optional source of auth headers for the flag request
+   */
+  constructor(private context: IClientContext, private authProvider?: IAuthentication) {
+    this.contexts = new Map();
+    this.userAgent = buildUserAgentString(this.context.getConfig().userAgentEntry);
+  }
+
+  /**
+   * Returns the context for `host`, creating it if needed, and takes one
+   * reference on it. Pair every call with `releaseContext(host)`.
+   */
+  getOrCreateContext(host: string): FeatureFlagContext {
+    let ctx = this.contexts.get(host);
+    if (!ctx) {
+      ctx = {
+        refCount: 0,
+        cacheDuration: this.CACHE_DURATION_MS,
+      };
+      this.contexts.set(host, ctx);
+    }
+    ctx.refCount += 1;
+    return ctx;
+  }
+
+  /**
+   * Drops one reference on the context for `host`. When the count reaches
+   * zero the context and any tracked in-flight fetch entry are removed.
+   */
+  releaseContext(host: string): void {
+    const ctx = this.contexts.get(host);
+    if (ctx) {
+      ctx.refCount -= 1;
+      if (ctx.refCount <= 0) {
+        this.contexts.delete(host);
+        this.fetchPromises.delete(host);
+      }
+    }
+  }
+
+  /**
+   * Reports whether telemetry is enabled for `host`, refreshing the cached
+   * flag when it is missing or older than the context's cache duration.
+   *
+   * Returns `false` when no context exists for `host`. When a refresh fails,
+   * the last known value is kept (or `false` if there is none); the attempt
+   * time is still recorded so a failing endpoint is retried at most once per
+   * cache window rather than on every call.
+   */
+  async isTelemetryEnabled(host: string): Promise<boolean> {
+    const logger = this.context.getLogger();
+    const ctx = this.contexts.get(host);
+
+    if (!ctx) {
+      return false;
+    }
+
+    const isExpired = !ctx.lastFetched || Date.now() - ctx.lastFetched.getTime() > ctx.cacheDuration;
+
+    if (isExpired) {
+      if (!this.fetchPromises.has(host)) {
+        const fetchPromise = this.fetchFeatureFlag(host)
+          .then((enabled) => {
+            // `undefined` means the fetch itself failed: keep the previous
+            // value instead of overwriting it with a made-up `false`.
+            if (enabled !== undefined) {
+              ctx.telemetryEnabled = enabled;
+            }
+            ctx.lastFetched = new Date();
+            return ctx.telemetryEnabled ?? false;
+          })
+          .catch((error: any) => {
+            logger.log(LogLevel.debug, `Error fetching feature flag: ${error?.message ?? error}`);
+            return ctx.telemetryEnabled ?? false;
+          })
+          .finally(() => {
+            this.fetchPromises.delete(host);
+          });
+        this.fetchPromises.set(host, fetchPromise);
+      }
+
+      await this.fetchPromises.get(host);
+    }
+
+    return ctx.telemetryEnabled ?? false;
+  }
+
+  /**
+   * Strips the `-oss` suffix the feature-flag API does not accept. The server
+   * keys off the SemVer triplet only, so anything appended would 404.
+   */
+  private getDriverVersion(): string {
+    return driverVersion.replace(/-oss$/, '');
+  }
+
+  /**
+   * Fetches the telemetry feature flag for `host`.
+   *
+   * @returns `true`/`false` when an answer is available (`false` also covers
+   *   an invalid host and a response that does not contain the flag);
+   *   `undefined` when the request failed (non-OK status or thrown error), so
+   *   the caller can keep its previous value. Never throws.
+   */
+  private async fetchFeatureFlag(host: string): Promise<boolean | undefined> {
+    const logger = this.context.getLogger();
+
+    try {
+      const endpoint = buildTelemetryUrl(
+        host,
+        `/api/2.0/connector-service/feature-flags/NODEJS/${this.getDriverVersion()}`,
+      );
+      if (!endpoint) {
+        logger.log(LogLevel.debug, `Feature flag fetch skipped: invalid host ${host}`);
+        return false;
+      }
+
+      const headers: Record<string, string> = {
+        'Content-Type': 'application/json',
+        'User-Agent': this.userAgent,
+        ...(await this.getAuthHeaders()),
+      };
+
+      logger.log(LogLevel.debug, `Fetching feature flags from ${endpoint}`);
+
+      const response = await this.sendRequest(endpoint, {
+        method: 'GET',
+        headers,
+        timeout: 10000,
+      });
+
+      if (!response.ok) {
+        // Read and discard the body; a failure to read it is ignored.
+        await response.text().catch(() => {});
+        logger.log(LogLevel.debug, `Feature flag fetch failed: ${response.status} ${response.statusText}`);
+        return undefined;
+      }
+
+      const data: any = await response.json();
+
+      if (data && data.flags && Array.isArray(data.flags)) {
+        const ctx = this.contexts.get(host);
+        if (ctx && typeof data.ttl_seconds === 'number' && data.ttl_seconds > 0) {
+          // Clamp the server TTL into [MIN_CACHE_DURATION_S, MAX_CACHE_DURATION_S].
+          const clampedTtl = Math.max(this.MIN_CACHE_DURATION_S, Math.min(this.MAX_CACHE_DURATION_S, data.ttl_seconds));
+          ctx.cacheDuration = clampedTtl * 1000;
+          logger.log(LogLevel.debug, `Updated cache duration to ${clampedTtl} seconds`);
+        }
+
+        const flag = data.flags.find((f: any) => f.name === this.FEATURE_FLAG_NAME);
+        if (flag) {
+          const enabled = String(flag.value).toLowerCase() === 'true';
+          logger.log(LogLevel.debug, `Feature flag ${this.FEATURE_FLAG_NAME}: ${enabled}`);
+          return enabled;
+        }
+      }
+
+      logger.log(LogLevel.debug, `Feature flag ${this.FEATURE_FLAG_NAME} not found in response`);
+      return false;
+    } catch (error: any) {
+      logger.log(LogLevel.debug, `Error fetching feature flag from ${host}: ${error?.message ?? error}`);
+      return undefined;
+    }
+  }
+
+  /** Issues the request with the agent supplied by the context's connection provider. */
+  private async sendRequest(url: string, init: RequestInit): Promise<Response> {
+    const connectionProvider = await this.context.getConnectionProvider();
+    const agent = await connectionProvider.getAgent();
+    return fetch(url, { ...init, agent });
+  }
+
+  /**
+   * Asks the auth provider for headers and flattens them to a plain
+   * string-to-string record, dropping entries whose value is not a string.
+   * Returns `{}` when there is no provider or the provider throws.
+   */
+  private async getAuthHeaders(): Promise<Record<string, string>> {
+    if (!this.authProvider) {
+      return {};
+    }
+    try {
+      const raw: unknown = await this.authProvider.authenticate();
+      if (!raw) {
+        return {};
+      }
+      const out: Record<string, string> = {};
+      if (raw instanceof Headers) {
+        raw.forEach((value: string, key: string) => {
+          out[key] = value;
+        });
+        return out;
+      }
+      if (Array.isArray(raw)) {
+        for (const entry of raw as Array<[string, string]>) {
+          if (entry && entry.length === 2 && typeof entry[1] === 'string') {
+            const [key, value] = entry;
+            out[key] = value;
+          }
+        }
+        return out;
+      }
+      for (const [key, value] of Object.entries(raw as Record<string, unknown>)) {
+        if (typeof value === 'string') {
+          out[key] = value;
+        }
+      }
+      return out;
+    } catch (error: any) {
+      this.context.getLogger().log(LogLevel.debug, `Feature flag auth failed: ${error?.message ?? error}`);
+      return {};
+    }
+  }
+}
diff --git a/lib/telemetry/MetricsAggregator.ts b/lib/telemetry/MetricsAggregator.ts
new file mode 100644
index 00000000..f5b0d171
--- /dev/null
+++ b/lib/telemetry/MetricsAggregator.ts
@@ -0,0 +1,457 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
/** Per-statement accumulator; folded into one `statement` metric (plus its errors) on completion. */
interface StatementTelemetryDetails {
  statementId: string;
  sessionId: string;
  workspaceId?: string;
  operationType?: string;
  startTime: number;
  executionLatencyMs?: number;
  resultFormat?: string;
  chunkCount: number;
  bytesDownloaded: number;
  pollCount: number;
  compressionEnabled?: boolean;
  errors: TelemetryEvent[];
}

/** Human-readable description of an arbitrary thrown value, for debug logs. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Builds an `error` metric from a buffered error event. Identity fields
 * (session / statement / workspace) come from `owner`, which is either the
 * statement accumulator or, for statement-less errors, the event itself.
 */
function toErrorMetric(
  errorEvent: TelemetryEvent,
  owner: Pick<TelemetryMetric, 'sessionId' | 'statementId' | 'workspaceId'>,
): TelemetryMetric {
  return {
    metricType: 'error',
    timestamp: errorEvent.timestamp,
    sessionId: owner.sessionId,
    statementId: owner.statementId,
    workspaceId: owner.workspaceId,
    errorName: errorEvent.errorName,
    errorMessage: errorEvent.errorMessage,
    errorStack: errorEvent.errorStack,
  };
}

/**
 * Aggregates telemetry events by statement_id and manages batching/flushing.
 *
 * Overflow policy — when the pending buffer hits `maxPendingMetrics`, error
 * metrics are preserved preferentially over connection/statement metrics.
 * The first-failure error is usually the most valuable signal in post-mortem
 * debugging; dropping it FIFO would defeat the purpose of capture.
 *
 * Lifecycle — once `close()` has been called the aggregator stays closed:
 * no new events are accepted, and `close()` resolves only after the final
 * drain AND every background flush started earlier has settled.
 */
export default class MetricsAggregator {
  private statementMetrics: Map<string, StatementTelemetryDetails> = new Map();

  private pendingMetrics: TelemetryMetric[] = [];

  private flushTimer: NodeJS.Timeout | null = null;

  /** Background flushes that have been started but not yet settled. */
  private inFlightFlushes: Set<Promise<void>> = new Set();

  private closed = false;

  private batchSize: number;

  private flushIntervalMs: number;

  private maxPendingMetrics: number;

  private maxErrorsPerStatement: number;

  private statementTtlMs: number;

  /**
   * @param context  Client context supplying config and logger.
   * @param exporter Sink that receives drained metric batches.
   */
  constructor(private context: IClientContext, private exporter: DatabricksTelemetryExporter) {
    let config: Partial<ReturnType<IClientContext['getConfig']>> = {};
    try {
      config = context.getConfig();
    } catch (error: unknown) {
      this.context.getLogger().log(LogLevel.debug, `MetricsAggregator constructor error: ${describeError(error)}`);
    }

    this.batchSize = config.telemetryBatchSize ?? DEFAULT_TELEMETRY_CONFIG.batchSize;
    this.flushIntervalMs = config.telemetryFlushIntervalMs ?? DEFAULT_TELEMETRY_CONFIG.flushIntervalMs;
    this.maxPendingMetrics = config.telemetryMaxPendingMetrics ?? DEFAULT_TELEMETRY_CONFIG.maxPendingMetrics;
    this.maxErrorsPerStatement =
      config.telemetryMaxErrorsPerStatement ?? DEFAULT_TELEMETRY_CONFIG.maxErrorsPerStatement;
    this.statementTtlMs = config.telemetryStatementTtlMs ?? DEFAULT_TELEMETRY_CONFIG.statementTtlMs;

    // Start the timer on the defaults path too: without it a config failure
    // would disable both the periodic tail-drain and TTL eviction.
    this.startFlushTimer();
  }

  /**
   * Routes one telemetry event to the matching handler. Never throws;
   * failures are logged at debug level. Ignored after `close()`.
   */
  processEvent(event: TelemetryEvent): void {
    if (this.closed) return;
    const logger = this.context.getLogger();

    try {
      if (event.eventType === TelemetryEventType.CONNECTION_OPEN) {
        this.processConnectionEvent(event);
        return;
      }

      if (event.eventType === TelemetryEventType.ERROR) {
        this.processErrorEvent(event);
        return;
      }

      if (event.statementId) {
        this.processStatementEvent(event);
      }
    } catch (error: unknown) {
      logger.log(LogLevel.debug, `MetricsAggregator.processEvent error: ${describeError(error)}`);
    }
  }

  /** Connection events carry no per-statement state; queue them directly. */
  private processConnectionEvent(event: TelemetryEvent): void {
    this.enqueueMetric({
      metricType: 'connection',
      timestamp: event.timestamp,
      sessionId: event.sessionId,
      workspaceId: event.workspaceId,
      driverConfig: event.driverConfig,
    });
  }

  /**
   * Terminal errors complete their statement (or are queued standalone) and
   * trigger an immediate background flush. Non-terminal errors are buffered
   * on the statement, if any, until it completes.
   */
  private processErrorEvent(event: TelemetryEvent): void {
    const logger = this.context.getLogger();

    // `isTerminal` is carried on the event by the emitter (it knows the
    // call site's taxonomy). If callers ever drop it we default to
    // retryable — buffering by statement is the safer choice.
    if (event.isTerminal !== true) {
      if (event.statementId) {
        this.pushBoundedError(this.getOrCreateStatementDetails(event), event);
      }
      return;
    }

    logger.log(LogLevel.debug, 'Terminal error detected - flushing immediately');

    const details = event.statementId ? this.statementMetrics.get(event.statementId) : undefined;
    if (details) {
      this.pushBoundedError(details, event);
      this.completeStatement(details.statementId);
    } else {
      this.enqueueMetric(toErrorMetric(event, event));
    }

    // Fire-and-forget so customer code doesn't stall on telemetry HTTP. The
    // periodic timer is deliberately NOT reset: under burst failures that
    // would keep the tail-drain timer from ever firing.
    this.flushInBackground('Terminal-error flush failed');
  }

  /** Appends `event` to the statement's error list, evicting the oldest entry once the cap is reached. */
  private pushBoundedError(details: StatementTelemetryDetails, event: TelemetryEvent): void {
    if (details.errors.length >= this.maxErrorsPerStatement) {
      details.errors.shift();
    }
    details.errors.push(event);
  }

  /** Folds a statement-scoped event into that statement's accumulator. */
  private processStatementEvent(event: TelemetryEvent): void {
    const details = this.getOrCreateStatementDetails(event);

    switch (event.eventType) {
      case TelemetryEventType.STATEMENT_START:
        details.operationType = event.operationType;
        details.startTime = event.timestamp;
        break;

      case TelemetryEventType.STATEMENT_COMPLETE:
        details.executionLatencyMs = event.latencyMs;
        details.resultFormat = event.resultFormat;
        details.chunkCount = event.chunkCount ?? 0;
        details.bytesDownloaded = event.bytesDownloaded ?? 0;
        details.pollCount = event.pollCount ?? 0;
        break;

      case TelemetryEventType.CLOUDFETCH_CHUNK:
        details.chunkCount += 1;
        details.bytesDownloaded += event.bytes ?? 0;
        if (event.compressed !== undefined) {
          details.compressionEnabled = event.compressed;
        }
        break;

      default:
        break;
    }
  }

  /** Returns the accumulator for `event.statementId`, creating it on first sight. */
  private getOrCreateStatementDetails(event: TelemetryEvent): StatementTelemetryDetails {
    const statementId = event.statementId!;

    let details = this.statementMetrics.get(statementId);
    if (!details) {
      details = {
        statementId,
        sessionId: event.sessionId!,
        workspaceId: event.workspaceId,
        startTime: event.timestamp,
        chunkCount: 0,
        bytesDownloaded: 0,
        pollCount: 0,
        errors: [],
      };
      this.statementMetrics.set(statementId, details);
    }
    return details;
  }

  /**
   * Drop entries older than `statementTtlMs`, emitting their buffered error
   * events as standalone metrics first so the first-failure signal survives
   * the eviction. Called from the periodic flush timer so idle clients
   * don't leak orphan entries.
   */
  private evictExpiredStatements(): void {
    const cutoff = Date.now() - this.statementTtlMs;
    let evicted = 0;
    for (const [id, details] of this.statementMetrics) {
      if (details.startTime < cutoff) {
        for (const errorEvent of details.errors) {
          this.enqueueMetric(toErrorMetric(errorEvent, details));
        }
        this.statementMetrics.delete(id);
        evicted += 1;
      }
    }
    if (evicted > 0) {
      this.context
        .getLogger()
        .log(LogLevel.debug, `Evicted ${evicted} abandoned statement(s) past ${this.statementTtlMs}ms TTL`);
    }
  }

  /**
   * Converts the statement's accumulator into a `statement` metric followed
   * by its buffered `error` metrics, queues them, and forgets the statement.
   * No-op for unknown ids and after `close()`. Never throws.
   */
  completeStatement(statementId: string): void {
    if (this.closed) return;
    this.finalizeStatement(statementId);
  }

  /** `completeStatement` without the closed guard, so `close()` can drain without toggling `closed`. */
  private finalizeStatement(statementId: string): void {
    const logger = this.context.getLogger();

    try {
      const details = this.statementMetrics.get(statementId);
      if (!details) {
        return;
      }

      this.enqueueMetric({
        metricType: 'statement',
        timestamp: details.startTime,
        sessionId: details.sessionId,
        statementId: details.statementId,
        workspaceId: details.workspaceId,
        latencyMs: details.executionLatencyMs,
        resultFormat: details.resultFormat,
        chunkCount: details.chunkCount,
        bytesDownloaded: details.bytesDownloaded,
        pollCount: details.pollCount,
      });

      for (const errorEvent of details.errors) {
        this.enqueueMetric(toErrorMetric(errorEvent, details));
      }

      this.statementMetrics.delete(statementId);
    } catch (error: unknown) {
      logger.log(LogLevel.debug, `MetricsAggregator.completeStatement error: ${describeError(error)}`);
    }
  }

  /**
   * Append `metric` to the pending buffer, enforcing `maxPendingMetrics`.
   *
   * Overflow drops the oldest non-error entry (single `splice` — no new
   * allocation). Under an all-error buffer it falls back to dropping the
   * oldest entry at index 0, and the log line says so. Reaching `batchSize`
   * starts a background flush.
   */
  private enqueueMetric(metric: TelemetryMetric): void {
    this.pendingMetrics.push(metric);

    if (this.pendingMetrics.length > this.maxPendingMetrics) {
      const logger = this.context.getLogger();
      const oldestNonError = this.pendingMetrics.findIndex((pending) => pending.metricType !== 'error');
      if (oldestNonError >= 0) {
        this.pendingMetrics.splice(oldestNonError, 1);
        logger.log(
          LogLevel.debug,
          `Dropped 1 oldest non-error telemetry metric (buffer full at ${this.maxPendingMetrics})`,
        );
      } else {
        this.pendingMetrics.splice(0, 1);
        logger.log(
          LogLevel.debug,
          `Dropped 1 oldest error telemetry metric (buffer full of errors at ${this.maxPendingMetrics})`,
        );
      }
    }

    if (this.pendingMetrics.length >= this.batchSize) {
      // The periodic timer is not reset, so the tail-drain keeps its cadence
      // even under sustained batch-size bursts.
      this.flushInBackground('Batch-trigger flush failed');
    }
  }

  /**
   * Starts a flush without awaiting it and tracks the promise so `close()`
   * can wait for it. Failures are logged at debug level with `failureLabel`.
   */
  private flushInBackground(failureLabel: string): void {
    const logger = this.context.getLogger();
    const tracked: Promise<void> = this.flush(false)
      .catch((err: unknown) => {
        logger.log(LogLevel.debug, `${failureLabel}: ${describeError(err)}`);
      })
      .finally(() => {
        this.inFlightFlushes.delete(tracked);
      });
    this.inFlightFlushes.add(tracked);
  }

  /**
   * Drain the pending buffer and return a promise that resolves when the
   * exporter finishes with the drained batch. `close()` awaits this so
   * `process.exit()` after `client.close()` doesn't truncate the POST.
   *
   * @param resetTimer When true (default) and the aggregator is still open,
   *                   the periodic flush timer is restarted.
   */
  async flush(resetTimer: boolean = true): Promise<void> {
    const logger = this.context.getLogger();

    let exportPromise: Promise<void> | null = null;
    try {
      if (this.pendingMetrics.length > 0) {
        // Swap the buffer before exporting so events arriving during the
        // export land in the next batch.
        const batch = this.pendingMetrics;
        this.pendingMetrics = [];

        logger.log(LogLevel.debug, `Flushing ${batch.length} telemetry metrics`);
        exportPromise = this.exporter.export(batch);
      }

      if (resetTimer && !this.closed) {
        this.startFlushTimer();
      }
    } catch (error: unknown) {
      logger.log(LogLevel.debug, `MetricsAggregator.flush error: ${describeError(error)}`);
    }

    if (exportPromise) {
      try {
        await exportPromise;
      } catch (err: unknown) {
        logger.log(LogLevel.debug, `Unexpected export error: ${describeError(err)}`);
      }
    }
  }

  /** (Re)arms the periodic evict-and-flush timer. No-op once closed. */
  private startFlushTimer(): void {
    if (this.closed) return;
    const logger = this.context.getLogger();

    try {
      this.stopFlushTimer();

      this.flushTimer = setInterval(() => {
        if (this.closed) return;
        // Idle eviction: run before the flush so orphan-error metrics have
        // a chance to batch into this drain rather than wait for the next.
        try {
          this.evictExpiredStatements();
        } catch (err: unknown) {
          logger.log(LogLevel.debug, `evictExpiredStatements error: ${describeError(err)}`);
        }
        this.flushInBackground('Periodic flush failed');
      }, this.flushIntervalMs);

      // Telemetry must never keep the process alive on its own.
      this.flushTimer.unref();
    } catch (error: unknown) {
      logger.log(LogLevel.debug, `MetricsAggregator.startFlushTimer error: ${describeError(error)}`);
    }
  }

  /** Clears the periodic timer if one is armed. */
  private stopFlushTimer(): void {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = null;
    }
  }

  /**
   * Stops the timer, completes every still-open statement, and waits for the
   * final drain and all background flushes to settle. `closed` stays true
   * throughout, so events arriving during the awaited export are rejected
   * rather than accepted and stranded. Never throws.
   */
  async close(): Promise<void> {
    const logger = this.context.getLogger();
    this.closed = true;

    try {
      this.stopFlushTimer();

      // Snapshot keys — finalizeStatement mutates statementMetrics.
      for (const statementId of [...this.statementMetrics.keys()]) {
        this.finalizeStatement(statementId);
      }

      await this.flush(false);
      // Batch-triggered flushes (including any started by the loop above)
      // run in the background; wait for them so their POSTs finish too.
      await Promise.all([...this.inFlightFlushes]);

      // Belt-and-braces: nothing above should re-arm the timer once closed,
      // but clearing again is cheap.
      this.stopFlushTimer();
    } catch (error: unknown) {
      logger.log(LogLevel.debug, `MetricsAggregator.close error: ${describeError(error)}`);
    }
  }
}
/**
 * EventEmitter for driver telemetry.
 * Emits events at key driver operations.
 *
 * CRITICAL REQUIREMENT: ALL exceptions must be caught and logged at LogLevel.debug ONLY
 * (never warn/error) to avoid customer anxiety. NO console logging allowed - only IDBSQLLogger.
 *
 * Every public emit method funnels through `emitSafely`, which applies the
 * `telemetryEnabled` gate and swallows any exception raised while building
 * or dispatching the event.
 */
export default class TelemetryEventEmitter extends EventEmitter {
  private enabled: boolean;

  constructor(private context: IClientContext) {
    super();
    // Telemetry is opt-in: absent config means disabled (safe rollout).
    this.enabled = context.getConfig().telemetryEnabled ?? false;
  }

  /**
   * Emit a connection open event.
   *
   * @param data Connection event data including sessionId, workspaceId, and driverConfig
   */
  emitConnectionOpen(data: { sessionId: string; workspaceId: string; driverConfig: DriverConfiguration }): void {
    this.emitSafely(TelemetryEventType.CONNECTION_OPEN, 'Error emitting connection event', () => ({
      sessionId: data.sessionId,
      workspaceId: data.workspaceId,
      driverConfig: data.driverConfig,
    }));
  }

  /**
   * Emit a statement start event.
   *
   * @param data Statement start data including statementId, sessionId, and operationType
   */
  emitStatementStart(data: { statementId: string; sessionId: string; operationType?: string }): void {
    this.emitSafely(TelemetryEventType.STATEMENT_START, 'Error emitting statement start', () => ({
      statementId: data.statementId,
      sessionId: data.sessionId,
      operationType: data.operationType,
    }));
  }

  /**
   * Emit a statement complete event.
   *
   * @param data Statement completion data including latency, result format, and metrics
   */
  emitStatementComplete(data: {
    statementId: string;
    sessionId: string;
    latencyMs?: number;
    resultFormat?: string;
    chunkCount?: number;
    bytesDownloaded?: number;
    pollCount?: number;
  }): void {
    this.emitSafely(TelemetryEventType.STATEMENT_COMPLETE, 'Error emitting statement complete', () => ({
      statementId: data.statementId,
      sessionId: data.sessionId,
      latencyMs: data.latencyMs,
      resultFormat: data.resultFormat,
      chunkCount: data.chunkCount,
      bytesDownloaded: data.bytesDownloaded,
      pollCount: data.pollCount,
    }));
  }

  /**
   * Emit a CloudFetch chunk download event.
   *
   * @param data CloudFetch chunk data including chunk index, latency, bytes, and compression
   */
  emitCloudFetchChunk(data: {
    statementId: string;
    chunkIndex: number;
    latencyMs?: number;
    bytes: number;
    compressed?: boolean;
  }): void {
    this.emitSafely(TelemetryEventType.CLOUDFETCH_CHUNK, 'Error emitting cloudfetch chunk', () => ({
      statementId: data.statementId,
      chunkIndex: data.chunkIndex,
      latencyMs: data.latencyMs,
      bytes: data.bytes,
      compressed: data.compressed,
    }));
  }

  /**
   * Emit an error event.
   *
   * @param data Error event data including error details and terminal status
   */
  emitError(data: {
    statementId?: string;
    sessionId?: string;
    errorName: string;
    errorMessage: string;
    errorStack?: string;
    isTerminal: boolean;
  }): void {
    this.emitSafely(TelemetryEventType.ERROR, 'Error emitting error event', () => ({
      statementId: data.statementId,
      sessionId: data.sessionId,
      errorName: data.errorName,
      errorMessage: data.errorMessage,
      errorStack: data.errorStack,
      isTerminal: data.isTerminal,
    }));
  }

  /**
   * Shared emit path: checks the enabled flag, stamps `eventType` and
   * `timestamp`, and dispatches to listeners. `buildFields` is invoked inside
   * the try block so a failure while reading the payload is swallowed too.
   *
   * @param type          Event type; also used as the EventEmitter channel name.
   * @param failurePrefix Prefix of the debug log line written on failure.
   * @param buildFields   Produces the event-specific fields.
   */
  private emitSafely(
    type: TelemetryEventType,
    failurePrefix: string,
    buildFields: () => Omit<TelemetryEvent, 'eventType' | 'timestamp'>,
  ): void {
    if (!this.enabled) return;

    const logger = this.context.getLogger();
    try {
      const event: TelemetryEvent = {
        eventType: type,
        timestamp: Date.now(),
        ...buildFields(),
      };
      this.emit(type, event);
    } catch (error: any) {
      // Swallow all exceptions - log at debug level only
      logger.log(LogLevel.debug, `${failurePrefix}: ${error.message}`);
    }
  }
}
/**
 * Hosts we always refuse to send authenticated telemetry to. Targeted at the
 * `/api/2.0/sql/telemetry-ext` exfil vector: an attacker-influenced `host`
 * (env var, tampered config, etc.) must not be able to redirect the Bearer
 * token to a loopback/IMDS/RFC1918 endpoint.
 *
 * Patterns are matched against the WHATWG-normalised hostname with any
 * trailing root dot removed. IPv6 literals keep their brackets there.
 */
const BLOCKED_HOST_PATTERNS: RegExp[] = [
  // IPv4 loopback, "this network", RFC1918, and link-local (incl. IMDS).
  /^(?:127\.|0\.|10\.|169\.254\.|172\.(?:1[6-9]|2[0-9]|3[01])\.|192\.168\.)/,
  /^(?:localhost|metadata\.google\.internal|metadata\.azure\.com)$/i,
  // IPv6 loopback and unspecified address.
  /^\[?::1\]?$/,
  /^\[?::\]?$/,
  // IPv6 unique-local (fc00::/7) and link-local (fe80::/10).
  /^\[?(?:fc|fd)[0-9a-f]{2}:/i,
  /^\[?fe[89ab][0-9a-f]:/i,
  // Any IPv4-mapped IPv6 literal. The URL serialiser renders these in hex
  // (`[::ffff:7f00:1]`), so dotted-quad sub-patterns would never match; a
  // telemetry host has no legitimate reason to use this form at all.
  /^\[?::ffff:/i,
];

/** ASCII whitespace (incl. CR/LF) plus zero-width/BOM codepoints that `\s` misses. */
const INVISIBLE_OR_SPACE = /[\s\u200b-\u200f\u2060\ufeff]/;

/** Characters that indicate userinfo, a path, or a protocol-relative prefix inside a host. */
const HOST_STRUCTURE_CHARS = /[/\\@]/;

/**
 * Build an HTTPS telemetry URL from a host and a path.
 *
 * Refuses anything beyond a bare `host[:port]` so a compromised or mistyped
 * host cannot redirect the authenticated request to an attacker-controlled
 * endpoint. Rejected shapes include:
 *   - protocol-relative prefix: `//attacker.com`
 *   - zero-width / ASCII whitespace in the host
 *   - userinfo (`user:pass@host`)
 *   - path/query/fragment
 *   - CRLF (header injection on some fetch backends)
 *   - loopback / link-local / RFC1918 / cloud-metadata addresses, including
 *     trailing-dot names (`localhost.`) and IPv4-mapped IPv6 literals
 *
 * `path` is validated as well: it must begin with `/` and contain no
 * whitespace, because it is concatenated directly after the authority — a
 * value such as `.attacker.com/x` or `@attacker.com/x` would otherwise change
 * which host receives the request.
 *
 * @param host Bare host, optionally with `:port`, an `http(s)://` prefix, and
 *             trailing slashes.
 * @param path Absolute request path (may include a query string).
 * @returns The full `https://…` URL, or `null` when any check fails; callers
 *          drop the batch.
 */
export function buildTelemetryUrl(host: string, path: string): string | null {
  if (typeof host !== 'string' || host.length === 0) {
    return null;
  }

  if (typeof path !== 'string' || !path.startsWith('/') || INVISIBLE_OR_SPACE.test(path)) {
    return null;
  }

  // `new URL` silently strips some of these codepoints, so reject up front.
  if (INVISIBLE_OR_SPACE.test(host)) {
    return null;
  }

  const cleanHost = host.replace(/^https?:\/\//, '').replace(/\/+$/, '');
  if (cleanHost.length === 0) {
    return null;
  }

  // Reject anything that looks like userinfo / path / protocol-relative
  // prefix before URL parsing. `new URL('https://' + '//x')` would otherwise
  // normalise the doubled slash and accept `x` as the host.
  if (HOST_STRUCTURE_CHARS.test(cleanHost)) {
    return null;
  }

  let parsed: URL;
  try {
    parsed = new URL(`https://${cleanHost}`);
  } catch {
    return null;
  }

  if (
    parsed.pathname !== '/' ||
    parsed.search !== '' ||
    parsed.hash !== '' ||
    parsed.username !== '' ||
    parsed.password !== ''
  ) {
    return null;
  }

  // Defence in depth: ensure `new URL` did not silently rewrite the host we
  // validated (e.g. by stripping a codepoint we missed above, or by
  // canonicalising a numeric/alternate IP spelling). `new URL` drops the
  // default :443 for https, so strip it from both sides before comparing.
  const expectedHost = cleanHost.toLowerCase().replace(/:443$/, '');
  const actualHost = parsed.host.toLowerCase().replace(/:443$/, '');
  if (actualHost !== expectedHost) {
    return null;
  }

  // A trailing root dot (`localhost.`) resolves like the dotless name but
  // would slip past the anchored name patterns.
  const hostnameForMatching = parsed.hostname.replace(/\.$/, '');
  if (BLOCKED_HOST_PATTERNS.some((pattern) => pattern.test(hostnameForMatching))) {
    return null;
  }

  return `https://${parsed.host}${path}`;
}
/**
 * Prefixes the Databricks driver uses for internal token formats. Kept in
 * sync with `lib/utils/buildUserAgentString.ts`'s `redactInternalToken`.
 * Extending one list should extend the other.
 */
const DATABRICKS_TOKEN_PREFIXES = ['dkea', 'dskea', 'dapi', 'dsapi', 'dose'];

/**
 * Placeholder written in place of every redacted secret. An explicit marker
 * keeps "a secret was removed here" distinguishable from a genuinely empty
 * value when reading exported telemetry.
 */
const REDACTED = '<REDACTED>';

/** Marker appended when a value is cut to `maxLen`. */
const TRUNCATION_SUFFIX = '…[truncated]';

/**
 * `[pattern, replacement]` pairs applied in order. Every replacement is
 * stable under re-application (running the list twice yields the same text),
 * which the post-truncation pass in `redactSensitive` relies on.
 */
const SECRET_PATTERNS: Array<[RegExp, string]> = [
  // `Authorization: Bearer <token>` / `Bearer <token>` anywhere in a stack.
  [/Bearer\s+[A-Za-z0-9._\-+/=]+/gi, `Bearer ${REDACTED}`],
  // `Authorization: Basic <base64>`.
  [/Basic\s+[A-Za-z0-9+/=]+/gi, `Basic ${REDACTED}`],
  // URL userinfo: `https://user:pass@host/…`.
  [/([a-z][a-z0-9+.-]*:\/\/)[^/\s:@]+:[^/\s@]+@/gi, `$1${REDACTED}@`],
  // Databricks PATs / service-token prefixes without `Bearer`, e.g.
  // `token is dapi0123…` — appears in error stacks that echo the raw value.
  [new RegExp(`\\b(?:${DATABRICKS_TOKEN_PREFIXES.join('|')})[A-Za-z0-9]{8,}`, 'g'), REDACTED],
  // JWTs (three base64url segments separated by dots).
  [/\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g, REDACTED],
  // JSON-encoded secrets: `"client_secret":"..."`, `"access_token":"..."` etc.
  [
    /"(password|token|client_secret|refresh_token|access_token|id_token|secret|api[_-]?key|apikey)"\s*:\s*"[^"]*"/gi,
    `"$1":"${REDACTED}"`,
  ],
  // Form-URL-encoded / key=value secrets.
  [
    /\b(token|password|client_secret|refresh_token|access_token|id_token|secret|api[_-]?key|apikey)=[^\s&"']+/gi,
    `$1=${REDACTED}`,
  ],
];

/** Runs every secret pattern over `text` once, in declaration order. */
function applySecretPatterns(text: string): string {
  return SECRET_PATTERNS.reduce((acc, [pattern, replacement]) => acc.replace(pattern, replacement), text);
}

/**
 * Strips common secret shapes from a free-form error string and caps length.
 * Applied before anything is shipped off-box.
 *
 * Redaction happens before truncation so a long stack cannot bury a secret
 * past the cap. The truncated head is then redacted once more, before the
 * truncation marker is appended, so the marker itself is never consumed by a
 * `key=value` match.
 *
 * @param value  Text to sanitise; `undefined`/empty yields `''`.
 * @param maxLen Maximum number of UTF-16 code units kept before the
 *               `…[truncated]` marker is appended.
 * @returns The redacted (and possibly truncated) text.
 */
export function redactSensitive(value: string | undefined, maxLen = 2048): string {
  if (!value) {
    return '';
  }

  const redacted = applySecretPatterns(value);
  if (redacted.length <= maxLen) {
    return redacted;
  }

  return `${applySecretPatterns(redacted.slice(0, maxLen))}${TRUNCATION_SUFFIX}`;
}

/**
 * Returns a safe `process_name` value: the basename of the first whitespace-
 * delimited token, with surrounding whitespace trimmed. This defeats both the
 * absolute-path PII leak (`/home/<user>/app.js`) and the argv-leak shape
 * (`node --db-password=X app.js`) that some producers pass in.
 *
 * @param name Raw process name or command line; may be `undefined`.
 * @returns The basename, or `''` when nothing usable remains.
 */
export function sanitizeProcessName(name: string | undefined): string {
  const trimmed = name?.trim();
  if (!trimmed) {
    return '';
  }

  // Drop argv tail: anything after the first whitespace — argv[0] shouldn't
  // contain spaces, but producers sometimes pass `argv.join(' ')`.
  const [firstToken] = trimmed.split(/\s/, 1);
  if (!firstToken) {
    return '';
  }

  // Treat both POSIX and Windows separators as path delimiters.
  const lastSep = Math.max(firstToken.lastIndexOf('/'), firstToken.lastIndexOf('\\'));
  return lastSep < 0 ? firstToken : firstToken.slice(lastSep + 1);
}
*/ +/** + * Driver name constant for telemetry + */ +export const DRIVER_NAME = 'nodejs-sql-driver'; + /** * Event types emitted by the telemetry system */ @@ -22,7 +27,7 @@ export enum TelemetryEventType { STATEMENT_START = 'statement.start', STATEMENT_COMPLETE = 'statement.complete', CLOUDFETCH_CHUNK = 'cloudfetch.chunk', - ERROR = 'error', + ERROR = 'telemetry.error', } /** @@ -38,9 +43,18 @@ export interface TelemetryConfiguration { /** Interval in milliseconds to flush metrics */ flushIntervalMs?: number; - /** Maximum retry attempts for export */ + /** Maximum retry attempts for export (attempts *after* the initial call) */ maxRetries?: number; + /** Minimum backoff delay in ms for retry backoff */ + backoffBaseMs?: number; + + /** Maximum backoff delay in ms (includes jitter) */ + backoffMaxMs?: number; + + /** Upper bound of added jitter in ms */ + backoffJitterMs?: number; + /** Whether to use authenticated export endpoint */ authenticatedExport?: boolean; @@ -49,27 +63,42 @@ export interface TelemetryConfiguration { /** Circuit breaker timeout in milliseconds */ circuitBreakerTimeout?: number; + + /** Maximum number of pending metrics buffered before dropping oldest */ + maxPendingMetrics?: number; + + /** Maximum number of error events buffered per statement before dropping oldest */ + maxErrorsPerStatement?: number; + + /** TTL in ms after which abandoned statement aggregations are evicted */ + statementTtlMs?: number; } /** * Default telemetry configuration values */ -export const DEFAULT_TELEMETRY_CONFIG: Required = { - enabled: false, // Initially disabled for safe rollout +export const DEFAULT_TELEMETRY_CONFIG: Readonly> = Object.freeze({ + enabled: false, batchSize: 100, flushIntervalMs: 5000, maxRetries: 3, + backoffBaseMs: 100, + backoffMaxMs: 1000, + backoffJitterMs: 100, authenticatedExport: true, circuitBreakerThreshold: 5, - circuitBreakerTimeout: 60000, // 1 minute -}; + circuitBreakerTimeout: 60000, + maxPendingMetrics: 500, + 
maxErrorsPerStatement: 50, + statementTtlMs: 60 * 60 * 1000, // 1 hour +}); /** * Runtime telemetry event emitted by the driver */ export interface TelemetryEvent { /** Type of the event */ - eventType: TelemetryEventType | string; + eventType: TelemetryEventType; /** Timestamp when the event occurred (milliseconds since epoch) */ timestamp: number; @@ -123,6 +152,9 @@ export interface TelemetryEvent { /** Error message */ errorMessage?: string; + /** Stack trace, captured at emission site; redacted before export */ + errorStack?: string; + /** Whether the error is terminal (non-retryable) */ isTerminal?: boolean; } @@ -169,6 +201,9 @@ export interface TelemetryMetric { /** Error message */ errorMessage?: string; + + /** Stack trace, captured at emission site; redacted before export */ + errorStack?: string; } /** @@ -190,6 +225,24 @@ export interface DriverConfiguration { /** OS version */ osVersion: string; + /** OS architecture (x64, arm64, etc.) */ + osArch: string; + + /** Runtime vendor (Node.js Foundation) */ + runtimeVendor: string; + + /** Locale name (e.g., en_US) */ + localeName: string; + + /** Character set encoding (e.g., UTF-8) */ + charSetEncoding: string; + + /** + * Process name. Producers MUST pass only a basename (no absolute path) — + * `sanitizeProcessName()` is applied at export time as a defence in depth. + */ + processName: string; + // Feature flags /** Whether CloudFetch is enabled */ cloudFetchEnabled: boolean; diff --git a/tests/unit/.stubs/CircuitBreakerStub.ts b/tests/unit/.stubs/CircuitBreakerStub.ts new file mode 100644 index 00000000..e85ad35c --- /dev/null +++ b/tests/unit/.stubs/CircuitBreakerStub.ts @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2025 Databricks Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Test double for CircuitBreaker.
 *
 * Keeps a minimal state machine with fixed thresholds (opens after 5
 * consecutive failures, closes after 2 successes while HALF_OPEN) and exposes
 * setters so tests can force any state directly.
 */
export default class CircuitBreakerStub {
  private state: CircuitBreakerState = CircuitBreakerState.CLOSED;

  private failureCount = 0;

  private successCount = 0;

  /** Number of times `execute` has been invoked, including rejected calls. */
  public executeCallCount = 0;

  /**
   * Runs `operation` unless the breaker is OPEN, in which case it rejects
   * with `CircuitBreakerOpenError` without invoking it. The outcome is
   * recorded, and a failure is rethrown unchanged.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    this.executeCallCount += 1;

    if (this.state === CircuitBreakerState.OPEN) {
      throw new CircuitBreakerOpenError();
    }

    let outcome: T;
    try {
      outcome = await operation();
    } catch (error) {
      this.onFailure();
      throw error;
    }
    this.onSuccess();
    return outcome;
  }

  /** Current breaker state. */
  getState(): CircuitBreakerState {
    return this.state;
  }

  /** Forces the breaker into `state` (test hook). */
  setState(state: CircuitBreakerState): void {
    this.state = state;
  }

  /** Consecutive failures recorded since the last success or reset. */
  getFailureCount(): number {
    return this.failureCount;
  }

  /** Overrides the failure counter (test hook). */
  setFailureCount(count: number): void {
    this.failureCount = count;
  }

  /** Successes recorded while HALF_OPEN since the last failure or reset. */
  getSuccessCount(): number {
    return this.successCount;
  }

  /** Restores the freshly-constructed state, including the call counter. */
  reset(): void {
    this.state = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
    this.executeCallCount = 0;
  }

  /** Records a success: clears failures and, while HALF_OPEN, counts toward closing. */
  private onSuccess(): void {
    this.failureCount = 0;
    if (this.state !== CircuitBreakerState.HALF_OPEN) {
      return;
    }
    this.successCount += 1;
    if (this.successCount >= 2) {
      this.state = CircuitBreakerState.CLOSED;
      this.successCount = 0;
    }
  }

  /** Records a failure: clears successes and opens the breaker when warranted. */
  private onFailure(): void {
    this.failureCount += 1;
    this.successCount = 0;
    // In HALF_OPEN state, any single failure immediately reopens (matches real implementation)
    const probing = this.state === CircuitBreakerState.HALF_OPEN;
    if (probing || this.failureCount >= 5) {
      this.state = CircuitBreakerState.OPEN;
    }
  }
}

/**
 * Test double for CircuitBreakerRegistry: hands out one `CircuitBreakerStub`
 * per host, created on first request.
 */
export class CircuitBreakerRegistryStub {
  private breakers: Map<string, CircuitBreakerStub> = new Map();

  /** Returns the stub registered for `host`, creating it if absent. */
  getCircuitBreaker(host: string): CircuitBreakerStub {
    const existing = this.breakers.get(host);
    if (existing) {
      return existing;
    }
    const created = new CircuitBreakerStub();
    this.breakers.set(host, created);
    return created;
  }

  /** Returns a shallow copy of the host → stub map; mutating it does not affect the registry. */
  getAllBreakers(): Map<string, CircuitBreakerStub> {
    return new Map(this.breakers);
  }

  /** Forgets the stub registered for `host`, if any. */
  removeCircuitBreaker(host: string): void {
    this.breakers.delete(host);
  }

  /** Forgets every registered stub. */
  clear(): void {
    this.breakers.clear();
  }
}
// A single rejected operation must surface the original error, bump the
// failure counter to 1 and leave the breaker CLOSED.
it('should increment failure count on operation failure', async () => {
  const context = new ClientContextStub();
  const breaker = new CircuitBreaker(context);
  const operation = sinon.stub().rejects(new Error('Operation failed'));

  // Capture the rejection instead of calling expect.fail() inside the try
  // block: a fail() thrown there would be swallowed by the catch clause and
  // reported as a confusing message mismatch.
  let caught: unknown;
  try {
    await breaker.execute(operation);
  } catch (error) {
    caught = error;
  }

  expect(caught, 'execute() should have rejected').to.be.instanceOf(Error);
  expect((caught as Error).message).to.equal('Operation failed');

  expect(breaker.getState()).to.equal(CircuitBreakerState.CLOSED);
  expect(breaker.getFailureCount()).to.equal(1);
});
// The OPEN transition is asserted at LogLevel.warn, so the title says "warn";
// the previous title claimed "debug level" and contradicted the assertion below.
it('should log the OPEN transition at warn level', async () => {
  const context = new ClientContextStub();
  const logSpy = sinon.spy(context.logger, 'log');
  const breaker = new CircuitBreaker(context);
  const operation = sinon.stub().rejects(new Error('Failed'));

  // Fail 5 times to open circuit; each rejection is expected and ignored.
  for (let i = 0; i < 5; i++) {
    try {
      await breaker.execute(operation);
    } catch {}
  }

  expect(logSpy.calledWith(LogLevel.warn, sinon.match(/Telemetry circuit breaker OPEN/))).to.be.true;

  logSpy.restore();
});
// After tripping the breaker, 59 s of fake time is still inside the timeout
// window, so a new operation must be rejected and the state must remain OPEN.
it('should stay OPEN for configured timeout (default 60s)', async () => {
  const context = new ClientContextStub();
  const breaker = new CircuitBreaker(context);
  const operation = sinon.stub().rejects(new Error('Failed'));

  // Open the circuit; each rejection is expected and ignored.
  for (let i = 0; i < 5; i++) {
    try {
      await breaker.execute(operation);
    } catch {}
  }

  expect(breaker.getState()).to.equal(CircuitBreakerState.OPEN);

  // Advance time by 59 seconds (less than timeout)
  clock.tick(59000);

  // Capture the rejection rather than calling expect.fail() inside the try
  // block, where the catch clause would swallow it.
  const newOperation = sinon.stub().resolves('success');
  let caught: unknown;
  try {
    await breaker.execute(newOperation);
  } catch (error) {
    caught = error;
  }

  expect(caught, 'execute() should have rejected while OPEN').to.be.instanceOf(CircuitBreakerOpenError);
  expect((caught as Error).message).to.equal('Circuit breaker OPEN');

  expect(breaker.getState()).to.equal(CircuitBreakerState.OPEN);
});
/**
 * Trips `breaker` with five consecutive rejected operations, then advances the
 * fake clock by 60 001 ms so the next execute() call is past the open timeout.
 *
 * NOTE(review): 5 and 60001 presumably track the default failure threshold and
 * timeout (sibling tests label them "default 5" / "default 60s"); a breaker
 * built with a larger custom threshold or timeout would not be affected as intended.
 *
 * @param breaker - breaker under test; mutated by the failing calls
 */
async function openAndWaitForHalfOpen(breaker: CircuitBreaker): Promise<void> {
  const operation = sinon.stub().rejects(new Error('Failed'));
  // Open the circuit; each rejection is expected and ignored.
  for (let i = 0; i < 5; i++) {
    try {
      await breaker.execute(operation);
    } catch {}
  }
  // Wait for timeout
  clock.tick(60001);
}
// With successThreshold: 3 the breaker stays HALF_OPEN for the first two
// successful probes and only closes on the third.
it('should close after custom success threshold', async () => {
  const breaker = new CircuitBreaker(new ClientContextStub(), { successThreshold: 3 });

  await openAndWaitForHalfOpen(breaker);

  // Two successes are not enough yet.
  for (const payload of ['success0', 'success1']) {
    await breaker.execute(sinon.stub().resolves(payload));
    expect(breaker.getState()).to.equal(CircuitBreakerState.HALF_OPEN);
  }

  // The third one reaches the threshold.
  const closingProbe = sinon.stub().resolves('success3');
  await breaker.execute(closingProbe);
  const finalState = breaker.getState();
  expect(finalState).to.equal(CircuitBreakerState.CLOSED);
});
// Two callers hit the breaker right after the open timeout elapsed: the first
// becomes the probe, the second must be rejected without running its operation.
it('admits only one probe when two callers race after OPEN→HALF_OPEN transition', async () => {
  // This test waits on a real 5 ms timer, so the fake clock is removed first.
  clock.restore();
  const context = new ClientContextStub();
  const breaker = new CircuitBreaker(context, { failureThreshold: 1, timeout: 1, successThreshold: 1 });

  // Trip to OPEN.
  await breaker.execute(() => Promise.reject(new Error('boom'))).catch(() => {});
  expect(breaker.getState()).to.equal(CircuitBreakerState.OPEN);

  // Wait past the timeout so the next execute() flips to HALF_OPEN.
  await new Promise<void>((resolve) => {
    setTimeout(resolve, 5);
  });

  // Hold the probe in-flight so the second caller races against it.
  // Initialised with a no-op so the call site needs no non-null assertion;
  // the Promise executor runs synchronously and installs the real resolver.
  let releaseProbe: () => void = () => {};
  const probeGate = new Promise<void>((resolve) => {
    releaseProbe = resolve;
  });

  let probeRan = false;
  let rejectedRan = false;

  const first = breaker.execute(async () => {
    probeRan = true;
    await probeGate;
  });

  try {
    const secondResult = await breaker
      .execute(async () => {
        rejectedRan = true;
      })
      .catch((err: unknown) => err);

    expect(probeRan).to.be.true;
    expect(rejectedRan).to.be.false;
    expect(secondResult).to.be.instanceOf(CircuitBreakerOpenError);
  } finally {
    // Always release the gated probe, even when an expectation above failed,
    // so the first execute() call never stays pending.
    releaseProbe();
  }

  await first;
});
// Looking up one host twice must hand back the identical breaker object.
it('should return the same circuit breaker for the same host', () => {
  const registry = new CircuitBreakerRegistry(new ClientContextStub());
  const hostName = 'test-host.databricks.com';

  const firstLookup = registry.getCircuitBreaker(hostName);
  const secondLookup = registry.getCircuitBreaker(hostName);

  // Reference equality, not structural equality.
  expect(firstLookup).to.equal(secondLookup);
});
// Creating a breaker for a new host must emit one debug-level log line
// that names the host.
it('should log circuit breaker creation at debug level', () => {
  const context = new ClientContextStub();
  const logSpy = sinon.spy(context.logger, 'log');
  const host = 'test-host.databricks.com';

  new CircuitBreakerRegistry(context).getCircuitBreaker(host);

  const creationLogged = logSpy.calledWith(LogLevel.debug, `Created circuit breaker for host: ${host}`);
  expect(creationLogged).to.be.true;

  logSpy.restore();
});
// getAllBreakers() exposes every registered breaker keyed by host.
describe('getAllBreakers', () => {
  it('should return all registered circuit breakers', () => {
    const registry = new CircuitBreakerRegistry(new ClientContextStub());
    const hosts = ['host1.databricks.com', 'host2.databricks.com'];

    // Register the hosts in order and remember what each lookup returned.
    const created = hosts.map((name) => registry.getCircuitBreaker(name));

    const snapshot = registry.getAllBreakers();

    expect(snapshot.size).to.equal(2);
    hosts.forEach((name, index) => {
      expect(snapshot.get(name)).to.equal(created[index]);
    });
  });

  it('should return empty map if no breakers registered', () => {
    const registry = new CircuitBreakerRegistry(new ClientContextStub());

    const { size } = registry.getAllBreakers();

    expect(size).to.equal(0);
  });
});
// clear() drops every breaker the registry currently holds.
describe('clear', () => {
  it('should remove all circuit breakers', () => {
    const registry = new CircuitBreakerRegistry(new ClientContextStub());
    const registeredCount = (): number => registry.getAllBreakers().size;

    for (const host of ['host1.databricks.com', 'host2.databricks.com', 'host3.databricks.com']) {
      registry.getCircuitBreaker(host);
    }

    expect(registeredCount()).to.equal(3);

    registry.clear();

    expect(registeredCount()).to.equal(0);
  });
});
/**
 * Builds a minimal `connection` metric for tests.
 *
 * @param overrides - fields that replace the defaults (spread last, so they win)
 * @returns a metric with `metricType: 'connection'`, the current timestamp and
 *          `sessionId: 'session-1'` unless overridden
 */
function makeMetric(overrides: Partial<TelemetryMetric> = {}): TelemetryMetric {
  return {
    metricType: 'connection',
    timestamp: Date.now(),
    sessionId: 'session-1',
    ...overrides,
  };
}

/**
 * Resolves to a fetch-like 200 response object with an empty text body.
 */
function makeOkResponse() {
  return Promise.resolve({ ok: true, status: 200, statusText: 'OK', text: () => Promise.resolve('') });
}

/**
 * Resolves to a fetch-like non-ok response object with an empty text body.
 *
 * @param status - HTTP status code to report
 * @param statusText - HTTP status text to report
 */
function makeErrorResponse(status: number, statusText: string) {
  return Promise.resolve({ ok: false, status, statusText, text: () => Promise.resolve('') });
}
// A host that already carries a scheme must be used as-is when the
// endpoint URL is built.
it('should preserve host protocol if already set', async () => {
  const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any);
  const exporter = new DatabricksTelemetryExporter(
    context,
    'https://host.example.com',
    new CircuitBreakerRegistry(context),
    fakeAuthProvider,
  );
  const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());

  await exporter.export([makeMetric()]);

  // First positional argument of sendRequest is the target URL.
  const requestedUrl = sendRequestStub.firstCall.args[0] as string;
  expect(requestedUrl).to.equal('https://host.example.com/telemetry-ext');
});
// A 400 response is terminal: even with three retries configured the
// exporter must call sendRequest exactly once.
it('should not retry on terminal HTTP errors (400)', async () => {
  const context = new ClientContextStub({ telemetryMaxRetries: 3 } as any);
  const registry = new CircuitBreakerRegistry(context);
  const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);

  const badRequest = makeErrorResponse(400, 'Bad Request');
  const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(badRequest);

  await exporter.export([makeMetric()]);

  const attempts = sendRequestStub.callCount;
  expect(attempts).to.equal(1);
});
// Once the breaker for the host is OPEN, export() must drop the batch without
// issuing any HTTP request.
it('should drop telemetry when circuit breaker is OPEN', async () => {
  // maxRetries: 0 avoids sleep delays; failureThreshold: 1 trips the breaker on first failure
  const context = new ClientContextStub({ telemetryMaxRetries: 0 } as any);
  const registry = new CircuitBreakerRegistry(context);
  registry.getCircuitBreaker('host.example.com', { failureThreshold: 1 });
  const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
  const sendRequestStub = sinon
    .stub(exporter as any, 'sendRequest')
    .returns(makeErrorResponse(503, 'Service Unavailable'));

  // Trip the circuit breaker: with no retries, a single 503 is one recorded failure.
  await exporter.export([makeMetric()]);
  // Guard against a vacuous pass: the first export must really have reached sendRequest.
  expect(sendRequestStub.calledOnce).to.be.true;

  // Clear only the call history. reset() would also wipe the configured 503
  // response, so a regression would hit an unconfigured stub.
  sendRequestStub.resetHistory();

  // Now circuit is OPEN, export should be dropped without calling sendRequest
  await exporter.export([makeMetric()]);

  expect(sendRequestStub.called).to.be.false;
});
// The request options handed to sendRequest must describe a JSON POST.
it('should send POST request with JSON content-type', async () => {
  const context = new ClientContextStub();
  const exporter = new DatabricksTelemetryExporter(
    context,
    'host.example.com',
    new CircuitBreakerRegistry(context),
    fakeAuthProvider,
  );
  const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());

  await exporter.export([makeMetric()]);

  // Second positional argument of sendRequest carries the request options.
  const requestInit = sendRequestStub.firstCall.args[1] as any;
  const httpMethod = requestInit.method;
  expect(httpMethod).to.equal('POST');
  const contentType = requestInit.headers['Content-Type'];
  expect(contentType).to.equal('application/json');
});
// Export failures must never be reported at error level.
describe('logging level compliance', () => {
  // The old title "should only log at debug level" overstated the check: the
  // assertion only rules out error-level logs, and the breaker is expected to
  // log at warn level when it opens.
  it('should never log at error level', async () => {
    const context = new ClientContextStub();
    const logSpy = sinon.spy((context as any).logger, 'log');
    const registry = new CircuitBreakerRegistry(context);
    const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
    sinon.stub(exporter as any, 'sendRequest').rejects(new Error('something went wrong'));

    // Start the export, then drain the fake timers so any retry sleeps complete.
    const exportPromise = exporter.export([makeMetric()]);
    await clock.runAllAsync();
    await exportPromise;

    expect(logSpy.neverCalledWith(LogLevel.error, sinon.match.any)).to.be.true;
    // Note: circuit breaker logs at warn level when transitioning to OPEN, which is expected
  });
});
'sendRequest').returns(makeOkResponse()); + + await exporter.export([makeMetric()]); + + expect(sendRequestStub.called).to.be.false; + }); + + it('warns exactly once across consecutive auth-missing drops', async () => { + const context = new ClientContextStub({ telemetryAuthenticatedExport: true, telemetryMaxRetries: 0 } as any); + const logSpy = sinon.spy((context as any).logger, 'log'); + const registry = new CircuitBreakerRegistry(context); + const emptyAuth = { authenticate: async () => ({}) }; + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, emptyAuth as any); + sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); + + await exporter.export([makeMetric()]); + await exporter.export([makeMetric()]); + await exporter.export([makeMetric()]); + + const warnCalls = logSpy + .getCalls() + .filter((c) => c.args[0] === LogLevel.warn && /Authorization/.test(String(c.args[1]))); + expect(warnCalls.length).to.equal(1); + }); + + it('re-arms the auth-missing warn after a successful export', async () => { + const context = new ClientContextStub({ telemetryAuthenticatedExport: true, telemetryMaxRetries: 0 } as any); + const logSpy = sinon.spy((context as any).logger, 'log'); + const registry = new CircuitBreakerRegistry(context); + let headers: Record = {}; + const toggleAuth = { authenticate: async () => headers }; + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, toggleAuth as any); + sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); + + await exporter.export([makeMetric()]); // warns once + headers = { Authorization: 'Bearer recovered' }; + await exporter.export([makeMetric()]); // success → re-arms + headers = {}; + await exporter.export([makeMetric()]); // warns again + + const warnCalls = logSpy + .getCalls() + .filter((c) => c.args[0] === LogLevel.warn && /Authorization/.test(String(c.args[1]))); + expect(warnCalls.length).to.equal(2); + }); + }); + + 
describe('unauthenticated endpoint privacy', () => {
+    it('omits workspace_id, session_id, statement_id from unauth payload', async () => {
+      const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any);
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([
+        makeMetric({
+          metricType: 'connection',
+          sessionId: 'session-xyz',
+          statementId: 'stmt-abc',
+          workspaceId: 'ws-123',
+        } as any),
+      ]);
+
+      const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body); // args[1] is the request init
+      const log = JSON.parse(body.protoLogs[0]); // each protoLogs entry is itself a JSON string
+      expect(log.workspace_id).to.be.undefined;
+      expect(log.entry.sql_driver_log.session_id).to.be.undefined;
+      expect(log.entry.sql_driver_log.sql_statement_id).to.be.undefined;
+    });
+
+    it('omits system_configuration from unauth payload', async () => {
+      const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any);
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([
+        makeMetric({
+          metricType: 'connection',
+          driverConfig: {
+            driverVersion: '1.x',
+            driverName: 'nodejs-sql-driver',
+            nodeVersion: '20.0',
+            platform: 'linux',
+            osVersion: '5.0',
+            osArch: 'x64',
+            runtimeVendor: 'v8',
+            localeName: 'en_US',
+            charSetEncoding: 'UTF-8',
+            processName: '/home/alice/worker.js', // a path that embeds a user name
+          },
+        } as any),
+      ]);
+
+      const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body);
+      const log = JSON.parse(body.protoLogs[0]);
+      expect(log.entry.sql_driver_log.system_configuration).to.be.undefined;
+    });
+
+    it('strips userAgentEntry from User-Agent on unauth path', async () => {
+      const context = new ClientContextStub({
+        telemetryAuthenticatedExport: false,
+        userAgentEntry: 'MyTenantApp/1.2.3',
+      } as any);
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([makeMetric()]);
+
+      const ua = (sendRequestStub.firstCall.args[1] as any).headers['User-Agent'];
+      expect(ua).to.not.include('MyTenantApp'); // the configured entry must not reach the unauth request
+    });
+
+    it('blanks stack_trace on unauth error metrics', async () => {
+      const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any);
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([
+        makeMetric({
+          metricType: 'error',
+          errorName: 'SomeError',
+          errorMessage: 'Bearer leaked-token in the message',
+          errorStack: 'Error: leak\n at fn (dapi0123456789abcdef)',
+        } as any),
+      ]);
+
+      const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body);
+      const log = JSON.parse(body.protoLogs[0]);
+      expect(log.entry.sql_driver_log.error_info.stack_trace).to.equal('');
+      expect(log.entry.sql_driver_log.error_info.error_name).to.equal('SomeError');
+    });
+  });
+
+  describe('errorStack flow (authenticated)', () => {
+    it('redacts Bearer tokens in stack_trace before export', async () => {
+      const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any);
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([
+        makeMetric({
+          metricType: 'error',
+          errorName: 'AuthError',
+          errorMessage: 'ignored because errorStack is preferred',
+          errorStack: 'Error: boom\n at Bearer leaked-bearer-token',
+        } as any),
+      ]);
+
+      const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body);
+      const log = JSON.parse(body.protoLogs[0]);
+      const stack = log.entry.sql_driver_log.stack_trace ?? log.entry.sql_driver_log.error_info?.stack_trace;
+      expect(stack).to.include('<REDACTED>'); // NOTE(review): marker text was lost ('' matches anything); presumably '<REDACTED>' — confirm against the exporter's redaction helper
+      expect(stack).to.not.include('leaked-bearer-token');
+    });
+  });
+
+  describe('host validation', () => {
+    it('drops the batch when host fails validation (malformed)', async () => {
+      const context = new ClientContextStub();
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, '//attacker.com', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([makeMetric()]);
+
+      expect(sendRequestStub.called).to.be.false; // no request may be attempted for a rejected host
+    });
+
+    it('drops the batch when host is loopback', async () => {
+      const context = new ClientContextStub();
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, '127.0.0.1', registry, fakeAuthProvider);
+      const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse());
+
+      await exporter.export([makeMetric()]);
+
+      expect(sendRequestStub.called).to.be.false;
+    });
+  });
+
+  describe('dispose()', () => {
+    it('removes the per-host circuit breaker from the registry', () => {
+      const context = new ClientContextStub();
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+
+      expect(registry.getAllBreakers().has('host.example.com')).to.be.true; // present before dispose()
+
+      exporter.dispose();
+
+      expect(registry.getAllBreakers().has('host.example.com')).to.be.false;
+    });
+
+    it('is idempotent', () => {
+      const context = new ClientContextStub();
+      const registry = new CircuitBreakerRegistry(context);
+      const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider);
+
+      exporter.dispose();
+      expect(() => exporter.dispose()).to.not.throw();
+    });
+  });
+});
diff --git a/tests/unit/telemetry/ExceptionClassifier.test.ts b/tests/unit/telemetry/ExceptionClassifier.test.ts
index 09c7b664..e29bb044 100644
--- a/tests/unit/telemetry/ExceptionClassifier.test.ts
+++ b/tests/unit/telemetry/ExceptionClassifier.test.ts
@@ -169,6 +169,32 @@ describe('ExceptionClassifier', () => {
     });
   });
 
+  describe('Network connection errors', () => {
+    it('should identify ECONNREFUSED as retryable', () => {
+      const error = new Error('connect ECONNREFUSED 127.0.0.1:443');
+      (error as any).code = 'ECONNREFUSED';
+      expect(ExceptionClassifier.isRetryable(error)).to.be.true;
+    });
+
+    it('should identify ENOTFOUND as retryable', () => {
+      const error = new Error('getaddrinfo ENOTFOUND host.example.com');
+      (error as any).code = 'ENOTFOUND';
+      expect(ExceptionClassifier.isRetryable(error)).to.be.true;
+    });
+
+    it('should identify EHOSTUNREACH as retryable', () => {
+      const error = new Error('connect EHOSTUNREACH');
+      (error as any).code = 'EHOSTUNREACH';
+      expect(ExceptionClassifier.isRetryable(error)).to.be.true;
+    });
+
+    it('should identify ECONNRESET as retryable', () => {
+      const error = new Error('read ECONNRESET');
+      (error as any).code = 'ECONNRESET';
+      expect(ExceptionClassifier.isRetryable(error)).to.be.true;
+    });
+  });
+
   describe('HTTP 429 Too Many Requests', () => {
     it('should identify 429 status code as retryable', () => {
       const error = new Error('Too Many Requests');
diff --git a/tests/unit/telemetry/FeatureFlagCache.test.ts b/tests/unit/telemetry/FeatureFlagCache.test.ts
new file mode 100644
index 00000000..ed7bc79c
--- /dev/null
+++ 
b/tests/unit/telemetry/FeatureFlagCache.test.ts
@@ -0,0 +1,320 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { expect } from 'chai';
+import sinon from 'sinon';
+import FeatureFlagCache, { FeatureFlagContext } from '../../../lib/telemetry/FeatureFlagCache';
+import ClientContextStub from '../.stubs/ClientContextStub';
+import { LogLevel } from '../../../lib/contracts/IDBSQLLogger';
+
+describe('FeatureFlagCache', () => {
+  let clock: sinon.SinonFakeTimers;
+
+  beforeEach(() => {
+    clock = sinon.useFakeTimers(); // fake clock: the expiry tests move time with clock.tick()
+  });
+
+  afterEach(() => {
+    clock.restore();
+  });
+
+  describe('getOrCreateContext', () => {
+    it('should create a new context for a host', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const ctx = cache.getOrCreateContext(host);
+
+      expect(ctx).to.not.be.undefined;
+      expect(ctx.refCount).to.equal(1);
+      expect(ctx.cacheDuration).to.equal(15 * 60 * 1000); // 15 minutes, in milliseconds
+      expect(ctx.telemetryEnabled).to.be.undefined;
+      expect(ctx.lastFetched).to.be.undefined;
+    });
+
+    it('should increment reference count on subsequent calls', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const ctx1 = cache.getOrCreateContext(host);
+      expect(ctx1.refCount).to.equal(1);
+
+      const ctx2 = cache.getOrCreateContext(host);
+      expect(ctx2.refCount).to.equal(2);
+      expect(ctx1).to.equal(ctx2); // Same object reference (chai `equal` is strict identity)
+    });
+
+    it('should manage multiple hosts independently', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host1 = 'host1.databricks.com';
+      const host2 = 'host2.databricks.com';
+
+      const ctx1 = cache.getOrCreateContext(host1);
+      const ctx2 = cache.getOrCreateContext(host2);
+
+      expect(ctx1).to.not.equal(ctx2);
+      expect(ctx1.refCount).to.equal(1);
+      expect(ctx2.refCount).to.equal(1);
+    });
+  });
+
+  describe('releaseContext', () => {
+    it('should decrement reference count', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      cache.getOrCreateContext(host);
+      cache.getOrCreateContext(host);
+      const ctx = cache.getOrCreateContext(host);
+      expect(ctx.refCount).to.equal(3);
+
+      cache.releaseContext(host);
+      expect(ctx.refCount).to.equal(2);
+    });
+
+    it('should remove context when refCount reaches zero', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      cache.getOrCreateContext(host);
+      cache.releaseContext(host);
+
+      // After release, getting context again should create a new one with refCount=1
+      const ctx = cache.getOrCreateContext(host);
+      expect(ctx.refCount).to.equal(1);
+    });
+
+    it('should handle releasing non-existent host gracefully', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+
+      // Should not throw
+      expect(() => cache.releaseContext('non-existent-host.databricks.com')).to.not.throw();
+    });
+
+    it('should handle releasing host with refCount already at zero', () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      cache.getOrCreateContext(host);
+      cache.releaseContext(host);
+
+      // Second release (one more than the number of acquisitions) should not throw
+      expect(() => cache.releaseContext(host)).to.not.throw();
+    });
+  });
+
+  describe('isTelemetryEnabled', () => {
+    it('should return false for non-existent host', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+
+      const enabled = await cache.isTelemetryEnabled('non-existent-host.databricks.com');
+      expect(enabled).to.be.false;
+    });
+
+    it('should fetch feature flag when context exists but not fetched', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      // Stub the private fetchFeatureFlag method (reached through an `any` cast)
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag').resolves(true);
+
+      cache.getOrCreateContext(host);
+      const enabled = await cache.isTelemetryEnabled(host);
+
+      expect(fetchStub.calledOnce).to.be.true;
+      expect(fetchStub.calledWith(host)).to.be.true;
+      expect(enabled).to.be.true;
+
+      fetchStub.restore();
+    });
+
+    it('should use cached value if not expired', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag').resolves(true);
+
+      cache.getOrCreateContext(host);
+
+      // First call - should fetch
+      await cache.isTelemetryEnabled(host);
+      expect(fetchStub.calledOnce).to.be.true;
+
+      // Advance time by 10 minutes (less than 15 minute TTL)
+      clock.tick(10 * 60 * 1000);
+
+      // Second call - should use cached value
+      const enabled = await cache.isTelemetryEnabled(host);
+      expect(fetchStub.calledOnce).to.be.true; // Still only called once
+      expect(enabled).to.be.true;
+
+      fetchStub.restore();
+    });
+
+    it('should refetch when cache expires after 15 minutes', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag');
+      fetchStub.onFirstCall().resolves(true);
+      fetchStub.onSecondCall().resolves(false);
+
+      cache.getOrCreateContext(host);
+
+      // First call - should fetch
+      const enabled1 = await cache.isTelemetryEnabled(host);
+      expect(enabled1).to.be.true;
+      expect(fetchStub.calledOnce).to.be.true;
+
+      // Advance time by 16 minutes (more than 15 minute TTL)
+      clock.tick(16 * 60 * 1000);
+
+      // Second call - should refetch due to expiration
+      const enabled2 = await cache.isTelemetryEnabled(host);
+      expect(enabled2).to.be.false;
+      expect(fetchStub.calledTwice).to.be.true;
+
+      fetchStub.restore();
+    });
+
+    it('should log errors at debug level and return false on fetch failure', async () => {
+      const context = new ClientContextStub();
+      const logSpy = sinon.spy(context.logger, 'log');
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag').rejects(new Error('Network error'));
+
+      cache.getOrCreateContext(host);
+      const enabled = await cache.isTelemetryEnabled(host);
+
+      expect(enabled).to.be.false;
+      expect(logSpy.calledWith(LogLevel.debug, 'Error fetching feature flag: Network error')).to.be.true;
+
+      fetchStub.restore();
+      logSpy.restore();
+    });
+
+    it('should not propagate exceptions from fetchFeatureFlag', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag').rejects(new Error('Network error'));
+
+      cache.getOrCreateContext(host);
+
+      // Should not throw: a rejection here would fail the test at the await
+      const enabled = await cache.isTelemetryEnabled(host);
+      expect(enabled).to.equal(false);
+
+      fetchStub.restore();
+    });
+
+    it('should return false when telemetryEnabled is undefined', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag').resolves(undefined);
+
+      cache.getOrCreateContext(host);
+      const enabled = await cache.isTelemetryEnabled(host);
+
+      expect(enabled).to.be.false;
+
+      fetchStub.restore();
+    });
+  });
+
+  describe('fetchFeatureFlag', () => {
+    it('should return false as placeholder implementation', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      // Access private method through any cast (unstubbed here, so the real method runs)
+      const result = await (cache as any).fetchFeatureFlag(host);
+      expect(result).to.be.false;
+    });
+  });
+
+  describe('Integration scenarios', () => {
+    it('should handle multiple connections to same host with caching', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host = 'test-host.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag').resolves(true);
+
+      // Simulate 3 connections to same host
+      cache.getOrCreateContext(host);
+      cache.getOrCreateContext(host);
+      cache.getOrCreateContext(host);
+
+      // All connections check telemetry - should only fetch once
+      await cache.isTelemetryEnabled(host);
+      await cache.isTelemetryEnabled(host);
+      await cache.isTelemetryEnabled(host);
+
+      expect(fetchStub.calledOnce).to.be.true;
+
+      // Close all connections
+      cache.releaseContext(host);
+      cache.releaseContext(host);
+      cache.releaseContext(host);
+
+      // Context should be removed
+      const enabled = await cache.isTelemetryEnabled(host);
+      expect(enabled).to.be.false; // No context, returns false
+
+      fetchStub.restore();
+    });
+
+    it('should maintain separate state for different hosts', async () => {
+      const context = new ClientContextStub();
+      const cache = new FeatureFlagCache(context);
+      const host1 = 'host1.databricks.com';
+      const host2 = 'host2.databricks.com';
+
+      const fetchStub = sinon.stub(cache as any, 'fetchFeatureFlag');
+      fetchStub.withArgs(host1).resolves(true);
+      fetchStub.withArgs(host2).resolves(false);
+
+      cache.getOrCreateContext(host1);
+      cache.getOrCreateContext(host2);
+
+      const enabled1 = await cache.isTelemetryEnabled(host1);
+      const enabled2 = await cache.isTelemetryEnabled(host2);
+
+      expect(enabled1).to.be.true;
+      expect(enabled2).to.be.false;
+
+      fetchStub.restore();
+    });
+  });
+});
diff --git a/tests/unit/telemetry/MetricsAggregator.test.ts b/tests/unit/telemetry/MetricsAggregator.test.ts
new file mode 100644
index 00000000..e3d1bb01
--- /dev/null
+++ b/tests/unit/telemetry/MetricsAggregator.test.ts
@@ -0,0 +1,332 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { expect } from 'chai';
+import sinon from 'sinon';
+import MetricsAggregator from '../../../lib/telemetry/MetricsAggregator';
+import DatabricksTelemetryExporter from '../../../lib/telemetry/DatabricksTelemetryExporter';
+import { TelemetryEvent, TelemetryEventType } from '../../../lib/telemetry/types';
+import ClientContextStub from '../.stubs/ClientContextStub';
+import { LogLevel } from '../../../lib/contracts/IDBSQLLogger';
+
+function makeExporterStub(): sinon.SinonStubbedInstance<DatabricksTelemetryExporter> {
+  return sinon.createStubInstance(DatabricksTelemetryExporter); // every method, including export(), becomes a stub
+}
+
+function connectionEvent(overrides: Partial<TelemetryEvent> = {}): TelemetryEvent {
+  return {
+    eventType: TelemetryEventType.CONNECTION_OPEN,
+    timestamp: Date.now(),
+    sessionId: 'session-1',
+    workspaceId: 'workspace-1',
+    driverConfig: {} as any,
+    ...overrides, // spread last so callers can replace any default above
+  };
+}
+
+function statementEvent(type: TelemetryEventType, overrides: Partial<TelemetryEvent> = {}): TelemetryEvent {
+  return {
+    eventType: type,
+    timestamp: Date.now(),
+    sessionId: 'session-1',
+    statementId: 'stmt-1',
+    ...overrides, // spread last so callers can replace any default above
+  };
+}
+
+describe('MetricsAggregator', () => {
+  let clock: sinon.SinonFakeTimers;
+
+  beforeEach(() => {
+    clock = sinon.useFakeTimers(); // fake clock: the timer tests advance it with clock.tick()
+  });
+
+  afterEach(() => {
+    clock.restore();
+  });
+
+  describe('processEvent() - connection events', () => {
+    it('should emit connection events immediately without waiting for completeStatement', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(connectionEvent());
+
+      // flush happens when batchSize reached or timer fires — force it
+      aggregator.flush();
+
+      expect(exporter.export.calledOnce).to.be.true;
+      const metrics = exporter.export.firstCall.args[0];
+      expect(metrics[0].metricType).to.equal('connection');
+    });
+  });
+
+  describe('processEvent() - statement events', () => {
+    it('should buffer statement events until completeStatement is called', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START));
+      aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_COMPLETE, { latencyMs: 100 }));
+
+      // Not yet flushed
+      expect(exporter.export.called).to.be.false;
+
+      aggregator.completeStatement('stmt-1');
+      aggregator.flush();
+
+      expect(exporter.export.calledOnce).to.be.true;
+      const metrics = exporter.export.firstCall.args[0];
+      expect(metrics[0].metricType).to.equal('statement');
+      expect(metrics[0].statementId).to.equal('stmt-1');
+    });
+
+    it('should track chunk events and accumulate totals', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START));
+      aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { bytes: 100, compressed: true }));
+      aggregator.processEvent(statementEvent(TelemetryEventType.CLOUDFETCH_CHUNK, { bytes: 200, compressed: true }));
+
+      aggregator.completeStatement('stmt-1');
+      aggregator.flush();
+
+      const metrics = exporter.export.firstCall.args[0];
+      const stmtMetric = metrics[0];
+      expect(stmtMetric.chunkCount).to.equal(2);
+      expect(stmtMetric.bytesDownloaded).to.equal(300); // 100 + 200 from the two chunk events
+    });
+
+    it('should do nothing for completeStatement with unknown statementId', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      expect(() => aggregator.completeStatement('unknown-stmt')).to.not.throw();
+    });
+  });
+
+  describe('processEvent() - error events', () => {
+    it('should flush immediately for terminal errors', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      const errEvent: TelemetryEvent = {
+        eventType: TelemetryEventType.ERROR,
+        timestamp: Date.now(),
+        sessionId: 'session-1',
+        errorName: 'AuthError',
+        errorMessage: 'auth failed',
+        isTerminal: true,
+      };
+
+      aggregator.processEvent(errEvent);
+
+      // Terminal error should trigger an immediate flush (no explicit flush() call here)
+      expect(exporter.export.called).to.be.true;
+      const metrics = exporter.export.firstCall.args[0];
+      expect(metrics[0].metricType).to.equal('error');
+    });
+
+    it('should buffer retryable errors until statement completes', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      const retryableErr: TelemetryEvent = {
+        eventType: TelemetryEventType.ERROR,
+        timestamp: Date.now(),
+        sessionId: 'session-1',
+        statementId: 'stmt-1',
+        errorName: 'NetworkError',
+        errorMessage: 'timeout',
+        isTerminal: false,
+      };
+
+      aggregator.processEvent(retryableErr);
+      // Not flushed yet
+      expect(exporter.export.called).to.be.false;
+
+      aggregator.completeStatement('stmt-1');
+      aggregator.flush();
+
+      expect(exporter.export.called).to.be.true;
+    });
+  });
+
+  describe('flush() - batch size trigger', () => {
+    it('should flush when batchSize is reached', () => {
+      const context = new ClientContextStub({ telemetryBatchSize: 3 } as any);
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      // Add 2 connection events — no flush yet
+      aggregator.processEvent(connectionEvent({ sessionId: 's1' }));
+      aggregator.processEvent(connectionEvent({ sessionId: 's2' }));
+      expect(exporter.export.called).to.be.false;
+
+      // 3rd event reaches batchSize
+      aggregator.processEvent(connectionEvent({ sessionId: 's3' }));
+      expect(exporter.export.calledOnce).to.be.true;
+    });
+  });
+
+  describe('flush() - periodic timer', () => {
+    it('should flush periodically based on flushIntervalMs', () => {
+      const context = new ClientContextStub({ telemetryFlushIntervalMs: 5000 } as any);
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(connectionEvent());
+      expect(exporter.export.called).to.be.false;
+
+      clock.tick(5000);
+
+      expect(exporter.export.calledOnce).to.be.true;
+    });
+
+    it('should not flush if there are no pending metrics', () => {
+      const context = new ClientContextStub({ telemetryFlushIntervalMs: 5000 } as any);
+      const exporter = makeExporterStub();
+      new MetricsAggregator(context, exporter as any);
+
+      clock.tick(5000);
+
+      expect(exporter.export.called).to.be.false;
+    });
+  });
+
+  describe('maxPendingMetrics bound', () => {
+    it('should drop oldest metrics when buffer exceeds maxPendingMetrics', () => {
+      const context = new ClientContextStub({ telemetryMaxPendingMetrics: 3, telemetryBatchSize: 1000 } as any);
+      const logSpy = sinon.spy((context as any).logger, 'log');
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      // Add 5 events — should be capped at 3
+      for (let i = 0; i < 5; i++) {
+        aggregator.processEvent(connectionEvent({ sessionId: `s${i}` }));
+      }
+
+      aggregator.flush();
+
+      const metrics = exporter.export.firstCall.args[0];
+      expect(metrics.length).to.equal(3);
+      expect(logSpy.calledWith(LogLevel.debug, sinon.match(/Dropped/))).to.be.true;
+
+      logSpy.restore();
+    });
+  });
+
+  describe('close()', () => {
+    it('should flush remaining metrics on close', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(connectionEvent());
+      aggregator.close();
+
+      expect(exporter.export.called).to.be.true;
+    });
+
+    it('should stop the flush timer on close', () => {
+      const context = new ClientContextStub({ telemetryFlushIntervalMs: 5000 } as any);
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.close();
+      exporter.export.reset();
+
+      // Enqueue first, then advance time: a timer that survived close() would flush this event
+      aggregator.processEvent(connectionEvent());
+      clock.tick(10000);
+      // Timer stopped, so no auto-flush
+      expect(exporter.export.called).to.be.false;
+    });
+
+    it('should complete any in-progress statements on close', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START));
+      aggregator.close();
+
+      expect(exporter.export.called).to.be.true;
+      const metrics = exporter.export.firstCall.args[0];
+      expect(metrics[0].metricType).to.equal('statement');
+    });
+
+    it('awaits in-flight export before resolving — prevents process.exit truncation', async () => {
+      clock.restore();
+      const context = new ClientContextStub();
+      let resolveExport!: () => void;
+      const pendingExport = new Promise<void>((r) => {
+        resolveExport = r;
+      });
+      const exporter: any = { export: sinon.stub().returns(pendingExport) };
+      const aggregator = new MetricsAggregator(context, exporter);
+
+      aggregator.processEvent(connectionEvent());
+
+      const done = aggregator.close();
+      expect(done).to.be.an.instanceof(Promise);
+
+      let resolved = false;
+      done.then(() => {
+        resolved = true;
+      });
+      await Promise.resolve();
+      await Promise.resolve();
+      expect(resolved, 'close() should wait for exporter promise before resolving').to.be.false;
+
+      resolveExport();
+      await done;
+      expect(resolved).to.be.true;
+    });
+
+    it('does not resurrect the flush timer after close', async () => {
+      clock.restore();
+      const context = new ClientContextStub({ telemetryBatchSize: 1 } as any);
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START));
+      await aggregator.close();
+
+      expect((aggregator as any).flushTimer, 'flushTimer should be null after close').to.equal(null);
+      expect((aggregator as any).closed).to.be.true;
+    });
+  });
+
+  describe('exception swallowing', () => {
+    it('should never throw from processEvent', () => {
+      const context = new ClientContextStub();
+      const exporter = makeExporterStub();
+      const aggregator = new MetricsAggregator(context, exporter as any);
+
+      expect(() =>
+        aggregator.processEvent({ eventType: 'unknown.event' as any, timestamp: Date.now() }),
+      ).to.not.throw();
+    });
+  });
+});
diff --git a/tests/unit/telemetry/TelemetryEventEmitter.test.ts b/tests/unit/telemetry/TelemetryEventEmitter.test.ts
new file mode 100644
index 00000000..6fc29107
--- /dev/null
+++ b/tests/unit/telemetry/TelemetryEventEmitter.test.ts
@@ -0,0 +1,203 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { expect } from 'chai';
+import sinon from 'sinon';
+import TelemetryEventEmitter from '../../../lib/telemetry/TelemetryEventEmitter';
+import { TelemetryEventType } from '../../../lib/telemetry/types';
+import ClientContextStub from '../.stubs/ClientContextStub';
+import { LogLevel } from '../../../lib/contracts/IDBSQLLogger';
+
+function makeEmitter(enabled: boolean): TelemetryEventEmitter { // emitter over a stub context whose config sets telemetryEnabled to `enabled`
+  const context = new ClientContextStub({ telemetryEnabled: enabled } as any);
+  return new TelemetryEventEmitter(context);
+}
+
+describe('TelemetryEventEmitter', () => {
+  describe('when telemetry is disabled', () => { // registered listeners must never be invoked
+    it('should not emit any events', () => {
+      const emitter = makeEmitter(false);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.CONNECTION_OPEN, listener);
+
+      emitter.emitConnectionOpen({ sessionId: 's1', workspaceId: 'w1', driverConfig: {} as any });
+
+      expect(listener.called).to.be.false;
+    });
+
+    it('should not emit statement start', () => {
+      const emitter = makeEmitter(false);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.STATEMENT_START, listener);
+
+      emitter.emitStatementStart({ statementId: 'st1', sessionId: 's1' });
+
+      expect(listener.called).to.be.false;
+    });
+
+    it('should not emit error events', () => {
+      const emitter = makeEmitter(false);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.ERROR, listener);
+
+      emitter.emitError({ errorName: 'SomeError', errorMessage: 'msg', isTerminal: false });
+
+      expect(listener.called).to.be.false;
+    });
+  });
+
+  describe('emitConnectionOpen()', () => {
+    it('should emit a CONNECTION_OPEN event with correct fields', () => {
+      const emitter = makeEmitter(true);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.CONNECTION_OPEN, listener);
+
+      emitter.emitConnectionOpen({ sessionId: 's1', workspaceId: 'w1', driverConfig: {} as any });
+
+      expect(listener.calledOnce).to.be.true;
+      const event = listener.firstCall.args[0]; // payload handed to the listener
+      expect(event.eventType).to.equal(TelemetryEventType.CONNECTION_OPEN);
+      expect(event.sessionId).to.equal('s1');
+      expect(event.workspaceId).to.equal('w1');
+      expect(event.timestamp).to.be.a('number');
+    });
+
+    it('should swallow and log exceptions from listeners', () => {
+      const context = new ClientContextStub({ telemetryEnabled: true } as any);
+      const logSpy = sinon.spy(context.logger, 'log'); // observe which level/message the emitter logs
+      const emitter = new TelemetryEventEmitter(context);
+
+      emitter.on(TelemetryEventType.CONNECTION_OPEN, () => {
+        throw new Error('listener boom');
+      });
+
+      expect(() =>
+        emitter.emitConnectionOpen({ sessionId: 's1', workspaceId: 'w1', driverConfig: {} as any }),
+      ).to.not.throw();
+      expect(logSpy.calledWith(LogLevel.debug, sinon.match(/listener boom/))).to.be.true; // listener failure surfaces at debug level only
+
+      logSpy.restore();
+    });
+  });
+
+  describe('emitStatementStart()', () => {
+    it('should emit a STATEMENT_START event with correct fields', () => {
+      const emitter = makeEmitter(true);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.STATEMENT_START, listener);
+
+      emitter.emitStatementStart({ statementId: 'st1', sessionId: 's1', operationType: 'SELECT' });
+
+      expect(listener.calledOnce).to.be.true;
+      const event = listener.firstCall.args[0];
+      expect(event.eventType).to.equal(TelemetryEventType.STATEMENT_START);
+      expect(event.statementId).to.equal('st1');
+      expect(event.operationType).to.equal('SELECT');
+    });
+  });
+
+  describe('emitStatementComplete()', () => {
+    it('should emit a STATEMENT_COMPLETE event with correct fields', () => {
+      const emitter = makeEmitter(true);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.STATEMENT_COMPLETE, listener);
+
+      emitter.emitStatementComplete({
+        statementId: 'st1',
+        sessionId: 's1',
+        latencyMs: 123,
+        resultFormat: 'arrow',
+        chunkCount: 2,
+        bytesDownloaded: 1024,
+        pollCount: 3,
+      });
+
+      expect(listener.calledOnce).to.be.true;
+      const event = listener.firstCall.args[0];
+      expect(event.eventType).to.equal(TelemetryEventType.STATEMENT_COMPLETE);
+      expect(event.latencyMs).to.equal(123);
+      expect(event.chunkCount).to.equal(2);
+    });
+  });
+
+  describe('emitCloudFetchChunk()', () => {
+    it('should emit a CLOUDFETCH_CHUNK event with correct fields', () => {
+      const emitter = makeEmitter(true);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.CLOUDFETCH_CHUNK, listener);
+
+      emitter.emitCloudFetchChunk({ statementId: 'st1', chunkIndex: 0, bytes: 512, compressed: true });
+
+      expect(listener.calledOnce).to.be.true;
+      const event = listener.firstCall.args[0];
+      expect(event.eventType).to.equal(TelemetryEventType.CLOUDFETCH_CHUNK);
+      expect(event.bytes).to.equal(512);
+      expect(event.compressed).to.be.true;
+    });
+  });
+
+  describe('emitError()', () => {
+    it('should emit an ERROR event with correct fields', () => {
+      const emitter = makeEmitter(true);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.ERROR, listener);
+
+      emitter.emitError({
+        statementId: 'st1',
+        sessionId: 's1',
+        errorName: 'NetworkError',
+        errorMessage: 'timeout',
+        isTerminal: false,
+      });
+
+      expect(listener.calledOnce).to.be.true;
+      const event = listener.firstCall.args[0];
+      expect(event.eventType).to.equal(TelemetryEventType.ERROR);
+      expect(event.errorName).to.equal('NetworkError');
+      expect(event.isTerminal).to.be.false;
+    });
+
+    it('should emit a terminal ERROR event', () => {
+      const emitter = makeEmitter(true);
+      const listener = sinon.stub();
+      emitter.on(TelemetryEventType.ERROR, listener);
+
+      emitter.emitError({ errorName: 'AuthenticationError', errorMessage: 'auth failed', isTerminal: true });
+
+      const event = listener.firstCall.args[0];
+      expect(event.isTerminal).to.be.true;
+    });
+  });
+
+  describe('logging level compliance', () => {
+    it('should never log at warn or error level', () => {
+      const context = new ClientContextStub({ telemetryEnabled: true } as any);
+      const logSpy = sinon.spy(context.logger, 'log');
+      const emitter = new TelemetryEventEmitter(context);
+
+      emitter.on(TelemetryEventType.CONNECTION_OPEN, () => {
+        throw new Error('boom'); // force the emitter's failure path so its logging can be inspected
+      });
+
+      emitter.emitConnectionOpen({ sessionId: 's1', workspaceId: 'w1', driverConfig: {} as any });
+
+      expect(logSpy.neverCalledWith(LogLevel.error, sinon.match.any)).to.be.true;
+      expect(logSpy.neverCalledWith(LogLevel.warn, sinon.match.any)).to.be.true;
+
+      logSpy.restore();
+    });
+  });
+});
diff --git a/tests/unit/telemetry/telemetryUtils.test.ts b/tests/unit/telemetry/telemetryUtils.test.ts
new file mode 100644
index 00000000..eaffc321
--- /dev/null
+++ b/tests/unit/telemetry/telemetryUtils.test.ts
@@ -0,0 +1,276 @@
+/**
+ * Copyright (c) 2025 Databricks Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ */
+
+import { expect } from 'chai';
+import { buildTelemetryUrl, redactSensitive, sanitizeProcessName } from '../../../lib/telemetry/telemetryUtils';
+
+describe('buildTelemetryUrl', () => { // per the cases below: (host, path) yields an https URL string, or null when the host is rejected
+  describe('valid hosts', () => {
+    it('returns https URL for a bare host', () => {
+      expect(buildTelemetryUrl('myws.cloud.databricks.com', '/telemetry-ext')).to.equal(
+        'https://myws.cloud.databricks.com/telemetry-ext',
+      );
+    });
+
+    it('strips a leading https:// prefix', () => {
+      expect(buildTelemetryUrl('https://myws.cloud.databricks.com', '/telemetry-ext')).to.equal(
+        'https://myws.cloud.databricks.com/telemetry-ext',
+      );
+    });
+
+    it('strips a leading http:// prefix and upgrades to https', () => {
+      expect(buildTelemetryUrl('http://myws.cloud.databricks.com', '/telemetry-ext')).to.equal(
+        'https://myws.cloud.databricks.com/telemetry-ext',
+      );
+    });
+
+    it('strips trailing slashes', () => {
+      expect(buildTelemetryUrl('myws.cloud.databricks.com///', '/telemetry-ext')).to.equal(
+        'https://myws.cloud.databricks.com/telemetry-ext',
+      );
+    });
+
+    it('accepts an explicit default port and normalises it', () => {
+      // `new URL` strips :443 for https; we rely on that normalisation.
+      expect(buildTelemetryUrl('myws.cloud.databricks.com:443', '/x')).to.equal('https://myws.cloud.databricks.com/x');
+    });
+
+    it('accepts a non-default port and preserves it', () => {
+      expect(buildTelemetryUrl('myws.cloud.databricks.com:8443', '/x')).to.equal(
+        'https://myws.cloud.databricks.com:8443/x',
+      );
+    });
+  });
+
+  describe('SSRF / redirection rejections', () => { // every input here must be rejected with null
+    it('rejects protocol-relative prefix', () => {
+      expect(buildTelemetryUrl('//attacker.com', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects zero-width space inside host', () => {
+      // `legit.com\u200battacker.com` would otherwise collapse to
+      // `legit.comattacker.com` inside `new URL`.
+      expect(buildTelemetryUrl('legit.com\u200battacker.com', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects BOM inside host', () => {
+      expect(buildTelemetryUrl('legit.com\ufeffattacker.com', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects userinfo', () => {
+      expect(buildTelemetryUrl('user:pass@attacker.com', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects CR in host', () => {
+      expect(buildTelemetryUrl('legit.com\rInjected: header', '/x')).to.equal(null); // bare CR only, so this cannot pass on LF rejection alone
+    });
+
+    it('rejects LF in host', () => {
+      expect(buildTelemetryUrl('legit.com\nInjected: header', '/x')).to.equal(null);
+    });
+
+    it('rejects tab in host', () => {
+      expect(buildTelemetryUrl('legit.com\tbad', '/x')).to.equal(null);
+    });
+
+    it('rejects path appended to host', () => {
+      expect(buildTelemetryUrl('legit.com/evil', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects query appended to host', () => {
+      expect(buildTelemetryUrl('legit.com?x=1', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects fragment appended to host', () => {
+      expect(buildTelemetryUrl('legit.com#frag', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects backslash in host', () => {
+      expect(buildTelemetryUrl('legit.com\\evil', '/x')).to.equal(null);
+    });
+
+    
it('rejects at-sign in host', () => {
+      expect(buildTelemetryUrl('a@b.com', '/x')).to.equal(null);
+    });
+
+    it('rejects empty host', () => {
+      expect(buildTelemetryUrl('', '/x')).to.equal(null);
+    });
+
+    it('rejects only-slashes host', () => {
+      expect(buildTelemetryUrl('///', '/x')).to.equal(null);
+    });
+  });
+
+  describe('deny-listed hosts', () => { // loopback, private-range and cloud-metadata hosts must yield null
+    it('rejects IPv4 loopback', () => {
+      expect(buildTelemetryUrl('127.0.0.1', '/telemetry-ext')).to.equal(null);
+      expect(buildTelemetryUrl('127.1.2.3', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects 0.0.0.0', () => {
+      expect(buildTelemetryUrl('0.0.0.0', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects RFC1918 10.0.0.0/8', () => {
+      expect(buildTelemetryUrl('10.0.0.1', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects RFC1918 192.168/16', () => {
+      expect(buildTelemetryUrl('192.168.1.1', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects RFC1918 172.16-31', () => {
+      expect(buildTelemetryUrl('172.16.0.1', '/telemetry-ext')).to.equal(null); // lower edge of 172.16.0.0/12
+      expect(buildTelemetryUrl('172.31.255.254', '/telemetry-ext')).to.equal(null); // upper edge of 172.16.0.0/12
+    });
+
+    it('accepts 172.32 (outside RFC1918)', () => {
+      expect(buildTelemetryUrl('172.32.0.1', '/telemetry-ext')).to.equal('https://172.32.0.1/telemetry-ext');
+    });
+
+    it('rejects AWS IMDS', () => {
+      expect(buildTelemetryUrl('169.254.169.254', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects GCP metadata', () => {
+      expect(buildTelemetryUrl('metadata.google.internal', '/telemetry-ext')).to.equal(null);
+      expect(buildTelemetryUrl('METADATA.GOOGLE.INTERNAL', '/telemetry-ext')).to.equal(null); // host match must be case-insensitive
+    });
+
+    it('rejects Azure metadata', () => {
+      expect(buildTelemetryUrl('metadata.azure.com', '/telemetry-ext')).to.equal(null);
+    });
+
+    it('rejects localhost', () => {
+      expect(buildTelemetryUrl('localhost', '/telemetry-ext')).to.equal(null);
+      expect(buildTelemetryUrl('LocalHost', '/telemetry-ext')).to.equal(null);
+    });
+  });
+});
+
+describe('redactSensitive', () => { // NOTE(review): expected strings below show no placeholder text where the secret was — confirm against redactSensitive's actual marker
+  it('returns empty string for undefined', () => {
+    expect(redactSensitive(undefined)).to.equal('');
+  });
+
+  it('returns empty string for empty input', () => {
+    expect(redactSensitive('')).to.equal('');
+  });
+
+  it('redacts Bearer tokens', () => {
+    const redacted = redactSensitive('Authorization: Bearer abc.def.ghi-jkl');
+    expect(redacted).to.equal('Authorization: Bearer ');
+  });
+
+  it('redacts multiple Bearer tokens in one string', () => {
+    const redacted = redactSensitive('first Bearer abc second Bearer xyz');
+    expect(redacted).to.equal('first Bearer second Bearer ');
+  });
+
+  it('redacts Basic auth', () => {
+    expect(redactSensitive('Authorization: Basic dXNlcjpwYXNz')).to.equal('Authorization: Basic ');
+  });
+
+  it('redacts URL-embedded credentials', () => {
+    expect(redactSensitive('fetch https://user:pass@legit.com/api')).to.equal('fetch https://@legit.com/api');
+  });
+
+  it('redacts Databricks PAT (dapi)', () => {
+    expect(redactSensitive('token is dapi0123456789abcdef01')).to.equal('token is ');
+  });
+
+  it('redacts Databricks PAT (dkea, dskea, dsapi, dose)', () => {
+    for (const prefix of ['dkea', 'dskea', 'dsapi', 'dose']) {
+      expect(redactSensitive(`tok ${prefix}0123456789abcdef`)).to.equal('tok ');
+    }
+  });
+
+  it('redacts realistic JWT', () => {
+    // This is NOT a real token — it's a synthetic JWT-shaped string built
+    // from harmless segments purely to exercise the regex. Constructed by
+    // string concatenation so the assembled token never appears as a
+    // source literal (otherwise pre-commit secret scanners, rightly, flag
+    // the test file itself).
+    const header = `${'eyJ'}hbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9`;
+    const payload = `${'eyJ'}zdWIiOiJ0ZXN0LXN1YmplY3QifQ`;
+    const signature = 'Ab-123_xyz456_abcDEF789';
+    const jwt = `${header}.${payload}.${signature}`;
+    expect(redactSensitive(`Authorization: ${jwt}`)).to.not.include(jwt); // include('') holds for every string; assert the token itself is gone
+  });
+
+  it('redacts JSON-quoted access_token', () => {
+    expect(redactSensitive('{"access_token":"eyJabc.def.ghi"}')).to.equal('{"access_token":""}');
+  });
+
+  it('redacts JSON-quoted client_secret', () => {
+    expect(redactSensitive('body={"client_id":"abc","client_secret":"xyz"}')).to.include(
+      '"client_secret":""',
+    );
+  });
+
+  it('redacts JSON-quoted refresh_token, id_token, password, api_key', () => {
+    for (const key of ['refresh_token', 'id_token', 'password', 'api_key', 'apikey']) {
+      expect(redactSensitive(`{"${key}":"x"}`)).to.equal(`{"${key}":""}`);
+    }
+  });
+
+  it('redacts form-encoded token= style secrets', () => {
+    expect(redactSensitive('post body=client_secret=xyz&token=abc&password=hunter2')).to.equal(
+      'post body=client_secret=&token=&password=',
+    );
+  });
+
+  it('caps long input with truncation marker', () => {
+    const long = `${'x'.repeat(3000)}Bearer abc`;
+    const redacted = redactSensitive(long, 2048); // second argument is the length cap
+    expect(redacted.length).to.be.lessThan(long.length);
+    expect(redacted).to.include('…[truncated]');
+  });
+
+  it('applies redaction again after truncation', () => {
+    // Secret appears in the tail; first-pass redacts, then truncation, then
+    // the cap-time second pass catches anything missed.
+    const input = `${'x'.repeat(3000)}Bearer leaked-token`;
+    const redacted = redactSensitive(input, 50); // NOTE(review): if truncation keeps the head, the tail secret is dropped regardless of redaction — consider a secret straddling the cap
+    expect(redacted).to.not.include('leaked-token');
+  });
+});
+
+describe('sanitizeProcessName', () => { // per the cases below: keeps only the basename of the first whitespace-separated token
+  it('returns empty string for undefined', () => {
+    expect(sanitizeProcessName(undefined)).to.equal('');
+  });
+
+  it('returns empty string for whitespace-only', () => {
+    expect(sanitizeProcessName(' ')).to.equal('');
+  });
+
+  it('strips absolute path', () => {
+    expect(sanitizeProcessName('/home/alice/worker.js')).to.equal('worker.js');
+  });
+
+  it('strips Windows path', () => {
+    expect(sanitizeProcessName('C:\\Users\\bob\\worker.js')).to.equal('worker.js');
+  });
+
+  it('returns basename unchanged when no path', () => {
+    expect(sanitizeProcessName('worker.js')).to.equal('worker.js');
+  });
+
+  it('drops argv tail (whitespace-separated)', () => {
+    expect(sanitizeProcessName('node --db-password=secret app.js')).to.equal('node'); // arguments may carry secrets and must not survive
+  });
+
+  it('drops argv tail after full path', () => {
+    expect(sanitizeProcessName('/usr/bin/node --token=abc app.js')).to.equal('node');
+  });
+
+  it('preserves basename-only input without spaces', () => {
+    expect(sanitizeProcessName('my-worker')).to.equal('my-worker');
+  });
+});