Source code for odl.solvers.iterative.iterative

# Copyright 2014-2019 The ODL contributors
#
# This file is part of ODL.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""Simple iterative type optimization schemes."""

from __future__ import print_function, division, absolute_import
from builtins import next
import numpy as np

from odl.operator import IdentityOperator, OperatorComp, OperatorSum
from odl.util import normalized_scalar_param_list


__all__ = ('landweber', 'conjugate_gradient', 'conjugate_gradient_normal',
           'gauss_newton', 'kaczmarz')


# TODO: update all docs


[docs] def landweber(op, x, rhs, niter, omega=None, projection=None, callback=None): r"""Optimized implementation of Landweber's method. Solves the inverse problem:: A(x) = rhs Parameters ---------- op : `Operator` Operator in the inverse problem. ``op.derivative(x).adjoint`` must be well-defined for ``x`` in the operator domain. x : ``op.domain`` element Element to which the result is written. Its initial value is used as starting point of the iteration, and its values are updated in each iteration step. rhs : ``op.range`` element Right-hand side of the equation defining the inverse problem. niter : int Number of iterations. omega : positive float, optional Relaxation parameter in the iteration. Default: ``1 / op.norm(estimate=True) ** 2`` projection : callable, optional Function that can be used to modify the iterates in each iteration, for example enforcing positivity. The function should take one argument and modify it in-place. callback : callable, optional Object executing code per iteration, e.g. plotting each iterate. Notes ----- This method calculates an approximate least-squares solution of the inverse problem of the first kind .. math:: \mathcal{A} (x) = y, for a given :math:`y\in \mathcal{Y}`, i.e. an approximate solution :math:`x^*` to .. math:: \min_{x\in \mathcal{X}} \| \mathcal{A}(x) - y \|_{\mathcal{Y}}^2 for a (Frechet-) differentiable operator :math:`\mathcal{A}: \mathcal{X} \to \mathcal{Y}` between Hilbert spaces :math:`\mathcal{X}` and :math:`\mathcal{Y}`. The method starts from an initial guess :math:`x_0` and uses the iteration .. math:: x_{k+1} = x_k - \omega \ \partial \mathcal{A}(x)^* (\mathcal{A}(x_k) - y), where :math:`\partial \mathcal{A}(x)` is the Frechet derivative of :math:`\mathcal{A}` at :math:`x` and :math:`\omega` is a relaxation parameter. For linear problems, a choice :math:`0 < \omega < 2/\lVert \mathcal{A}^2\rVert` guarantees convergence, where :math:`\lVert\mathcal{A}\rVert` stands for the operator norm of :math:`\mathcal{A}`. Users may also optionally provide a projection to project each iterate onto some subset. For example enforcing positivity. This implementation uses a minimum amount of memory copies by applying re-usable temporaries and in-place evaluation. The method is also described in a `Wikipedia article <https://en.wikipedia.org/wiki/Landweber_iteration>`_. """ # TODO: add a book reference if x not in op.domain: raise TypeError('`x` {!r} is not in the domain of `op` {!r}' ''.format(x, op.domain)) if omega is None: omega = 1 / op.norm(estimate=True) ** 2 # Reusable temporaries tmp_ran = op.range.element() tmp_dom = op.domain.element() for _ in range(niter): op(x, out=tmp_ran) tmp_ran -= rhs op.derivative(x).adjoint(tmp_ran, out=tmp_dom) x.lincomb(1, x, -omega, tmp_dom) if projection is not None: projection(x) if callback is not None: callback(x)
[docs] def conjugate_gradient(op, x, rhs, niter, callback=None): """Optimized implementation of CG for self-adjoint operators. This method solves the inverse problem (of the first kind):: A(x) = y for a linear and self-adjoint `Operator` ``A``. It uses a minimum amount of memory copies by applying re-usable temporaries and in-place evaluation. The method is described (for linear systems) in a `Wikipedia article <https://en.wikipedia.org/wiki/Conjugate_gradient_method>`_. Parameters ---------- op : linear `Operator` Operator in the inverse problem. It must be linear and self-adjoint. This implies in particular that its domain and range are equal. x : ``op.domain`` element Element to which the result is written. Its initial value is used as starting point of the iteration, and its values are updated in each iteration step. rhs : ``op.range`` element Right-hand side of the equation defining the inverse problem. niter : int Number of iterations. callback : callable, optional Object executing code per iteration, e.g. plotting each iterate. See Also -------- conjugate_gradient_normal : Solver for nonsymmetric matrices """ # TODO: add a book reference # TODO: update doc if op.domain != op.range: raise ValueError('operator needs to be self-adjoint') if x not in op.domain: raise TypeError('`x` {!r} is not in the domain of `op` {!r}' ''.format(x, op.domain)) r = op(x) r.lincomb(1, rhs, -1, r) # r = rhs - A x p = r.copy() d = op.domain.element() # Extra storage for storing A x sqnorm_r_old = r.norm() ** 2 # Only recalculate norm after update if sqnorm_r_old == 0: # Return if no step forward return for _ in range(niter): op(p, out=d) # d = A p inner_p_d = p.inner(d) if inner_p_d == 0.0: # Return if step is 0 return alpha = sqnorm_r_old / inner_p_d x.lincomb(1, x, alpha, p) # x = x + alpha*p r.lincomb(1, r, -alpha, d) # r = r - alpha*d sqnorm_r_new = r.norm() ** 2 beta = sqnorm_r_new / sqnorm_r_old sqnorm_r_old = sqnorm_r_new p.lincomb(1, r, beta, p) # p = s + b * p if callback is not None: callback(x)
[docs] def conjugate_gradient_normal(op, x, rhs, niter=1, callback=None): """Optimized implementation of CG for the normal equation. This method solves the inverse problem (of the first kind) :: A(x) == rhs with a linear `Operator` ``A`` by looking at the normal equation :: A.adjoint(A(x)) == A.adjoint(rhs) It uses a minimum amount of memory copies by applying re-usable temporaries and in-place evaluation. The method is described (for linear systems) in a `Wikipedia article <https://en.wikipedia.org/wiki/Conjugate_gradient_method#\ Conjugate_gradient_on_the_normal_equations>`_. Parameters ---------- op : `Operator` Operator in the inverse problem. If not linear, it must have an implementation of `Operator.derivative`, which in turn must implement `Operator.adjoint`, i.e. the call ``op.derivative(x).adjoint`` must be valid. x : ``op.domain`` element Element to which the result is written. Its initial value is used as starting point of the iteration, and its values are updated in each iteration step. rhs : ``op.range`` element Right-hand side of the equation defining the inverse problem niter : int Number of iterations. callback : callable, optional Object executing code per iteration, e.g. plotting each iterate. See Also -------- conjugate_gradient : Optimized solver for symmetric matrices odl.solvers.smooth.nonlinear_cg.conjugate_gradient_nonlinear : Equivalent solver for the nonlinear case """ # TODO: add a book reference # TODO: update doc if x not in op.domain: raise TypeError('`x` {!r} is not in the domain of `op` {!r}' ''.format(x, op.domain)) d = op(x) d.lincomb(1, rhs, -1, d) # d = rhs - A x p = op.derivative(x).adjoint(d) s = p.copy() q = op.range.element() sqnorm_s_old = s.norm() ** 2 # Only recalculate norm after update for _ in range(niter): op(p, out=q) # q = A p sqnorm_q = q.norm() ** 2 if sqnorm_q == 0.0: # Return if residual is 0 return a = sqnorm_s_old / sqnorm_q x.lincomb(1, x, a, p) # x = x + a*p d.lincomb(1, d, -a, q) # d = d - a*Ap op.derivative(p).adjoint(d, out=s) # s = A^T d sqnorm_s_new = s.norm() ** 2 b = sqnorm_s_new / sqnorm_s_old sqnorm_s_old = sqnorm_s_new p.lincomb(1, s, b, p) # p = s + b * p if callback is not None: callback(x)
[docs] def exp_zero_seq(base): """Default exponential zero sequence. It is defined by t_0 = 1.0 t_m = t_(m-1) / base or, in closed form t_m = base^(-m-1) Parameters ---------- base : float Base of the sequence. Its absolute value must be larger than 1. Yields ------ val : float The next value in the exponential sequence. """ value = 1.0 while True: value /= base yield value
[docs] def gauss_newton(op, x, rhs, niter, zero_seq=exp_zero_seq(2.0), callback=None): """Optimized implementation of a Gauss-Newton method. This method solves the inverse problem (of the first kind):: A(x) = y for a (Frechet-) differentiable `Operator` ``A`` using a Gauss-Newton iteration. It uses a minimum amount of memory copies by applying re-usable temporaries and in-place evaluation. A variant of the method applied to a specific problem is described in a `Wikipedia article <https://en.wikipedia.org/wiki/Gauss%E2%80%93Newton_algorithm>`_. Parameters ---------- op : `Operator` Operator in the inverse problem. If not linear, it must have an implementation of `Operator.derivative`, which in turn must implement `Operator.adjoint`, i.e. the call ``op.derivative(x).adjoint`` must be valid. x : ``op.domain`` element Element to which the result is written. Its initial value is used as starting point of the iteration, and its values are updated in each iteration step. rhs : ``op.range`` element Right-hand side of the equation defining the inverse problem niter : int Maximum number of iterations. zero_seq : iterable, optional Zero sequence whose values are used for the regularization of the linearized problem in each Newton step. callback : callable, optional Object executing code per iteration, e.g. plotting each iterate. """ if x not in op.domain: raise TypeError('`x` {!r} is not in the domain of `op` {!r}' ''.format(x, op.domain)) x0 = x.copy() id_op = IdentityOperator(op.domain) dx = op.domain.zero() tmp_dom = op.domain.element() u = op.domain.element() tmp_ran = op.range.element() v = op.range.element() for _ in range(niter): tm = next(zero_seq) deriv = op.derivative(x) deriv_adjoint = deriv.adjoint # v = rhs - op(x) - deriv(x0-x) # u = deriv.T(v) op(x, out=tmp_ran) # eval op(x) v.lincomb(1, rhs, -1, tmp_ran) # assign v = rhs - op(x) tmp_dom.lincomb(1, x0, -1, x) # assign temp tmp_dom = x0 - x deriv(tmp_dom, out=tmp_ran) # eval deriv(x0-x) v -= tmp_ran # assign v = rhs-op(x)-deriv(x0-x) deriv_adjoint(v, out=u) # eval/assign u = deriv.T(v) # Solve equation Tikhonov regularized system # (deriv.T o deriv + tm * id_op)^-1 u = dx tikh_op = OperatorSum(OperatorComp(deriv.adjoint, deriv), tm * id_op, tmp_dom) # TODO: allow user to select other method conjugate_gradient(tikh_op, dx, u, 3) # Update x x.lincomb(1, x0, 1, dx) # x = x0 + dx if callback is not None: callback(x)
[docs] def kaczmarz(ops, x, rhs, niter, omega=1, projection=None, random=False, callback=None, callback_loop='outer'): r"""Optimized implementation of Kaczmarz's method. Solves the inverse problem given by the set of equations:: A_n(x) = rhs_n This is also known as the Landweber-Kaczmarz's method, since the method coincides with the Landweber method for a single operator. Parameters ---------- ops : sequence of `Operator`'s Operators in the inverse problem. ``op[i].derivative(x).adjoint`` must be well-defined for ``x`` in the operator domain and for all ``i``. x : ``op.domain`` element Element to which the result is written. Its initial value is used as starting point of the iteration, and its values are updated in each iteration step. rhs : sequence of ``ops[i].range`` elements Right-hand side of the equation defining the inverse problem. niter : int Number of iterations. omega : positive float or sequence of positive floats, optional Relaxation parameter in the iteration. If a single float is given the same step is used for all operators, otherwise separate steps are used. projection : callable, optional Function that can be used to modify the iterates in each iteration, for example enforcing positivity. The function should take one argument and modify it in-place. random : bool, optional If `True`, the order of the operators is randomized in each iteration. callback : callable, optional Object executing code per iteration, e.g. plotting each iterate. callback_loop : {'inner', 'outer'} Whether the callback should be called in the inner or outer loop. Notes ----- This method calculates an approximate least-squares solution of the inverse problem of the first kind .. math:: \mathcal{A}_i (x) = y_i \quad 1 \leq i \leq n, for a given :math:`y_n \in \mathcal{Y}_n`, i.e. an approximate solution :math:`x^*` to .. math:: \min_{x\in \mathcal{X}} \sum_{i=1}^n \| \mathcal{A}_i(x) - y_i \|_{\mathcal{Y}_i}^2 for a (Frechet-) differentiable operator :math:`\mathcal{A}: \mathcal{X} \to \mathcal{Y}` between Hilbert spaces :math:`\mathcal{X}` and :math:`\mathcal{Y}`. The method starts from an initial guess :math:`x_0` and uses the iteration .. math:: x_{k+1} = x_k - \omega_{[k]} \ \partial \mathcal{A}_{[k]}(x_k)^* (\mathcal{A}_{[k]}(x_k) - y_{[k]}), where :math:`\partial \mathcal{A}_{[k]}(x_k)` is the Frechet derivative of :math:`\mathcal{A}_{[k]}` at :math:`x_k`, :math:`\omega_{[k]}` is a relaxation parameter and :math:`[k] := k \text{ mod } n`. For linear problems, a choice :math:`0 < \omega_i < 2/\lVert \mathcal{A}_{i}^2\rVert` guarantees convergence, where :math:`\|\mathcal{A}_{i}\|` stands for the operator norm of :math:`\mathcal{A}_{i}`. This implementation uses a minimum amount of memory copies by applying re-usable temporaries and in-place evaluation. The method is also described in a `Wikipedia article <https://en.wikipedia.org/wiki/Kaczmarz_method>`_. and in Natterer, F. Mathematical Methods in Image Reconstruction, section 5.3.2. See Also -------- landweber """ domain = ops[0].domain if any(domain != opi.domain for opi in ops): raise ValueError('domains of `ops` are not all equal') if x not in domain: raise TypeError('`x` {!r} is not in the domain of `ops` {!r}' ''.format(x, domain)) if len(ops) != len(rhs): raise ValueError('`number of `ops` {} does not match number of ' '`rhs` {}'.format(len(ops), len(rhs))) omega = normalized_scalar_param_list(omega, len(ops), param_conv=float) # Reusable elements in the range, one per type of space ranges = [opi.range for opi in ops] unique_ranges = set(ranges) tmp_rans = {ran: ran.element() for ran in unique_ranges} # Single reusable element in the domain tmp_dom = domain.element() # Iteratively find solution for _ in range(niter): if random: rng = np.random.permutation(range(len(ops))) else: rng = range(len(ops)) for i in rng: # Find residual tmp_ran = tmp_rans[ops[i].range] ops[i](x, out=tmp_ran) tmp_ran -= rhs[i] # Update x ops[i].derivative(x).adjoint(tmp_ran, out=tmp_dom) x.lincomb(1, x, -omega[i], tmp_dom) if projection is not None: projection(x) if callback is not None and callback_loop == 'inner': callback(x) if callback is not None and callback_loop == 'outer': callback(x)
if __name__ == '__main__': from odl.util.testutils import run_doctests run_doctests()