Source code for pandas_ts.ts_ext_array

# typing.Self and "|" union syntax don't exist in Python 3.9
from __future__ import annotations

from collections.abc import Collection, Iterable, Iterator, Sequence
from typing import Any, cast

import numpy as np
import pandas as pd
import pyarrow as pa
from numpy.typing import ArrayLike

# Needed by ArrowExtensionArray.to_numpy(na_value=no_default)
from pandas._libs.lib import no_default

# ArrowExtensionArray is considered experimental, so we need to be careful with it.
from pandas.core.arrays import ArrowExtensionArray

from pandas_ts.ts_dtype import TsDtype
from pandas_ts.utils import enumerate_chunks, is_pa_type_a_list

__all__ = ["TsExtensionArray"]


class TsExtensionArray(ArrowExtensionArray):
    """Pandas extension array for TS structure array

    Parameters
    ----------
    values : pyarrow.Array or pyarrow.ChunkedArray
        The array to be wrapped, must be a struct array with all fields being
        list arrays of the same lengths.
    validate : bool, default True
        Whether to validate the input array.

    Raises
    ------
    ValueError
        If the input array is not a struct array or if any of the fields is
        not a list array or if the list arrays have different lengths.
    """

    _dtype: TsDtype

    def __init__(self, values: pa.Array | pa.ChunkedArray, *, validate: bool = True) -> None:
        super().__init__(values=values)

        # Fix the dtype to be TsDtype
        self._dtype = TsDtype.from_pandas_arrow_dtype(self._dtype)

        if validate:
            self._validate(self._pa_array)

    @staticmethod
    def _convert_df_to_pa_scalar(df: pd.DataFrame, *, type: pa.DataType | None) -> pa.Scalar:
        """Convert a single pd.DataFrame to a pyarrow scalar.

        The scalar is built from a dict mapping each column name to the
        values of that column.

        Parameters
        ----------
        df : pd.DataFrame
            The data frame to convert.
        type : pa.DataType or None
            The pyarrow type passed to `pa.scalar`.

        Returns
        -------
        pa.Scalar
            The scalar representing the data frame.
        """
        d = {column: series.values for column, series in df.to_dict("series").items()}
        return pa.scalar(d, type=type)

    @staticmethod
    def _convert_df_value_to_pa(value: object, *, type: pa.DataType | None) -> object:
        """Convert pd.DataFrame value(s) to pyarrow scalar(s).

        Parameters
        ----------
        value : object
            A pd.DataFrame, an indexable iterable whose first element is
            a pd.DataFrame, or anything else.
        type : pa.DataType or None
            The pyarrow type passed to `pa.scalar`.

        Returns
        -------
        object
            A pyarrow scalar for a pd.DataFrame input, a list of pyarrow
            scalars for a collection of pd.DataFrame, or the input itself
            in any other case (including empty collections).
        """
        # Convert "scalar" pd.DataFrame to a dict
        if isinstance(value, pd.DataFrame):
            return TsExtensionArray._convert_df_to_pa_scalar(value, type=type)

        # Convert pd.DataFrame collection to a list of dicts
        if hasattr(value, "__getitem__") and isinstance(value, Iterable):
            # Peek at the first element. Both the pandas positional indexer and
            # plain indexing raise IndexError for empty containers, in which
            # case there is nothing to convert.
            try:
                if hasattr(value, "iloc"):
                    first = value.iloc[0]
                else:
                    first = value[0]  # type: ignore[index]
            except IndexError:
                return value
            if isinstance(first, pd.DataFrame):
                return [TsExtensionArray._convert_df_to_pa_scalar(v, type=type) for v in value]

        return value

    @classmethod
    def _from_sequence(cls, scalars, *, dtype=None, copy: bool = False) -> Self:  # type: ignore[name-defined] # noqa: F821
        """Construct the array from a sequence of scalars.

        pd.DataFrame elements are converted to pyarrow scalars before the
        sequence is handed to the parent implementation.
        """
        scalars = cls._convert_df_value_to_pa(scalars, type=None)
        # The previous line may return an iterator, but parent's _from_sequence needs Sequence
        if not isinstance(scalars, Sequence) and isinstance(scalars, Collection):
            scalars = list(scalars)
        return super()._from_sequence(scalars, dtype=dtype, copy=copy)

    @staticmethod
    def _validate(array: pa.ChunkedArray) -> None:
        """Check that every chunk is a struct array of lists sharing offsets.

        Parameters
        ----------
        array : pa.ChunkedArray
            The array to check.

        Raises
        ------
        ValueError
            If a chunk is not a struct array, if a field is not a list array,
            or if the list offsets differ between the fields of a chunk.
        """
        for chunk in array.iterchunks():
            if not pa.types.is_struct(chunk.type):
                raise ValueError(f"Expected a StructArray, got {chunk.type}")
            struct_array = cast(pa.StructArray, chunk)

            first_list_array: pa.ListArray | None = None
            for field in struct_array.type:
                inner_array = struct_array.field(field.name)
                if not is_pa_type_a_list(inner_array.type):
                    raise ValueError(f"Expected a ListArray, got {inner_array.type}")
                list_array = cast(pa.ListArray, inner_array)

                if first_list_array is None:
                    first_list_array = list_array
                    continue
                # compare offsets from the first list array with the current one
                if not first_list_array.offsets.equals(list_array.offsets):
                    raise ValueError("Offsets of all ListArrays must be the same")

    def _replace_pa_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> None:
        """Replace the underlying pyarrow array and update the dtype.

        Parameters
        ----------
        pa_array : pa.ChunkedArray
            The new underlying array.
        validate : bool
            Whether to validate the new array before replacing.
        """
        if validate:
            self._validate(pa_array)
        self._pa_array = pa_array
        # The chunked array's own type is available even when it has no chunks,
        # while chunk(0) would fail in that case.
        self._dtype = TsDtype(pa_array.type)

    def __getitem__(self, item):
        """Get item(s); a "scalar" struct value is returned as a pd.DataFrame."""
        value = super().__getitem__(item)
        # Convert "scalar" value to pd.DataFrame
        if not isinstance(value, dict):
            return value
        return pd.DataFrame(value, copy=True)

    def __iter__(self) -> Iterator[Any]:
        """Iterate over elements, yielding dict values as pd.DataFrame."""
        for value in super().__iter__():
            # Convert "scalar" value to pd.DataFrame
            if not isinstance(value, dict):
                yield value
            else:
                yield pd.DataFrame(value, copy=True)

    def to_numpy(self, dtype: None = None, copy: bool = False, na_value: Any = no_default) -> np.ndarray:
        """Convert the extension array to a numpy array.

        Parameters
        ----------
        dtype : None
            This parameter is left for compatibility with the base class
            method, but it is not used. dtype of the returned array is
            always object.
        copy : bool, default False
            Whether to copy the data. It is not guaranteed that the data
            will not be copied if copy is False.
        na_value : Any, default no_default
            TODO: support NA values

        Returns
        -------
        np.ndarray
            The numpy array of pd.DataFrame objects. Each element is a single
            time-series.
        """
        array = super().to_numpy(dtype=dtype, copy=copy, na_value=na_value)

        # Hack with np.empty is the only way to force numpy to create 1-d array of objects
        result = np.empty(shape=array.shape, dtype=object)

        # We do copy=False here because user's 'copy' is already handled by ArrowExtensionArray.to_numpy
        result[:] = [pd.DataFrame(value, copy=False) for value in array]
        return result

    def __setitem__(self, key, value) -> None:
        """Set item(s), converting pd.DataFrame value(s) to pyarrow first."""
        value = self._convert_df_value_to_pa(value, type=self._dtype.pyarrow_dtype)
        super().__setitem__(key, value)

    @property
    def list_offsets(self) -> pa.ChunkedArray:
        """The list offsets of the field arrays.

        It is a chunk array of list offsets of the first field array.
        (Since all fields are validated to have the same offsets.)

        Returns
        -------
        pa.ChunkedArray
            The list offsets of the field arrays.
        """
        return pa.chunked_array([chunk.field(0).offsets for chunk in self._pa_array.iterchunks()])

    @property
    def field_names(self) -> list[str]:
        """Names of the nested columns"""
        # Read the struct type from the chunked array itself, so that it works
        # even when the array has no chunks.
        return [field.name for field in self._pa_array.type]

    @property
    def flat_length(self) -> int:
        """Length of the flat arrays"""
        # The sum over an empty (or all-null) chunk is a null scalar, which
        # as_py() converts to None; count such chunks as zero.
        return sum(
            chunk.field(0).value_lengths().sum().as_py() or 0 for chunk in self._pa_array.iterchunks()
        )

    def view_fields(self, fields: str | list[str]) -> Self:  # type: ignore[name-defined] # noqa: F821
        """Get a view of the series with only the specified fields

        Parameters
        ----------
        fields : str or list of str
            The name of the field or a list of names of the fields to include.

        Returns
        -------
        TsExtensionArray
            The view of the series with only the specified fields.

        Raises
        ------
        ValueError
            If the field names are duplicated or some of them are not found.
        """
        if isinstance(fields, str):
            fields = [fields]
        if len(set(fields)) != len(fields):
            raise ValueError("Duplicate field names are not allowed")
        if not set(fields).issubset(self.field_names):
            raise ValueError(f"Some fields are not found, given: {fields}, available: {self.field_names}")

        chunks = []
        for chunk in self._pa_array.iterchunks():
            chunk = cast(pa.StructArray, chunk)
            struct_dict = {field: chunk.field(field) for field in fields}
            struct_array = pa.StructArray.from_arrays(list(struct_dict.values()), list(struct_dict.keys()))
            chunks.append(struct_array)
        pa_array = pa.chunked_array(chunks)

        # The fields are taken from an already constructed array, so validation is skipped
        return self.__class__(pa_array, validate=False)

    def set_flat_field(self, field: str, value: ArrayLike) -> None:
        """Set the field from flat-array of values

        Parameters
        ----------
        field : str
            The name of the field.
        value : ArrayLike
            The 'flat' array of values to be set. A scalar is repeated to
            the length of the flat arrays.

        Raises
        ------
        ValueError
            If the input is not a scalar and its length differs from the
            length of the flat arrays.
        """
        # TODO: optimize for the case when the input is a pa.ChunkedArray

        flat_length = self.flat_length

        if np.ndim(value) == 0:
            value = np.repeat(value, flat_length)

        pa_array = pa.array(value)

        if len(pa_array) != flat_length:
            raise ValueError("The input must be a scalar or have the same length as the flat arrays")

        # NOTE(review): list_offsets holds per-chunk offsets, so combining the
        # chunks looks valid as list offsets for a single chunk only — confirm
        # the multi-chunk behavior.
        offsets = self.list_offsets.combine_chunks()
        list_array = pa.ListArray.from_arrays(values=pa_array, offsets=offsets)

        self.set_list_field(field, list_array)

    def set_list_field(self, field: str, value: ArrayLike) -> None:
        """Set the field from list-array

        Parameters
        ----------
        field : str
            The name of the field.
        value : ArrayLike
            The list-array of values to be set.

        Raises
        ------
        ValueError
            If the input is not a list array, if its length differs from
            the length of the series, or if the resulting array does not
            pass validation.
        """
        # TODO: optimize for the case when the input is a pa.ChunkedArray

        list_array = pa.array(value)

        if not is_pa_type_a_list(list_array.type):
            raise ValueError(f"Expected a list array, got {list_array.type}")

        if len(list_array) != len(self):
            raise ValueError("The length of the list-array must be equal to the length of the series")

        chunks = []
        for sl, chunk in enumerate_chunks(self._pa_array):
            chunk = cast(pa.StructArray, chunk)

            # Build a new struct array. We collect all existing fields and add the new one.
            struct_dict = {pa_field.name: chunk.field(pa_field.name) for pa_field in chunk.type}
            # `sl` picks the part of the input used as this chunk's field
            struct_dict[field] = pa.array(list_array[sl])

            struct_array = pa.StructArray.from_arrays(list(struct_dict.values()), list(struct_dict.keys()))
            chunks.append(struct_array)

        self._replace_pa_array(pa.chunked_array(chunks), validate=True)

    def pop_field(self, field: str) -> None:
        """Delete a field from the struct array

        Parameters
        ----------
        field : str
            The name of the field to be deleted.

        Raises
        ------
        ValueError
            If the field is not found or if it is the only field.
        """
        if field not in self.field_names:
            raise ValueError(f"Field '{field}' not found")

        if len(self.field_names) == 1:
            raise ValueError("Cannot delete the last field")

        chunks = []
        for chunk in self._pa_array.iterchunks():
            chunk = cast(pa.StructArray, chunk)
            struct_dict = {
                pa_field.name: chunk.field(pa_field.name) for pa_field in chunk.type if pa_field.name != field
            }
            struct_array = pa.StructArray.from_arrays(list(struct_dict.values()), list(struct_dict.keys()))
            chunks.append(struct_array)

        # Only a field is removed from an existing array, so validation is skipped
        self._replace_pa_array(pa.chunked_array(chunks), validate=False)