Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/appkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"@types/semver": "^7.7.1",
"dotenv": "^16.6.1",
"express": "^4.22.0",
"obug": "^2.1.1",
"pg": "^8.16.3",
"semver": "^7.7.3",
"shared": "workspace:*",
Expand Down
35 changes: 28 additions & 7 deletions packages/appkit/src/analytics/analytics.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { WorkspaceClient } from "@databricks/sdk-experimental";
import type express from "express";
import type {
IAppRouter,
PluginExecuteConfig,
Expand All @@ -11,7 +12,7 @@ import {
getWarehouseId,
getWorkspaceClient,
} from "../context";
import type express from "express";
import { createLogger } from "../logging/logger";
import { Plugin, toPlugin } from "../plugin";
import { queryDefaults } from "./defaults";
import { QueryProcessor } from "./query";
Expand All @@ -21,6 +22,8 @@ import type {
IAnalyticsQueryRequest,
} from "./types";

const logger = createLogger("analytics");

export class AnalyticsPlugin extends Plugin {
name = "analytics";
envVars = [];
Expand Down Expand Up @@ -95,22 +98,28 @@ export class AnalyticsPlugin extends Plugin {
const { jobId } = req.params;
const workspaceClient = getWorkspaceClient();

console.log(
`Processing Arrow job request: ${jobId} for plugin: ${this.name}`,
);
logger.debug("Processing Arrow job request for jobId=%s", jobId);

const event = logger.event(req);
event?.setComponent("analytics", "getArrowData").setContext("analytics", {
job_id: jobId,
plugin: this.name,
});

const result = await this.getArrowData(workspaceClient, jobId);

res.setHeader("Content-Type", "application/octet-stream");
res.setHeader("Content-Length", result.data.length.toString());
res.setHeader("Cache-Control", "public, max-age=3600");

console.log(
`Sending Arrow buffer: ${result.data.length} bytes for job ${jobId}`,
logger.debug(
"Sending Arrow buffer: %d bytes for job %s",
result.data.length,
jobId,
);
res.send(Buffer.from(result.data));
} catch (error) {
console.error(`Arrow job error for ${this.name}:`, error);
logger.error("Arrow job error: %O", error);
res.status(404).json({
error: error instanceof Error ? error.message : "Arrow job not found",
plugin: this.name,
Expand All @@ -128,6 +137,18 @@ export class AnalyticsPlugin extends Plugin {
): Promise<void> {
const { query_key } = req.params;
const { parameters, format = "JSON" } = req.body as IAnalyticsQueryRequest;

// Request-scoped logging with WideEvent tracking
logger.debug(req, "Executing query: %s (format=%s)", query_key, format);

const event = logger.event(req);
event?.setComponent("analytics", "executeQuery").setContext("analytics", {
query_key,
format,
parameter_count: parameters ? Object.keys(parameters).length : 0,
plugin: this.name,
});

const queryParameters =
format === "ARROW"
? {
Expand Down
16 changes: 10 additions & 6 deletions packages/appkit/src/analytics/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createHash } from "node:crypto";
import type { sql } from "@databricks/sdk-experimental";
import { isSQLTypeMarker, type SQLTypeMarker, sql as sqlHelpers } from "shared";
import { getWorkspaceId } from "../context";
import { ValidationError } from "../errors";

type SQLParameterValue = SQLTypeMarker | null | undefined;

Expand Down Expand Up @@ -45,10 +46,11 @@ export class QueryProcessor {
// only allow parameters that exist in the query
for (const key of Object.keys(parameters)) {
if (!queryParams.has(key)) {
throw new Error(
`Parameter "${key}" not found in query. Valid parameters: ${
Array.from(queryParams).join(", ") || "none"
}`,
const validParams = Array.from(queryParams).join(", ") || "none";
throw ValidationError.invalidValue(
key,
parameters[key],
`a parameter defined in the query (valid: ${validParams})`,
);
}
}
Expand All @@ -74,8 +76,10 @@ export class QueryProcessor {
}

if (!isSQLTypeMarker(value)) {
throw new Error(
`Parameter "${key}" must be a SQL type. Use sql.string(), sql.number(), sql.date(), sql.timestamp(), or sql.boolean().`,
throw ValidationError.invalidValue(
key,
value,
"SQL type (use sql.string(), sql.number(), sql.date(), sql.timestamp(), or sql.boolean())",
);
}

Expand Down
12 changes: 5 additions & 7 deletions packages/appkit/src/analytics/tests/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe("QueryProcessor", () => {
expect(() => {
processor.convertToSQLParameters(query, parameters);
}).toThrow(
'Parameter "malicious_param" not found in query. Valid parameters: user_id',
"Invalid value for malicious_param: expected a parameter defined in the query (valid: user_id)",
);
});

Expand All @@ -61,7 +61,7 @@ describe("QueryProcessor", () => {

expect(() => {
processor.convertToSQLParameters(query, parameters);
}).toThrow('Parameter "admin_flag" not found in query');
}).toThrow("Invalid value for admin_flag");
});

test("should allow parameters with underscores and mixed case", () => {
Expand All @@ -86,7 +86,7 @@ describe("QueryProcessor", () => {
expect(() => {
processor.convertToSQLParameters(query, parameters);
}).toThrow(
'Parameter "user_id" not found in query. Valid parameters: none',
"Invalid value for user_id: expected a parameter defined in the query (valid: none)",
);
});

Expand Down Expand Up @@ -135,7 +135,7 @@ describe("QueryProcessor", () => {

expect(() => {
processor.convertToSQLParameters(query, attackParameters);
}).toThrow('Parameter "admin_override" not found in query');
}).toThrow("Invalid value for admin_override");
});

test("should handle duplicate parameter names in query correctly", () => {
Expand Down Expand Up @@ -252,9 +252,7 @@ describe("QueryProcessor", () => {

expect(() => {
processor.convertToSQLParameters(query, parameters);
}).toThrow(
'Parameter "userId" must be a SQL type. Use sql.string(), sql.number(), sql.date(), sql.timestamp(), or sql.boolean().',
);
}).toThrow("Invalid value for userId");
});
});
});
24 changes: 16 additions & 8 deletions packages/appkit/src/app/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import fs from "node:fs/promises";
import path from "node:path";
import { createLogger } from "../logging/logger";

const logger = createLogger("app");

interface RequestLike {
query?: Record<string, any>;
Expand Down Expand Up @@ -27,8 +30,9 @@ export class AppManager {
): Promise<string | null> {
// Security: Sanitize query key to prevent path traversal
if (!queryKey || !/^[a-zA-Z0-9_-]+$/.test(queryKey)) {
console.error(
`Invalid query key format: "${queryKey}". Only alphanumeric characters, underscores, and hyphens are allowed.`,
logger.error(
"Invalid query key format: %s. Only alphanumeric characters, underscores, and hyphens are allowed.",
queryKey,
);
return null;
}
Expand All @@ -44,7 +48,7 @@ export class AppManager {
const queriesDir = path.resolve(process.cwd(), "config/queries");

if (!resolvedPath.startsWith(queriesDir)) {
console.error(`Invalid query path: path traversal detected`);
logger.error("Invalid query path: path traversal detected");
return null;
}

Expand All @@ -57,8 +61,10 @@ export class AppManager {
const relativePath = path.relative(process.cwd(), resolvedPath);
return await devFileReader.readFile(relativePath, req);
} catch (error) {
console.error(
`Failed to read query "${queryKey}" from dev tunnel: ${(error as Error).message}`,
logger.error(
"Failed to read query %s from dev tunnel: %s",
queryKey,
(error as Error).message,
);
return null;
}
Expand All @@ -70,11 +76,13 @@ export class AppManager {
return query;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
console.error(`Query "${queryKey}" not found at path: ${resolvedPath}`);
logger.debug("Query %s not found at path: %s", queryKey, resolvedPath);
return null;
}
console.error(
`Failed to read query "${queryKey}" from server filesystem: ${(error as Error).message}`,
logger.error(
"Failed to read query %s from server filesystem: %s",
queryKey,
(error as Error).message,
);
return null;
}
Expand Down
37 changes: 32 additions & 5 deletions packages/appkit/src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ import { createHash } from "node:crypto";
import { WorkspaceClient } from "@databricks/sdk-experimental";
import type { CacheConfig, CacheStorage } from "shared";
import { LakebaseConnector } from "@/connectors";
import { AppKitError, ExecutionError, InitializationError } from "../errors";
import { createLogger } from "../logging/logger";
import type { Counter, TelemetryProvider } from "../telemetry";
import { SpanStatusCode, TelemetryManager } from "../telemetry";
import { deepMerge } from "../utils";
import { cacheDefaults } from "./defaults";
import { InMemoryStorage, PersistentStorage } from "./storage";

const logger = createLogger("cache");

/**
* Cache manager class to handle cache operations.
* Can be used with in-memory storage or persistent storage (Lakebase).
Expand All @@ -34,7 +38,6 @@ export class CacheManager {
private cleanupInProgress: boolean;
private lastCleanupAttempt: number;

// Telemetry
private telemetry: TelemetryProvider;
private telemetryMetrics: {
cacheHitCount: Counter;
Expand Down Expand Up @@ -72,8 +75,9 @@ export class CacheManager {
*/
static getInstanceSync(): CacheManager {
if (!CacheManager.instance) {
throw new Error(
"CacheManager not initialized. Ensure AppKit.create() has completed before accessing the cache.",
throw InitializationError.notInitialized(
"CacheManager",
"Ensure AppKit.create() has completed before accessing the cache",
);
}

Expand Down Expand Up @@ -203,6 +207,12 @@ export class CacheManager {
this.telemetryMetrics.cacheHitCount.add(1, {
"cache.key": cacheKey,
});

logger.event()?.setExecution({
cache_hit: true,
cache_key: cacheKey,
});

return cached.value as T;
}

Expand All @@ -219,6 +229,13 @@ export class CacheManager {
"cache.key": cacheKey,
"cache.deduplication": "true",
});

logger.event()?.setExecution({
cache_hit: true,
cache_key: cacheKey,
cache_deduplication: true,
});

span.end();
return inFlight as Promise<T>;
}
Expand All @@ -230,6 +247,11 @@ export class CacheManager {
"cache.key": cacheKey,
});

logger.event()?.setExecution({
cache_hit: false,
cache_key: cacheKey,
});

const promise = fn()
.then(async (result) => {
await this.set(cacheKey, result, options);
Expand All @@ -242,7 +264,12 @@ export class CacheManager {
.catch((error) => {
span.recordException(error);
span.setStatus({ code: SpanStatusCode.ERROR });
throw error;
if (error instanceof AppKitError) {
throw error;
}
throw ExecutionError.statementFailed(
error instanceof Error ? error.message : String(error),
);
})
.finally(() => {
this.inFlightRequests.delete(cacheKey);
Expand Down Expand Up @@ -304,7 +331,7 @@ export class CacheManager {
(this.storage as PersistentStorage)
.cleanupExpired()
.catch((error) => {
console.debug("Error cleaning up expired entries:", error);
logger.debug("Error cleaning up expired entries: %O", error);
})
.finally(() => {
this.cleanupInProgress = false;
Expand Down
22 changes: 14 additions & 8 deletions packages/appkit/src/cache/storage/persistent.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { createHash } from "node:crypto";
import type { CacheConfig, CacheEntry, CacheStorage } from "shared";
import type { LakebaseConnector } from "../../connectors";
import { InitializationError, ValidationError } from "../../errors";
import { createLogger } from "../../logging/logger";
import { lakebaseStorageDefaults } from "./defaults";

const logger = createLogger("cache:persistent");

/**
* Persistent cache storage implementation. Uses a least recently used (LRU) eviction policy
* to manage memory usage and ensure efficient cache operations.
Expand Down Expand Up @@ -47,7 +51,7 @@ export class PersistentStorage implements CacheStorage {
await this.runMigrations();
this.initialized = true;
} catch (error) {
console.error("Error in persistent storage initialization:", error);
logger.error("Error in persistent storage initialization: %O", error);
throw error;
}
}
Expand Down Expand Up @@ -80,7 +84,7 @@ export class PersistentStorage implements CacheStorage {
[keyHash],
)
.catch(() => {
console.debug("Error updating last_accessed time for key:", key);
logger.debug("Error updating last_accessed time for key: %s", key);
});

return {
Expand All @@ -104,8 +108,10 @@ export class PersistentStorage implements CacheStorage {
const byteSize = keyBytes.length + valueBytes.length;

if (byteSize > this.maxEntryBytes) {
throw new Error(
`Cache entry too large: ${byteSize} bytes exceeds maximum of ${this.maxEntryBytes} bytes`,
throw ValidationError.invalidValue(
"cache entry size",
byteSize,
`maximum ${this.maxEntryBytes} bytes`,
);
}

Expand Down Expand Up @@ -251,7 +257,7 @@ export class PersistentStorage implements CacheStorage {

/** Generate a 64-bit hash for the cache key using SHA256 */
private hashKey(key: string): bigint {
if (!key) throw new Error("Cache key cannot be empty");
if (!key) throw ValidationError.missingField("key");
const hash = createHash("sha256").update(key).digest();
return hash.readBigInt64BE(0);
}
Expand Down Expand Up @@ -302,11 +308,11 @@ export class PersistentStorage implements CacheStorage {
`CREATE INDEX IF NOT EXISTS idx_${this.tableName}_byte_size ON ${this.tableName} (byte_size); `,
);
} catch (error) {
console.error(
"Error in running migrations for persistent storage:",
logger.error(
"Error in running migrations for persistent storage: %O",
error,
);
throw error;
throw InitializationError.migrationFailed(error as Error);
}
}
}
Loading
Loading