From 907ddc046130e0fc4a28789ab1a1b14aaf5f1d32 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 26 Mar 2026 16:22:42 +0000 Subject: [PATCH 01/16] Implement organization-scoped ClickHouse instances The only way to get a ClickHouse client now is through the factory. Refactored all existing code to use that and pass in an org. The runReplication and otlpExporter are the hot paths here which need special attention in reviews. --- .cursor/mcp.json | 6 +- .../organization-scoped-clickhouse.md | 6 + .../v3/ApiRunListPresenter.server.ts | 5 +- .../v3/CreateBulkActionPresenter.server.ts | 5 +- .../v3/RunTagListPresenter.server.ts | 5 +- .../presenters/v3/TaskListPresenter.server.ts | 25 +- .../presenters/v3/UsagePresenter.server.ts | 5 +- .../v3/ViewSchedulePresenter.server.ts | 5 +- .../v3/WaitpointPresenter.server.ts | 5 +- .../route.tsx | 8 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../route.tsx | 7 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../route.tsx | 5 +- .../webapp/app/routes/api.v1.prompts.$slug.ts | 15 +- .../routes/api.v1.prompts.$slug.versions.ts | 12 +- .../app/routes/api.v1.prompts._index.ts | 5 +- apps/webapp/app/routes/otel.v1.logs.ts | 5 +- apps/webapp/app/routes/otel.v1.metrics.ts | 5 +- apps/webapp/app/routes/otel.v1.traces.ts | 5 +- ...projectParam.env.$envParam.logs.$logId.tsx | 5 +- ...ojects.$projectParam.env.$envParam.logs.ts | 5 +- ...nvParam.prompts.$promptSlug.generations.ts | 5 +- .../services/admin/missingLlmModels.server.ts | 10 +- .../clickhouseCredentialsService.server.ts | 109 +++++ .../clickhouse/clickhouseFactory.server.ts | 422 ++++++++++++++++++ .../clickhouse/clickhouseFactory.test.ts | 155 +++++++ .../clickhouseSecretSchemas.server.ts | 11 + .../app/services/clickhouseInstance.server.ts | 130 ------ .../app/services/queryService.server.ts | 5 +- .../services/runsReplicationService.server.ts | 107 ++++- 
apps/webapp/app/v3/otlpExporter.server.ts | 47 +- .../v3/services/bulk/BulkActionV2.server.ts | 8 +- 39 files changed, 959 insertions(+), 234 deletions(-) create mode 100644 .server-changes/organization-scoped-clickhouse.md create mode 100644 apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts create mode 100644 apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts create mode 100644 apps/webapp/app/services/clickhouse/clickhouseFactory.test.ts create mode 100644 apps/webapp/app/services/clickhouse/clickhouseSecretSchemas.server.ts delete mode 100644 apps/webapp/app/services/clickhouseInstance.server.ts diff --git a/.cursor/mcp.json b/.cursor/mcp.json index da39e4ffafe..c4b06a67630 100644 --- a/.cursor/mcp.json +++ b/.cursor/mcp.json @@ -1,3 +1,7 @@ { - "mcpServers": {} + "mcpServers": { + "linear": { + "url": "https://mcp.linear.app/mcp" + } + } } diff --git a/.server-changes/organization-scoped-clickhouse.md b/.server-changes/organization-scoped-clickhouse.md new file mode 100644 index 00000000000..874b9dc6026 --- /dev/null +++ b/.server-changes/organization-scoped-clickhouse.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances diff --git a/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts index 254ec18d1c0..b0ba01b9dc6 100644 --- a/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts @@ -9,7 +9,7 @@ import { type Project, type RuntimeEnvironment, type TaskRunStatus } from "@trig import assertNever from "assert-never"; import { z } from "zod"; import { API_VERSIONS, RunStatusUnspecifiedApiVersion } from "~/api/versions"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from 
"~/services/clickhouse/clickhouseFactory.server"; import { logger } from "~/services/logger.server"; import { CoercedDate } from "~/utils/zod"; import { ServiceValidationError } from "~/v3/services/baseService.server"; @@ -259,7 +259,8 @@ export class ApiRunListPresenter extends BasePresenter { options.machines = searchParams["filter[machine]"]; } - const presenter = new NextRunListPresenter(this._replica, clickhouseClient); + const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); + const presenter = new NextRunListPresenter(this._replica, clickhouse); logger.debug("Calling RunListPresenter", { options }); diff --git a/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts index acf511f0f5e..5e8bfc405b8 100644 --- a/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts @@ -1,6 +1,6 @@ import { type PrismaClient } from "@trigger.dev/database"; import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { getRunFiltersFromRequest } from "../RunFilters.server"; import { BasePresenter } from "./basePresenter.server"; @@ -24,8 +24,9 @@ export class CreateBulkActionPresenter extends BasePresenter { Object.fromEntries(new URL(request.url).searchParams) ); + const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); const runsRepository = new RunsRepository({ - clickhouse: clickhouseClient, + clickhouse, prisma: this._replica as PrismaClient, }); diff --git a/apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts 
b/apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts index e9de368eceb..89b9c8b41fa 100644 --- a/apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts @@ -1,6 +1,6 @@ import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { BasePresenter } from "./basePresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { type PrismaClient } from "@trigger.dev/database"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; @@ -37,8 +37,9 @@ export class RunTagListPresenter extends BasePresenter { }: TagListOptions) { const hasFilters = Boolean(name?.trim()); + const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); const runsRepository = new RunsRepository({ - clickhouse: clickhouseClient, + clickhouse, prisma: this._replica as PrismaClient, }); diff --git a/apps/webapp/app/presenters/v3/TaskListPresenter.server.ts b/apps/webapp/app/presenters/v3/TaskListPresenter.server.ts index f1635f23375..a6471c30c60 100644 --- a/apps/webapp/app/presenters/v3/TaskListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TaskListPresenter.server.ts @@ -4,7 +4,7 @@ import { type TaskTriggerSource, } from "@trigger.dev/database"; import { $replica } from "~/db.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { type AverageDurations, ClickHouseEnvironmentMetricsRepository, @@ -25,10 +25,7 @@ export type TaskListItem = { export type TaskActivity = DailyTaskActivity[string]; export class TaskListPresenter { - constructor( - private readonly environmentMetricsRepository: EnvironmentMetricsRepository, - private readonly _replica: PrismaClientOrTransaction - ) {} + 
constructor(private readonly _replica: PrismaClientOrTransaction) {} public async call({ organizationId, @@ -76,9 +73,15 @@ export class TaskListPresenter { const slugs = tasks.map((t) => t.slug); + // Create org-specific environment metrics repository + const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); + const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({ + clickhouse, + }); + // IMPORTANT: Don't await these, we want to return the promises // so we can defer the loading of the data - const activity = this.environmentMetricsRepository.getDailyTaskActivity({ + const activity = environmentMetricsRepository.getDailyTaskActivity({ organizationId, projectId, environmentId, @@ -86,7 +89,7 @@ export class TaskListPresenter { tasks: slugs, }); - const runningStats = this.environmentMetricsRepository.getCurrentRunningStats({ + const runningStats = environmentMetricsRepository.getCurrentRunningStats({ organizationId, projectId, environmentId, @@ -94,7 +97,7 @@ export class TaskListPresenter { tasks: slugs, }); - const durations = this.environmentMetricsRepository.getAverageDurations({ + const durations = environmentMetricsRepository.getAverageDurations({ organizationId, projectId, environmentId, @@ -109,9 +112,5 @@ export class TaskListPresenter { export const taskListPresenter = singleton("taskListPresenter", setupTaskListPresenter); function setupTaskListPresenter() { - const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({ - clickhouse: clickhouseClient, - }); - - return new TaskListPresenter(environmentMetricsRepository, $replica); + return new TaskListPresenter($replica); } diff --git a/apps/webapp/app/presenters/v3/UsagePresenter.server.ts b/apps/webapp/app/presenters/v3/UsagePresenter.server.ts index 2fac95617a6..c4654e870ed 100644 --- a/apps/webapp/app/presenters/v3/UsagePresenter.server.ts +++ b/apps/webapp/app/presenters/v3/UsagePresenter.server.ts @@ -4,7 +4,7 @@ import { 
getUsage, getUsageSeries } from "~/services/platform.v3.server"; import { createTimeSeriesData } from "~/utils/graphs"; import { BasePresenter } from "./basePresenter.server"; import { DataPoint, linear } from "regression"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; type Options = { organizationId: string; @@ -124,7 +124,8 @@ async function getTaskUsageByOrganization( endOfMonth: Date, replica: PrismaClientOrTransaction ) { - const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({ + const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); + const [queryError, tasks] = await clickhouse.taskRuns.getTaskUsageByOrganization({ startTime: startOfMonth.getTime(), endTime: endOfMonth.getTime(), organizationId, diff --git a/apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts b/apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts index f0e955fd04d..52ebad96b4e 100644 --- a/apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts @@ -1,7 +1,7 @@ import { ScheduleObject } from "@trigger.dev/core/v3"; import { PrismaClient, prisma } from "~/db.server"; import { displayableEnvironment } from "~/models/runtimeEnvironment.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { nextScheduledTimestamps } from "~/v3/utils/calculateNextSchedule.server"; import { NextRunListPresenter } from "./NextRunListPresenter.server"; import { scheduleWhereClause } from "~/models/schedules.server"; @@ -75,7 +75,8 @@ export class ViewSchedulePresenter { ? 
nextScheduledTimestamps(schedule.generatorExpression, schedule.timezone, new Date(), 5) : []; - const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouseClient); + const clickhouse = await getClickhouseForOrganization(schedule.project.organizationId, "standard"); + const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouse); const { runs } = await runPresenter.call(schedule.project.organizationId, environmentId, { projectId: schedule.project.id, scheduleId: schedule.id, diff --git a/apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts b/apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts index 9abcdf32215..15eaef0d13d 100644 --- a/apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts @@ -1,5 +1,5 @@ import { isWaitpointOutputTimeout, prettyPrintPacket } from "@trigger.dev/core/v3"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { generateHttpCallbackUrl } from "~/services/httpCallback.server"; import { logger } from "~/services/logger.server"; import { BasePresenter } from "./basePresenter.server"; @@ -79,7 +79,8 @@ export class WaitpointPresenter extends BasePresenter { const connectedRuns: NextRunListItem[] = []; if (connectedRunIds.length > 0) { - const runPresenter = new NextRunListPresenter(this._prisma, clickhouseClient); + const clickhouse = await getClickhouseForOrganization(waitpoint.environment.organizationId, "standard"); + const runPresenter = new NextRunListPresenter(this._prisma, clickhouse); const { runs } = await runPresenter.call( waitpoint.environment.organizationId, environmentId, diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx 
b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx index 57b6b71db6f..9868d195b35 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx @@ -32,7 +32,7 @@ import { MetricDashboardPresenter, } from "~/presenters/v3/MetricDashboardPresenter.server"; import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { requireUser } from "~/services/session.server"; import { cn } from "~/utils/cn"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; @@ -75,10 +75,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const filters = dashboard.filters ?? 
["tasks", "queues"]; + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + // Load distinct models from ClickHouse if the dashboard has a models filter let possibleModels: { model: string; system: string }[] = []; if (filters.includes("models")) { - const queryFn = clickhouseClient.reader.query({ + const queryFn = clickhouse.reader.query({ name: "getDistinctModels", query: `SELECT response_model, any(gen_ai_system) AS gen_ai_system FROM trigger_dev.llm_metrics_v1 WHERE organization_id = {organizationId: String} AND project_id = {projectId: String} AND environment_id = {environmentId: String} AND response_model != '' GROUP BY response_model ORDER BY response_model`, params: z.object({ @@ -98,7 +100,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { } } - const promptPresenter = new PromptPresenter(clickhouseClient); + const promptPresenter = new PromptPresenter(clickhouse); const [possiblePrompts, possibleOperations, possibleProviders] = await Promise.all([ filters.includes("prompts") ? 
promptPresenter.getDistinctPromptSlugs(project.organizationId, project.id, environment.id) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx index e92b5b34644..35f8131189b 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx @@ -70,7 +70,7 @@ import { type ErrorOccurrences, type ErrorsList as ErrorsListData, } from "~/presenters/v3/ErrorsListPresenter.server"; -import { logsClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { getCurrentPlan } from "~/services/platform.v3.server"; import { requireUser } from "~/services/session.server"; import { formatNumberCompact } from "~/utils/numberFormatter"; @@ -123,7 +123,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const plan = await getCurrentPlan(project.organizationId); const retentionLimitDays = plan?.v3Subscription?.plan?.limits.logRetentionDays.number ?? 
30; - const presenter = new ErrorsListPresenter($replica, logsClickhouseClient); + const logsClickhouse = await getClickhouseForOrganization(project.organizationId, "logs"); + const presenter = new ErrorsListPresenter($replica, logsClickhouse); const listPromise = presenter .call(project.organizationId, environment.id, { diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx index 80a5c6ef232..6d668ddc9bf 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx @@ -16,7 +16,7 @@ import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { LogsListPresenter, LogEntry } from "~/presenters/v3/LogsListPresenter.server"; import type { LogLevel } from "~/utils/logUtils"; import { $replica, prisma } from "~/db.server"; -import { logsClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { NavBar, PageTitle } from "~/components/primitives/PageHeader"; import { PageBody, PageContainer } from "~/components/layout/AppLayout"; import { Suspense, useCallback, useEffect, useMemo, useRef, useState, useTransition } from "react"; @@ -134,7 +134,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const plan = await getCurrentPlan(project.organizationId); const retentionLimitDays = plan?.v3Subscription?.plan?.limits.logRetentionDays.number ?? 
30; - const presenter = new LogsListPresenter($replica, logsClickhouseClient); + const logsClickhouse = await getClickhouseForOrganization(project.organizationId, "logs"); + const presenter = new LogsListPresenter($replica, logsClickhouse); const listPromise = presenter .call(project.organizationId, environment.id, { diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsx index 2d2a0ac850f..1ec627ddb5e 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsx @@ -28,7 +28,7 @@ import type { QueryWidgetConfig } from "~/components/metrics/QueryWidget"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { ModelRegistryPresenter } from "~/presenters/v3/ModelRegistryPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { requireUserId } from "~/services/session.server"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; @@ -68,7 +68,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { throw new Response("Environment not found", { status: 404 }); } - const presenter = new ModelRegistryPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new ModelRegistryPresenter(clickhouse); const model = await presenter.getModelDetail(modelId); if (!model) { diff --git 
a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models._index/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models._index/route.tsx index 394530b6335..0f3da02b9c0 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models._index/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models._index/route.tsx @@ -68,7 +68,7 @@ import { type PopularModel, ModelRegistryPresenter, } from "~/presenters/v3/ModelRegistryPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { requireUserId } from "~/services/session.server"; import { useEnvironment } from "~/hooks/useEnvironment"; import { useOrganization } from "~/hooks/useOrganizations"; @@ -106,7 +106,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { throw new Response("Environment not found", { status: 404 }); } - const presenter = new ModelRegistryPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new ModelRegistryPresenter(clickhouse); const catalog = await presenter.getModelCatalog(); const now = new Date(); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.compare/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.compare/route.tsx index 661fb294268..879dcf47e6b 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.compare/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.compare/route.tsx @@ -20,7 +20,7 @@ import { type ModelComparisonItem, 
ModelRegistryPresenter, } from "~/presenters/v3/ModelRegistryPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { requireUserId } from "~/services/session.server"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; @@ -55,7 +55,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { return typedjson({ comparison: [] as ModelComparisonItem[], models: responseModels }); } - const presenter = new ModelRegistryPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new ModelRegistryPresenter(clickhouse); const now = new Date(); const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug/route.tsx index 5a953c0199b..f37e8d3fed9 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug/route.tsx @@ -70,7 +70,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { type GenerationRow, PromptPresenter } from "~/presenters/v3/PromptPresenter.server"; import { SpanView } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from 
"~/services/clickhouse/clickhouseFactory.server"; import { getResizableSnapshot } from "~/services/resizablePanel.server"; import { requireUserId } from "~/services/session.server"; import { PromptService } from "~/v3/services/promptService.server"; @@ -242,7 +242,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const startTime = fromTime ? new Date(fromTime) : new Date(Date.now() - periodMs); const endTime = toTime ? new Date(toTime) : new Date(); - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new PromptPresenter(clickhouse); let generations: Awaited>["generations"] = []; let generationsPagination: { next?: string } = {}; try { @@ -273,7 +274,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { // Load distinct filter values and resizable snapshots in parallel const distinctQuery = (col: string, name: string) => - clickhouseClient.reader.query({ + clickhouse.reader.query({ name, query: `SELECT DISTINCT ${col} AS val FROM trigger_dev.llm_metrics_v1 WHERE environment_id = {environmentId: String} AND prompt_slug = {promptSlug: String} AND ${col} != '' ORDER BY val`, params: z.object({ environmentId: z.string(), promptSlug: z.string() }), diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts._index/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts._index/route.tsx index 02c7cc444b7..4e229a48f74 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts._index/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts._index/route.tsx @@ -22,7 +22,7 @@ import { useProject } from "~/hooks/useProject"; import { findProjectBySlug } from "~/models/project.server"; import { 
findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { requireUserId } from "~/services/session.server"; import { docsPath, EnvironmentParamSchema, v3PromptsPath } from "~/utils/pathBuilder"; import { LinkButton } from "~/components/primitives/Buttons"; @@ -46,7 +46,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { throw new Response("Environment not found", { status: 404 }); } - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new PromptPresenter(clickhouse); const prompts = await presenter.listPrompts(project.id, environment.id); const sparklines = await presenter.getUsageSparklines( diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx index dc1f3fa2703..de83d859f74 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx @@ -89,7 +89,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server"; import { RunEnvironmentMismatchError, RunPresenter } from "~/presenters/v3/RunPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from 
"~/services/clickhouse/clickhouseFactory.server"; import { getImpersonationId } from "~/services/impersonation.server"; import { logger } from "~/services/logger.server"; import { getResizableSnapshot } from "~/services/resizablePanel.server"; @@ -179,7 +179,8 @@ async function getRunsListFromTableState({ return null; } - const runsListPresenter = new NextRunListPresenter($replica, clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const runsListPresenter = new NextRunListPresenter($replica, clickhouse); const currentPageResult = await runsListPresenter.call(project.organizationId, environment.id, { userId, projectId: project.id, diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx index 9f8cf278bef..c5cc894c7c1 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx @@ -42,7 +42,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { setRootOnlyFilterPreference, uiPreferencesStorage, @@ -85,7 +85,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const filters = await getRunFiltersFromRequest(request); - const presenter = new NextRunListPresenter($replica, clickhouseClient); + const clickhouse = 
await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new NextRunListPresenter($replica, clickhouse); const list = presenter.call(project.organizationId, environment.id, { userId, projectId: project.id, diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx index ee69419e1b7..38356c6a247 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx @@ -74,7 +74,7 @@ import { Dialog, DialogContent, DialogHeader, DialogTrigger } from "~/components import { DialogClose, DialogDescription } from "@radix-ui/react-dialog"; import { FormButtons } from "~/components/primitives/FormButtons"; import { $replica } from "~/db.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { RegionsPresenter, type Region } from "~/presenters/v3/RegionsPresenter.server"; import { TestSidebarTabs } from "./TestSidebarTabs"; import { AIPayloadTabContent } from "./AIPayloadTabContent"; @@ -102,7 +102,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); } - const presenter = new TestTaskPresenter($replica, clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new TestTaskPresenter($replica, clickhouse); try { const [result, regionsResult] = await Promise.all([ presenter.call({ diff --git a/apps/webapp/app/routes/api.v1.prompts.$slug.ts b/apps/webapp/app/routes/api.v1.prompts.$slug.ts index 32ea1525c14..230ceb1277f 100644 --- 
a/apps/webapp/app/routes/api.v1.prompts.$slug.ts +++ b/apps/webapp/app/routes/api.v1.prompts.$slug.ts @@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { prisma } from "~/db.server"; import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { createActionApiRoute, createLoaderApiRoute, @@ -33,6 +33,13 @@ export const loader = createLoaderApiRoute( slug: params.slug, }, }, + include: { + project: { + select: { + organizationId: true, + }, + }, + }, }); }, authorization: { @@ -46,7 +53,8 @@ export const loader = createLoaderApiRoute( return json({ error: "Prompt not found" }, { status: 404 }); } - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(prompt.project.organizationId, "standard"); + const presenter = new PromptPresenter(clickhouse); const version = await presenter.resolveVersion(prompt.id, { version: searchParams.version, label: searchParams.label, @@ -117,7 +125,8 @@ const { action } = createActionApiRoute( return json({ error: "Prompt not found" }, { status: 404 }); } - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(authentication.environment.organizationId, "standard"); + const presenter = new PromptPresenter(clickhouse); const version = await presenter.resolveVersion(prompt.id, { version: body.version, label: body.label, diff --git a/apps/webapp/app/routes/api.v1.prompts.$slug.versions.ts b/apps/webapp/app/routes/api.v1.prompts.$slug.versions.ts index c40b3e62dbf..17b88b12c42 100644 --- a/apps/webapp/app/routes/api.v1.prompts.$slug.versions.ts +++ b/apps/webapp/app/routes/api.v1.prompts.$slug.versions.ts @@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime"; import { z } from 
"zod"; import { prisma } from "~/db.server"; import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; const ParamsSchema = z.object({ @@ -23,6 +23,13 @@ export const loader = createLoaderApiRoute( slug: params.slug, }, }, + include: { + project: { + select: { + organizationId: true, + }, + }, + }, }); }, authorization: { @@ -36,7 +43,8 @@ export const loader = createLoaderApiRoute( return json({ error: "Prompt not found" }, { status: 404 }); } - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(prompt.project.organizationId, "standard"); + const presenter = new PromptPresenter(clickhouse); const versions = await presenter.listVersions(prompt.id); return json({ diff --git a/apps/webapp/app/routes/api.v1.prompts._index.ts b/apps/webapp/app/routes/api.v1.prompts._index.ts index ccbc0ec38d0..44f2f86d0cb 100644 --- a/apps/webapp/app/routes/api.v1.prompts._index.ts +++ b/apps/webapp/app/routes/api.v1.prompts._index.ts @@ -1,6 +1,6 @@ import { json } from "@remix-run/server-runtime"; import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; export const loader = createLoaderApiRoute( @@ -15,7 +15,8 @@ export const loader = createLoaderApiRoute( }, }, async ({ authentication }) => { - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(authentication.environment.organizationId, "standard"); + const presenter = new 
PromptPresenter(clickhouse); const prompts = await presenter.listPrompts( authentication.environment.projectId, authentication.environment.id diff --git a/apps/webapp/app/routes/otel.v1.logs.ts b/apps/webapp/app/routes/otel.v1.logs.ts index a05ddd24cf2..1dc7c07c16c 100644 --- a/apps/webapp/app/routes/otel.v1.logs.ts +++ b/apps/webapp/app/routes/otel.v1.logs.ts @@ -4,12 +4,13 @@ import { otlpExporter } from "~/v3/otlpExporter.server"; export async function action({ request }: ActionFunctionArgs) { try { + const exporter = await otlpExporter; const contentType = request.headers.get("content-type")?.toLowerCase() ?? ""; if (contentType.startsWith("application/json")) { const body = await request.json(); - const exportResponse = await otlpExporter.exportLogs(body as ExportLogsServiceRequest); + const exportResponse = await exporter.exportLogs(body as ExportLogsServiceRequest); return json(exportResponse, { status: 200 }); } else if (contentType.startsWith("application/x-protobuf")) { @@ -17,7 +18,7 @@ export async function action({ request }: ActionFunctionArgs) { const exportRequest = ExportLogsServiceRequest.decode(new Uint8Array(buffer)); - const exportResponse = await otlpExporter.exportLogs(exportRequest); + const exportResponse = await exporter.exportLogs(exportRequest); return new Response(ExportLogsServiceResponse.encode(exportResponse).finish(), { status: 200, diff --git a/apps/webapp/app/routes/otel.v1.metrics.ts b/apps/webapp/app/routes/otel.v1.metrics.ts index 5529f9310ec..9a09cb18233 100644 --- a/apps/webapp/app/routes/otel.v1.metrics.ts +++ b/apps/webapp/app/routes/otel.v1.metrics.ts @@ -7,12 +7,13 @@ import { otlpExporter } from "~/v3/otlpExporter.server"; export async function action({ request }: ActionFunctionArgs) { try { + const exporter = await otlpExporter; const contentType = request.headers.get("content-type")?.toLowerCase() ?? 
""; if (contentType.startsWith("application/json")) { const body = await request.json(); - const exportResponse = await otlpExporter.exportMetrics( + const exportResponse = await exporter.exportMetrics( body as ExportMetricsServiceRequest ); @@ -22,7 +23,7 @@ export async function action({ request }: ActionFunctionArgs) { const exportRequest = ExportMetricsServiceRequest.decode(new Uint8Array(buffer)); - const exportResponse = await otlpExporter.exportMetrics(exportRequest); + const exportResponse = await exporter.exportMetrics(exportRequest); return new Response(ExportMetricsServiceResponse.encode(exportResponse).finish(), { status: 200, diff --git a/apps/webapp/app/routes/otel.v1.traces.ts b/apps/webapp/app/routes/otel.v1.traces.ts index 609b72c0465..8e974c7b1dd 100644 --- a/apps/webapp/app/routes/otel.v1.traces.ts +++ b/apps/webapp/app/routes/otel.v1.traces.ts @@ -4,12 +4,13 @@ import { otlpExporter } from "~/v3/otlpExporter.server"; export async function action({ request }: ActionFunctionArgs) { try { + const exporter = await otlpExporter; const contentType = request.headers.get("content-type")?.toLowerCase() ?? 
""; if (contentType.startsWith("application/json")) { const body = await request.json(); - const exportResponse = await otlpExporter.exportTraces(body as ExportTraceServiceRequest); + const exportResponse = await exporter.exportTraces(body as ExportTraceServiceRequest); return json(exportResponse, { status: 200 }); } else if (contentType.startsWith("application/x-protobuf")) { @@ -17,7 +18,7 @@ export async function action({ request }: ActionFunctionArgs) { const exportRequest = ExportTraceServiceRequest.decode(new Uint8Array(buffer)); - const exportResponse = await otlpExporter.exportTraces(exportRequest); + const exportResponse = await exporter.exportTraces(exportRequest); return new Response(ExportTraceServiceResponse.encode(exportResponse).finish(), { status: 200, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.$logId.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.$logId.tsx index f862ced6b05..0e0469bcd1b 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.$logId.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.$logId.tsx @@ -1,7 +1,7 @@ import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; import { typedjson } from "remix-typedjson"; import { z } from "zod"; -import { logsClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { requireUserId } from "~/services/session.server"; import { LogDetailPresenter } from "~/presenters/v3/LogDetailPresenter.server"; import { findProjectBySlug } from "~/models/project.server"; @@ -43,7 +43,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const [traceId, spanId, , startTime] = parts; - const presenter = new 
LogDetailPresenter($replica, logsClickhouseClient); + const logsClickhouse = await getClickhouseForOrganization(project.organizationId, "logs"); + const presenter = new LogDetailPresenter($replica, logsClickhouse); let result; try { diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.ts index 66ddebe4e2a..d55c7496258 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.ts @@ -6,7 +6,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { LogsListPresenter, type LogLevel, LogsListOptionsSchema } from "~/presenters/v3/LogsListPresenter.server"; import { $replica } from "~/db.server"; -import { logsClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { getCurrentPlan } from "~/services/platform.v3.server"; // Valid log levels for filtering @@ -69,7 +69,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { retentionLimitDays, }) as any; // Validated by LogsListOptionsSchema at runtime - const presenter = new LogsListPresenter($replica, logsClickhouseClient); + const logsClickhouse = await getClickhouseForOrganization(project.organizationId, "logs"); + const presenter = new LogsListPresenter($replica, logsClickhouse); const result = await presenter.call(project.organizationId, environment.id, options); return json({ diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug.generations.ts 
b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug.generations.ts index 77a55ec3f0b..17a11e05837 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug.generations.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug.generations.ts @@ -6,7 +6,7 @@ import { EnvironmentParamSchema } from "~/utils/pathBuilder"; import { parsePeriodToMs } from "~/utils/periods"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { PromptPresenter, type GenerationRow, @@ -59,7 +59,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const operations = url.searchParams.getAll("operations").filter(Boolean); const providers = url.searchParams.getAll("providers").filter(Boolean); - const presenter = new PromptPresenter(clickhouseClient); + const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard"); + const presenter = new PromptPresenter(clickhouse); const result = await presenter.listGenerations({ environmentId: environment.id, promptSlug, diff --git a/apps/webapp/app/services/admin/missingLlmModels.server.ts b/apps/webapp/app/services/admin/missingLlmModels.server.ts index 7ce6bc2ab7e..07e6160ee03 100644 --- a/apps/webapp/app/services/admin/missingLlmModels.server.ts +++ b/apps/webapp/app/services/admin/missingLlmModels.server.ts @@ -1,4 +1,4 @@ -import { adminClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getAdminClickhouse } from "~/services/clickhouse/clickhouseFactory.server"; import { llmPricingRegistry } from 
"~/v3/llmPricingRegistry.server"; export type MissingLlmModel = { @@ -13,8 +13,10 @@ export async function getMissingLlmModels(opts: { const lookbackHours = opts.lookbackHours ?? 24; const since = new Date(Date.now() - lookbackHours * 60 * 60 * 1000); + const adminClickhouse = getAdminClickhouse(); + // queryBuilderFast returns a factory function — call it to get the builder - const createBuilder = adminClickhouseClient.reader.queryBuilderFast<{ + const createBuilder = adminClickhouse.reader.queryBuilderFast<{ model: string; system: string; cnt: string; @@ -93,7 +95,9 @@ export async function getMissingModelSamples(opts: { const limit = opts.limit ?? 10; const since = new Date(Date.now() - lookbackHours * 60 * 60 * 1000); - const createBuilder = adminClickhouseClient.reader.queryBuilderFast({ + const adminClickhouse = getAdminClickhouse(); + + const createBuilder = adminClickhouse.reader.queryBuilderFast({ name: "missingModelSamples", table: "trigger_dev.task_events_v2", columns: [ diff --git a/apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts b/apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts new file mode 100644 index 00000000000..c2c8c77f7c4 --- /dev/null +++ b/apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts @@ -0,0 +1,109 @@ +import { getSecretStore } from "~/services/secrets/secretStore.server"; +import { prisma } from "~/db.server"; +import { + ClickhouseConnectionSchema, + getClickhouseSecretKey, +} from "./clickhouseSecretSchemas.server"; +import { clearClickhouseCacheForOrganization } from "./clickhouseFactory.server"; + +export async function setOrganizationClickhouseUrl( + organizationId: string, + clientType: "standard" | "events" | "replication", + url: string +): Promise { + // Validate URL format + const connection = ClickhouseConnectionSchema.parse({ url }); + + // Store in SecretStore + const secretStore = getSecretStore("DATABASE"); + const secretKey = 
getClickhouseSecretKey(organizationId, clientType); + await secretStore.setSecret(secretKey, connection); + + // Update featureFlags to reference the secret + const org = await prisma.organization.findUnique({ + where: { id: organizationId }, + select: { featureFlags: true }, + }); + + const featureFlags = (org?.featureFlags || {}) as any; + const clickhouseConfig = featureFlags.clickhouse || {}; + clickhouseConfig[clientType] = secretKey; + featureFlags.clickhouse = clickhouseConfig; + + await prisma.organization.update({ + where: { id: organizationId }, + data: { featureFlags }, + }); + + // Clear cache + clearClickhouseCacheForOrganization(organizationId); +} + +export async function removeOrganizationClickhouseUrl( + organizationId: string, + clientType: "standard" | "events" | "replication" +): Promise { + // Remove from SecretStore + const secretStore = getSecretStore("DATABASE"); + const secretKey = getClickhouseSecretKey(organizationId, clientType); + await secretStore.deleteSecret(secretKey); + + // Update featureFlags + const org = await prisma.organization.findUnique({ + where: { id: organizationId }, + select: { featureFlags: true }, + }); + + if (org?.featureFlags) { + const featureFlags = org.featureFlags as any; + if (featureFlags.clickhouse && featureFlags.clickhouse[clientType]) { + delete featureFlags.clickhouse[clientType]; + + // If no more clickhouse configs, remove the clickhouse key entirely + if (Object.keys(featureFlags.clickhouse).length === 0) { + delete featureFlags.clickhouse; + } + + await prisma.organization.update({ + where: { id: organizationId }, + data: { featureFlags }, + }); + } + } + + // Clear cache + clearClickhouseCacheForOrganization(organizationId); +} + +export async function getOrganizationClickhouseUrl( + organizationId: string, + clientType: "standard" | "events" | "replication" +): Promise { + const org = await prisma.organization.findUnique({ + where: { id: organizationId }, + select: { featureFlags: true }, + }); + 
+ if (!org?.featureFlags) { + return null; + } + + const clickhouseConfig = (org.featureFlags as any).clickhouse; + if (!clickhouseConfig || typeof clickhouseConfig !== "object") { + return null; + } + + const secretKey = clickhouseConfig[clientType]; + if (!secretKey || typeof secretKey !== "string") { + return null; + } + + const secretStore = getSecretStore("DATABASE"); + const connection = await secretStore.getSecret(ClickhouseConnectionSchema, secretKey); + + if (!connection) { + return null; + } + + return connection.url; +} diff --git a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts new file mode 100644 index 00000000000..94498868759 --- /dev/null +++ b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts @@ -0,0 +1,422 @@ +/** + * ClickHouse Factory - Organization-Scoped ClickHouse Routing + * + * This module provides organization-scoped ClickHouse instance routing to support: + * - HIPAA compliance (dedicated ClickHouse clusters) + * - High-volume customer isolation + * - Geographic data residency requirements + * - Performance tier differentiation + * + * ## Architecture + * + * ### Credential Storage + * - ClickHouse URLs stored encrypted in SecretStore (AES-256-GCM) + * - Organization references secret via `featureFlags.clickhouse` JSON + * - No plaintext credentials in database + * + * ### Caching Strategy + * - **Org configs**: Unkey cache with LRU memory (5min fresh, 10min stale, SWR) + * - **ClickHouse clients**: Cached by hostname hash (multiple orgs share same instance) + * - **Event repositories**: Cached by hostname hash (stateful, must be reused) + * - **Security**: Memory-only cache for org configs (no credentials in Redis) + * + * ## Usage in Presenters + * + * Presenters should fetch org-specific ClickHouse clients in their `call()` method: + * + * ```typescript + * import { getClickhouseForOrganization } from 
"~/services/clickhouse/clickhouseFactory.server"; + * + * export class MyPresenter extends BasePresenter { + * constructor(private options: PresenterOptions = {}) { + * super(); + * } + * + * async call({ organizationId, ... }) { + * const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); + * // Use clickhouse for queries... + * } + * } + * ``` + * + * ## Usage in Services + * + * The replication service and OTLP exporter automatically route data by organization. + * Other services should follow the same pattern when working with ClickHouse. + * + * @module clickhouseFactory + */ + +import { ClickHouse } from "@internal/clickhouse"; +import { createHash } from "crypto"; +import { createCache, DefaultStatefulContext, Namespace } from "@unkey/cache"; +import { createLRUMemoryStore } from "@internal/cache"; +import { getSecretStore } from "~/services/secrets/secretStore.server"; +import { prisma } from "~/db.server"; +import { + ClickhouseConnectionSchema, + getClickhouseSecretKey, +} from "./clickhouseSecretSchemas.server"; +import { ClickhouseEventRepository } from "~/v3/eventRepository/clickhouseEventRepository.server"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; + +// Module-level caches for ClickHouse clients and event repositories +const clickhouseClientCache = new Map(); +const eventRepositoryCache = new Map(); + +// Default ClickHouse clients (not exported - internal use only) +const defaultClickhouseClient = singleton("clickhouseClient", initializeClickhouseClient); + +function initializeClickhouseClient() { + const url = new URL(env.CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + console.log(`šŸ—ƒļø Clickhouse service enabled to host ${url.host}`); + + return new ClickHouse({ + url: url.toString(), + name: "clickhouse-instance", + keepAlive: { + enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: 
env.CLICKHOUSE_LOG_LEVEL, + compression: { + request: true, + }, + maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); +} + +const defaultLogsClickhouseClient = singleton( + "logsClickhouseClient", + initializeLogsClickhouseClient +); + +function initializeLogsClickhouseClient() { + if (!env.LOGS_CLICKHOUSE_URL) { + throw new Error("LOGS_CLICKHOUSE_URL is not set"); + } + + const url = new URL(env.LOGS_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + return new ClickHouse({ + url: url.toString(), + name: "logs-clickhouse", + keepAlive: { + enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.CLICKHOUSE_LOG_LEVEL, + compression: { + request: true, + }, + maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, + clickhouseSettings: { + max_memory_usage: env.CLICKHOUSE_LOGS_LIST_MAX_MEMORY_USAGE.toString(), + max_bytes_before_external_sort: + env.CLICKHOUSE_LOGS_LIST_MAX_BYTES_BEFORE_EXTERNAL_SORT.toString(), + max_threads: env.CLICKHOUSE_LOGS_LIST_MAX_THREADS, + ...(env.CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ && { + max_rows_to_read: env.CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ.toString(), + }), + ...(env.CLICKHOUSE_LOGS_LIST_MAX_EXECUTION_TIME && { + max_execution_time: env.CLICKHOUSE_LOGS_LIST_MAX_EXECUTION_TIME, + }), + }, + }); +} + +const defaultAdminClickhouseClient = singleton( + "adminClickhouseClient", + initializeAdminClickhouseClient +); + +function initializeAdminClickhouseClient() { + if (!env.ADMIN_CLICKHOUSE_URL) { + throw new Error("ADMIN_CLICKHOUSE_URL is not set"); + } + + const url = new URL(env.ADMIN_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + return new ClickHouse({ + url: url.toString(), + name: "admin-clickhouse", + keepAlive: { + enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.CLICKHOUSE_LOG_LEVEL, + compression: { + request: true, + }, + 
maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); +} + +const defaultQueryClickhouseClient = singleton( + "queryClickhouseClient", + initializeQueryClickhouseClient +); + +function initializeQueryClickhouseClient() { + if (!env.QUERY_CLICKHOUSE_URL) { + throw new Error("QUERY_CLICKHOUSE_URL is not set"); + } + + const url = new URL(env.QUERY_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + return new ClickHouse({ + url: url.toString(), + name: "query-clickhouse", + keepAlive: { + enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.CLICKHOUSE_LOG_LEVEL, + compression: { + request: true, + }, + maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); +} + +// Org config cache with Unkey (memory-only, no Redis for security) +type OrgClickhouseConfig = { + organizationId: string; + hostnameHash: string; + url: string; + clientType: string; +}; + +const ctx = new DefaultStatefulContext(); +const memory = createLRUMemoryStore(1000); + +const orgConfigCache = createCache({ + orgClickhouse: new Namespace(ctx, { + stores: [memory], // Memory-only, no Redis store for security + fresh: 5 * 60 * 1000, // 5 minutes + stale: 10 * 60 * 1000, // 10 minutes (SWR pattern) + }), +}); + +function hashHostname(url: string): string { + const parsed = new URL(url); + return createHash("sha256").update(parsed.hostname).digest("hex"); +} + +async function getOrgClickhouseConfig( + ctx: DefaultStatefulContext, + orgId: string, + clientType: string +): Promise { + const org = await prisma.organization.findUnique({ + where: { id: orgId }, + select: { featureFlags: true }, + }); + + if (!org?.featureFlags) { + return null; + } + + const clickhouseConfig = (org.featureFlags as any).clickhouse; + if (!clickhouseConfig || typeof clickhouseConfig !== "object") { + return null; + } + + const secretKey = clickhouseConfig[clientType]; + if (!secretKey || typeof secretKey !== "string") { + 
return null; + } + + const secretStore = getSecretStore("DATABASE"); + const connection = await secretStore.getSecret(ClickhouseConnectionSchema, secretKey); + + if (!connection) { + return null; + } + + const hostnameHash = hashHostname(connection.url); + + return { + organizationId: orgId, + hostnameHash, + url: connection.url, + clientType, + }; +} + +export async function getClickhouseForOrganization( + organizationId: string, + clientType: "standard" | "events" | "replication" | "logs" | "query" | "admin" +): Promise { + // Try to get org-specific config + const configResult = await orgConfigCache.orgClickhouse.swr( + `org:${organizationId}:ch:${clientType}`, + async () => getOrgClickhouseConfig(ctx, organizationId, clientType) + ); + + // Handle Result type - check for error or null value + const config = configResult.err ? null : configResult.val; + + // If no custom config, return appropriate default client + if (!config) { + switch (clientType) { + case "standard": + case "events": + case "replication": + return defaultClickhouseClient; + case "logs": + return defaultLogsClickhouseClient; + case "query": + return defaultQueryClickhouseClient; + case "admin": + return defaultAdminClickhouseClient; + } + } + + // Check if client already exists for this hostname + const cacheKey = `${config.hostnameHash}:${clientType}`; + let client = clickhouseClientCache.get(cacheKey); + + if (!client) { + const url = new URL(config.url); + url.searchParams.delete("secure"); + + client = new ClickHouse({ + url: url.toString(), + name: `org-clickhouse-${clientType}`, + keepAlive: { + enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.CLICKHOUSE_LOG_LEVEL, + compression: { + request: true, + }, + maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); + clickhouseClientCache.set(cacheKey, client); + } + + return client; +} + +export async function getEventRepositoryForOrganization( + 
organizationId: string +): Promise { + // Try to get org-specific config + const configResult = await orgConfigCache.orgClickhouse.swr( + `org:${organizationId}:ch:events`, + async () => getOrgClickhouseConfig(ctx, organizationId, "events") + ); + + // Handle Result type - check for error or null value + const config = configResult.err ? null : configResult.val; + + // If no custom config, return default repository (created on demand) + if (!config) { + const defaultKey = "default:events"; + let defaultRepo = eventRepositoryCache.get(defaultKey); + if (!defaultRepo) { + // Create default event repository using the default events ClickHouse client + // This matches the existing pattern in clickhouseEventRepositoryInstance.server.ts + const eventsClickhouse = await getEventsClickhouseClient(); + defaultRepo = new ClickhouseEventRepository({ + clickhouse: eventsClickhouse, + batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, + flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, + maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, + maximumTraceDetailedSummaryViewCount: + env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, + maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, + insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, + waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", + asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, + asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, + llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE, + llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS, + llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE, + llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY, + version: "v2", + }); + eventRepositoryCache.set(defaultKey, defaultRepo); + } + return defaultRepo; + } + + // Check if repository already exists for this hostname +
const cacheKey = `${config.hostnameHash}:events`; + let repository = eventRepositoryCache.get(cacheKey); + + if (!repository) { + const client = await getClickhouseForOrganization(organizationId, "events"); + repository = new ClickhouseEventRepository({ + clickhouse: client, + batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, + flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, + maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, + maximumTraceDetailedSummaryViewCount: + env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, + maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, + insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, + waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", + asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, + asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, + llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE, + llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS, + llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE, + llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY, + version: "v2", + }); + eventRepositoryCache.set(cacheKey, repository); + } + + return repository; +} + +// Helper to create the default events ClickHouse client +async function getEventsClickhouseClient(): Promise { + if (!env.EVENTS_CLICKHOUSE_URL) { + throw new Error("EVENTS_CLICKHOUSE_URL is not set"); + } + + const url = new URL(env.EVENTS_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + return new ClickHouse({ + url: url.toString(), + name: "task-events", + keepAlive: { + enabled: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.EVENTS_CLICKHOUSE_LOG_LEVEL, + compression: { + request: env.EVENTS_CLICKHOUSE_COMPRESSION_REQUEST === "1", + }, + 
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); +} + +/** + * Get admin ClickHouse client for cross-organization queries + * This should only be used for admin tools and analytics that need to query across all orgs + */ +export function getAdminClickhouse(): ClickHouse { + return defaultAdminClickhouseClient; +} + +// Clear caches when needed (e.g., when org config changes) +export function clearClickhouseCacheForOrganization(organizationId: string): void { + // The Unkey cache will naturally expire based on TTL (5min fresh, 10min stale) + // No explicit removal needed - cache entries will be refreshed on next access + // Note: We don't clear client/repository caches as they're keyed by hostname + // and may be shared by other orgs +} diff --git a/apps/webapp/app/services/clickhouse/clickhouseFactory.test.ts b/apps/webapp/app/services/clickhouse/clickhouseFactory.test.ts new file mode 100644 index 00000000000..f0b24b941b1 --- /dev/null +++ b/apps/webapp/app/services/clickhouse/clickhouseFactory.test.ts @@ -0,0 +1,155 @@ +import { describe, it, expect, beforeEach, vi } from "vitest"; +import { prisma } from "~/db.server"; +import { + getClickhouseForOrganization, + getEventRepositoryForOrganization, + clearClickhouseCacheForOrganization, +} from "./clickhouseFactory.server"; +import { + setOrganizationClickhouseUrl, + removeOrganizationClickhouseUrl, + getOrganizationClickhouseUrl, +} from "./clickhouseCredentialsService.server"; + +describe("ClickHouse Factory", () => { + const testOrgId = "test-org-" + Date.now(); + const testClickhouseUrl = "https://test-ch.example.com:8443?user=test&password=secret"; + + beforeEach(async () => { + // Clean up any existing test data + await prisma.organization.deleteMany({ + where: { id: testOrgId }, + }); + }); + + it("should return default ClickHouse client when org has no custom config", async () => { + const client = await getClickhouseForOrganization(testOrgId, "standard"); + 
expect(client).toBeDefined(); + // Default client should be returned (not null) + expect(client).toBeTruthy(); + }); + + it("should set and retrieve organization ClickHouse URL", async () => { + // First create the test organization + await prisma.organization.create({ + data: { + id: testOrgId, + title: "Test Org", + slug: "test-org-" + Date.now(), + }, + }); + + // Set the URL + await setOrganizationClickhouseUrl(testOrgId, "standard", testClickhouseUrl); + + // Retrieve it + const retrievedUrl = await getOrganizationClickhouseUrl(testOrgId, "standard"); + expect(retrievedUrl).toBe(testClickhouseUrl); + + // Verify it's stored in featureFlags + const org = await prisma.organization.findUnique({ + where: { id: testOrgId }, + select: { featureFlags: true }, + }); + + expect(org?.featureFlags).toBeDefined(); + const featureFlags = org?.featureFlags as any; + expect(featureFlags.clickhouse).toBeDefined(); + expect(featureFlags.clickhouse.standard).toBeDefined(); + + // Clean up + await removeOrganizationClickhouseUrl(testOrgId, "standard"); + await prisma.organization.delete({ where: { id: testOrgId } }); + }); + + it("should remove organization ClickHouse URL", async () => { + // First create the test organization + await prisma.organization.create({ + data: { + id: testOrgId, + title: "Test Org", + slug: "test-org-" + Date.now(), + }, + }); + + // Set and then remove + await setOrganizationClickhouseUrl(testOrgId, "standard", testClickhouseUrl); + await removeOrganizationClickhouseUrl(testOrgId, "standard"); + + // Verify it's gone + const retrievedUrl = await getOrganizationClickhouseUrl(testOrgId, "standard"); + expect(retrievedUrl).toBeNull(); + + // Clean up + await prisma.organization.delete({ where: { id: testOrgId } }); + }); + + it("should cache ClickHouse clients by hostname", async () => { + // This test verifies that multiple orgs pointing to the same ClickHouse hostname + // share the same client instance (deduplication) + + const org1Id = testOrgId + 
"-1"; + const org2Id = testOrgId + "-2"; + + // Create test organizations + await prisma.organization.createMany({ + data: [ + { id: org1Id, title: "Test Org 1", slug: "test-org-1-" + Date.now() }, + { id: org2Id, title: "Test Org 2", slug: "test-org-2-" + Date.now() }, + ], + }); + + // Set both orgs to use the same ClickHouse URL + await setOrganizationClickhouseUrl(org1Id, "standard", testClickhouseUrl); + await setOrganizationClickhouseUrl(org2Id, "standard", testClickhouseUrl); + + // Get clients for both orgs + const client1 = await getClickhouseForOrganization(org1Id, "standard"); + const client2 = await getClickhouseForOrganization(org2Id, "standard"); + + // Both should be defined + expect(client1).toBeDefined(); + expect(client2).toBeDefined(); + + // They should be the same instance (cached by hostname) + expect(client1).toBe(client2); + + // Clean up + await removeOrganizationClickhouseUrl(org1Id, "standard"); + await removeOrganizationClickhouseUrl(org2Id, "standard"); + await prisma.organization.deleteMany({ + where: { id: { in: [org1Id, org2Id] } }, + }); + }); + + it("should clear cache when organization config changes", async () => { + // Create test organization + await prisma.organization.create({ + data: { + id: testOrgId, + title: "Test Org", + slug: "test-org-" + Date.now(), + }, + }); + + // Set URL + await setOrganizationClickhouseUrl(testOrgId, "standard", testClickhouseUrl); + + // Get client to populate cache + const client1 = await getClickhouseForOrganization(testOrgId, "standard"); + + // Clear cache + clearClickhouseCacheForOrganization(testOrgId); + + // Get client again (should hit the database again, not cache) + const client2 = await getClickhouseForOrganization(testOrgId, "standard"); + + // Both should be defined + expect(client1).toBeDefined(); + expect(client2).toBeDefined(); + + // Clean up + await removeOrganizationClickhouseUrl(testOrgId, "standard"); + await prisma.organization.delete({ where: { id: testOrgId } }); + }); 
+}); diff --git a/apps/webapp/app/services/clickhouse/clickhouseSecretSchemas.server.ts b/apps/webapp/app/services/clickhouse/clickhouseSecretSchemas.server.ts new file mode 100644 index 00000000000..016eb717c18 --- /dev/null +++ b/apps/webapp/app/services/clickhouse/clickhouseSecretSchemas.server.ts @@ -0,0 +1,11 @@ +import { z } from "zod"; + +export const ClickhouseConnectionSchema = z.object({ + url: z.string().url(), +}); + +export type ClickhouseConnection = z.infer; + +export function getClickhouseSecretKey(orgId: string, clientType: string): string { + return `org:${orgId}:clickhouse:${clientType}`; +} diff --git a/apps/webapp/app/services/clickhouseInstance.server.ts b/apps/webapp/app/services/clickhouseInstance.server.ts deleted file mode 100644 index 9c4941671f3..00000000000 --- a/apps/webapp/app/services/clickhouseInstance.server.ts +++ /dev/null @@ -1,130 +0,0 @@ -import { ClickHouse } from "@internal/clickhouse"; -import { env } from "~/env.server"; -import { singleton } from "~/utils/singleton"; - -export const clickhouseClient = singleton("clickhouseClient", initializeClickhouseClient); - -function initializeClickhouseClient() { - const url = new URL(env.CLICKHOUSE_URL); - - // Remove secure param - url.searchParams.delete("secure"); - - console.log(`šŸ—ƒļø Clickhouse service enabled to host ${url.host}`); - - const clickhouse = new ClickHouse({ - url: url.toString(), - name: "clickhouse-instance", - keepAlive: { - enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", - idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, - }, - logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, - maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, - }); - - return clickhouse; -} - -export const logsClickhouseClient = singleton( - "logsClickhouseClient", - initializeLogsClickhouseClient -); - -function initializeLogsClickhouseClient() { - if (!env.LOGS_CLICKHOUSE_URL) { - throw new Error("LOGS_CLICKHOUSE_URL is not set"); - } - - 
const url = new URL(env.LOGS_CLICKHOUSE_URL); - - // Remove secure param - url.searchParams.delete("secure"); - - return new ClickHouse({ - url: url.toString(), - name: "logs-clickhouse", - keepAlive: { - enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", - idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, - }, - logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, - maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, - clickhouseSettings: { - max_memory_usage: env.CLICKHOUSE_LOGS_LIST_MAX_MEMORY_USAGE.toString(), - max_bytes_before_external_sort: - env.CLICKHOUSE_LOGS_LIST_MAX_BYTES_BEFORE_EXTERNAL_SORT.toString(), - max_threads: env.CLICKHOUSE_LOGS_LIST_MAX_THREADS, - ...(env.CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ && { - max_rows_to_read: env.CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ.toString(), - }), - ...(env.CLICKHOUSE_LOGS_LIST_MAX_EXECUTION_TIME && { - max_execution_time: env.CLICKHOUSE_LOGS_LIST_MAX_EXECUTION_TIME, - }), - }, - }); -} - -export const adminClickhouseClient = singleton( - "adminClickhouseClient", - initializeAdminClickhouseClient -); - -function initializeAdminClickhouseClient() { - if (!env.ADMIN_CLICKHOUSE_URL) { - throw new Error("ADMIN_CLICKHOUSE_URL is not set"); - } - - const url = new URL(env.ADMIN_CLICKHOUSE_URL); - url.searchParams.delete("secure"); - - return new ClickHouse({ - url: url.toString(), - name: "admin-clickhouse", - keepAlive: { - enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", - idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, - }, - logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, - maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, - }); -} - -export const queryClickhouseClient = singleton( - "queryClickhouseClient", - initializeQueryClickhouseClient -); - -function initializeQueryClickhouseClient() { - if (!env.QUERY_CLICKHOUSE_URL) { - throw new Error("QUERY_CLICKHOUSE_URL is not set"); - } - - const url = new 
URL(env.QUERY_CLICKHOUSE_URL); - - // Remove secure param - url.searchParams.delete("secure"); - - return new ClickHouse({ - url: url.toString(), - name: "query-clickhouse", - keepAlive: { - enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", - idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, - }, - logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, - maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, - }); -} diff --git a/apps/webapp/app/services/queryService.server.ts b/apps/webapp/app/services/queryService.server.ts index 1f3bdbba18a..f24df9eb023 100644 --- a/apps/webapp/app/services/queryService.server.ts +++ b/apps/webapp/app/services/queryService.server.ts @@ -11,7 +11,7 @@ import type { TableSchema, WhereClauseCondition } from "@internal/tsql"; import { z } from "zod"; import { prisma } from "~/db.server"; import { env } from "~/env.server"; -import { queryClickhouseClient } from "./clickhouseInstance.server"; +import { getClickhouseForOrganization } from "./clickhouse/clickhouseFactory.server"; import { queryConcurrencyLimiter, DEFAULT_ORG_CONCURRENCY_LIMIT, @@ -275,7 +275,8 @@ export async function executeQuery( environment: Object.fromEntries(environments.map((e) => [e.id, e.slug])), }; - const result = await executeTSQL(queryClickhouseClient.reader, { + const queryClickhouse = await getClickhouseForOrganization(organizationId, "query"); + const result = await executeTSQL(queryClickhouse.reader, { ...baseOptions, schema: z.record(z.any()), tableSchema: querySchemas, diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 7930c05481f..ca1fba686a2 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -617,18 +617,65 @@ export class RunsReplicationService { payloadInserts: payloadInserts.length, }); + // Group task runs by organization for routing to 
correct ClickHouse instance + const taskRunsByOrg = new Map(); + for (const taskRun of taskRunInserts) { + const orgId = getTaskRunField(taskRun, "organization_id"); + const orgRuns = taskRunsByOrg.get(orgId) || []; + orgRuns.push(taskRun); + taskRunsByOrg.set(orgId, orgRuns); + } + + // Group payloads by organization (extract from run_id -> task runs mapping) + const payloadsByOrg = new Map(); + for (const payload of payloadInserts) { + const runId = getPayloadField(payload, "run_id"); + // Find the corresponding task run to get its organization + const taskRun = taskRunInserts.find((tr) => getTaskRunField(tr, "run_id") === runId); + if (taskRun) { + const orgId = getTaskRunField(taskRun, "organization_id"); + const orgPayloads = payloadsByOrg.get(orgId) || []; + orgPayloads.push(payload); + payloadsByOrg.set(orgId, orgPayloads); + } + } + // Insert task runs and payloads with retry logic for connection errors - const [taskRunError, taskRunResult] = await this.#insertWithRetry( - (attempt) => this.#insertTaskRunInserts(taskRunInserts, attempt), - "task run inserts", - flushId + // Process each organization's data in parallel + const insertPromises = Array.from(taskRunsByOrg.entries()).map( + async ([orgId, orgTaskRuns]) => { + const orgPayloads = payloadsByOrg.get(orgId) || []; + + const [taskRunError, taskRunResult] = await this.#insertWithRetry( + (attempt) => this.#insertTaskRunInserts(orgId, orgTaskRuns, attempt), + "task run inserts", + flushId + ); + + const [payloadError, payloadResult] = await this.#insertWithRetry( + (attempt) => this.#insertPayloadInserts(orgId, orgPayloads, attempt), + "payload inserts", + flushId + ); + + return { taskRunError, payloadError, orgId }; + } ); - const [payloadError, payloadResult] = await this.#insertWithRetry( - (attempt) => this.#insertPayloadInserts(payloadInserts, attempt), - "payload inserts", - flushId - ); + const results = await Promise.all(insertPromises); + + // Aggregate errors from all organizations + let 
taskRunError: Error | null = null; + let payloadError: Error | null = null; + + for (const result of results) { + if (result.taskRunError) { + taskRunError = result.taskRunError; + } + if (result.payloadError) { + payloadError = result.payloadError; + } + } // Log any errors that occurred if (taskRunError) { @@ -770,19 +817,32 @@ export class RunsReplicationService { }; } - async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) { + async #insertTaskRunInserts( + organizationId: string, + taskRunInserts: TaskRunInsertArray[], + attempt: number + ) { return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { - const [insertError, insertResult] = - await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, { + // Get the appropriate ClickHouse client for this organization + const { getClickhouseForOrganization } = await import( + "~/services/clickhouse/clickhouseFactory.server" + ); + const clickhouse = await getClickhouseForOrganization(organizationId, "replication"); + + const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays( + taskRunInserts, + { params: { clickhouse_settings: this.#getClickhouseInsertSettings(), }, - }); + } + ); if (insertError) { this.logger.error("Error inserting task run inserts attempt", { error: insertError, attempt, + organizationId, }); recordSpanError(span, insertError); @@ -793,19 +853,32 @@ export class RunsReplicationService { }); } - async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) { + async #insertPayloadInserts( + organizationId: string, + payloadInserts: PayloadInsertArray[], + attempt: number + ) { return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { - const [insertError, insertResult] = - await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, { + // Get the appropriate ClickHouse client for this organization + const { getClickhouseForOrganization } = 
await import( + "~/services/clickhouse/clickhouseFactory.server" + ); + const clickhouse = await getClickhouseForOrganization(organizationId, "replication"); + + const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays( + payloadInserts, + { params: { clickhouse_settings: this.#getClickhouseInsertSettings(), }, - }); + } + ); if (insertError) { this.logger.error("Error inserting payload inserts attempt", { error: insertError, attempt, + organizationId, }); recordSpanError(span, insertError); diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index 7505693e3ab..f8b22d7c442 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -20,7 +20,6 @@ import { } from "@trigger.dev/otlp-importer"; import type { MetricsV1Input } from "@internal/clickhouse"; import { logger } from "~/services/logger.server"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server"; import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server"; import { @@ -118,21 +117,26 @@ class OTLPExporter { async #exportEvents( eventsWithStores: { events: Array; taskEventStore: string }[] ) { - const eventsGroupedByStore = eventsWithStores.reduce((acc, { events, taskEventStore }) => { - acc[taskEventStore] = acc[taskEventStore] || []; - acc[taskEventStore].push(...events); + // Group events by both store and organization for proper routing + const eventsGroupedByStoreAndOrg = eventsWithStores.reduce((acc, { events, taskEventStore }) => { + for (const event of events) { + const orgId = event.organizationId || "default"; + const key = `${taskEventStore}:${orgId}`; + acc[key] = acc[key] || { store: taskEventStore, orgId, events: [] }; + acc[key].events.push(event); + } return acc; - }, {} as Record>); + }, {} as Record }>); let eventCount = 0; - for (const 
[store, events] of Object.entries(eventsGroupedByStore)) { - const eventRepository = this.#getEventRepositoryForStore(store); + for (const { store, orgId, events } of Object.values(eventsGroupedByStoreAndOrg)) { + const eventRepository = await this.#getEventRepositoryForStoreAndOrg(store, orgId); await waitForLlmPricingReady(); const enrichedEvents = enrichCreatableEvents(events); - this.#logEventsVerbose(enrichedEvents, `exportEvents ${store}`); + this.#logEventsVerbose(enrichedEvents, `exportEvents ${store}:${orgId}`); eventCount += enrichedEvents.length; @@ -142,6 +146,19 @@ class OTLPExporter { return eventCount; } + async #getEventRepositoryForStoreAndOrg(store: string, orgId: string): Promise { + // For ClickHouse stores with a specific org (not "default"), use org-specific repository + if ((store === "clickhouse" || store === "clickhouse_v2") && orgId !== "default") { + const { getEventRepositoryForOrganization } = await import( + "~/services/clickhouse/clickhouseFactory.server" + ); + return await getEventRepositoryForOrganization(orgId); + } + + // Fall back to default repositories for non-ClickHouse stores or default org + return this.#getEventRepositoryForStore(store); + } + #getEventRepositoryForStore(store: string): IEventRepository { if (store === "clickhouse") { return this._clickhouseEventRepository; @@ -1172,12 +1189,22 @@ function hasUnpairedSurrogateAtEnd(str: string): boolean { export const otlpExporter = singleton("otlpExporter", initializeOTLPExporter); -function initializeOTLPExporter() { +async function initializeOTLPExporter() { + // Metrics are written globally (not per-org), use standard clickhouse + // We use a dummy org ID since metrics table is global + const { getClickhouseForOrganization } = await import( + "~/services/clickhouse/clickhouseFactory.server" + ); + + // Use a sentinel org ID for global metrics writes + // In practice, all orgs currently share the same metrics table/instance + const metricsClickhouse = await 
getClickhouseForOrganization("METRICS_GLOBAL", "standard"); + const metricsFlushScheduler = new DynamicFlushScheduler({ batchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE, flushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS, callback: async (_flushId, batch) => { - await clickhouseClient.metrics.insert(batch); + await metricsClickhouse.metrics.insert(batch); }, minConcurrency: 1, maxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY, diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 156b68bff59..07a4286297f 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -7,7 +7,7 @@ import { } from "@trigger.dev/database"; import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; import { type CreateBulkActionPayload } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; -import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; import { parseRunListInputOptions, type RunListInputFilters, @@ -38,8 +38,9 @@ export class BulkActionService extends BaseService { const filters = await getFilters(payload, request); // Count the runs that will be affected by the bulk action + const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); const runsRepository = new RunsRepository({ - clickhouse: clickhouseClient, + clickhouse, prisma: this._replica as PrismaClient, }); const count = await runsRepository.countRuns({ @@ -147,8 +148,9 @@ export class BulkActionService extends BaseService { ...rawParams, }); + const clickhouse = await getClickhouseForOrganization(group.project.organizationId, "standard"); const runsRepository = new RunsRepository({ - clickhouse: clickhouseClient, + clickhouse, prisma: this._replica as PrismaClient, 
}); From 9c128b29f4b2e34390d878ce53a569e9a56c3841 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 26 Mar 2026 17:32:05 +0000 Subject: [PATCH 02/16] Better replication performance --- .../services/runsReplicationService.server.ts | 102 +++++++++++------- 1 file changed, 63 insertions(+), 39 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index ca1fba686a2..78b7e084e31 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -29,6 +29,7 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -617,51 +618,59 @@ export class RunsReplicationService { payloadInserts: payloadInserts.length, }); - // Group task runs by organization for routing to correct ClickHouse instance - const taskRunsByOrg = new Map(); - for (const taskRun of taskRunInserts) { - const orgId = getTaskRunField(taskRun, "organization_id"); - const orgRuns = taskRunsByOrg.get(orgId) || []; - orgRuns.push(taskRun); - taskRunsByOrg.set(orgId, orgRuns); - } + // Task runs are already sorted by org (lines 571-576), so we can stream through + // and flush when org changes - no grouping overhead, no O(n²) lookups - // Group payloads by organization (extract from run_id -> task runs mapping) + // Build run_id -> org_id index for O(1) payload->org lookups + const runIdToOrgId = new Map( + taskRunInserts.map(tr => [getTaskRunField(tr, "run_id"), getTaskRunField(tr, "organization_id")]) + ); + + // Group payloads by org using the index (O(n) instead of O(n²)) const payloadsByOrg = new Map(); for (const payload of 
payloadInserts) { const runId = getPayloadField(payload, "run_id"); - // Find the corresponding task run to get its organization - const taskRun = taskRunInserts.find((tr) => getTaskRunField(tr, "run_id") === runId); - if (taskRun) { - const orgId = getTaskRunField(taskRun, "organization_id"); - const orgPayloads = payloadsByOrg.get(orgId) || []; - orgPayloads.push(payload); - payloadsByOrg.set(orgId, orgPayloads); + const orgId = runIdToOrgId.get(runId); + if (orgId) { + const orgPayloads = payloadsByOrg.get(orgId); + if (orgPayloads) { + orgPayloads.push(payload); + } else { + payloadsByOrg.set(orgId, [payload]); + } } } - // Insert task runs and payloads with retry logic for connection errors - // Process each organization's data in parallel - const insertPromises = Array.from(taskRunsByOrg.entries()).map( - async ([orgId, orgTaskRuns]) => { - const orgPayloads = payloadsByOrg.get(orgId) || []; + // Stream through task runs, flushing when org changes + const insertPromises: Promise<{ taskRunError: Error | null; payloadError: Error | null; orgId: string }>[] = []; + let currentOrgId: string | null = null; + let currentOrgTaskRuns: TaskRunInsertArray[] = []; - const [taskRunError, taskRunResult] = await this.#insertWithRetry( - (attempt) => this.#insertTaskRunInserts(orgId, orgTaskRuns, attempt), - "task run inserts", - flushId - ); + for (const taskRun of taskRunInserts) { + const orgId = getTaskRunField(taskRun, "organization_id"); - const [payloadError, payloadResult] = await this.#insertWithRetry( - (attempt) => this.#insertPayloadInserts(orgId, orgPayloads, attempt), - "payload inserts", - flushId + // Org changed? 
Flush previous org's batch + if (currentOrgId !== null && currentOrgId !== orgId) { + const orgPayloads = payloadsByOrg.get(currentOrgId) || []; + insertPromises.push( + this.#insertOrgBatch(currentOrgId, currentOrgTaskRuns, orgPayloads, flushId) ); - - return { taskRunError, payloadError, orgId }; + currentOrgTaskRuns = []; } - ); + currentOrgId = orgId; + currentOrgTaskRuns.push(taskRun); + } + + // Flush final org's batch + if (currentOrgId !== null && currentOrgTaskRuns.length > 0) { + const orgPayloads = payloadsByOrg.get(currentOrgId) || []; + insertPromises.push( + this.#insertOrgBatch(currentOrgId, currentOrgTaskRuns, orgPayloads, flushId) + ); + } + + // Wait for all org batches to complete (parallel execution) const results = await Promise.all(insertPromises); // Aggregate errors from all organizations @@ -817,6 +826,27 @@ export class RunsReplicationService { }; } + async #insertOrgBatch( + organizationId: string, + taskRunInserts: TaskRunInsertArray[], + payloadInserts: PayloadInsertArray[], + flushId: string + ): Promise<{ taskRunError: Error | null; payloadError: Error | null; orgId: string }> { + const [taskRunError] = await this.#insertWithRetry( + (attempt) => this.#insertTaskRunInserts(organizationId, taskRunInserts, attempt), + "task run inserts", + flushId + ); + + const [payloadError] = await this.#insertWithRetry( + (attempt) => this.#insertPayloadInserts(organizationId, payloadInserts, attempt), + "payload inserts", + flushId + ); + + return { taskRunError, payloadError, orgId: organizationId }; + } + async #insertTaskRunInserts( organizationId: string, taskRunInserts: TaskRunInsertArray[], @@ -824,9 +854,6 @@ export class RunsReplicationService { ) { return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { // Get the appropriate ClickHouse client for this organization - const { getClickhouseForOrganization } = await import( - "~/services/clickhouse/clickhouseFactory.server" - ); const clickhouse = await 
getClickhouseForOrganization(organizationId, "replication"); const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays( @@ -860,9 +887,6 @@ export class RunsReplicationService { ) { return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { // Get the appropriate ClickHouse client for this organization - const { getClickhouseForOrganization } = await import( - "~/services/clickhouse/clickhouseFactory.server" - ); const clickhouse = await getClickhouseForOrganization(organizationId, "replication"); const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays( From e6522a84eb26b8bde2e730475ba204c61c489d3c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 26 Mar 2026 17:32:19 +0000 Subject: [PATCH 03/16] Removed dynamic imports --- CLAUDE.md | 11 +++++++++++ apps/webapp/app/v3/otlpExporter.server.ts | 9 +-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 0a54cced672..c43bddf323c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -66,6 +66,17 @@ containerTest("should use both", async ({ prisma, redisOptions }) => { }); ``` +## Code Style + +### Imports + +**Prefer static imports over dynamic imports.** Only use dynamic `import()` when: +- Circular dependencies cannot be resolved otherwise +- Code splitting is genuinely needed for performance +- The module must be loaded conditionally at runtime + +Dynamic imports add unnecessary overhead in hot paths and make code harder to analyze. If you find yourself using `await import()`, ask if a regular `import` statement would work instead. 
+ ## Changesets and Server Changes When modifying any public package (`packages/*` or `integrations/*`), add a changeset: diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index f8b22d7c442..fd16717a584 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -40,6 +40,7 @@ import { waitForLlmPricingReady } from "./llmPricingRegistry.server"; import { env } from "~/env.server"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { singleton } from "~/utils/singleton"; +import { getClickhouseForOrganization, getEventRepositoryForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; class OTLPExporter { private _tracer: Tracer; @@ -149,9 +150,6 @@ class OTLPExporter { async #getEventRepositoryForStoreAndOrg(store: string, orgId: string): Promise { // For ClickHouse stores with a specific org (not "default"), use org-specific repository if ((store === "clickhouse" || store === "clickhouse_v2") && orgId !== "default") { - const { getEventRepositoryForOrganization } = await import( - "~/services/clickhouse/clickhouseFactory.server" - ); return await getEventRepositoryForOrganization(orgId); } @@ -1191,11 +1189,6 @@ export const otlpExporter = singleton("otlpExporter", initializeOTLPExporter); async function initializeOTLPExporter() { // Metrics are written globally (not per-org), use standard clickhouse - // We use a dummy org ID since metrics table is global - const { getClickhouseForOrganization } = await import( - "~/services/clickhouse/clickhouseFactory.server" - ); - // Use a sentinel org ID for global metrics writes // In practice, all orgs currently share the same metrics table/instance const metricsClickhouse = await getClickhouseForOrganization("METRICS_GLOBAL", "standard"); From a95eb9198b392440571798320cf4fef366f1789f Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Mar 2026 16:48:56 +0000 Subject: [PATCH 04/16] 
otlpExporter.server reverted to main --- apps/webapp/app/v3/otlpExporter.server.ts | 40 ++++++----------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index fd16717a584..7505693e3ab 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -20,6 +20,7 @@ import { } from "@trigger.dev/otlp-importer"; import type { MetricsV1Input } from "@internal/clickhouse"; import { logger } from "~/services/logger.server"; +import { clickhouseClient } from "~/services/clickhouseInstance.server"; import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server"; import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server"; import { @@ -40,7 +41,6 @@ import { waitForLlmPricingReady } from "./llmPricingRegistry.server"; import { env } from "~/env.server"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { singleton } from "~/utils/singleton"; -import { getClickhouseForOrganization, getEventRepositoryForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; class OTLPExporter { private _tracer: Tracer; @@ -118,26 +118,21 @@ class OTLPExporter { async #exportEvents( eventsWithStores: { events: Array; taskEventStore: string }[] ) { - // Group events by both store and organization for proper routing - const eventsGroupedByStoreAndOrg = eventsWithStores.reduce((acc, { events, taskEventStore }) => { - for (const event of events) { - const orgId = event.organizationId || "default"; - const key = `${taskEventStore}:${orgId}`; - acc[key] = acc[key] || { store: taskEventStore, orgId, events: [] }; - acc[key].events.push(event); - } + const eventsGroupedByStore = eventsWithStores.reduce((acc, { events, taskEventStore }) => { + acc[taskEventStore] = acc[taskEventStore] || []; + acc[taskEventStore].push(...events); return acc; - }, {} as Record }>); + }, {} as 
Record>); let eventCount = 0; - for (const { store, orgId, events } of Object.values(eventsGroupedByStoreAndOrg)) { - const eventRepository = await this.#getEventRepositoryForStoreAndOrg(store, orgId); + for (const [store, events] of Object.entries(eventsGroupedByStore)) { + const eventRepository = this.#getEventRepositoryForStore(store); await waitForLlmPricingReady(); const enrichedEvents = enrichCreatableEvents(events); - this.#logEventsVerbose(enrichedEvents, `exportEvents ${store}:${orgId}`); + this.#logEventsVerbose(enrichedEvents, `exportEvents ${store}`); eventCount += enrichedEvents.length; @@ -147,16 +142,6 @@ class OTLPExporter { return eventCount; } - async #getEventRepositoryForStoreAndOrg(store: string, orgId: string): Promise { - // For ClickHouse stores with a specific org (not "default"), use org-specific repository - if ((store === "clickhouse" || store === "clickhouse_v2") && orgId !== "default") { - return await getEventRepositoryForOrganization(orgId); - } - - // Fall back to default repositories for non-ClickHouse stores or default org - return this.#getEventRepositoryForStore(store); - } - #getEventRepositoryForStore(store: string): IEventRepository { if (store === "clickhouse") { return this._clickhouseEventRepository; @@ -1187,17 +1172,12 @@ function hasUnpairedSurrogateAtEnd(str: string): boolean { export const otlpExporter = singleton("otlpExporter", initializeOTLPExporter); -async function initializeOTLPExporter() { - // Metrics are written globally (not per-org), use standard clickhouse - // Use a sentinel org ID for global metrics writes - // In practice, all orgs currently share the same metrics table/instance - const metricsClickhouse = await getClickhouseForOrganization("METRICS_GLOBAL", "standard"); - +function initializeOTLPExporter() { const metricsFlushScheduler = new DynamicFlushScheduler({ batchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE, flushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS, callback: async (_flushId, batch) => 
{ - await metricsClickhouse.metrics.insert(batch); + await clickhouseClient.metrics.insert(batch); }, minConcurrency: 1, maxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY, From 26f5bad8b3ad151cd1a942a4e50e1726fe5b30d6 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 31 Mar 2026 22:44:36 +0100 Subject: [PATCH 05/16] Switch to a DataStore registry --- apps/webapp/app/env.server.ts | 3 + .../clickhouseCredentialsService.server.ts | 109 ------ .../clickhouse/clickhouseFactory.server.ts | 340 ++++++++---------- ...ganizationDataStoreConfigSchemas.server.ts | 35 ++ .../organizationDataStoresRegistry.server.ts | 82 +++++ ...zationDataStoresRegistryInstance.server.ts | 46 +++ .../migration.sql | 21 ++ .../database/prisma/schema.prisma | 21 ++ 8 files changed, 361 insertions(+), 296 deletions(-) delete mode 100644 apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts create mode 100644 apps/webapp/app/services/dataStores/organizationDataStoreConfigSchemas.server.ts create mode 100644 apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts create mode 100644 apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts create mode 100644 internal-packages/database/prisma/migrations/20260331212308_add_organization_data_stores/migration.sql diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 82b73b34853..de02c39a477 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1301,6 +1301,9 @@ const EnvironmentSchema = z EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000), EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000), + // Organization data stores registry + ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes + // LLM cost tracking LLM_COST_TRACKING_ENABLED: BoolEnv.default(true), LLM_PRICING_RELOAD_INTERVAL_MS: 
z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes diff --git a/apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts b/apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts deleted file mode 100644 index c2c8c77f7c4..00000000000 --- a/apps/webapp/app/services/clickhouse/clickhouseCredentialsService.server.ts +++ /dev/null @@ -1,109 +0,0 @@ -import { getSecretStore } from "~/services/secrets/secretStore.server"; -import { prisma } from "~/db.server"; -import { - ClickhouseConnectionSchema, - getClickhouseSecretKey, -} from "./clickhouseSecretSchemas.server"; -import { clearClickhouseCacheForOrganization } from "./clickhouseFactory.server"; - -export async function setOrganizationClickhouseUrl( - organizationId: string, - clientType: "standard" | "events" | "replication", - url: string -): Promise { - // Validate URL format - const connection = ClickhouseConnectionSchema.parse({ url }); - - // Store in SecretStore - const secretStore = getSecretStore("DATABASE"); - const secretKey = getClickhouseSecretKey(organizationId, clientType); - await secretStore.setSecret(secretKey, connection); - - // Update featureFlags to reference the secret - const org = await prisma.organization.findUnique({ - where: { id: organizationId }, - select: { featureFlags: true }, - }); - - const featureFlags = (org?.featureFlags || {}) as any; - const clickhouseConfig = featureFlags.clickhouse || {}; - clickhouseConfig[clientType] = secretKey; - featureFlags.clickhouse = clickhouseConfig; - - await prisma.organization.update({ - where: { id: organizationId }, - data: { featureFlags }, - }); - - // Clear cache - clearClickhouseCacheForOrganization(organizationId); -} - -export async function removeOrganizationClickhouseUrl( - organizationId: string, - clientType: "standard" | "events" | "replication" -): Promise { - // Remove from SecretStore - const secretStore = getSecretStore("DATABASE"); - const secretKey = 
getClickhouseSecretKey(organizationId, clientType); - await secretStore.deleteSecret(secretKey); - - // Update featureFlags - const org = await prisma.organization.findUnique({ - where: { id: organizationId }, - select: { featureFlags: true }, - }); - - if (org?.featureFlags) { - const featureFlags = org.featureFlags as any; - if (featureFlags.clickhouse && featureFlags.clickhouse[clientType]) { - delete featureFlags.clickhouse[clientType]; - - // If no more clickhouse configs, remove the clickhouse key entirely - if (Object.keys(featureFlags.clickhouse).length === 0) { - delete featureFlags.clickhouse; - } - - await prisma.organization.update({ - where: { id: organizationId }, - data: { featureFlags }, - }); - } - } - - // Clear cache - clearClickhouseCacheForOrganization(organizationId); -} - -export async function getOrganizationClickhouseUrl( - organizationId: string, - clientType: "standard" | "events" | "replication" -): Promise { - const org = await prisma.organization.findUnique({ - where: { id: organizationId }, - select: { featureFlags: true }, - }); - - if (!org?.featureFlags) { - return null; - } - - const clickhouseConfig = (org.featureFlags as any).clickhouse; - if (!clickhouseConfig || typeof clickhouseConfig !== "object") { - return null; - } - - const secretKey = clickhouseConfig[clientType]; - if (!secretKey || typeof secretKey !== "string") { - return null; - } - - const secretStore = getSecretStore("DATABASE"); - const connection = await secretStore.getSecret(ClickhouseConnectionSchema, secretKey); - - if (!connection) { - return null; - } - - return connection.url; -} diff --git a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts index 94498868759..69a7555384a 100644 --- a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts +++ b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts @@ -11,61 +11,45 @@ * * ### Credential Storage * - ClickHouse 
URLs stored encrypted in SecretStore (AES-256-GCM) - * - Organization references secret via `featureFlags.clickhouse` JSON + * - Organization data store overrides live in the `OrganizationDataStore` table + * - The config JSON stores a `secretKey` that references the SecretStore entry * - No plaintext credentials in database * * ### Caching Strategy - * - **Org configs**: Unkey cache with LRU memory (5min fresh, 10min stale, SWR) - * - **ClickHouse clients**: Cached by hostname hash (multiple orgs share same instance) - * - **Event repositories**: Cached by hostname hash (stateful, must be reused) - * - **Security**: Memory-only cache for org configs (no credentials in Redis) + * - **Org → data store mapping**: `OrganizationDataStoresRegistry` (in-memory Map, reloaded + * periodically via setInterval) + * - **SecretKey → resolved URL**: module-level Map (persists for process lifetime) + * - **ClickHouse clients**: cached by hostname hash (multiple orgs share same instance) + * - **Event repositories**: cached by hostname hash (stateful, must be reused) * * ## Usage in Presenters * - * Presenters should fetch org-specific ClickHouse clients in their `call()` method: - * * ```typescript * import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server"; * * export class MyPresenter extends BasePresenter { - * constructor(private options: PresenterOptions = {}) { - * super(); - * } - * * async call({ organizationId, ... }) { * const clickhouse = await getClickhouseForOrganization(organizationId, "standard"); - * // Use clickhouse for queries... * } * } * ``` * - * ## Usage in Services - * - * The replication service and OTLP exporter automatically route data by organization. - * Other services should follow the same pattern when working with ClickHouse. 
- * * @module clickhouseFactory */ import { ClickHouse } from "@internal/clickhouse"; import { createHash } from "crypto"; -import { createCache, DefaultStatefulContext, Namespace } from "@unkey/cache"; -import { createLRUMemoryStore } from "@internal/cache"; import { getSecretStore } from "~/services/secrets/secretStore.server"; -import { prisma } from "~/db.server"; -import { - ClickhouseConnectionSchema, - getClickhouseSecretKey, -} from "./clickhouseSecretSchemas.server"; +import { ClickhouseConnectionSchema } from "./clickhouseSecretSchemas.server"; import { ClickhouseEventRepository } from "~/v3/eventRepository/clickhouseEventRepository.server"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; +import { organizationDataStoresRegistry } from "~/services/dataStores/organizationDataStoresRegistryInstance.server"; -// Module-level caches for ClickHouse clients and event repositories -const clickhouseClientCache = new Map(); -const eventRepositoryCache = new Map(); +// --------------------------------------------------------------------------- +// Default clients (singleton per process) +// --------------------------------------------------------------------------- -// Default ClickHouse clients (not exported - internal use only) const defaultClickhouseClient = singleton("clickhouseClient", initializeClickhouseClient); function initializeClickhouseClient() { @@ -82,9 +66,7 @@ function initializeClickhouseClient() { idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, + compression: { request: true }, maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, }); } @@ -110,9 +92,7 @@ function initializeLogsClickhouseClient() { idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, + compression: { request: true }, maxOpenConnections: 
env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, clickhouseSettings: { max_memory_usage: env.CLICKHOUSE_LOGS_LIST_MAX_MEMORY_USAGE.toString(), @@ -150,9 +130,7 @@ function initializeAdminClickhouseClient() { idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, + compression: { request: true }, maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, }); } @@ -178,93 +156,108 @@ function initializeQueryClickhouseClient() { idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, + compression: { request: true }, maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, }); } -// Org config cache with Unkey (memory-only, no Redis for security) -type OrgClickhouseConfig = { - organizationId: string; - hostnameHash: string; - url: string; - clientType: string; -}; - -const ctx = new DefaultStatefulContext(); -const memory = createLRUMemoryStore(1000); - -const orgConfigCache = createCache({ - orgClickhouse: new Namespace(ctx, { - stores: [memory], // Memory-only, no Redis store for security - fresh: 5 * 60 * 1000, // 5 minutes - stale: 10 * 60 * 1000, // 10 minutes (SWR pattern) - }), -}); +// --------------------------------------------------------------------------- +// Org-scoped client caches +// --------------------------------------------------------------------------- + +/** ClickHouse clients keyed by hostname hash (shared across orgs pointing at the same host). */ +const clickhouseClientCache = new Map(); + +/** Event repositories keyed by hostname hash (stateful, must be reused). */ +const eventRepositoryCache = new Map(); + +/** Resolved connection URLs keyed by secret-store key (avoids repeated secret fetches). 
*/ +const resolvedConnectionCache = new Map(); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- function hashHostname(url: string): string { const parsed = new URL(url); return createHash("sha256").update(parsed.hostname).digest("hex"); } -async function getOrgClickhouseConfig( - ctx: DefaultStatefulContext, - orgId: string, - clientType: string -): Promise { - const org = await prisma.organization.findUnique({ - where: { id: orgId }, - select: { featureFlags: true }, - }); +type ClientType = "standard" | "events" | "replication" | "logs" | "query" | "admin"; - if (!org?.featureFlags) { - return null; - } - - const clickhouseConfig = (org.featureFlags as any).clickhouse; - if (!clickhouseConfig || typeof clickhouseConfig !== "object") { - return null; - } - - const secretKey = clickhouseConfig[clientType]; - if (!secretKey || typeof secretKey !== "string") { - return null; - } +/** + * Resolve a secret-store key to a connection URL + hostname hash. + * Results are cached for the process lifetime (the registry reloads keep org→key mapping fresh). 
+ */ +async function resolveSecretKey( + secretKey: string +): Promise<{ url: string; hostnameHash: string } | null> { + const cached = resolvedConnectionCache.get(secretKey); + if (cached) return cached; const secretStore = getSecretStore("DATABASE"); const connection = await secretStore.getSecret(ClickhouseConnectionSchema, secretKey); + if (!connection) return null; - if (!connection) { - return null; - } + const resolved = { url: connection.url, hostnameHash: hashHostname(connection.url) }; + resolvedConnectionCache.set(secretKey, resolved); + return resolved; +} - const hostnameHash = hashHostname(connection.url); +function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse { + const parsed = new URL(url); + parsed.searchParams.delete("secure"); - return { - organizationId: orgId, - hostnameHash, - url: connection.url, - clientType, - }; + return new ClickHouse({ + url: parsed.toString(), + name: `org-clickhouse-${clientType}`, + keepAlive: { + enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.CLICKHOUSE_LOG_LEVEL, + compression: { request: true }, + maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); } +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + export async function getClickhouseForOrganization( organizationId: string, - clientType: "standard" | "events" | "replication" | "logs" | "query" | "admin" + clientType: ClientType ): Promise { - // Try to get org-specific config - const configResult = await orgConfigCache.orgClickhouse.swr( - `org:${organizationId}:ch:${clientType}`, - async () => getOrgClickhouseConfig(ctx, organizationId, clientType) - ); + if (!organizationDataStoresRegistry.isLoaded) { + await organizationDataStoresRegistry.isReady; + } + + const dataStore = 
organizationDataStoresRegistry.get(organizationId, "CLICKHOUSE"); + + if (!dataStore) { + // No override — use the appropriate default client. + switch (clientType) { + case "standard": + case "events": + case "replication": + return defaultClickhouseClient; + case "logs": + return defaultLogsClickhouseClient; + case "query": + return defaultQueryClickhouseClient; + case "admin": + return defaultAdminClickhouseClient; + } + } - // Handle Result type - check for error or null value - const config = configResult.err ? null : configResult.val; + const { secretKey } = dataStore.config.data; + const connection = await resolveSecretKey(secretKey); - // If no custom config, return appropriate default client - if (!config) { + if (!connection) { + console.warn( + `[clickhouseFactory] Secret key "${secretKey}" not found for org ${organizationId}; falling back to default` + ); switch (clientType) { case "standard": case "events": @@ -279,27 +272,11 @@ export async function getClickhouseForOrganization( } } - // Check if client already exists for this hostname - const cacheKey = `${config.hostnameHash}:${clientType}`; + const cacheKey = `${connection.hostnameHash}:${clientType}`; let client = clickhouseClientCache.get(cacheKey); if (!client) { - const url = new URL(config.url); - url.searchParams.delete("secure"); - - client = new ClickHouse({ - url: url.toString(), - name: `org-clickhouse-${clientType}`, - keepAlive: { - enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", - idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, - }, - logLevel: env.CLICKHOUSE_LOG_LEVEL, - compression: { - request: true, - }, - maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, - }); + client = buildOrgClickhouseClient(connection.url, clientType); clickhouseClientCache.set(cacheKey, client); } @@ -309,79 +286,64 @@ export async function getClickhouseForOrganization( export async function getEventRepositoryForOrganization( organizationId: string ): Promise { - // Try to get 
org-specific config - const configResult = await orgConfigCache.orgClickhouse.swr( - `org:${organizationId}:ch:events`, - async () => getOrgClickhouseConfig(ctx, organizationId, "events") - ); + if (!organizationDataStoresRegistry.isLoaded) { + await organizationDataStoresRegistry.isReady; + } - // Handle Result type - check for error or null value - const config = configResult.err ? null : configResult.val; + const dataStore = organizationDataStoresRegistry.get(organizationId, "CLICKHOUSE"); - // If no custom config, return default repository (created on demand) - if (!config) { + if (!dataStore) { const defaultKey = "default:events"; let defaultRepo = eventRepositoryCache.get(defaultKey); if (!defaultRepo) { - // Create default event repository using standard clickhouse client - // This matches the existing pattern in clickhouseEventRepositoryInstance.server.ts const eventsClickhouse = await getEventsClickhouseClient(); - defaultRepo = new ClickhouseEventRepository({ - clickhouse: eventsClickhouse, - batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, - flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, - maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, - maximumTraceDetailedSummaryViewCount: - env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, - maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, - insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, - waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", - asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, - asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, - startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, - llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE, - llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS, - llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE, - llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY, - version: "v2", - }); + 
defaultRepo = buildEventRepository(eventsClickhouse); eventRepositoryCache.set(defaultKey, defaultRepo); } return defaultRepo; } - // Check if repository already exists for this hostname - const cacheKey = `${config.hostnameHash}:events`; + const { secretKey } = dataStore.config.data; + const connection = await resolveSecretKey(secretKey); + + if (!connection) { + console.warn( + `[clickhouseFactory] Secret key "${secretKey}" not found for org ${organizationId}; falling back to default event repository` + ); + const defaultKey = "default:events"; + let defaultRepo = eventRepositoryCache.get(defaultKey); + if (!defaultRepo) { + const eventsClickhouse = await getEventsClickhouseClient(); + defaultRepo = buildEventRepository(eventsClickhouse); + eventRepositoryCache.set(defaultKey, defaultRepo); + } + return defaultRepo; + } + + const cacheKey = `${connection.hostnameHash}:events`; let repository = eventRepositoryCache.get(cacheKey); if (!repository) { const client = await getClickhouseForOrganization(organizationId, "events"); - repository = new ClickhouseEventRepository({ - clickhouse: client, - batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, - flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, - maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, - maximumTraceDetailedSummaryViewCount: - env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, - maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, - insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, - waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", - asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, - asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, - startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, - llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE, - llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS, - llmMetricsMaxBatchSize: 
env.LLM_METRICS_MAX_BATCH_SIZE, - llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY, - version: "v2", - }); + repository = buildEventRepository(client); eventRepositoryCache.set(cacheKey, repository); } return repository; } -// Helper to create the default events ClickHouse client +/** + * Get admin ClickHouse client for cross-organization queries. + * Only use for admin tools and analytics that need to query across all orgs. + */ +export function getAdminClickhouse(): ClickHouse { + return defaultAdminClickhouseClient; +} + +// --------------------------------------------------------------------------- +// Private helpers +// --------------------------------------------------------------------------- + async function getEventsClickhouseClient(): Promise { if (!env.EVENTS_CLICKHOUSE_URL) { throw new Error("EVENTS_CLICKHOUSE_URL is not set"); @@ -398,25 +360,29 @@ async function getEventsClickhouseClient(): Promise { idleSocketTtl: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, logLevel: env.EVENTS_CLICKHOUSE_LOG_LEVEL, - compression: { - request: env.EVENTS_CLICKHOUSE_COMPRESSION_REQUEST === "1", - }, + compression: { request: env.EVENTS_CLICKHOUSE_COMPRESSION_REQUEST === "1" }, maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS, }); } -/** - * Get admin ClickHouse client for cross-organization queries - * This should only be used for admin tools and analytics that need to query across all orgs - */ -export function getAdminClickhouse(): ClickHouse { - return defaultAdminClickhouseClient; -} - -// Clear caches when needed (e.g., when org config changes) -export function clearClickhouseCacheForOrganization(organizationId: string): void { - // The Unkey cache will naturally expire based on TTL (5min fresh, 10min stale) - // No explicit removal needed - cache entries will be refreshed on next access - // Note: We don't clear client/repository caches as they're keyed by hostname - // and may be shared by other orgs +function 
buildEventRepository(clickhouse: ClickHouse): ClickhouseEventRepository { + return new ClickhouseEventRepository({ + clickhouse, + batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, + flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, + maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, + maximumTraceDetailedSummaryViewCount: + env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, + maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, + insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, + waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", + asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, + asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, + llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE, + llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS, + llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE, + llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY, + version: "v2", + }); } diff --git a/apps/webapp/app/services/dataStores/organizationDataStoreConfigSchemas.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoreConfigSchemas.server.ts new file mode 100644 index 00000000000..89465ff5b4a --- /dev/null +++ b/apps/webapp/app/services/dataStores/organizationDataStoreConfigSchemas.server.ts @@ -0,0 +1,35 @@ +import { z } from "zod"; + +// --------------------------------------------------------------------------- +// ClickHouse config (kind = CLICKHOUSE) +// --------------------------------------------------------------------------- + +/** V1: single secret-store key that supplies the ClickHouse connection URL. */ +export const ClickhouseDataStoreConfigV1 = z.object({ + version: z.literal(1), + data: z.object({ + /** Key into the SecretStore that resolves to a ClickhouseConnection ({url}). 
*/ + secretKey: z.string(), + }), +}); + +export type ClickhouseDataStoreConfigV1 = z.infer; + +/** Discriminated union over version — extend by adding new literals here. */ +export const ClickhouseDataStoreConfig = z.discriminatedUnion("version", [ + ClickhouseDataStoreConfigV1, +]); + +export type ClickhouseDataStoreConfig = z.infer; + +// --------------------------------------------------------------------------- +// Top-level per-kind union +// --------------------------------------------------------------------------- + +export type ParsedClickhouseDataStore = { + kind: "CLICKHOUSE"; + config: ClickhouseDataStoreConfig; +}; + +/** Union of all parsed data store types. Extend as new DataStoreKind values are added. */ +export type ParsedDataStore = ParsedClickhouseDataStore; diff --git a/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts new file mode 100644 index 00000000000..8f398f680ea --- /dev/null +++ b/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts @@ -0,0 +1,82 @@ +import type { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database"; +import { + ClickhouseDataStoreConfig, + type ParsedDataStore, +} from "./organizationDataStoreConfigSchemas.server"; + +export class OrganizationDataStoresRegistry { + private _prisma: PrismaClient | PrismaReplicaClient; + /** Keyed by `${organizationId}:${kind}` */ + private _lookup: Map = new Map(); + private _loaded = false; + private _readyResolve!: () => void; + + /** Resolves once the initial `loadFromDatabase()` completes successfully. 
*/ + readonly isReady: Promise; + + constructor(prisma: PrismaClient | PrismaReplicaClient) { + this._prisma = prisma; + this.isReady = new Promise((resolve) => { + this._readyResolve = resolve; + }); + } + + get isLoaded(): boolean { + return this._loaded; + } + + async loadFromDatabase(): Promise { + const rows = await this._prisma.organizationDataStore.findMany(); + + const lookup = new Map(); + + for (const row of rows) { + let parsed: ParsedDataStore | null = null; + + switch (row.kind) { + case "CLICKHOUSE": { + const result = ClickhouseDataStoreConfig.safeParse(row.config); + if (!result.success) { + console.warn( + `[OrganizationDataStoresRegistry] Invalid config for OrganizationDataStore "${row.key}" (kind=CLICKHOUSE): ${result.error.message}` + ); + continue; + } + parsed = { kind: "CLICKHOUSE", config: result.data }; + break; + } + default: { + console.warn( + `[OrganizationDataStoresRegistry] Unknown kind "${row.kind}" for OrganizationDataStore "${row.key}" — skipping` + ); + continue; + } + } + + for (const orgId of row.organizationIds) { + const key = `${orgId}:${row.kind}`; + lookup.set(key, parsed); + } + } + + this._lookup = lookup; + + if (!this._loaded) { + this._loaded = true; + this._readyResolve(); + } + } + + async reload(): Promise { + await this.loadFromDatabase(); + } + + /** + * Returns the parsed data store config for the given organization and kind, + * or `null` if no override is configured (caller should use the default). + */ + get(organizationId: string, kind: "CLICKHOUSE"): ParsedDataStore | null { + if (!this._loaded) return null; + return this._lookup.get(`${organizationId}:${kind}`) ?? 
null; + } +} diff --git a/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts new file mode 100644 index 00000000000..ac5c25d9aaa --- /dev/null +++ b/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts @@ -0,0 +1,46 @@ +import { $replica } from "~/db.server"; +import { env } from "~/env.server"; +import { signalsEmitter } from "~/services/signals.server"; +import { singleton } from "~/utils/singleton"; +import { OrganizationDataStoresRegistry } from "./organizationDataStoresRegistry.server"; + +export const organizationDataStoresRegistry = singleton( + "organizationDataStoresRegistry", + () => { + const registry = new OrganizationDataStoresRegistry($replica); + + registry.loadFromDatabase().catch((err) => { + console.error("[OrganizationDataStoresRegistry] Failed to initialize", err); + }); + + const interval = setInterval( + () => { + registry.reload().catch((err) => { + console.error("[OrganizationDataStoresRegistry] Failed to reload", err); + }); + }, + env.ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS + ); + + signalsEmitter.on("SIGTERM", () => clearInterval(interval)); + signalsEmitter.on("SIGINT", () => clearInterval(interval)); + + return registry; + } +); + +/** + * Wait for the registry to finish its initial load, with a timeout. + * After the first call resolves (or times out), subsequent calls are no-ops. 
+ */ +export async function waitForOrganizationDataStoresReady( + timeoutMs = 5000 +): Promise { + if (organizationDataStoresRegistry.isLoaded) return; + if (timeoutMs <= 0) return; + + await Promise.race([ + organizationDataStoresRegistry.isReady, + new Promise((resolve) => setTimeout(resolve, timeoutMs)), + ]); +} diff --git a/internal-packages/database/prisma/migrations/20260331212308_add_organization_data_stores/migration.sql b/internal-packages/database/prisma/migrations/20260331212308_add_organization_data_stores/migration.sql new file mode 100644 index 00000000000..52b8385539a --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260331212308_add_organization_data_stores/migration.sql @@ -0,0 +1,21 @@ +-- CreateEnum +CREATE TYPE "public"."DataStoreKind" AS ENUM ('CLICKHOUSE'); + +-- CreateTable +CREATE TABLE "public"."OrganizationDataStore" ( + "id" TEXT NOT NULL, + "key" TEXT NOT NULL, + "organizationIds" TEXT[], + "kind" "public"."DataStoreKind" NOT NULL, + "config" JSONB NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "OrganizationDataStore_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "OrganizationDataStore_key_key" ON "public"."OrganizationDataStore"("key"); + +-- CreateIndex +CREATE INDEX "OrganizationDataStore_kind_idx" ON "public"."OrganizationDataStore"("kind"); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 7138aeaab0d..0f71994de37 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -2917,3 +2917,24 @@ model ErrorGroupState { @@unique([environmentId, taskIdentifier, errorFingerprint]) @@index([environmentId, status]) } + +enum DataStoreKind { + CLICKHOUSE +} + +/// Defines org-scoped data store overrides (e.g. dedicated ClickHouse for HIPAA orgs). 
+/// Multiple organizations can share a single data store row via organizationIds. +model OrganizationDataStore { + id String @id @default(cuid()) + /// Human-readable unique key (e.g. "hipaa-clickhouse-us-east") + key String @unique + /// Organization IDs that use this data store + organizationIds String[] + kind DataStoreKind + /// Versioned config JSON. Structure is discriminated by the top-level `version` field. + config Json + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([kind]) +} From 2f92790b70ae171f894fc49dd9967712d9e027ef Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 1 Apr 2026 12:10:15 +0100 Subject: [PATCH 06/16] Admin page for adding data stores --- apps/webapp/app/routes/admin.data-stores.tsx | 368 +++++++++++++++++++ apps/webapp/app/routes/admin.tsx | 4 + 2 files changed, 372 insertions(+) create mode 100644 apps/webapp/app/routes/admin.data-stores.tsx diff --git a/apps/webapp/app/routes/admin.data-stores.tsx b/apps/webapp/app/routes/admin.data-stores.tsx new file mode 100644 index 00000000000..4397c6d33d1 --- /dev/null +++ b/apps/webapp/app/routes/admin.data-stores.tsx @@ -0,0 +1,368 @@ +import { useState } from "react"; +import { useFetcher } from "@remix-run/react"; +import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { redirect } from "@remix-run/server-runtime"; +import { typedjson, useTypedLoaderData } from "remix-typedjson"; +import { z } from "zod"; +import { Button } from "~/components/primitives/Buttons"; +import { + Dialog, + DialogContent, + DialogFooter, + DialogHeader, + DialogTitle, +} from "~/components/primitives/Dialog"; +import { Input } from "~/components/primitives/Input"; +import { Paragraph } from "~/components/primitives/Paragraph"; +import { Popover, PopoverContent, PopoverTrigger } from "~/components/primitives/Popover"; +import { + Table, + TableBlankRow, + TableBody, + TableCell, + TableHeader, + TableHeaderCell, + TableRow, +} from 
"~/components/primitives/Table"; +import { prisma } from "~/db.server"; +import { requireUser } from "~/services/session.server"; +import { getSecretStore } from "~/services/secrets/secretStore.server"; +import { ClickhouseConnectionSchema } from "~/services/clickhouse/clickhouseSecretSchemas.server"; + +// --------------------------------------------------------------------------- +// Loader +// --------------------------------------------------------------------------- + +export const loader = async ({ request }: LoaderFunctionArgs) => { + const user = await requireUser(request); + if (!user.admin) throw redirect("/"); + + const dataStores = await prisma.organizationDataStore.findMany({ + orderBy: { createdAt: "desc" }, + }); + + return typedjson({ dataStores }); +}; + +// --------------------------------------------------------------------------- +// Action +// --------------------------------------------------------------------------- + +const AddSchema = z.object({ + _action: z.literal("add"), + key: z.string().min(1), + organizationIds: z.string().min(1), + connectionUrl: z.string().url(), +}); + +const DeleteSchema = z.object({ + _action: z.literal("delete"), + id: z.string().min(1), +}); + +export async function action({ request }: ActionFunctionArgs) { + const user = await requireUser(request); + if (!user.admin) throw redirect("/"); + + const formData = await request.formData(); + const _action = formData.get("_action"); + + if (_action === "add") { + const result = AddSchema.safeParse(Object.fromEntries(formData)); + if (!result.success) { + return typedjson( + { error: result.error.issues.map((i) => i.message).join(", ") }, + { status: 400 } + ); + } + + const { key, organizationIds: rawOrgIds, connectionUrl } = result.data; + const organizationIds = rawOrgIds + .split(",") + .map((s) => s.trim()) + .filter(Boolean); + + const secretKey = `data-store:${key}:clickhouse`; + + const secretStore = getSecretStore("DATABASE"); + await 
secretStore.setSecret(secretKey, ClickhouseConnectionSchema.parse({ url: connectionUrl })); + + await prisma.organizationDataStore.create({ + data: { + key, + organizationIds, + kind: "CLICKHOUSE", + config: { version: 1, data: { secretKey } }, + }, + }); + + + return typedjson({ success: true }); + } + + if (_action === "delete") { + const result = DeleteSchema.safeParse(Object.fromEntries(formData)); + if (!result.success) { + return typedjson({ error: "Invalid request" }, { status: 400 }); + } + + const { id } = result.data; + + const dataStore = await prisma.organizationDataStore.findFirst({ where: { id } }); + if (!dataStore) { + return typedjson({ error: "Data store not found" }, { status: 404 }); + } + + // Delete secret if config references one + const config = dataStore.config as any; + if (config?.data?.secretKey) { + const secretStore = getSecretStore("DATABASE"); + await secretStore.deleteSecret(config.data.secretKey).catch(() => { + // Secret may not exist — proceed with deletion + }); + } + + await prisma.organizationDataStore.delete({ where: { id } }); + + return typedjson({ success: true }); + } + + return typedjson({ error: "Unknown action" }, { status: 400 }); +} + +// --------------------------------------------------------------------------- +// Component +// --------------------------------------------------------------------------- + +export default function AdminDataStoresRoute() { + const { dataStores } = useTypedLoaderData(); + const [addOpen, setAddOpen] = useState(false); + + return ( +
+
+
+ + {dataStores.length} data store{dataStores.length !== 1 ? "s" : ""} + + +
+ + + + + Key + Kind + Organizations + Created + Updated + + Actions + + + + + {dataStores.length === 0 ? ( + + No data stores configured + + ) : ( + dataStores.map((ds) => ( + + + {ds.key} + + + + {ds.kind} + + + + + {ds.organizationIds.length} org{ds.organizationIds.length !== 1 ? "s" : ""} + + {ds.organizationIds.length > 0 && ( + + ({ds.organizationIds.slice(0, 2).join(", ")} + {ds.organizationIds.length > 2 + ? ` +${ds.organizationIds.length - 2} more` + : ""} + ) + + )} + + + + {new Date(ds.createdAt).toLocaleString()} + + + + + {new Date(ds.updatedAt).toLocaleString()} + + + + + + + )) + )} + +
+
+ + +
+ ); +} + +// --------------------------------------------------------------------------- +// Delete button with popover confirmation +// --------------------------------------------------------------------------- + +function DeleteButton({ id, name }: { id: string; name: string }) { + const [open, setOpen] = useState(false); + const fetcher = useFetcher<{ success?: boolean; error?: string }>(); + const isDeleting = fetcher.state !== "idle"; + + return ( + + + + + + + Delete {name}? + + + This will remove the data store and its secret. Organizations using it will fall back to + the default ClickHouse instance. + +
+ + setOpen(false)}> + + + + +
+
+
+ ); +} + +// --------------------------------------------------------------------------- +// Add data store dialog +// --------------------------------------------------------------------------- + +function AddDataStoreDialog({ + open, + onOpenChange, +}: { + open: boolean; + onOpenChange: (open: boolean) => void; +}) { + const fetcher = useFetcher<{ success?: boolean; error?: string }>(); + const isSubmitting = fetcher.state !== "idle"; + + // Close dialog on success + if (fetcher.data?.success && open) { + onOpenChange(false); + } + + return ( + + + + Add data store + + + + + +
+ + +

+ Unique identifier for this data store. Used as the secret key prefix. +

+
+ +
+ + +
+ +
+ + +

Comma-separated organization IDs.

+
+ +
+ + +

+ Stored encrypted in SecretStore. Never logged or displayed again. +

+
+ + {fetcher.data?.error && ( +

{fetcher.data.error}

+ )} + + + + + +
+
+
+ ); +} diff --git a/apps/webapp/app/routes/admin.tsx b/apps/webapp/app/routes/admin.tsx index 4cd2deca533..3b0858f899a 100644 --- a/apps/webapp/app/routes/admin.tsx +++ b/apps/webapp/app/routes/admin.tsx @@ -44,6 +44,10 @@ export default function Page() { label: "Notifications", to: "/admin/notifications", }, + { + label: "Data Stores", + to: "/admin/data-stores", + }, ]} layoutId={"admin"} /> From 9bb2380bdfc6bcde96b9bbf13bdd2140b5955a2f Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 2 Apr 2026 11:42:48 +0100 Subject: [PATCH 07/16] New admin page, lots of improvements to make it more robust and testable --- apps/webapp/app/routes/admin.data-stores.tsx | 150 +++++++----- .../clickhouse/clickhouseFactory.server.ts | 214 ++++++++---------- .../clickhouse/clickhouseFactory.test.ts | 155 ------------- ...ganizationDataStoreConfigSchemas.server.ts | 6 +- .../organizationDataStoresRegistry.server.ts | 91 +++++++- ...zationDataStoresRegistryInstance.server.ts | 50 ++-- apps/webapp/test/clickhouseFactory.test.ts | 130 +++++++++++ .../organizationDataStoresRegistry.test.ts | 197 ++++++++++++++++ 8 files changed, 614 insertions(+), 379 deletions(-) delete mode 100644 apps/webapp/app/services/clickhouse/clickhouseFactory.test.ts create mode 100644 apps/webapp/test/clickhouseFactory.test.ts create mode 100644 apps/webapp/test/organizationDataStoresRegistry.test.ts diff --git a/apps/webapp/app/routes/admin.data-stores.tsx b/apps/webapp/app/routes/admin.data-stores.tsx index 4397c6d33d1..033b143151f 100644 --- a/apps/webapp/app/routes/admin.data-stores.tsx +++ b/apps/webapp/app/routes/admin.data-stores.tsx @@ -26,8 +26,9 @@ import { } from "~/components/primitives/Table"; import { prisma } from "~/db.server"; import { requireUser } from "~/services/session.server"; -import { getSecretStore } from "~/services/secrets/secretStore.server"; import { ClickhouseConnectionSchema } from "~/services/clickhouse/clickhouseSecretSchemas.server"; +import { 
organizationDataStoresRegistry } from "~/services/dataStores/organizationDataStoresRegistryInstance.server"; +import { tryCatch } from "@trigger.dev/core"; // --------------------------------------------------------------------------- // Loader @@ -55,79 +56,106 @@ const AddSchema = z.object({ connectionUrl: z.string().url(), }); +const UpdateSchema = z.object({ + _action: z.literal("update"), + key: z.string().min(1), + organizationIds: z.string().min(1), + connectionUrl: z.string().url().optional(), +}); + const DeleteSchema = z.object({ _action: z.literal("delete"), - id: z.string().min(1), + key: z.string().min(1), }); +const FormSchema = z.discriminatedUnion("_action", [AddSchema, UpdateSchema, DeleteSchema]); + export async function action({ request }: ActionFunctionArgs) { const user = await requireUser(request); if (!user.admin) throw redirect("/"); const formData = await request.formData(); - const _action = formData.get("_action"); - - if (_action === "add") { - const result = AddSchema.safeParse(Object.fromEntries(formData)); - if (!result.success) { - return typedjson( - { error: result.error.issues.map((i) => i.message).join(", ") }, - { status: 400 } - ); - } - const { key, organizationIds: rawOrgIds, connectionUrl } = result.data; - const organizationIds = rawOrgIds - .split(",") - .map((s) => s.trim()) - .filter(Boolean); + const result = FormSchema.safeParse(Object.fromEntries(formData)); - const secretKey = `data-store:${key}:clickhouse`; + if (!result.success) { + return typedjson( + { error: result.error.issues.map((i) => i.message).join(", ") }, + { status: 400 } + ); + } - const secretStore = getSecretStore("DATABASE"); - await secretStore.setSecret(secretKey, ClickhouseConnectionSchema.parse({ url: connectionUrl })); + switch (result.data._action) { + case "add": { + const { key, organizationIds: rawOrgIds, connectionUrl } = result.data; + const organizationIds = rawOrgIds + .split(",") + .map((s) => s.trim()) + .filter(Boolean); + + const 
config = ClickhouseConnectionSchema.parse({ url: connectionUrl }); + + const [error, _] = await tryCatch( + organizationDataStoresRegistry.addDataStore({ + key, + kind: "CLICKHOUSE", + organizationIds, + config, + }) + ); - await prisma.organizationDataStore.create({ - data: { - key, - organizationIds, - kind: "CLICKHOUSE", - config: { version: 1, data: { secretKey } }, - }, - }); + if (error) { + return typedjson({ error: error.message }, { status: 400 }); + } + return typedjson({ success: true }); + } + case "update": { + const { key, organizationIds: rawOrgIds, connectionUrl } = result.data; + const organizationIds = rawOrgIds + .split(",") + .map((s) => s.trim()) + .filter(Boolean); + + const config = connectionUrl + ? ClickhouseConnectionSchema.parse({ url: connectionUrl }) + : undefined; + + const [error, _] = await tryCatch( + organizationDataStoresRegistry.updateDataStore({ + key, + kind: "CLICKHOUSE", + organizationIds, + config, + }) + ); - return typedjson({ success: true }); - } + if (error) { + return typedjson({ error: error.message }, { status: 400 }); + } - if (_action === "delete") { - const result = DeleteSchema.safeParse(Object.fromEntries(formData)); - if (!result.success) { - return typedjson({ error: "Invalid request" }, { status: 400 }); + return typedjson({ success: true }); } + case "delete": { + const { key } = result.data; + + const [error, _] = await tryCatch( + organizationDataStoresRegistry.deleteDataStore({ + key, + kind: "CLICKHOUSE", + }) + ); - const { id } = result.data; + if (error) { + return typedjson({ error: error.message }, { status: 400 }); + } - const dataStore = await prisma.organizationDataStore.findFirst({ where: { id } }); - if (!dataStore) { - return typedjson({ error: "Data store not found" }, { status: 404 }); + return typedjson({ success: true }); } - - // Delete secret if config references one - const config = dataStore.config as any; - if (config?.data?.secretKey) { - const secretStore = 
getSecretStore("DATABASE"); - await secretStore.deleteSecret(config.data.secretKey).catch(() => { - // Secret may not exist — proceed with deletion - }); + default: { + return typedjson({ error: "Unknown action" }, { status: 400 }); } - - await prisma.organizationDataStore.delete({ where: { id } }); - - return typedjson({ success: true }); } - - return typedjson({ error: "Unknown action" }, { status: 400 }); } // --------------------------------------------------------------------------- @@ -207,7 +235,7 @@ export default function AdminDataStoresRoute() { - + )) @@ -225,7 +253,7 @@ export default function AdminDataStoresRoute() { // Delete button with popover confirmation // --------------------------------------------------------------------------- -function DeleteButton({ id, name }: { id: string; name: string }) { +function DeleteButton({ name }: { name: string }) { const [open, setOpen] = useState(false); const fetcher = useFetcher<{ success?: boolean; error?: string }>(); const isDeleting = fetcher.state !== "idle"; @@ -251,7 +279,7 @@ function DeleteButton({ id, name }: { id: string; name: string }) { setOpen(false)}> - + @@ -311,7 +339,13 @@ function AddDataStoreDialog({ - +
@@ -344,9 +378,7 @@ function AddDataStoreDialog({

- {fetcher.data?.error && ( -

{fetcher.data.error}

- )} + {fetcher.data?.error &&

{fetcher.data.error}

}