Module cycpd.deformable_registration

Expand source code
import time
from builtins import super

import numpy as np

from .expectation_maximization_registration import expectation_maximization_registration


def gaussian_kernel(X, beta, Y=None):
    """
    Compute the Gaussian kernel matrix.

    Parameters
    ----------
    X : numpy.ndarray
        The point cloud.

    beta : float
        The bandwidth of the Gaussian kernel.

    Y : numpy.ndarray, optional
        Second point cloud to compute the kernel matrix for. 
        If None, the second kernel matrix is X.
    
    Returns
    -------
    G : numpy.ndarray
        The Gaussian kernel matrix.
    """
    if Y is None:
        Y = X
    diff = X[:, None, :] - Y[None, :, :]
    diff = diff**2
    diff = np.sum(diff, 2)
    return np.exp(-diff / (2 * beta**2))


def lowrankQS(G, num_eig, eig_fgt=False):
    """
    Compute the low-rank approximation of the kernel matrix.

    !!!
    This function is a placeholder for implementing the fast
    gauss transform. It is not yet implemented.
    !!!
    
    Parameters
    ----------
    G : numpy.ndarray
        The gaussian kernel matrix.
    
    num_eig : int
        The number of eigenvectors to use.
    
    eig_fgt : bool
        If True, use the fast gauss transform to speed up registration.

    """
    # if we do not use FGT we construct affinity matrix G and find the
    # first eigenvectors/values directly

    if eig_fgt is False:
        S, Q = np.linalg.eigh(G)
        eig_indices = list(np.argsort(np.abs(S))[::-1][:num_eig])
        Q = Q[:, eig_indices]  # eigenvectors
        S = S[eig_indices]  # eigenvalues.

        return Q, S

    elif eig_fgt is True:
        raise Exception("Fast Gauss Transform Not Yet Implemented!")


