Source code for odl.discr.discr_ops

# Copyright 2014-2020 The ODL contributors
#
# This file is part of ODL.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""Operators defined on `DiscretizedSpace`."""

from __future__ import absolute_import, division, print_function

import numpy as np

from odl.discr.discr_space import DiscretizedSpace
from odl.discr.discr_utils import (
    _normalize_interp, per_axis_interpolator, point_collocation)
from odl.discr.partition import uniform_partition
from odl.operator import Operator
from odl.space import tensor_space
from odl.util import (
    normalized_scalar_param_list, resize_array, safe_int_conv, writable_array)
from odl.util.numerics import _SUPPORTED_RESIZE_PAD_MODES
from odl.util.utility import nullcontext

__all__ = ('Resampling', 'ResizingOperator')


[docs]class Resampling(Operator): """An operator that resamples on a different grid in the same set."""
[docs] def __init__(self, domain, range, interp): """Initialize a new instance. Parameters ---------- domain : `DiscretizedSpace` Set of elements that are to be resampled. range : `DiscretizedSpace` Set in which the resampled elements lie. Must have the same `DiscretizedSpace.domain` as ``domain``. interp : str or sequence of str Interpolation type that should be used to resample. A single value applies to all axes, and a sequence gives the interpolation scheme per axis. Supported values: ``'nearest'``, ``'linear'`` Examples -------- Create two spaces with different number of points and a resampling operator using nearest-neighbor interpolation: >>> coarse_discr = odl.uniform_discr(0, 1, 3) >>> fine_discr = odl.uniform_discr(0, 1, 6) >>> resampling = odl.Resampling(coarse_discr, fine_discr, 'nearest') >>> resampling.domain uniform_discr(0.0, 1.0, 3) >>> resampling.range uniform_discr(0.0, 1.0, 6) >>> resampling.interp 'nearest' Apply the corresponding resampling operator to an element: >>> print(resampling([0, 1, 0])) [ 0., 0., 1., 1., 0., 0.] With linear interpolation: >>> resampling = odl.Resampling(coarse_discr, fine_discr, 'linear') >>> print(resampling([0, 1, 0])) [ 0. , 0.25, 0.75, 0.75, 0.25, 0. ] """ if domain.domain != range.domain: raise ValueError( '`domain.domain` ({}) does not match `range.domain` ({})' ''.format(domain.domain, range.domain) ) super(Resampling, self).__init__( domain=domain, range=range, linear=True) self.__interp_byaxis = _normalize_interp(interp, domain.ndim)
@property def interp_byaxis(self): """Tuple of per-axis interpolation schemes.""" return self.__interp_byaxis @property def interp(self): """Interpolation scheme or tuple of per-axis interpolation schemes.""" if ( len(self.interp_byaxis) != 0 and all(s == self.interp_byaxis[0] for s in self.interp_byaxis[1:]) ): return self.interp_byaxis[0] else: return self.interp_byaxis
[docs] def _call(self, x, out=None): """Apply resampling operator. The element ``x`` is resampled using the sampling and interpolation operators of the underlying spaces. """ interpolator = per_axis_interpolator( x, self.domain.grid.coord_vectors, self.interp ) out_ctx = nullcontext() if out is None else writable_array(out) with out_ctx as out_arr: return point_collocation( interpolator, self.range.meshgrid, out=out_arr )
@property def inverse(self): """An (approximate) inverse of this resampling operator. The returned operator is resampling defined in the opposite direction. See Also -------- adjoint : resampling is unitary, so the adjoint is the inverse. """ return Resampling(self.range, self.domain, self.interp) @property def adjoint(self): """Return an (approximate) adjoint. The result is only exact if the interpolation and sampling operators of the underlying spaces match exactly. Returns ------- adjoint : Resampling Resampling operator defined in the opposite direction. Examples -------- Create resampling operator and inverse: >>> coarse_discr = odl.uniform_discr(0, 1, 3) >>> fine_discr = odl.uniform_discr(0, 1, 6) >>> resampling = odl.Resampling(coarse_discr, fine_discr, 'nearest') >>> resampling_inv = resampling.inverse The inverse is a proper left inverse if the resampling goes from a coarser to a finer sampling: >>> x = [0, 1, 0] >>> print(resampling_inv(resampling(x))) [ 0., 1., 0.] However, it can fail in the other direction: >>> y = [0, 0, 0, 1, 0, 0] >>> print(resampling(resampling_inv(y))) [ 0., 0., 1., 1., 0., 0.] """ return self.inverse
[docs]class ResizingOperator(Operator): """Operator mapping a discretized function to a new domain. This operator is a mapping between uniformly discretized `DiscretizedSpace` spaces with the same `DiscretizedSpace.cell_sides`, but different `DiscretizedSpace.shape`. The underlying operation is array resizing, i.e. no resampling is performed. In axes where the domain is enlarged, the new entries are filled ("padded") according to a provided parameter ``pad_mode``. All resizing operator variants are linear, except constant padding with constant != 0. See `the online documentation <https://odlgroup.github.io/odl/math/resizing_ops.html>`_ on resizing operators for mathematical details. """
[docs] def __init__(self, domain, range=None, ran_shp=None, **kwargs): """Initialize a new instance. Parameters ---------- domain : uniform `DiscretizedSpace` Uniformly discretized space, the operator can be applied to its elements. range : uniform `DiscretizedSpace`, optional Uniformly discretized space in which the result of the application of this operator lies. For the default ``None``, a space with the same attributes as ``domain`` is used, except for its shape, which is set to ``ran_shp``. ran_shp : sequence of ints, optional Shape of the range of this operator. This can be provided instead of ``range`` and is mandatory if ``range`` is ``None``. offset : int or sequence of ints, optional Number of cells to add to/remove from the left of ``domain.partition``. By default, the difference is distributed evenly, with preference for left in case of ambiguity. This option is can only be used together with ``ran_shp``. pad_mode : string, optional Method to be used to fill in missing values in an enlarged array. ``'constant'``: Fill with ``pad_const``. ``'symmetric'``: Reflect at the boundaries, not doubling the outmost values. This requires left and right padding sizes to be strictly smaller than the original array shape. ``'periodic'``: Fill in values from the other side, keeping the order. This requires left and right padding sizes to be at most as large as the original array shape. ``'order0'``: Extend constantly with the outmost values (ensures continuity). ``'order1'``: Extend with constant slope (ensures continuity of the first derivative). This requires at least 2 values along each axis where padding is applied. pad_const : scalar, optional Value to be used in the ``'constant'`` padding mode. discr_kwargs: dict, optional Keyword arguments passed to the `uniform_discr` constructor. Examples -------- The simplest way of initializing a resizing operator is by providing ``ran_shp`` and, optionally, parameters for the padding variant to be used. The range is inferred from ``domain`` and the supplied parameters. If no ``offset`` is given, the difference in size is evenly distributed to both sides: >>> space = odl.uniform_discr([0, 0], [1, 1], (2, 4)) >>> resize_op = odl.ResizingOperator(space, ran_shp=(4, 4)) >>> resize_op.range uniform_discr([-0.5, 0. ], [ 1.5, 1. ], (4, 4)) Testing different padding methods in the first axis (zero padding is the default): >>> x = [[1, 2, 3, 4], ... [5, 6, 7, 8]] >>> resize_op = odl.ResizingOperator(space, ran_shp=(4, 4)) >>> print(resize_op(x)) [[ 0., 0., 0., 0.], [ 1., 2., 3., 4.], [ 5., 6., 7., 8.], [ 0., 0., 0., 0.]] >>> >>> resize_op = odl.ResizingOperator(space, ran_shp=(4, 4), ... offset=(0, 0), ... pad_mode='periodic') >>> print(resize_op(x)) [[ 1., 2., 3., 4.], [ 5., 6., 7., 8.], [ 1., 2., 3., 4.], [ 5., 6., 7., 8.]] >>> >>> resize_op = odl.ResizingOperator(space, ran_shp=(4, 4), ... offset=(0, 0), ... pad_mode='order0') >>> print(resize_op(x)) [[ 1., 2., 3., 4.], [ 5., 6., 7., 8.], [ 5., 6., 7., 8.], [ 5., 6., 7., 8.]] Alternatively, the range of the operator can be provided directly. This requires that the partitions match, i.e. that the cell sizes are the same and there is no shift: >>> # Same space as in the first example, see above >>> large_spc = odl.uniform_discr([-0.5, 0], [1.5, 1], (4, 4)) >>> resize_op = odl.ResizingOperator(space, large_spc, ... pad_mode='periodic') >>> print(resize_op(x)) [[ 5., 6., 7., 8.], [ 1., 2., 3., 4.], [ 5., 6., 7., 8.], [ 1., 2., 3., 4.]] """ # Swap names to be able to use the range iterator without worries import builtins ran, range = range, builtins.range if not isinstance(domain, DiscretizedSpace): raise TypeError('`domain` must be a `DiscretizedSpace` instance, ' 'got {!r}'.format(domain)) offset = kwargs.pop('offset', None) discr_kwargs = kwargs.pop('discr_kwargs', {}) if ran is None: if ran_shp is None: raise ValueError('either `ran` or `ran_shp` must be ' 'given') offset = normalized_scalar_param_list( offset, domain.ndim, param_conv=safe_int_conv, keep_none=True) ran = _resize_discr(domain, ran_shp, offset, discr_kwargs) self.__offset = tuple(_offset_from_spaces(domain, ran)) elif ran_shp is None: if offset is not None: raise ValueError('`offset` can only be combined with ' '`ran_shp`') for i in range(domain.ndim): if (ran.is_uniform_byaxis[i] and domain.is_uniform_byaxis[i] and not np.isclose(ran.cell_sides[i], domain.cell_sides[i])): raise ValueError( 'in axis {}: cell sides of domain and range differ ' 'significantly: (difference {})' ''.format(i, ran.cell_sides[i] - domain.cell_sides[i])) self.__offset = _offset_from_spaces(domain, ran) else: raise ValueError('cannot combine `range` with `ran_shape`') pad_mode = kwargs.pop('pad_mode', 'constant') pad_mode, pad_mode_in = str(pad_mode).lower(), pad_mode if pad_mode not in _SUPPORTED_RESIZE_PAD_MODES: raise ValueError("`pad_mode` '{}' not understood" "".format(pad_mode_in)) self.__pad_mode = pad_mode # Store constant in a way that ensures safe casting (one-element array) self.__pad_const = np.array(kwargs.pop('pad_const', 0), dtype=ran.dtype) # padding mode 'constant' with `pad_const != 0` is not linear linear = (self.pad_mode != 'constant' or self.pad_const == 0.0) super(ResizingOperator, self).__init__(domain, ran, linear=linear)
@property def offset(self): """Number of cells added to/removed from the left.""" return self.__offset @property def pad_mode(self): """Padding mode used by this operator.""" return self.__pad_mode @property def pad_const(self): """Constant used by this operator in case of constant padding.""" return self.__pad_const @property def axes(self): """Dimensions in which an actual resizing is performed.""" return tuple(i for i in range(self.domain.ndim) if self.domain.shape[i] != self.range.shape[i])
[docs] def _call(self, x, out): """Implement ``self(x, out)``.""" with writable_array(out) as out_arr: resize_array(x.asarray(), self.range.shape, offset=self.offset, pad_mode=self.pad_mode, pad_const=self.pad_const, direction='forward', out=out_arr)
[docs] def derivative(self, point): """Derivative of this operator at ``point``. For the particular case of constant padding with non-zero constant, the derivative is the corresponding zero-padding variant. In all other cases, this operator is linear, i.e. the derivative is equal to ``self``. """ if self.pad_mode == 'constant' and self.pad_const != 0: return ResizingOperator( domain=self.domain, range=self.range, pad_mode='constant', pad_const=0.0) else: # operator is linear return self
@property def adjoint(self): """Adjoint of this operator.""" if not self.is_linear: raise NotImplementedError('this operator is not linear and ' 'thus has no adjoint') op = self class ResizingOperatorAdjoint(Operator): """Adjoint of `ResizingOperator`. See `the online documentation <https://odlgroup.github.io/odl/math/resizing_ops.html>`_ on resizing operators for mathematical details. """ def _call(self, x, out): """Implement ``self(x, out)``.""" with writable_array(out) as out_arr: resize_array(x.asarray(), op.domain.shape, offset=op.offset, pad_mode=op.pad_mode, pad_const=0, direction='adjoint', out=out_arr) @property def adjoint(self): """Adjoint of the adjoint, i.e. the original operator.""" return op @property def inverse(self): """(Pseudo-)Inverse of this operator. Note that in axes where ``self`` extends, the returned operator acts as a proper inverse, while in restriction axes, the operation is not invertible. """ return ResizingOperatorAdjoint( domain=self.range, range=self.domain, pad_mode=op.pad_mode ) return ResizingOperatorAdjoint(op.range, op.domain, linear=True) @property def inverse(self): """(Pseudo-)Inverse of this operator. Note that in axes where ``self`` extends, the returned operator acts as left inverse, while in restriction axes, it is a right inverse. """ return ResizingOperator(self.range, self.domain, pad_mode=self.pad_mode, pad_const=self.pad_const)
def _offset_from_spaces(dom, ran): """Return index offset corresponding to given spaces.""" affected = np.not_equal(dom.shape, ran.shape) diff_l = np.abs(ran.grid.min() - dom.grid.min()) offset_float = diff_l / dom.cell_sides offset = np.around(offset_float).astype(int) for i in range(dom.ndim): if affected[i] and not np.isclose(offset[i], offset_float[i]): raise ValueError('in axis {}: range is shifted relative to domain ' 'by a non-multiple {} of cell_sides' ''.format(i, offset_float[i] - offset[i])) offset[~affected] = 0 return tuple(offset) def _resize_discr(discr, newshp, offset, discr_kwargs): """Return a space based on ``discr`` and ``newshp``. Use the domain of ``discr`` and its partition to create a new uniformly discretized space with ``newshp`` as shape. In axes where ``offset`` is given, it determines the number of added/removed cells to the left. Where ``offset`` is ``None``, the points are distributed evenly to left and right. The ``discr_kwargs`` parameter is passed to `uniform_discr` for further specification of discretization parameters. """ nodes_on_bdry = discr_kwargs.get('nodes_on_bdry', False) if np.shape(nodes_on_bdry) == (): nodes_on_bdry = ([(bool(nodes_on_bdry), bool(nodes_on_bdry))] * discr.ndim) elif discr.ndim == 1 and len(nodes_on_bdry) == 2: nodes_on_bdry = [nodes_on_bdry] elif len(nodes_on_bdry) != discr.ndim: raise ValueError('`nodes_on_bdry` has length {}, expected {}' ''.format(len(nodes_on_bdry), discr.ndim)) dtype = discr_kwargs.pop('dtype', discr.dtype) impl = discr_kwargs.pop('impl', discr.impl) exponent = discr_kwargs.pop('exponent', discr.exponent) weighting = discr_kwargs.pop('weighting', discr.weighting) affected = np.not_equal(newshp, discr.shape) ndim = discr.ndim for i in range(ndim): if affected[i] and not discr.is_uniform_byaxis[i]: raise ValueError('cannot resize in non-uniformly discretized ' 'axis {}'.format(i)) grid_min, grid_max = discr.grid.min(), discr.grid.max() cell_size = discr.cell_sides new_minpt, new_maxpt = [], [] for axis, (n_orig, n_new, off, on_bdry) in enumerate(zip( discr.shape, newshp, offset, nodes_on_bdry)): if affected[axis]: n_diff = n_new - n_orig if off is None: num_r = n_diff // 2 num_l = n_diff - num_r else: num_r = n_diff - off num_l = off else: num_l, num_r = 0, 0 try: on_bdry_l, on_bdry_r = on_bdry except TypeError: on_bdry_l = on_bdry on_bdry_r = on_bdry if on_bdry_l: new_minpt.append(grid_min[axis] - num_l * cell_size[axis]) else: new_minpt.append(grid_min[axis] - (num_l + 0.5) * cell_size[axis]) if on_bdry_r: new_maxpt.append(grid_max[axis] + num_r * cell_size[axis]) else: new_maxpt.append(grid_max[axis] + (num_r + 0.5) * cell_size[axis]) tspace = tensor_space(newshp, dtype=dtype, impl=impl, exponent=exponent, weighting=weighting) # Stack together the (unchanged) nonuniform axes and the (new) uniform # axes in the right order part = uniform_partition([], [], ()) for i in range(ndim): if discr.is_uniform_byaxis[i]: part = part.append( uniform_partition(new_minpt[i], new_maxpt[i], newshp[i], nodes_on_bdry=nodes_on_bdry[i])) else: part = part.append(discr.partition.byaxis[i]) return DiscretizedSpace(part, tspace) if __name__ == '__main__': from odl.util.testutils import run_doctests run_doctests()