11% Regression tests for obd_scan
2- ~ python3_only
2+ ~ vcan_socket needs_root linux
33
44# More information at http://www.secdev.org/projects/UTscapy/
55
6+ + Configuration
7+ ~ conf
68
7- ############
8- ############
9- + Load general modules
10- = Load contribution layer
9+ = Imports
10+ load_layer("can")
11+ import six, subprocess
12+ from subprocess import call
13+
14+
15+ = Definition of constants, utility functions and mock classes
16+ iface0 = "vcan0"
17+ iface1 = "vcan1"
18+
19+ # function to exit when the can-isotp kernel module is not available
20+ ISOTP_KERNEL_MODULE_AVAILABLE = False
21+ def exit_if_no_isotp_module():
22+ if not ISOTP_KERNEL_MODULE_AVAILABLE:
23+ err = "TEST SKIPPED: can-isotp not available"
24+ subprocess.call("printf \"%s\r\n\" > /dev/stderr" % err, shell=True)
25+ warning("Can't test ISOTP native socket because kernel module is not loaded")
26+ exit(0)
27+
28+
29+ = Initialize a virtual CAN interface
30+ if 0 != call("cansend %s 000#" % iface0, shell=True):
31+ # vcan0 is not enabled
32+ if 0 != call("sudo modprobe vcan", shell=True):
33+ raise Exception("modprobe vcan failed")
34+ if 0 != call("sudo ip link add name %s type vcan" % iface0, shell=True):
35+ print("add %s failed: Maybe it was already up?" % iface0)
36+ if 0 != call("sudo ip link set dev %s up" % iface0, shell=True):
37+ raise Exception("could not bring up %s" % iface0)
38+
39+ if 0 != call("cansend %s 000#" % iface0, shell=True):
40+ raise Exception("cansend doesn't work")
41+
42+ if 0 != call("cansend %s 000#" % iface1, shell=True):
43+ # vcan1 is not enabled
44+ if 0 != call("sudo modprobe vcan", shell=True):
45+ raise Exception("modprobe vcan failed")
46+ if 0 != call("sudo ip link add name %s type vcan" % iface1, shell=True):
47+ print("add %s failed: Maybe it was already up?" % iface1)
48+ if 0 != call("sudo ip link set dev %s up" % iface1, shell=True):
49+ raise Exception("could not bring up %s" % iface1)
50+
51+ if 0 != call("cansend %s 000#" % iface1, shell=True):
52+ raise Exception("cansend doesn't work")
53+
54+ print("CAN should work now")
1155
12- load_contrib('automotive.obd.obd')
1356
14- + Configuration of CAN virtual sockets
15- ~ conf command linux
57+ if six.PY3:
58+ from scapy.contrib.cansocket_native import *
59+ else:
60+ from scapy.contrib.cansocket_python_can import *
1661
17- = Load module
1862
19- conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': True}
20- load_contrib('isotp')
63+ if "python_can" in CANSocket.__module__:
64+ import can as python_can
65+ new_can_socket = lambda iface: CANSocket(iface=python_can.interface.Bus(bustype='socketcan', channel=iface, bitrate=250000))
66+ new_can_socket0 = lambda: CANSocket(iface=python_can.interface.Bus(bustype='socketcan', channel=iface0, bitrate=250000), timeout=0.01)
67+ new_can_socket1 = lambda: CANSocket(iface=python_can.interface.Bus(bustype='socketcan', channel=iface1, bitrate=250000), timeout=0.01)
68+ can_socket_string = "-i socketcan -c %s -b 250000" % iface0
69+ else:
70+ new_can_socket = lambda iface: CANSocket(iface)
71+ new_can_socket0 = lambda: CANSocket(iface0)
72+ new_can_socket1 = lambda: CANSocket(iface1)
73+ can_socket_string = "-c %s" % iface0
2174
22- = Load os
75+ # utility function for draining a can interface, asserting that no packets are there
76+ def drain_bus(iface=iface0, assert_empty=True):
77+ s = new_can_socket(iface)
78+ pkts = s.sniff(timeout=0.1)
79+ if assert_empty:
80+ assert len(pkts) == 0
81+ s.close()
2382
24- import os
83+ print("CAN sockets should work now")
2584
26- = Setup vcan0
27- ~ needs_root
2885
29- bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"
30- 0 == os.system(bashCommand)
86+ = Verify that a CAN socket can be created and closed
87+ s = new_can_socket(iface0)
88+ s.close()
89+
90+
91+ = Check if can-isotp and can-utils are installed on this system
92+ p = subprocess.Popen('lsmod | grep "^can_isotp"', stdout=subprocess.PIPE, shell=True)
93+ if p.wait() == 0:
94+ if b"can_isotp" in p.stdout.read():
95+ p = subprocess.Popen("isotpsend -s1 -d0 %s" % iface0, stdin=subprocess.PIPE, shell=True)
96+ p.stdin.write(b"01")
97+ p.stdin.close()
98+ r = p.wait()
99+ if r == 0:
100+ ISOTP_KERNEL_MODULE_AVAILABLE = True
101+
102+
103+ + Syntax check
104+
105+ = Import isotp
106+ conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': False}
107+ load_contrib("isotp")
108+
109+
110+ ############
111+ ############
112+ + Load general modules
113+ = Load contribution layer
114+
115+ load_contrib('automotive.obd.obd')
31116
32117+ Load OBD_scan
33118= imports
@@ -61,10 +146,6 @@ s3_queue.put(s3)
61146+ Simulate scanner
62147~ linux needs_root
63148
64- = Create socket
65-
66- socket = ISOTPSocket("vcan0", 0x7e0, 0x7e8, basecls=OBD, padding=True)
67-
68149= Create responder
69150# Ensures the responder is running before sending the first request
70151ready = Event()
@@ -82,34 +163,32 @@ class MockResponder(Thread):
82163 resp = s3_queue.get()
83164 sock.send(resp)
84165 def run(self):
85- with ISOTPSocket("vcan0" , 0x7e8, 0x7e0, basecls=OBD, padding=True) as sock:
166+ with ISOTPSocket(new_can_socket(iface0) , 0x7e8, 0x7e0, basecls=OBD, padding=True) as sock:
86167 ready.set()
87168 while not self._stopped.is_set() or not (s1_queue.empty() and s3_queue.empty()):
88- sniff(opened_socket=sock,
89- timeout=0.2,
90- store=False,
91- prn=lambda p: self.process_request(p, sock))
169+ sniff(opened_socket=sock, timeout=0.2, store=False, prn=lambda p: self.process_request(p, sock))
92170 def stop(self):
93171 self._stopped.set()
94172
95173responder = MockResponder()
96174responder.start()
97175
98176= Get ids
99- all_ids_set = set(range(1, 256))
100- supported_ids = _supported_id_numbers(socket, 0.1, OBD_S01, 'pid', False)
101- unsupported_ids = all_ids_set - supported_ids
177+ with ISOTPSocket(new_can_socket(iface0), 0x7e0, 0x7e8, basecls=OBD, padding=True) as socket:
178+ all_ids_set = set(range(1, 256))
179+ supported_ids = _supported_id_numbers(socket, 0.1, OBD_S01, 'pid', False)
180+ unsupported_ids = all_ids_set - supported_ids
102181
103182= Run scanner
104183# timeout to avoid a deadlock if the test which sets this event fails
105- ready.wait(timeout=5)
106- data = obd_scan(socket, 0.1, True, True, False)
107- dtc = data[0]
108- supported = data[1]
109- unsupported = data[2]
184+ with ISOTPSocket(new_can_socket(iface0), 0x7e0, 0x7e8, basecls=OBD, padding=True) as socket:
185+ ready.wait(timeout=5)
186+ data = obd_scan(socket, 0.1, True, True, False)
187+ dtc = data[0]
188+ supported = data[1]
189+ unsupported = data[2]
110190
111191= Cleanup
112- socket.close()
113192responder.stop()
114193if 0 != call("sudo ip link delete vcan0", shell=True):
115194 raise Exception("vcan0 could not be deleted")
0 commit comments