Source code for odl.tomo.backends.astra_cpu

# Copyright 2014-2020 The ODL contributors
#
# This file is part of ODL.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""Backend for ASTRA using CPU."""

from __future__ import absolute_import, division, print_function

import warnings

import numpy as np

from odl.discr import DiscretizedSpace, DiscretizedSpaceElement
from odl.tomo.backends.astra_setup import (
    astra_algorithm, astra_data, astra_projection_geometry, astra_projector,
    astra_volume_geometry)
from odl.tomo.backends.util import _add_default_complex_impl
from odl.tomo.geometry import (
    DivergentBeamGeometry, Geometry, ParallelBeamGeometry)
from odl.util import writable_array

try:
    import astra
except ImportError:
    pass

__all__ = (
    'astra_cpu_forward_projector',
    'astra_cpu_back_projector',
    'default_astra_proj_type',
)


[docs]def default_astra_proj_type(geom): """Return the default ASTRA projector type for a given geometry. Parameters ---------- geom : `Geometry` ODL geometry object for which to get the default projector type. Returns ------- astra_proj_type : str Default projector type for the given geometry. In 2D: - `ParallelBeamGeometry`: ``'linear'`` - `DivergentBeamGeometry`: ``'line_fanflat'`` In 3D: - `ParallelBeamGeometry`: ``'linear3d'`` - `DivergentBeamGeometry`: ``'linearcone'`` """ if isinstance(geom, ParallelBeamGeometry): return 'linear' if geom.ndim == 2 else 'linear3d' elif isinstance(geom, DivergentBeamGeometry): return 'line_fanflat' if geom.ndim == 2 else 'linearcone' else: raise TypeError( 'no default exists for {}, `astra_proj_type` must be given ' 'explicitly'.format(type(geom)) )
[docs]def astra_cpu_forward_projector(vol_data, geometry, proj_space, out=None, astra_proj_type=None): """Run an ASTRA forward projection on the given data using the CPU. Parameters ---------- vol_data : `DiscretizedSpaceElement` Volume data to which the forward projector is applied. geometry : `Geometry` Geometry defining the tomographic setup. proj_space : `DiscretizedSpace` Space to which the calling operator maps. out : ``proj_space`` element, optional Element of the projection space to which the result is written. If ``None``, an element in ``proj_space`` is created. astra_proj_type : str, optional Type of projector that should be used. See `the ASTRA documentation <http://www.astra-toolbox.com/docs/proj2d.html>`_ for details. By default, a suitable projector type for the given geometry is selected, see `default_astra_proj_type`. Returns ------- out : ``proj_space`` element Projection data resulting from the application of the projector. If ``out`` was provided, the returned object is a reference to it. """ if not isinstance(vol_data, DiscretizedSpaceElement): raise TypeError( 'volume data {!r} is not a `DiscretizedSpaceElement` instance' ''.format(vol_data) ) if vol_data.space.impl != 'numpy': raise TypeError( "`vol_data.space.impl` must be 'numpy', got {!r}" "".format(vol_data.space.impl) ) if not isinstance(geometry, Geometry): raise TypeError( 'geometry {!r} is not a Geometry instance'.format(geometry) ) if not isinstance(proj_space, DiscretizedSpace): raise TypeError( '`proj_space` {!r} is not a DiscretizedSpace instance.' ''.format(proj_space) ) if proj_space.impl != 'numpy': raise TypeError( "`proj_space.impl` must be 'numpy', got {!r}" "".format(proj_space.impl) ) if vol_data.ndim != geometry.ndim: raise ValueError( 'dimensions {} of volume data and {} of geometry do not match' ''.format(vol_data.ndim, geometry.ndim) ) if out is None: out = proj_space.element() else: if out not in proj_space: raise TypeError( '`out` {} is neither None nor a `DiscretizedSpaceElement` ' 'instance'.format(out) ) ndim = vol_data.ndim # Create astra geometries vol_geom = astra_volume_geometry(vol_data.space) proj_geom = astra_projection_geometry(geometry) # Create projector if astra_proj_type is None: astra_proj_type = default_astra_proj_type(geometry) proj_id = astra_projector(astra_proj_type, vol_geom, proj_geom, ndim) # Create ASTRA data structures vol_data_arr = np.asarray(vol_data) vol_id = astra_data(vol_geom, datatype='volume', data=vol_data_arr, allow_copy=True) with writable_array(out, dtype='float32', order='C') as out_arr: sino_id = astra_data(proj_geom, datatype='projection', data=out_arr, ndim=proj_space.ndim) # Create algorithm algo_id = astra_algorithm('forward', ndim, vol_id, sino_id, proj_id, impl='cpu') # Run algorithm astra.algorithm.run(algo_id) # Delete ASTRA objects astra.algorithm.delete(algo_id) astra.data2d.delete((vol_id, sino_id)) astra.projector.delete(proj_id) return out
[docs]def astra_cpu_back_projector(proj_data, geometry, vol_space, out=None, astra_proj_type=None): """Run an ASTRA back-projection on the given data using the CPU. Parameters ---------- proj_data : `DiscretizedSpaceElement` Projection data to which the back-projector is applied. geometry : `Geometry` Geometry defining the tomographic setup. vol_space : `DiscretizedSpace` Space to which the calling operator maps. out : ``vol_space`` element, optional Element of the reconstruction space to which the result is written. If ``None``, an element in ``vol_space`` is created. astra_proj_type : str, optional Type of projector that should be used. See `the ASTRA documentation <http://www.astra-toolbox.com/docs/proj2d.html>`_ for details. By default, a suitable projector type for the given geometry is selected, see `default_astra_proj_type`. Returns ------- out : ``vol_space`` element Reconstruction data resulting from the application of the backward projector. If ``out`` was provided, the returned object is a reference to it. """ if not isinstance(proj_data, DiscretizedSpaceElement): raise TypeError( 'projection data {!r} is not a `DiscretizedSpaceElement` ' 'instance'.format(proj_data) ) if proj_data.space.impl != 'numpy': raise TypeError( '`proj_data` must be a `numpy.ndarray` based, container, ' "got `impl` {!r}".format(proj_data.space.impl) ) if not isinstance(geometry, Geometry): raise TypeError( 'geometry {!r} is not a Geometry instance'.format(geometry) ) if not isinstance(vol_space, DiscretizedSpace): raise TypeError( 'volume space {!r} is not a DiscretizedSpace instance' ''.format(vol_space) ) if vol_space.impl != 'numpy': raise TypeError( "`vol_space.impl` must be 'numpy', got {!r}".format(vol_space.impl) ) if vol_space.ndim != geometry.ndim: raise ValueError( 'dimensions {} of reconstruction space and {} of geometry ' 'do not match' ''.format(vol_space.ndim, geometry.ndim) ) if out is None: out = vol_space.element() else: if out not in vol_space: raise TypeError( '`out` {} is neither None nor a `DiscretizedSpaceElement` ' 'instance'.format(out) ) ndim = proj_data.ndim # Create astra geometries vol_geom = astra_volume_geometry(vol_space) proj_geom = astra_projection_geometry(geometry) # Create ASTRA data structure sino_id = astra_data( proj_geom, datatype='projection', data=proj_data, allow_copy=True ) # Create projector if astra_proj_type is None: astra_proj_type = default_astra_proj_type(geometry) proj_id = astra_projector(astra_proj_type, vol_geom, proj_geom, ndim) # Convert out to correct dtype and order if needed. with writable_array(out, dtype='float32', order='C') as out_arr: vol_id = astra_data( vol_geom, datatype='volume', data=out_arr, ndim=vol_space.ndim ) # Create algorithm algo_id = astra_algorithm( 'backward', ndim, vol_id, sino_id, proj_id, impl='cpu' ) # Run algorithm astra.algorithm.run(algo_id) # Weight the adjoint by appropriate weights scaling_factor = float(proj_data.space.weighting.const) scaling_factor /= float(vol_space.weighting.const) out *= scaling_factor # Delete ASTRA objects astra.algorithm.delete(algo_id) astra.data2d.delete((vol_id, sino_id)) astra.projector.delete(proj_id) return out
[docs]class AstraCpuImpl: """Thin wrapper implementing ASTRA CPU for `RayTransform`."""
[docs] def __init__(self, geometry, vol_space, proj_space): """Initialize a new instance. Parameters ---------- geometry : `Geometry` Geometry defining the tomographic setup. vol_space : `DiscretizedSpace` Reconstruction space, the space of the images to be forward projected. proj_space : `DiscretizedSpace` Projection space, the space of the result. """ if not isinstance(geometry, Geometry): raise TypeError( '`geometry` must be a `Geometry` instance, got {!r}' ''.format(geometry) ) if not isinstance(vol_space, DiscretizedSpace): raise TypeError( '`vol_space` must be a `DiscretizedSpace` instance, got {!r}' ''.format(vol_space) ) if not isinstance(proj_space, DiscretizedSpace): raise TypeError( '`proj_space` must be a `DiscretizedSpace` instance, got {!r}' ''.format(proj_space) ) if geometry.ndim > 2: raise ValueError( '`impl` {!r} only works for 2d'.format(self.__class__.__name__) ) if vol_space.size >= 512 ** 2: warnings.warn( "The 'astra_cpu' backend may be too slow for volumes of this " "size. Consider using 'astra_cuda' if your machine has an " "Nvidia GPU.", RuntimeWarning, ) self.geometry = geometry self._vol_space = vol_space self._proj_space = proj_space
@property def vol_space(self): return self._vol_space @property def proj_space(self): return self._proj_space
[docs] @_add_default_complex_impl def call_backward(self, x, out, **kwargs): return astra_cpu_back_projector( x, self.geometry, self.vol_space.real_space, out, **kwargs )
[docs] @_add_default_complex_impl def call_forward(self, x, out, **kwargs): return astra_cpu_forward_projector( x, self.geometry, self.proj_space.real_space, out, **kwargs )
if __name__ == '__main__': from odl.util.testutils import run_doctests run_doctests()