Module cycpd.expectation_maximization_registration

Expand source code
import time

import cython_functions as cy
import numpy as np
from numpy.testing import assert_almost_equal, assert_array_almost_equal


class expectation_maximization_registration(object):
    """
    Expectation Maximization class

    Parameters
    ----------
    X : numpy.ndarray
        The target point cloud.
    
    Y : numpy.ndarray
        The source point cloud.
    
    max_iterations : int
        The maximum number of iterations to perform.
    
    tolerance : float
        The tolerance for the error.
    
    w : float
        Contribution of the uniform distribution to account for outliers.
        Valid values span 0 (inclusive) and 1 (exclusive).
    
    verbose : bool
        If True, print verbose statements to commandline.
    
    print_reg_params : bool
        If True, print out the registration parameters.


    
    """
    def __init__(
        self,
        X,
        Y,
        max_iterations=100,
        tolerance=1e-5,
        w=0.0,
        verbose=True,
        print_reg_params=True,
        *args,
        **kwargs
    ):
        if type(X) is not np.ndarray or X.ndim != 2:
            raise ValueError("The target point cloud (X) must be at a 2D numpy array.")
        if type(Y) is not np.ndarray or Y.ndim != 2:
            raise ValueError("The source point cloud (Y) must be a 2D numpy array.")
        if X.shape[1] != Y.shape[1]:
            raise ValueError("Both point clouds need to have the same number of dimensions.")

        self.expectation_times = []
        self.maximization_times = []
        self.update_transform_times = []
        self.transform_times = []
        self.update_variance_times = []
        self.time_to_initialize_registration = 0
        self.time_to_initiate_registration = 0

        self.X = X
        self.Y = Y
        self.TY = None
        self.N = self.X.shape[0]
        self.D = self.X.shape[1]
        self.M = self.Y.shape[0]
        self.tolerance = tolerance
        self.w = w
        self.max_iterations = max_iterations
        self.iteration = 0
        self.err = self.tolerance + 1
        self.P = np.zeros((self.M, self.N))
        self.Pt1 = np.zeros((self.N,))
        self.P1 = np.zeros((self.M,))
        self.PX = np.zeros((self.M, self.D))
        self.Np = 0.0
        self.E = 0.0
        self.E_old = 0.0
        self.sigma2 = 0.0
        self.sigma2_prev = 0.0
        self.verbose = verbose
        self.print_reg_params = print_reg_params

    def register(self, callback=lambda **kwargs: None):
        """
        Register the point clouds.

        Parameters
        ----------
        callback : function
            A callback function that is called after each iteration.
            This is used to print figures during the registration process.

            The callback function should take the following arguments:
                - iteration : int
                    The current iteration.
                - E : float
                    The registration error.
                - X : numpy.ndarray
                    The original point cloud.
                - Y : numpy.ndarray
                    The transformed point cloud.
        """
        tic = time.time()
        self.transform_point_cloud()
        self.sigma2 = cy.initialize_sigma2(self.X, self.Y)
        toc = time.time()
        self.time_to_initiate_registration = toc - tic
        while (
            (self.iteration < self.max_iterations)
            and (self.err > self.tolerance)
            and (self.sigma2 > np.finfo(float).eps)
        ):
            self.iterate()
            if callable(callback):
                kwargs = {"iteration": self.iteration, "error": self.err, "X": self.X, "Y": self.TY}
                callback(**kwargs)

        if self.print_reg_params is True:
            print("=" * 72)
            print("Registration Performance Metrics")
            print("=" * 72)
            print("Time to initialize EM: {}".format(self.time_to_initiate_registration))
            print(
                "Time to initialize registration: {}".format(self.time_to_initialize_registration)
            )
            print(
                "Average Expectation Time:                {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.expectation_times), np.std(self.expectation_times)
                )
            )
            print(
                "Average Maximization Time:               {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.maximization_times), np.std(self.maximization_times)
                )
            )
            print("Maximization Times - Per individual step")
            print(
                "Average Update Transform Time:           {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.update_transform_times), np.std(self.update_transform_times)
                )
            )
            print(
                "Average Transform Time:                  {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.transform_times), np.std(self.transform_times)
                )
            )
            print(
                "Average Update Variance Time:            {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.update_variance_times), np.std(self.update_variance_times)
                )
            )
            print("")
            print("Number of iterations performed:          {}".format(self.iteration))
            print("Error at time of finish:                 {}".format(self.err))

        return self.TY, self.get_registration_parameters()

    def get_registration_parameters(self):
        raise NotImplementedError("Registration parameters should be defined in child classes.")

    def iterate(self):
        """
        Perform one iteration of the Expectation Maximization algorithm.
        """
        self.E_old = self.E  # E = negative log-likelihood function
        tic = time.time()
        self.expectation()
        toc = time.time()
        self.expectation_times.append(toc - tic)

        tic = time.time()
        self.maximization()
        toc = time.time()
        self.maximization_times.append(toc - tic)

        self.iteration += 1

        if self.verbose is True:
            while type(self.sigma2) is np.ndarray:
                self.sigma2 = self.sigma2[0]

            print("Iteration:{}".format(self.iteration))
            print(
                "ML:{:10.3f}; \t"
                "ML change (error):{:10.3f}; \t"
                "Sigma^2:{:10.3f}; \t"
                "Sigma^2 change:{:10.3f}".format(
                    self.E, self.err, self.sigma2, self.sigma2_prev - self.sigma2
                )
            )
            percent_done = int(72 * self.iteration / self.max_iterations)
            print("[" + "=" * percent_done + " " * (72 - percent_done) + "]")

    def expectation(self):
        """
        Perform the Expectation step of the Expectation Maximization algorithm.
        """
        self.P1, self.Pt1, self.PX, self.Np, self.E = cy.expectation_2(
            self.X, self.TY, self.sigma2, self.M, self.N, self.D, self.w
        )

    def maximization(self):
        """
        Perform the Maximization step of the Expectation Maximization algorithm.
        """
        tic = time.time()
        self.update_transform()
        toc = time.time()
        self.update_transform_times.append(toc - tic)

        tic = time.time()
        self.transform_point_cloud()
        toc = time.time()
        self.transform_times.append(toc - tic)

        tic = time.time()
        self.update_variance()
        toc = time.time()
        self.update_variance_times.append(toc - tic)

