Skip to content

Adding option to encode and send binary messages in message translator tool #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

Written by: Andreas Schneider
7th May 2021
20th August 2022

License: MIT

This code translates binary SBD messages by the Artemis Global Tracker.
Messages can be read from local files or from email attachments on an IMAP server.
Binary messages can be read from local files or from email attachments on an IMAP server.
Optionally, the coordinates of all messages can be written into a GPX file.
Alternatively, binary messages can be encoded and written to file or sent to a device.
"""

import numpy as np
Expand All @@ -22,6 +24,7 @@
import configparser
import imaplib
import email
import requests
import os.path
import argparse

Expand Down Expand Up @@ -232,7 +235,7 @@ def checksum(data):
np.seterr(**old_settings)
return cs_a, cs_b

def translate_sbd(message):
def decode_message(message):
"""
Parse binary SBD message from Sparkfun Artemis Global Tracker.

Expand All @@ -259,6 +262,8 @@ def translate_sbd(message):
data[field.name] = datetime.datetime(*values)
elif field_len > 0:
data[field.name] = message[ind:ind+field_len]
else:
data[field.name] = None
elif isinstance(FIELD_TYPE[field], np.dtype): # dtype of scalar
field_len = FIELD_TYPE[field].itemsize
data[field.name] = np.frombuffer(message[ind:ind+field_len], dtype=FIELD_TYPE[field])[0]
Expand All @@ -276,6 +281,66 @@ def translate_sbd(message):
assert (message[ind+1] == cs_b), 'Checksum mismatch.'
return data

def encode_message(data):
"""
Create a binary SBD message in Sparkfun Artemis Global Tracker format.

Args:
data, dictionary with data to send, with keys named according to TrackerMessageFields names

Returns:
msg, encoded binary SBD message
"""
msg = b''
msg += np.uint8(TrackerMessageFields.STX.value)
for field in TrackerMessageFields:
if field.name in data:
msg += np.uint8(field.value)
if field == TrackerMessageFields.DATETIME:
msg += struct.pack(
'HBBBBB',
data[field.name].year, data[field.name].month,
data[field.name].day, data[field.name].hour,
data[field.name].minute, data[field.name].second)
elif isinstance(FIELD_TYPE[field], np.dtype): # dtype of scalar
rawvalue = np.array([ data[field.name] ])
if field in CONVERSION_FACTOR:
rawvalue /= CONVERSION_FACTOR[field]
msg += rawvalue.astype(FIELD_TYPE[field]).tobytes()
del rawvalue
elif isinstance(FIELD_TYPE[field], tuple): # dtype and length of array
assert(len(data[field.name]) == FIELD_TYPE[field][1])
rawdata = np.array(data[field.name])
if field in CONVERSION_FACTOR:
rawdata /= CONVERSION_FACTOR[field]
msg += rawdata.astype(FIELD_TYPE[field]).tobytes()
del rawdata
elif isinstance(FIELD_TYPE[field], int): # number of bytes
msg += np.zeros(FIELD_TYPE[field], dtype=np.uint8).tobytes()
msg += np.uint8(TrackerMessageFields.ETX.value)
cs_a, cs_b = checksum(msg)
msg += cs_a
msg += cs_b
return msg

def asc2bin(msg_asc):
"""
Convert ASCII representation of binary message to real binary message.
"""
msg = b''
for ind in np.arange(0,len(msg_asc),2):
msg += np.uint8(int(msg_asc[ind:ind+2], 16))
return msg

def bin2asc(msg_bin):
"""
Encode binary message to ASCII representation.
"""
msg_asc = ''
for ind in range(len(msg_bin)):
msg_asc += '{:02x}'.format(msg_bin[ind])
return msg_asc

def message2trackpoint(msg):
"""
Creates a GPX trackpoint from a translated IRIDIUM message.
Expand Down Expand Up @@ -343,12 +408,52 @@ def get_messages(imap, from_address='@rockblock.rock7.com', all_messges=False):
sbd_list = query_mail(imap, from_address=from_address, unseen_only=not all_messges)
for sbd in sbd_list:
try:
messages.append(translate_sbd(sbd))
messages.append(decode_message(sbd))
except (ValueError, AssertionError) as err:
print('Error translating message: ',err)
pass
return messages

def send_message(imei, data, user, password):
"""
Send a mobile terminated (MT) message to a RockBLOCK device.

