Source code for pyMAISE.preprocessing

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from sklearn.model_selection import train_test_split as sklearn_train_test_split

import pyMAISE.settings as settings
from pyMAISE.utils import _try_clear


def read_csv(
    path,
    input_slice=None,
    output_slice=None,
    header="infer",
):
    """
    Load one or two comma-separated values (csv) files as ``xarray.DataArray``.

    Either a single file holding the whole data set is read (and optionally
    sliced into inputs and outputs), or two files are read, one with the
    inputs and one with the outputs.

    Parameters
    ----------
    path : str or list of str
        Path to a single file containing both inputs and outputs, or a list
        of exactly two paths: the inputs file first, the outputs file second.
    input_slice : slice or None, default=None
        Positional selection of the input columns. Only used when ``path``
        is a single string.
    output_slice : slice or None, default=None
        Positional selection of the output columns. Only used when ``path``
        is a single string.
    header : int, 'infer', or None, default='infer'
        Forwarded unchanged to `pandas.read_csv
        <https://pandas.pydata.org/pandas-docs/stable/reference/\
api/pandas.read_csv.html>`_ :cite:`mckinney2010data`.

    Returns
    -------
    data : xarray.DataArray
        The combined input and output data. When ``path`` is a string and
        both slices are ``None`` this is the only value returned.
    inputs : xarray.DataArray
        ``data`` selected with ``input_slice``, or the contents of the first
        file when two paths are given.
    outputs : xarray.DataArray
        ``data`` selected with ``output_slice``, or the contents of the
        second file when two paths are given.

    Raises
    ------
    TypeError
        If ``path`` is neither a string nor a list.
    AssertionError
        If only one of the two slices is given for a single path, or if the
        list of paths does not contain exactly two entries.
    """

    def _load(csv_path):
        # DataFrame -> Dataset -> DataArray, with the column axis
        # ("variable") moved to the last position
        table = pd.read_csv(csv_path, header=header)
        return table.to_xarray().to_array().transpose(..., "variable")

    print("Reading data from", path)

    if isinstance(path, list):
        # Exactly two files are expected: inputs first, outputs second
        assert len(path) == 2
        inputs = _load(path[0])
        outputs = _load(path[1])

        # NOTE(review): combine_first aligns on the "variable" coordinate, so
        # columns carrying the same label in both files are merged rather
        # than appended (unlike the xr.concat used for a single file) --
        # confirm this is intended.
        data = inputs.combine_first(outputs)
        _try_clear()
        return data, inputs, outputs

    if not isinstance(path, str):
        raise TypeError(f"path = {path} is neither a string or list of strings")

    data = _load(path)

    # Without slices the caller only wants the complete data set
    if input_slice is None and output_slice is None:
        _try_clear()
        return data

    # Slices must be supplied as a pair
    assert input_slice is not None and output_slice is not None
    inputs = data.isel(variable=input_slice)
    outputs = data.isel(variable=output_slice)

    # Rebuild the data set so it holds only the selected columns,
    # inputs followed by outputs
    data = xr.concat([inputs, outputs], dim=data.dims[-1])
    _try_clear()
    return data, inputs, outputs
def one_hot_encode(data, **kwargs):
    """
    One hot encode multiclass classification data.

    The array is converted to a ``pandas.DataFrame`` whose columns are the
    entries of the last dimension, encoded with `pandas.get_dummies
    <https://pandas.pydata.org/pandas-docs/stable/reference/\
api/pandas.get_dummies.html>`_ :cite:`mckinney2010data`, and converted
    back to an ``xarray.DataArray``.

    Parameters
    ----------
    data: xarray.DataArray
        All data to be one hot encoded.
    dtype: dtype, default=float
        Data type of the encoded columns.
    **kwargs
        Any further keyword arguments are passed to ``pandas.get_dummies``.

    Returns
    -------
    dummified_data: xarray.DataArray
        One hot encoded data. The first and last dimensions keep the names
        they have in ``data``.
    """
    assert isinstance(data, xr.DataArray)

    # Caller-supplied options win; dtype falls back to float
    options = {"dtype": float, **kwargs}

    first_dim = data.dims[0]
    last_dim = data.dims[-1]

    # One DataFrame column per entry of the last dimension
    frame = data.to_dataset(dim=last_dim).to_dataframe()
    encoded = pd.get_dummies(frame, **options)

    # Back to a DataArray, restoring the original name of the last dimension
    result = encoded.to_xarray().to_array()
    result = result.rename({"variable": last_dim})
    return result.transpose(first_dim, ..., last_dim)
