Skip to content

Commit 544b19f

Browse files
committed
src/patchset.py: Implement Patchset service
The patchset service processes patchset nodes: - Wait for the parent checkout node to be available - Download the checkout node tarball - Apply patches and calculate the patchset hash - Upload a new tarball Signed-off-by: Nikolay Yurin <[email protected]>
1 parent c1f4cb0 commit 544b19f

File tree

6 files changed

+374
-21
lines changed

6 files changed

+374
-21
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
.env
22
.docker-env
33
data
4+
*.pyc
5+
*.venv

config/kernelci.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ kdir = "/home/kernelci/data/src/linux"
1313
output = "/home/kernelci/data/output"
1414
storage_config = "docker-host"
1515

16+
[patchset]
17+
kdir = "/home/kernelci/data/src/linux-patchset"
18+
output = "/home/kernelci/data/output"
19+
storage_config = "docker-host"
20+
1621
[scheduler]
1722
output = "/home/kernelci/data/output"
1823

docker-compose.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,17 @@ services:
153153
- '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}'
154154
- 'run'
155155
- '--mode=holdoff'
156+
157+
patchset:
158+
<<: *base-service
159+
container_name: 'kernelci-pipeline-patchset'
160+
command:
161+
- './pipeline/patchset.py'
162+
- '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}'
163+
- 'run'
164+
volumes:
165+
- './src:/home/kernelci/pipeline'
166+
- './config:/home/kernelci/config'
167+
- './data/ssh:/home/kernelci/data/ssh'
168+
- './data/src:/home/kernelci/data/src'
169+
- './data/output:/home/kernelci/data/output'

