Skip to content

Commit 566766c

Browse files
feat: release target releases endpoint and tests (#515)
1 parent 1f3c032 commit 566766c

File tree

16 files changed

+4852
-4060
lines changed

16 files changed

+4852
-4060
lines changed

apps/event-worker/src/utils/dispatch-evaluate-jobs.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import type * as schema from "@ctrlplane/db/schema";
1+
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";
22

33
import { Channel, getQueue } from "@ctrlplane/events";
44

5-
export const dispatchEvaluateJobs = async (rts: schema.ReleaseTarget[]) => {
5+
export const dispatchEvaluateJobs = async (rts: ReleaseTargetIdentifier[]) => {
66
const jobs = rts.map((rt) => ({
77
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
88
data: rt,

apps/event-worker/src/workers/compute-deployment-resource-selector.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ export const computeDeploymentResourceSelectorWorkerEvent = createWorker(
2020
await db.transaction(async (tx) => {
2121
await tx.execute(
2222
sql`
23-
SELECT * from ${schema.deployment}
24-
WHERE ${schema.deployment.id} = ${id}
23+
SELECT * from ${schema.computedDeploymentResource}
24+
WHERE ${eq(schema.computedDeploymentResource.deploymentId, deployment.id)}
2525
FOR UPDATE NOWAIT
2626
`,
2727
);
@@ -58,7 +58,7 @@ export const computeDeploymentResourceSelectorWorkerEvent = createWorker(
5858
.onConflictDoNothing();
5959
});
6060

61-
getQueue(Channel.ComputeSystemsReleaseTargets).add(
61+
await getQueue(Channel.ComputeSystemsReleaseTargets).add(
6262
deployment.system.id,
6363
deployment.system,
6464
);

apps/event-worker/src/workers/compute-environment-resource-selector.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ export const computeEnvironmentResourceSelectorWorkerEvent = createWorker(
3838
// acquire a lock on the environment
3939
await tx.execute(
4040
sql`
41-
SELECT * from ${schema.environment}
42-
WHERE ${schema.environment.id} = ${id}
41+
SELECT * from ${schema.computedEnvironmentResource}
42+
WHERE ${eq(schema.computedEnvironmentResource.environmentId, environment.id)}
4343
FOR UPDATE NOWAIT
4444
`,
4545
);
@@ -79,7 +79,7 @@ export const computeEnvironmentResourceSelectorWorkerEvent = createWorker(
7979
.onConflictDoNothing();
8080
});
8181

82-
getQueue(Channel.ComputeSystemsReleaseTargets).add(
82+
await getQueue(Channel.ComputeSystemsReleaseTargets).add(
8383
environment.system.id,
8484
environment.system,
8585
);

apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Tx } from "@ctrlplane/db";
22

3-
import { and, eq, inArray, isNull, selector, sql } from "@ctrlplane/db";
3+
import { and, eq, isNull, selector, sql } from "@ctrlplane/db";
44
import { db } from "@ctrlplane/db/client";
55
import * as schema from "@ctrlplane/db/schema";
66
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
@@ -67,14 +67,12 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
6767
const { workspaceId } = policy;
6868

6969
try {
70-
const rts = await db.transaction(async (tx) => {
70+
await db.transaction(async (tx) => {
7171
await tx.execute(
7272
sql`
73-
SELECT * FROM ${schema.system}
74-
INNER JOIN ${schema.environment} ON ${eq(schema.environment.systemId, schema.system.id)}
75-
INNER JOIN ${schema.deployment} ON ${eq(schema.deployment.systemId, schema.system.id)}
76-
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.environmentId, schema.environment.id)}
77-
WHERE ${eq(schema.system.workspaceId, workspaceId)}
73+
SELECT * from ${schema.computedPolicyTargetReleaseTarget}
74+
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.id, schema.computedPolicyTargetReleaseTarget.releaseTargetId)}
75+
WHERE ${eq(schema.computedPolicyTargetReleaseTarget.policyTargetId, policyTarget.id)}
7876
FOR UPDATE NOWAIT
7977
`,
8078
);
@@ -93,28 +91,41 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
9391
policyTarget,
9492
);
9593

96-
if (releaseTargets.length === 0) return [];
97-
return tx
94+
if (releaseTargets.length === 0) return;
95+
await tx
9896
.insert(schema.computedPolicyTargetReleaseTarget)
9997
.values(releaseTargets)
100-
.onConflictDoNothing()
101-
.returning();
98+
.onConflictDoNothing();
10299
});
103100

104-
if (rts.length === 0) return;
101+
const releaseTargets = await db
102+
.select()
103+
.from(schema.releaseTarget)
104+
.innerJoin(
105+
schema.resource,
106+
eq(schema.releaseTarget.resourceId, schema.resource.id),
107+
)
108+
.where(
109+
and(
110+
isNull(schema.resource.deletedAt),
111+
eq(schema.resource.workspaceId, workspaceId),
112+
),
113+
)
114+
.then((rows) => rows.map((row) => row.release_target));
105115

106-
const releaseTargets = await db.query.releaseTarget.findMany({
107-
where: inArray(
108-
schema.releaseTarget.id,
109-
rts.map((rt) => rt.releaseTargetId),
116+
const queueInsertionPromises = releaseTargets.map((rt) =>
117+
getQueue(Channel.EvaluateReleaseTarget).add(
118+
`${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
119+
rt,
120+
{
121+
deduplication: {
122+
id: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
123+
ttl: 500,
124+
},
125+
},
110126
),
111-
});
112-
113-
const jobs = releaseTargets.map((rt) => ({
114-
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
115-
data: rt,
116-
}));
117-
await getQueue(Channel.EvaluateReleaseTarget).addBulk(jobs);
127+
);
128+
await Promise.all(queueInsertionPromises);
118129
} catch (e: any) {
119130
const isRowLocked = e.code === "55P03";
120131
if (isRowLocked) {

apps/event-worker/src/workers/compute-systems-release-targets.ts

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { db } from "@ctrlplane/db/client";
55
import * as schema from "@ctrlplane/db/schema";
66
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
77

8+
import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js";
9+
810
const findMatchingEnvironmentDeploymentPairs = (
911
tx: Tx,
1012
system: { id: string; workspaceId: string },
@@ -75,50 +77,60 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
7577
const environmentIds = environments.map((e) => e.id);
7678
const { workspaceId } = system;
7779

80+
if (deploymentIds.length === 0 || environmentIds.length === 0) return;
81+
7882
try {
7983
const createdReleaseTargets = await db.transaction(async (tx) => {
8084
await tx.execute(
8185
sql`
82-
SELECT * FROM ${schema.system}
83-
INNER JOIN ${schema.environment} ON ${eq(schema.environment.systemId, schema.system.id)}
84-
INNER JOIN ${schema.deployment} ON ${eq(schema.deployment.systemId, schema.system.id)}
85-
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.environmentId, schema.environment.id)}
86-
WHERE ${eq(schema.system.id, systemId)}
86+
SELECT ${schema.releaseTarget.id} FROM ${schema.releaseTarget}
87+
WHERE ${or(
88+
inArray(schema.releaseTarget.deploymentId, deploymentIds),
89+
inArray(schema.releaseTarget.environmentId, environmentIds),
90+
)}
8791
FOR UPDATE NOWAIT
8892
`,
8993
);
9094

91-
const previousReleaseTargets = await tx
92-
.delete(schema.releaseTarget)
93-
.where(
94-
or(
95-
inArray(schema.releaseTarget.deploymentId, deploymentIds),
96-
inArray(schema.releaseTarget.environmentId, environmentIds),
97-
),
98-
)
99-
.returning();
95+
await tx.execute(
96+
sql`
97+
SELECT * FROM ${schema.computedEnvironmentResource}
98+
WHERE ${inArray(schema.computedEnvironmentResource.environmentId, environmentIds)}
99+
FOR UPDATE NOWAIT
100+
`,
101+
);
102+
103+
await tx.execute(
104+
sql`
105+
SELECT * FROM ${schema.computedDeploymentResource}
106+
WHERE ${inArray(schema.computedDeploymentResource.deploymentId, deploymentIds)}
107+
FOR UPDATE NOWAIT
108+
`,
109+
);
110+
111+
const previousReleaseTargets = await tx.query.releaseTarget.findMany({
112+
where: or(
113+
inArray(schema.releaseTarget.deploymentId, deploymentIds),
114+
inArray(schema.releaseTarget.environmentId, environmentIds),
115+
),
116+
});
100117

101118
const releaseTargets = await findMatchingEnvironmentDeploymentPairs(
102119
tx,
103120
system,
104121
);
105122

106-
if (releaseTargets.length > 0)
107-
await tx
108-
.insert(schema.releaseTarget)
109-
.values(releaseTargets)
110-
.onConflictDoNothing();
111-
112123
const created = releaseTargets.filter(
113124
(rt) =>
114125
!previousReleaseTargets.some(
115126
(prevRt) =>
116127
prevRt.deploymentId === rt.deploymentId &&
117-
prevRt.resourceId === rt.resourceId,
128+
prevRt.resourceId === rt.resourceId &&
129+
prevRt.environmentId === rt.environmentId,
118130
),
119131
);
120-
121-
return created;
132+
if (created.length === 0) return [];
133+
return tx.insert(schema.releaseTarget).values(created).returning();
122134
});
123135

124136
if (createdReleaseTargets.length === 0) return;
@@ -132,18 +144,24 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
132144
)
133145
.where(eq(schema.policy.workspaceId, workspaceId));
134146

135-
for (const { policy_target: policyTarget } of policyTargets) {
136-
getQueue(Channel.ComputePolicyTargetReleaseTargetSelector).add(
137-
policyTarget.id,
138-
policyTarget,
139-
);
147+
if (policyTargets.length > 0) {
148+
for (const { policy_target: policyTarget } of policyTargets) {
149+
getQueue(Channel.ComputePolicyTargetReleaseTargetSelector).add(
150+
policyTarget.id,
151+
policyTarget,
152+
);
153+
}
154+
return;
140155
}
156+
157+
await dispatchEvaluateJobs(createdReleaseTargets);
141158
} catch (e: any) {
142159
const isRowLocked = e.code === "55P03";
143160
if (isRowLocked) {
144161
await getQueue(Channel.ComputeSystemsReleaseTargets).add(
145162
job.name,
146163
job.data,
164+
{ delay: 500 },
147165
);
148166
return;
149167
}

0 commit comments

Comments
 (0)