class SplitSequence:
    """
    Split sequence function for rolling windows of time series data.

    Using rolling windows, 2D time series data of dimensions
    (time steps, features) is split according to the features defined in
    ``sequence_inputs`` and ``sequence_outputs``, the window widths and the
    output position. This results in a 3D input data set and a 2D or 3D
    output data set.

    If the data set is 3D, rolling windows are applied to the sequences
    specified in ``sequence_inputs`` and ``sequence_outputs``, resulting in
    a 4D array. The windows and features (the last two dimensions) are then
    combined to create a 3D data set. Features without rolling windows are
    specified in ``const_inputs`` and ``const_outputs`` and are concatenated
    in front of the windowed features to get 3D input and output
    ``xarray.DataArray`` objects.

    Parameters
    ----------
    input_steps: int
        The window size or number of time steps for each input sample.
    output_steps: int
        The window size or number of time steps for each output sample.
    output_position: int
        The position to start the output window relative to the position of
        the final time step in the input window. If the last time step in
        the input window is at index five and ``output_position=1``, then
        the output window begins at index six.
    sequence_inputs: None or array/list of int or str
        Corresponds to the features (last dimension of ``data``) taken for
        inputs. If ``None`` then the entire data set is used for inputs.
    sequence_outputs: None or array/list of int or str
        Corresponds to the labels (last dimension of ``data``) that are
        taken for outputs. If ``None`` then the entire data set is used for
        outputs.
    const_inputs: None or array/list of int or str
        The features concatenated to the input windows that are not used in
        rolling windows. This is only used when ``data`` is 3D.
    const_outputs: None or array/list of int or str
        The labels concatenated to the output windows that are not used in
        rolling windows. This is only used when ``data`` is 3D.

    Examples
    --------
    Using the 2D LOCA data set, we demonstrate rolling windows on the
    perturbed data.

    >>> from pyMAISE import datasets, preprocessing
    >>> _, perturbed = datasets.load_loca(stack_series=True)
    >>> perturbed.shape
    (1600000, 44)
    >>> sequence_outputs = [
    ...     "Pellet Cladding Temperature",
    ...     "Core Pressure",
    ...     "Water Level",
    ...     "Break Flow Rate"
    ... ]
    >>> split_sequences = preprocessing.SplitSequence(
    ...     4, 1, 1, sequence_outputs=sequence_outputs
    ... )
    >>> perturbed_input, perturbed_output = split_sequences.split(perturbed)
    >>> perturbed_input.shape
    (1599996, 4, 44)
    >>> perturbed_output.shape
    (1599996, 4)

    Alternatively, we can use the 3D perturbed LOCA data, specify the four
    sequential features as inputs and outputs and then add the
    time-independent features.

    >>> from pyMAISE import datasets, preprocessing
    >>> _, perturbed = datasets.load_loca(stack_series=False)
    >>> perturbed.shape
    (4000, 400, 44)
    >>> split_sequences = preprocessing.SplitSequence(
    ...     input_steps=4,
    ...     output_steps=1,
    ...     output_position=1,
    ...     sequence_inputs=range(-4, 0),
    ...     sequence_outputs=range(-4, 0),
    ...     const_inputs=range(40),
    ... )
    >>> perturbed_input, perturbed_output = split_sequences.split(perturbed)
    >>> perturbed_input.shape
    (4000, 396, 56)
    >>> perturbed_output.shape
    (4000, 396, 4)
    """

    def __init__(self, input_steps, output_steps, output_position, **kwargs):
        self._input_steps = input_steps
        self._output_steps = output_steps
        self._output_position = output_position

        # Optional feature selections, all defaulting to None
        self._sequence_inputs = kwargs.get("sequence_inputs", None)
        self._sequence_outputs = kwargs.get("sequence_outputs", None)
        self._const_inputs = kwargs.get("const_inputs", None)
        self._const_outputs = kwargs.get("const_outputs", None)

    @staticmethod
    def _windowed_names(names, steps):
        """
        Build the labels for windows flattened into the feature dimension.

        The windowed arrays are reshaped from (..., steps, features) to
        (..., steps * features), so the flattened axis is ordered time step
        first and feature second. The labels are generated in that same
        order so that every label describes the column it is attached to.
        """
        return [f"{name}_{step}" for step in range(steps) for name in names]

    def split(self, data):
        """
        Run rolling windows.

        Parameters
        ----------
        data: xarray.DataArray
            A data set that includes both input and output sequence data.
            This data can be either 2 or 3-dimensional.

        Returns
        -------
        split_input : xarray.DataArray
            The 3D data set of input data with dimensions
            (samples, time steps, features).
        split_output : xarray.DataArray
            The 3D or 2D data set of output data with either dimensions
            (samples, time steps, labels) or (samples, labels). The second
            dimension is removed when its length is one, which for 2D
            ``data`` means ``output_steps=1``.
        """
        # Assert our data is either 2D or 3D
        assert isinstance(data, xr.DataArray)
        assert len(data.shape) > 1 and len(data.shape) < 4

        # Select along the last dimension by label (str) or position (int)
        def type_based_index(index_list):
            if index_list is None:
                return data
            if isinstance(index_list[0], str):
                return data.sel(**{data.dims[-1]: index_list})
            return data.isel(**{data.dims[-1]: index_list})

        inputs = type_based_index(self._sequence_inputs)
        outputs = type_based_index(self._sequence_outputs)

        # Get feature names
        input_feature_strs = inputs.coords[inputs.dims[-1]].values
        output_feature_strs = outputs.coords[outputs.dims[-1]].values

        # If 2D then assume (timesteps, features)
        # If 3D then assume (samples, timesteps, features)
        temporal_index = 1 if len(data.shape) == 3 else 0

        # Trim input array based on output window and position and create
        # rolling window; the window axis is appended as the last dimension
        x = np.lib.stride_tricks.sliding_window_view(
            np.take(
                inputs.values,
                np.arange(
                    inputs.shape[temporal_index]
                    - self._output_position
                    - self._output_steps
                    + 1
                ),
                axis=temporal_index,
            ),
            self._input_steps,
            axis=temporal_index,
        )

        # Trim output array based on input window and output position
        # and create rolling window
        y = np.lib.stride_tricks.sliding_window_view(
            np.take(
                outputs.values,
                np.arange(
                    self._input_steps + self._output_position - 1,
                    outputs.shape[temporal_index],
                ),
                axis=temporal_index,
            ),
            self._output_steps,
            axis=temporal_index,
        )

        # Swap window and feature axes to make feature last dimension
        x = np.swapaxes(x, -2, -1)
        y = np.swapaxes(y, -2, -1)

        # If our windowed arrays are 4D then compress windows and features
        if temporal_index == 1:
            # Expand names in the order produced by the reshape below
            input_feature_strs = self._windowed_names(
                input_feature_strs, x.shape[2]
            )
            output_feature_strs = self._windowed_names(
                output_feature_strs, y.shape[2]
            )

            # Stack time steps into features dimension
            x = x.reshape((x.shape[0], x.shape[1], -1))
            y = y.reshape((y.shape[0], y.shape[1], -1))

            # Add features that were not used in windows, trimmed to the
            # number of windows and placed in front of the windowed features
            if self._const_inputs is not None:
                const_inputs = type_based_index(self._const_inputs)
                input_feature_strs = (
                    list(const_inputs.coords[const_inputs.dims[-1]].values)
                    + input_feature_strs
                )
                x = np.concatenate(
                    (
                        np.take(
                            const_inputs.values,
                            np.arange(x.shape[temporal_index]),
                            axis=temporal_index,
                        ),
                        x,
                    ),
                    axis=2,
                )
            if self._const_outputs is not None:
                const_outputs = type_based_index(self._const_outputs)
                output_feature_strs = (
                    list(const_outputs.coords[const_outputs.dims[-1]].values)
                    + output_feature_strs
                )
                y = np.concatenate(
                    (
                        np.take(
                            const_outputs.values,
                            # x and y hold the same number of windows
                            np.arange(y.shape[temporal_index]),
                            axis=temporal_index,
                        ),
                        y,
                    ),
                    axis=2,
                )

        # Create input xarray.DataArray
        split_inputs = xr.DataArray(
            x,
            coords={
                "samples": np.arange(x.shape[0]),
                inputs.dims[temporal_index]: np.arange(x.shape[1]),
                inputs.dims[-1]: input_feature_strs,
            },
        )

        # Create output xarray.DataArray; a second dimension of length one
        # is omitted
        if y.shape[1] == 1:
            split_outputs = xr.DataArray(
                y.reshape(-1, y.shape[-1]),
                coords={
                    "samples": np.arange(y.shape[0]),
                    outputs.dims[-1]: output_feature_strs,
                },
            )
        else:
            split_outputs = xr.DataArray(
                y,
                coords={
                    "samples": np.arange(y.shape[0]),
                    inputs.dims[temporal_index]: np.arange(y.shape[1]),
                    outputs.dims[-1]: output_feature_strs,
                },
            )

        return split_inputs, split_outputs
