Commit 0a2fe9c

yurinnick authored and JenySadadia committed
src/patchset.py: Implement Patchset service
The Patchset service processes patchset nodes:

- Wait for the parent checkout node to be available
- Download the checkout node tarball
- Apply the patches and calculate the patchset hash
- Upload the new tarball

Signed-off-by: Nikolay Yurin <[email protected]>
1 parent 7e179c2 commit 0a2fe9c

File tree

6 files changed, +413 -40 lines changed


.gitignore

Lines changed: 2 additions & 0 deletions

@@ -2,3 +2,5 @@
 .docker-env
 data
 *.pyc
+*.venv
+

config/kernelci.toml

Lines changed: 9 additions & 0 deletions

@@ -13,6 +13,15 @@ kdir = "/home/kernelci/data/src/linux"
 output = "/home/kernelci/data/output"
 storage_config = "docker-host"

+[patchset]
+kdir = "/home/kernelci/data/src/linux-patchset"
+output = "/home/kernelci/data/output"
+storage_config = "docker-host"
+patchset_tmp_file_prefix = "kernel-patch"
+patchset_short_hash_len = 13
+allowed_domains = ["patchwork.kernel.org"]
+polling_delay_secs = 30
+
 [scheduler]
 output = "/home/kernelci/data/output"
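For illustration only (this snippet is not part of the commit): the service itself receives these values through the settings file passed via its --settings option, but the new [patchset] section can equally be inspected with Python's standard tomllib module (Python 3.11+). The file path below is the one mounted into the containers by docker-compose.yaml.

    import tomllib  # standard library since Python 3.11

    # Assumed path: the settings file mounted into the pipeline containers.
    with open("/home/kernelci/config/kernelci.toml", "rb") as settings_file:
        settings = tomllib.load(settings_file)

    patchset_settings = settings["patchset"]
    print(patchset_settings["kdir"])                     # /home/kernelci/data/src/linux-patchset
    print(patchset_settings["patchset_short_hash_len"])  # 13
    print(patchset_settings["allowed_domains"])          # ['patchwork.kernel.org']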

docker-compose.yaml

Lines changed: 15 additions & 0 deletions

@@ -193,3 +193,18 @@ services:
       - '--mode=holdoff'
     extra_hosts:
       - "host.docker.internal:host-gateway"
+
+  patchset:
+    <<: *base-service
+    container_name: 'kernelci-pipeline-patchset'
+    command:
+      - './pipeline/patchset.py'
+      - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}'
+      - 'run'
+    volumes:
+      - './src:/home/kernelci/pipeline'
+      - './config:/home/kernelci/config'
+      - './data/ssh:/home/kernelci/data/ssh'
+      - './data/src:/home/kernelci/data/src'
+      - './data/output:/home/kernelci/data/output'
+
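With this entry in place the new service follows the same pattern as the other pipeline services: it inherits its image and environment from the base-service YAML anchor and can be started on its own with the usual "docker-compose up patchset" invocation.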

src/monitor.py

Lines changed: 4 additions & 3 deletions

@@ -60,9 +60,10 @@ def _run(self, sub_id):
         event = self._api.receive_event(sub_id)
         obj = event.data
         dt = datetime.datetime.fromisoformat(event['time'])
-        commit = (obj['data']['kernel_revision']['commit'][:12]
-                  if 'kernel_revision' in obj['data']
-                  else str(None))
+        try:
+            commit = obj['data']['kernel_revision']['commit'][:12]
+        except (KeyError, TypeError):
+            commit = str(None)
         result = result_map[obj['result']] if obj['result'] else str(None)
         print(self.LOG_FMT.format(
             time=dt.strftime('%Y-%m-%d %H:%M:%S.%f'),
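Note on the design choice: the previous conditional only guarded against a missing 'kernel_revision' key, while catching KeyError and TypeError also covers nodes where obj['data'] is None or lacks the nested fields.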

src/patchset.py

Lines changed: 329 additions & 0 deletions

@@ -0,0 +1,329 @@
#!/usr/bin/env python3
#
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (c) Meta Platforms, Inc. and affiliates.
# Author: Nikolay Yurin <[email protected]>

import os
import sys
import json
import requests
import time
import tempfile
import hashlib
from datetime import datetime, timedelta
from urllib.parse import urlparse
from urllib.request import urlopen

import kernelci
import kernelci.build
import kernelci.config
from kernelci.legacy.cli import Args, Command, parse_opts
import kernelci.storage

from base import Service
from tarball import Tarball


class Patchset(Tarball):
    """Process patchset nodes: wait for the parent checkout node, download
    its tarball, apply the patches, compute the patchset hash and upload a
    new source+patchset tarball."""

    TAR_CREATE_CMD = """\
set -e
cd {target_dir}
tar --create --transform "s/^/{prefix}\\//" * | gzip > {tarball_path}
"""

    APPLY_PATCH_SHELL_CMD = """\
set -e
cd {checkout_path}
patch -p1 < {patch_file}
"""

    # FIXME: I really don't have a good idea what I'm doing here.
    # This code probably needs rework and should move into kernelci.patch
    def _hash_patch(self, patch_name, patch_file):
        allowed_prefixes = {
            b"old mode",  # Old file permissions
            b"new mode",  # New file permissions
            b"-",  # This covers both removed lines and the source file
            b"+",  # This covers both added lines and the target file
            # "@" I don't know how we should handle hunks yet
        }
        hashable_patch_lines = []
        for line in patch_file.readlines():
            if not line:
                continue

            for prefix in allowed_prefixes:
                if line.startswith(prefix):
                    hashable_patch_lines.append(line)
                    break

        hashable_content = b"\n".join(hashable_patch_lines)
        self.log.debug(
            "Hashable content:\n" +
            hashable_content.decode("utf-8")
        )
        patch_hash_digest = hashlib.sha256(hashable_content).hexdigest()
        self.log.debug(f"Patch {patch_name} hash: {patch_hash_digest}")
        return patch_hash_digest

    # FIXME: move into kernelci.patch
    def _apply_patch(self, checkout_path, patch_name, patch_url):
        self.log.info(
            f"Applying patch {patch_name}, url: {patch_url}",
        )
        try:
            encoding = urlopen(patch_url).headers.get_charsets()[0]
        except Exception as e:
            self.log.warn(
                "Failed to fetch encoding from patch "
                f"{patch_name} headers: {e}"
            )
            self.log.warn("Falling back to utf-8 encoding")
            encoding = "utf-8"

        with tempfile.NamedTemporaryFile(
            prefix="{}-{}-".format(
                self._service_config.patchset_tmp_file_prefix,
                patch_name
            ),
            encoding=encoding
        ) as tmp_f:
            if not kernelci.build._download_file(patch_url, tmp_f.name):
                raise FileNotFoundError(
                    f"Error downloading patch from {patch_url}"
                )

            kernelci.shell_cmd(self.APPLY_PATCH_SHELL_CMD.format(
                checkout_path=checkout_path,
                patch_file=tmp_f.name,
            ))

            return self._hash_patch(patch_name, tmp_f)

    # FIXME: move into kernelci.patch
    def _apply_patches(self, checkout_path, patch_artifacts):
        patchset_hash = hashlib.sha256()
        for patch_name, patch_url in patch_artifacts.items():
            patch_hash = self._apply_patch(checkout_path, patch_name, patch_url)
            patchset_hash.update(patch_hash.encode("utf-8"))

        patchset_hash_digest = patchset_hash.hexdigest()
        self.log.debug(f"Patchset hash: {patchset_hash_digest}")
        return patchset_hash_digest

    def _download_checkout_archive(self, download_path, tarball_url, retries=3):
        self.log.info(f"Downloading checkout tarball, url: {tarball_url}")
        tar_filename = os.path.basename(urlparse(tarball_url).path)
        kernelci.build.pull_tarball(
            kdir=download_path,
            url=tarball_url,
            dest_filename=tar_filename,
            retries=retries,
            delete=True
        )

    def _update_node(
        self,
        patchset_node,
        checkout_node,
        tarball_url,
        patchset_hash
    ):
        patchset_data = checkout_node.get("data", {}).copy()
        patchset_data["kernel_revision"]["patchset"] = patchset_hash

        updated_node = patchset_node.copy()
        updated_node["artifacts"]["tarball"] = tarball_url
        updated_node["state"] = "available"
        updated_node["data"] = patchset_data
        updated_node["holdoff"] = str(
            datetime.utcnow() + timedelta(minutes=10)
        )

        try:
            self._api.node.update(updated_node)
        except requests.exceptions.HTTPError as err:
            err_msg = json.loads(err.response.content).get("detail", [])
            self.log.error(err_msg)

    def _setup(self, *args):
        return self._api_helper.subscribe_filters({
            "op": "created",
            "name": "patchset",
            "state": "running",
        })

    def _has_allowed_domain(self, url):
        domain = urlparse(url).hostname
        if domain not in self._service_config.allowed_domains:
            raise RuntimeError(
                "Forbidden mbox domain %s, allowed domains: %s" % (
                    domain,
                    self._service_config.allowed_domains,
                )
            )

    def _get_patch_artifacts(self, patchset_node):
        node_artifacts = patchset_node.get("artifacts")
        if not node_artifacts:
            raise ValueError(
                "Patchset node %s has no artifacts" % patchset_node["id"]
            )

        for patch_mbox_url in node_artifacts.values():
            self._has_allowed_domain(patch_mbox_url)

        return node_artifacts

    def _gen_checkout_name(self, checkout_node):
        revision = checkout_node["data"]["kernel_revision"]
        return "-".join([
            "linux",
            revision["tree"],
            revision["branch"],
            revision["describe"],
        ])

    def _process_patchset(self, checkout_node, patchset_node):
        patch_artifacts = self._get_patch_artifacts(patchset_node)

        # Tarball download implicitly removes the destination dir,
        # so there's no need to clean up this directory
        self._download_checkout_archive(
            download_path=self._service_config.kdir,
            tarball_url=checkout_node["artifacts"]["tarball"]
        )

        checkout_name = self._gen_checkout_name(checkout_node)
        checkout_path = os.path.join(self._service_config.kdir, checkout_name)

        patchset_hash = self._apply_patches(checkout_path, patch_artifacts)
        patchset_hash_short = patchset_hash[
            :self._service_config.patchset_short_hash_len
        ]

        tarball_path = self._make_tarball(
            target_dir=checkout_path,
            tarball_name=f"{checkout_name}-{patchset_hash_short}"
        )
        tarball_url = self._push_tarball(tarball_path)

        self._update_node(
            patchset_node=patchset_node,
            checkout_node=checkout_node,
            tarball_url=tarball_url,
            patchset_hash=patchset_hash
        )

    def _mark_failed(self, patchset_node):
        node = patchset_node.copy()
        node.update({
            "state": "done",
            "result": "fail",
        })
        try:
            self._api.node.update(node)
        except requests.exceptions.HTTPError as err:
            err_msg = json.loads(err.response.content).get("detail", [])
            self.log.error(err_msg)

    def _mark_failed_if_no_parent(self, patchset_node):
        if not patchset_node["parent"]:
            self.log.error(
                f"Patchset node {patchset_node['id']} has no parent "
                "checkout node, marking node as failed",
            )
            self._mark_failed(patchset_node)
            return True

        return False

    def _mark_failed_if_parent_failed(self, patchset_node, checkout_node):
        if (
            checkout_node["state"] == "done" and
            checkout_node["result"] == "fail"
        ):
            self.log.error(
                f"Parent checkout node {checkout_node['id']} failed, "
                f"marking patchset node {patchset_node['id']} as failed",
            )
            self._mark_failed(patchset_node)
            return True

        return False

    def _run(self, _sub_id):
        self.log.info("Listening for new trigger events")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            patchset_nodes = self._api.node.find({
                "name": "patchset",
                "state": "running",
            })

            if patchset_nodes:
                self.log.debug(f"Found patchset nodes: {patchset_nodes}")

            for patchset_node in patchset_nodes:
                if self._mark_failed_if_no_parent(patchset_node):
                    continue

                checkout_node = self._api.node.get(patchset_node["parent"])

                if self._mark_failed_if_parent_failed(
                    patchset_node,
                    checkout_node
                ):
                    continue

                if checkout_node["state"] == "running":
                    self.log.info(
                        f"Patchset node {patchset_node['id']} is waiting "
                        f"for checkout node {checkout_node['id']} to complete",
                    )
                    continue

                try:
                    self.log.info(
                        f"Processing patchset node: {patchset_node['id']}",
                    )
                    self._process_patchset(checkout_node, patchset_node)
                except Exception as e:
                    self.log.error(
                        f"Patchset node {patchset_node['id']} "
                        f"processing failed: {e}",
                    )
                    self.log.traceback()
                    self._mark_failed(patchset_node)

            self.log.info(
                "Waiting %d seconds for new nodes..." %
                self._service_config.polling_delay_secs,
            )
            time.sleep(self._service_config.polling_delay_secs)


class cmd_run(Command):
    help = (
        "Wait for a checkout node to be available "
        "and push a source+patchset tarball"
    )
    args = [
        Args.kdir, Args.output, Args.api_config, Args.storage_config,
    ]
    opt_args = [
        Args.verbose, Args.storage_cred,
    ]

    def __call__(self, configs, args):
        return Patchset(configs, args).run(args)


if __name__ == "__main__":
    opts = parse_opts("patchset", globals())
    configs = kernelci.config.load("config")
    status = opts.command(configs, opts)
    sys.exit(0 if status is True else 1)
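As a purely hypothetical illustration (the node fields below are made up, not taken from this commit), a node that this service picks up would look roughly like this, based on the filters in _setup() and _run() and the checks in _get_patch_artifacts() and _has_allowed_domain():

    # Hypothetical example of a patchset node, for illustration only.
    example_patchset_node = {
        "name": "patchset",              # matched by the "name" filter in _run()
        "state": "running",              # matched by the "state" filter in _run()
        "parent": "<checkout-node-id>",  # must reference an existing checkout node
        "artifacts": {
            # patch name -> patch mbox URL; the hostname must appear in the
            # allowed_domains setting ("patchwork.kernel.org" in kernelci.toml)
            "0001-example-patch": "https://patchwork.kernel.org/...",
        },
    }

Once the parent checkout node completes without failing, _process_patchset() downloads its tarball, applies each artifact in turn, and _update_node() stores the new tarball URL, records the patchset hash under data['kernel_revision']['patchset'], and moves the node to the "available" state.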
