Source code for pybamm.solvers.processed_variable

#
# Processed Variable class
#
from typing import Optional
import casadi
import numpy as np
import pybamm
from scipy.integrate import cumulative_trapezoid
import xarray as xr
import bisect


[docs] class ProcessedVariable: """ An object that can be evaluated at arbitrary (scalars or vectors) t and x, and returns the (interpolated) value of the base variable at that t and x. Parameters ---------- base_variables : list of :class:`pybamm.Symbol` A list of base variables with a method `evaluate(t,y)`, each entry of which returns the value of that variable for that particular sub-solution. A Solution can be comprised of sub-solutions which are the solutions of different models. Note that this can be any kind of node in the expression tree, not just a :class:`pybamm.Variable`. When evaluated, returns an array of size (m,n) base_variables_casadi : list of :class:`casadi.Function` A list of casadi functions. When evaluated, returns the same thing as `base_Variable.evaluate` (but more efficiently). solution : :class:`pybamm.Solution` The solution object to be used to create the processed variables time_integral : :class:`pybamm.ProcessedVariableTimeIntegral`, optional Not none if the variable is to be time-integrated (default is None) """ def __init__( self, base_variables, base_variables_casadi, solution, time_integral: Optional[pybamm.ProcessedVariableTimeIntegral] = None, ): self.base_variables = base_variables self.base_variables_casadi = base_variables_casadi self.all_ts = solution.all_ts self.all_ys = solution.all_ys self.all_yps = solution.all_yps self.all_inputs = solution.all_inputs self.all_inputs_casadi = solution.all_inputs_casadi self.mesh = base_variables[0].mesh self.domain = base_variables[0].domain self.domains = base_variables[0].domains self.time_integral = time_integral # Process spatial variables geometry = solution.all_models[0].geometry self.spatial_variables = {} for domain_level, domain_names in self.domains.items(): variables = [] for domain in domain_names: variables += list(geometry[domain].keys()) self.spatial_variables[domain_level] = variables # Sensitivity starts off uninitialized, only set when called self._sensitivities = None self.all_solution_sensitivities = solution._all_sensitivities # Store time self.t_pts = solution.t # Evaluate base variable at initial time self.base_eval_shape = self.base_variables[0].shape self.base_eval_size = self.base_variables[0].size self._xr_array_raw = None self._entries_raw = None self._entries_for_interp_raw = None self._coords_raw = None def initialise(self): if self.entries_raw_initialized: return entries = self.observe_raw() t = self.t_pts entries_for_interp, coords = self._interp_setup(entries, t) self._entries_raw = entries self._entries_for_interp_raw = entries_for_interp self._coords_raw = coords
[docs] def observe_and_interp(self, t, fill_value): """ Interpolate the variable at the given time points and y values. t must be a sorted array of time points. """ entries = self._observe_hermite_cpp(t) processed_entries = self._observe_postfix(entries, t) tf = self.t_pts[-1] if t[-1] > tf and fill_value != "extrapolate": # fill the rest idx = np.searchsorted(t, tf, side="right") processed_entries[..., idx:] = fill_value return processed_entries
[docs] def observe_raw(self): """ Evaluate the base variable at the given time points and y values. """ t = self.t_pts # For small number of points, use Python if pybamm.has_idaklu(): entries = self._observe_raw_cpp() else: # Fallback method for when IDAKLU is not available. To be removed # when the C++ code is migrated to a new repo entries = self._observe_raw_python() # pragma: no cover return self._observe_postfix(entries, t)
def _setup_cpp_inputs(self, t, full_range): pybamm.logger.debug("Setting up C++ interpolation inputs") ts = self.all_ts ys = self.all_ys yps = self.all_yps inputs = self.all_inputs_casadi # Remove all empty ts idxs = np.where([ti.size > 0 for ti in ts])[0] # Find the indices of the time points to observe if not full_range: ts_nonempty = [ts[idx] for idx in idxs] idxs_subset = _find_ts_indices(ts_nonempty, t) idxs = idxs[idxs_subset] # Extract the time points and inputs ts = [ts[idx] for idx in idxs] ys = [ys[idx] for idx in idxs] if self.hermite_interpolation: yps = [yps[idx] for idx in idxs] inputs = [self.all_inputs_casadi[idx] for idx in idxs] is_f_contiguous = _is_f_contiguous(ys) ts = pybamm.solvers.idaklu_solver.idaklu.VectorRealtypeNdArray(ts) ys = pybamm.solvers.idaklu_solver.idaklu.VectorRealtypeNdArray(ys) if self.hermite_interpolation: yps = pybamm.solvers.idaklu_solver.idaklu.VectorRealtypeNdArray(yps) else: yps = None inputs = pybamm.solvers.idaklu_solver.idaklu.VectorRealtypeNdArray(inputs) # Generate the serialized C++ functions only once funcs_unique = {} funcs = [None] * len(idxs) for i in range(len(idxs)): vars = self.base_variables_casadi[idxs[i]] if vars not in funcs_unique: funcs_unique[vars] = vars.serialize() funcs[i] = funcs_unique[vars] return ts, ys, yps, funcs, inputs, is_f_contiguous def _observe_hermite_cpp(self, t): pybamm.logger.debug("Observing and Hermite interpolating the variable in C++") ts, ys, yps, funcs, inputs, _ = self._setup_cpp_inputs(t, full_range=False) shapes = self._shape(t) return pybamm.solvers.idaklu_solver.idaklu.observe_hermite_interp( t, ts, ys, yps, inputs, funcs, shapes ) def _observe_raw_cpp(self): pybamm.logger.debug("Observing the variable raw data in C++") t = self.t_pts ts, ys, _, funcs, inputs, is_f_contiguous = self._setup_cpp_inputs( t, full_range=True ) shapes = self._shape(self.t_pts) return pybamm.solvers.idaklu_solver.idaklu.observe( ts, ys, inputs, funcs, is_f_contiguous, shapes ) def _observe_raw_python(self): raise NotImplementedError # pragma: no cover def _observe_postfix(self, entries, t): return entries def _interp_setup(self, entries, t): raise NotImplementedError # pragma: no cover def _shape(self, t): raise NotImplementedError # pragma: no cover def _process_spatial_variable_names(self, spatial_variable): if len(spatial_variable) == 0: return None # Extract names raw_names = [] for var in spatial_variable: # Ignore tabs in domain names if var == "tabs": continue if isinstance(var, str): raw_names.append(var) else: raw_names.append(var.name) # Rename battery variables to match PyBaMM convention if all([var.startswith("r") for var in raw_names]): return "r" elif all([var.startswith("x") for var in raw_names]): return "x" elif all([var.startswith("R") for var in raw_names]): return "R" elif len(raw_names) == 1: return raw_names[0] else: raise NotImplementedError( f"Spatial variable name not recognized for {spatial_variable}" ) def __call__( self, t=None, x=None, r=None, y=None, z=None, R=None, fill_value=np.nan, ): # Check to see if we are interpolating exactly onto the raw solution time points t_observe, observe_raw = self._check_observe_raw(t) # Check if the time points are sorted and unique is_sorted = observe_raw or _is_sorted(t_observe) # Sort them if not if not is_sorted: idxs_sort = np.argsort(t_observe) t_observe = t_observe[idxs_sort] hermite_time_interp = ( pybamm.has_idaklu() and self.hermite_interpolation and not observe_raw ) if hermite_time_interp: entries = self.observe_and_interp(t_observe, fill_value) spatial_interp = any(a is not None for a in [x, r, y, z, R]) xr_interp = spatial_interp or not hermite_time_interp if xr_interp: if hermite_time_interp: # Already interpolated in time t = None entries_for_interp, coords = self._interp_setup(entries, t_observe) else: self.initialise() entries_for_interp, coords = ( self._entries_for_interp_raw, self._coords_raw, ) if self.time_integral is None: processed_entries = self._xr_interpolate( entries_for_interp, coords, observe_raw, t, x, r, y, z, R, fill_value, ) else: processed_entries = entries_for_interp else: processed_entries = entries if not is_sorted: idxs_unsort = np.zeros_like(idxs_sort) idxs_unsort[idxs_sort] = np.arange(len(t_observe)) processed_entries = processed_entries[..., idxs_unsort] # Remove a singleton time dimension if we interpolate in time with hermite if hermite_time_interp and t_observe.size == 1: processed_entries = np.squeeze(processed_entries, axis=-1) return processed_entries def _xr_interpolate( self, entries_for_interp, coords, observe_raw, t=None, x=None, r=None, y=None, z=None, R=None, fill_value=None, ): """ Evaluate the variable at arbitrary *dimensional* t (and x, r, y, z and/or R), using interpolation """ if observe_raw: if not self.xr_array_raw_initialized: self._xr_array_raw = xr.DataArray(entries_for_interp, coords=coords) xr_data_array = self._xr_array_raw else: xr_data_array = xr.DataArray(entries_for_interp, coords=coords) kwargs = {"t": t, "x": x, "r": r, "y": y, "z": z, "R": R} # Remove any None arguments kwargs = {key: value for key, value in kwargs.items() if value is not None} # Use xarray interpolation, return numpy array out = xr_data_array.interp(**kwargs, kwargs={"fill_value": fill_value}).values return out def _check_observe_raw(self, t): """ Checks if the raw data should be observed exactly at the solution time points Args: t (np.ndarray, list, None): time points to observe Returns: t_observe (np.ndarray): time points to observe observe_raw (bool): True if observing the raw data """ # if this is a time integral variable, t must be None and we observe either the # data times (for a discrete sum) or the solution times (for a continuous sum) if self.time_integral is not None: if self.time_integral.method == "discrete": # discrete sum should be observed at the discrete times t = self.time_integral.discrete_times else: # assume we can do a sufficiently accurate trapezoidal integration at t_pts t = self.t_pts observe_raw = (t is None) or ( np.asarray(t).size == len(self.t_pts) and np.all(t == self.t_pts) ) if observe_raw: t_observe = self.t_pts elif not isinstance(t, np.ndarray): if not isinstance(t, list): t = [t] t_observe = np.array(t) else: t_observe = t if t_observe[0] < self.t_pts[0]: raise ValueError( "The interpolation points must be greater than or equal to the initial solution time" ) return t_observe, observe_raw @property def entries(self): """ Returns the raw data entries of the processed variable. If the processed variable has not been initialized (i.e. the entries have not been calculated), then the processed variable is initialized first. """ self.initialise() return self._entries_raw @property def data(self): """Same as entries, but different name""" return self.entries @property def entries_raw_initialized(self): return self._entries_raw is not None @property def xr_array_raw_initialized(self): return self._xr_array_raw is not None @property def sensitivities(self): """ Returns a dictionary of sensitivities for each input parameter. The keys are the input parameters, and the value is a matrix of size (n_x * n_t, n_p), where n_x is the number of states, n_t is the number of time points, and n_p is the size of the input parameter """ # No sensitivities if there are no inputs if len(self.all_inputs[0]) == 0: return {} # Otherwise initialise and return sensitivities if self._sensitivities is None: if self.all_solution_sensitivities: self.initialise_sensitivity_explicit_forward() else: raise ValueError( "Cannot compute sensitivities. The 'calculate_sensitivities' " "argument of the solver.solve should be changed from 'None' to " "allow sensitivities calculations. Check solver documentation for " "details." ) return self._sensitivities
[docs] def initialise_sensitivity_explicit_forward(self): "Set up the sensitivity dictionary" all_S_var = [] for ts, ys, inputs_stacked, inputs, base_variable, dy_dp in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.all_inputs, self.base_variables, self.all_solution_sensitivities["all"], ): # Set up symbolic variables t_casadi = casadi.MX.sym("t") y_casadi = casadi.MX.sym("y", ys.shape[0]) p_casadi = { name: casadi.MX.sym(name, value.shape[0]) for name, value in inputs.items() } p_casadi_stacked = casadi.vertcat(*[p for p in p_casadi.values()]) # Convert variable to casadi format for differentiating var_casadi = base_variable.to_casadi(t_casadi, y_casadi, inputs=p_casadi) dvar_dy = casadi.jacobian(var_casadi, y_casadi) dvar_dp = casadi.jacobian(var_casadi, p_casadi_stacked) # Convert to functions and evaluate index-by-index dvar_dy_func = casadi.Function( "dvar_dy", [t_casadi, y_casadi, p_casadi_stacked], [dvar_dy] ) dvar_dp_func = casadi.Function( "dvar_dp", [t_casadi, y_casadi, p_casadi_stacked], [dvar_dp] ) for idx, t in enumerate(ts): u = ys[:, idx] next_dvar_dy_eval = dvar_dy_func(t, u, inputs_stacked) next_dvar_dp_eval = dvar_dp_func(t, u, inputs_stacked) if idx == 0: dvar_dy_eval = next_dvar_dy_eval dvar_dp_eval = next_dvar_dp_eval else: dvar_dy_eval = casadi.diagcat(dvar_dy_eval, next_dvar_dy_eval) dvar_dp_eval = casadi.vertcat(dvar_dp_eval, next_dvar_dp_eval) # Compute sensitivity S_var = dvar_dy_eval @ dy_dp + dvar_dp_eval all_S_var.append(S_var) S_var = casadi.vertcat(*all_S_var) sensitivities = {"all": S_var} # Add the individual sensitivity start = 0 for name, inp in self.all_inputs[0].items(): end = start + inp.shape[0] sensitivities[name] = S_var[:, start:end] start = end # Save attribute self._sensitivities = sensitivities
@property def hermite_interpolation(self): return self.all_yps is not None
class ProcessedVariable0D(ProcessedVariable): def __init__( self, base_variables, base_variables_casadi, solution, time_integral: Optional[pybamm.ProcessedVariableTimeIntegral] = None, ): self.dimensions = 0 super().__init__( base_variables, base_variables_casadi, solution, time_integral=time_integral, ) def _observe_raw_python(self): pybamm.logger.debug("Observing the variable raw data in Python") # initialise empty array of the correct size entries = np.empty(self._shape(self.t_pts)) idx = 0 # Evaluate the base_variable index-by-index for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[idx] = float(base_var_casadi(t, y, inputs)) idx += 1 return entries def _observe_postfix(self, entries, t): if self.time_integral is None: return entries if self.time_integral.method == "discrete": return np.sum(entries, axis=0, initial=self.time_integral.initial_condition) elif self.time_integral.method == "continuous": return cumulative_trapezoid( entries, self.t_pts, initial=float(self.time_integral.initial_condition) ) else: raise ValueError( "time_integral method must be 'discrete' or 'continuous'" ) # pragma: no cover def _interp_setup(self, entries, t): # save attributes for interpolation entries_for_interp = entries coords_for_interp = {"t": t} return entries_for_interp, coords_for_interp def _shape(self, t): return [len(t)] class ProcessedVariable1D(ProcessedVariable): """ An object that can be evaluated at arbitrary (scalars or vectors) t and x, and returns the (interpolated) value of the base variable at that t and x. Parameters ---------- base_variables : list of :class:`pybamm.Symbol` A list of base variables with a method `evaluate(t,y)`, each entry of which returns the value of that variable for that particular sub-solution. A Solution can be comprised of sub-solutions which are the solutions of different models. Note that this can be any kind of node in the expression tree, not just a :class:`pybamm.Variable`. When evaluated, returns an array of size (m,n) base_variables_casadi : list of :class:`casadi.Function` A list of casadi functions. When evaluated, returns the same thing as `base_Variable.evaluate` (but more efficiently). solution : :class:`pybamm.Solution` The solution object to be used to create the processed variables """ def __init__( self, base_variables, base_variables_casadi, solution, time_integral: Optional[pybamm.ProcessedVariableTimeIntegral] = None, ): self.dimensions = 1 super().__init__( base_variables, base_variables_casadi, solution, time_integral=time_integral, ) def _observe_raw_python(self): pybamm.logger.debug("Observing the variable raw data in Python") entries = np.empty(self._shape(self.t_pts)) # Evaluate the base_variable index-by-index idx = 0 for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[:, idx] = base_var_casadi(t, y, inputs).full()[:, 0] idx += 1 return entries def _interp_setup(self, entries, t): # Get node and edge values nodes = self.mesh.nodes edges = self.mesh.edges if entries.shape[0] == len(nodes): space = nodes elif entries.shape[0] == len(edges): space = edges # add points outside domain for extrapolation to boundaries extrap_space_left = np.array([2 * space[0] - space[1]]) extrap_space_right = np.array([2 * space[-1] - space[-2]]) space = np.concatenate([extrap_space_left, space, extrap_space_right]) extrap_entries_left = 2 * entries[0] - entries[1] extrap_entries_right = 2 * entries[-1] - entries[-2] entries_for_interp = np.vstack( [extrap_entries_left, entries, extrap_entries_right] ) # assign attributes for reference (either x_sol or r_sol) self.spatial_variable_names = { k: self._process_spatial_variable_names(v) for k, v in self.spatial_variables.items() } self.first_dimension = self.spatial_variable_names["primary"] # assign attributes for reference pts_for_interp = space self.internal_boundaries = self.mesh.internal_boundaries # Set first_dim_pts to edges for nicer plotting self.first_dim_pts = edges # save attributes for interpolation coords_for_interp = {self.first_dimension: pts_for_interp, "t": t} return entries_for_interp, coords_for_interp def _shape(self, t): t_size = len(t) space_size = self.base_eval_shape[0] return [space_size, t_size] class ProcessedVariable2D(ProcessedVariable): """ An object that can be evaluated at arbitrary (scalars or vectors) t and x, and returns the (interpolated) value of the base variable at that t and x. Parameters ---------- base_variables : list of :class:`pybamm.Symbol` A list of base variables with a method `evaluate(t,y)`, each entry of which returns the value of that variable for that particular sub-solution. A Solution can be comprised of sub-solutions which are the solutions of different models. Note that this can be any kind of node in the expression tree, not just a :class:`pybamm.Variable`. When evaluated, returns an array of size (m,n) base_variables_casadi : list of :class:`casadi.Function` A list of casadi functions. When evaluated, returns the same thing as `base_Variable.evaluate` (but more efficiently). solution : :class:`pybamm.Solution` The solution object to be used to create the processed variables """ def __init__( self, base_variables, base_variables_casadi, solution, time_integral: Optional[pybamm.ProcessedVariableTimeIntegral] = None, ): self.dimensions = 2 super().__init__( base_variables, base_variables_casadi, solution, time_integral=time_integral, ) first_dim_nodes = self.mesh.nodes first_dim_edges = self.mesh.edges second_dim_nodes = self.base_variables[0].secondary_mesh.nodes if self.base_eval_size // len(second_dim_nodes) == len(first_dim_nodes): first_dim_pts = first_dim_nodes elif self.base_eval_size // len(second_dim_nodes) == len(first_dim_edges): first_dim_pts = first_dim_edges second_dim_pts = second_dim_nodes self.first_dim_size = len(first_dim_pts) self.second_dim_size = len(second_dim_pts) def _observe_raw_python(self): """ Initialise a 2D object that depends on x and r, x and z, x and R, or R and r. """ pybamm.logger.debug("Observing the variable raw data in Python") first_dim_size, second_dim_size, t_size = self._shape(self.t_pts) entries = np.empty((first_dim_size, second_dim_size, t_size)) # Evaluate the base_variable index-by-index idx = 0 for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[:, :, idx] = np.reshape( base_var_casadi(t, y, inputs).full(), [first_dim_size, second_dim_size], order="F", ) idx += 1 return entries def _interp_setup(self, entries, t): """ Initialise a 2D object that depends on x and r, x and z, x and R, or R and r. """ first_dim_nodes = self.mesh.nodes first_dim_edges = self.mesh.edges second_dim_nodes = self.base_variables[0].secondary_mesh.nodes second_dim_edges = self.base_variables[0].secondary_mesh.edges if self.base_eval_size // len(second_dim_nodes) == len(first_dim_nodes): first_dim_pts = first_dim_nodes elif self.base_eval_size // len(second_dim_nodes) == len(first_dim_edges): first_dim_pts = first_dim_edges second_dim_pts = second_dim_nodes # add points outside first dimension domain for extrapolation to # boundaries extrap_space_first_dim_left = np.array( [2 * first_dim_pts[0] - first_dim_pts[1]] ) extrap_space_first_dim_right = np.array( [2 * first_dim_pts[-1] - first_dim_pts[-2]] ) first_dim_pts = np.concatenate( [extrap_space_first_dim_left, first_dim_pts, extrap_space_first_dim_right] ) extrap_entries_left = np.expand_dims(2 * entries[0] - entries[1], axis=0) extrap_entries_right = np.expand_dims(2 * entries[-1] - entries[-2], axis=0) entries_for_interp = np.concatenate( [extrap_entries_left, entries, extrap_entries_right], axis=0 ) # add points outside second dimension domain for extrapolation to # boundaries extrap_space_second_dim_left = np.array( [2 * second_dim_pts[0] - second_dim_pts[1]] ) extrap_space_second_dim_right = np.array( [2 * second_dim_pts[-1] - second_dim_pts[-2]] ) second_dim_pts = np.concatenate( [ extrap_space_second_dim_left, second_dim_pts, extrap_space_second_dim_right, ] ) extrap_entries_second_dim_left = np.expand_dims( 2 * entries_for_interp[:, 0, :] - entries_for_interp[:, 1, :], axis=1 ) extrap_entries_second_dim_right = np.expand_dims( 2 * entries_for_interp[:, -1, :] - entries_for_interp[:, -2, :], axis=1 ) entries_for_interp = np.concatenate( [ extrap_entries_second_dim_left, entries_for_interp, extrap_entries_second_dim_right, ], axis=1, ) self.spatial_variable_names = { k: self._process_spatial_variable_names(v) for k, v in self.spatial_variables.items() } self.first_dimension = self.spatial_variable_names["primary"] self.second_dimension = self.spatial_variable_names["secondary"] # assign attributes for reference first_dim_pts_for_interp = first_dim_pts second_dim_pts_for_interp = second_dim_pts # Set pts to edges for nicer plotting self.first_dim_pts = first_dim_edges self.second_dim_pts = second_dim_edges # save attributes for interpolation coords_for_interp = { self.first_dimension: first_dim_pts_for_interp, self.second_dimension: second_dim_pts_for_interp, "t": t, } return entries_for_interp, coords_for_interp def _shape(self, t): first_dim_size = self.first_dim_size second_dim_size = self.second_dim_size t_size = len(t) return [first_dim_size, second_dim_size, t_size] class ProcessedVariable2DSciKitFEM(ProcessedVariable2D): """ An object that can be evaluated at arbitrary (scalars or vectors) t and x, and returns the (interpolated) value of the base variable at that t and x. Parameters ---------- base_variables : list of :class:`pybamm.Symbol` A list of base variables with a method `evaluate(t,y)`, each entry of which returns the value of that variable for that particular sub-solution. A Solution can be comprised of sub-solutions which are the solutions of different models. Note that this can be any kind of node in the expression tree, not just a :class:`pybamm.Variable`. When evaluated, returns an array of size (m,n) base_variables_casadi : list of :class:`casadi.Function` A list of casadi functions. When evaluated, returns the same thing as `base_Variable.evaluate` (but more efficiently). solution : :class:`pybamm.Solution` The solution object to be used to create the processed variables """ def __init__( self, base_variables, base_variables_casadi, solution, time_integral: Optional[pybamm.ProcessedVariableTimeIntegral] = None, ): self.dimensions = 2 super(ProcessedVariable2D, self).__init__( base_variables, base_variables_casadi, solution, time_integral=time_integral, ) y_sol = self.mesh.edges["y"] z_sol = self.mesh.edges["z"] self.first_dim_size = len(y_sol) self.second_dim_size = len(z_sol) def _interp_setup(self, entries, t): y_sol = self.mesh.edges["y"] z_sol = self.mesh.edges["z"] # assign attributes for reference self.y_sol = y_sol self.z_sol = z_sol self.first_dimension = "y" self.second_dimension = "z" self.first_dim_pts = y_sol self.second_dim_pts = z_sol # save attributes for interpolation coords_for_interp = {"y": y_sol, "z": z_sol, "t": t} return entries, coords_for_interp def process_variable(base_variables, *args, **kwargs): mesh = base_variables[0].mesh domain = base_variables[0].domain # Evaluate base variable at initial time base_eval_shape = base_variables[0].shape base_eval_size = base_variables[0].size # handle 2D (in space) finite element variables differently if ( mesh and "current collector" in domain and isinstance(mesh, pybamm.ScikitSubMesh2D) ): return ProcessedVariable2DSciKitFEM(base_variables, *args, **kwargs) # check variable shape if len(base_eval_shape) == 0 or base_eval_shape[0] == 1: return ProcessedVariable0D(base_variables, *args, **kwargs) n = mesh.npts base_shape = base_eval_shape[0] # Try some shapes that could make the variable a 1D variable if base_shape in [n, n + 1]: return ProcessedVariable1D(base_variables, *args, **kwargs) # Try some shapes that could make the variable a 2D variable first_dim_nodes = mesh.nodes first_dim_edges = mesh.edges second_dim_pts = base_variables[0].secondary_mesh.nodes if base_eval_size // len(second_dim_pts) in [ len(first_dim_nodes), len(first_dim_edges), ]: return ProcessedVariable2D(base_variables, *args, **kwargs) # Raise error for 3D variable raise NotImplementedError( f"Shape not recognized for {base_variables[0]}" + "(note processing of 3D variables is not yet implemented)" ) def _is_f_contiguous(all_ys): """ Check if all the ys are f-contiguous in memory Args: all_ys (list of np.ndarray): list of all ys Returns: bool: True if all ys are f-contiguous """ return all(isinstance(y, np.ndarray) and y.data.f_contiguous for y in all_ys) def _is_sorted(t): """ Check if an array is sorted Args: t (np.ndarray): array to check Returns: bool: True if array is sorted """ return np.all(t[:-1] <= t[1:]) def _find_ts_indices(ts, t): """ Parameters: - ts: A list of numpy arrays (each sorted) whose values are successively increasing. - t: A sorted list or array of values to find within ts. Returns: - indices: A list of indices from `ts` such that at least one value of `t` falls within ts[idx]. """ indices = [] # Get the minimum and maximum values of the target values `t` t_min, t_max = t[0], t[-1] # Step 1: Use binary search to find the range of `ts` arrays where t_min and t_max could lie low_idx = bisect.bisect_left([ts_arr[-1] for ts_arr in ts], t_min) high_idx = bisect.bisect_right([ts_arr[0] for ts_arr in ts], t_max) # Step 2: Iterate over the identified range for idx in range(low_idx, high_idx): ts_min, ts_max = ts[idx][0], ts[idx][-1] # Binary search within `t` to check if any value falls within [ts_min, ts_max] i = bisect.bisect_left(t, ts_min) if i < len(t) and t[i] <= ts_max: # At least one value of t is within ts[idx] indices.append(idx) # extrapolating if (t[-1] > ts[-1][-1]) and (len(indices) == 0 or indices[-1] != len(ts) - 1): indices.append(len(ts) - 1) return indices