Skip to content

Commit 18704d4

Browse files
committed
Create a VLenUTF8Codec
1 parent c7ad572 commit 18704d4

File tree

3 files changed

+87
-8
lines changed

3 files changed

+87
-8
lines changed

bio2zarr/vcf2zarr/vcz.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from zarr.codecs import BloscCodec, BytesCodec
1616

1717
from .. import constants, core, provenance
18+
from ..zarr_v3_utils import VLenUTF8Codec
1819
from . import icf
1920

2021
logger = logging.getLogger(__name__)
@@ -34,6 +35,8 @@ def inspect(path):
3435

3536
# Default compressor used for zarr v2-style array creation (zstd via Blosc).
DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7)
# zarr v3 codec pipeline for fixed-size (e.g. numeric) arrays.
DEFAULT_ZARR_CODECS = [BytesCodec(), BloscCodec(cname="lz4", clevel=7)]
# zarr v3 codec pipeline for variable-length UTF-8 string arrays.
STRING_ZARR_CODECS = [VLenUTF8Codec(), BloscCodec(cname="lz4", clevel=7)]
39+
3740

3841
_fixed_field_descriptions = {
3942
"variant_contig": "An identifier from the reference genome or an angle-bracketed ID"
@@ -574,27 +577,28 @@ def init(
574577
def encode_samples(self, root):
    """Write the ``sample_id`` array into the output zarr *root*.

    Raises ValueError if the schema's sample list differs from the ICF
    metadata, since subsetting/reordering is not supported.
    """
    if self.schema.samples != self.icf.metadata.samples:
        raise ValueError("Subsetting or reordering samples not supported currently")
    # Object dtype so the variable-length string codec pipeline applies.
    sample_ids = np.array(
        [sample.id for sample in self.schema.samples], dtype=object
    )
    sample_array = root.create_array(
        "sample_id",
        shape=sample_ids.shape,
        dtype=sample_ids.dtype,
        codecs=STRING_ZARR_CODECS,
        chunks=(self.schema.samples_chunk_size,),
    )
    sample_array[...] = sample_ids
    sample_array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
    logger.debug("Samples done")
588591

589592
def encode_contig_id(self, root):
590-
data = np.array([contig.id for contig in self.schema.contigs], dtype=str)
593+
data = np.array([contig.id for contig in self.schema.contigs], dtype=object)
591594
array = root.create_array(
592595
"contig_id",
593596
shape=data.shape,
594597
dtype=data.dtype,
595-
codecs=DEFAULT_ZARR_CODECS,
598+
codecs=STRING_ZARR_CODECS,
596599
chunks=data.shape, # no chunking
597600
)
601+
array[...] = data
598602
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
599603
if all(contig.length is not None for contig in self.schema.contigs):
600604
data = np.array(
@@ -604,9 +608,10 @@ def encode_contig_id(self, root):
604608
"contig_length",
605609
shape=data.shape,
606610
dtype=data.dtype,
607-
compressor=DEFAULT_ZARR_CODECS,
611+
codecs=DEFAULT_ZARR_CODECS,
608612
chunks=data.shape, # no chunking
609613
)
614+
array[...] = data
610615
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
611616

612617
def encode_filter_id(self, root):
@@ -617,23 +622,29 @@ def encode_filter_id(self, root):
617622
"filter_id",
618623
shape=data.shape,
619624
dtype=data.dtype,
620-
codecs=DEFAULT_ZARR_CODECS,
625+
codecs=STRING_ZARR_CODECS,
621626
chunks=data.shape, # no chunking
622627
)
628+
array[...] = data
623629
array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]
624630