Args:
imei, the destination IMEI number
data, the data to send
user, the RockBLOCK username
password, the RockBLOCK password

Returns:
success, True on success, else False
message, an error or success message
"""
resp = requests.post(
'https://core.rock7.com/rockblock/MT',
data={'imei': imei, 'data': bin2asc(data),
'username': user, 'password': password})
if not resp.ok:
print('Error sending message: POST command failed: {}'.format(resp.text))
parts = resp.text.split(',')
if parts[0] == 'OK':
try:
print('Message {} sent.'.format(parts[1]))
except IndexError:
print('Unexpected server response format.')
return True, 'OK'
elif parts[0] == 'FAILED':
error_message = ''
try:
print('Sending message failed with error code {}: {}'.format(parts[1], parts[2]))
error_message = parts[2]
except IndexError:
print('Unexpected server response format.')
return False, error_message
else:
error_message = 'Unexpected server response: {}'.format(resp.text)
print(error_message)
return False, error_message

def write_gpx(gpx_track, output_file):
"""
Write a track to a GPX file.
Expand All @@ -365,9 +470,9 @@ def write_gpx(gpx_track, output_file):
fd.write(gpx.to_xml())
return

def main(filelist, use_imap=False, all_messages=False, output_file=None):
def main_decode(filelist, use_imap=False, all_messages=False, output_file=None):
"""
Main function.
Decode binary SBD messages and write out data.
"""
if output_file:
gpx_segment = gpxpy.gpx.GPXTrackSegment()
Expand All @@ -385,31 +490,101 @@ def main(filelist, use_imap=False, all_messages=False, output_file=None):
if output_file:
gpx_segment.points.append(message2trackpoint(msg))
else:
for filename in filelist:
with open(filename,'rb') as fd:
msg_bin = fd.read()
try:
msg_trans = translate_sbd(msg_bin)
except (ValueError, AssertionError, IndexError) as err:
print('Error translating message {}: {}'.format(filename, err))
continue
print(filename, msg_trans)
if output_file:
gpx_segment.points.append(message2trackpoint(msg_trans))
if len(filelist) == 1 and not os.path.isfile(filelist[0]):
# Argument is supposed to be an ASCII representation of a binary message.
msg_bin = asc2bin(filelist[0])
msg_trans = decode_message(msg_bin)
print(msg_trans)
else:
for filename in filelist:
with open(filename,'rb') as fd:
msg_bin = fd.read()
try:
msg_trans = decode_message(msg_bin)
except (ValueError, AssertionError, IndexError) as err:
print('Error translating message {}: {}'.format(filename, err))
continue
print(filename, msg_trans)
if output_file:
gpx_segment.points.append(message2trackpoint(msg_trans))
if output_file:
gpx_track = gpxpy.gpx.GPXTrack()
gpx_track.segments.append(gpx_segment)
print('Writing {}'.format(output_file))
write_gpx(gpx_track, output_file)
return

