"""RHINO utilities.
"""
# Authors: Mark Woolrich <mark.woolrich@ohba.ox.ac.uk>
# Chetan Gohil <chetan.gohil@psych.ox.ac.uk>
# Rob Seymour <rob.seymour@psych.ox.ac.uk>
# Mats van Es <mats.vanes@psych.ox.ac.uk>
import os
import os.path as op
import subprocess
import pickle
from pathlib import Path
from glob import glob
from shutil import copy
import numpy as np
from sklearn.neighbors import NearestNeighbors
import nibabel as nib
import pandas as pd
from scipy import LowLevelCallable
from scipy.ndimage import generic_filter
from scipy.spatial import KDTree
import matplotlib.pyplot as plt
from mne import Transform
from mne.transforms import read_trans
from mne.surface import write_surface
from mne import io
from mne.channels import make_dig_montage
from numba import cfunc, carray
from numba.types import intc, intp, float64, voidptr
from numba.types import CPointer
import logging
logging.getLogger("numba").setLevel(logging.WARNING)
from fsl import wrappers as fsl_wrappers
from osl_ephys.source_recon.beamforming import transform_recon_timeseries
from osl_ephys.utils.logger import log_or_print
from osl_ephys.utils import soft_import
[docs]def get_rhino_filenames(subjects_dir, subject):
"""Get paths to all RHINO files.
Files will be in subjects_dir/subject/rhino.
Parameters
----------
subjects_dir : string
Directory containing the subject directories.
subject : string
Subject directory name to put the coregistration files in.
Returns
-------
files : dict
A dict of files generated and used by RHINO. Contains three keys:
- 'surf': containing surface extraction file paths.
- 'coreg': containing coregistration file paths.
- 'fwd_model': containing the forward model file path.
"""
# Base RHINO directory
rhino_dir = op.join(subjects_dir, subject, "rhino")
if " " in rhino_dir:
raise ValueError("subjects_dir cannot contain spaces.")
# Surfaces files
surfaces_dir = op.join(rhino_dir, "surfaces")
os.makedirs(surfaces_dir, exist_ok=True)
surf_files = {
"basedir": surfaces_dir,
"smri_file": op.join(surfaces_dir, "smri.nii.gz"),
"mni2mri_flirt_xform_file": op.join(surfaces_dir, "mni2mri_flirt_xform.txt"),
"mni_mri_t_file": op.join(surfaces_dir, "mni_mri-trans.fif"),
"bet_outskin_mesh_vtk_file": op.join(surfaces_dir, "outskin_mesh.vtk"), # BET output
"bet_inskull_mesh_vtk_file": op.join(surfaces_dir, "inskull_mesh.vtk"), # BET output
"bet_outskull_mesh_vtk_file": op.join(surfaces_dir, "outskull_mesh.vtk"), # BET output
"bet_outskin_mesh_file": op.join(surfaces_dir, "outskin_mesh.nii.gz"),
"bet_outskin_plus_nose_mesh_file": op.join(surfaces_dir, "outskin_plus_nose_mesh.nii.gz"),
"bet_inskull_mesh_file": op.join(surfaces_dir, "inskull_mesh.nii.gz"),
"bet_outskull_mesh_file": op.join(surfaces_dir, "outskull_mesh.nii.gz"),
"std_brain": op.join(os.environ["FSLDIR"], "data", "standard", "MNI152_T1_1mm_brain.nii.gz"),
"std_brain_bigfov": op.join(os.environ["FSLDIR"], "data", "standard", "MNI152_T1_1mm_BigFoV_facemask.nii.gz"),
"completed": op.join(surfaces_dir, "completed.txt"),
}
# Coregistration files
coreg_dir = op.join(rhino_dir, "coreg")
os.makedirs(coreg_dir, exist_ok=True)
coreg_files = {
"basedir": coreg_dir,
"info_fif_file": op.join(coreg_dir, "info-raw.fif"),
"smri_file": op.join(coreg_dir, "scaled_smri.nii.gz"),
"head_scaledmri_t_file": op.join(coreg_dir, "head_scaledmri-trans.fif"),
"head_mri_t_file": op.join(coreg_dir, "head_mri-trans.fif"),
"ctf_head_mri_t_file": op.join(coreg_dir, "ctf_head_mri-trans.fif"),
"mrivoxel_scaledmri_t_file": op.join(coreg_dir, "mrivoxel_scaledmri_t_file-trans.fif"),
"mni_nasion_mni_file": op.join(coreg_dir, "mni_nasion.txt"),
"mni_rpa_mni_file": op.join(coreg_dir, "mni_rpa.txt"),
"mni_lpa_mni_file": op.join(coreg_dir, "mni_lpa.txt"),
"smri_nasion_file": op.join(coreg_dir, "smri_nasion.txt"),
"smri_rpa_file": op.join(coreg_dir, "smri_rpa.txt"),
"smri_lpa_file": op.join(coreg_dir, "smri_lpa.txt"),
"polhemus_nasion_file": op.join(coreg_dir, "polhemus_nasion.txt"),
"polhemus_rpa_file": op.join(coreg_dir, "polhemus_rpa.txt"),
"polhemus_lpa_file": op.join(coreg_dir, "polhemus_lpa.txt"),
"polhemus_headshape_file": op.join(coreg_dir, "polhemus_headshape.txt"),
"coreg_error_file": op.join(coreg_dir, "coreg_err.txt"),
# BET mesh output in native space
"bet_outskin_mesh_vtk_file": op.join(coreg_dir, "scaled_outskin_mesh.vtk"),
"bet_inskull_mesh_vtk_file": op.join(coreg_dir, "scaled_inskull_mesh.vtk"),
"bet_outskull_mesh_vtk_file": op.join(coreg_dir, "scaled_outskull_mesh.vtk"),
# Freesurfer mesh in native space
# - these are the ones shown in coreg_display() if doing surf plot
# - these are also used by MNE forward modelling
"bet_outskin_surf_file": op.join(coreg_dir, "scaled_outskin_surf.surf"),
"bet_inskull_surf_file": op.join(coreg_dir, "scaled_inskull_surf.surf"),
"bet_outskull_surf_file": op.join(coreg_dir, "scaled_outskull_surf.surf"),
"bet_outskin_plus_nose_surf_file": op.join(coreg_dir, "scaled_outskin_plus_nose_surf.surf"),
# BET output surface mask as nii in native space
"bet_outskin_mesh_file": op.join(coreg_dir, "scaled_outskin_mesh.nii.gz"),
"bet_outskin_plus_nose_mesh_file": op.join(coreg_dir, "scaled_outskin_plus_nose_mesh.nii.gz"),
"bet_inskull_mesh_file": op.join(coreg_dir, "scaled_inskull_mesh.nii.gz"),
"bet_outskull_mesh_file": op.join(coreg_dir, "scaled_outskull_mesh.nii.gz"),
"std_brain": op.join(os.environ["FSLDIR"], "data", "standard", "MNI152_T1_1mm_brain.nii.gz"),
}
# All RHINO files
files = {"surf": surf_files, "coreg": coreg_files, "fwd_model": op.join(rhino_dir, "model-fwd.fif")}
return files
[docs]def system_call(cmd, verbose=False):
""" Run a system call.
Parameters
----------
cmd : string
Command to run.
verbose : bool
Print command before running.
"""
if verbose:
log_or_print(cmd)
subprocess.call(cmd, shell=True)
[docs]def get_gridstep(coords):
"""Get gridstep (i.e. spatial resolution of dipole grid) in mm.
Parameters
----------
coords : numpy.ndarray
Coordinates.
Returns
-------
gridstep: int
Spatial resolution of dipole grid in mm.
"""
store = []
for ii in range(coords.shape[0]):
store.append(np.sqrt(np.sum(np.square(coords[ii, :] - coords[0, :]))))
store = np.asarray(store)
gridstep = int(np.round(np.min(store[np.where(store > 0)]) * 1000))
return gridstep
[docs]def niimask2indexpointcloud(nii_fname, volindex=None):
"""Takes in a nii.gz mask file name (which equals zero for background and != zero for the mask) and returns the mask as a 3 x npoints point cloud.
Parameters
----------
nii_fname : string
A nii.gz mask file name (with zero for background, and !=0 for the mask).
volindex : int
Volume index, used if nii_mask is a 4D file.
Returns
-------
pc : numpy.ndarray
3 x npoints point cloud as voxel indices.
"""
vol = nib.load(nii_fname).get_fdata()
if len(vol.shape) == 4 and volindex is not None:
vol = vol[:, :, :, volindex]
if not len(vol.shape) == 3:
Exception("nii_mask must be a 3D volume, or nii_mask must be a 4D volume with volindex specifying a volume index")
# Turn the nvoxx x nvoxy x nvoxz volume into a 3 x npoints point cloud
pc = np.asarray(np.where(vol != 0))
return pc
[docs]def niimask2mmpointcloud(nii_mask, volindex=None):
"""Takes in a nii.gz mask (which equals zero for background and neq zero for the mask) and returns the mask as a 3 x npoints point cloud in native space in mm's.
Parameters
----------
nii_mask : string
A nii.gz mask file name or the [x,y,z] volume (with zero for background, and !=0 for the mask).
volindex : int
Volume index, used if nii_mask is a 4D file.
Returns
-------
pc : numpy.ndarray
3 x npoints point cloud as mm in native space (using sform).
values : numpy.ndarray
npoints values.
"""
vol = nib.load(nii_mask).get_fdata()
if len(vol.shape) == 4 and volindex is not None:
vol = vol[:, :, :, volindex]
if not len(vol.shape) == 3:
Exception("nii_mask must be a 3D volume, or nii_mask must be a 4D volume with volindex specifying a volume index")
# Turn the nvoxx x nvoxy x nvoxz volume into a 3 x npoints point cloud
pc_nativeindex = np.asarray(np.where(vol != 0))
values = np.asarray(vol[vol != 0])
# Move from native voxel indices to native space coordinates (in mm)
pc = xform_points(get_sform(nii_mask)["trans"], pc_nativeindex)
return pc, values
[docs]def closest_node(node, nodes):
"""Find nearest node in nodes to the passed in node.
Returns
-------
index : int
Index to the nearest node in nodes.
distance : float
Distance.
"""
if len(nodes) == 1:
nodes = np.reshape(nodes, [-1, 1])
kdtree = KDTree(nodes)
distance, index = kdtree.query(node)
return index, distance
[docs]def get_vol_info_from_nii(mri):
"""Read volume info from an MRI file.
Parameters
----------
mri : str
Path to MRI file.
Returns
-------
out : dict
Dictionary with keys 'mri_width', 'mri_height', 'mri_depth' and 'mri_volume_name'.
"""
dims = nib.load(mri).get_fdata().shape
out = dict(mri_width=dims[0], mri_height=dims[1], mri_depth=dims[2], mri_volume_name=mri)
return out
[docs]def get_orient(nii_file):
cmd = "fslorient -getorient {}".format(nii_file)
# use os.popen rather than os.system as we want to return a value, note that this will wait until the read() works before continuing.
# Without the read() the code will continue without waiting for the system call to finish
orient = os.popen(cmd).read().strip()
return orient
[docs]def check_nii_for_nan(filename):
img = nib.load(filename)
data = img.get_fdata()
return np.isnan(data).any()
@cfunc(intc(CPointer(float64), intp, CPointer(float64), voidptr))
[docs]def majority(values_ptr, len_values, result, data):
"""
def _majority(buffer, required_majority):
return buffer.sum() >= required_majority
See: https://ilovesymposia.com/2017/03/12/scipys-new-lowlevelcallable-is-a-game-changer/
Numba cfunc that takes in:
a double pointer pointing to the values within the footprint,
a pointer-sized integer that specifies the number of values in the footprint,
a double pointer for the result, and
a void pointer, which could point to additional parameters
"""
values = carray(values_ptr, (len_values,), dtype=float64)
required_majority = 14 # in 3D we have 27 voxels in total
result[0] = values.sum() >= required_majority
return 1
[docs]def binary_majority3d(img):
"""
Set a pixel to 1 if a required majority (default=14) or more pixels in its 3x3x3 neighborhood are 1, otherwise, set the pixel to 0. img is a 3D binary image
"""
if img.dtype != "bool":
raise ValueError("binary_majority3d(img) requires img to be binary")
if len(img.shape) != 3:
raise ValueError("binary_majority3d(img) requires img to be 3D")
imgout = generic_filter(img, LowLevelCallable(majority.ctypes), size=3).astype(int)
return imgout
[docs]def nearest_neighbor(src, dst):
"""Find the nearest (Euclidean) neighbor in dst for each point in src.
Parameters
----------
src : numpy.ndarray
Nxm array of points.
dst : numpy.ndarray
Nxm array of points.
Returns
-------
distances : numpy.ndarray
Euclidean distances of the nearest neighbor.
indices : numpy.ndarray
dst indices of the nearest neighbor.
"""
neigh = NearestNeighbors(n_neighbors=1)
neigh.fit(dst)
distances, indices = neigh.kneighbors(src, return_distance=True)
return distances.ravel(), indices.ravel()
[docs]def icp(A, B, init_pose=None, max_iterations=50, tolerance=0.0001):
"""The Iterative Closest Point method: finds best-fit transform that maps points A on to points B.
Parameters
----------
A : numpy.ndarray
Nxm numpy array of source mD points.
B : numpy.ndarray
Nxm numpy array of destination mD point.
init_pose : numpy.ndarray
(m+1)x(m+1) homogeneous transformation.
max_iterations : int
Exit algorithm after max_iterations.
tolerance : float
Convergence criteria.
Returns
-------
T : numpy.ndarray
(4 x 4) Final homogeneous transformation that maps A on to B.
distances : numpy.ndarray
Euclidean distances (errors) of the nearest neighbor.
i : float
Number of iterations to converge.
Notes
-----
From: https://github.com/ClayFlannigan/icp/blob/master/icp.py
"""
# Get number of dimensions
m = A.shape[1]
# Make points homogeneous, copy them to maintain the originals
src = np.ones((m + 1, A.shape[0]))
dst = np.ones((m + 1, B.shape[0]))
src[:m, :] = np.copy(A.T)
dst[:m, :] = np.copy(B.T)
# Apply the initial pose estimation
if init_pose is not None:
src = np.dot(init_pose, src)
prev_error = 0
kdtree = KDTree(dst[:m, :].T)
for i in range(max_iterations):
# Find the nearest neighbors between the current source and destination points
#distances, indices = nearest_neighbor(src[:m,:].T, dst[:m,:].T)
distances, indices = kdtree.query(src[:m, :].T)
# Compute the transformation between the current source and nearest # destination points
T = best_fit_transform(src[:m, :].T, dst[:m, indices].T)
# Update the current source
src = np.dot(T, src)
# Check RMS error
mean_error = np.sqrt(np.mean(np.square(distances)))
if np.abs(prev_error - mean_error) < tolerance:
break
prev_error = mean_error
# Calculate final transformation
T = best_fit_transform(A, src[:m, :].T)
return T, distances, i
[docs]def rhino_icp(smri_headshape_polhemus, polhemus_headshape_polhemus, n_init=10):
"""Runs Iterative Closest Point (ICP) with multiple initialisations.
Parameters
----------
smri_headshape_polhemus : numpy.ndarray
[3 x N] locations of the Headshape points in polehumus space (i.e. MRI scalp surface).
polhemus_headshape_polhemus : numpy.ndarray
[3 x N] locations of the Polhemus headshape points in polhemus space.
n_init : int
Number of random initialisations to perform.
Returns
-------
xform : numpy.ndarray
[4 x 4] rigid transformation matrix mapping data2 to data.
Notes
-----
Based on Matlab version from Adam Baker 2014.
"""
# These are the "destination" points that are held static
data1 = smri_headshape_polhemus
# These are the "source" points that will be moved around
data2 = polhemus_headshape_polhemus
err_old = np.inf
err = np.zeros(n_init)
Mr = np.eye(4)
incremental = False
if incremental:
Mr_total = np.eye(4)
data2r = data2
for init in range(n_init):
Mi, distances, i = icp(data2r.T, data1.T)
# RMS error
e = np.sqrt(np.mean(np.square(distances)))
err[init] = e
if err[init] < err_old:
log_or_print("ICP found better xform, error={}".format(e))
err_old = e
if incremental:
Mr_total = Mr @ Mr_total
xform = Mi @ Mr_total
else:
xform = Mi @ Mr
if False:
import matplotlib.pyplot as plt
# after ICP
data2r_icp = xform_points(xform, data2)
plt.figure(frameon=False)
ax = plt.axes(projection="3d")
ax.scatter(data1[0, 0:-1:10], data1[1, 0:-1:10], data1[2, 0:-1:10], c="blue", marker=".", s=1)
ax.scatter(data2[0, :], data2[1, :], data2[2, :], c="red", marker="o", s=5)
ax.scatter(data2r[0, :], data2r[1, :], data2r[2, :], c="green", marker="o", s=5)
ax.scatter(data2r_icp[0, :], data2r_icp[1, :], data2r_icp[2, :], c="yellow", marker="o", s=5)
plt.show()
plt.draw()
# Give the registration a kick...
a = (np.random.uniform() - 0.5) * np.pi / 6
b = (np.random.uniform() - 0.5) * np.pi / 6
c = (np.random.uniform() - 0.5) * np.pi / 6
Rx = np.array([(1, 0, 0), (0, np.cos(a), -np.sin(a)), (0, np.sin(a), np.cos(a))])
Ry = np.array([(np.cos(b), 0, np.sin(b)), (0, 1, 0), (-np.sin(b), 0, np.cos(b))])
Rz = np.array([(np.cos(c), -np.sin(c), 0), (np.sin(c), np.cos(c), 0), (0, 0, 1)])
T = 10 * np.array([np.random.uniform() - 0.5, np.random.uniform() - 0.5, np.random.uniform() - 0.5])
Mr = np.eye(4)
Mr[0:3, 0:3] = Rx @ Ry @ Rz
Mr[0:3, -1] = np.reshape(T, (1, -1))
if incremental:
data2r = Mr @ Mr_total @ np.vstack((data2, np.ones((1, data2.shape[1]))))
else:
data2r = Mr @ np.vstack((data2, np.ones((1, data2.shape[1]))))
data2r = data2r[0:3, :]
return xform, err, err_old
[docs]def get_vtk_mesh_native(vtk_mesh_file, nii_mesh_file):
"""
Returns mesh rrs in native space in mm and the mesh tris for the passed in vtk_mesh_file
nii_mesh_file needs to be the corresponding niftii file from bet that corresponds to the same mesh as in vtk_mesh_file
"""
data = pd.read_csv(vtk_mesh_file, sep=r"\s+")
num_rrs = int(data.iloc[3, 1])
# these will be in voxel index space
rrs_flirtcoords = data.iloc[4 : num_rrs + 4, 0:3].to_numpy().astype(np.float64)
# move from flirtcoords mm to mri mm (native) space
xform_flirtcoords2nii = get_flirtcoords2native_xform(nii_mesh_file)
rrs_nii = xform_points(xform_flirtcoords2nii, rrs_flirtcoords.T).T
num_tris = int(data.iloc[num_rrs + 4, 1])
tris_nii = data.iloc[num_rrs + 5 : num_rrs + 5 + num_tris, 1:4].to_numpy().astype(int)
return rrs_nii, tris_nii
[docs]def timeseries2nii(timeseries, timeseries_coords, reference_mask_fname, out_nii_fname, times=None):
"""Maps the (ndipoles,tpts) array of timeseries to the grid defined by reference_mask_fname and outputs them as a niftii file.
Assumes the timeseries' dipoles correspond to those in reference_mask_fname. Both timeseries and reference_mask_fname are often output from rhino.transform_recon_timeseries.
Parameters
----------
timeseries : (ndipoles, ntpts) numpy.ndarray
Time courses. Assumes the timeseries' dipoles correspond to those in reference_mask_fname. Typically derives from rhino.transform_recon_timeseries
timeseries_coords : (ndipoles, 3) numpy.ndarray
Coords in mm for dipoles corresponding to passed in timeseries
reference_mask_fname : string
A nii.gz mask file name (with zero for background, and !=0 for the mask). Assumes the mask was used to set dipoles for timeseries, typically derived from
rhino.transform_recon_timeseries
out_nii_fname : string
output name of niftii file
times : (ntpts, ) numpy.ndarray
Times points in seconds. Assume that times are regularly spaced. Used to set nii file up correctly.
Returns
-------
out_nii_fname : string
Name of output niftii file
"""
if len(timeseries.shape) == 1:
timeseries = np.reshape(timeseries, [-1, 1])
mni_nii_nib = nib.load(reference_mask_fname)
coords_ind = niimask2indexpointcloud(reference_mask_fname).T
coords_mni, tmp = niimask2mmpointcloud(reference_mask_fname)
mni_nii_values = mni_nii_nib.get_fdata()
mni_nii_values = np.zeros(np.append(mni_nii_values.shape, timeseries.shape[1]))
kdtree = KDTree(coords_mni.T)
gridstep = int(get_gridstep(coords_mni.T) / 1000)
for ind in range(timeseries_coords.shape[1]):
distance, index = kdtree.query(timeseries_coords[:, ind])
# Exclude any timeseries_coords that are further than gridstep away
# from the best matching coords_mni
if distance < gridstep:
mni_nii_values[coords_ind[ind, 0], coords_ind[ind, 1], coords_ind[ind, 2], :] = timeseries[ind, :]
# SAVE AS NIFTI
vol_nii = nib.Nifti1Image(mni_nii_values, mni_nii_nib.affine)
vol_nii.header.set_xyzt_units(2) # mm
if times is not None:
vol_nii.header["pixdim"][4] = times[1] - times[0]
vol_nii.header["toffset"] = -0.5
vol_nii.header.set_xyzt_units(2, 8) # mm and secs
nib.save(vol_nii, out_nii_fname)
return out_nii_fname
[docs]def recon_timeseries2niftii(
subjects_dir,
subject,
recon_timeseries,
out_nii_fname,
spatial_resolution=None,
reference_brain="mni",
times=None,
):
"""Converts a (ndipoles,tpts) array of reconstructed timeseries (in head/polhemus space) to the corresponding dipoles in a standard brain grid in MNI space
and outputs them as a niftii file.
Parameters
----------
subjects_dir : string
Directory to find RHINO subject directories in.
subject : string
Subject name directory to find RHINO files in.
recon_timeseries : (ndipoles, ntpts) np.ndarray
Reconstructed time courses (in head (polhemus) space). Assumes that the dipoles are the same (and in the same order) as those in the forward model,
rhino_files['fwd_model']. Typically derive from the VolSourceEstimate's output by MNE source recon methods, e.g. mne.beamformer.apply_lcmv,
obtained using a forward model generated by RHINO.
spatial_resolution : int
Resolution to use for the reference brain in mm (must be an integer, or will be cast to nearest int). If None, then the gridstep used in rhino_files['fwd_model'] is used.
reference_brain : string, 'mni' or 'mri'
'mni' indicates that the reference_brain is the stdbrain in MNI space.
'mri' indicates that the reference_brain is the sMRI in native/mri space.
times : (ntpts, ) np.ndarray
Times points in seconds. Will assume that these are regularly spaced.
Returns
-------
out_nii_fname : string
Name of output niftii file.
reference_brain_fname : string
Niftii file name of standard brain mask in MNI space at requested resolution, int(stdbrain_resolution) (with zero for background, and !=0 for the mask).
"""
if len(recon_timeseries.shape) == 1:
recon_timeseries = np.reshape(recon_timeseries, [recon_timeseries.shape[0], 1])
# ------------------------------------------------------------------------------------------------
# Convert the recon_timeseries to the standard space brain dipole grid at the specified resolution
recon_ts_out, reference_brain_fname, recon_coords_out, recon_indices = transform_recon_timeseries(
subjects_dir, subject, recon_timeseries=recon_timeseries, spatial_resolution=spatial_resolution, reference_brain=reference_brain,
)
# ----------------------------------
# Output recon_ts_out as niftii file
out_nii_fname = timeseries2nii(recon_ts_out, recon_coords_out, reference_brain_fname, out_nii_fname, times=times)
return out_nii_fname, reference_brain_fname
[docs]def save_or_show_renderer(renderer, filename):
"""Save or show a renderer.
Parameters
----------
renderer : mne.viz.backends._notebook._Renderer Object
MNE renderer object.
filename : str
Filename to save display to (as an interactive html). Must have extension .html. If None we display the renderer.
"""
if filename is None:
renderer.show()
else:
allowed_extensions = [".html", ".pdf", ".svg", ".eps", ".ps", ".tex"]
ext = Path(filename).suffix
if ext not in allowed_extensions:
raise ValueError(f"{ext} not allowed, please use one of the following: {allowed_extensions}")
log_or_print(f"saving {filename}")
if ext == ".html":
logging.getLogger("param.Row").setLevel(logging.ERROR) # suppress warnings
logging.getLogger("param.VTKRenderWindowSynchronized").setLevel(logging.ERROR)
renderer.figure.plotter.export_html(filename)
elif ext in allowed_extensions:
renderer.figure.plotter.save_graphic(filename)
[docs]def create_freesurfer_meshes_from_bet_surfaces(filenames, xform_mri_voxel2mri):
# Create sMRI-derived freesurfer surfaces in native/mri space in mm, for use by forward modelling
create_freesurfer_mesh_from_bet_surface(
infile=filenames["bet_inskull_mesh_vtk_file"],
surf_outfile=filenames["bet_inskull_surf_file"],
nii_mesh_file=filenames["bet_inskull_mesh_file"],
xform_mri_voxel2mri=xform_mri_voxel2mri,
)
create_freesurfer_mesh_from_bet_surface(
infile=filenames["bet_outskull_mesh_vtk_file"],
surf_outfile=filenames["bet_outskull_surf_file"],
nii_mesh_file=filenames["bet_outskull_mesh_file"],
xform_mri_voxel2mri=xform_mri_voxel2mri,
)
create_freesurfer_mesh_from_bet_surface(
infile=filenames["bet_outskin_mesh_vtk_file"],
surf_outfile=filenames["bet_outskin_surf_file"],
nii_mesh_file=filenames["bet_outskin_mesh_file"],
xform_mri_voxel2mri=xform_mri_voxel2mri,
)
[docs]def create_freesurfer_mesh_from_bet_surface(infile, surf_outfile, xform_mri_voxel2mri, nii_mesh_file=None):
"""Creates surface mesh in .surf format and in native mri space in mm from infile.
Parameters
----------
infile : string
Either:
1) .nii.gz file containing zero's for background and one's for surface
2) .vtk file generated by bet_surf (in which case the path to the
structural MRI, smri_file, must be included as an input)
surf_outfile : string
Path to the .surf file generated, containing the surface mesh in mm
xform_mri_voxel2mri : numpy.ndarray
4x4 array. Transform from voxel indices to native/mri mm.
nii_mesh_file : string
Path to the niftii mesh file that is the niftii equivalent of vtk file passed in as infile (only needed if infile is a vtk file).
"""
pth, name = op.split(infile)
name, ext = op.splitext(name)
if ext == ".gz":
print("Creating surface mesh for {} .....".format(infile))
# Soft import raising an informative warning if not installed
o3d = soft_import("open3d")
name, ext = op.splitext(name)
if ext != ".nii":
raise ValueError("Invalid infile. Needs to be a .nii.gz or .vtk file")
# convert to point cloud in voxel indices
nii_nativeindex = niimask2indexpointcloud(infile)
step = 1
nii_native = xform_points(xform_mri_voxel2mri, nii_nativeindex[:, 0:-1:step])
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(nii_native.T)
pcd.estimate_normals()
# to obtain a consistent normal orientation
pcd.orient_normals_towards_camera_location(pcd.get_center())
# or you might want to flip the normals to make them point outward, not mandatory
pcd.normals = o3d.utility.Vector3dVector(-np.asarray(pcd.normals))
mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd, depth=8)[0]
#mesh = mesh.simplify_quadric_decimation(nii_nativeindex.shape[1])
verts = np.asarray(mesh.vertices)
tris = np.asarray(mesh.triangles).astype(int)
# output in freesurfer file format
write_surface(surf_outfile, verts, tris, file_format="freesurfer", overwrite=True)
elif ext == ".vtk":
if nii_mesh_file is None:
raise ValueError("You must specify a nii_mesh_file (niftii format), if infile format is vtk")
rrs_native, tris_native = get_vtk_mesh_native(infile, nii_mesh_file)
write_surface(surf_outfile, rrs_native, tris_native, file_format="freesurfer", overwrite=True)
else:
raise ValueError("Invalid infile. Needs to be a .nii.gz or .vtk file")
[docs]def save_polhemus_fif(raw, subjects_dir,subject):
"""Save the polhemus information (nasion, LPA, RPA, headshape) from a raw MNE file
Files will be in subjects_dir/subject/coreg.
Parameters
----------
raw : raw MNE data structure
Data loaded into MNE with preload=True
subjects_dir : string
Directory containing the subject directories.
subject : string
Subject directory name to put the coregistration files in.
"""
# Check the input is a raw data structure
if not isinstance(raw, io.BaseRaw):
log_or_print("The object is not a raw data structure.")
return
# Initialize empty lists to store the 3D coordinates
nasion = np.empty((0, 3))
LPA = np.empty((0, 3))
RPA = np.empty((0, 3))
headshape = np.empty((0, 3))
# Check if there is digitisation information
if not raw.info["dig"]:
log_or_print("Error: raw.info['dig'] is empty. Exiting function.")
return
else:
# Iterate through the digitalization points - the *1000 is for m to mm conversion
for p in raw.info["dig"]:
if p["kind"] == 1 and p["ident"] == 1:
LPA = np.append(LPA,(p["r"]*1000)) # Append the 3D coordinates of LPA
elif p["kind"] == 1 and p["ident"] == 2:
nasion = np.append(nasion,p["r"]*1000) # Append the 3D coordinates of nasion
elif p["kind"] == 1 and p["ident"] == 3:
RPA = np.append(RPA,(p["r"]*1000)) # Append the 3D coordinates of RPA
elif p["kind"] == 4:
headshape = np.vstack([headshape, p["r"]*1000]) # Stack headshape points
# Get the location to save the files to
coreg_filenames = get_rhino_filenames(subjects_dir, subject)
path_to_save = coreg_filenames['coreg']['basedir']
# Save to .txt files
if nasion.any():
np.savetxt(os.path.join(path_to_save, "polhemus_nasion.txt"), nasion.T, delimiter=' ', fmt='%.40e')
if LPA.any():
np.savetxt(os.path.join(path_to_save, "polhemus_LPA.txt"), LPA.T, delimiter=' ', fmt='%.40e')
if RPA.any():
np.savetxt(os.path.join(path_to_save, "polhemus_RPA.txt"), RPA.T, delimiter=' ', fmt='%.40e')
if headshape.any():
np.savetxt(os.path.join(path_to_save, "polhemus_headshape.txt"), headshape.T, delimiter=' ', fmt='%.40e')
log_or_print("Saved files to {}".format(path_to_save))
[docs]def downsample_headshape(
headshape,
downsample_amount=0.015, # average over 1.5cm
include_facial_info=True,
remove_zlim=0.02, # Remove 2cm above nasion
angle=10, # At an angle of 10deg
method="gridaverage",
face_Z = [-0.08, 0.02], # Z-axis (up-down) -8cm to 2cm
face_Y = [0.06, 0.15], # Y-axis (forward-back) 6 to 15cm
face_X = [-0.07, 0.07], # X-axis (left_right) -7 to 7cm
downsample_facial_info=True,
downsample_facial_info_amount=0.008 # average over 0.8cm
):
"""
Downsample headshape information for more accurate coregistration.
Parameters
----------
- headshape : numpy.ndarray
Nx3 array of headshape points in meters.
- include_facial_info : bool
Include facial points if True.
- remove_zlim : float
Remove points above nasion on the z-axis in meters.
- method : str
Downsampling method. Note: only method supported is 'gridaverage'.
- facial_info_above_z (float): float
Max z-value for facial points in meters.
- facial_info_below_z : float
Min z-value for facial points in meters.
- facial_info_above_y : float
Max y-value for facial points in meters.
- facial_info_below_y : float
Min y-value for facial points in meters.
- facial_info_below_x : float
Min x-value for facial points in meters.
- downsample_facial_info : bool
Whether to downsample facial points.
- downsample_facial_info_amount : float
Grid size for downsampling facial info in meters.
Returns:
- numpy.ndarray: Downsampled headshape points.
"""
# Original headshape copy
headshape_orig = np.copy(headshape)
# Filter facial points if needed
if include_facial_info:
facial_mask = (
(headshape[:, 2] > face_Z[0]) &
(headshape[:, 2] < face_Z[1]) &
(headshape[:, 1] > face_Y[0]) &
(headshape[:, 1] < face_Y[1]) &
(headshape[:, 0] > face_X[0]) &
(headshape[:, 0] < face_X[1])
)
# Separate facial points and non-facial points
facial_points = headshape[facial_mask]
# Print shape of the headshape without facial points
if downsample_facial_info:
facial_points = grid_average_downsample(facial_points, downsample_facial_info_amount)
# Remove points above z-limit
if remove_zlim is not None:
log_or_print('Removing Points Below Z_lim')
# Rotate the point cloud to align the Z-axis with the specified angle
rotated_headshape = rotate_pointcloud(headshape, angle, 'x')
# Filter points based on Z-limit
z_mask = rotated_headshape[:, 2] > remove_zlim
filtered_rotated_points = rotated_headshape[z_mask]
# Rotate the filtered points back to the original orientation
headshape = rotate_pointcloud(filtered_rotated_points, -angle, 'x')
# Downsample the overall headshape
if method == 'gridaverage':
headshape = grid_average_downsample(headshape,downsample_amount)
else:
raise ValueError("Unsupported downsampling method: {}".format(method))
# Combine headshape and facial points (if included)
if include_facial_info:
headshape = np.vstack((headshape, facial_points))
fig = plt.figure(figsize=(12, 6))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(headshape_orig[:, 0], headshape_orig[:, 1], headshape_orig[:, 2], c='red', alpha=0.1, s=1)
ax.scatter(headshape[:, 0], headshape[:, 1], headshape[:, 2], c='blue', alpha=1, s=10)
plt.title("Downsampled Headshape")
plt.show()
return headshape
[docs]def rotate_pointcloud(points, angle_degrees, axis='x'):
"""
Rotates the point cloud around a specified axis.
Parameters
----------
points : np.array
Headshape points
angle_degrees: float
Amount to rotate in degrees.
axis: str
Axis to rotate.
"""
angle_radians = np.radians(angle_degrees)
if axis == 'x':
rotation_matrix = np.array([
[1, 0, 0],
[0, np.cos(angle_radians), -np.sin(angle_radians)],
[0, np.sin(angle_radians), np.cos(angle_radians)]
])
elif axis == 'y':
rotation_matrix = np.array([
[np.cos(angle_radians), 0, np.sin(angle_radians)],
[0, 1, 0],
[-np.sin(angle_radians), 0, np.cos(angle_radians)]
])
elif axis == 'z':
rotation_matrix = np.array([
[np.cos(angle_radians), -np.sin(angle_radians), 0],
[np.sin(angle_radians), np.cos(angle_radians), 0],
[0, 0, 1]
])
else:
raise ValueError("Invalid axis. Choose from 'x', 'y', or 'z'.")
return np.dot(points, rotation_matrix.T)
import numpy as np
[docs]def grid_average_downsample(point_cloud, voxel_size):
"""
Downsample a point cloud using grid averaging.
This function divides the space into a voxel grid, computes the average
position of points within each voxel, and returns a downsampled point cloud.
Parameters
----------
point_cloud : numpy.ndarray
A numpy array of shape (N, 3) representing the point cloud, where N
is the number of points, and each point has (x, y, z) coordinates.
voxel_size : float
The size of the voxel grid. Points within a grid cell are averaged
to compute the downsampled point.
Returns
-------
downsampled_cloud : numpy.ndarray
A numpy array of shape (M, 3) representing the downsampled point cloud,
where M is the number of voxels containing points.
Notes
-----
- This method assumes the input point cloud is dense and unstructured.
- For very large point clouds, consider optimizing memory usage.
Examples
--------
>>> import numpy as np
>>> point_cloud = np.random.rand(1000, 3) # Generate random point cloud
>>> voxel_size = 0.05
>>> downsampled_cloud = grid_average_downsample(point_cloud, voxel_size)
>>> print(downsampled_cloud.shape)
"""
# Quantize the coordinates into voxel indices
voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
# Use a dictionary to collect points in the same voxel
voxel_dict = {}
for idx, point in zip(voxel_indices, point_cloud):
key = tuple(idx)
if key not in voxel_dict:
voxel_dict[key] = []
voxel_dict[key].append(point)
# Compute the average for each voxel
downsampled_cloud = np.array([np.mean(voxel_dict[key], axis=0) for key in voxel_dict])
return downsampled_cloud
[docs]def replace_headshape(raw, ds_headshape):
"""
Replace headshape points in an MNE Raw object with downsampled points and update fiducials.
Parameters
----------
raw : mne.io.Raw
The raw object containing the original digitization points.
ds_headshape : ndarray, shape (N, 3)
Array of downsampled headshape points (in head coordinates).
Returns
-------
raw_copy : mne.io.Raw
A copy of the input Raw object with the updated headshape points and montage.
"""
# Get current digitization points
dig = raw.info['dig']
# Initialize fiducial positions
fid_positions = {
'nasion': None,
'lpa': None,
'rpa': None
}
# Extract fiducials from the dig points
for f in dig:
if f['coord_frame'] == 4: # Ensure it's in the head coordinate frame
if f['ident'] == 2 and fid_positions['nasion'] is None: # Nasion
fid_positions['nasion'] = f['r']
elif f['ident'] == 1 and fid_positions['lpa'] is None: # Left Preauricular (LPA)
fid_positions['lpa'] = f['r']
elif f['ident'] == 3 and fid_positions['rpa'] is None: # Right Preauricular (RPA)
fid_positions['rpa'] = f['r']
# Verify the extracted fiducials
if any(v is None for v in fid_positions.values()):
raise ValueError("One or more fiducials (nasion, LPA, RPA) not found in the head coordinate frame.")
# Create a DigMontage using the extracted fiducials and downsampled headshape points
montage = make_dig_montage(
hsp=ds_headshape, # Downsampled headshape points
nasion=fid_positions['nasion'],
lpa=fid_positions['lpa'],
rpa=fid_positions['rpa'],
coord_frame='head'
)
# Create a copy of the raw object and set the new montage
raw_copy = raw.copy()
raw_copy.set_montage(montage)
return raw_copy