625631
def init_array(self, root, array_spec, variants_dim_size):
626632
object_codec = None
627633
if array_spec.dtype == "O":
628634
object_codec = numcodecs.VLenUTF8()
635+
codecs = STRING_ZARR_CODECS
636+
else:
637+
codecs = DEFAULT_ZARR_CODECS
629638
shape = list(array_spec.shape)
630639
# Truncate the variants dimension if max_variant_chunks was specified
631640
shape[0] = variants_dim_size
641+
compressor = numcodecs.get_codec(array_spec.compressor)
632642
a = root.create_array( # empty raises NotImplemented
633643
array_spec.name,
634644
shape=shape,
635645
chunks=array_spec.chunks,
636646
dtype=array_spec.dtype,
647+
codecs=codecs,
637648
# TODO
638649
# compressor=numcodecs.get_codec(array_spec.compressor),
639650
# filters=[numcodecs.get_codec(filt) for filt in array_spec.filters],
@@ -915,7 +926,7 @@ def finalise(self, show_progress=False):
915926
logger.debug(f"Removing {self.wip_path}")
916927
shutil.rmtree(self.wip_path)
917928
logger.info("Consolidating Zarr metadata")
918-
zarr.consolidate_metadata(self.path)
929+
# zarr.consolidate_metadata(self.path)
919930

920931
######################
921932
# encode_all_partitions

bio2zarr/vcf2zarr/verification.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def verify(vcf_path, zarr_path, show_progress=False):
167167

168168
format_fields = {}
169169
info_fields = {}
170-
for colname in root.keys():
170+
for colname in root.group_keys():
171171
if colname.startswith("call") and not colname.startswith("call_genotype"):
172172
vcf_name = colname.split("_", 1)[1]
173173
vcf_type = format_headers[vcf_name]["Type"]

bio2zarr/zarr_v3_utils.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from dataclasses import dataclass
2+
3+
import numcodecs
4+
from numcodecs.compat import ensure_bytes, ensure_ndarray
5+
from zarr.abc.codec import ArrayBytesCodec
6+
from zarr.array_spec import ArraySpec
7+
from zarr.buffer import Buffer, NDBuffer
8+
from zarr.codecs.registry import register_codec
9+
from zarr.common import JSON, to_thread
10+
11+
12+
@dataclass(frozen=True)
class VLenUTF8Codec(ArrayBytesCodec):
    """zarr v3 array-to-bytes codec for variable-length UTF-8 strings.

    Bridges numcodecs' synchronous "vlen-utf8" codec into zarr v3's async
    Buffer/NDBuffer codec interface, so object-dtype string arrays can be
    round-tripped through a v3 codec pipeline.
    """

    # Encoded chunk size depends on the string contents, so it is not fixed.
    is_fixed_size = False

    def __init__(self, *args, **kwargs) -> None:
        # Stateless codec: accept and ignore any configuration that zarr
        # passes when instantiating the codec from array metadata.
        pass

    def to_dict(self) -> dict[str, JSON]:
        # Metadata form persisted in the array's codec configuration.
        # NOTE(review): the "compressor" key looks v2-flavored — confirm this
        # is the shape zarr v3 expects (vs. a "configuration" key).
        return {"name": "vlen-utf8", "compressor": {"id": "vlen-utf8"}}

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Decode one encoded chunk into an ndarray matching *chunk_spec*."""
        compressor = numcodecs.get_codec(dict(id="vlen-utf8"))
        # numcodecs is synchronous; run the decode on a thread so the
        # event loop is not blocked.
        chunk_numpy_array = ensure_ndarray(
            await to_thread(compressor.decode, chunk_bytes.as_array_like())
        )

        # ensure correct dtype
        if str(chunk_numpy_array.dtype) != chunk_spec.dtype:
            chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)

        # ensure correct chunk shape
        if chunk_numpy_array.shape != chunk_spec.shape:
            chunk_numpy_array = chunk_numpy_array.reshape(
                chunk_spec.shape,
            )

        return NDBuffer.from_numpy_array(chunk_numpy_array)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        _chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Encode one chunk of strings into vlen-utf8 bytes."""
        chunk_numpy_array = chunk_array.as_numpy_array()
        compressor = numcodecs.get_codec(dict(id="vlen-utf8"))
        # numcodecs requires a contiguous buffer; copy only when the chunk
        # is neither C- nor F-contiguous.
        if (
            not chunk_numpy_array.flags.c_contiguous
            and not chunk_numpy_array.flags.f_contiguous
        ):
            chunk_numpy_array = chunk_numpy_array.copy(order="A")
        encoded_chunk_bytes = ensure_bytes(
            await to_thread(compressor.encode, chunk_numpy_array)
        )

        return Buffer.from_bytes(encoded_chunk_bytes)

    def compute_encoded_size(
        self, _input_byte_length: int, _chunk_spec: ArraySpec
    ) -> int:
        # The encoded size depends on string contents, so it cannot be
        # derived from the input byte length alone.
        raise NotImplementedError
66+
67+
68+
# Make the codec resolvable by its metadata name when zarr reads arrays.
register_codec("vlen-utf8", VLenUTF8Codec)

0 commit comments

Comments
 (0)