def main_encode(position=None, time=None, userfunc=None, output_file=None, send=None):
"""
Encode binary SBD message corresponding to given data
and write it to file or send it to mobile IRIDIUM device.
"""
data = {}
if position is not None:
data.update({
'LON': position[0],
'LAT': position[1],
'ALT': position[2]})
if time:
data.update({'DATETIME': time})
if userfunc:
data.update(userfunc)
message = encode_message(data)
if output_file:
with open(output_file, 'wb') as fd:
fd.write(message)
else:
print(bin2asc(message))
if send:
config = configparser.ConfigParser()
config.read(send)
status, error_message = send_message(
config['device']['imei'], message,
config['rockblock']['user'], config['rockblock']['password'])
if status:
print('Message sent.')
else:
print('Error sending message: {}'.format(error_message))

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('filenames', nargs='+', help='Files to translate')
parser.add_argument('-d', '--decode', nargs='+', help='Decode messages from files or argument')
parser.add_argument('-i', '--imap', required=False, action='store_true', default=False, help='Query imap server instead of reading local files. filenames argument will be interpreted as ini file.')
parser.add_argument('-a', '--all', required=False, action='store_true', default=False, help='Retrieve all messages, not only unread ones. Only relevant in combination with -i.')
parser.add_argument('-o', '--output', required=False, default=None, help='Optional output GPX file')
parser.add_argument('-e', '--encode', required=False, action='store_true', default=False, help='Encode binary message')
parser.add_argument('-p', '--position', required=False, default=None, help='Position lon,lat,alt')
parser.add_argument('-t', '--time', required=False, nargs='?', default=None, const=datetime.datetime.utcnow(), help='UTC time in ISO format YYYY-mm-dd HH:MM:SS, or now if no argument given')
parser.add_argument('-u', '--userfunc', required=False, default=None, help='User function (comma-separated list of functions to trigger)')
parser.add_argument('-s', '--send', required=False, default=None, help='Send message to device as specified in configuration file')
args = parser.parse_args()
if args.imap:
assert (len(args.filenames) == 1), 'In combination with the -i option, exactly one file name must be given, namely the ini file.'
main(args.filenames, use_imap=args.imap, all_messages=args.all, output_file=args.output)
if args.encode:
if args.position is not None:
position = np.array(args.position.split(',')).astype(float)
else:
position = None
if args.time is not None:
if isinstance(args.time, datetime.datetime):
time = args.time
else:
time = datetime.datetime.fromisoformat(args.time)
else:
time = None
if args.userfunc is not None:
userfunc = {}
funcs = args.userfunc.split(',')
for func in funcs:
if ':' in func:
vals = func.split(':')
userfunc.update({'USERFUNC'+vals[0]: vals[1]})
else:
userfunc.update({'USERFUNC'+func: True})
else:
userfunc = None
main_encode(position=position, time=time, userfunc=userfunc, output_file=args.output, send=args.send)
elif args.decode:
if args.imap:
assert (len(args.decode) == 1), 'In combination with the -i option, exactly one file name must be given, namely the ini file.'
main_decode(args.decode, use_imap=args.imap, all_messages=args.all, output_file=args.output)
else:
print('Decode or encode needs to be specified.')
18 changes: 17 additions & 1 deletion Tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ attachments again.

### Artemis_Global_Tracker_Message_Translator.py:

A command-line script to translate binary SBD messages. Give the files to translate as argument, e.g. `*.bin`. To write coordinates into a GPX track, use the `-o` option combined with an output filename.
A command-line script to translate binary SBD messages. To decode messages, use the `-d` option and give the files to translate as argument, e.g. `*.bin`. Alternatively, you can give an ASCII representation of a binary message as argument, which is then translated. To write coordinates into a GPX track, use the `-o` option combined with an output filename.
Example:
```
python3 Artemis_Global_Tracker_Message_Translator.py *.bin -o track.gpx
Expand All @@ -104,6 +104,22 @@ to process all messages from RockBLOCK. By default, the script only retrieves ne
python3 Artemis_Global_Tracker_Message_Translator.py -i imap_settings.ini -a -o track.gpx
```

The tool can also encode binary messages and optionally send them to a device. This is done with the `-e` option in combination with one or more of the following options:

* `-p lon,lat,alt` position with longitude, latitude and altitude
* `-t YYYY-mm-ddTHH:MM:SS` datetime as specified
* `-u n[:val][,m[:val]]` USERFUNC n [and m] with optinal values: provide a comma-separated list of numbers of user functions you want to have, with optional values separated by colon, e.g. `-u 1,5:123,8:1000` to invoke user function 1, user function 5 with value 123 and user function 8 with value 1000

To send a message to a device, use the `-s` option with an ini file specifying the target device and the RockBLOCK credentials as follows:
```
[device]
imei = 300434012345678

[rockblock]
user = username@rockblock
password = mypassword@rockblock
```

### Artemis_Global_Tracker_Mapper.py:

![Mapper](../img/Mapper.JPG)
Expand Down