Classes

class expectation_maximization_registration (X, Y, max_iterations=100, tolerance=1e-05, w=0.0, verbose=True, print_reg_params=True, *args, **kwargs)

Expectation Maximization class

Parameters

X : numpy.ndarray
The target point cloud.
Y : numpy.ndarray
The source point cloud.
max_iterations : int
The maximum number of iterations to perform.
tolerance : float
The tolerance for the error.
w : float
Contribution of the uniform distribution to account for outliers. Valid values span 0 (inclusive) and 1 (exclusive).
verbose : bool
If True, print verbose statements to commandline.
print_reg_params : bool
If True, print out the registration parameters.
Expand source code
class expectation_maximization_registration(object):
    """
    Expectation Maximization class

    Parameters
    ----------
    X : numpy.ndarray
        The target point cloud.
    
    Y : numpy.ndarray
        The source point cloud.
    
    max_iterations : int
        The maximum number of iterations to perform.
    
    tolerance : float
        The tolerance for the error.
    
    w : float
        Contribution of the uniform distribution to account for outliers.
        Valid values span 0 (inclusive) and 1 (exclusive).
    
    verbose : bool
        If True, print verbose statements to commandline.
    
    print_reg_params : bool
        If True, print out the registration parameters.


    
    """
    def __init__(
        self,
        X,
        Y,
        max_iterations=100,
        tolerance=1e-5,
        w=0.0,
        verbose=True,
        print_reg_params=True,
        *args,
        **kwargs
    ):
        if type(X) is not np.ndarray or X.ndim != 2:
            raise ValueError("The target point cloud (X) must be at a 2D numpy array.")
        if type(Y) is not np.ndarray or Y.ndim != 2:
            raise ValueError("The source point cloud (Y) must be a 2D numpy array.")
        if X.shape[1] != Y.shape[1]:
            raise ValueError("Both point clouds need to have the same number of dimensions.")

        self.expectation_times = []
        self.maximization_times = []
        self.update_transform_times = []
        self.transform_times = []
        self.update_variance_times = []
        self.time_to_initialize_registration = 0
        self.time_to_initiate_registration = 0

        self.X = X
        self.Y = Y
        self.TY = None
        self.N = self.X.shape[0]
        self.D = self.X.shape[1]
        self.M = self.Y.shape[0]
        self.tolerance = tolerance
        self.w = w
        self.max_iterations = max_iterations
        self.iteration = 0
        self.err = self.tolerance + 1
        self.P = np.zeros((self.M, self.N))
        self.Pt1 = np.zeros((self.N,))
        self.P1 = np.zeros((self.M,))
        self.PX = np.zeros((self.M, self.D))
        self.Np = 0.0
        self.E = 0.0
        self.E_old = 0.0
        self.sigma2 = 0.0
        self.sigma2_prev = 0.0
        self.verbose = verbose
        self.print_reg_params = print_reg_params

    def register(self, callback=lambda **kwargs: None):
        """
        Register the point clouds.

        Parameters
        ----------
        callback : function
            A callback function that is called after each iteration.
            This is used to print figures during the registration process.

            The callback function should take the following arguments:
                - iteration : int
                    The current iteration.
                - E : float
                    The registration error.
                - X : numpy.ndarray
                    The original point cloud.
                - Y : numpy.ndarray
                    The transformed point cloud.
        """
        tic = time.time()
        self.transform_point_cloud()
        self.sigma2 = cy.initialize_sigma2(self.X, self.Y)
        toc = time.time()
        self.time_to_initiate_registration = toc - tic
        while (
            (self.iteration < self.max_iterations)
            and (self.err > self.tolerance)
            and (self.sigma2 > np.finfo(float).eps)
        ):
            self.iterate()
            if callable(callback):
                kwargs = {"iteration": self.iteration, "error": self.err, "X": self.X, "Y": self.TY}
                callback(**kwargs)

        if self.print_reg_params is True:
            print("=" * 72)
            print("Registration Performance Metrics")
            print("=" * 72)
            print("Time to initialize EM: {}".format(self.time_to_initiate_registration))
            print(
                "Time to initialize registration: {}".format(self.time_to_initialize_registration)
            )
            print(
                "Average Expectation Time:                {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.expectation_times), np.std(self.expectation_times)
                )
            )
            print(
                "Average Maximization Time:               {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.maximization_times), np.std(self.maximization_times)
                )
            )
            print("Maximization Times - Per individual step")
            print(
                "Average Update Transform Time:           {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.update_transform_times), np.std(self.update_transform_times)
                )
            )
            print(
                "Average Transform Time:                  {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.transform_times), np.std(self.transform_times)
                )
            )
            print(
                "Average Update Variance Time:            {:10.4f} +/- {:5.4f}".format(
                    np.mean(self.update_variance_times), np.std(self.update_variance_times)
                )
            )
            print("")
            print("Number of iterations performed:          {}".format(self.iteration))
            print("Error at time of finish:                 {}".format(self.err))

        return self.TY, self.get_registration_parameters()

    def get_registration_parameters(self):
        raise NotImplementedError("Registration parameters should be defined in child classes.")

    def iterate(self):
        """
        Perform one iteration of the Expectation Maximization algorithm.
        """
        self.E_old = self.E  # E = negative log-likelihood function
        tic = time.time()
        self.expectation()
        toc = time.time()
        self.expectation_times.append(toc - tic)

        tic = time.time()
        self.maximization()
        toc = time.time()
        self.maximization_times.append(toc - tic)

        self.iteration += 1

        if self.verbose is True:
            while type(self.sigma2) is np.ndarray:
                self.sigma2 = self.sigma2[0]

            print("Iteration:{}".format(self.iteration))
            print(
                "ML:{:10.3f}; \t"
                "ML change (error):{:10.3f}; \t"
                "Sigma^2:{:10.3f}; \t"
                "Sigma^2 change:{:10.3f}".format(
                    self.E, self.err, self.sigma2, self.sigma2_prev - self.sigma2
                )
            )
            percent_done = int(72 * self.iteration / self.max_iterations)
            print("[" + "=" * percent_done + " " * (72 - percent_done) + "]")

    def expectation(self):
        """
        Perform the Expectation step of the Expectation Maximization algorithm.
        """
        self.P1, self.Pt1, self.PX, self.Np, self.E = cy.expectation_2(
            self.X, self.TY, self.sigma2, self.M, self.N, self.D, self.w
        )

    def maximization(self):
        """
        Perform the Maximization step of the Expectation Maximization algorithm.
        """
        tic = time.time()
        self.update_transform()
        toc = time.time()
        self.update_transform_times.append(toc - tic)

        tic = time.time()
        self.transform_point_cloud()
        toc = time.time()
        self.transform_times.append(toc - tic)

        tic = time.time()
        self.update_variance()
        toc = time.time()
        self.update_variance_times.append(toc - tic)

