Skip to content

Commit 3aaa49f

Browse files
committed
ENH: Add writer for Siemens CSA header
Allows us to take a parsed CSA header and convert it back into a string. Useful for things like DICOM anonymization, or perhaps round tripping DICOM -> Nifti -> DICOM.
1 parent 5a42cb3 commit 3aaa49f

File tree

2 files changed

+122
-0
lines changed

2 files changed

+122
-0
lines changed

nibabel/nicom/csareader.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
33
'''
44
import numpy as np
5+
import struct
56

67
from .structreader import Unpacker
78
from .utils import find_private_section
9+
from ..externals.six import string_types, u
810

911
# DICOM VR code to Python type
1012
_CONVERTERS = {
@@ -29,6 +31,10 @@ class CSAReadError(CSAError):
2931
pass
3032

3133

34+
class CSAWriteError(CSAError):
35+
pass
36+
37+
3238
def get_csa_header(dcm_data, csa_type='image'):
3339
''' Get CSA header information from DICOM header
3440
@@ -158,6 +164,96 @@ def read(csa_str):
158164
return csa_dict
159165

160166

167+
def write(csa_header):
168+
''' Write string from CSA header `csa_header`
169+
170+
Parameters
171+
----------
172+
csa_header : dict
173+
header information as dict, where `header` has fields (at least)
174+
``type, n_tags, tags``. ``header['tags']`` is also a dictionary
175+
with one key, value pair for each tag in the header.
176+
177+
Returns
178+
-------
179+
csa_str : str
180+
byte string containing CSA header information
181+
'''
182+
result = []
183+
if csa_header['type'] == 2:
184+
result.append(b'SV10')
185+
result.append(csa_header['unused0'])
186+
if not 0 < csa_header['n_tags'] <= 128:
187+
raise CSAWriteError('Number of tags `t` should be '
188+
'0 < t <= 128')
189+
result.append(struct.pack('2I',
190+
csa_header['n_tags'],
191+
csa_header['check'])
192+
)
193+
194+
# Build list of tags in correct order
195+
tags = list(csa_header['tags'].items())
196+
tags.sort(key=lambda x: x[1]['tag_no'])
197+
tag0_n_items = tags[0][1]['n_items']
198+
199+
# Add the information for each tag
200+
for tag_name, tag_dict in tags:
201+
vm = tag_dict['vm']
202+
vr = tag_dict['vr']
203+
n_items = tag_dict['n_items']
204+
assert n_items < 100
205+
result.append(struct.pack('64si4s3i',
206+
make_nt_str(tag_name),
207+
vm,
208+
make_nt_str(vr),
209+
tag_dict['syngodt'],
210+
n_items,
211+
tag_dict['last3'])
212+
)
213+
214+
# Figure out the number of values for this tag
215+
if vm == 0:
216+
n_values = n_items
217+
else:
218+
n_values = vm
219+
220+
# Add each item for this tag
221+
for item_no in range(n_items):
222+
# Figure out the item length
223+
if item_no >= n_values or tag_dict['items'][item_no] == '':
224+
item_len = 0
225+
else:
226+
item = tag_dict['items'][item_no]
227+
if not isinstance(item, string_types):
228+
item = u(str(item))
229+
item_nt_str = make_nt_str(item)
230+
item_len = len(item_nt_str)
231+
232+
# These values aren't actually preserved in the dict
233+
# representation of the header. Best we can do is set the ones
234+
# that determine the item length appropriately.
235+
x0, x1, x2, x3 = 0, 0, 0, 0
236+
if csa_header['type'] == 1: # CSA1 - odd length calculation
237+
x0 = tag0_n_items + item_len
238+
if item_len < 0 or (ptr + item_len) > csa_len:
239+
if item_no < vm:
240+
items.append('')
241+
break
242+
else: # CSA2
243+
x1 = item_len
244+
result.append(struct.pack('4i', x0, x1, x2, x3))
245+
246+
if item_len == 0:
247+
continue
248+
249+
result.append(item_nt_str)
250+
# go to 4 byte boundary
251+
plus4 = item_len % 4
252+
if plus4 != 0:
253+
result.append(b'\x00' * (4 - plus4))
254+
return b''.join(result)
255+
256+
161257
def get_scalar(csa_dict, tag_name):
162258
try:
163259
items = csa_dict['tags'][tag_name]['items']
@@ -255,3 +351,18 @@ def nt_str(s):
255351
if zero_pos == -1:
256352
return s
257353
return s[:zero_pos].decode('latin-1')
354+
355+
356+
def make_nt_str(s):
357+
''' Create a null terminated byte string from a unicode object.
358+
359+
Parameters
360+
----------
361+
s : unicode
362+
363+
Returns
364+
-------
365+
result : bytes
366+
s encoded as latin-1 with a null char appended
367+
'''
368+
return s.encode('latin-1') + b'\x00'

nibabel/nicom/tests/test_csareader.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,14 @@ def test_ice_dims():
128128
assert_equal(csa.get_ice_dims(csa_info),
129129
ex_dims)
130130
assert_equal(csa.get_ice_dims({}), None)
131+
132+
133+
def test_read_write_rt():
134+
# Try doing a read-write-read round trip and make sure the dictionary
135+
# representation of the header is the same. We can't exactly reproduce the
136+
# original string representation currently.
137+
for csa_str in (CSA2_B0, CSA2_B1000):
138+
csa_info = csa.read(csa_str)
139+
new_csa_str = csa.write(csa_info)
140+
new_csa_info = csa.read(new_csa_str)
141+
assert_equal(csa_info, new_csa_info)

0 commit comments

Comments
 (0)