Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/msc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

logger = logging.getLogger('msc')

# See Annex E in EN 300 401
crc16_11021 = crcmod.mkCrcFun(0x11021, 0x0, False, 0xFFFF)
crcfun = crcmod.predefined.mkPredefinedCrcFun('x25')
def calculate_crc(data):
return crcfun(data)
Expand Down
89 changes: 61 additions & 28 deletions src/msc/datagroups/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from msc import bitarray_to_hex, int_to_bitarray, calculate_crc, InvalidCrcError, generate_transport_id
from msc import bitarray_to_hex, int_to_bitarray, crc16_11021, InvalidCrcError, generate_transport_id
from mot import DirectoryEncoder, SortedHeaderInformation
from bitarray import bitarray
import logging
Expand Down Expand Up @@ -341,22 +341,24 @@ def tobytes(self):
# datagroup header
bits += bitarray('0') # (0): ExtensionFlag - 0=no extension
bits += bitarray('1' if self.crc_enabled else '0') # (1): CrcFlag - true if there is a CRC at the end of the datagroup
bits += bitarray('1') # (2): SegmentFlag - 1=segment header included
bits += bitarray('1') # (3): UserAccessFlag - true
bits += bitarray('0' if self.segment_index is None else '1') # (2): SegmentFlag - 1=segment header included
bits += bitarray('0' if self.transport_id is None else '1') # (3): UserAccessFlag - true
bits += int_to_bitarray(self._type, 4) # (4-7): DataGroupType
bits += int_to_bitarray(self.continuity % 16, 4) # (8-11): ContinuityIndex
bits += int_to_bitarray(self.repetition, 4) # (12-15): RepetitionIndex - remaining = 0 (only this once)

# session header
# segment field
bits += bitarray('1' if self.last else '0') # (16): Last - true if the last segment
bits += int_to_bitarray(self.segment_index, 15) # (17-32): SegmentNumber
if self.segment_index is not None:
bits += bitarray('1' if self.last else '0') # (16): Last - true if the last segment
bits += int_to_bitarray(self.segment_index, 15) # (17-32): SegmentNumber

# user access field
bits += bitarray('000') # (33-35): RFA
bits += bitarray('1') # (36): TransportId - true to include Transport ID
bits += int_to_bitarray(2, 4) # (37-40): LengthIndicator - length of transport Id and End user address fields (will be 2 bytes as only transport ID defined)
bits += int_to_bitarray(self._transport_id, 16) # (41-56) transport ID
if self.transport_id is not None:
bits += bitarray('000') # (33-35): RFA
bits += bitarray('1') # (36): TransportId - true to include Transport ID
bits += int_to_bitarray(2, 4) # (37-40): LengthIndicator - length of transport Id and End user address fields (will be 2 bytes as only transport ID defined)
bits += int_to_bitarray(self._transport_id, 16) # (41-56) transport ID

# data field
tmp = bitarray()
Expand All @@ -365,41 +367,72 @@ def tobytes(self):

# CRC
crc = 0;
if self.crc_enabled: crc = calculate_crc(bits.tobytes())
if self.crc_enabled: crc = crc16_11021(bits.tobytes())
bits += int_to_bitarray(crc, 16)

return bits.tobytes()

@staticmethod
def frombits(bits, i=0, check_crc=True):
"""Parse a datagroup from a bitarray, with an optional offset"""


# use only the slice indicated by the offset
bits = bits[i:]
# check we have enough header first
if (bits.length() - i) < ((9 + 2) * 8): raise IncompleteDatagroupError

ext_flag = bits[0]
crc_flag = bits[1]
seg_flag = bits[2]
uaf_flag = bits[3]
header_size = 16
if ext_flag:
header_size += 16
if seg_flag:
header_size += 16
if uaf_flag:
tid_present = bits[header_size+3]
uaf_sz = int(bits[header_size+4:header_size+8].to01(), 2)
header_size += 8+8*uaf_sz
else:
tid_present = False

min_size = header_size
if crc_flag:
min_size += 16
if bits.length() < min_size:
raise IncompleteDatagroupError

# datagroup header
type = int(bits[4:8].to01(), 2)
continuity = int(bits[8:12].to01(), 2)
repetition = int(bits[12:16].to01(), 2)

# session header
# segment field
last = bits[16]
segment_index = int(bits[17:32].to01(), 2)

if seg_flag:
last = bits[16]
segment_index = int(bits[17:32].to01(), 2)
else:
last = False
segment_index = None

# user access field
transport_id = int(bits[40:56].to01(), 2)

# data segment header
size = int(bits[59:72].to01(), 2) # get size to check we have a complete datagroup
if bits.length() < 72 + size * 8 + 16: raise IncompleteDatagroupError
data = bits[72 : 72 + (size*8)]
if check_crc:
crc = int(bits[72 + data.length() : 72 + data.length() + 16].to01(), 2)
calculated = calculate_crc(bits[:72+data.length()].tobytes())
if crc != calculated: raise InvalidCrcError(crc, bits[:72+data.length() + 16].tobytes())
if tid_present:
transport_id = int(bits[40:56].to01(), 2)
else:
transport_id = None

# extract data and compute CRC
if crc_flag:
hdr_plus_data = bits[:-16]
crc = int(bits[bits.length()-16:].to01(), 2)
if check_crc and crc != crc16_11021(hdr_plus_data.tobytes()):
raise InvalidCrcError(crc, crc_slice.tobytes())
else:
hdr_plus_data = bits

datagroup = Datagroup(transport_id, type, data.tobytes(), segment_index, continuity, True, repetition, last)
datagroup = Datagroup(transport_id, type,
hdr_plus_data[header_size:].tobytes(), segment_index, continuity,
True, repetition, last)
logger.debug('parsed datagroup: %s', datagroup)

return datagroup
Expand All @@ -410,7 +443,7 @@ def __str__(self):
elif self._type == 6: type_description = 'MOT Directory (uncompressed)'
elif self._type == 7: type_description = 'MOT Directory (compressed)'
else: type_description = 'unknown'
return '[segment=%d bytes], type=%d [%s], transportid=%d, segmentindex=%d, continuity=%d, last=%s' % (len(self._data), self._type, type_description, self._transport_id, self.segment_index, self.continuity, self.last)
return '[segment=%d bytes], type=%d [%s], transportid=%s, segmentindex=%s, continuity=%d, last=%s' % (len(self._data), self._type, type_description, self._transport_id, self.segment_index, self.continuity, self.last)

def __repr__(self):
return '<DataGroup: %s>' % str(self)
Expand Down