10
10
# scapy.contrib.status = loads
11
11
12
12
import time
13
+
14
+ from scapy .compat import Optional , cast , Callable
15
+
13
16
from scapy .contrib .automotive .gm .gmlan import GMLAN , GMLAN_SA , GMLAN_RD , \
14
17
GMLAN_TD , GMLAN_PM , GMLAN_RMBA
15
18
from scapy .config import conf
19
+ from scapy .packet import Packet
16
20
from scapy .contrib .isotp import ISOTPSocket
17
21
from scapy .error import warning , log_loading
18
22
from scapy .utils import PeriodicSenderThread
19
23
20
-
21
24
__all__ = ["GMLAN_TesterPresentSender" , "GMLAN_InitDiagnostics" ,
22
25
"GMLAN_GetSecurityAccess" , "GMLAN_RequestDownload" ,
23
26
"GMLAN_TransferData" , "GMLAN_TransferPayload" ,
24
27
"GMLAN_ReadMemoryByAddress" , "GMLAN_BroadcastSocket" ]
25
28
29
+
26
30
log_loading .info ("\" conf.contribs['GMLAN']"
27
31
"['treat-response-pending-as-answer']\" set to True). This "
28
32
"is required by the GMLAN-Utils module to operate "
33
37
conf .contribs ['GMLAN' ] = {'treat-response-pending-as-answer' : False }
34
38
35
39
36
- class GMLAN_TesterPresentSender (PeriodicSenderThread ):
37
- def __init__ (self , sock , pkt = GMLAN (service = "TesterPresent" ), interval = 2 ):
38
- """ Thread to send TesterPresent messages packets periodically
39
-
40
- Args:
41
- sock: socket where packet is sent periodically
42
- pkt: packet to send
43
- interval: interval between two packets
44
- """
45
- PeriodicSenderThread .__init__ (self , sock , pkt , interval )
46
-
47
-
40
+ # Helper function
48
41
def _check_response (resp , verbose ):
42
+ # type: (Packet, Optional[bool]) -> bool
49
43
if resp is None :
50
44
if verbose :
51
45
print ("Timeout." )
52
46
return False
53
47
if verbose :
54
48
resp .show ()
55
- return resp .sprintf ( "%GMLAN. service%" ) != " NegativeResponse"
49
+ return resp .service != 0x7f # NegativeResponse
56
50
57
51
58
- def _send_and_check_response (sock , req , timeout , verbose ):
59
- if verbose :
60
- print ("Sending %s" % repr (req ))
61
- resp = sock .sr1 (req , timeout = timeout , verbose = 0 )
62
- return _check_response (resp , verbose )
52
+ class GMLAN_TesterPresentSender (PeriodicSenderThread ):
63
53
54
+ def __init__ (self , sock , pkt = GMLAN (service = "TesterPresent" ), interval = 2 ):
55
+ # type: (ISOTPSocket, Packet, int) -> None
56
+ """ Thread to send GMLAN TesterPresent packets periodically
64
57
65
- def GMLAN_InitDiagnostics (sock , broadcastsocket = None , timeout = None ,
66
- verbose = None , retry = 0 ):
67
- """Send messages to put an ECU into an diagnostic/programming state.
68
-
69
- Args:
70
- sock: socket to send the message on.
71
- broadcast: socket for broadcasting. If provided some message will be
72
- sent as broadcast. Recommended when used on a network with
73
- several ECUs.
74
- timeout: timeout for sending, receiving or sniffing packages.
75
- verbose: set verbosity level
76
- retry: number of retries in case of failure.
77
-
78
- Returns true on success.
58
+ :param sock: socket where packet is sent periodically
59
+ :param pkt: packet to send
60
+ :param interval: interval between two packets
61
+ """
62
+ PeriodicSenderThread .__init__ (self , sock , pkt , interval )
63
+
64
+ def run (self ):
65
+ # type: () -> None
66
+ while not self ._stopped .is_set ():
67
+ self ._socket .sr1 (self ._pkt , verbose = False , timeout = 0.1 )
68
+ time .sleep (self ._interval )
69
+
70
+
71
+ def GMLAN_InitDiagnostics (sock , broadcast_socket = None , timeout = None , verbose = None , retry = 0 ): # noqa: E501
72
+ # type: (ISOTPSocket, Optional[ISOTPSocket], Optional[int], Optional[bool], int) -> bool # noqa: E501
73
+ """ Send messages to put an ECU into diagnostic/programming state.
74
+
75
+ :param sock: socket for communication.
76
+ :param broadcast_socket: socket for broadcasting. If provided some message
77
+ will be sent as broadcast. Recommended when used
78
+ on a network with several ECUs.
79
+ :param timeout: timeout for sending, receiving or sniffing packages.
80
+ :param verbose: set verbosity level
81
+ :param retry: number of retries in case of failure.
82
+ :return: True on success else False
79
83
"""
84
+ # Helper function
85
+ def _send_and_check_response (sock , req , timeout , verbose ):
86
+ # type: (ISOTPSocket, Packet, Optional[int], Optional[bool]) -> bool
87
+ if verbose :
88
+ print ("Sending %s" % repr (req ))
89
+ resp = sock .sr1 (req , timeout = timeout , verbose = False )
90
+ return _check_response (resp , verbose )
91
+
80
92
if verbose is None :
81
- verbose = conf .verb
93
+ verbose = conf .verb > 0
82
94
retry = abs (retry )
83
95
84
96
while retry >= 0 :
85
97
retry -= 1
86
98
87
99
# DisableNormalCommunication
88
100
p = GMLAN (service = "DisableNormalCommunication" )
89
- if broadcastsocket is None :
101
+ if broadcast_socket is None :
90
102
if not _send_and_check_response (sock , p , timeout , verbose ):
91
103
continue
92
104
else :
93
105
if verbose :
94
106
print ("Sending %s as broadcast" % repr (p ))
95
- broadcastsocket .send (p )
107
+ broadcast_socket .send (p )
96
108
time .sleep (0.05 )
97
109
98
110
# ReportProgrammedState
@@ -116,24 +128,25 @@ def GMLAN_InitDiagnostics(sock, broadcastsocket=None, timeout=None,
116
128
return False
117
129
118
130
119
- def GMLAN_GetSecurityAccess (sock , keyFunction , level = 1 , timeout = None ,
120
- verbose = None , retry = 0 ):
121
- """Authenticate on ECU. Implements Seey-Key procedure.
122
-
123
- Args:
124
- sock: socket to send the message on.
125
- keyFunction: function implementing the key algorithm.
126
- level: level of access
127
- timeout: timeout for sending, receiving or sniffing packages.
128
- verbose: set verbosity level
129
- retry: number of retries in case of failure.
131
+ def GMLAN_GetSecurityAccess (sock , key_function , level = 1 , timeout = None , verbose = None , retry = 0 ): # noqa: E501
132
+ # type: (ISOTPSocket, Callable[[int], int], int, Optional[int], Optional[bool], int) -> bool # noqa: E501
133
+ """ Authenticate on ECU. Implements Seey-Key procedure.
130
134
131
- Returns true on success.
135
+ :param sock: socket to send the message on.
136
+ :param key_function: function implementing the key algorithm.
137
+ :param level: level of access
138
+ :param timeout: timeout for sending, receiving or sniffing packages.
139
+ :param verbose: set verbosity level
140
+ :param retry: number of retries in case of failure.
141
+ :return: True on success.
132
142
"""
133
143
if verbose is None :
134
- verbose = conf .verb
144
+ verbose = conf .verb > 0
135
145
retry = abs (retry )
136
146
147
+ if key_function is None :
148
+ return False
149
+
137
150
if level % 2 == 0 :
138
151
warning ("Parameter Error: Level must be an odd number." )
139
152
return False
@@ -157,7 +170,7 @@ def GMLAN_GetSecurityAccess(sock, keyFunction, level=1, timeout=None,
157
170
return True
158
171
159
172
keypkt = GMLAN () / GMLAN_SA (subfunction = level + 1 ,
160
- securityKey = keyFunction (seed ))
173
+ securityKey = key_function (seed ))
161
174
if verbose :
162
175
print ("Responding with key.." )
163
176
resp = sock .sr1 (keypkt , timeout = timeout , verbose = 0 )
@@ -182,21 +195,20 @@ def GMLAN_GetSecurityAccess(sock, keyFunction, level=1, timeout=None,
182
195
183
196
184
197
def GMLAN_RequestDownload (sock , length , timeout = None , verbose = None , retry = 0 ):
185
- """Send RequestDownload message.
198
+ # type: (ISOTPSocket, int, Optional[int], Optional[bool], int) -> bool
199
+ """ Send RequestDownload message.
186
200
187
- Usually used before calling TransferData.
201
+ Usually used before calling TransferData.
188
202
189
- Args:
190
- sock: socket to send the message on.
191
- length: value for the message's parameter 'unCompressedMemorySize'.
192
- timeout: timeout for sending, receiving or sniffing packages.
193
- verbose: set verbosity level.
194
- retry: number of retries in case of failure.
195
-
196
- Returns true on success.
203
+ :param sock: socket to send the message on.
204
+ :param length: value for the message's parameter 'unCompressedMemorySize'.
205
+ :param timeout: timeout for sending, receiving or sniffing packages.
206
+ :param verbose: set verbosity level.
207
+ :param retry: number of retries in case of failure.
208
+ :return: True on success
197
209
"""
198
210
if verbose is None :
199
- verbose = conf .verb
211
+ verbose = conf .verb > 0
200
212
retry = abs (retry )
201
213
202
214
while retry >= 0 :
@@ -211,26 +223,25 @@ def GMLAN_RequestDownload(sock, length, timeout=None, verbose=None, retry=0):
211
223
return False
212
224
213
225
214
- def GMLAN_TransferData (sock , addr , payload , maxmsglen = None , timeout = None ,
215
- verbose = None , retry = 0 ):
216
- """Send TransferData message.
226
+ def GMLAN_TransferData (sock , addr , payload , maxmsglen = None , timeout = None , verbose = None , retry = 0 ): # noqa: E501
227
+ # type: (ISOTPSocket, int, bytes, Optional[int], Optional[int], Optional[bool], int) -> bool # noqa: E501
228
+ """ Send TransferData message.
217
229
218
230
Usually used after calling RequestDownload.
219
231
220
- Args:
221
- sock: socket to send the message on.
222
- addr: destination memory address on the ECU.
223
- payload: data to be sent.
224
- maxmsglen: maximum length of a single iso-tp message. (default:
225
- maximum length)
226
- timeout: timeout for sending, receiving or sniffing packages.
227
- verbose: set verbosity level.
228
- retry: number of retries in case of failure.
229
-
230
- Returns true on success.
232
+ :param sock: socket to send the message on.
233
+ :param addr: destination memory address on the ECU.
234
+ :param payload: data to be sent.
235
+ :param maxmsglen: maximum length of a single iso-tp message.
236
+ default: maximum length
237
+ :param timeout: timeout for sending, receiving or sniffing packages.
238
+ :param verbose: set verbosity level.
239
+ :param retry: number of retries in case of failure.
240
+ :return: True on success.
231
241
"""
232
242
if verbose is None :
233
- verbose = conf .verb
243
+ verbose = conf .verb > 0
244
+
234
245
retry = abs (retry )
235
246
startretry = retry
236
247
@@ -244,6 +255,8 @@ def GMLAN_TransferData(sock, addr, payload, maxmsglen=None, timeout=None,
244
255
if maxmsglen is None or maxmsglen <= 0 or maxmsglen > (4093 - scheme ):
245
256
maxmsglen = (4093 - scheme )
246
257
258
+ maxmsglen = cast (int , maxmsglen )
259
+
247
260
for i in range (0 , len (payload ), maxmsglen ):
248
261
retry = startretry
249
262
while True :
@@ -268,19 +281,18 @@ def GMLAN_TransferData(sock, addr, payload, maxmsglen=None, timeout=None,
268
281
269
282
def GMLAN_TransferPayload (sock , addr , payload , maxmsglen = None , timeout = None ,
270
283
verbose = None , retry = 0 ):
271
- """Send data by using GMLAN services.
272
-
273
- Args:
274
- sock: socket to send the data on.
275
- addr: destination memory address on the ECU.
276
- payload: data to be sent.
277
- maxmsglen: maximum length of a single iso-tp message. (default:
278
- maximum length)
279
- timeout: timeout for sending, receiving or sniffing packages.
280
- verbose: set verbosity level.
281
- retry: number of retries in case of failure.
282
-
283
- Returns true on success.
284
+ # type: (ISOTPSocket, int, bytes, Optional[int], Optional[int], Optional[bool], int) -> bool # noqa: E501
285
+ """ Send data by using GMLAN services.
286
+
287
+ :param sock: socket to send the data on.
288
+ :param addr: destination memory address on the ECU.
289
+ :param payload: data to be sent.
290
+ :param maxmsglen: maximum length of a single iso-tp message.
291
+ default: maximum length
292
+ :param timeout: timeout for sending, receiving or sniffing packages.
293
+ :param verbose: set verbosity level.
294
+ :param retry: number of retries in case of failure.
295
+ :return: True on success.
284
296
"""
285
297
if not GMLAN_RequestDownload (sock , len (payload ), timeout = timeout ,
286
298
verbose = verbose , retry = retry ):
@@ -293,20 +305,19 @@ def GMLAN_TransferPayload(sock, addr, payload, maxmsglen=None, timeout=None,
293
305
294
306
def GMLAN_ReadMemoryByAddress (sock , addr , length , timeout = None ,
295
307
verbose = None , retry = 0 ):
296
- """Read data from ECU memory.
297
-
298
- Args:
299
- sock: socket to send the data on.
300
- addr: source memory address on the ECU.
301
- length: bytes to read
302
- timeout: timeout for sending, receiving or sniffing packages.
303
- verbose: set verbosity level.
304
- retry: number of retries in case of failure.
305
-
306
- Returns the bytes read.
308
+ # type: (ISOTPSocket, int, int, Optional[int], Optional[bool], int) -> Optional[bytes] # noqa: E501
309
+ """ Read data from ECU memory.
310
+
311
+ :param sock: socket to send the data on.
312
+ :param addr: source memory address on the ECU.
313
+ :param length: bytes to read.
314
+ :param timeout: timeout for sending, receiving or sniffing packages.
315
+ :param verbose: set verbosity level.
316
+ :param retry: number of retries in case of failure.
317
+ :return: bytes red or None
307
318
"""
308
319
if verbose is None :
309
- verbose = conf .verb
320
+ verbose = conf .verb > 0
310
321
retry = abs (retry )
311
322
312
323
scheme = conf .contribs ['GMLAN' ]['GMLAN_ECU_AddressingScheme' ]
@@ -335,6 +346,11 @@ def GMLAN_ReadMemoryByAddress(sock, addr, length, timeout=None,
335
346
336
347
337
348
def GMLAN_BroadcastSocket (interface ):
338
- """Returns a GMLAN broadcast socket using interface."""
349
+ # type: (str) -> ISOTPSocket
350
+ """ Returns a GMLAN broadcast socket using interface.
351
+
352
+ :param interface: interface name
353
+ :return: ISOTPSocket configured as GMLAN Broadcast Socket
354
+ """
339
355
return ISOTPSocket (interface , sid = 0x101 , did = 0x0 , basecls = GMLAN ,
340
- extended_addr = 0xfe )
356
+ extended_addr = 0xfe , padding = True )
0 commit comments