diff --git a/integration_tests/snapshots/logs/sync-metrics_node16.log b/integration_tests/snapshots/logs/sync-metrics_node16.log index f3b12198..0ec11c1a 100644 --- a/integration_tests/snapshots/logs/sync-metrics_node16.log +++ b/integration_tests/snapshots/logs/sync-metrics_node16.log @@ -19,6 +19,7 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB START +XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request { "e": XXXX, "m": "aws.lambda.enhanced.invocations", @@ -34,10 +35,10 @@ START ], "v": 1 } -XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms Memory Used: XXXX MB START +XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request { "e": XXXX, "m": "aws.lambda.enhanced.invocations", @@ -53,6 +54,5 @@ START ], "v": 1 } -XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms Memory Used: XXXX MB diff --git a/integration_tests/snapshots/logs/sync-metrics_node18.log b/integration_tests/snapshots/logs/sync-metrics_node18.log index 2c776379..68b6549d 100644 --- a/integration_tests/snapshots/logs/sync-metrics_node18.log +++ b/integration_tests/snapshots/logs/sync-metrics_node18.log @@ -19,6 +19,7 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB START +XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request { "e": XXXX, "m": "aws.lambda.enhanced.invocations", @@ -34,7 +35,6 @@ START ], "v": 1 } -XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms Memory Used: XXXX MB START diff --git a/integration_tests/snapshots/logs/sync-metrics_node20.log b/integration_tests/snapshots/logs/sync-metrics_node20.log index 67d98486..e1c67ee4 100644 --- a/integration_tests/snapshots/logs/sync-metrics_node20.log +++ b/integration_tests/snapshots/logs/sync-metrics_node20.log @@ -19,7 +19,6 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB START -XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request { "e": XXXX, "m": "aws.lambda.enhanced.invocations", @@ -35,9 +34,11 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS re ], "v": 1 } +XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms Memory Used: XXXX MB START +XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request { "e": XXXX, "m": "aws.lambda.enhanced.invocations", @@ -53,6 +54,5 @@ START ], "v": 1 } -XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request HTTP POST https://api.datadoghq.com/api/v1/distribution_points?api_key=XXXX TraceHeaders: ["x-datadog-parent-id:XXXX","x-datadog-sampling-priority:-1","x-datadog-trace-id:XXXX"] END Duration: XXXX ms Memory Used: XXXX MB diff --git a/src/metrics/api.ts b/src/metrics/api.ts index f820dfc5..6594ef9f 100644 --- a/src/metrics/api.ts +++ b/src/metrics/api.ts @@ -1,7 +1,8 @@ import querystring from "querystring"; import { URL } from "url"; -import { logDebug, post } from "../utils"; +import { logDebug } from "../utils"; +import { post } from "../utils/request"; import { APIMetric } from "./model"; const API_KEY_QUERY_PARAM = "api_key"; diff --git a/src/metrics/extension.spec.ts b/src/metrics/extension.spec.ts index b3e5801a..251c594b 100644 --- a/src/metrics/extension.spec.ts +++ b/src/metrics/extension.spec.ts @@ -1,6 +1,5 @@ import nock from "nock"; - -import { isExtensionRunning, flushExtension, EXTENSION_URL } from "./extension"; +import { isExtensionRunning, EXTENSION_URL, flushExtension } from "./extension"; import mock from "mock-fs"; describe("isExtensionRunning", () => { @@ -22,15 +21,16 @@ describe("isExtensionRunning", () => { expect(ran).toBeFalsy(); }); }); + describe("flushExtension", () => { it("calls flush on the extension", async () => { const scope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200); - await flushExtension(); + await flushExtension(true); expect(scope.isDone()).toBeTruthy(); }); it("catches error when flush doesn't respond", async () => { const scope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).replyWithError("Unavailable"); - await flushExtension(); + await flushExtension(true); expect(scope.isDone()).toBeTruthy(); }); }); diff --git a/src/metrics/extension.ts b/src/metrics/extension.ts index 4266d28d..14a12f3d 100644 --- a/src/metrics/extension.ts +++ b/src/metrics/extension.ts @@ -1,26 +1,16 @@ import { URL } from "url"; -import { post, logDebug, logError } from "../utils"; +import { logDebug, logError } from "../utils"; import fs from "fs"; export const EXTENSION_URL = "http://127.0.0.1:8124"; const EXTENSION_PATH = "/opt/extensions/datadog-agent"; -const LOCAL_FLUSH_PATH = "/lambda/flush"; const LOCAL_FLUSH_TIMEOUT_MS = 100; +const LOCAL_FLUSH_PATH = "/lambda/flush"; export async function isExtensionRunning() { const extensionExists = await fileExists(EXTENSION_PATH); if (!extensionExists) { - logDebug(`Extension Layer is not present`); - return false; - } - return true; -} - -export async function flushExtension(): Promise { - const url = new URL(LOCAL_FLUSH_PATH, EXTENSION_URL); - const result = await post(url, {}, { timeout: LOCAL_FLUSH_TIMEOUT_MS }); - if (!result.success) { - logError(`Failed to flush extension. ${result.errorMessage}`); + logDebug(`Extension Layer is not present.`); return false; } return true; @@ -32,3 +22,24 @@ function fileExists(filename: string): Promise { .then(() => true) .catch(() => false); } + +export async function flushExtension(localTesting = false) { + if (localTesting) { + try { + const { post } = require("../utils/request"); + const url = new URL(LOCAL_FLUSH_PATH, EXTENSION_URL); + const result = await post(url, {}, { timeout: LOCAL_FLUSH_TIMEOUT_MS }); + if (!result.success) { + logError(`Failed to flush extension. ${result.errorMessage}`); + } + + return true; + } catch (error) { + if (error instanceof Error) { + logError("Failed to flush extension", error); + } + } + } + + return false; +} diff --git a/src/metrics/listener.spec.ts b/src/metrics/listener.spec.ts index 233ad044..f1c4bd06 100644 --- a/src/metrics/listener.spec.ts +++ b/src/metrics/listener.spec.ts @@ -78,7 +78,7 @@ describe("MetricsListener", () => { siteURL, }); - listener.onStartInvocation({}); + await listener.onStartInvocation({}); listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b"); await expect(listener.onCompleteInvocation()).resolves.toEqual(undefined); }); @@ -153,7 +153,7 @@ describe("MetricsListener", () => { }); // jest.useFakeTimers(); - listener.onStartInvocation({}); + await listener.onStartInvocation({}); listener.sendDistributionMetricWithDate("my-metric", 10, new Date(1584983836 * 1000), false, "tag:a", "tag:b"); await listener.onCompleteInvocation(); diff --git a/src/metrics/listener.ts b/src/metrics/listener.ts index d75b1023..9175503a 100644 --- a/src/metrics/listener.ts +++ b/src/metrics/listener.ts @@ -1,14 +1,12 @@ import { StatsD } from "hot-shots"; import { promisify } from "util"; import { logDebug, logError } from "../utils"; -import { APIClient } from "./api"; import { flushExtension, isExtensionRunning } from "./extension"; import { KMSService } from "./kms-service"; import { writeMetricToStdout } from "./metric-log"; import { Distribution } from "./model"; -import { Processor } from "./processor"; -const metricsBatchSendIntervalMS = 10000; // 10 seconds +const METRICS_BATCH_SEND_INTERVAL = 10000; // 10 seconds export interface MetricsConfig { /** @@ -56,7 +54,7 @@ export interface MetricsConfig { } export class MetricsListener { - private currentProcessor?: Promise; + private currentProcessor?: Promise; private apiKey: Promise; private statsDClient?: StatsD; private isExtensionRunning?: boolean = undefined; @@ -83,6 +81,7 @@ export class MetricsListener { return; } + this.currentProcessor = this.createProcessor(this.config, this.apiKey); } @@ -120,15 +119,11 @@ export class MetricsListener { logError("failed to flush metrics", error as Error); } } - try { - if (this.isExtensionRunning && this.config.localTesting) { - logDebug(`Flushing Extension for local test`); - await flushExtension(); - } - } catch (error) { - if (error instanceof Error) { - logError("failed to flush extension", error as Error); - } + + // Flush only when testing extension locally. + // Passing config flag so we can lazy load the request module. + if (this.isExtensionRunning) { + await flushExtension(this.config.localTesting); } this.currentProcessor = undefined; } @@ -171,12 +166,17 @@ export class MetricsListener { } private async createProcessor(config: MetricsConfig, apiKey: Promise) { - const key = await apiKey; - const url = `https://api.${config.siteURL}`; - const apiClient = new APIClient(key, url); - const processor = new Processor(apiClient, metricsBatchSendIntervalMS, config.shouldRetryMetrics); - processor.startProcessing(); - return processor; + if (!this.isExtensionRunning && !this.config.logForwarding) { + const { APIClient } = require("./api"); + const { Processor } = require("./processor"); + + const key = await apiKey; + const url = `https://api.${config.siteURL}`; + const apiClient = new APIClient(key, url); + const processor = new Processor(apiClient, METRICS_BATCH_SEND_INTERVAL, config.shouldRetryMetrics); + processor.startProcessing(); + return processor; + } } private async getAPIKey(config: MetricsConfig) { diff --git a/src/trace/patch-http.ts b/src/trace/patch-http.ts index 1ea361a5..ed31211b 100644 --- a/src/trace/patch-http.ts +++ b/src/trace/patch-http.ts @@ -33,6 +33,8 @@ export function unpatchHttp() { } function patchMethod(mod: typeof http | typeof https, method: "get" | "request", contextService: TraceContextService) { + if (mod[method].__wrapped !== undefined) return; // Only patch once + shimmer.wrap(mod, method, (original) => { const fn = (arg1: any, arg2: any, arg3: any) => { [arg1, arg2, arg3] = addTraceContextToArgs(contextService, arg1, arg2, arg3); diff --git a/src/utils/index.ts b/src/utils/index.ts index b66bc6b8..9bb311e7 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -2,5 +2,4 @@ export { didFunctionColdStart, getSandboxInitTags, setSandboxInit, isProactiveIn export { wrap, promisifiedHandler } from "./handler"; export { Timer } from "./timer"; export { logError, logDebug, Logger, setLogLevel, setLogger, LogLevel } from "./log"; -export { get, post } from "./request"; export { tagObject } from "./tag-object";