Subclasses

Methods

def expectation(self)

Perform the Expectation step of the Expectation Maximization algorithm.

Expand source code
def expectation(self):
    """
    Perform the Expectation step of the Expectation Maximization algorithm.
    """
    self.P1, self.Pt1, self.PX, self.Np, self.E = cy.expectation_2(
        self.X, self.TY, self.sigma2, self.M, self.N, self.D, self.w
    )
def get_registration_parameters(self)
Expand source code
def get_registration_parameters(self):
    raise NotImplementedError("Registration parameters should be defined in child classes.")
def iterate(self)

Perform one iteration of the Expectation Maximization algorithm.

Expand source code
def iterate(self):
    """
    Perform one iteration of the Expectation Maximization algorithm.
    """
    self.E_old = self.E  # E = negative log-likelihood function
    tic = time.time()
    self.expectation()
    toc = time.time()
    self.expectation_times.append(toc - tic)

    tic = time.time()
    self.maximization()
    toc = time.time()
    self.maximization_times.append(toc - tic)

    self.iteration += 1

    if self.verbose is True:
        while type(self.sigma2) is np.ndarray:
            self.sigma2 = self.sigma2[0]

        print("Iteration:{}".format(self.iteration))
        print(
            "ML:{:10.3f}; \t"
            "ML change (error):{:10.3f}; \t"
            "Sigma^2:{:10.3f}; \t"
            "Sigma^2 change:{:10.3f}".format(
                self.E, self.err, self.sigma2, self.sigma2_prev - self.sigma2
            )
        )
        percent_done = int(72 * self.iteration / self.max_iterations)
        print("[" + "=" * percent_done + " " * (72 - percent_done) + "]")
