-
Notifications
You must be signed in to change notification settings - Fork 52
chore: add is_container_env to telemetry MCP-2 #298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a480c94
126994c
bf204bb
3d91b40
fe64649
f73c208
671bbeb
c2a1246
678beee
8d53eea
95b0b06
911fdcb
4703628
931816f
0b60fe2
6ba2346
f4bfbf0
3424e0f
83fd1d9
7e3cdb1
c67877e
814ccb9
1dba800
a0c208c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,114 +7,152 @@ import { MACHINE_METADATA } from "./constants.js"; | |
import { EventCache } from "./eventCache.js"; | ||
import nodeMachineId from "node-machine-id"; | ||
import { getDeviceId } from "@mongodb-js/device-id"; | ||
import fs from "fs/promises"; | ||
|
||
async function fileExists(filePath: string): Promise<boolean> { | ||
try { | ||
await fs.access(filePath, fs.constants.F_OK); | ||
return true; // File exists | ||
} catch (e: unknown) { | ||
if ( | ||
e instanceof Error && | ||
( | ||
e as Error & { | ||
code: string; | ||
} | ||
).code === "ENOENT" | ||
) { | ||
return false; // File does not exist | ||
} | ||
throw e; // Re-throw unexpected errors | ||
} | ||
} | ||
|
||
type EventResult = { | ||
success: boolean; | ||
error?: Error; | ||
}; | ||
async function isContainerized(): Promise<boolean> { | ||
if (process.env.container) { | ||
return true; | ||
} | ||
|
||
const exists = await Promise.all(["/.dockerenv", "/run/.containerenv", "/var/run/.containerenv"].map(fileExists)); | ||
|
||
export const DEVICE_ID_TIMEOUT = 3000; | ||
return exists.includes(true); | ||
} | ||
|
||
export class Telemetry { | ||
private isBufferingEvents: boolean = true; | ||
/** Resolves when the device ID is retrieved or timeout occurs */ | ||
public deviceIdPromise: Promise<string> | undefined; | ||
private deviceIdAbortController = new AbortController(); | ||
private eventCache: EventCache; | ||
private getRawMachineId: () => Promise<string>; | ||
private getContainerEnv: () => Promise<boolean>; | ||
private cachedCommonProperties?: CommonProperties; | ||
private flushing: boolean = false; | ||
|
||
private constructor( | ||
private readonly session: Session, | ||
private readonly userConfig: UserConfig, | ||
private readonly commonProperties: CommonProperties, | ||
{ eventCache, getRawMachineId }: { eventCache: EventCache; getRawMachineId: () => Promise<string> } | ||
{ | ||
eventCache, | ||
getRawMachineId, | ||
getContainerEnv, | ||
}: { | ||
eventCache: EventCache; | ||
getRawMachineId: () => Promise<string>; | ||
getContainerEnv: () => Promise<boolean>; | ||
} | ||
) { | ||
this.eventCache = eventCache; | ||
this.getRawMachineId = getRawMachineId; | ||
this.getContainerEnv = getContainerEnv; | ||
} | ||
|
||
static create( | ||
session: Session, | ||
userConfig: UserConfig, | ||
{ | ||
commonProperties = { ...MACHINE_METADATA }, | ||
eventCache = EventCache.getInstance(), | ||
getRawMachineId = () => nodeMachineId.machineId(true), | ||
getContainerEnv = isContainerized, | ||
}: { | ||
eventCache?: EventCache; | ||
getRawMachineId?: () => Promise<string>; | ||
commonProperties?: CommonProperties; | ||
getContainerEnv?: () => Promise<boolean>; | ||
} = {} | ||
): Telemetry { | ||
const instance = new Telemetry(session, userConfig, commonProperties, { eventCache, getRawMachineId }); | ||
|
||
void instance.start(); | ||
return instance; | ||
} | ||
|
||
private async start(): Promise<void> { | ||
if (!this.isTelemetryEnabled()) { | ||
return; | ||
} | ||
this.deviceIdPromise = getDeviceId({ | ||
getMachineId: () => this.getRawMachineId(), | ||
onError: (reason, error) => { | ||
switch (reason) { | ||
case "resolutionError": | ||
logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); | ||
break; | ||
case "timeout": | ||
logger.debug(LogId.telemetryDeviceIdTimeout, "telemetry", "Device ID retrieval timed out"); | ||
break; | ||
case "abort": | ||
// No need to log in the case of aborts | ||
break; | ||
} | ||
}, | ||
abortSignal: this.deviceIdAbortController.signal, | ||
const instance = new Telemetry(session, userConfig, { | ||
eventCache, | ||
getRawMachineId, | ||
getContainerEnv, | ||
}); | ||
|
||
this.commonProperties.device_id = await this.deviceIdPromise; | ||
|
||
this.isBufferingEvents = false; | ||
return instance; | ||
} | ||
|
||
public async close(): Promise<void> { | ||
this.deviceIdAbortController.abort(); | ||
this.isBufferingEvents = false; | ||
await this.emitEvents(this.eventCache.getEvents()); | ||
await this.flush(); | ||
} | ||
|
||
/** | ||
* Emits events through the telemetry pipeline | ||
* @param events - The events to emit | ||
*/ | ||
public async emitEvents(events: BaseEvent[]): Promise<void> { | ||
try { | ||
if (!this.isTelemetryEnabled()) { | ||
logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); | ||
return; | ||
} | ||
|
||
await this.emit(events); | ||
} catch { | ||
logger.debug(LogId.telemetryEmitFailure, "telemetry", `Error emitting telemetry events.`); | ||
} | ||
public emitEvents(events: BaseEvent[]): void { | ||
void this.flush(events); | ||
} | ||
|
||
/** | ||
* Gets the common properties for events | ||
* @returns Object containing common properties for all events | ||
*/ | ||
public getCommonProperties(): CommonProperties { | ||
return { | ||
...this.commonProperties, | ||
mcp_client_version: this.session.agentRunner?.version, | ||
mcp_client_name: this.session.agentRunner?.name, | ||
session_id: this.session.sessionId, | ||
config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", | ||
config_connection_string: this.userConfig.connectionString ? "true" : "false", | ||
}; | ||
private async getCommonProperties(): Promise<CommonProperties> { | ||
if (!this.cachedCommonProperties) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is still basically identical to I can see some value in the idea that we'd only start trying to resolve the device ID etc. stuff when there's an emit emission but this will be problematic and harder to reason about: if there's 2 I still think the responsibility of a 1-time resolution of machine should be inside the setup function and not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
but
We could check for then we'd prevent the async clash
yeah, that actually does make it much better in my view. then I do see some of the general value of starting this only when an emission happens, if that is the intended improvement. |
||
let deviceId: string | undefined; | ||
let containerEnv: boolean | undefined; | ||
try { | ||
await Promise.all([ | ||
getDeviceId({ | ||
getMachineId: () => this.getRawMachineId(), | ||
onError: (reason, error) => { | ||
switch (reason) { | ||
case "resolutionError": | ||
logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); | ||
break; | ||
case "timeout": | ||
logger.debug( | ||
LogId.telemetryDeviceIdTimeout, | ||
"telemetry", | ||
"Device ID retrieval timed out" | ||
); | ||
break; | ||
case "abort": | ||
// No need to log in the case of aborts | ||
break; | ||
} | ||
}, | ||
abortSignal: this.deviceIdAbortController.signal, | ||
}).then((id) => { | ||
deviceId = id; | ||
}), | ||
this.getContainerEnv().then((env) => { | ||
containerEnv = env; | ||
}), | ||
]); | ||
} catch (error: unknown) { | ||
const err = error instanceof Error ? error : new Error(String(error)); | ||
logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", err.message); | ||
} | ||
this.cachedCommonProperties = { | ||
...MACHINE_METADATA, | ||
mcp_client_version: this.session.agentRunner?.version, | ||
mcp_client_name: this.session.agentRunner?.name, | ||
session_id: this.session.sessionId, | ||
config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", | ||
config_connection_string: this.userConfig.connectionString ? "true" : "false", | ||
is_container_env: containerEnv ? "true" : "false", | ||
device_id: deviceId, | ||
}; | ||
} | ||
|
||
return this.cachedCommonProperties; | ||
} | ||
|
||
/** | ||
|
@@ -135,60 +173,74 @@ export class Telemetry { | |
} | ||
|
||
/** | ||
* Attempts to emit events through authenticated and unauthenticated clients | ||
* Attempts to flush events through authenticated and unauthenticated clients | ||
* Falls back to caching if both attempts fail | ||
*/ | ||
private async emit(events: BaseEvent[]): Promise<void> { | ||
if (this.isBufferingEvents) { | ||
this.eventCache.appendEvents(events); | ||
public async flush(events?: BaseEvent[]): Promise<void> { | ||
if (!this.isTelemetryEnabled()) { | ||
logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); | ||
return; | ||
} | ||
|
||
const cachedEvents = this.eventCache.getEvents(); | ||
const allEvents = [...cachedEvents, ...events]; | ||
if (this.flushing) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't this identical to buffering? could we structure it as buffering? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree with buffering as a terminology here, if we are emptying the cache after sending events over network it means we are no longer accumulating but rather releasing resources. The only term I've seen used to resonate with that purpose is flush, similar to flush logs after a buffer is too big for instance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Well, we are releasing already buffered resources but we're buffering all future resources. The state of the boolean to me in usage is more relevant generally to the future resources rather than the past resources, i.e. we use it largely to determine whether or not to We do actually have a concept of flush along with buffering in https://github.com/mongodb-js/mongosh/blob/main/packages/logging/src/logging-and-telemetry.ts#L129 |
||
this.eventCache.appendEvents(events ?? []); | ||
process.nextTick(async () => { | ||
// try again if in the middle of a flush | ||
await this.flush(); | ||
}); | ||
return; | ||
} | ||
|
||
logger.debug( | ||
LogId.telemetryEmitStart, | ||
"telemetry", | ||
`Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` | ||
); | ||
this.flushing = true; | ||
|
||
const result = await this.sendEvents(this.session.apiClient, allEvents); | ||
if (result.success) { | ||
try { | ||
const cachedEvents = this.eventCache.getEvents(); | ||
const allEvents = [...cachedEvents, ...(events ?? [])]; | ||
if (allEvents.length <= 0) { | ||
this.flushing = false; | ||
return; | ||
} | ||
|
||
logger.debug( | ||
LogId.telemetryEmitStart, | ||
"telemetry", | ||
`Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` | ||
); | ||
|
||
await this.sendEvents(this.session.apiClient, allEvents); | ||
this.eventCache.clearEvents(); | ||
logger.debug( | ||
LogId.telemetryEmitSuccess, | ||
"telemetry", | ||
`Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}` | ||
); | ||
return; | ||
} catch (error: unknown) { | ||
logger.debug( | ||
LogId.telemetryEmitFailure, | ||
"telemetry", | ||
`Error sending event to client: ${error instanceof Error ? error.message : String(error)}` | ||
); | ||
this.eventCache.appendEvents(events ?? []); | ||
process.nextTick(async () => { | ||
// try again | ||
await this.flush(); | ||
}); | ||
} | ||
|
||
logger.debug( | ||
LogId.telemetryEmitFailure, | ||
"telemetry", | ||
`Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}` | ||
); | ||
this.eventCache.appendEvents(events); | ||
this.flushing = false; | ||
} | ||
|
||
/** | ||
* Attempts to send events through the provided API client | ||
*/ | ||
private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise<EventResult> { | ||
try { | ||
await client.sendEvents( | ||
events.map((event) => ({ | ||
...event, | ||
properties: { ...this.getCommonProperties(), ...event.properties }, | ||
})) | ||
); | ||
return { success: true }; | ||
} catch (error) { | ||
return { | ||
success: false, | ||
error: error instanceof Error ? error : new Error(String(error)), | ||
}; | ||
} | ||
private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise<void> { | ||
const commonProperties = await this.getCommonProperties(); | ||
|
||
await client.sendEvents( | ||
events.map((event) => ({ | ||
...event, | ||
properties: { ...commonProperties, ...event.properties }, | ||
})) | ||
); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.