src/monitor.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ def _run(self, sub_id):
6060
event = self._api.receive_event(sub_id)
6161
obj = event.data
6262
dt = datetime.datetime.fromisoformat(event['time'])
63-
commit = (obj['data']['kernel_revision']['commit'][:12]
64-
if 'kernel_revision' in obj['data']
65-
else str(None))
63+
try:
64+
commit = obj['data']['kernel_revision']['commit'][:12]
65+
except (KeyError, TypeError):
66+
commit = str(None)
6667
result = result_map[obj['result']] if obj['result'] else str(None)
6768
print(self.LOG_FMT.format(
6869
time=dt.strftime('%Y-%m-%d %H:%M:%S.%f'),

src/patchset.py

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
#!/usr/bin/env python3
2+
#
3+
# SPDX-License-Identifier: LGPL-2.1-or-later
4+
#
5+
# Copyright (C) 2022 Collabora Limited
6+
# Author: Nikolay Yurin <[email protected]>
7+
8+
import os
9+
import sys
10+
import json
11+
import requests
12+
import time
13+
import tempfile
14+
import hashlib
15+
from datetime import datetime, timedelta
16+
from urllib.parse import urlparse
17+
from urllib.request import urlopen
18+
19+
import kernelci
20+
import kernelci.build
21+
import kernelci.config
22+
from kernelci.legacy.cli import Args, Command, parse_opts
23+
import kernelci.storage
24+
25+
from tarball import Tarball
26+
27+
# FIXME: make patchset service configuration option
28+
ALLOWED_DOMAINS = {"patchwork.kernel.org"}
29+
PATCHSET_SHORT_HASH_LEN = 13
30+
POLLING_DELAY_SECS = 30
31+
PATCH_TMP_FILE_PREFIX = "kernel-patch"
32+
33+
34+
class Patchset(Tarball):
    """Process 'patchset' nodes.

    For each patchset node found in the 'running' state:
    - wait for the parent checkout node to become available
    - download the checkout node tarball
    - apply the patches and compute a patchset hash
    - upload a new source tarball and update the node
    """

    TAR_CREATE_CMD = """\
set -e
cd {target_dir}/
tar --create --transform "s/^/{prefix}\\//" * | gzip > {tarball_path}
"""

    APPLY_PATCH_SHELL_CMD = """\
set -e
cd {kdir}
patch -p1 < {patch_file}
"""

    # FIXME: This code probably needs rework and moving into
    # kernelci.patch
    def _hash_patch(self, patch_name, patch_file):
        """Return a sha256 hex digest over the stable lines of a patch.

        Only lines describing the actual change are hashed: mode
        changes and added/removed lines (the "-"/"+" prefixes also
        cover the ---/+++ file headers).  Hunk headers ("@@") are
        deliberately skipped so that context offset changes do not
        affect the hash.  *patch_file* must be a binary-mode file
        object positioned at the start of the patch.
        """
        allowed_prefixes = {
            b"old mode",  # Old file permissions
            b"new mode",  # New file permissions
            b"-",  # This covers both removed lines and source file
            b"+",  # This covers both added lines and target file
            # "@" hunk headers are intentionally not hashed
        }
        hashable_patch_lines = []
        for line in patch_file.readlines():
            if not line:
                continue

            for prefix in allowed_prefixes:
                if line.startswith(prefix):
                    hashable_patch_lines.append(line)
                    break

        # Join with a real newline; the original code used b"/n"
        # (literal slash-n) by mistake
        hashable_content = b"\n".join(hashable_patch_lines)
        self.log.debug(
            "Hashable content:\n" +
            hashable_content.decode("utf-8")
        )
        patch_hash_digest = hashlib.sha256(hashable_content).hexdigest()
        self.log.debug(f"Patch {patch_name} hash: {patch_hash_digest}")
        return patch_hash_digest

    # FIXME: move into kernelci.patch
    def _apply_patch(self, kdir, patch_name, patch_url):
        """Download one patch, apply it to *kdir* and return its hash.

        Raises FileNotFoundError if the patch cannot be downloaded.
        """
        self.log.info(
            f"Applying patch {patch_name}, url: {patch_url}",
        )
        # Keep the temporary file in (default) binary mode: the patch
        # bytes are hashed as-is by _hash_patch(), and passing an
        # encoding to a binary-mode NamedTemporaryFile would raise
        # ValueError anyway.
        with tempfile.NamedTemporaryFile(
            prefix="{}-{}-".format(PATCH_TMP_FILE_PREFIX, patch_name),
        ) as tmp_f:
            if not kernelci.build._download_file(patch_url, tmp_f.name):
                raise FileNotFoundError(
                    f"Error downloading patch from {patch_url}"
                )

            kernelci.shell_cmd(self.APPLY_PATCH_SHELL_CMD.format(
                kdir=kdir,
                patch_file=tmp_f.name,
            ))

            # The download wrote through a separate handle, so this
            # handle is still positioned at the start of the file
            return self._hash_patch(patch_name, tmp_f)

    # FIXME: move into kernelci.patch
    def _apply_patches(self, kdir, patch_artifacts):
        """Apply all patches and return the combined patchset hash.

        *patch_artifacts* maps patch names to mbox URLs; patches are
        applied in the mapping's iteration order and the per-patch
        hashes are folded into a single sha256 digest.
        """
        patchset_hash = hashlib.sha256()
        for patch_name, patch_url in patch_artifacts.items():
            patch_hash = self._apply_patch(kdir, patch_name, patch_url)
            patchset_hash.update(patch_hash.encode("utf-8"))

        patchset_hash_digest = patchset_hash.hexdigest()
        self.log.debug(f"Patchset hash: {patchset_hash_digest}")
        return patchset_hash_digest

    def _download_checkout_archive(self, tarball_url, retries=3):
        """Download and unpack the checkout tarball into self._kdir."""
        self.log.info(f"Downloading checkout tarball, url: {tarball_url}")
        tar_filename = os.path.basename(urlparse(tarball_url).path)
        kernelci.build.pull_tarball(
            kdir=self._kdir,
            url=tarball_url,
            dest_filename=tar_filename,
            retries=retries,
            delete=True
        )

    def _update_node(
        self,
        patchset_node,
        checkout_node,
        tarball_url,
        patchset_hash
    ):
        """Mark the patchset node 'available' with the new tarball."""
        patchset_data = checkout_node.get("data", {}).copy()
        # Copy the nested revision dict as well: a shallow copy alone
        # would mutate the checkout node's own kernel_revision below
        patchset_data["kernel_revision"] = dict(
            patchset_data["kernel_revision"]
        )
        patchset_data["kernel_revision"]["patchset"] = patchset_hash

        updated_node = patchset_node.copy()
        updated_node["artifacts"]["tarball"] = tarball_url
        updated_node["state"] = "available"
        updated_node["data"] = patchset_data
        updated_node["holdoff"] = str(
            datetime.utcnow() + timedelta(minutes=10)
        )

        try:
            self._api.node.update(updated_node)
        except requests.exceptions.HTTPError as err:
            err_msg = json.loads(err.response.content).get("detail", [])
            self.log.error(err_msg)

    def _setup(self, *args):
        """Subscribe to newly-created running patchset nodes."""
        return self._api_helper.subscribe_filters({
            "op": "created",
            "name": "patchset",
            "state": "running",
        })

    def _has_allowed_domain(self, url):
        """Raise RuntimeError if the URL's host is not allow-listed."""
        domain = urlparse(url).hostname
        if domain not in ALLOWED_DOMAINS:
            # f-string so the message is actually formatted (the
            # original passed printf-style args that were never
            # interpolated)
            raise RuntimeError(
                f"Forbidden mbox domain {domain}, "
                f"allowed domains: {ALLOWED_DOMAINS}"
            )

    def _get_patch_artifacts(self, patchset_node):
        """Return the node's patch artifacts, validating every URL.

        Raises ValueError if the node has no artifacts and
        RuntimeError if any mbox URL is not allow-listed.
        """
        node_artifacts = patchset_node.get("artifacts")
        if not node_artifacts:
            raise ValueError(
                f"Patchset node {patchset_node['id']} has no artifacts"
            )

        for patch_mbox_url in node_artifacts.values():
            self._has_allowed_domain(patch_mbox_url)

        return node_artifacts

    def _gen_checkout_name(self, checkout_node):
        """Build the checkout directory name from the node revision."""
        revision = checkout_node["data"]["kernel_revision"]
        return "-".join([
            "linux",
            revision["tree"],
            revision["branch"],
            revision["describe"],
        ])

    def _process_patchset(self, checkout_node, patchset_node):
        """Apply a patchset on top of a checkout and publish the result.

        Downloads the checkout tarball, applies the patches, pushes a
        new tarball named after the checkout plus a short patchset
        hash, and updates the patchset node.
        """
        patch_artifacts = self._get_patch_artifacts(patchset_node)
        self._download_checkout_archive(checkout_node["artifacts"]["tarball"])

        checkout_name = self._gen_checkout_name(checkout_node)
        checkout_path = os.path.join(self._kdir, checkout_name)

        patchset_hash = self._apply_patches(checkout_path, patch_artifacts)
        patchset_hash_short = patchset_hash[:PATCHSET_SHORT_HASH_LEN]

        tarball_path = self._make_tarball(
            target_dir=checkout_path,
            tarball_name=f"{checkout_name}-{patchset_hash_short}"
        )
        tarball_url = self._push_tarball(tarball_path)

        self._update_node(
            patchset_node=patchset_node,
            checkout_node=checkout_node,
            tarball_url=tarball_url,
            patchset_hash=patchset_hash
        )

    def _mark_failed(self, patchset_node):
        """Set the node state to done/fail via the API."""
        node = patchset_node.copy()
        node.update({
            "state": "done",
            "result": "fail",
        })
        try:
            self._api.node.update(node)
        except requests.exceptions.HTTPError as err:
            err_msg = json.loads(err.response.content).get("detail", [])
            self.log.error(err_msg)

    def _mark_failed_if_no_parent(self, patchset_node):
        """Fail the node if it has no parent; return True if failed."""
        if not patchset_node["parent"]:
            self.log.error(
                f"Patchset node {patchset_node['id']} has no parent "
                "checkout node, marking node as failed",
            )
            self._mark_failed(patchset_node)
            return True

        return False

    def _mark_failed_if_parent_failed(self, patchset_node, checkout_node):
        """Fail the node if its checkout failed; return True if failed."""
        if (
            checkout_node["state"] == "done" and
            checkout_node["result"] == "fail"
        ):
            self.log.error(
                f"Parent checkout node {checkout_node['id']} failed, "
                f"marking patchset node {patchset_node['id']} as failed",
            )
            self._mark_failed(patchset_node)
            return True

        return False

    def _run(self, sub_id):
        """Poll for running patchset nodes and process them forever."""
        self.log.info("Listening for new trigger events")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            patchset_nodes = self._api.node.find({
                "name": "patchset",
                "state": "running",
            })

            if patchset_nodes:
                self.log.debug(f"Found patchset nodes: {patchset_nodes}")

            for patchset_node in patchset_nodes:
                if self._mark_failed_if_no_parent(patchset_node):
                    continue

                checkout_node = self._api.node.get(patchset_node["parent"])

                if self._mark_failed_if_parent_failed(
                    patchset_node,
                    checkout_node
                ):
                    continue

                # Checkout still in progress: try again next poll cycle
                if checkout_node["state"] == "running":
                    self.log.info(
                        f"Patchset node {patchset_node['id']} is waiting "
                        f"for checkout node {checkout_node['id']} to complete",
                    )
                    continue

                try:
                    self.log.info(
                        f"Processing patchset node: {patchset_node['id']}",
                    )
                    self._process_patchset(checkout_node, patchset_node)
                except Exception as e:
                    self.log.error(
                        f"Patchset node {patchset_node['id']} "
                        f"processing failed: {e}",
                    )
                    self.log.traceback()
                    self._mark_failed(patchset_node)

            self.log.info(
                f"Waiting {POLLING_DELAY_SECS} seconds for new nodes...",
            )
            time.sleep(POLLING_DELAY_SECS)
300+
301+
302+
class cmd_run(Command):
    """Command entry point for the patchset service."""

    help = (
        "Wait for a checkout node to be available "
        "and push a source+patchset tarball"
    )
    args = [
        Args.kdir,
        Args.output,
        Args.api_config,
        Args.storage_config,
    ]
    opt_args = [
        Args.verbose,
        Args.storage_cred,
    ]

    def __call__(self, configs, args):
        service = Patchset(configs, args)
        return service.run(args)
316+
317+
318+
if __name__ == "__main__":
    # Parse CLI options, load the pipeline configuration and run the
    # selected command; exit non-zero on failure.
    opts = parse_opts("patchset", globals())
    pipeline_configs = kernelci.config.load("config/pipeline.yaml")
    status = opts.command(pipeline_configs, opts)
    exit_code = 0 if status is True else 1
    sys.exit(exit_code)

0 commit comments

Comments
 (0)