def maximization(self)

Perform the Maximization step of the Expectation Maximization algorithm.

Expand source code
def maximization(self):
    """
    Perform the Maximization step of the Expectation Maximization algorithm.
    """
    tic = time.time()
    self.update_transform()
    toc = time.time()
    self.update_transform_times.append(toc - tic)

    tic = time.time()
    self.transform_point_cloud()
    toc = time.time()
    self.transform_times.append(toc - tic)

    tic = time.time()
    self.update_variance()
    toc = time.time()
    self.update_variance_times.append(toc - tic)
def register(self, callback=<function expectation_maximization_registration.<lambda>>)

Register the point clouds.

Parameters

callback : function

A callback function that is called after each iteration. This is used to print figures during the registration process.

The callback function should take the following arguments: - iteration : int The current iteration. - E : float The registration error. - X : numpy.ndarray The original point cloud. - Y : numpy.ndarray The transformed point cloud.

Expand source code
def register(self, callback=lambda **kwargs: None):
    """
    Register the point clouds.

    Parameters
    ----------
    callback : function
        A callback function that is called after each iteration.
        This is used to print figures during the registration process.

        The callback function should take the following arguments:
            - iteration : int
                The current iteration.
            - E : float
                The registration error.
            - X : numpy.ndarray
                The original point cloud.
            - Y : numpy.ndarray
                The transformed point cloud.
    """
    tic = time.time()
    self.transform_point_cloud()
    self.sigma2 = cy.initialize_sigma2(self.X, self.Y)
    toc = time.time()
    self.time_to_initiate_registration = toc - tic
    while (
        (self.iteration < self.max_iterations)
        and (self.err > self.tolerance)
        and (self.sigma2 > np.finfo(float).eps)
    ):
        self.iterate()
        if callable(callback):
            kwargs = {"iteration": self.iteration, "error": self.err, "X": self.X, "Y": self.TY}
            callback(**kwargs)

    if self.print_reg_params is True:
        print("=" * 72)
        print("Registration Performance Metrics")
        print("=" * 72)
        print("Time to initialize EM: {}".format(self.time_to_initiate_registration))
        print(
            "Time to initialize registration: {}".format(self.time_to_initialize_registration)
        )
        print(
            "Average Expectation Time:                {:10.4f} +/- {:5.4f}".format(
                np.mean(self.expectation_times), np.std(self.expectation_times)
            )
        )
        print(
            "Average Maximization Time:               {:10.4f} +/- {:5.4f}".format(
                np.mean(self.maximization_times), np.std(self.maximization_times)
            )
        )
        print("Maximization Times - Per individual step")
        print(
            "Average Update Transform Time:           {:10.4f} +/- {:5.4f}".format(
                np.mean(self.update_transform_times), np.std(self.update_transform_times)
            )
        )
        print(
            "Average Transform Time:                  {:10.4f} +/- {:5.4f}".format(
                np.mean(self.transform_times), np.std(self.transform_times)
            )
        )
        print(
            "Average Update Variance Time:            {:10.4f} +/- {:5.4f}".format(
                np.mean(self.update_variance_times), np.std(self.update_variance_times)
            )
        )
        print("")
        print("Number of iterations performed:          {}".format(self.iteration))
        print("Error at time of finish:                 {}".format(self.err))

    return self.TY, self.get_registration_parameters()