diff --git a/pandas/io/sas/sas7bdat.py b/pandas/io/sas/sas7bdat.py
index 392dfa22ee67b..05cc742b45d83 100644
--- a/pandas/io/sas/sas7bdat.py
+++ b/pandas/io/sas/sas7bdat.py
@@ -13,6 +13,8 @@
 Reference for binary data compression:
 http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
 """
+from __future__ import annotations
+
 from collections import abc
 from datetime import (
     datetime,
@@ -34,7 +36,10 @@
 )
 
 import pandas as pd
-from pandas import isna
+from pandas import (
+    DataFrame,
+    isna,
+)
 
 from pandas.io.common import get_handle
 from pandas.io.sas._sas import Parser
@@ -150,6 +155,9 @@ class SAS7BDATReader(ReaderBase, abc.Iterator):
         bytes.
     """
 
+    _int_length: int
+    _cached_page: bytes | None
+
     def __init__(
         self,
         path_or_buf,
@@ -198,29 +206,29 @@ def __init__(
             self.close()
             raise
 
-    def column_data_lengths(self):
+    def column_data_lengths(self) -> np.ndarray:
         """Return a numpy int64 array of the column data lengths"""
         return np.asarray(self._column_data_lengths, dtype=np.int64)
 
-    def column_data_offsets(self):
+    def column_data_offsets(self) -> np.ndarray:
         """Return a numpy int64 array of the column offsets"""
         return np.asarray(self._column_data_offsets, dtype=np.int64)
 
-    def column_types(self):
+    def column_types(self) -> np.ndarray:
         """
         Returns a numpy character array of the column types:
            s (string) or d (double)
         """
         return np.asarray(self._column_types, dtype=np.dtype("S1"))
 
-    def close(self):
+    def close(self) -> None:
         self.handles.close()
 
-    def _get_properties(self):
+    def _get_properties(self) -> None:
 
         # Check magic number
         self._path_or_buf.seek(0)
-        self._cached_page = self._path_or_buf.read(288)
+        self._cached_page = cast(bytes, self._path_or_buf.read(288))
         if self._cached_page[0 : len(const.magic)] != const.magic:
             raise ValueError("magic number mismatch (not a SAS file?)")
 
@@ -294,9 +302,11 @@ def _get_properties(self):
         )
 
         # Read the rest of the header into cached_page.
-        buf = self._path_or_buf.read(self.header_length - 288)
+        buf = cast(bytes, self._path_or_buf.read(self.header_length - 288))
         self._cached_page += buf
-        if len(self._cached_page) != self.header_length:
+        # error: Argument 1 to "len" has incompatible type "Optional[bytes]";
+        # expected "Sized"
+        if len(self._cached_page) != self.header_length:  # type: ignore[arg-type]
             raise ValueError("The SAS7BDAT file appears to be truncated.")
 
         self._page_length = self._read_int(
@@ -355,7 +365,7 @@ def __next__(self):
         return da
 
     # Read a single float of the given width (4 or 8).
-    def _read_float(self, offset, width):
+    def _read_float(self, offset: int, width: int):
         if width not in (4, 8):
             self.close()
             raise ValueError("invalid float width")
@@ -388,24 +398,24 @@ def _read_bytes(self, offset: int, length: int):
                 raise ValueError("The cached page is too small.")
             return self._cached_page[offset : offset + length]
 
-    def _parse_metadata(self):
+    def _parse_metadata(self) -> None:
         done = False
         while not done:
-            self._cached_page = self._path_or_buf.read(self._page_length)
+            self._cached_page = cast(bytes, self._path_or_buf.read(self._page_length))
             if len(self._cached_page) <= 0:
                 break
             if len(self._cached_page) != self._page_length:
                 raise ValueError("Failed to read a meta data page from the SAS file.")
             done = self._process_page_meta()
 
-    def _process_page_meta(self):
+    def _process_page_meta(self) -> bool:
         self._read_page_header()
         pt = [const.page_meta_type, const.page_amd_type] + const.page_mix_types
         if self._current_page_type in pt:
             self._process_page_metadata()
         is_data_page = self._current_page_type & const.page_data_type
         is_mix_page = self._current_page_type in const.page_mix_types
-        return (
+        return bool(
             is_data_page
             or is_mix_page
             or self._current_page_data_subheader_pointers != []
@@ -422,7 +432,7 @@ def _read_page_header(self):
             tx, const.subheader_count_length
         )
 
-    def _process_page_metadata(self):
+    def _process_page_metadata(self) -> None:
         bit_offset = self._page_bit_offset
 
         for i in range(self._current_page_subheaders_count):
@@ -439,7 +449,8 @@ def _process_page_metadata(self):
                 )
                 self._process_subheader(subheader_index, pointer)
 
-    def _get_subheader_index(self, signature, compression, ptype):
+    def _get_subheader_index(self, signature: bytes, compression, ptype) -> int:
+        # TODO: return here could be made an enum
         index = const.subheader_signature_to_index.get(signature)
         if index is None:
             f1 = (compression == const.compressed_subheader_id) or (compression == 0)
@@ -451,7 +462,9 @@ def _get_subheader_index(self, signature, compression, ptype):
                 raise ValueError("Unknown subheader signature")
         return index
 
-    def _process_subheader_pointers(self, offset: int, subheader_pointer_index: int):
+    def _process_subheader_pointers(
+        self, offset: int, subheader_pointer_index: int
+    ) -> _SubheaderPointer:
         subheader_pointer_length = self._subheader_pointer_length
         total_offset = offset + subheader_pointer_length * subheader_pointer_index
 
@@ -473,11 +486,13 @@ def _process_subheader_pointers(self, offset: int, subheader_pointer_index: int)
 
         return x
 
-    def _read_subheader_signature(self, offset):
+    def _read_subheader_signature(self, offset: int) -> bytes:
         subheader_signature = self._read_bytes(offset, self._int_length)
         return subheader_signature
 
-    def _process_subheader(self, subheader_index, pointer):
+    def _process_subheader(
+        self, subheader_index: int, pointer: _SubheaderPointer
+    ) -> None:
         offset = pointer.offset
         length = pointer.length
 
@@ -505,7 +520,7 @@ def _process_subheader(self, subheader_index, pointer):
 
         processor(offset, length)
 
-    def _process_rowsize_subheader(self, offset, length):
+    def _process_rowsize_subheader(self, offset: int, length: int) -> None:
 
         int_len = self._int_length
         lcs_offset = offset
@@ -534,7 +549,7 @@ def _process_rowsize_subheader(self, offset, length):
         self._lcs = self._read_int(lcs_offset, 2)
         self._lcp = self._read_int(lcp_offset, 2)
 
-    def _process_columnsize_subheader(self, offset, length):
+    def _process_columnsize_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         offset += int_len
         self.column_count = self._read_int(offset, int_len)
@@ -545,10 +560,10 @@ def _process_columnsize_subheader(self, offset, length):
         )
 
     # Unknown purpose
-    def _process_subheader_counts(self, offset, length):
+    def _process_subheader_counts(self, offset: int, length: int) -> None:
         pass
 
-    def _process_columntext_subheader(self, offset, length):
+    def _process_columntext_subheader(self, offset: int, length: int) -> None:
 
         offset += self._int_length
         text_block_size = self._read_int(offset, const.text_block_size_length)
@@ -600,7 +615,7 @@ def _process_columntext_subheader(self, offset, length):
                     self.encoding or self.default_encoding
                 )
 
-    def _process_columnname_subheader(self, offset, length):
+    def _process_columnname_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         offset += int_len
         column_name_pointers_count = (length - 2 * int_len - 12) // 8
@@ -632,7 +647,7 @@ def _process_columnname_subheader(self, offset, length):
             name_str = self.column_names_strings[idx]
             self.column_names.append(name_str[col_offset : col_offset + col_len])
 
-    def _process_columnattributes_subheader(self, offset, length):
+    def _process_columnattributes_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         column_attributes_vectors_count = (length - 2 * int_len - 12) // (int_len + 8)
         for i in range(column_attributes_vectors_count):
@@ -658,11 +673,11 @@ def _process_columnattributes_subheader(self, offset, length):
             x = self._read_int(col_types, const.column_type_length)
             self._column_types.append(b"d" if x == 1 else b"s")
 
-    def _process_columnlist_subheader(self, offset, length):
+    def _process_columnlist_subheader(self, offset: int, length: int) -> None:
         # unknown purpose
         pass
 
-    def _process_format_subheader(self, offset, length):
+    def _process_format_subheader(self, offset: int, length: int) -> None:
         int_len = self._int_length
         text_subheader_format = (
             offset + const.column_format_text_subheader_index_offset + 3 * int_len
@@ -711,7 +726,7 @@ def _process_format_subheader(self, offset, length):
         self.column_formats.append(column_format)
         self.columns.append(col)
 
-    def read(self, nrows=None):
+    def read(self, nrows: int | None = None) -> DataFrame | None:
 
         if (nrows is None) and (self.chunksize is not None):
             nrows = self.chunksize
@@ -747,7 +762,7 @@ def read(self, nrows=None):
 
     def _read_next_page(self):
         self._current_page_data_subheader_pointers = []
-        self._cached_page = self._path_or_buf.read(self._page_length)
+        self._cached_page = cast(bytes, self._path_or_buf.read(self._page_length))
         if len(self._cached_page) <= 0:
             return True
         elif len(self._cached_page) != self._page_length:
@@ -770,12 +785,12 @@ def _read_next_page(self):
 
         return False
 
-    def _chunk_to_dataframe(self):
+    def _chunk_to_dataframe(self) -> DataFrame:
 
         n = self._current_row_in_chunk_index
         m = self._current_row_in_file_index
         ix = range(m - n, m)
-        rslt = pd.DataFrame(index=ix)
+        rslt = DataFrame(index=ix)
 
         js, jb = 0, 0
         for j in range(self.column_count):
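
For reference, a minimal caller-side sketch of the API these annotations describe. The file name is a placeholder, and the context-manager form assumes the `ReaderBase` `__enter__`/`__exit__` support already present in pandas at this point; per the new `DataFrame | None` annotation, `read` yields `None` once a chunked reader is exhausted.

```python
import pandas as pd

# One-shot read: SAS7BDATReader.read() returns a DataFrame,
# matching the "DataFrame | None" annotation added above.
df = pd.read_sas("example.sas7bdat", format="sas7bdat")  # placeholder path
print(df.dtypes)

# Chunked read: with chunksize, read_sas returns the SAS7BDATReader
# itself; iterating it yields DataFrame chunks until the file is
# exhausted (at which point read() returns None and iteration stops).
with pd.read_sas("example.sas7bdat", format="sas7bdat", chunksize=10_000) as reader:
    # The metadata accessors typed above return numpy arrays:
    # column_types() holds b"d" (double) / b"s" (string) codes.
    print(reader.column_types())
    for chunk in reader:
        print(chunk.shape)
```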