From 77d976fbbe83cc35e054e77f596a9bb8d9397cc1 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Tue, 24 Jan 2023 17:29:11 -0800 Subject: [PATCH 1/8] Enforce function timeout at worker level. --- src/emulator/functionsEmulator.ts | 8 +- src/emulator/functionsEmulatorRuntime.ts | 36 +---- src/emulator/functionsRuntimeWorker.ts | 136 +++++++++++------- .../emulators/functionsRuntimeWorker.spec.ts | 69 +++++---- 4 files changed, 128 insertions(+), 121 deletions(-) diff --git a/src/emulator/functionsEmulator.ts b/src/emulator/functionsEmulator.ts index c4c7f3edd1f..6965429405d 100644 --- a/src/emulator/functionsEmulator.ts +++ b/src/emulator/functionsEmulator.ts @@ -1108,12 +1108,6 @@ export class FunctionsEmulator implements EmulatorInstance { envs.K_REVISION = "1"; envs.PORT = "80"; - // TODO(danielylee): Later, we want timeout to be enforce by the data plane. For now, we rely on the runtime to - // enforce timeout. - if (trigger?.timeoutSeconds) { - envs.FUNCTIONS_EMULATOR_TIMEOUT_SECONDS = trigger.timeoutSeconds.toString(); - } - if (trigger) { const target = trigger.entryPoint; envs.FUNCTION_TARGET = target; @@ -1357,7 +1351,7 @@ export class FunctionsEmulator implements EmulatorInstance { }; const pool = this.workerPools[backend.codebase]; - const worker = pool.addWorker(trigger?.id, runtime, extensionLogInfo); + const worker = pool.addWorker(trigger, runtime, extensionLogInfo); await worker.waitForSocketReady(); return worker; } diff --git a/src/emulator/functionsEmulatorRuntime.ts b/src/emulator/functionsEmulatorRuntime.ts index d514c43181e..5e3b20686a3 100644 --- a/src/emulator/functionsEmulatorRuntime.ts +++ b/src/emulator/functionsEmulatorRuntime.ts @@ -1018,12 +1018,6 @@ async function main(): Promise { }); app.all(`/*`, async (req: express.Request, res: express.Response) => { try { - new EmulatorLog( - "INFO", - "runtime-status", - `Beginning execution of "${FUNCTION_TARGET_NAME}"` - ).log(); - const trigger = FUNCTION_TARGET_NAME.split(".").reduce((mod, functionTargetPart) => { return mod?.[functionTargetPart]; }, functionModule) as CloudFunction; @@ -1031,18 +1025,6 @@ async function main(): Promise { throw new Error(`Failed to find function ${FUNCTION_TARGET_NAME} in the loaded module`); } - const startHrTime = process.hrtime(); - res.on("finish", () => { - const elapsedHrTime = process.hrtime(startHrTime); - new EmulatorLog( - "INFO", - "runtime-status", - `Finished "${FUNCTION_TARGET_NAME}" in ${ - elapsedHrTime[0] * 1000 + elapsedHrTime[1] / 1000000 - }ms` - ).log(); - }); - switch (FUNCTION_SIGNATURE) { case "event": case "cloudevent": @@ -1063,25 +1045,9 @@ async function main(): Promise { res.status(500).send(err.message); } }); - const server = app.listen(process.env.PORT, () => { + app.listen(process.env.PORT, () => { logDebug(`Listening to port: ${process.env.PORT}`); }); - if (!FUNCTION_DEBUG_MODE) { - let timeout = process.env.FUNCTIONS_EMULATOR_TIMEOUT_SECONDS || "60"; - if (timeout.endsWith("s")) { - timeout = timeout.slice(0, -1); - } - const timeoutMs = parseInt(timeout, 10) * 1000; - server.setTimeout(timeoutMs, () => { - new EmulatorLog( - "FATAL", - "runtime-error", - `Your function timed out after ~${timeout}s. To configure this timeout, see - https://firebase.google.com/docs/functions/manage-functions#set_timeout_and_memory_allocation.` - ).log(); - return flushAndExit(1); - }); - } // Event emitters do not work well with async functions, so we // construct our own promise chain to make sure each message is diff --git a/src/emulator/functionsRuntimeWorker.ts b/src/emulator/functionsRuntimeWorker.ts index 5365b7989d2..7c2188e883c 100644 --- a/src/emulator/functionsRuntimeWorker.ts +++ b/src/emulator/functionsRuntimeWorker.ts @@ -3,12 +3,11 @@ import * as uuid from "uuid"; import { FunctionsRuntimeInstance } from "./functionsEmulator"; import { EmulatorLog, Emulators, FunctionsExecutionMode } from "./types"; -import { FunctionsRuntimeBundle } from "./functionsEmulatorShared"; +import { EmulatedTriggerDefinition, FunctionsRuntimeBundle } from "./functionsEmulatorShared"; import { EventEmitter } from "events"; import { EmulatorLogger, ExtensionLogInfo } from "./emulatorLogger"; import { FirebaseError } from "../error"; import { Serializable } from "child_process"; -import { IncomingMessage } from "http"; type LogListener = (el: EmulatorLog) => any; @@ -32,17 +31,22 @@ export enum RuntimeWorkerState { export class RuntimeWorker { readonly id: string; - readonly key: string; - readonly runtime: FunctionsRuntimeInstance; + readonly triggerKey: string; stateEvents: EventEmitter = new EventEmitter(); private logListeners: Array = []; + private logger: EmulatorLogger; private _state: RuntimeWorkerState = RuntimeWorkerState.CREATED; - constructor(key: string, runtime: FunctionsRuntimeInstance) { + constructor( + triggerId: string | undefined, + readonly runtime: FunctionsRuntimeInstance, + readonly extensionLogInfo: ExtensionLogInfo, + readonly timeoutSeconds?: number + ) { this.id = uuid.v4(); - this.key = key; + this.triggerKey = triggerId || "~free~"; this.runtime = runtime; const childProc = this.runtime.process; @@ -64,8 +68,15 @@ export class RuntimeWorker { }); } + this.logger = triggerId + ? EmulatorLogger.forFunction(triggerId, extensionLogInfo) + : EmulatorLogger.forEmulator(Emulators.FUNCTIONS); + this.onLogs((log: EmulatorLog) => { + this.logger.handleRuntimeLog(log); + }, true /* listen forever */); + childProc.on("exit", () => { - this.log("exited"); + this.logDebug("exited"); this.state = RuntimeWorkerState.FINISHED; }); } @@ -107,33 +118,57 @@ export class RuntimeWorker { } request(req: http.RequestOptions, resp: http.ServerResponse, body?: unknown): Promise { + if (this.triggerKey !== "~free~") { + this.logInfo(`Beginning execution of "${this.triggerKey}"`); + } + const startHrTime = process.hrtime(); + this.state = RuntimeWorkerState.BUSY; const onFinish = (): void => { + if (this.triggerKey !== "~free~") { + const elapsedHrTime = process.hrtime(startHrTime); + this.logInfo( + `Finished "${this.triggerKey}" in ${ + elapsedHrTime[0] * 1000 + elapsedHrTime[1] / 1000000 + }ms` + ); + } + if (this.state === RuntimeWorkerState.BUSY) { this.state = RuntimeWorkerState.IDLE; } else if (this.state === RuntimeWorkerState.FINISHING) { - this.log(`IDLE --> FINISHING`); + this.logDebug(`IDLE --> FINISHING`); this.runtime.process.kill(); } }; return new Promise((resolve) => { - const proxy = http.request( - { - ...this.runtime.conn.httpReqOpts(), - method: req.method, - path: req.path, - headers: req.headers, - }, - (_resp: IncomingMessage) => { - resp.writeHead(_resp.statusCode || 200, _resp.headers); - const piped = _resp.pipe(resp); - piped.on("finish", () => { - onFinish(); - resolve(); - }); - } - ); + const reqOpts = { + ...this.runtime.conn.httpReqOpts(), + method: req.method, + path: req.path, + headers: req.headers, + }; + if (this.timeoutSeconds) { + reqOpts.timeout = this.timeoutSeconds * 1000; + } + const proxy = http.request(reqOpts, (_resp: http.IncomingMessage) => { + resp.writeHead(_resp.statusCode || 200, _resp.headers); + const piped = _resp.pipe(resp); + piped.on("finish", () => { + onFinish(); + resolve(); + }); + }); + proxy.on("timeout", () => { + this.logger.log( + "ERROR", + `Your function timed out after ~${this.timeoutSeconds}s. To configure this timeout, see + https://firebase.google.com/docs/functions/manage-functions#set_timeout_and_memory_allocation.` + ); + proxy.destroy(); + }); proxy.on("error", (err) => { + this.logger.log("ERROR", `Request to function failed: ${err}`); resp.writeHead(500); resp.write(JSON.stringify(err)); resp.end(); @@ -164,7 +199,7 @@ export class RuntimeWorker { this.runtime.events.removeAllListeners(); } - this.log(state); + this.logDebug(state); this._state = state; this.stateEvents.emit(this._state); } @@ -220,11 +255,12 @@ export class RuntimeWorker { } } - private log(msg: string): void { - EmulatorLogger.forEmulator(Emulators.FUNCTIONS).log( - "DEBUG", - `[worker-${this.key}-${this.id}]: ${msg}` - ); + private logDebug(msg: string): void { + this.logger.log("DEBUG", `[worker-${this.triggerKey}-${this.id}]: ${msg}`); + } + + private logInfo(msg: string): void { + this.logger.logLabeled("BULLET", "functions", msg); } } @@ -233,7 +269,7 @@ export class RuntimeWorkerPool { constructor(private mode: FunctionsExecutionMode = FunctionsExecutionMode.AUTO) {} - getKey(triggerId: string | undefined) { + getKey(triggerId: string | undefined): string { if (this.mode === FunctionsExecutionMode.SEQUENTIAL) { return "~shared~"; } else { @@ -247,15 +283,15 @@ export class RuntimeWorkerPool { * each BUSY worker we move it to the FINISHING state so that it will * kill itself after it's done with its current task. */ - refresh() { + refresh(): void { for (const arr of this.workers.values()) { arr.forEach((w) => { if (w.state === RuntimeWorkerState.IDLE) { - this.log(`Shutting down IDLE worker (${w.key})`); + this.log(`Shutting down IDLE worker (${w.triggerKey})`); w.state = RuntimeWorkerState.FINISHING; w.runtime.process.kill(); } else if (w.state === RuntimeWorkerState.BUSY) { - this.log(`Marking BUSY worker to finish (${w.key})`); + this.log(`Marking BUSY worker to finish (${w.triggerKey})`); w.state = RuntimeWorkerState.FINISHING; } }); @@ -265,7 +301,7 @@ export class RuntimeWorkerPool { /** * Immediately kill all workers. */ - exit() { + exit(): void { for (const arr of this.workers.values()) { arr.forEach((w) => { if (w.state === RuntimeWorkerState.IDLE) { @@ -340,25 +376,27 @@ export class RuntimeWorkerPool { * `worker.readyForWork()` or `worker.waitForSocketReady()`. */ addWorker( - triggerId: string | undefined, + trigger: EmulatedTriggerDefinition | undefined, runtime: FunctionsRuntimeInstance, - extensionLogInfo?: ExtensionLogInfo + extensionLogInfo: ExtensionLogInfo ): RuntimeWorker { - const worker = new RuntimeWorker(this.getKey(triggerId), runtime); - this.log(`addWorker(${worker.key})`); + this.log(`addWorker(${this.getKey(trigger?.id)})`); + // Disable worker timeout if: + // (1) This is a diagnostic call without trigger id OR + // (2) If in SEQUENTIAL execution mode + const disableTimeout = !trigger?.id || this.mode === FunctionsExecutionMode.SEQUENTIAL; + const worker = new RuntimeWorker( + trigger?.id, + runtime, + extensionLogInfo, + disableTimeout ? undefined : trigger?.timeoutSeconds + ); - const keyWorkers = this.getTriggerWorkers(triggerId); + const keyWorkers = this.getTriggerWorkers(trigger?.id); keyWorkers.push(worker); - this.setTriggerWorkers(triggerId, keyWorkers); - - const logger = triggerId - ? EmulatorLogger.forFunction(triggerId, extensionLogInfo) - : EmulatorLogger.forEmulator(Emulators.FUNCTIONS); - worker.onLogs((log: EmulatorLog) => { - logger.handleRuntimeLog(log); - }, true /* listen forever */); + this.setTriggerWorkers(trigger?.id, keyWorkers); - this.log(`Adding worker with key ${worker.key}, total=${keyWorkers.length}`); + this.log(`Adding worker with key ${worker.triggerKey}, total=${keyWorkers.length}`); return worker; } diff --git a/src/test/emulators/functionsRuntimeWorker.spec.ts b/src/test/emulators/functionsRuntimeWorker.spec.ts index 00bc983930c..28a73687eaf 100644 --- a/src/test/emulators/functionsRuntimeWorker.spec.ts +++ b/src/test/emulators/functionsRuntimeWorker.spec.ts @@ -8,6 +8,7 @@ import { RuntimeWorkerPool, RuntimeWorkerState, } from "../../emulator/functionsRuntimeWorker"; +import { EmulatedTriggerDefinition } from "../../emulator/functionsEmulatorShared"; import { EmulatorLog, FunctionsExecutionMode } from "../../emulator/types"; import { ChildProcess } from "child_process"; @@ -82,14 +83,22 @@ class WorkerStateCounter { } } -describe("FunctionsRuntimeWorker", () => { - const workerPool = new RuntimeWorkerPool(); +function mockTrigger(id: string): EmulatedTriggerDefinition { + return { + id, + name: id, + entryPoint: id, + region: "us-central1", + platform: "gcfv2", + }; +} +describe("FunctionsRuntimeWorker", () => { describe("RuntimeWorker", () => { it("goes from created --> idle --> busy --> idle in normal operation", async () => { const scope = nock("http://localhost").get("/").reply(200); - const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance()); + const worker = new RuntimeWorker("trigger", new MockRuntimeInstance(), {}); const counter = new WorkerStateCounter(worker); worker.readyForWork(); @@ -108,7 +117,7 @@ describe("FunctionsRuntimeWorker", () => { it("goes from created --> idle --> busy --> finished when there's an error", async () => { const scope = nock("http://localhost").get("/").replyWithError("boom"); - const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance()); + const worker = new RuntimeWorker("trigger", new MockRuntimeInstance(), {}); const counter = new WorkerStateCounter(worker); worker.readyForWork(); @@ -128,7 +137,7 @@ describe("FunctionsRuntimeWorker", () => { it("goes from created --> busy --> finishing --> finished when marked", async () => { const scope = nock("http://localhost").get("/").replyWithError("boom"); - const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance()); + const worker = new RuntimeWorker("trigger", new MockRuntimeInstance(), {}); const counter = new WorkerStateCounter(worker); worker.readyForWork(); @@ -153,42 +162,42 @@ describe("FunctionsRuntimeWorker", () => { const scope = nock("http://localhost").get("/").reply(200); const pool = new RuntimeWorkerPool(); - const trigger = "region-trigger1"; + const triggerId = "region-trigger1"; // No idle workers to begin - expect(pool.getIdleWorker(trigger)).to.be.undefined; + expect(pool.getIdleWorker(triggerId)).to.be.undefined; // Add a worker and make sure it's there - const worker = pool.addWorker(trigger, new MockRuntimeInstance()); + const worker = pool.addWorker(mockTrigger(triggerId), new MockRuntimeInstance(), {}); worker.readyForWork(); - const triggerWorkers = pool.getTriggerWorkers(trigger); + const triggerWorkers = pool.getTriggerWorkers(triggerId); expect(triggerWorkers.length).length.to.eq(1); - expect(pool.getIdleWorker(trigger)).to.eql(worker); + expect(pool.getIdleWorker(triggerId)).to.eql(worker); const resp = httpMocks.createResponse({ eventEmitter: EventEmitter }); resp.on("end", () => { // Finished sending response. About to go back to IDLE state. - expect(pool.getIdleWorker(trigger)).to.be.undefined; + expect(pool.getIdleWorker(triggerId)).to.be.undefined; }); await worker.request({ method: "GET", path: "/" }, resp); scope.done(); // Completed handling request. Worker should be IDLE again. - expect(pool.getIdleWorker(trigger)).to.eql(worker); + expect(pool.getIdleWorker(triggerId)).to.eql(worker); }); it("does not consider failed workers idle", async () => { const pool = new RuntimeWorkerPool(); - const trigger = "trigger1"; + const triggerId = "trigger1"; // No idle workers to begin - expect(pool.getIdleWorker(trigger)).to.be.undefined; + expect(pool.getIdleWorker(triggerId)).to.be.undefined; // Add a worker to the pool that's destined to fail. const scope = nock("http://localhost").get("/").replyWithError("boom"); - const worker = pool.addWorker(trigger, new MockRuntimeInstance()); + const worker = pool.addWorker(mockTrigger(triggerId), new MockRuntimeInstance(), {}); worker.readyForWork(); - expect(pool.getIdleWorker(trigger)).to.eql(worker); + expect(pool.getIdleWorker(triggerId)).to.eql(worker); // Send request to the worker. Request should fail, killing the worker. await worker.request( @@ -198,18 +207,18 @@ describe("FunctionsRuntimeWorker", () => { scope.done(); // Confirm there are no idle workers. - expect(pool.getIdleWorker(trigger)).to.be.undefined; + expect(pool.getIdleWorker(triggerId)).to.be.undefined; }); it("exit() kills idle and busy workers", async () => { const pool = new RuntimeWorkerPool(); - const trigger = "trigger1"; + const triggerId = "trigger1"; - const busyWorker = pool.addWorker(trigger, new MockRuntimeInstance()); + const busyWorker = pool.addWorker(mockTrigger(triggerId), new MockRuntimeInstance(), {}); busyWorker.readyForWork(); const busyWorkerCounter = new WorkerStateCounter(busyWorker); - const idleWorker = pool.addWorker(trigger, new MockRuntimeInstance()); + const idleWorker = pool.addWorker(mockTrigger(triggerId), new MockRuntimeInstance(), {}); idleWorker.readyForWork(); const idleWorkerCounter = new WorkerStateCounter(idleWorker); @@ -234,13 +243,13 @@ describe("FunctionsRuntimeWorker", () => { it("refresh() kills idle workers and marks busy ones as finishing", async () => { const pool = new RuntimeWorkerPool(); - const trigger = "trigger1"; + const triggerId = "trigger1"; - const busyWorker = pool.addWorker(trigger, new MockRuntimeInstance()); + const busyWorker = pool.addWorker(mockTrigger(triggerId), new MockRuntimeInstance(), {}); busyWorker.readyForWork(); const busyWorkerCounter = new WorkerStateCounter(busyWorker); - const idleWorker = pool.addWorker(trigger, new MockRuntimeInstance()); + const idleWorker = pool.addWorker(mockTrigger(triggerId), new MockRuntimeInstance(), {}); idleWorker.readyForWork(); const idleWorkerCounter = new WorkerStateCounter(idleWorker); @@ -265,23 +274,23 @@ describe("FunctionsRuntimeWorker", () => { it("gives assigns all triggers to the same worker in sequential mode", async () => { const scope = nock("http://localhost").get("/").reply(200); - const trigger1 = "region-abc"; - const trigger2 = "region-def"; + const triggerId1 = "region-abc"; + const triggerId2 = "region-def"; const pool = new RuntimeWorkerPool(FunctionsExecutionMode.SEQUENTIAL); - const worker = pool.addWorker(trigger1, new MockRuntimeInstance()); + const worker = pool.addWorker(mockTrigger(triggerId1), new MockRuntimeInstance(), {}); worker.readyForWork(); const resp = httpMocks.createResponse({ eventEmitter: EventEmitter }); resp.on("end", () => { - expect(pool.readyForWork(trigger1)).to.be.false; - expect(pool.readyForWork(trigger2)).to.be.false; + expect(pool.readyForWork(triggerId1)).to.be.false; + expect(pool.readyForWork(triggerId2)).to.be.false; }); await worker.request({ method: "GET", path: "/" }, resp); scope.done(); - expect(pool.readyForWork(trigger1)).to.be.true; - expect(pool.readyForWork(trigger2)).to.be.true; + expect(pool.readyForWork(triggerId1)).to.be.true; + expect(pool.readyForWork(triggerId2)).to.be.true; }); }); }); From 9f3759042ef1c7ccc0a2759744468a729261aa63 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 25 Jan 2023 10:58:56 -0800 Subject: [PATCH 2/8] Remove test case - no longer appropriate. --- .../functionsEmulatorRuntime.spec.ts | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/scripts/emulator-tests/functionsEmulatorRuntime.spec.ts b/scripts/emulator-tests/functionsEmulatorRuntime.spec.ts index 4bbd63ce45f..9d7c270b32e 100644 --- a/scripts/emulator-tests/functionsEmulatorRuntime.spec.ts +++ b/scripts/emulator-tests/functionsEmulatorRuntime.spec.ts @@ -730,40 +730,6 @@ describe("FunctionsEmulator-Runtime", function () { expect(runtime.sysMsg["runtime-error"]?.length).to.eq(1); }); }); - - describe("Timeout", () => { - it("enforces configured timeout", async () => { - const timeoutEnvs = { - FUNCTIONS_EMULATOR_TIMEOUT_SECONDS: "1", - FUNCTIONS_EMULATOR_DISABLE_TIMEOUT: "false", - }; - runtime = await startRuntime( - "functionId", - "http", - () => { - return { - functionId: require("firebase-functions").https.onRequest( - (req: any, resp: any): Promise => { - return new Promise((resolve) => { - setTimeout(() => { - resp.sendStatus(200); - resolve(); - }, 5_000); - }); - } - ), - }; - }, - timeoutEnvs - ); - try { - await sendReq(runtime); - } catch (e: any) { - // Carry on - } - expect(runtime.sysMsg["runtime-error"]?.length).to.eq(1); - }); - }); }); describe("Debug", () => { From cec9d2c1fdb92cdc7f1c907d9c8b1132e3b294ed Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 25 Jan 2023 11:30:38 -0800 Subject: [PATCH 3/8] Add new test case. --- .../emulator-tests/functionsEmulator.spec.ts | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/scripts/emulator-tests/functionsEmulator.spec.ts b/scripts/emulator-tests/functionsEmulator.spec.ts index ef6d7c128d8..fc70f1e90ba 100644 --- a/scripts/emulator-tests/functionsEmulator.spec.ts +++ b/scripts/emulator-tests/functionsEmulator.spec.ts @@ -1037,4 +1037,33 @@ describe("FunctionsEmulator", function () { expect(triggerDefinitions[0].timeoutSeconds).to.equal(26); }); }); + + it("should enforce timeout", async () => { + await useFunction( + emu, + "timeoutFn", + () => { + return { + timeoutFn: require("firebase-functions") + .runWith({ timeoutSeconds: 1 }) + .https.onRequest((req: express.Request, res: express.Response): Promise => { + return new Promise((resolve) => { + setTimeout(() => { + res.sendStatus(200); + resolve(); + }, 5_000); + }); + }), + }; + }, + ["us-central1"], + { + timeoutSeconds: 1, + } + ); + + await supertest(emu.createHubServer()) + .get("/fake-project-id/us-central1/timeoutFn") + .expect(500); + }); }); From ffd76444e8a1da8a3b527ef5ad57b4ed281b71a7 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Tue, 31 Jan 2023 10:05:37 -0800 Subject: [PATCH 4/8] Refactor floating string. --- CHANGELOG.md | 1 + src/emulator/functionsRuntimeWorker.ts | 14 +++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e69de29bb2d..349b92a3ed8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -0,0 +1 @@ +- Refactor the way timeouts are enforced by the Functions Emulator (#5464) diff --git a/src/emulator/functionsRuntimeWorker.ts b/src/emulator/functionsRuntimeWorker.ts index 7c2188e883c..241aece94b8 100644 --- a/src/emulator/functionsRuntimeWorker.ts +++ b/src/emulator/functionsRuntimeWorker.ts @@ -29,6 +29,14 @@ export enum RuntimeWorkerState { FINISHED = "FINISHED", } +/** + * Given no trigger key, worker is given this special key. + * + * This is useful when running the Functions Emulator in debug mode + * where single process shared amongst all triggers. + */ +const FREE_WORKER_KEY = "~free~"; + export class RuntimeWorker { readonly id: string; readonly triggerKey: string; @@ -46,7 +54,7 @@ export class RuntimeWorker { readonly timeoutSeconds?: number ) { this.id = uuid.v4(); - this.triggerKey = triggerId || "~free~"; + this.triggerKey = triggerId || FREE_WORKER_KEY; this.runtime = runtime; const childProc = this.runtime.process; @@ -118,14 +126,14 @@ export class RuntimeWorker { } request(req: http.RequestOptions, resp: http.ServerResponse, body?: unknown): Promise { - if (this.triggerKey !== "~free~") { + if (this.triggerKey !== FREE_WORKER_KEY) { this.logInfo(`Beginning execution of "${this.triggerKey}"`); } const startHrTime = process.hrtime(); this.state = RuntimeWorkerState.BUSY; const onFinish = (): void => { - if (this.triggerKey !== "~free~") { + if (this.triggerKey !== FREE_WORKER_KEY) { const elapsedHrTime = process.hrtime(startHrTime); this.logInfo( `Finished "${this.triggerKey}" in ${ From e1fc3bfc2e0eb92b71a996dacddf5fde42e8baca Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 25 Jan 2023 13:29:29 -0800 Subject: [PATCH 5/8] Add option to initialize python functions. --- src/deploy/functions/runtimes/python/index.ts | 31 +++++--- src/functions/python.ts | 2 + src/init/features/functions/index.ts | 7 ++ src/init/features/functions/python.ts | 71 +++++++++++++++++++ templates/init/functions/python/_gitignore | 0 templates/init/functions/python/main.py | 13 ++++ .../init/functions/python/requirements.txt | 1 + 7 files changed, 114 insertions(+), 11 deletions(-) create mode 100644 src/init/features/functions/python.ts create mode 100644 templates/init/functions/python/_gitignore create mode 100644 templates/init/functions/python/main.py create mode 100644 templates/init/functions/python/requirements.txt diff --git a/src/deploy/functions/runtimes/python/index.ts b/src/deploy/functions/runtimes/python/index.ts index 4e4989f9df5..8825e383f5a 100644 --- a/src/deploy/functions/runtimes/python/index.ts +++ b/src/deploy/functions/runtimes/python/index.ts @@ -13,7 +13,7 @@ import { runWithVirtualEnv } from "../../../../functions/python"; import { FirebaseError } from "../../../../error"; import { Build } from "../../build"; -const LATEST_VERSION: runtimes.Runtime = "python310"; +export const LATEST_VERSION: runtimes.Runtime = "python310"; /** * Create a runtime delegate for the Python runtime, if applicable. @@ -37,6 +37,24 @@ export async function tryCreateDelegate( return Promise.resolve(new Delegate(context.projectId, context.sourceDir, runtime)); } +/** + * Get corresponding python binary name for a given runtime. + * + * By default, returns "python" + */ +export function getPythonBinary(runtime: runtimes.Runtime): string { + if (process.platform === "win32") { + // There is no easy way to get specific version of python executable in Windows. + return "python.exe"; + } + if (runtime === "python310") { + return "python3.10"; + } else if (runtime === "python311") { + return "python3.11"; + } + return "python"; +} + export class Delegate implements runtimes.RuntimeDelegate { public readonly name = "python"; constructor( @@ -82,16 +100,7 @@ export class Delegate implements runtimes.RuntimeDelegate { } getPythonBinary(): string { - if (process.platform === "win32") { - // There is no easy way to get specific version of python executable in Windows. - return "python.exe"; - } - if (this.runtime === "python310") { - return "python3.10"; - } else if (this.runtime === "python311") { - return "python3.11"; - } - return "python"; + return getPythonBinary(this.runtime); } validate(): Promise { diff --git a/src/functions/python.ts b/src/functions/python.ts index f37b821e1d8..63c1cb9870c 100644 --- a/src/functions/python.ts +++ b/src/functions/python.ts @@ -12,6 +12,7 @@ export function runWithVirtualEnv( commandAndArgs: string[], cwd: string, envs: Record, + spawnOpts: cp.SpawnOptions = {}, venvDir = DEFAULT_VENV_DIR ): cp.ChildProcess { const activateScriptPath = @@ -25,6 +26,7 @@ export function runWithVirtualEnv( shell: true, cwd, stdio: [/* stdin= */ "pipe", /* stdout= */ "pipe", /* stderr= */ "pipe", "pipe"], + ...spawnOpts, // Linting disabled since internal types expect NODE_ENV which does not apply to Python runtimes. // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any env: envs as any, diff --git a/src/init/features/functions/index.ts b/src/init/features/functions/index.ts index f80875a5e67..1740104e467 100644 --- a/src/init/features/functions/index.ts +++ b/src/init/features/functions/index.ts @@ -13,6 +13,7 @@ import { assertUnique, } from "../../../functions/projectConfig"; import { FirebaseError } from "../../../error"; +import { isEnabled } from "../../../experiments"; const MAX_ATTEMPTS = 5; @@ -167,6 +168,12 @@ async function languageSetup(setup: any, config: Config): Promise { value: "typescript", }, ]; + if (isEnabled("pythonfunctions")) { + choices.push({ + name: "Python", + value: "python", + }); + } const language = await promptOnce({ type: "list", message: "What language would you like to use to write Cloud Functions?", diff --git a/src/init/features/functions/python.ts b/src/init/features/functions/python.ts new file mode 100644 index 00000000000..729b921fe26 --- /dev/null +++ b/src/init/features/functions/python.ts @@ -0,0 +1,71 @@ +import * as fs from "fs"; +import * as spawn from "cross-spawn"; +import * as path from "path"; + +import { Config } from "../../../config"; +import { getPythonBinary, LATEST_VERSION } from "../../../deploy/functions/runtimes/python"; +import { runWithVirtualEnv } from "../../../functions/python"; +import { promptOnce } from "../../../prompt"; + +const TEMPLATE_ROOT = path.resolve(__dirname, "../../../../templates/init/functions/python"); +const MAIN_TEMPLATE = fs.readFileSync(path.join(TEMPLATE_ROOT, "main.py"), "utf8"); +const REQUIREMENTS_TEMPLATE = fs.readFileSync(path.join(TEMPLATE_ROOT, "requirements.txt"), "utf8"); +const GITIGNORE_TEMPLATE = fs.readFileSync(path.join(TEMPLATE_ROOT, "_gitignore"), "utf8"); + +/** + * Create a Python Firebase Functions project. + */ +export async function setup(setup: any, config: Config): Promise { + await config.askWriteProjectFile( + `${setup.functions.source}/requirements.txt`, + REQUIREMENTS_TEMPLATE + ); + await config.askWriteProjectFile(`${setup.functions.source}/.gitignore`, GITIGNORE_TEMPLATE); + await config.askWriteProjectFile(`${setup.functions.source}/main.py`, MAIN_TEMPLATE); + + // Write the latest supported runtime version to the config. + config.set("functions.runtime", LATEST_VERSION); + // Add python specific ignores to config. + config.set("functions.ignore", ["venv", "__pycache__"]); + + // Setup VENV. + const venvProcess = spawn(getPythonBinary(LATEST_VERSION), ["-m", "venv", "venv"], { + shell: true, + cwd: config.path(setup.functions.source), + stdio: [/* stdin= */ "pipe", /* stdout= */ "pipe", /* stderr= */ "pipe", "pipe"], + }); + await new Promise((resolve, reject) => { + venvProcess.on("exit", resolve); + venvProcess.on("error", reject); + }); + + const install = await promptOnce({ + name: "install", + type: "confirm", + message: "Do you want to install dependencies now?", + default: true, + }); + if (install) { + // Update pip to support dependencies like pyyaml. + const upgradeProcess = runWithVirtualEnv( + ["pip3", "install", "--upgrade", "pip"], + config.path(setup.functions.source), + {}, + { stdio: ["inherit", "inherit", "inherit"] } + ); + await new Promise((resolve, reject) => { + upgradeProcess.on("exit", resolve); + upgradeProcess.on("error", reject); + }); + const installProcess = runWithVirtualEnv( + [getPythonBinary(LATEST_VERSION), "-m", "pip", "install", "-r", "requirements.txt"], + config.path(setup.functions.source), + {}, + { stdio: ["inherit", "inherit", "inherit"] } + ); + await new Promise((resolve, reject) => { + installProcess.on("exit", resolve); + installProcess.on("error", reject); + }); + } +} diff --git a/templates/init/functions/python/_gitignore b/templates/init/functions/python/_gitignore new file mode 100644 index 00000000000..e69de29bb2d diff --git a/templates/init/functions/python/main.py b/templates/init/functions/python/main.py new file mode 100644 index 00000000000..1d82099501f --- /dev/null +++ b/templates/init/functions/python/main.py @@ -0,0 +1,13 @@ +# Welcome to Cloud Functions for Firebase for Python! +# To get started, simply uncomment the below code or create your own. +# Deploy with `firebase deploy` + +from firebase_functions import https +from firebase_admin import initialize_app + +# initialize_app() +# +# +# @https.on_request() +# def on_request_example(req: https.Request) -> https.Response: +# return https.Response("Hello world!") \ No newline at end of file diff --git a/templates/init/functions/python/requirements.txt b/templates/init/functions/python/requirements.txt new file mode 100644 index 00000000000..b762b1f2877 --- /dev/null +++ b/templates/init/functions/python/requirements.txt @@ -0,0 +1 @@ +git+https://github.com/firebase/firebase-functions-python.git@main#egg=firebase-functions \ No newline at end of file From b30fcdf296b6d424c46d9ffd4f9e25c220d4d464 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 26 Jan 2023 10:05:55 -0800 Subject: [PATCH 6/8] Fix emulator support for cloudevent to better conform to production specification. --- src/emulator/pubsubEmulator.ts | 17 +++++++++++++---- src/emulator/storage/cloudFunctions.ts | 6 +++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/emulator/pubsubEmulator.ts b/src/emulator/pubsubEmulator.ts index a208407a930..1f8be4716b1 100644 --- a/src/emulator/pubsubEmulator.ts +++ b/src/emulator/pubsubEmulator.ts @@ -189,20 +189,29 @@ export class PubsubEmulator implements EmulatorInstance { topic: string, message: Message ): CloudEvent { + // Pubsub events from Pubsub Emulator include a date with nanoseconds. + // Prod Pubsub doesn't publish timestamp at that level of precision. Timestamp with nanosecond precision also + // are difficult to parse in languages other than Node.js (e.g. python). + const truncatedPublishTime = new Date(message.publishTime.getMilliseconds()).toISOString(); const data: MessagePublishedData = { message: { messageId: message.id, - publishTime: message.publishTime, + publishTime: truncatedPublishTime, attributes: message.attributes, orderingKey: message.orderingKey, data: message.data.toString("base64"), - }, + + // NOTE: We include camel_cased attributes since they also available and depended on by other runtimes + // like python. + message_id: message.id, + publish_time: truncatedPublishTime, + } as MessagePublishedData["message"], subscription: this.subscriptionForTopic.get(topic)!.name, }; return { - specversion: "1", + specversion: "1.0", id: uuid.v4(), - time: message.publishTime.toISOString(), + time: truncatedPublishTime, type: "google.cloud.pubsub.topic.v1.messagePublished", source: `//pubsub.googleapis.com/projects/${this.args.projectId}/topics/${topic}`, data, diff --git a/src/emulator/storage/cloudFunctions.ts b/src/emulator/storage/cloudFunctions.ts index 7308c67018e..bdd8aca82d3 100644 --- a/src/emulator/storage/cloudFunctions.ts +++ b/src/emulator/storage/cloudFunctions.ts @@ -106,7 +106,7 @@ export class StorageCloudFunctions { time = typeof data.updated === "string" ? data.updated : data.updated.toISOString(); } return { - specversion: "1", + specversion: "1.0", id: uuid.v4(), type: `google.cloud.storage.object.v1.${ceAction}`, source: `//storage.googleapis.com/projects/_/buckets/${objectMetadataPayload.bucket}/objects/${objectMetadataPayload.name}`, @@ -255,9 +255,9 @@ export interface ObjectMetadataPayload { * Customer-supplied encryption key. * * This object contains the following properties: - * * `encryptionAlgorithm` (`string|undefined`): The encryption algorithm that + * `encryptionAlgorithm` (`string|undefined`): The encryption algorithm that * was used. Always contains the value `AES256`. - * * `keySha256` (`string|undefined`): An RFC 4648 base64-encoded string of the + * `keySha256` (`string|undefined`): An RFC 4648 base64-encoded string of the * SHA256 hash of your encryption key. You can use this SHA256 hash to * uniquely identify the AES-256 encryption key required to decrypt the * object, which you must store securely. From ce4f5ccc97da694c89aa0dc1ba062c2f2dd57129 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 26 Jan 2023 13:00:06 -0800 Subject: [PATCH 7/8] Add more logging for python discovery server. --- src/deploy/functions/runtimes/python/index.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/deploy/functions/runtimes/python/index.ts b/src/deploy/functions/runtimes/python/index.ts index 8825e383f5a..37a6c546fcd 100644 --- a/src/deploy/functions/runtimes/python/index.ts +++ b/src/deploy/functions/runtimes/python/index.ts @@ -129,6 +129,14 @@ export class Delegate implements runtimes.RuntimeDelegate { )} in ${this.sourceDir}` ); const childProcess = runWithVirtualEnv(args, this.sourceDir, envWithAdminPort); + childProcess.stdout?.on("data", (chunk: Buffer) => { + const chunkString = chunk.toString(); + logger.debug(`stdout: ${chunkString}`); + }); + childProcess.stderr?.on("data", (chunk: Buffer) => { + const chunkString = chunk.toString(); + logger.debug(`stderr: ${chunkString}`); + }); return Promise.resolve(async () => { await fetch(`http://127.0.0.1:${port}/__/quitquitquit`); const quitTimeout = setTimeout(() => { From 9e7920996a764929d180bdaa597c79962735c5ce Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Tue, 31 Jan 2023 15:11:34 -0800 Subject: [PATCH 8/8] Add changelog. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 349b92a3ed8..75601213bf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,2 @@ - Refactor the way timeouts are enforced by the Functions Emulator (#5464) +- Fix bug where cloudevent emitted by various emulators didn't conform to spec (#5466)