class deformable_registration(expectation_maximization_registration):
    def __init__(self, alpha=2, beta=2, low_rank=True, num_eig=100, eig_fgt=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        tic = time.time()
        self.alpha = 2 if alpha is None else alpha
        self.beta = 2 if beta is None else beta
        self.num_eig = num_eig
        self.eig_fgt = eig_fgt
        self.W = np.zeros((self.M, self.D))
        self.G = gaussian_kernel(self.Y, self.beta)
        self.low_rank = low_rank

        if self.low_rank is True:
            self.Q, self.S = lowrankQS(self.G, num_eig=self.num_eig, eig_fgt=self.eig_fgt)
            self.inv_S = np.diag(1.0 / self.S)
            self.S = np.diag(self.S)
        toc = time.time()
        self.time_to_initialize_registration = toc - tic

    def update_transform(self):
        """
        Update the transform parameters.
        """
        if self.low_rank is False:
            tic = time.time()
            A = np.dot(np.diag(self.P1), self.G) + self.alpha * self.sigma2 * np.eye(self.M)
            toc = time.time()
            self.A_times.append(toc - tic)
            tic = time.time()
            B = self.PX - np.dot(np.diag(self.P1), self.Y)
            toc = time.time()
            self.B_times.append(toc - tic)
            tic = time.time()
            self.W = np.linalg.solve(A, B)
            toc = time.time()
            self.solve_times.append(toc - tic)
        else:
            dP = np.diag(self.P1)
            dPQ = np.matmul(dP, self.Q)
            F = self.PX - np.matmul(dP, self.Y)

            self.W = (
                1
                / (self.alpha * self.sigma2)
                * (
                    F
                    - np.matmul(
                        dPQ,
                        (
                            np.linalg.solve(
                                (self.alpha * self.sigma2 * self.inv_S + np.matmul(self.Q.T, dPQ)),
                                (np.matmul(self.Q.T, F)),
                            )
                        ),
                    )
                )
            )
            QtW = np.matmul(self.Q.T, self.W)
            self.E = self.E + self.alpha / 2 * np.trace(np.matmul(QtW.T, np.matmul(self.S, QtW)))
            # self.err = abs((self.E - self.E_old) / self.E)
            self.err = abs(self.E - self.E_old)
            # The absolute difference is more conservative (does more iterations) than the line above it which
            # is calculating the normalized change in the E(L). This calculation was changed to match the matlab
            # code created for low_rank matrices.

    def transform_point_cloud(self, Y=None):
        """
        Transform the point cloud.
        
        Parameters
        ----------
        Y : numpy.ndarray, optional
            The point cloud to transform. If None, the inpputted source 
            point cloud is used.

        Returns
        -------
        Y_transformed : numpy.ndarray
            The transformed point cloud.            
        """
        if self.low_rank is False:
            if Y is None:
                self.TY = self.Y + np.dot(self.G, self.W)
                return
            else:
                G = gaussian_kernel(Y, self.beta, Y=self.Y)
                return Y + np.matmul(G, self.W)
        elif self.low_rank is True:
            if Y is None:
                self.TY = self.Y + np.matmul(self.Q, np.matmul(self.S, np.matmul(self.Q.T, self.W)))
                return
            else:
                G = gaussian_kernel(Y, beta=self.beta, Y=self.Y)
                return Y + np.matmul(G, self.W)

    def update_variance(self):
        """
        Update the variance parameters.
        """
        self.sigma2_prev = self.sigma2
        A = np.sum(np.square(self.X) * self.Pt1[:, None])
        B = np.sum(np.square(self.TY) * self.P1[:, None])
        C = 2 * np.trace(np.matmul(self.PX.T, self.TY))
        self.sigma2 = abs(A + B - C) / (self.Np * self.D)

        if self.sigma2 <= 0:
            self.sigma2 = self.tolerance / 10

        # self.err = abs(self.sigma2_prev - self.sigma2)

    def get_registration_parameters(self):
        """
        Get the registration parameters.

        Returns
        -------
        self.G : numpy.ndarray
            The Gaussian kernel matrix.
        
        self.W : numpy.ndarray
            The transform parameters.
        """
        return self.G, self.W

Functions

def gaussian_kernel(X, beta, Y=None)

Compute the Gaussian kernel matrix.

Parameters

X : numpy.ndarray
The point cloud.
beta : float
The bandwidth of the Gaussian kernel.
Y : numpy.ndarray, optional
Second point cloud to compute the kernel matrix for. If None, the second kernel matrix is X.

Returns

G : numpy.ndarray
The Gaussian kernel matrix.
Expand source code
def gaussian_kernel(X, beta, Y=None):
    """
    Compute the Gaussian kernel matrix.

    Parameters
    ----------
    X : numpy.ndarray
        The point cloud.

    beta : float
        The bandwidth of the Gaussian kernel.

    Y : numpy.ndarray, optional
        Second point cloud to compute the kernel matrix for. 
        If None, the second kernel matrix is X.
    
    Returns
    -------
    G : numpy.ndarray
        The Gaussian kernel matrix.
    """
    if Y is None:
        Y = X
    diff = X[:, None, :] - Y[None, :, :]
    diff = diff**2
    diff = np.sum(diff, 2)
    return np.exp(-diff / (2 * beta**2))
def lowrankQS(G, num_eig, eig_fgt=False)

Compute the low-rank approximation of the kernel matrix.

!!! This function is a placeholder for implementing the fast gauss transform. It is not yet implemented. !!!

Parameters

G : numpy.ndarray
The gaussian kernel matrix.
num_eig : int
The number of eigenvectors to use.
eig_fgt : bool
If True, use the fast gauss transform to speed up registration.
Expand source code
def lowrankQS(G, num_eig, eig_fgt=False):
    """
    Compute the low-rank approximation of the kernel matrix.

    !!!
    This function is a placeholder for implementing the fast
    gauss transform. It is not yet implemented.
    !!!
    
    Parameters
    ----------
    G : numpy.ndarray
        The gaussian kernel matrix.
    
    num_eig : int
        The number of eigenvectors to use.
    
    eig_fgt : bool
        If True, use the fast gauss transform to speed up registration.

    """
    # if we do not use FGT we construct affinity matrix G and find the
    # first eigenvectors/values directly

    if eig_fgt is False:
        S, Q = np.linalg.eigh(G)
        eig_indices = list(np.argsort(np.abs(S))[::-1][:num_eig])
        Q = Q[:, eig_indices]  # eigenvectors
        S = S[eig_indices]  # eigenvalues.

        return Q, S

    elif eig_fgt is True:
        raise Exception("Fast Gauss Transform Not Yet Implemented!")

Classes

class deformable_registration (alpha=2, beta=2, low_rank=True, num_eig=100, eig_fgt=False, *args, **kwargs)

Expectation Maximization class

Parameters

X : numpy.ndarray
The target point cloud.
Y : numpy.ndarray
The source point cloud.
max_iterations : int
The maximum number of iterations to perform.
tolerance : float
The tolerance for the error.
w : float
Contribution of the uniform distribution to account for outliers. Valid values span 0 (inclusive) and 1 (exclusive).
verbose : bool
If True, print verbose statements to commandline.
print_reg_params : bool
If True, print out the registration parameters.
Expand source code
class deformable_registration(expectation_maximization_registration):
    def __init__(self, alpha=2, beta=2, low_rank=True, num_eig=100, eig_fgt=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        tic = time.time()
        self.alpha = 2 if alpha is None else alpha
        self.beta = 2 if beta is None else beta
        self.num_eig = num_eig
        self.eig_fgt = eig_fgt
        self.W = np.zeros((self.M, self.D))
        self.G = gaussian_kernel(self.Y, self.beta)
        self.low_rank = low_rank

        if self.low_rank is True:
            self.Q, self.S = lowrankQS(self.G, num_eig=self.num_eig, eig_fgt=self.eig_fgt)
            self.inv_S = np.diag(1.0 / self.S)
            self.S = np.diag(self.S)
        toc = time.time()
        self.time_to_initialize_registration = toc - tic

    def update_transform(self):
        """
        Update the transform parameters.
        """
        if self.low_rank is False:
            tic = time.time()
            A = np.dot(np.diag(self.P1), self.G) + self.alpha * self.sigma2 * np.eye(self.M)
            toc = time.time()
            self.A_times.append(toc - tic)
            tic = time.time()
            B = self.PX - np.dot(np.diag(self.P1), self.Y)
            toc = time.time()
            self.B_times.append(toc - tic)
            tic = time.time()
            self.W = np.linalg.solve(A, B)
            toc = time.time()
            self.solve_times.append(toc - tic)
        else:
            dP = np.diag(self.P1)
            dPQ = np.matmul(dP, self.Q)
            F = self.PX - np.matmul(dP, self.Y)

            self.W = (
                1
                / (self.alpha * self.sigma2)
                * (
                    F
                    - np.matmul(
                        dPQ,
                        (
                            np.linalg.solve(
                                (self.alpha * self.sigma2 * self.inv_S + np.matmul(self.Q.T, dPQ)),
                                (np.matmul(self.Q.T, F)),
                            )
                        ),
                    )
                )
            )
            QtW = np.matmul(self.Q.T, self.W)
            self.E = self.E + self.alpha / 2 * np.trace(np.matmul(QtW.T, np.matmul(self.S, QtW)))
            # self.err = abs((self.E - self.E_old) / self.E)
            self.err = abs(self.E - self.E_old)
            # The absolute difference is more conservative (does more iterations) than the line above it which
            # is calculating the normalized change in the E(L). This calculation was changed to match the matlab
            # code created for low_rank matrices.

    def transform_point_cloud(self, Y=None):
        """
        Transform the point cloud.
        
        Parameters
        ----------
        Y : numpy.ndarray, optional
            The point cloud to transform. If None, the inpputted source 
            point cloud is used.

        Returns
        -------
        Y_transformed : numpy.ndarray
            The transformed point cloud.            
        """
        if self.low_rank is False:
            if Y is None:
                self.TY = self.Y + np.dot(self.G, self.W)
                return
            else:
                G = gaussian_kernel(Y, self.beta, Y=self.Y)
                return Y + np.matmul(G, self.W)
        elif self.low_rank is True:
            if Y is None:
                self.TY = self.Y + np.matmul(self.Q, np.matmul(self.S, np.matmul(self.Q.T, self.W)))
                return
            else:
                G = gaussian_kernel(Y, beta=self.beta, Y=self.Y)
                return Y + np.matmul(G, self.W)

    def update_variance(self):
        """
        Update the variance parameters.
        """
        self.sigma2_prev = self.sigma2
        A = np.sum(np.square(self.X) * self.Pt1[:, None])
        B = np.sum(np.square(self.TY) * self.P1[:, None])
        C = 2 * np.trace(np.matmul(self.PX.T, self.TY))
        self.sigma2 = abs(A + B - C) / (self.Np * self.D)

        if self.sigma2 <= 0:
            self.sigma2 = self.tolerance / 10

        # self.err = abs(self.sigma2_prev - self.sigma2)

    def get_registration_parameters(self):
        """
        Get the registration parameters.

        Returns
        -------
        self.G : numpy.ndarray
            The Gaussian kernel matrix.
        
        self.W : numpy.ndarray
            The transform parameters.
        """
        return self.G, self.W

Ancestors

Methods

def get_registration_parameters(self)

Get the registration parameters.

Returns

self.G : numpy.ndarray
The Gaussian kernel matrix.
self.W : numpy.ndarray
The transform parameters.
Expand source code
def get_registration_parameters(self):
    """
    Get the registration parameters.

    Returns
    -------
    self.G : numpy.ndarray
        The Gaussian kernel matrix.
    
    self.W : numpy.ndarray
        The transform parameters.
    """
    return self.G, self.W
def transform_point_cloud(self, Y=None)

Transform the point cloud.

Parameters

Y : numpy.ndarray, optional
The point cloud to transform. If None, the inpputted source point cloud is used.

Returns

Y_transformed : numpy.ndarray
The transformed point cloud.
Expand source code
def transform_point_cloud(self, Y=None):
    """
    Transform the point cloud.
    
    Parameters
    ----------
    Y : numpy.ndarray, optional
        The point cloud to transform. If None, the inpputted source 
        point cloud is used.

    Returns
    -------
    Y_transformed : numpy.ndarray
        The transformed point cloud.            
    """
    if self.low_rank is False:
        if Y is None:
            self.TY = self.Y + np.dot(self.G, self.W)
            return
        else:
            G = gaussian_kernel(Y, self.beta, Y=self.Y)
            return Y + np.matmul(G, self.W)
    elif self.low_rank is True:
        if Y is None:
            self.TY = self.Y + np.matmul(self.Q, np.matmul(self.S, np.matmul(self.Q.T, self.W)))
            return
        else:
            G = gaussian_kernel(Y, beta=self.beta, Y=self.Y)
            return Y + np.matmul(G, self.W)
def update_transform(self)

Update the transform parameters.

Expand source code
def update_transform(self):
    """
    Update the transform parameters.
    """
    if self.low_rank is False:
        tic = time.time()
        A = np.dot(np.diag(self.P1), self.G) + self.alpha * self.sigma2 * np.eye(self.M)
        toc = time.time()
        self.A_times.append(toc - tic)
        tic = time.time()
        B = self.PX - np.dot(np.diag(self.P1), self.Y)
        toc = time.time()
        self.B_times.append(toc - tic)
        tic = time.time()
        self.W = np.linalg.solve(A, B)
        toc = time.time()
        self.solve_times.append(toc - tic)
    else:
        dP = np.diag(self.P1)
        dPQ = np.matmul(dP, self.Q)
        F = self.PX - np.matmul(dP, self.Y)

        self.W = (
            1
            / (self.alpha * self.sigma2)
            * (
                F
                - np.matmul(
                    dPQ,
                    (
                        np.linalg.solve(
                            (self.alpha * self.sigma2 * self.inv_S + np.matmul(self.Q.T, dPQ)),
                            (np.matmul(self.Q.T, F)),
                        )
                    ),
                )
            )
        )
        QtW = np.matmul(self.Q.T, self.W)
        self.E = self.E + self.alpha / 2 * np.trace(np.matmul(QtW.T, np.matmul(self.S, QtW)))
        # self.err = abs((self.E - self.E_old) / self.E)
        self.err = abs(self.E - self.E_old)
def update_variance(self)

Update the variance parameters.

Expand source code
def update_variance(self):
    """
    Update the variance parameters.
    """
    self.sigma2_prev = self.sigma2
    A = np.sum(np.square(self.X) * self.Pt1[:, None])
    B = np.sum(np.square(self.TY) * self.P1[:, None])
    C = 2 * np.trace(np.matmul(self.PX.T, self.TY))
    self.sigma2 = abs(A + B - C) / (self.Np * self.D)

    if self.sigma2 <= 0:
        self.sigma2 = self.tolerance / 10

Inherited members