TYP: io.sas #40524
Merged · Mar 23, 2021 · 2 commits
77 changes: 46 additions & 31 deletions pandas/io/sas/sas7bdat.py
@@ -13,6 +13,8 @@
 Reference for binary data compression:
 http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
 """
+from __future__ import annotations
+
 from collections import abc
 from datetime import (
     datetime,
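
Note: the new `from __future__ import annotations` is what allows PEP 604 unions such as `bytes | None` to appear in the annotations below while pandas still supported Python 3.7-3.9. With postponed evaluation (PEP 563), annotations are stored as strings and never evaluated at runtime. A minimal sketch of the effect, not taken from this PR:

    from __future__ import annotations


    class Reader:
        # Without the __future__ import this line raises TypeError at class
        # creation time on Python < 3.10; with it, it is just a stored string.
        _cached_page: bytes | None


    print(Reader.__annotations__)  # {'_cached_page': 'bytes | None'}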
@@ -34,7 +36,10 @@
 )

 import pandas as pd
-from pandas import isna
+from pandas import (
+    DataFrame,
+    isna,
+)

 from pandas.io.common import get_handle
 from pandas.io.sas._sas import Parser
@@ -150,6 +155,9 @@ class SAS7BDATReader(ReaderBase, abc.Iterator):
     bytes.
     """

+    _int_length: int
+    _cached_page: bytes | None
+
     def __init__(
         self,
         path_or_buf,
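
The two class-body annotations added above are PEP 526 declarations: they tell mypy the types of instance attributes that are assigned later (in `__init__` and the `_process_*` helpers) without creating class attributes at runtime. A small standalone illustration, with invented names:

    from __future__ import annotations


    class Example:
        _int_length: int            # declared here, assigned in a method
        _cached_page: bytes | None  # may hold None between reads

        def _load(self) -> None:
            self._int_length = 8
            self._cached_page = b""


    # The declarations alone create no values on the class.
    assert not hasattr(Example, "_int_length")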
@@ -198,29 +206,29 @@ def __init__(
             self.close()
             raise

-    def column_data_lengths(self):
+    def column_data_lengths(self) -> np.ndarray:
         """Return a numpy int64 array of the column data lengths"""
         return np.asarray(self._column_data_lengths, dtype=np.int64)

-    def column_data_offsets(self):
+    def column_data_offsets(self) -> np.ndarray:
         """Return a numpy int64 array of the column offsets"""
         return np.asarray(self._column_data_offsets, dtype=np.int64)

-    def column_types(self):
+    def column_types(self) -> np.ndarray:
         """
         Returns a numpy character array of the column types:
         s (string) or d (double)
         """
         return np.asarray(self._column_types, dtype=np.dtype("S1"))

-    def close(self):
+    def close(self) -> None:
         self.handles.close()

-    def _get_properties(self):
+    def _get_properties(self) -> None:

         # Check magic number
         self._path_or_buf.seek(0)
-        self._cached_page = self._path_or_buf.read(288)
+        self._cached_page = cast(bytes, self._path_or_buf.read(288))
         if self._cached_page[0 : len(const.magic)] != const.magic:
             raise ValueError("magic number mismatch (not a SAS file?)")
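
`cast` is purely a type-checker construct: it returns its argument unchanged and performs no runtime check, which is presumably why it is used here to narrow the loosely typed return of `read()` on the underlying buffer. A standalone sketch:

    import io
    from typing import IO, cast


    def read_header(buf: IO) -> bytes:
        # buf.read() is typed as Any on a bare IO; cast() tells the checker
        # to treat the result as bytes without any runtime conversion.
        return cast(bytes, buf.read(288))


    assert read_header(io.BytesIO(b"\x00" * 288)) == b"\x00" * 288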

@@ -294,9 +302,11 @@ def _get_properties(self):
         )

         # Read the rest of the header into cached_page.
-        buf = self._path_or_buf.read(self.header_length - 288)
+        buf = cast(bytes, self._path_or_buf.read(self.header_length - 288))
         self._cached_page += buf
-        if len(self._cached_page) != self.header_length:
+        # error: Argument 1 to "len" has incompatible type "Optional[bytes]";
+        # expected "Sized"
+        if len(self._cached_page) != self.header_length:  # type: ignore[arg-type]
             raise ValueError("The SAS7BDAT file appears to be truncated.")

         self._page_length = self._read_int(
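
Where a `cast` is not practical, the convention shown above is a scoped ignore: the exact mypy error is recorded in a comment and only that error code is suppressed with `# type: ignore[arg-type]`, rather than a bare `# type: ignore` that would hide unrelated errors on the same line. A minimal sketch:

    from __future__ import annotations


    def check(page: bytes | None) -> None:
        # mypy: Argument 1 to "len" has incompatible type "Optional[bytes]"
        if len(page) != 288:  # type: ignore[arg-type]
            raise ValueError("truncated")


    check(b"x" * 288)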
@@ -355,7 +365,7 @@ def __next__(self):
         return da

     # Read a single float of the given width (4 or 8).
-    def _read_float(self, offset, width):
+    def _read_float(self, offset: int, width: int):
         if width not in (4, 8):
             self.close()
             raise ValueError("invalid float width")
@@ -388,24 +398,24 @@ def _read_bytes(self, offset: int, length: int):
             raise ValueError("The cached page is too small.")
         return self._cached_page[offset : offset + length]

-    def _parse_metadata(self):
+    def _parse_metadata(self) -> None:
         done = False
         while not done:
-            self._cached_page = self._path_or_buf.read(self._page_length)
+            self._cached_page = cast(bytes, self._path_or_buf.read(self._page_length))
             if len(self._cached_page) <= 0:
                 break
             if len(self._cached_page) != self._page_length:
                 raise ValueError("Failed to read a meta data page from the SAS file.")
             done = self._process_page_meta()

-    def _process_page_meta(self):
+    def _process_page_meta(self) -> bool:
         self._read_page_header()
         pt = [const.page_meta_type, const.page_amd_type] + const.page_mix_types
         if self._current_page_type in pt:
             self._process_page_metadata()
         is_data_page = self._current_page_type & const.page_data_type
         is_mix_page = self._current_page_type in const.page_mix_types
-        return (
+        return bool(
             is_data_page
             or is_mix_page
             or self._current_page_data_subheader_pointers != []
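
The `bool(...)` wrapper is what makes the new `-> bool` annotation accurate: `&` on two ints returns an int, and `or` returns one of its operands unchanged, so the raw expression has type `int`, not `bool`. A quick demonstration:

    is_data_page = 256 & 384        # -> 256: an int, not a bool
    pointers: list = []
    result = is_data_page or pointers != []
    print(type(result))             # <class 'int'> -- truthy, but not a bool
    print(bool(result))             # True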
@@ -422,7 +432,7 @@ def _read_page_header(self):
             tx, const.subheader_count_length
         )

-    def _process_page_metadata(self):
+    def _process_page_metadata(self) -> None:
         bit_offset = self._page_bit_offset

         for i in range(self._current_page_subheaders_count):
@@ -439,7 +449,8 @@ def _process_page_metadata(self):
             )
             self._process_subheader(subheader_index, pointer)

-    def _get_subheader_index(self, signature, compression, ptype):
+    def _get_subheader_index(self, signature: bytes, compression, ptype) -> int:
+        # TODO: return here could be made an enum
         index = const.subheader_signature_to_index.get(signature)
         if index is None:
             f1 = (compression == const.compressed_subheader_id) or (compression == 0)
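
A hypothetical sketch of the enum the new TODO hints at (member names invented here, not pandas code): an `IntEnum` would name each subheader kind while keeping the existing integer comparisons and dict lookups working:

    from enum import IntEnum


    class SubheaderIndex(IntEnum):
        ROW_SIZE = 0
        COLUMN_SIZE = 1
        SUBHEADER_COUNTS = 2
        COLUMN_TEXT = 3
        COLUMN_NAME = 4
        COLUMN_ATTRIBUTES = 5
        FORMAT_AND_LABEL = 6
        COLUMN_LIST = 7
        DATA = 8


    assert SubheaderIndex.ROW_SIZE == 0  # still comparable to plain ints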
@@ -451,7 +462,9 @@ def _get_subheader_index(self, signature, compression, ptype):
             raise ValueError("Unknown subheader signature")
         return index

-    def _process_subheader_pointers(self, offset: int, subheader_pointer_index: int):
+    def _process_subheader_pointers(
+        self, offset: int, subheader_pointer_index: int
+    ) -> _SubheaderPointer:

         subheader_pointer_length = self._subheader_pointer_length
         total_offset = offset + subheader_pointer_length * subheader_pointer_index
@@ -473,11 +486,13 @@ def _process_subheader_pointers(self, offset: int, subheader_pointer_index: int)

         return x

-    def _read_subheader_signature(self, offset):
+    def _read_subheader_signature(self, offset: int) -> bytes:
         subheader_signature = self._read_bytes(offset, self._int_length)
         return subheader_signature

-    def _process_subheader(self, subheader_index, pointer):
+    def _process_subheader(
+        self, subheader_index: int, pointer: _SubheaderPointer
+    ) -> None:
         offset = pointer.offset
         length = pointer.length

@@ -505,7 +520,7 @@ def _process_subheader(self, subheader_index, pointer):

         processor(offset, length)

-    def _process_rowsize_subheader(self, offset, length):
+    def _process_rowsize_subheader(self, offset: int, length: int) -> None:

         int_len = self._int_length
         lcs_offset = offset
@@ -534,7 +549,7 @@ def _process_rowsize_subheader(self, offset, length):
         self._lcs = self._read_int(lcs_offset, 2)
         self._lcp = self._read_int(lcp_offset, 2)

-    def _process_columnsize_subheader(self, offset, length):
+    def _process_columnsize_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         offset += int_len
         self.column_count = self._read_int(offset, int_len)
@@ -545,10 +560,10 @@ def _process_columnsize_subheader(self, offset, length):
         )

     # Unknown purpose
-    def _process_subheader_counts(self, offset, length):
+    def _process_subheader_counts(self, offset: int, length: int) -> None:
         pass

-    def _process_columntext_subheader(self, offset, length):
+    def _process_columntext_subheader(self, offset: int, length: int) -> None:

         offset += self._int_length
         text_block_size = self._read_int(offset, const.text_block_size_length)
@@ -600,7 +615,7 @@ def _process_columntext_subheader(self, offset, length):
             self.encoding or self.default_encoding
         )

-    def _process_columnname_subheader(self, offset, length):
+    def _process_columnname_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         offset += int_len
         column_name_pointers_count = (length - 2 * int_len - 12) // 8
@@ -632,7 +647,7 @@ def _process_columnname_subheader(self, offset, length):
             name_str = self.column_names_strings[idx]
             self.column_names.append(name_str[col_offset : col_offset + col_len])

-    def _process_columnattributes_subheader(self, offset, length):
+    def _process_columnattributes_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         column_attributes_vectors_count = (length - 2 * int_len - 12) // (int_len + 8)
         for i in range(column_attributes_vectors_count):
@@ -658,11 +673,11 @@ def _process_columnattributes_subheader(self, offset, length):
             x = self._read_int(col_types, const.column_type_length)
             self._column_types.append(b"d" if x == 1 else b"s")

-    def _process_columnlist_subheader(self, offset, length):
+    def _process_columnlist_subheader(self, offset: int, length: int) -> None:
         # unknown purpose
         pass

-    def _process_format_subheader(self, offset, length):
+    def _process_format_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         text_subheader_format = (
             offset + const.column_format_text_subheader_index_offset + 3 * int_len
@@ -711,7 +726,7 @@ def _process_format_subheader(self, offset, length):
         self.column_formats.append(column_format)
         self.columns.append(col)

-    def read(self, nrows=None):
+    def read(self, nrows: int | None = None) -> DataFrame | None:

         if (nrows is None) and (self.chunksize is not None):
             nrows = self.chunksize
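
For context, `read` is the method both `__next__` and `pd.read_sas` funnel through; passing a `chunksize` makes `pd.read_sas` return this reader so it can be iterated. A usage sketch, assuming a local file named example.sas7bdat:

    import pandas as pd

    reader = pd.read_sas("example.sas7bdat", chunksize=1000)
    for chunk in reader:   # each chunk is a DataFrame of up to 1000 rows
        print(chunk.shape)
    reader.close()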
@@ -747,7 +762,7 @@ def read(self, nrows=None):

     def _read_next_page(self):
         self._current_page_data_subheader_pointers = []
-        self._cached_page = self._path_or_buf.read(self._page_length)
+        self._cached_page = cast(bytes, self._path_or_buf.read(self._page_length))
         if len(self._cached_page) <= 0:
             return True
         elif len(self._cached_page) != self._page_length:
@@ -770,12 +785,12 @@ def _read_next_page(self):

         return False

-    def _chunk_to_dataframe(self):
+    def _chunk_to_dataframe(self) -> DataFrame:

         n = self._current_row_in_chunk_index
         m = self._current_row_in_file_index
         ix = range(m - n, m)
-        rslt = pd.DataFrame(index=ix)
+        rslt = DataFrame(index=ix)

         js, jb = 0, 0
         for j in range(self.column_count):