def train_test_split(
    data,
    test_size=0.3,
    shuffle=True,
):
    """
    Split data into training and testing data sets.

    A single set of sample indices is split with scikit-learn's
    ``train_test_split`` using ``settings.values.random_state`` and then
    applied to every array, so all arrays are split on the same samples.

    Parameters
    ----------
    data: xarray.DataArray or list of xarray.DataArray
        Data to be split. Assumes the first dimension is the sample
        dimension; every array must have the same number of samples.
    test_size: float between 0 and 1, default=0.3
        Fraction of the data used for testing.
    shuffle: bool, default=True
        Shuffle samples during split.

    Returns
    -------
    split_data: tuple of xarray.DataArray
        If an ``xarray.DataArray`` was given for ``data`` then split_data
        consists of ``(data_train, data_test)``. If a list of 2
        ``xarray.DataArray`` objects, ``[x, y]``, then split_data consists
        of ``(x_train, x_test, y_train, y_test)``.

    Raises
    ------
    RuntimeError
        If ``data`` is empty, contains something other than
        ``xarray.DataArray``, or the arrays differ in their number of
        samples.
    """
    if isinstance(data, xr.DataArray):
        data = [data]

    # An empty collection passes the all() check vacuously and would then
    # fail on data[0] with an IndexError, so reject it here
    if len(data) == 0 or not all(
        isinstance(x, xr.DataArray) and x.shape[0] == data[0].shape[0] for x in data
    ):
        raise RuntimeError(
            "data must either be an xarray.DataArray or a list of "
            + "xarray.DataArray with the same number of samples (dimension 0)"
        )

    # Split the sample indices once so every array gets the same partition
    samples_idx = np.arange(data[0].shape[0])
    train_idx, test_idx = sklearn_train_test_split(
        samples_idx,
        test_size=test_size,
        random_state=settings.values.random_state,
        shuffle=shuffle,
    )

    # Split into train and test using indices
    split_data = []
    for array in data:
        split_data.extend((array[train_idx,], array[test_idx,]))

    return tuple(split_data)
