CVE-2026-42398
Description
Server-Side Request Forgery (CWE-918) in Kibana allows authenticated users with connector management privileges to bypass the operator-configured connection allowlist. By configuring a Webhook connector with a crafted target, an attacker can cause Kibana to issue outbound requests to destinations that the egress restriction controls were intended to block.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Server-Side Request Forgery in Kibana allows authenticated users with connector management privileges to bypass the connection allowlist via a crafted Webhook connector, potentially disclosing internal network information.
Vulnerability
Server-Side Request Forgery (CWE-918) exists in Kibana versions 9.0.0 through 9.2.7 and 9.3.0 through 9.3.1. The vulnerability allows authenticated users with connector management privileges to bypass the operator-configured connection allowlist (xpack.actions.allowedHosts setting, when not set to wildcard) by crafting a Webhook connector target [1].
Exploitation
An attacker must have valid Kibana credentials with connector management privileges. They then create a Webhook connector with a crafted target that points to a destination not permitted by the allowlist. When the connector is triggered, Kibana makes an outbound request to that destination, effectively bypassing network egress restrictions [1].
Impact
Successful exploitation allows the attacker to force Kibana to send requests to internal or external hosts that should be blocked by the allowlist. This can lead to information disclosure (e.g., internal service discovery, header leakage) and potential further network reconnaissance. The CVSS v3.1 score is 7.7 (High) with confidentiality impact rated as High, integrity and availability as None [1].
Mitigation
The issue is fixed in Kibana versions 9.2.8 and 9.3.2. Users are advised to upgrade to these versions or later. For Elastic Cloud Serverless deployments, the vulnerability was remediated before public disclosure. No workaround is available other than upgrading or restricting connector management privileges to trusted users [1].
AI Insight generated on May 28, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected products
2Patches
22484ea8af037[9.2] [Security Solution] [HDQ]: integration-based targeting and descriptor versioning (#258418) (#260876)
14 files changed · +1978 −465
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_integration_resolver.test.ts+395 −0 added@@ -0,0 +1,395 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IntegrationResolverImpl } from './health_diagnostic_integration_resolver'; +import { QueryType, type IntegrationResolution } from './health_diagnostic_service.types'; +import type { PackageService } from '@kbn/fleet-plugin/server'; +import { + createMockLogger, + createMockQueryV1, + createMockQueryV2, + createMockPackageService, +} from './__mocks__'; + +const INSTALLED_PACKAGES = [ + { + name: 'endpoint', + version: '8.14.2', + status: 'installed', + data_streams: [ + { dataset: 'endpoint.events.process', type: 'logs' }, + { dataset: 'endpoint.events.network', type: 'logs' }, + { dataset: 'endpoint.events.network', type: 'traces' }, + ], + }, + { + name: 'fleet_server', + version: '1.3.1', + status: 'installed', + data_streams: [{ dataset: 'fleet_server.output', type: 'logs' }], + }, + { + name: 'system', + version: '1.0.0', + status: 'not_installed', + data_streams: [{ dataset: 'system.cpu', type: 'metrics' }], + }, +]; + +describe('IntegrationResolverImpl', () => { + let resolver: IntegrationResolverImpl; + let packageService: ReturnType<typeof createMockPackageService>; + + beforeEach(() => { + packageService = createMockPackageService(INSTALLED_PACKAGES); + resolver = new IntegrationResolverImpl( + packageService as unknown as PackageService, + createMockLogger() + ); + }); + + describe('v1 queries', () => { + it('passes v1 queries through as ExecutableQuery without calling Fleet', async () => { + const query = createMockQueryV1(QueryType.DSL); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + if (results[0].kind !== 'executable') throw new Error('type guard'); + expect(results[0].query.version).toBe(1); + expect('resolution' in results[0]).toBe(false); + expect(packageService.asInternalUser.getPackages).not.toHaveBeenCalled(); + }); + }); + + describe('v2 queries', () => { + it('produces one ExecutableQuery per matched integration (exact name)', async () => { + const query = createMockQueryV2(QueryType.DSL, { integrations: ['endpoint'] }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + if (results[0].kind !== 'executable') throw new Error('type guard'); + expect(results[0].query.version).toBe(2); + + const resolution = (results[0] as { resolution: IntegrationResolution }).resolution; + expect(resolution.name).toBe('endpoint'); + expect(resolution.version).toBe('8.14.2'); + expect(resolution.indices).toContain('logs-endpoint.events.process-*'); + expect(resolution.indices).toContain('logs-endpoint.events.network-*'); + }); + + it('produces one ExecutableQuery per matched integration (regex pattern)', async () => { + const query = createMockQueryV2(QueryType.DSL, { integrations: ['endpoint.*'] }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + }); + + it('produces N ExecutableQueries for N matched integrations', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint', 'fleet_server'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(2); + expect(results.every((r) => r.kind === 'executable')).toBe(true); + const names = results.map( + (r) => (r as { resolution: IntegrationResolution }).resolution.name + ); + expect(names).toContain('endpoint'); + expect(names).toContain('fleet_server'); + }); + + describe('datastreamTypes filtering', () => { + it('includes only datastreams matching the type patterns', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint'], + datastreamTypes: ['logs'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + const resolution = (results[0] as { resolution: IntegrationResolution }).resolution; + expect(resolution.indices).toHaveLength(2); + expect(resolution.indices).toContain('logs-endpoint.events.process-*'); + expect(resolution.indices).toContain('logs-endpoint.events.network-*'); + }); + + it('skips an integration when no datastreams match the type pattern', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint'], + datastreamTypes: ['metrics'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('skipped'); + if (results[0].kind !== 'skipped') throw new Error('type guard'); + expect(results[0].reason).toBe('datastreams_not_matched'); + }); + + it('supports regex patterns in datastreamTypes', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint'], + datastreamTypes: ['log.*'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + }); + + it('includes all datastreams when datastreamTypes is absent', async () => { + const query = createMockQueryV2(QueryType.DSL, { integrations: ['endpoint'] }); + const results = await resolver.resolve([query]); + + const resolution = (results[0] as { resolution: IntegrationResolution }).resolution; + expect(resolution.indices).toHaveLength(3); + }); + + it('selects only traces datastreams when filtering by traces', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint'], + datastreamTypes: ['traces'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + const resolution = (results[0] as { resolution: IntegrationResolution }).resolution; + expect(resolution.indices).toHaveLength(1); + expect(resolution.indices).toContain('traces-endpoint.events.network-*'); + expect(resolution.indices).not.toContain('logs-endpoint.events.process-*'); + expect(resolution.indices).not.toContain('logs-endpoint.events.network-*'); + }); + + it('produces executable for integration with matching type and skipped for one without', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint', 'fleet_server'], + datastreamTypes: ['traces'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(2); + const endpointResult = results.find( + (r) => + 'resolution' in r && + (r as { resolution: IntegrationResolution }).resolution.name === 'endpoint' + ); + const fleetResult = results.find( + (r) => + !('resolution' in r) || + (r as { resolution: IntegrationResolution }).resolution.name === 'fleet_server' + ); + expect(endpointResult?.kind).toBe('executable'); + expect(fleetResult?.kind).toBe('skipped'); + if (fleetResult?.kind !== 'skipped') throw new Error('type guard'); + expect(fleetResult.reason).toBe('datastreams_not_matched'); + }); + + it('matches multiple types with a regex alternation pattern', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: ['endpoint'], + datastreamTypes: ['logs|traces'], + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + const resolution = (results[0] as { resolution: IntegrationResolution }).resolution; + expect(resolution.indices).toHaveLength(3); + }); + }); + + test.each([ + ['pattern matches nothing', ['nonexistent.*']], + ['package exists but is not_installed', ['system']], + ])('returns a single SkippedQuery — %s', async (_label, integrations) => { + const query = createMockQueryV2(QueryType.DSL, { integrations }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('skipped'); + if (results[0].kind !== 'skipped') throw new Error('type guard'); + expect(results[0].reason).toBe('integration_not_installed'); + expect('resolution' in results[0]).toBe(false); + }); + + it('skips v2 ESQL query with FROM clause', async () => { + const query = createMockQueryV2(QueryType.ESQL, { + integrations: ['endpoint'], + query: 'FROM logs-* | stats count() by user.name', + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('skipped'); + if (results[0].kind !== 'skipped') throw new Error('type guard'); + expect(results[0].reason).toBe('unsupported_query'); + }); + + it('calls Fleet only once even for multiple v2 queries', async () => { + const q1 = createMockQueryV2(QueryType.DSL, { id: 'q1', integrations: ['endpoint'] }); + const q2 = createMockQueryV2(QueryType.DSL, { id: 'q2', integrations: ['fleet_server'] }); + await resolver.resolve([q1, q2]); + + expect(packageService.asInternalUser.getPackages).toHaveBeenCalledTimes(1); + }); + }); + + describe('Fleet unavailability', () => { + beforeEach(() => { + packageService.asInternalUser.getPackages.mockRejectedValue(new Error('Fleet is down')); + }); + + it('returns fleet_unavailable SkippedQuery for each v2 query when Fleet call fails', async () => { + const q1 = createMockQueryV2(QueryType.DSL, { id: 'q1', integrations: ['endpoint'] }); + const q2 = createMockQueryV2(QueryType.DSL, { id: 'q2', integrations: ['fleet_server'] }); + const results = await resolver.resolve([q1, q2]); + + expect(results).toHaveLength(2); + expect(results.every((r) => r.kind === 'skipped')).toBe(true); + results.forEach((r) => { + if (r.kind !== 'skipped') throw new Error('type guard'); + expect(r.reason).toBe('fleet_unavailable'); + }); + }); + + it('still runs v1 queries when Fleet call fails', async () => { + const v1 = createMockQueryV1(QueryType.DSL); + const v2 = createMockQueryV2(QueryType.DSL, { integrations: ['endpoint'] }); + const results = await resolver.resolve([v1, v2]); + + expect(results).toHaveLength(2); + expect(results[0].kind).toBe('executable'); + if (results[0].kind !== 'executable') throw new Error('type guard'); + expect(results[0].query.version).toBe(1); + + expect(results[1].kind).toBe('skipped'); + if (results[1].kind !== 'skipped') throw new Error('type guard'); + expect(results[1].reason).toBe('fleet_unavailable'); + }); + + it('does not call Fleet when there are only v1 queries', async () => { + packageService.asInternalUser.getPackages.mockClear(); + const v1 = createMockQueryV1(QueryType.DSL); + await resolver.resolve([v1]); + + expect(packageService.asInternalUser.getPackages).not.toHaveBeenCalled(); + }); + }); + + describe('v2 with direct index', () => { + it('returns ExecutableQuery without resolution when index is set', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: undefined, + index: 'logs-test-*', + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + if (results[0].kind !== 'executable') throw new Error('type guard'); + expect(results[0].query.version).toBe(2); + expect('resolution' in results[0]).toBe(false); + }); + + it('does not call Fleet when index is set', async () => { + const query = createMockQueryV2(QueryType.DSL, { + integrations: undefined, + index: 'logs-test-*', + }); + await resolver.resolve([query]); + + expect(packageService.asInternalUser.getPackages).not.toHaveBeenCalled(); + }); + + it('executes index-based v2 query even when Fleet is unavailable', async () => { + packageService.asInternalUser.getPackages.mockRejectedValue(new Error('Fleet is down')); + + const query = createMockQueryV2(QueryType.DSL, { + integrations: undefined, + index: 'logs-test-*', + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('executable'); + }); + + it('skips index-based v2 ESQL query with FROM clause', async () => { + const query = createMockQueryV2(QueryType.ESQL, { + integrations: undefined, + index: 'logs-test-*', + query: 'FROM logs-* | stats count() by user.name', + }); + const results = await resolver.resolve([query]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('skipped'); + if (results[0].kind !== 'skipped') throw new Error('type guard'); + expect(results[0].reason).toBe('unsupported_query'); + expect(packageService.asInternalUser.getPackages).not.toHaveBeenCalled(); + }); + + it('resolves index-based v2 alongside integrations-based v2 in one call', async () => { + const indexQuery = createMockQueryV2(QueryType.DSL, { + id: 'q-index', + integrations: undefined, + index: 'logs-test-*', + }); + const integrationsQuery = createMockQueryV2(QueryType.DSL, { + id: 'q-integrations', + integrations: ['endpoint'], + }); + const results = await resolver.resolve([indexQuery, integrationsQuery]); + + expect(results).toHaveLength(2); + expect(results[0].kind).toBe('executable'); + expect(results[1].kind).toBe('executable'); + expect(packageService.asInternalUser.getPackages).toHaveBeenCalledTimes(1); + }); + }); + + describe('unknown version queries', () => { + it('returns SkippedQuery for ParseFailureQuery', async () => { + const unknown = { version: 99, id: 'future', name: 'future', _raw: {} }; + const results = await resolver.resolve([unknown]); + + expect(results).toHaveLength(1); + expect(results[0].kind).toBe('skipped'); + if (results[0].kind !== 'skipped') throw new Error('type guard'); + expect(results[0].reason).toBe('parse_failure'); + }); + }); + + describe('mixed queries', () => { + it('handles a mix of v1, v2, and unknown queries in one call', async () => { + const v1 = createMockQueryV1(QueryType.DSL); + const v2 = createMockQueryV2(QueryType.DSL, { integrations: ['endpoint'] }); + const unknown = { version: 99, id: 'x', name: 'x', _raw: {} }; + + const results = await resolver.resolve([v1, v2, unknown]); + + expect(results).toHaveLength(3); // 1 v1 + 1 v2 (1 integration) + 1 unknown + expect(results[0].kind).toBe('executable'); + expect(results[1].kind).toBe('executable'); + expect(results[2].kind).toBe('skipped'); + expect(packageService.asInternalUser.getPackages).toHaveBeenCalledTimes(1); + }); + + it('expands v2 with two matched integrations to two ExecutableQueries', async () => { + const v2 = createMockQueryV2(QueryType.DSL, { integrations: ['endpoint', 'fleet_server'] }); + const results = await resolver.resolve([v2]); + + expect(results).toHaveLength(2); + expect(results.every((r) => r.kind === 'executable')).toBe(true); + }); + }); +});
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_integration_resolver.ts+151 −0 added@@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger, LogMeta } from '@kbn/core/server'; +import type { PackageService } from '@kbn/fleet-plugin/server'; +import { + QueryType, + type HealthDiagnosticQuery, + type HealthDiagnosticQueryV1, + type HealthDiagnosticQueryV2, + type ExecutableQuery, + type SkippedQuery, + type ResolvedQuery, + type IntegrationResolution, +} from './health_diagnostic_service.types'; + +export interface IntegrationResolver { + resolve(queries: HealthDiagnosticQuery[]): Promise<ResolvedQuery[]>; +} + +export class IntegrationResolverImpl implements IntegrationResolver { + constructor(private readonly packageService: PackageService, private readonly logger: Logger) {} + + async resolve(queries: HealthDiagnosticQuery[]): Promise<ResolvedQuery[]> { + const hasV2WithIntegrations = queries.some( + (q) => 'version' in q && q.version === 2 && !(q as HealthDiagnosticQueryV2).index + ); + let installedPackages: InstalledPackage[] = []; + let fleetUnavailable = false; + + if (hasV2WithIntegrations) { + try { + installedPackages = await this.fetchInstalledPackages(); + } catch (err) { + // just log as debug since it's not necessary to pollute logs with errors if fleet is unavailable - we'll just + // skip v2 queries and inform it accordingly in the stats + this.logger.debug( + 'Failed to fetch installed packages from Fleet; v2 queries will be skipped', + { + error: err.message, + } as LogMeta + ); + fleetUnavailable = true; + } + } + + return queries.flatMap((query) => { + if ('version' in query && query.version === 1) { + return [this.resolveV1(query)]; + } else if ('version' in query && query.version === 2) { + // skip ESQL queries with FROM clause since either `integrations` or `index` specify on + // which indices or datastreams run the query. + if (query.type === QueryType.ESQL && /^[\s\r\n]*FROM/i.test(query.query)) { + return [{ kind: 'skipped', query, reason: 'unsupported_query' } as SkippedQuery]; + } + if ((query as HealthDiagnosticQueryV2).index) { + // index-based v2: resolve directly, no Fleet needed + return [{ kind: 'executable', query } as ExecutableQuery]; + } + if (fleetUnavailable) { + return [{ kind: 'skipped', query, reason: 'fleet_unavailable' } as SkippedQuery]; + } + return this.resolveV2(query as HealthDiagnosticQueryV2, installedPackages); + } else { + return [this.resolveUnknown(query)]; + } + }); + } + + private resolveV1(query: HealthDiagnosticQueryV1): ExecutableQuery { + return { kind: 'executable', query }; + } + + private resolveV2( + query: HealthDiagnosticQueryV2, + installedPackages: InstalledPackage[] + ): ResolvedQuery[] { + const { integrations: patterns, datastreamTypes: typePatterns } = query; + if (!patterns) { + return []; + } + const matched = installedPackages.filter((pkg) => + patterns.some((pattern) => { + try { + return new RegExp(`^${pattern}$`).test(pkg.name); + } catch { + this.logger.warn(`Invalid regex pattern in integrations field: ${pattern}`); + return false; + } + }) + ); + + if (matched.length === 0) { + this.logger.debug('No matching integrations found, skipping query', { + queryName: query.name, + } as LogMeta); + return [{ kind: 'skipped', query, reason: 'integration_not_installed' }]; + } + + return matched.map((pkg) => { + const dataStreams = (pkg.data_streams ?? []).filter((ds) => { + if (!typePatterns || typePatterns.length === 0) return true; + return typePatterns.some((pattern) => { + try { + return new RegExp(`^${pattern}$`).test(ds.type); + } catch { + this.logger.warn(`Invalid regex pattern in datastreamTypes field: ${pattern}`); + return false; + } + }); + }); + + if (dataStreams.length === 0) { + this.logger.debug('Integration matched but no datastreams passed type filter, skipping', { + queryName: query.name, + integration: pkg.name, + typePatterns, + } as LogMeta); + return { kind: 'skipped', query, reason: 'datastreams_not_matched' } as SkippedQuery; + } + + const indices = dataStreams.map((ds) => `${ds.type}-${ds.dataset}-*`); + const resolution: IntegrationResolution = { name: pkg.name, version: pkg.version, indices }; + return { kind: 'executable', query, resolution } as ExecutableQuery; + }); + } + + private resolveUnknown(query: HealthDiagnosticQuery): SkippedQuery { + this.logger.warn('Skipping query that failed to parse', { + queryId: (query as any).id, // eslint-disable-line @typescript-eslint/no-explicit-any + name: query.name, + } as LogMeta); + return { kind: 'skipped', query, reason: 'parse_failure' }; + } + + private async fetchInstalledPackages(): Promise<InstalledPackage[]> { + const all = await this.packageService.asInternalUser.getPackages(); + return all.filter((pkg) => pkg.status === 'installed') as InstalledPackage[]; + } +} + +interface InstalledPackage { + name: string; + version: string; + status: string; + data_streams?: Array<{ dataset: string; type: string }>; +}
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_query_parser.test.ts+405 −0 added@@ -0,0 +1,405 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { parseHealthDiagnosticQueries } from './health_diagnostic_query_parser'; +import { + QueryType, + Action, + type ParseFailureQuery, + type HealthDiagnosticQueryV1, + type HealthDiagnosticQueryV2, +} from './health_diagnostic_service.types'; + +const V1_NO_VERSION_YAML = `--- +id: q1 +name: my-v1-query +index: logs-endpoint.* +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + +const V1_EXPLICIT_YAML = `--- +version: 1 +id: q1 +name: my-v1-query +index: logs-endpoint.* +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + +const V2_YAML = `--- +version: 2 +id: q2 +name: my-v2-query +integrations: 'endpoint.*,fleet_server' +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + +const UNKNOWN_VERSION_YAML = `--- +version: 99 +id: q-future +name: future-query +someNewField: something`; + +describe('parseHealthDiagnosticQueries', () => { + describe('v1 parsing', () => { + test.each([ + ['no version field (legacy)', V1_NO_VERSION_YAML], + ['explicit version: 1', V1_EXPLICIT_YAML], + ])('parses as v1 — %s', (_label, yaml) => { + const queries = parseHealthDiagnosticQueries(yaml); + const q = queries[0] as unknown as HealthDiagnosticQueryV1; + expect(q.version).toBe(1); + if (q.version !== 1) throw new Error('type guard'); + expect(q.index).toBe('logs-endpoint.*'); + expect(q.id).toBe('q1'); + expect(q.type).toBe(QueryType.DSL); + expect(q.filterlist).toEqual({ 'user.name': Action.KEEP }); + }); + + it('returns ParseFailureQuery when v1 descriptor is missing required index field', () => { + const yaml = `--- +id: bad +name: bad +type: DSL +query: 'x' +scheduleCron: 5m +filterlist: + user.name: keep`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + expect((q as ParseFailureQuery).id).toBe('bad'); + }); + + it('returns ParseFailureQuery when v1 descriptor is missing the enabled field', () => { + const yaml = `--- +id: no-enabled +name: no-enabled +index: logs-endpoint.* +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + expect((q as ParseFailureQuery).id).toBe('no-enabled'); + }); + }); + + describe('v2 parsing', () => { + it('parses a descriptor with version: 2 as v2', () => { + const queries = parseHealthDiagnosticQueries(V2_YAML); + const q = queries[0] as unknown as HealthDiagnosticQueryV2; + expect(q.version).toBe(2); + if (q.version !== 2) throw new Error('type guard'); + expect(q.integrations).toEqual(['endpoint.*', 'fleet_server']); + expect(q.id).toBe('q2'); + }); + + it('returns ParseFailureQuery when neither integrations nor index is present', () => { + const yaml = `--- +version: 2 +id: bad-v2 +name: bad-v2 +type: DSL +query: 'x' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + }); + }); + + describe('v2 with index', () => { + it('parses successfully when index is present and integrations is absent', () => { + const yaml = `--- +version: 2 +id: q2-index +name: my-v2-index-query +index: logs-test-* +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + const [q] = parseHealthDiagnosticQueries(yaml); + const v2 = q as unknown as HealthDiagnosticQueryV2; + expect(v2.version).toBe(2); + expect(v2.index).toBe('logs-test-*'); + expect(v2.integrations).toBeUndefined(); + expect(v2.datastreamTypes).toBeUndefined(); + }); + + it('returns ParseFailureQuery when both integrations and index are present', () => { + const yaml = `--- +version: 2 +id: bad-both +name: bad-both +integrations: 'endpoint.*' +index: logs-test-* +type: DSL +query: 'x' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + expect((q as ParseFailureQuery).id).toBe('bad-both'); + }); + + it('returns ParseFailureQuery when neither integrations nor index is present (explicit)', () => { + const yaml = `--- +version: 2 +id: bad-neither +name: bad-neither +type: DSL +query: 'x' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + expect((q as ParseFailureQuery).id).toBe('bad-neither'); + }); + }); + + describe('types field', () => { + it('parses a comma-separated datastreamTypes string into an array', () => { + const yaml = `--- +version: 2 +id: q-types +name: q-types +integrations: 'endpoint.*' +datastreamTypes: 'logs,metrics.*' +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + const [q] = parseHealthDiagnosticQueries(yaml); + const v2 = q as unknown as HealthDiagnosticQueryV2; + expect(v2.version).toBe(2); + expect(v2.datastreamTypes).toEqual(['logs', 'metrics.*']); + }); + + it('leaves types undefined when the field is absent', () => { + const [q] = parseHealthDiagnosticQueries(V2_YAML); + const v2 = q as unknown as HealthDiagnosticQueryV2; + expect(v2.datastreamTypes).toBeUndefined(); + }); + + it('returns ParseFailureQuery when datastreamTypes is a number', () => { + const yaml = ` +id: q1 +name: q1 +version: 2 +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true +integrations: endpoint +datastreamTypes: 42`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + }); + + it('returns ParseFailureQuery when datastreamTypes is a YAML list', () => { + const yaml = ` +id: q1 +name: q1 +version: 2 +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true +integrations: endpoint +datastreamTypes: + - logs + - metrics`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + }); + + it('returns ParseFailureQuery when datastreamTypes is an empty string', () => { + const yaml = ` +id: q1 +name: q1 +version: 2 +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true +integrations: endpoint +datastreamTypes: ''`; + const [q] = parseHealthDiagnosticQueries(yaml); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + }); + + it('does not leak datastreamTypes on index-based path', () => { + const yaml = ` +id: q1 +name: q1 +version: 2 +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true +index: logs-test-* +datastreamTypes: logs`; + const [q] = parseHealthDiagnosticQueries(yaml) as HealthDiagnosticQueryV2[]; + expect('datastreamTypes' in q).toBe(false); + }); + }); + + describe('invalid type enum', () => { + it('returns ParseFailureQuery for v1 descriptor with invalid type', () => { + const yaml = ` +id: q1 +name: q1 +index: logs-endpoint.* +type: foo +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + expect((parseHealthDiagnosticQueries(yaml)[0] as ParseFailureQuery)._raw).toBeDefined(); + }); + + it('returns ParseFailureQuery for v2 descriptor with invalid type', () => { + const yaml = ` +id: q1 +name: q1 +version: 2 +integrations: endpoint +type: foo +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + expect((parseHealthDiagnosticQueries(yaml)[0] as ParseFailureQuery)._raw).toBeDefined(); + }); + }); + + describe('invalid filterlist', () => { + it('returns ParseFailureQuery when filterlist is an array', () => { + const yaml = ` +id: q1 +name: q1 +index: logs-endpoint.* +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + - keep +enabled: true`; + expect((parseHealthDiagnosticQueries(yaml)[0] as ParseFailureQuery)._raw).toBeDefined(); + }); + + it('returns ParseFailureQuery when filterlist contains an invalid action value', () => { + const yaml = ` +id: q1 +name: q1 +index: logs-endpoint.* +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: badvalue +enabled: true`; + expect((parseHealthDiagnosticQueries(yaml)[0] as ParseFailureQuery)._raw).toBeDefined(); + }); + }); + + describe('invalid integrations', () => { + it('returns ParseFailureQuery when integrations normalises to empty array', () => { + const yaml = ` +id: q1 +name: q1 +version: 2 +integrations: ' , ' +type: DSL +query: '{"query":{"match_all":{}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + expect((parseHealthDiagnosticQueries(yaml)[0] as ParseFailureQuery)._raw).toBeDefined(); + }); + }); + + describe('unknown version', () => { + it('returns ParseFailureQuery for an unrecognised version', () => { + const [q] = parseHealthDiagnosticQueries(UNKNOWN_VERSION_YAML); + expect((q as ParseFailureQuery)._raw).toBeDefined(); + expect((q as ParseFailureQuery).id).toBe('q-future'); + }); + }); + + describe('multi-document artifact', () => { + it('parses multiple YAML documents, each independently', () => { + const multiDoc = [V1_NO_VERSION_YAML, V2_YAML, UNKNOWN_VERSION_YAML].join('\n'); + const queries = parseHealthDiagnosticQueries(multiDoc); + expect(queries).toHaveLength(3); + expect((queries[0] as HealthDiagnosticQueryV1).version).toBe(1); + expect((queries[1] as HealthDiagnosticQueryV2).version).toBe(2); + expect((queries[2] as ParseFailureQuery)._raw).toBeDefined(); + }); + + it('a malformed document becomes ParseFailureQuery without dropping others', () => { + const goodDoc = `--- +id: good +name: good +index: logs-* +type: DSL +query: 'x' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`; + const badDoc = `--- +version: 2 +id: bad +name: bad +type: DSL +query: 'x' +scheduleCron: 5m +filterlist: + user.name: keep`; + const queries = parseHealthDiagnosticQueries(`${goodDoc}\n${badDoc}`); + expect(queries).toHaveLength(2); + expect((queries[0] as HealthDiagnosticQueryV1).version).toBe(1); + expect((queries[1] as ParseFailureQuery)._raw).toBeDefined(); + }); + }); +});
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_query_parser.ts+150 −0 added@@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import * as YAML from 'yaml'; +import { + QueryType, + Action, + type HealthDiagnosticQuery, + type HealthDiagnosticQueryV1, + type HealthDiagnosticQueryV2, + type ParseFailureQuery, +} from './health_diagnostic_service.types'; + +export const parseHealthDiagnosticQueries = (input: unknown): HealthDiagnosticQuery[] => + YAML.parseAllDocuments(input as string).map(parseOne); + +const parseOne = (doc: YAML.Document): HealthDiagnosticQuery => { + const raw = doc.toJSON() as Record<string, unknown> | null; + const version = raw?.version; + + try { + if (version === undefined || version === 1) { + return parseV1(raw); + } else if (version === 2) { + return parseV2(raw); + } else { + return parseUnknown(raw); + } + } catch { + return parseUnknown(raw); + } +}; + +const parseV1 = (raw: Record<string, unknown> | null): HealthDiagnosticQueryV1 => { + assertRequiredString(raw, 'id'); + assertRequiredString(raw, 'name'); + assertRequiredString(raw, 'index'); + assertRequiredEnum(raw, 'type', Object.values(QueryType)); + assertRequiredString(raw, 'query'); + assertRequiredString(raw, 'scheduleCron'); + assertRequiredObject(raw, 'filterlist'); + assertFilterlistActions(raw); + assertRequiredBoolean(raw, 'enabled'); + + return { ...(raw as Record<string, unknown>), version: 1 } as HealthDiagnosticQueryV1; +}; + +const parseV2 = (raw: Record<string, unknown> | null): HealthDiagnosticQueryV2 => { + assertRequiredString(raw, 'id'); + assertRequiredString(raw, 'name'); + assertRequiredEnum(raw, 'type', Object.values(QueryType)); + assertRequiredString(raw, 'query'); + assertRequiredString(raw, 'scheduleCron'); + assertRequiredObject(raw, 'filterlist'); + assertFilterlistActions(raw); + assertRequiredBoolean(raw, 'enabled'); + + const hasIntegrations = raw && typeof raw.integrations === 'string' && raw.integrations !== ''; + const hasIndex = raw && typeof raw.index === 'string' && raw.index !== ''; + if (!hasIntegrations && !hasIndex) { + throw new Error('v2 descriptor must have either integrations or index'); + } + if (hasIntegrations && hasIndex) { + throw new Error('v2 descriptor must not have both integrations and index'); + } + + const integrations = hasIntegrations + ? (raw.integrations as string) + .split(',') + .map((p) => p.trim()) + .filter((p) => p.length > 0) + : undefined; + + if (integrations !== undefined && integrations.length === 0) { + throw new Error('integrations must contain at least one non-empty pattern'); + } + + const typesRaw = (raw as Record<string, unknown>).datastreamTypes; + if (typesRaw !== undefined && typesRaw !== null) { + if (typeof typesRaw !== 'string' || typesRaw.trim() === '') { + throw new Error('datastreamTypes must be a non-empty comma-separated string when present'); + } + } + const types = + hasIntegrations && typeof typesRaw === 'string' + ? typesRaw + .split(',') + .map((p) => p.trim()) + .filter((p) => p.length > 0) + : undefined; + + const { datastreamTypes: _drop, ...rest } = raw as Record<string, unknown>; + return { + ...rest, + version: 2, + ...(integrations !== undefined ? { integrations } : {}), + ...(types !== undefined ? { datastreamTypes: types } : {}), + } as HealthDiagnosticQueryV2; +}; + +const parseUnknown = (raw: unknown): ParseFailureQuery => { + const obj = raw as Record<string, unknown> | null; + return { + id: obj?.id as string | undefined, + name: obj?.name as string | undefined, + _raw: raw, + }; +}; + +const assertRequiredString = (raw: Record<string, unknown> | null, field: string): void => { + if (!raw || typeof raw[field] !== 'string' || raw[field] === '') { + throw new Error(`Missing or invalid required field: ${field}`); + } +}; + +const assertRequiredObject = (raw: Record<string, unknown> | null, field: string): void => { + if (!raw || typeof raw[field] !== 'object' || raw[field] === null || Array.isArray(raw[field])) { + throw new Error(`Missing or invalid required field: ${field}`); + } +}; + +const assertRequiredEnum = ( + raw: Record<string, unknown> | null, + field: string, + values: readonly string[] +): void => { + if (!raw || !values.includes(raw[field] as string)) { + throw new Error(`Missing or invalid required field: ${field}`); + } +}; + +const assertFilterlistActions = (raw: Record<string, unknown> | null): void => { + const fl = raw?.filterlist as Record<string, unknown>; + const validActions = Object.values(Action) as string[]; + for (const value of Object.values(fl)) { + if (!validActions.includes(value as string)) { + throw new Error(`Invalid filterlist action value: ${value}`); + } + } +}; + +const assertRequiredBoolean = (raw: Record<string, unknown> | null, field: string): void => { + if (!raw || typeof raw[field] !== 'boolean') { + throw new Error(`Missing or invalid required field: ${field}`); + } +};
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_receiver.test.ts+151 −151 modified@@ -12,18 +12,40 @@ import { createMockLogger, createMockEsClient, createMockCircuitBreaker, - createMockQuery, + createMockQueryV1, + createMockQueryV2, createMockSearchResponse, createMockEqlResponse, setupPointInTime, executeObservableTest, + type HealthDiagnosticQueryV1, + type HealthDiagnosticQueryV2, } from './__mocks__'; describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryExecutor', () => { let queryExecutor: CircuitBreakingQueryExecutorImpl; let mockEsClient: ReturnType<typeof createMockEsClient>; let mockLogger: ReturnType<typeof createMockLogger>; + const mkExecV1 = (type: QueryType, overrides: Partial<HealthDiagnosticQueryV1> = {}) => ({ + kind: 'executable' as const, + query: createMockQueryV1(type, overrides), + }); + + const mkExecV2 = ( + type: QueryType, + overrides: Partial<HealthDiagnosticQueryV2> = {}, + indices = ['logs-endpoint.events.process-default'] + ) => ({ + kind: 'executable' as const, + query: createMockQueryV2(type, overrides), + resolution: { + name: 'endpoint', + version: '8.14.2', + indices, + }, + }); + beforeEach(() => { mockEsClient = createMockEsClient(); mockLogger = createMockLogger(); @@ -38,15 +60,15 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should run DSL query successfully', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreakers = [createMockCircuitBreaker(true)]; mockEsClient.search .mockResolvedValueOnce(createMockSearchResponse([mockDocument])) .mockResolvedValueOnce(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers }), + queryExecutor.search({ query: execQuery, circuitBreakers }), (results, completed) => { expect(results).toHaveLength(1); expect(results[0]).toEqual(mockDocument); @@ -66,14 +88,14 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle DSL query with aggregations', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); const aggregations = { bucket_count: { value: 42 } }; mockEsClient.search.mockResolvedValueOnce(createMockSearchResponse([], aggregations)); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), (results) => { expect(results).toHaveLength(1); expect(results[0]).toEqual(aggregations); @@ -84,7 +106,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle multiple pages of DSL results', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); const doc1 = { ...mockDocument, id: 1 }; const doc2 = { ...mockDocument, id: 2 }; @@ -95,7 +117,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx .mockResolvedValueOnce(createMockSearchResponse([], undefined, 'test-pit-id-3')); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), (results) => { expect(results).toHaveLength(2); expect(results[0]).toEqual(doc1); @@ -123,7 +145,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle queries with tiers filtering', (done) => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot', 'warm'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot', 'warm'] }); const circuitBreaker = createMockCircuitBreaker(true); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ @@ -137,7 +159,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.search.mockResolvedValue(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { expect(mockEsClient.ilm.explainLifecycle).toHaveBeenCalledWith({ index: 'test-index', @@ -149,6 +171,23 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx done ); }); + + it('streamDSL uses resolved indices from v2 ExecutableQuery', async () => { + const execQuery = mkExecV2(QueryType.DSL, {}, ['logs-endpoint.events.process-default']); + setupPointInTime(mockEsClient, 'pit-id'); + mockEsClient.search.mockResolvedValueOnce(createMockSearchResponse([], undefined, 'pit-id')); + + await new Promise<void>((resolve) => { + queryExecutor.streamDSL(execQuery, new AbortController().signal).subscribe({ + complete: resolve, + error: resolve, + }); + }); + + expect(mockEsClient.openPointInTime).toHaveBeenCalledWith( + expect.objectContaining({ index: ['logs-endpoint.events.process-default'] }) + ); + }); }); describe('EQL queries', () => { @@ -158,12 +197,11 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx ]; beforeEach(() => { - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockResolvedValue({ has_all_requested: true }); }); test('should run EQL query with events successfully', (done) => { - const query = createMockQuery(QueryType.EQL, { + const execQuery = mkExecV1(QueryType.EQL, { query: 'process where process.name == "cmd.exe"', }); const circuitBreaker = createMockCircuitBreaker(true); @@ -172,7 +210,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.eql.search.mockResolvedValue(createMockEqlResponse(eventSources)); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), (results) => { expect(results).toHaveLength(2); expect(results[0]).toEqual(eventSources[0]); @@ -192,7 +230,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should run EQL query with sequences successfully', (done) => { - const query = createMockQuery(QueryType.EQL, { + const execQuery = mkExecV1(QueryType.EQL, { query: 'sequence [process where true] [network where true]', }); const circuitBreaker = createMockCircuitBreaker(true); @@ -209,7 +247,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.eql.search.mockResolvedValue(createMockEqlResponse(undefined, mockSequences)); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), (results) => { expect(results).toHaveLength(1); expect(results[0]).toEqual(mockSequences[0].events.map((e) => e._source)); @@ -220,13 +258,13 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle EQL query with no results', (done) => { - const query = createMockQuery(QueryType.EQL); + const execQuery = mkExecV1(QueryType.EQL); const circuitBreaker = createMockCircuitBreaker(true); mockEsClient.eql.search.mockResolvedValue({ hits: {} }); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), (results) => { expect(results).toHaveLength(0); done(); @@ -238,12 +276,11 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx describe('ES|QL queries', () => { beforeEach(() => { - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockResolvedValue({ has_all_requested: true }); }); test('should run ES|QL query successfully', (done) => { - const query = createMockQuery(QueryType.ESQL, { query: 'stats count() by user.name' }); + const execQuery = mkExecV1(QueryType.ESQL, { query: 'stats count() by user.name' }); const circuitBreaker = createMockCircuitBreaker(true); const mockRecords = [ @@ -255,7 +292,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.helpers.esql.mockReturnValue({ toRecords: mockToRecords }); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), (results) => { expect(results).toHaveLength(2); expect(results[0]).toEqual(mockRecords[0]); @@ -270,8 +307,52 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx ); }); + test('v2 ESQL query with FROM clause errors without calling esql', (done) => { + const execQuery = mkExecV2(QueryType.ESQL, { + query: 'FROM logs-* | stats count() by user.name', + }); + const circuitBreaker = createMockCircuitBreaker(true); + + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ + next: () => done(new Error('Should not emit')), + error: (error) => { + try { + expect(error.message).toContain('FROM clause'); + expect(mockEsClient.helpers.esql).not.toHaveBeenCalled(); + done(); + } catch (e) { + done(e); + } + }, + complete: () => done(new Error('Should not complete successfully')), + }); + }); + + test('should not inject FROM prefix when v1 query starts with lowercase from', (done) => { + const execQuery = mkExecV1(QueryType.ESQL, { + query: 'from logs-* | stats count() by user.name', + }); + const circuitBreaker = createMockCircuitBreaker(true); + + const mockRecords = [{ 'user.name': 'test', 'count()': 1 }]; + const mockToRecords = jest.fn().mockResolvedValue({ records: mockRecords }); + mockEsClient.helpers.esql.mockReturnValue({ toRecords: mockToRecords }); + + executeObservableTest( + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), + () => { + expect(mockEsClient.helpers.esql).toHaveBeenCalledWith( + { query: 'from logs-* | stats count() by user.name' }, + { signal: expect.any(AbortSignal) } + ); + done(); + }, + done + ); + }); + test('should handle ES|QL query with FROM clause already present', (done) => { - const query = createMockQuery(QueryType.ESQL, { + const execQuery = mkExecV1(QueryType.ESQL, { query: 'FROM logs-* | stats count() by user.name', }); const circuitBreaker = createMockCircuitBreaker(true); @@ -281,7 +362,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.helpers.esql.mockReturnValue({ toRecords: mockToRecords }); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { expect(mockEsClient.helpers.esql).toHaveBeenCalledWith( { query: 'FROM logs-* | stats count() by user.name' }, @@ -296,7 +377,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx describe('Circuit breaker functionality', () => { test('should trigger circuit breaker and abort query', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(false, 10); setupPointInTime(mockEsClient); @@ -320,7 +401,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx ) ); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: (error) => { try { @@ -339,7 +420,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx describe('Error handling', () => { test('should handle Elasticsearch search errors', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); setupPointInTime(mockEsClient); @@ -348,7 +429,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); mockEsClient.search.mockRejectedValue(new Error('Elasticsearch error')); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: (error) => { expect(error.message).toBe('Elasticsearch error'); @@ -359,24 +440,24 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle unsupported query type', () => { - const query = createMockQuery('INVALID' as QueryType); + const execQuery = mkExecV1('INVALID' as QueryType); const circuitBreaker = createMockCircuitBreaker(true); expect(() => { - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }); + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }); }).toThrow('Unhandled QueryType: INVALID'); }); test('should handle ILM explain lifecycle errors gracefully', (done) => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); const circuitBreaker = createMockCircuitBreaker(true); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ indices: undefined }); setupPointInTime(mockEsClient); mockEsClient.search.mockResolvedValue(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { expect(mockEsClient.openPointInTime).toHaveBeenCalledWith({ index: ['test-index'], @@ -389,7 +470,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle ILM API errors and assume serverless', (done) => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot', 'warm'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot', 'warm'] }); const circuitBreaker = createMockCircuitBreaker(true); const ilmError = new Error( @@ -400,7 +481,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.search.mockResolvedValue(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { expect(mockEsClient.ilm.explainLifecycle).toHaveBeenCalledWith({ index: 'test-index', @@ -418,7 +499,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle network errors during ILM checks', (done) => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); const circuitBreaker = createMockCircuitBreaker(true); const networkError = new Error('ECONNREFUSED'); @@ -427,7 +508,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx mockEsClient.search.mockResolvedValue(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { expect(mockEsClient.openPointInTime).toHaveBeenCalledWith({ index: ['test-index'], @@ -440,15 +521,15 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle malformed ILM responses', (done) => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); const circuitBreaker = createMockCircuitBreaker(true); mockEsClient.ilm.explainLifecycle.mockResolvedValue({}); setupPointInTime(mockEsClient); mockEsClient.search.mockResolvedValue(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { expect(mockEsClient.openPointInTime).toHaveBeenCalledWith({ index: ['test-index'], @@ -462,22 +543,18 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); describe('Permission checking', () => { - test('should proceed when index exists and has read privileges', (done) => { - const query = createMockQuery(QueryType.DSL); + test('should proceed when has read privileges', (done) => { + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); setupPointInTime(mockEsClient); mockEsClient.search.mockResolvedValue(createMockSearchResponse([])); executeObservableTest( - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }), + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }), () => { - expect(mockEsClient.indices.exists).toHaveBeenCalledWith({ - index: 'test-index', - allow_no_indices: false, - }); expect(mockEsClient.security.hasPrivileges).toHaveBeenCalledWith({ - index: [{ names: 'test-index', privileges: ['read'] }], + index: [{ names: ['test-index'], privileges: ['read'] }], }); expect(mockEsClient.openPointInTime).toHaveBeenCalled(); done(); @@ -486,51 +563,13 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx ); }); - test('should throw PermissionError when indices.exists returns false', (done) => { - const query = createMockQuery(QueryType.DSL); - const circuitBreaker = createMockCircuitBreaker(true); - - mockEsClient.indices.exists.mockResolvedValue(false); - - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ - next: () => {}, - error: (error) => { - expect(error).toBeInstanceOf(PermissionError); - expect(error.message).toContain('Index does not exist'); - expect(mockEsClient.openPointInTime).not.toHaveBeenCalled(); - done(); - }, - complete: () => done(new Error('Should not complete successfully')), - }); - }); - - test('should throw PermissionError when indices.exists throws', (done) => { - const query = createMockQuery(QueryType.DSL); - const circuitBreaker = createMockCircuitBreaker(true); - - mockEsClient.indices.exists.mockRejectedValue(new Error('index_not_found_exception')); - - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ - next: () => {}, - error: (error) => { - expect(error).toBeInstanceOf(PermissionError); - expect(error.message).toContain('Error accessing index'); - expect(error.message).toContain('index_not_found_exception'); - expect(mockEsClient.openPointInTime).not.toHaveBeenCalled(); - done(); - }, - complete: () => done(new Error('Should not complete successfully')), - }); - }); - test('should throw PermissionError when missing read privileges', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockResolvedValue({ has_all_requested: false }); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: (error) => { expect(error).toBeInstanceOf(PermissionError); @@ -543,13 +582,12 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should throw PermissionError when security.hasPrivileges throws', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockRejectedValue(new Error('security_exception')); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: (error) => { expect(error).toBeInstanceOf(PermissionError); @@ -563,12 +601,12 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should not call closePointInTime when permission check fails', (done) => { - const query = createMockQuery(QueryType.DSL); + const execQuery = mkExecV1(QueryType.DSL); const circuitBreaker = createMockCircuitBreaker(true); - mockEsClient.indices.exists.mockRejectedValue(new Error('no access')); + mockEsClient.security.hasPrivileges.mockRejectedValue(new Error('no access')); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: () => { setTimeout(() => { @@ -580,32 +618,13 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); }); - test('should throw PermissionError for EQL when index does not exist', (done) => { - const query = createMockQuery(QueryType.EQL); - const circuitBreaker = createMockCircuitBreaker(true); - - mockEsClient.indices.exists.mockResolvedValue(false); - - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ - next: () => {}, - error: (error) => { - expect(error).toBeInstanceOf(PermissionError); - expect(error.message).toContain('Index does not exist'); - expect(mockEsClient.eql.search).not.toHaveBeenCalled(); - done(); - }, - complete: () => done(new Error('Should not complete successfully')), - }); - }); - test('should throw PermissionError for EQL when missing read privileges', (done) => { - const query = createMockQuery(QueryType.EQL); + const execQuery = mkExecV1(QueryType.EQL); const circuitBreaker = createMockCircuitBreaker(true); - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockResolvedValue({ has_all_requested: false }); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: (error) => { expect(error).toBeInstanceOf(PermissionError); @@ -617,32 +636,13 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); }); - test('should throw PermissionError for ESQL when index does not exist', (done) => { - const query = createMockQuery(QueryType.ESQL, { query: 'stats count() by user.name' }); - const circuitBreaker = createMockCircuitBreaker(true); - - mockEsClient.indices.exists.mockResolvedValue(false); - - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ - next: () => {}, - error: (error) => { - expect(error).toBeInstanceOf(PermissionError); - expect(error.message).toContain('Index does not exist'); - expect(mockEsClient.helpers.esql).not.toHaveBeenCalled(); - done(); - }, - complete: () => done(new Error('Should not complete successfully')), - }); - }); - test('should throw PermissionError for ESQL when missing read privileges', (done) => { - const query = createMockQuery(QueryType.ESQL, { query: 'stats count() by user.name' }); + const execQuery = mkExecV1(QueryType.ESQL, { query: 'stats count() by user.name' }); const circuitBreaker = createMockCircuitBreaker(true); - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockResolvedValue({ has_all_requested: false }); - queryExecutor.search({ query, circuitBreakers: [circuitBreaker] }).subscribe({ + queryExecutor.search({ query: execQuery, circuitBreakers: [circuitBreaker] }).subscribe({ next: () => {}, error: (error) => { expect(error).toBeInstanceOf(PermissionError); @@ -657,8 +657,8 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx describe('indicesFor method', () => { test('should return original index when no tiers are specified', async () => { - const query = createMockQuery(QueryType.DSL); - const result = await queryExecutor.indicesFor(query); + const execQuery = mkExecV1(QueryType.DSL); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index']); expect(mockEsClient.ilm.explainLifecycle).not.toHaveBeenCalled(); }); @@ -669,7 +669,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx true, mockLogger ); - const query = createMockQuery(QueryType.DSL); + const query = mkExecV1(QueryType.DSL); const result = await serverlessExecutor.indicesFor(query); expect(result).toEqual(['test-index']); expect(mockEsClient.ilm.explainLifecycle).not.toHaveBeenCalled(); @@ -681,14 +681,14 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx true, mockLogger ); - const query = createMockQuery(QueryType.DSL, { tiers: ['hot', 'warm'] }); + const query = mkExecV1(QueryType.DSL, { tiers: ['hot', 'warm'] }); const result = await serverlessExecutor.indicesFor(query); expect(result).toEqual(['test-index']); expect(mockEsClient.ilm.explainLifecycle).not.toHaveBeenCalled(); }); test('should filter indices by tiers when ILM is available', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot', 'warm'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot', 'warm'] }); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ indices: { @@ -699,7 +699,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }, }); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index-000001', 'test-index-000002', 'test-index-000004']); expect(mockEsClient.ilm.explainLifecycle).toHaveBeenCalledWith({ index: 'test-index', @@ -709,25 +709,25 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }); test('should handle serverless environment (undefined indices)', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ indices: undefined }); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index']); }); test('should handle empty ILM response', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); mockEsClient.ilm.explainLifecycle.mockResolvedValue({}); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index']); }); test('should handle indices without phase information', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ indices: { @@ -737,12 +737,12 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }, }); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index-000001']); }); test('should filter out indices not in specified tiers', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ indices: { @@ -752,34 +752,34 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }, }); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index-000001']); }); test('should handle ILM API errors by falling back to original index', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); const serverlessError = new Error( 'no handler found for uri [/.alerts-security.alerts*/_ilm/explain?only_managed=false&filter_path=indices.*.phase] and method [GET]' ); mockEsClient.ilm.explainLifecycle.mockRejectedValue(serverlessError); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index']); }); test('should handle authorization errors gracefully', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['hot'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['hot'] }); const authError = new Error('security_exception'); mockEsClient.ilm.explainLifecycle.mockRejectedValue(authError); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual(['test-index']); }); test('should return empty array when no indices match tiers', async () => { - const query = createMockQuery(QueryType.DSL, { tiers: ['frozen'] }); + const execQuery = mkExecV1(QueryType.DSL, { tiers: ['frozen'] }); mockEsClient.ilm.explainLifecycle.mockResolvedValue({ indices: { @@ -789,7 +789,7 @@ describe('Security Solution - Health Diagnostic Queries - CircuitBreakingQueryEx }, }); - const result = await queryExecutor.indicesFor(query); + const result = await queryExecutor.indicesFor(execQuery); expect(result).toEqual([]); }); });
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_receiver.ts+155 −122 modified@@ -13,6 +13,7 @@ import { EMPTY, from, merge, + throwError, type Observable, takeUntil, map, @@ -33,9 +34,10 @@ import { type CircuitBreakerResult, } from './health_diagnostic_circuit_breakers.types'; import { - type HealthDiagnosticQuery, QueryType, PermissionError, + type IntegrationResolution, + type ExecutableQuery, } from './health_diagnostic_service.types'; import type { TelemetryLogger } from '../telemetry_logger'; import { newTelemetryLogger, withErrorMessage } from '../helpers'; @@ -55,88 +57,86 @@ export class CircuitBreakingQueryExecutorImpl implements CircuitBreakingQueryExe const abortSignal = controller.signal; const circuitBreakers$ = this.configureCircuitBreakers(circuitBreakers, controller); - switch (query.type) { + switch (query.query.type) { case QueryType.DSL: return this.streamDSL<T>(query, abortSignal).pipe(takeUntil(circuitBreakers$)); case QueryType.EQL: return this.streamEql<T>(query, abortSignal).pipe(takeUntil(circuitBreakers$)); case QueryType.ESQL: return this.streamEsql<T>(query, abortSignal).pipe(takeUntil(circuitBreakers$)); default: { - const exhaustiveCheck: never = query.type; + const exhaustiveCheck: never = query.query.type; throw new Error(`Unhandled QueryType: ${exhaustiveCheck}`); } } } - streamEsql<T>(diagnosticQuery: HealthDiagnosticQuery, abortSignal: AbortSignal): Observable<T> { - const regex = /^[\s\r\n]*FROM/; + streamEsql<T>(executableQuery: ExecutableQuery, abortSignal: AbortSignal): Observable<T> { + const { query } = executableQuery; - return from(this.checkPermissions(diagnosticQuery.index)).pipe( - mergeMap(() => - from(this.indicesFor(diagnosticQuery)).pipe( - mergeMap((index) => { - const query = regex.test(diagnosticQuery.query) - ? diagnosticQuery.query - : `FROM ${index} | ${diagnosticQuery.query}`; + if (query.version === 2 && /^[\s\r\n]*FROM/i.test(query.query)) { + // never should fail here since we already manage this scenario in the resolver, but just in case, we put this guard to + // avoid running potentially unsafe queries + return throwError( + () => + new Error( + 'v2 ESQL descriptors must not contain a FROM clause; use the integrations field to target indices' + ) + ); + } + + const regex = /^[\s\r\n]*FROM/i; + const originalIndices = this.originalIndicesFor(executableQuery); + return from(this.checkPermissions(originalIndices)).pipe( + mergeMap(() => from(this.indicesFor(executableQuery))), + mergeMap((indices) => + from( + indices.map((index) => { + const esqlQuery = regex.test(query.query) + ? query.query + : `FROM ${index} | ${query.query}`; return from( - this.client.helpers.esql({ query }, { signal: abortSignal }).toRecords() - ).pipe( - mergeMap((resp) => { - return resp.records.map((r) => r as T); - }) - ); + this.client.helpers.esql({ query: esqlQuery }, { signal: abortSignal }).toRecords() + ).pipe(mergeMap((resp) => resp.records.map((r) => r as T))); }) - ) + ).pipe(mergeMap((obs) => obs)) ) ); } - streamEql<T>(diagnosticQuery: HealthDiagnosticQuery, abortSignal: AbortSignal): Observable<T> { - return from(this.checkPermissions(diagnosticQuery.index)).pipe( - mergeMap(() => - from(this.indicesFor(diagnosticQuery)).pipe( - mergeMap((index) => { - const request: EqlSearchRequest = { - index, - query: diagnosticQuery.query, - size: diagnosticQuery.size, - }; + streamEql<T>(executableQuery: ExecutableQuery, abortSignal: AbortSignal): Observable<T> { + const { query } = executableQuery; + const originalIndices = this.originalIndicesFor(executableQuery); - return from(this.client.eql.search(request, { signal: abortSignal })).pipe( - mergeMap((resp) => { - if (resp.hits.events) { - return resp.hits.events.map((h) => h._source as T); - } else if (resp.hits.sequences) { - return resp.hits.sequences.map((seq) => seq.events.map((h) => h._source) as T); - } else { - this.logger.warn( - '>> Neither hits.events nor hits.sequences found in the response for query', - { queryName: diagnosticQuery.name } as LogMeta - ); - return []; - } - }) - ); + return from(this.checkPermissions(originalIndices)).pipe( + mergeMap(() => from(this.indicesFor(executableQuery))), + mergeMap((indices) => { + const request: EqlSearchRequest = { + index: indices, + query: query.query, + size: query.size, + }; + + return from(this.client.eql.search(request, { signal: abortSignal })).pipe( + mergeMap((resp) => { + if (resp.hits.events) { + return resp.hits.events.map((h) => h._source as T); + } else if (resp.hits.sequences) { + return resp.hits.sequences.map((seq) => seq.events.map((h) => h._source) as T); + } else { + this.logger.warn('>> Neither hits.events nor hits.sequences found', { + queryName: query.name, + } as LogMeta); + return []; + } }) - ) - ) + ); + }) ); } private async checkPermissions(index: Indices) { - let exists = false; - try { - exists = await this.client.indices.exists({ index, allow_no_indices: false }); - } catch (e) { - throw new PermissionError(`Error accessing index: ${e}`); - } - - if (!exists) { - throw new PermissionError('Index does not exist'); - } - try { const res = await this.client.security.hasPrivileges({ index: [ @@ -155,73 +155,72 @@ export class CircuitBreakingQueryExecutorImpl implements CircuitBreakingQueryExe } streamDSL<T>( - diagnosticQuery: HealthDiagnosticQuery, + executableQuery: ExecutableQuery, abortSignal: AbortSignal, pitKeepAlive: string = '1m' ): Observable<T> { + const { query } = executableQuery; + const pageSize = query.size ?? 10000; + const parsedQuery: SearchRequest = JSON.parse(query.query) as SearchRequest; + const originalIndices = this.originalIndicesFor(executableQuery); + let pitId: string; let searchAfter: SortResults | undefined; - const pageSize = diagnosticQuery.size ?? 10000; - - const query: SearchRequest = JSON.parse(diagnosticQuery.query) as SearchRequest; const fetchPage = () => { const paginatedRequest: SearchRequest = { size: pageSize, sort: [{ _shard_doc: 'asc' }], search_after: searchAfter, pit: { id: pitId, keep_alive: pitKeepAlive }, - ...query, + ...parsedQuery, }; return this.client.search<T>(paginatedRequest, { signal: abortSignal }); }; - return from(this.checkPermissions(diagnosticQuery.index)).pipe( - mergeMap(() => - from(this.indicesFor(diagnosticQuery)).pipe( - mergeMap((index) => - from(this.client.openPointInTime({ index, keep_alive: pitKeepAlive })) - ), - map((res) => res.id), + return from(this.checkPermissions(originalIndices)).pipe( + mergeMap(() => from(this.indicesFor(executableQuery))), + mergeMap((indices) => + from(this.client.openPointInTime({ index: indices, keep_alive: pitKeepAlive })) + ), + map((res) => res.id), - mergeMap((id) => { - pitId = id; - return from(fetchPage()); - }), - expand((searchResponse) => { - const returnedPitId = (searchResponse as { pit_id?: string }).pit_id; - if (returnedPitId) { - pitId = returnedPitId; - } + mergeMap((id) => { + pitId = id; + return from(fetchPage()); + }), + expand((searchResponse) => { + const returnedPitId = (searchResponse as { pit_id?: string }).pit_id; + if (returnedPitId) { + pitId = returnedPitId; + } - const hits = searchResponse.hits.hits; - const aggrs = searchResponse.aggregations; + const hits = searchResponse.hits.hits; + const aggrs = searchResponse.aggregations; - if (aggrs || hits.length === 0) { - return EMPTY; - } + if (aggrs || hits.length === 0) { + return EMPTY; + } - searchAfter = hits[hits.length - 1].sort; - return from(fetchPage()); - }), + searchAfter = hits[hits.length - 1].sort; + return from(fetchPage()); + }), - mergeMap((searchResponse) => { - if (searchResponse.aggregations) { - return [searchResponse.aggregations as T]; - } else { - return searchResponse.hits.hits.map((h) => h._source as T); - } - }), + mergeMap((searchResponse) => { + if (searchResponse.aggregations) { + return [searchResponse.aggregations as T]; + } else { + return searchResponse.hits.hits.map((h) => h._source as T); + } + }), - finalize(() => { - if (pitId !== undefined) { - this.client.closePointInTime({ id: pitId }).catch((error) => { - this.logger.warn('>> closePointInTime error', withErrorMessage(error)); - }); - } - }) - ) - ) + finalize(() => { + if (pitId !== undefined) { + this.client.closePointInTime({ id: pitId }).catch((error) => { + this.logger.warn('>> closePointInTime error', withErrorMessage(error)); + }); + } + }) ); } @@ -247,43 +246,64 @@ export class CircuitBreakingQueryExecutorImpl implements CircuitBreakingQueryExe /** * Returns the list of indices to query based on the provided tiers. - * When running in serverless or `query.index` is not managed by an ILM, returns - * the same `query.index`. - * - * @param query The health diagnostic query object. - * @returns A Promise resolving to an array of indices. + * Dispatches on query version: v1 uses `query.index`, v2 uses resolved indices from Fleet. + * When running in serverless or the index is not managed by ILM, returns the base indices as-is. */ - async indicesFor(query: HealthDiagnosticQuery): Promise<string[]> { - if (query.tiers === undefined) { - this.logger.debug('No tiers defined in the query, returning index as is', { + async indicesFor(executableQuery: ExecutableQuery): Promise<string[]> { + const { query } = executableQuery; + const tiers = query.tiers; + + let baseIndices: string[]; + if (query.version === 1) { + baseIndices = [query.index]; + this.logger.debug('Using index from v1 query', { queryName: query.name } as LogMeta); + } else if ('index' in query && query.index) { + baseIndices = [query.index as string]; + this.logger.trace('Using index from v2 query', { queryName: query.name } as LogMeta); + } else { + const v2Query = executableQuery as Extract< + ExecutableQuery, + { resolution: IntegrationResolution } + >; + baseIndices = v2Query.resolution.indices; + this.logger.debug('Using resolved indices from v2 query', { queryName: query.name, + count: baseIndices.length, } as LogMeta); - return [query.index]; } + if (this.isServerless) { this.logger.debug('Running in serverless, returning index as is', { queryName: query.name, } as LogMeta); - return [query.index]; + return baseIndices; } - const tiers = query.tiers; + if (tiers === undefined || baseIndices.length === 0) { + this.logger.debug('No tiers defined or no base indices, returning as-is', { + queryName: query.name, + } as LogMeta); + return baseIndices; + } + const tiered = await Promise.all(baseIndices.map((index) => this.filterByTier(index, tiers))); + return tiered.flat().filter((index) => index !== ''); + } + + private async filterByTier(index: string, tiers: string[]): Promise<string[]> { return this.client.ilm .explainLifecycle({ - index: query.index, + index, only_managed: false, filter_path: ['indices.*.phase'], }) .then((response) => { if (response.indices === undefined) { this.logger.debug( 'Got an empty response while explaining lifecycle. Asumming serverless.', - { - index: query.index, - } as LogMeta + { index } as LogMeta ); - return [query.index]; + return [index]; } else { const indices = Object.entries(response.indices).map(([indexName, stats]) => { if ('phase' in stats && stats.phase) { @@ -298,7 +318,6 @@ export class CircuitBreakingQueryExecutorImpl implements CircuitBreakingQueryExe return ''; } } else { - // should not happen, but just in case this.logger.debug('Index is not managed by an ILM', { index: indexName, tiers, @@ -307,8 +326,8 @@ export class CircuitBreakingQueryExecutorImpl implements CircuitBreakingQueryExe } }); this.logger.debug('Indices managed by ILM', { - queryName: query.name, - tiers: query.tiers, + index, + tiers, indices, } as LogMeta); return indices; @@ -322,7 +341,21 @@ export class CircuitBreakingQueryExecutorImpl implements CircuitBreakingQueryExe 'Error while checking ILM status, assuming serverless', withErrorMessage(error) ); - return [query.index]; + return [index]; }); } + + // Returns the "pre-ILM" index/datastream patterns used for permission checking only. + // v1: the literal `index` field; v2: the Fleet-resolved datastream names. + // Permissions are granted against these names, NOT the backing .ds-* indices. + private originalIndicesFor(executableQuery: ExecutableQuery): string[] { + if (executableQuery.query.version === 1) { + return [executableQuery.query.index]; + } + if ('index' in executableQuery.query && executableQuery.query.index) { + return [executableQuery.query.index as string]; + } + const v2 = executableQuery as Extract<ExecutableQuery, { resolution: IntegrationResolution }>; + return v2.resolution.indices; + } }
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_receiver.types.ts+2 −12 modified@@ -7,20 +7,10 @@ import type { Observable } from 'rxjs'; import type { CircuitBreaker } from './health_diagnostic_circuit_breakers.types'; -import { type HealthDiagnosticQuery } from './health_diagnostic_service.types'; +import type { ExecutableQuery } from './health_diagnostic_service.types'; -/** - * Configuration for executing a search query, including any associated circuit breakers. - */ export interface QueryConfig { - /** - * The Elasticsearch query to execute. - */ - query: HealthDiagnosticQuery; - - /** - * A list of circuit breakers that must pass validation while the query is executed. - */ + query: ExecutableQuery; circuitBreakers: CircuitBreaker[]; }
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_service.test.ts+203 −5 modified@@ -7,6 +7,7 @@ import { of, throwError, from } from 'rxjs'; import type { ElasticsearchClient, AnalyticsServiceStart, Logger } from '@kbn/core/server'; +import type { PackageService } from '@kbn/fleet-plugin/server'; import { HealthDiagnosticServiceImpl } from './health_diagnostic_service'; import { CircuitBreakingQueryExecutorImpl } from './health_diagnostic_receiver'; import { ValidationError } from './health_diagnostic_circuit_breakers.types'; @@ -22,7 +23,9 @@ import { createMockQueryExecutor, createMockDocument, createMockArtifactData, + createMockPackageService, } from './__mocks__'; +import { TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_STATS_EVENT } from '../event_based/events'; jest.mock('./health_diagnostic_receiver'); jest.mock('../artifact'); @@ -39,6 +42,7 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic let mockAnalytics: jest.Mocked<AnalyticsServiceStart>; let mockTelemetryConfigProvider: jest.Mocked<TelemetryConfigProvider>; let mockQueryExecutor: jest.Mocked<CircuitBreakingQueryExecutorImpl>; + let mockPackageService: ReturnType<typeof createMockPackageService>; const mockDocument = createMockDocument(); @@ -49,6 +53,7 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic mockAnalytics = createMockAnalytics(); mockTelemetryConfigProvider = createMockTelemetryConfigProvider(); mockQueryExecutor = createMockQueryExecutor(); + mockPackageService = createMockPackageService([]); MockedCircuitBreakingQueryExecutorImpl.mockImplementation(() => mockQueryExecutor); service = new HealthDiagnosticServiceImpl(mockLogger); @@ -63,9 +68,10 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic const startService = async () => { await service.start({ taskManager: mockTaskManager, - esClient: mockEsClient, + esClient: mockEsClient as unknown as ElasticsearchClient, analytics: mockAnalytics, telemetryConfigProvider: mockTelemetryConfigProvider, + packageService: mockPackageService as unknown as PackageService, }); }; @@ -91,6 +97,8 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic expect(result[0]).toMatchObject({ name: 'test-query', passed: true, + status: 'success', + descriptorVersion: 1, numDocs: 1, fieldNames: expect.arrayContaining(['@timestamp', 'user.name', 'event.action']), }); @@ -133,6 +141,102 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic expect(mockQueryExecutor.search).not.toHaveBeenCalled(); }); + describe('query attribute filtering', () => { + test('should emit a skipped stat for queries with unrecognised versions', async () => { + (artifactService.getArtifact as jest.Mock).mockResolvedValue({ + data: `--- +id: unknown-version-query +name: unknown-version-query +version: 99 +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`, + }); + + const result = await service.runHealthDiagnosticQueries({}); + + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ + name: 'unknown-version-query', + status: 'skipped', + skipReason: 'parse_failure', + passed: false, + }); + expect(mockQueryExecutor.search).not.toHaveBeenCalled(); + expect(mockAnalytics.reportEvent).toHaveBeenCalledWith( + TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_STATS_EVENT.eventType, + expect.objectContaining({ status: 'skipped', skipReason: 'parse_failure' }) + ); + }); + + test('should emit a skipped stat for queries missing the enabled attribute', async () => { + (artifactService.getArtifact as jest.Mock).mockResolvedValue({ + data: `--- +id: no-enabled-query +name: no-enabled-query +index: test-index +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep`, + }); + + const result = await service.runHealthDiagnosticQueries({}); + + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ + status: 'skipped', + skipReason: 'parse_failure', + passed: false, + }); + expect(mockQueryExecutor.search).not.toHaveBeenCalled(); + }); + + test('should execute valid queries and emit skipped stats for unknown-version queries', async () => { + (artifactService.getArtifact as jest.Mock).mockResolvedValue({ + data: `--- +id: valid-query-1 +name: valid-query-1 +index: test-index +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true +--- +id: unknown-version-query +name: unknown-version-query +version: 99 +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`, + }); + + mockQueryExecutor.search.mockReturnValue(of(mockDocument)); + + const result = await service.runHealthDiagnosticQueries({}); + + expect(result).toHaveLength(2); + const validResult = result.find((r) => r.name === 'valid-query-1'); + const unknownResult = result.find((r) => r.name === 'unknown-version-query'); + expect(validResult).toMatchObject({ status: 'success', passed: true }); + expect(unknownResult).toMatchObject({ + status: 'skipped', + skipReason: 'parse_failure', + passed: false, + }); + expect(mockQueryExecutor.search).toHaveBeenCalledTimes(1); + }); + }); + test('should include circuit breaker stats in successful execution', async () => { const lastExecutionByQuery = { 'test-query': 1640995200000 }; mockQueryExecutor.search.mockReturnValue(of(mockDocument)); @@ -157,6 +261,27 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic ); }); + test('should return failed stat when search() throws synchronously instead of rejecting the batch', async () => { + await startService(); + + const lastExecutionByQuery = { 'test-query': 1640995200000 }; + const error = new Error('Unhandled QueryType: UNKNOWN'); + mockQueryExecutor.search.mockImplementation(() => { + throw error; + }); + + const result = await service.runHealthDiagnosticQueries(lastExecutionByQuery); + + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ + name: 'test-query', + passed: false, + status: 'failed', + failure: { message: 'Unhandled QueryType: UNKNOWN' }, + }); + expect(mockLogger.warn).toHaveBeenCalledWith('Error running query', expect.any(Object)); + }); + test('should handle query execution errors', async () => { await startService(); @@ -170,12 +295,14 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic expect(result[0]).toMatchObject({ name: 'test-query', passed: false, + status: 'failed', + descriptorVersion: 1, failure: { message: 'Query execution failed', reason: undefined, }, }); - expect(mockLogger.error).toHaveBeenCalledWith( + expect(mockLogger.warn).toHaveBeenCalledWith( 'Error running query', expect.objectContaining({ error }) ); @@ -207,7 +334,7 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic }); }); - test('should log info (not error) for PermissionError', async () => { + test('should log debug (not warn) for PermissionError', async () => { await startService(); const lastExecutionByQuery = { 'test-query': 1640995200000 }; @@ -225,11 +352,11 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic reason: undefined, }, }); - expect(mockLogger.info).toHaveBeenCalledWith( + expect(mockLogger.debug).toHaveBeenCalledWith( 'Permission error running query.', expect.objectContaining({ error: permissionError }) ); - expect(mockLogger.error).not.toHaveBeenCalled(); + expect(mockLogger.warn).not.toHaveBeenCalled(); }); test('should handle artifact service errors gracefully', async () => { @@ -334,6 +461,77 @@ describe('Security Solution - Health Diagnostic Queries - HealthDiagnosticServic }) ); }); + + it('reports per-integration stats for a successful v2 query', async () => { + mockPackageService.asInternalUser.getPackages.mockResolvedValue([ + { + name: 'endpoint', + version: '8.14.2', + status: 'installed', + data_streams: [ + { dataset: 'endpoint.events.process', type: 'logs' }, + { dataset: 'endpoint.events.network', type: 'traces' }, + ], + }, + ]); + + (artifactService.getArtifact as jest.Mock).mockResolvedValue({ + data: `--- +id: test-query-v2 +name: test-query-v2 +version: 2 +integrations: endpoint +datastreamTypes: logs +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`, + }); + + mockQueryExecutor.search.mockReturnValue(of(mockDocument)); + + const result = await service.runHealthDiagnosticQueries({}); + + expect(result).toHaveLength(1); + expect(result[0].integration).toMatchObject({ + name: 'endpoint', + version: '8.14.2', + indices: ['logs-endpoint.events.process-*'], + }); + }); + + it('emits skipped stats EBT when integration is not installed', async () => { + mockPackageService.asInternalUser.getPackages.mockResolvedValue([]); + + // Set up artifact with a v2 query descriptor (integrations-based) + (artifactService.getArtifact as jest.Mock).mockResolvedValue({ + data: `--- +id: test-query-v2 +name: test-query-v2 +version: 2 +integrations: endpoint.* +type: DSL +query: '{"query": {"match_all": {}}}' +scheduleCron: 5m +filterlist: + user.name: keep +enabled: true`, + }); + + await service.runHealthDiagnosticQueries({}); + + expect(mockAnalytics.reportEvent).toHaveBeenCalledWith( + TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_STATS_EVENT.eventType, + expect.objectContaining({ + status: 'skipped', + skipReason: 'integration_not_installed', + passed: false, + }) + ); + expect(mockQueryExecutor.search).not.toHaveBeenCalled(); + }); }); describe('telemetry opt-out behavior', () => {
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_service.ts+165 −107 modified@@ -5,8 +5,9 @@ * 2.0. */ +import { randomUUID } from 'crypto'; import { schema } from '@kbn/config-schema'; -import { bufferCount, defaultIfEmpty, from, mergeMap, take, tap } from 'rxjs'; +import { bufferCount, defaultIfEmpty, defer, from, mergeMap, take, tap } from 'rxjs'; import { cloneDeep } from 'lodash'; import type { TaskManagerSetupContract, @@ -21,19 +22,22 @@ import type { } from '@kbn/core/server'; import { PermissionError, + type ExecutableQuery, + type SkippedQuery, type HealthDiagnosticQuery, type HealthDiagnosticQueryStats, type HealthDiagnosticService, type HealthDiagnosticServiceSetup, type HealthDiagnosticServiceStart, + type ParseFailureQuery, } from './health_diagnostic_service.types'; import { emptyStat as queryStat, fieldNames, shouldExecute as isDueForExecution, - parseDiagnosticQueries, applyFilterlist, } from './health_diagnostic_utils'; +import { parseHealthDiagnosticQueries } from './health_diagnostic_query_parser'; import { type CircuitBreaker, ValidationError } from './health_diagnostic_circuit_breakers.types'; import type { CircuitBreakingQueryExecutor } from './health_diagnostic_receiver.types'; import { CircuitBreakingQueryExecutorImpl } from './health_diagnostic_receiver'; @@ -50,12 +54,16 @@ import { EventLoopUtilizationCircuitBreaker } from './circuit_breakers/event_loo import { EventLoopDelayCircuitBreaker } from './circuit_breakers/event_loop_delay_circuit_breaker'; import { ElasticsearchCircuitBreaker } from './circuit_breakers/elastic_search_circuit_breaker'; import type { TelemetryConfigProvider } from '../../../../common/telemetry_config/telemetry_config_provider'; +import { + IntegrationResolverImpl, + type IntegrationResolver, +} from './health_diagnostic_integration_resolver'; const TASK_TYPE = 'security:health-diagnostic'; const TASK_ID = `${TASK_TYPE}:1.0.0`; const INTERVAL = '1h'; const TIMEOUT = '10m'; -const QUERY_ARTIFACT_ID = 'health-diagnostic-queries-v1'; +const QUERY_ARTIFACT_ID = 'health-diagnostic-queries-v2'; export class HealthDiagnosticServiceImpl implements HealthDiagnosticService { private readonly salt = 'c2a5d101-d0ef-49cc-871e-6ee55f9546f8'; @@ -65,6 +73,7 @@ export class HealthDiagnosticServiceImpl implements HealthDiagnosticService { private analytics?: AnalyticsServiceStart; private _esClient?: ElasticsearchClient; private telemetryConfigProvider?: TelemetryConfigProvider; + private integrationResolver?: IntegrationResolver; private isServerless = false; constructor(logger: Logger) { @@ -90,6 +99,7 @@ export class HealthDiagnosticServiceImpl implements HealthDiagnosticService { this.analytics = start.analytics; this._esClient = start.esClient; this.telemetryConfigProvider = start.telemetryConfigProvider; + this.integrationResolver = new IntegrationResolverImpl(start.packageService, this.logger); await this.scheduleTask(start.taskManager); } @@ -101,123 +111,159 @@ export class HealthDiagnosticServiceImpl implements HealthDiagnosticService { this.logger.debug('Skipping health diagnostic task because telemetry is disabled'); return []; } - this.logger.debug('Running health diagnostic task'); - - const queriesToRun = await this.getRunnableHealthQueries(lastExecutionByQuery, new Date()); - const statistics: HealthDiagnosticQueryStats[] = []; - if (this.queryExecutor === undefined) { - this.logger.warn('CircuitBreakingQueryExecutor service is not started'); - return statistics; + if (!this.queryExecutor || !this.integrationResolver) { + this.logger.warn('Service is not fully started'); + return []; } - this.logger.debug('About to run health diagnostic queries', { - queriesToRun: queriesToRun.length, - } as LogMeta); - - for (const query of queriesToRun) { - const now = new Date(); - const circuitBreakers = this.buildCircuitBreakers(); - const options = { query, circuitBreakers }; - - const query$ = this.queryExecutor.search(options); - - const stats = await new Promise<HealthDiagnosticQueryStats>((resolve) => { - const queryStats: HealthDiagnosticQueryStats = queryStat(query.name, now); - let currentPage = 0; - - query$ - .pipe( - // cap the result set to the max number of documents - take(telemetryConfiguration.health_diagnostic_config.query.maxDocuments), + const queries = await this.getRunnableHealthQueries(lastExecutionByQuery, new Date()); + const resolved = await this.integrationResolver.resolve(queries); + this.logger.trace('About to run queries', { numQueries: queries.length } as LogMeta); + const statistics: HealthDiagnosticQueryStats[] = []; - // get the fields names, only once (assume all docs have the same structure) - tap((doc) => { - if (queryStats.fieldNames.length === 0) { - queryStats.fieldNames = fieldNames(doc); - } - }), + for (const resolvedQuery of resolved) { + this.logger.trace('About to execute health diagnostic query', { + name: resolvedQuery.query.name, + } as LogMeta); + let stats: HealthDiagnosticQueryStats; - // publish N documents in the same EBT - bufferCount(telemetryConfiguration.health_diagnostic_config.query.bufferSize), - - // emit empty array if no items were buffered (ensures EBT is always sent) - defaultIfEmpty([]), - - // apply filterlist - mergeMap((result) => - from( - applyFilterlist( - result, - query.filterlist, - this.salt, - query, - telemetryConfiguration.encryption_public_keys - ) - ) - ) - ) - .subscribe({ - next: (data) => { - this.logger.debug('Sending query result EBT', { - queryName: query.name, - traceId: queryStats.traceId, - } as LogMeta); - - this.reportEBT(TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_RESULT_EVENT, { - name: query.name, - queryId: query.id, - traceId: queryStats.traceId, - page: currentPage++, - data, - }); - - queryStats.numDocs += data.length; - }, - error: (error) => { - const failure = { - message: error.message, - reason: error instanceof ValidationError ? error.result : undefined, - }; - if (error instanceof PermissionError) { - this.logger.info('Permission error running query.', withErrorMessage(error)); - } else { - this.logger.error('Error running query', withErrorMessage(error)); - } - resolve({ - ...queryStats, - failure, - finished: new Date().toISOString(), - circuitBreakers: this.circuitBreakersStats(circuitBreakers), - passed: false, - }); - }, - complete: () => { - resolve({ - ...queryStats, - finished: new Date().toISOString(), - circuitBreakers: this.circuitBreakersStats(circuitBreakers), - passed: true, - }); - }, - }); - }); + if (resolvedQuery.kind === 'skipped') { + stats = this.buildSkippedStats(resolvedQuery); + } else { + stats = await this.executeQuery(resolvedQuery); + } this.logger.debug('Query executed. Sending query stats EBT', { - queryName: query.name, + queryName: resolvedQuery.query.name, traceId: stats.traceId, } as LogMeta); this.reportEBT(TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_STATS_EVENT, stats); - statistics.push(stats); } - this.logger.debug('Finished running health diagnostic task'); - return statistics; } + private buildSkippedStats(skipped: SkippedQuery): HealthDiagnosticQueryStats { + const { query } = skipped; + const name = query.name ? query.name : query.id ?? 'unknown'; + const now = new Date(); + return { + name, + started: now.toISOString(), + finished: now.toISOString(), + traceId: randomUUID(), + numDocs: 0, + passed: false, + fieldNames: [], + descriptorVersion: 'version' in query ? query.version : 0, + status: 'skipped', + skipReason: skipped.reason, + }; + } + + private async executeQuery( + executableQuery: ExecutableQuery + ): Promise<HealthDiagnosticQueryStats> { + const { query } = executableQuery; + const now = new Date(); + const circuitBreakers = this.buildCircuitBreakers(); + const options = { query: executableQuery, circuitBreakers }; + + if (!this.queryExecutor) { + throw new Error('queryExecutor is unavailable'); + } + const executor = this.queryExecutor; + const query$ = defer(() => executor.search(options)); + + return new Promise<HealthDiagnosticQueryStats>((resolve) => { + const queryStats: HealthDiagnosticQueryStats = queryStat(query.name, now, query.version); + let currentPage = 0; + + query$ + .pipe( + // cap the result set to the max number of documents + take(telemetryConfiguration.health_diagnostic_config.query.maxDocuments), + + // get the fields names, only once (assume all docs have the same structure) + tap((doc) => { + if (queryStats.fieldNames.length === 0) { + queryStats.fieldNames = fieldNames(doc); + } + }), + + // publish N documents in the same EBT + bufferCount(telemetryConfiguration.health_diagnostic_config.query.bufferSize), + + // emit empty array if no items were buffered (ensures EBT is always sent) + defaultIfEmpty([]), + + // apply filterlist + mergeMap((result) => + from( + applyFilterlist( + result, + executableQuery.query.filterlist, + this.salt, + executableQuery.query, + telemetryConfiguration.encryption_public_keys + ) + ) + ) + ) + .subscribe({ + next: (data) => { + this.logger.debug('Sending query result EBT', { + queryName: query.name, + traceId: queryStats.traceId, + } as LogMeta); + + this.reportEBT(TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_RESULT_EVENT, { + name: query.name, + queryId: query.id, + traceId: queryStats.traceId, + page: currentPage++, + data, + }); + + queryStats.numDocs += data.length; + }, + error: (error) => { + const failure = { + message: error.message, + reason: error instanceof ValidationError ? error.result : undefined, + }; + if (error instanceof PermissionError) { + this.logger.debug('Permission error running query.', withErrorMessage(error)); + } else { + this.logger.warn('Error running query', withErrorMessage(error)); + } + resolve({ + ...queryStats, + failure, + finished: new Date().toISOString(), + circuitBreakers: this.circuitBreakersStats(circuitBreakers), + passed: false, + status: 'failed', + integration: 'resolution' in executableQuery ? executableQuery.resolution : undefined, + }); + }, + complete: () => { + resolve({ + ...queryStats, + finished: new Date().toISOString(), + circuitBreakers: this.circuitBreakersStats(circuitBreakers), + passed: true, + status: 'success', + integration: 'resolution' in executableQuery ? executableQuery.resolution : undefined, + }); + }, + }); + }); + } + private circuitBreakersStats(circuitBreakers: CircuitBreaker[]): Record<string, unknown> { return circuitBreakers.reduce((acc, cb) => { acc[cb.constructor.name] = cb.stats(); @@ -317,22 +363,34 @@ export class HealthDiagnosticServiceImpl implements HealthDiagnosticService { } } + private isParseFailureQuery(query: HealthDiagnosticQuery): query is ParseFailureQuery { + return '_raw' in query; + } + private async getRunnableHealthQueries( lastExecutionByQuery: Record<string, number>, now: Date ): Promise<HealthDiagnosticQuery[]> { const healthQueries = await this.healthQueries(); return healthQueries.filter((query) => { + this.logger.trace('Evaluating health diagnostic query for execution', { + query: query.name, + } as LogMeta); try { - const { name, scheduleCron, enabled = false } = query; + if (this.isParseFailureQuery(query)) { + // let it pass the filter to send the stats, i.e. this kind of query will be always + // skipped in the execution phase, but we want to report it in the stats with the + // parse failure reason. + return true; + } + const { name, scheduleCron, enabled } = query; const lastExecutedAt = new Date(lastExecutionByQuery[name] ?? 0); - return enabled && isDueForExecution(lastExecutedAt, now, scheduleCron); } catch (error) { this.logger.warn( 'Error processing health query', withErrorMessage(error, { - name: query.name, + name: (query as { name?: string }).name, } as LogMeta) ); return false; @@ -343,7 +401,7 @@ export class HealthDiagnosticServiceImpl implements HealthDiagnosticService { private async healthQueries(): Promise<HealthDiagnosticQuery[]> { try { const artifact = await artifactService.getArtifact(QUERY_ARTIFACT_ID); - return parseDiagnosticQueries(artifact.data); + return parseHealthDiagnosticQueries(artifact.data); } catch (error) { this.logger.warn('Error getting health diagnostic queries', withErrorMessage(error)); return [];
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_service.types.ts+84 −43 modified@@ -10,6 +10,7 @@ import type { TaskManagerSetupContract, TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; +import type { PackageService } from '@kbn/fleet-plugin/server'; import type { CircuitBreakerResult } from './health_diagnostic_circuit_breakers.types'; import type { TelemetryConfigProvider } from '../../../../common/telemetry_config/telemetry_config_provider'; @@ -62,6 +63,7 @@ export interface HealthDiagnosticServiceStart { esClient: ElasticsearchClient; analytics: AnalyticsServiceStart; telemetryConfigProvider: TelemetryConfigProvider; + packageService: PackageService; } export interface HealthDiagnosticService { @@ -83,60 +85,92 @@ export interface HealthDiagnosticQueryConfig { } /** - * Defines a health diagnostic query configuration with scheduling and filtering options. + * Fields shared across all known query descriptor versions. */ -export interface HealthDiagnosticQuery { - /** - * A unique identifier for this query. - */ +export interface HealthDiagnosticQueryBase { id: string; - /** - * A descriptive name for this query. - */ name: string; - /** - * The index pattern on which this query will be executed. - */ - index: string; - /** - * Only include indices in the specified tiers. Note that if the `index` - * hasn't a life cycle management or we are on serverless, this will be - * ignored. - */ - tiers?: string[]; - /** - * Specifies the query type, as defined by the QueryType enum. - */ type: QueryType; - /** - * The query string to be executed against the data store. - */ query: string; - /** - * A cron expression that schedules when the query should be run. - */ scheduleCron: string; - /** - * Optional mapping of dot-separated paths to associated actions for filtering results. - */ filterlist: Record<string, Action>; - /** - * Optional flag indicating whether this query is active and should be executed. - */ - enabled?: boolean; - /** - * Query size - */ + enabled: boolean; size?: number; - /** - * Optional RSA public key identifier used for encrypting fields marked with `encrypt` action - * in the filterlist. Required when the filterlist contains any `encrypt` actions. - * This ID corresponds to keys configured in the plugin-level `encryption_public_keys` map. - * Example: "rsa-keypair-v1-2025-q4" - */ encryptionKeyId?: string; + tiers?: string[]; } +/** + * v1 query descriptor — targets a fixed index pattern. + * Produced when the descriptor has `version: 1` or no version field. + */ +export interface HealthDiagnosticQueryV1 extends HealthDiagnosticQueryBase { + version: 1; + index: string; +} + +/** + * v2 query descriptor: targets integrations matched by regex patterns, + * or a direct index pattern (mutually exclusive with integrations). + * Invariant enforced by parser: exactly one of integrations or index is present. + */ +export interface HealthDiagnosticQueryV2 extends HealthDiagnosticQueryBase { + version: 2; + integrations?: string[]; // regex patterns resolved via Fleet + datastreamTypes?: string[]; // only relevant when integrations is set + index?: string; // alternative to integrations: direct index pattern +} + +/** + * Produced when the parser fails to produce a valid V1 or V2 descriptor — + * either an unrecognised version number or missing required fields. + * Carries the raw data for logging and reporting the stats; always results in + * a skipped execution. + */ +export interface ParseFailureQuery { + id?: string; + name?: string; + _raw: unknown; +} + +export type HealthDiagnosticQuery = + | HealthDiagnosticQueryV1 + | HealthDiagnosticQueryV2 + | ParseFailureQuery; + +/** + * Result of resolving a v2 query's integration patterns against Fleet. + */ +export interface IntegrationResolution { + name: string; + version: string; + indices: string[]; +} + +/** + * A query that has been resolved and is ready for ES execution. + * Version-specific shape is preserved for stats reporting. + */ +export type ExecutableQuery = + | { kind: 'executable'; query: HealthDiagnosticQueryV1 } + | { kind: 'executable'; query: HealthDiagnosticQueryV2; resolution: IntegrationResolution } + | { kind: 'executable'; query: HealthDiagnosticQueryV2 & { index: string } }; + +export type SkipReason = + | 'datastreams_not_matched' + | 'integration_not_installed' + | 'parse_failure' + | 'fleet_unavailable' + | 'unsupported_query'; + +export interface SkippedQuery { + kind: 'skipped'; + query: HealthDiagnosticQuery; + reason: SkipReason; +} + +export type ResolvedQuery = ExecutableQuery | SkippedQuery; + export interface HealthDiagnosticQueryResult { name: string; queryId: string; @@ -146,15 +180,22 @@ export interface HealthDiagnosticQueryResult { } export interface HealthDiagnosticQueryStats { + // existing — unchanged name: string; started: string; finished: string; traceId: string; numDocs: number; + /** Kept for downstream backward compatibility. Derived from `status`. */ passed: boolean; failure?: HealthDiagnosticQueryFailure; fieldNames: string[]; circuitBreakers?: Record<string, unknown>; + // new fields + descriptorVersion: number; + status: 'success' | 'failed' | 'skipped'; + skipReason?: SkipReason; + integration?: IntegrationResolution; } export interface HealthDiagnosticQueryFailure {
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/health_diagnostic_utils.ts+17 −20 modified@@ -6,11 +6,10 @@ */ import { randomUUID } from 'crypto'; -import * as YAML from 'yaml'; import { type Interval, intervalFromDate } from '@kbn/task-manager-plugin/server/lib/intervals'; import { Action, - type HealthDiagnosticQuery, + type HealthDiagnosticQueryBase, type HealthDiagnosticQueryStats, } from './health_diagnostic_service.types'; import { unflatten } from '../helpers'; @@ -22,12 +21,6 @@ export function shouldExecute(startDate: Date, endDate: Date, interval: Interval return nextDate !== undefined && nextDate < endDate; } -export function parseDiagnosticQueries(input: unknown): HealthDiagnosticQuery[] { - return YAML.parseAllDocuments(input as string).map((doc) => { - return doc.toJSON() as HealthDiagnosticQuery; - }); -} - export function fieldNames<T>(documents: T): string[] { const result: Set<string> = new Set(); @@ -52,23 +45,27 @@ export function fieldNames<T>(documents: T): string[] { return Array.from(result); } -export function emptyStat(name: string, now: Date): HealthDiagnosticQueryStats { - return { - name, - started: now.toISOString(), - traceId: randomUUID(), - finished: new Date().toISOString(), - numDocs: 0, - passed: false, - fieldNames: [], - }; -} +export const emptyStat = ( + name: string, + now: Date, + descriptorVersion: number +): HealthDiagnosticQueryStats => ({ + name, + started: now.toISOString(), + traceId: randomUUID(), + finished: new Date().toISOString(), + numDocs: 0, + passed: false, + fieldNames: [], + descriptorVersion, + status: 'failed', +}); export async function applyFilterlist( data: unknown[], rules: Record<string, Action>, salt: string, - query?: Pick<HealthDiagnosticQuery, 'encryptionKeyId'>, + query?: Pick<HealthDiagnosticQueryBase, 'encryptionKeyId'>, encryptionPublicKeys?: Record<string, string> ): Promise<unknown[]> { const filteredResult: unknown[] = [];
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/diagnostic/__mocks__/index.ts+56 −5 modified@@ -8,11 +8,19 @@ import type { AnalyticsServiceStart, Logger } from '@kbn/core/server'; import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; import type { CircuitBreakingQueryExecutorImpl } from '../health_diagnostic_receiver'; -import { QueryType, Action } from '../health_diagnostic_service.types'; +import { + QueryType, + Action, + type HealthDiagnosticQueryV1, + type HealthDiagnosticQueryV2, +} from '../health_diagnostic_service.types'; + +export type { HealthDiagnosticQueryV1, HealthDiagnosticQueryV2 }; import type { TelemetryConfigProvider } from '../../../../../common/telemetry_config/telemetry_config_provider'; export const createMockLogger = (): jest.Mocked<Logger> => ({ + trace: jest.fn(), debug: jest.fn(), info: jest.fn(), warn: jest.fn(), @@ -69,9 +77,6 @@ export const createMockEsClient = () => { ilm: { explainLifecycle: jest.fn(), }, - indices: { - exists: jest.fn(), - }, security: { hasPrivileges: jest.fn(), }, @@ -104,6 +109,53 @@ export const createMockQuery = (type: QueryType, overrides = {}) => ({ ...overrides, }); +export const createMockQueryV1 = ( + type: QueryType, + overrides: Partial<HealthDiagnosticQueryV1> = {} +): HealthDiagnosticQueryV1 => ({ + version: 1, + id: 'test-query-v1', + name: 'test-query-v1', + index: 'test-index', + type, + query: type === QueryType.DSL ? '{"query": {"match_all": {}}}' : 'test query', + scheduleCron: '5m', + filterlist: { 'user.name': Action.KEEP }, + enabled: true, + size: 100, + ...overrides, +}); + +export const createMockQueryV2 = ( + type: QueryType, + overrides: Partial<HealthDiagnosticQueryV2> = {} +): HealthDiagnosticQueryV2 => ({ + version: 2, + id: 'test-query-v2', + name: 'test-query-v2', + integrations: ['endpoint.*'], // already an array — parser split happens at parse time + type, + query: type === QueryType.DSL ? '{"query": {"match_all": {}}}' : 'test query', + scheduleCron: '5m', + filterlist: { 'user.name': Action.KEEP }, + enabled: true, + size: 100, + ...overrides, +}); + +export const createMockPackageService = ( + packages: Array<{ + name: string; + version: string; + status: string; + data_streams?: Array<{ dataset: string; type: string }>; + }> = [] +) => ({ + asInternalUser: { + getPackages: jest.fn().mockResolvedValue(packages), + }, +}); + export const createMockArtifactData = ( overrides: Partial<{ id: string; @@ -167,7 +219,6 @@ export const setupPointInTime = ( ) => { mockEsClient.openPointInTime.mockResolvedValue({ id: pitId }); mockEsClient.closePointInTime.mockResolvedValue({}); - mockEsClient.indices.exists.mockResolvedValue(true); mockEsClient.security.hasPrivileges.mockResolvedValue({ has_all_requested: true }); };
x-pack/solutions/security/plugins/security_solution/server/lib/telemetry/event_based/events.ts+43 −0 modified@@ -1200,6 +1200,49 @@ export const TELEMETRY_HEALTH_DIAGNOSTIC_QUERY_STATS_EVENT: EventTypeOpts<Health description: 'Circuit breaker metrics such as execution time and memory usage.', }, }, + descriptorVersion: { + type: 'integer', + _meta: { + description: 'Version of the query descriptor that produced this event.', + }, + }, + status: { + type: 'keyword', + _meta: { + description: "Execution status: 'success', 'failed', or 'skipped'.", + }, + }, + skipReason: { + type: 'keyword', + _meta: { + optional: true, + description: + "Reason for skipping: 'datastreams_not_matched', 'integration_not_installed' or 'unknown_version'.", + }, + }, + integration: { + properties: { + name: { + type: 'keyword', + _meta: { description: 'Name of the matched integration.' }, + }, + version: { + type: 'keyword', + _meta: { description: 'Version of the matched integration.' }, + }, + indices: { + type: 'array', + items: { + type: 'keyword', + _meta: { description: 'Index patterns for this integration after type filtering.' }, + }, + }, + }, + _meta: { + optional: true, + description: 'Integration resolution metadata. Present only for v2 query descriptors.', + }, + }, }, };
x-pack/solutions/security/plugins/security_solution/server/plugin.ts+1 −0 modified@@ -873,6 +873,7 @@ export class Plugin implements ISecuritySolutionPlugin { analytics: core.analytics, receiver: this.telemetryReceiver, telemetryConfigProvider: this.telemetryConfigProvider, + packageService, }; this.healthDiagnosticService.start(serviceStart).catch((e) => {
80a75d1ae44e[9.3] Fix SSE streaming for Kibana (#257866) (#257918)
3 files changed · +9 −1
package.json+1 −0 modified@@ -78,6 +78,7 @@ }, "resolutions": { "**/@babel/parser": "7.24.7", + "**/@hapi/mimos/mime-db": "1.53.0", "**/@hello-pangea/dnd": "18.0.1", "**/@langchain/core": "0.3.80", "**/@langchain/google-common": "0.2.18",
x-pack/platform/plugins/shared/onechat/server/routes/chat.ts+2 −0 modified@@ -449,6 +449,8 @@ export function registerChatRoutes({ 'Content-Type': cloud?.isCloudEnabled ? 'application/octet-stream' : 'text/event-stream', + // another attempt at disabling compression + 'Content-Encoding': 'identity', 'Cache-Control': 'no-cache', Connection: 'keep-alive', 'Transfer-Encoding': 'chunked',
yarn.lock+6 −1 modified@@ -26054,7 +26054,12 @@ mime-db@1.52.0: resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.52.0.tgz#bbabcdc02859f4987301c856e3387ce5ec43bf70" integrity sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg== -"mime-db@>= 1.40.0 < 2", mime-db@^1.52.0, mime-db@^1.54.0: +mime-db@1.53.0, mime-db@^1.52.0: + version "1.53.0" + resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.53.0.tgz#3cb63cd820fc29896d9d4e8c32ab4fcd74ccb447" + integrity sha512-oHlN/w+3MQ3rba9rqFr6V/ypF10LSkdwUysQL7GkXoTgIWeV+tcXGA852TBxH+gsh8UWoyhR1hKcoMJTuWflpg== + +"mime-db@>= 1.40.0 < 2", mime-db@^1.54.0: version "1.54.0" resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.54.0.tgz#cddb3ee4f9c64530dff640236661d42cb6a314f5" integrity sha512-aU5EJuIN2WDemCcAp2vFBfp/m4EAhWJnUNSSw0ixs7/kXbd6Pg64EmwJkNdFhB8aWt1sH2CTXrLxo/iAGV3oPQ==
Vulnerability mechanics
No source-code context for this CVE — mechanics is only generated when we can read the actual fix diff. Without that, the four sections (root cause, attack vector, affected code, fix) would be speculation rather than analysis.
References
1News mentions
0No linked articles in our index yet.