def scale_data(train_data, test_data, scaler):
    """
    Scale training and testing data using the scaler provided.

    The scaler is fit on ``train_data`` only and then applied to
    ``test_data``. Both arrays are flattened to 2D with the last dimension
    kept as the feature axis before scaling and are restored to their
    original shape afterwards. The ``values`` of ``train_data`` and
    ``test_data`` are overwritten in place. The fit scaler is returned so it
    can be used to scale any additional data.

    Example
    -------
    Given the following 2D ``xarray.DataArrays`` of shape
    (samples, features/labels), ``xtrain`` and ``xtest``, we can apply
    the ``pyMAISE.preprocessing.scale_data()`` method,

    .. code:: python

        from pyMAISE.preprocessing import scale_data
        from sklearn.preprocessing import MinMaxScaler

        xtrain, xtest, xscaler = scale_data(xtrain, xtest, MinMaxScaler())

    We can then scale an additional dataset, ``xvalid``, that matches
    the format of ``xtrain`` and ``xtest`` by running

    .. code:: python

        xvalid.values = xscaler.transform(xvalid.values)

    Parameters
    ----------
    train_data: xarray.DataArray
        Training data.
    test_data: xarray.DataArray
        Testing data.
    scaler: object
        An instance with ``fit_transform`` and ``transform`` methods such as
        `min-max scaler from sklearn <https://scikit-learn.org/stable/\
modules/generated/sklearn.preprocessing.MinMaxScaler.html>`_
        :cite:`scikit-learn`.

    Returns
    -------
    train_data: xarray.DataArray
        Scaled training data.
    test_data: xarray.DataArray
        Scaled testing data based on ``scaler`` fit on ``train_data``.
    scaler: object
        The scaler given, fit and used to scale the given data.

    Raises
    ------
    TypeError
        If ``train_data`` or ``test_data`` is not an ``xarray.DataArray``.
    RuntimeError
        If the two arrays differ in shape in any dimension but the first.
    """
    # Check that our train_data and test_data are xarray.DataArray[s]
    if not (
        isinstance(train_data, xr.DataArray) and isinstance(test_data, xr.DataArray)
    ):
        raise TypeError("train_data and test_data must be type xarray.DataArray")

    # Check the dimensions of train_data and test_data for all but samples dimension
    if not (train_data.shape[1:] == test_data.shape[1:]):
        raise RuntimeError(
            "train_data and test_data must have the same number of\n"
            + "dimensions and the same size in all but in dimension 0"
        )

    # Fit on the training data only; the scaler sees a 2D array whose
    # columns are the entries of the last dimension
    train_data.values = scaler.fit_transform(
        train_data.values.reshape(-1, train_data.shape[-1])
    ).reshape(train_data.shape)

    # Apply the already-fit scaler to the testing data
    test_data.values = scaler.transform(
        test_data.values.reshape(-1, test_data.shape[-1])
    ).reshape(test_data.shape)

    return train_data, test_data, scaler
def correlation_matrix(
    data,
    method="pearson",
    min_periods=1,
    fig=None,
    ax=None,
    colorbar=True,
    annotations=False,
):
    """
    Create a correlation matrix for a data set.

    This function uses `pandas.DataFrame.corr
    <https://pandas.pydata.org/pandas-docs/stable/\
reference/api/pandas.DataFrame.corr.html>`_ :cite:`mckinney2010data`,
    for ``method`` and ``min_periods`` please refer to their documentation.

    Parameters
    ----------
    data: xarray.DataArray
        Raw data. The entries of the last dimension are the variables that
        are correlated with each other.
    method: str or callable, default="pearson"
        Passed to ``pandas.DataFrame.corr``.
    min_periods: int, default=1
        Passed to ``pandas.DataFrame.corr``.
    fig: matplotlib.figure or None, default=None
        Figure object. If ``None`` then the figure of ``ax`` is used, or a
        new one is created when ``ax`` is ``None`` as well.
    ax: matplotlib.pyplot.axis or None, default=None
        Axis object. If ``None`` then the current axis of ``fig`` is used,
        or a new one is created when ``fig`` is ``None`` as well.
    colorbar: bool, default=True
        Add colorbar to plot.
    annotations: bool, default=False
        Add annotations to elements.

    Returns
    -------
    fig: matplotlib.figure
        Created or provided figure.
    ax: matplotlib.pyplot.axis
        Created or provided axis.
    """
    # Create correlation matrix DataFrame
    corr = (
        data.to_dataset(dim=data.dims[-1])
        .to_dataframe()
        .corr(method=method, min_periods=min_periods)
    )

    # Fill in whichever of fig/ax was not supplied so that neither is None
    # when the plot is drawn
    if ax is None:
        if fig is None:
            fig, ax = plt.subplots()
        else:
            ax = fig.gca()
    elif fig is None:
        fig = ax.figure

    # Create heatmap
    im = ax.imshow(corr)

    # Set x and y ticks to column headers
    ax.set_xticks(np.arange(corr.shape[0]), labels=corr.columns, rotation=65)
    ax.set_yticks(np.arange(corr.shape[1]), labels=corr.columns, rotation=0)

    # Add colorbar if colorbar == True
    if colorbar:
        fig.colorbar(im, ax=ax)

    # Add annotations of each value in square if annotations == True
    if annotations:
        # Convert once instead of once per cell
        values = corr.to_numpy()
        for i, row in enumerate(values):
            for j, value in enumerate(row):
                ax.text(
                    j,
                    i,
                    round(value, 2),
                    ha="center",
                    va="center",
                    color="black",
                )

    return (fig, ax)