Source code for osl_ephys.source_recon.minimum_norm

"""Minimum norm source localization using MNE-Python.

"""
    
# Authors: Chetan Gohil <chetan.gohil@psych.ox.ac.uk>
#          Mats van Es <mats.vanes@psych.ox.ac.uk>
#          Hongyu Qian <hongyu.qian@queens.ox.ac.uk>


import os
import os.path as op

import mne
import numpy as np
from mne import (
    read_forward_solution,
    Covariance,
    compute_covariance,
    compute_raw_covariance,
)

from . import freesurfer_utils
from ..utils.logger import log_or_print

from osl_ephys.source_recon.rhino import utils as rhino_utils

import logging

[docs]logger = logging.getLogger(__name__)
[docs]def get_mne_filenames(subjects_dir, subject): """Get minimum norm (MNE) filenames. Files will be in subjects_dir/subject/mne Parameters ---------- subjects_dir : string Directory containing the subject directories. subject : string Subject name. Returns ------- filenames : dict A dict of files. """ basedir = op.join(subjects_dir, subject, "mne") if " " in basedir: raise ValueError("subjects_dir cannot contain spaces.") os.makedirs(basedir, exist_ok=True) filenames = { "inverse_operator": op.join(basedir, "op-inv.fif"), "source_estimate_raw": op.join(basedir, "src-raw"), # followed by -lh/rh.stc "source_estimate_epo": op.join(basedir, "src-epo"), } return filenames
[docs]def create_inverse_operator( fwd, data, chantypes, rank, depth, loose, filename, ): """Creates minimum norm (MNE) inverse operator. Parameters ---------- fwd : mne forward model or str Forward model. data : mne.io.Raw, mne.Epochs Preprocessed data. chantypes : list List of channel types to include. rank : int Rank of the data covariance matrix. depth : float Depth weighting. loose : float Loose parameter. inv_op_filename : str Output filename. """ log_or_print("*** RUNNING MNE SOURCE LOCALIZATION ***") noise_cov = calc_noise_cov(data, rank, chantypes) if isinstance(fwd, str): fwd = mne.read_forward_solution(fwd) inverse_operator = mne.minimum_norm.make_inverse_operator( data.info, fwd, noise_cov, loose=loose, depth=depth, rank=rank, ) mne.minimum_norm.write_inverse_operator( filename, inverse_operator, overwrite=True ) return inverse_operator
[docs]def apply_inverse_operator_surf( outdir, subject, data, method, lambda2, pick_ori, inverse_operator=None, morph="fsaverage", save=False, ): """Apply previously computed minimum norm inverse solution (surface). Parameters ---------- outdir : str Output directory. subject : str Subject ID. data : mne.io.Raw, mne.Epochs Raw or Epochs object. inverse_operator : mne.minimum_norm.InverseOperator Inverse operator. method : str Inverse method. "MNE" | "dSPM" | "sLORETA" | "eLORETA". (or "mne" | "dspm" | "sloreta" | "eloreta"). lambda2 : float Regularization parameter. pick_ori : str Orientation to pick. morph : bool, str Morph method, e.g. fsaverage. Can be False. save : bool Save source estimate (default: False). """ mne_dir = op.join(outdir, subject, "mne") mne_files = get_mne_filenames(outdir, subject) coreg_files = freesurfer_utils.get_coreg_filenames(outdir, subject) if inverse_operator is None: inv_op_fname = mne_files["inverse_operator"] inverse_operator = mne.minimum_norm.read_inverse_operator(inv_op_fname) if method == "mne": method = "MNE" if method == "dspm": method = "dSPM" if method == "sloreta": method = "sLORETA" if method == "eloreta": method = "eLORETA" if pick_ori == "None": pick_ori = None log_or_print(f"*** Applying {method} surface inverse operator ***") if isinstance(data, mne.Epochs): stc = mne.minimum_norm.apply_inverse_epochs( data, inverse_operator, lambda2=lambda2, method=method, pick_ori=pick_ori, ) else: stc = mne.minimum_norm.apply_inverse_raw( data, inverse_operator, lambda2=lambda2, method=method, pick_ori=pick_ori, ) if morph: log_or_print(f"*** Morphing source estimate to {morph} ***") src_from = mne.read_source_spaces(coreg_files['source_space']) morph = morph_surface(outdir, subject, src_from, subject_to=morph) morph.save(coreg_files['source_space-morph'], overwrite=True) stc = morph.apply(stc) if save: log_or_print("*** Saving source estimate ***") if isinstance(data, mne.Epochs): stc.save(op.join(mne_dir, "src-epo"), overwrite=True) else: stc.save(op.join(mne_dir, "src-raw"), overwrite=True) return stc
[docs]def apply_inverse_operator_vol( outdir, subject, data, method, lambda2, pick_ori="pca", inverse_operator=None, transform=None, ): """Apply previously computed minimum norm inverse solution (volumetric). Parameters ---------- outdir : str Output directory. subject : str Subject ID. data : mne.io.Raw, mne.Epochs Raw or Epochs object. inverse_operator : mne.minimum_norm.InverseOperator Inverse operator. method : str Inverse method. "MNE" | "dSPM" | "sLORETA" | "eLORETA". (or "mne" | "dspm" | "sloreta" | "eloreta"). lambda2 : float Regularization parameter. pick_ori : str Orientation to pick. transform : str, optional Should we standardise ('ztrans') or de-mean ('demean') the voxel time courses? If None, no transform is applied. Returns ------- ts : (voxels, time) array In native MRI space. """ mne_dir = op.join(outdir, subject, "mne") mne_files = get_mne_filenames(outdir, subject) if inverse_operator is None: inv_op_fname = mne_files["inverse_operator"] inverse_operator = mne.minimum_norm.read_inverse_operator(inv_op_fname) if method == "mne": method = "MNE" if method == "dspm": method = "dSPM" if method == "sloreta": method = "sLORETA" if method == "eloreta": method = "eLORETA" log_or_print(f"*** Applying {method} volumetric inverse operator ***") if isinstance(data, mne.Epochs): raise ValueError("Currently volumetric MNE only supports Raw data.") if pick_ori.lower() != "pca": raise ValueError("pick_ori must be 'pca'.") # Estimate source activity stc = mne.minimum_norm.apply_inverse_raw( data, inverse_operator, lambda2=lambda2, pick_ori=pick_ori, method=method, method_params={"eps": 1e-8, "max_iter": 100}, ) # Use PCA to project 3D value into maximum variance axis xyz_ts = stc.data source_nn = inverse_operator['source_nn'][2::3] del inverse_operator xyz_ts = xyz_ts - np.mean(xyz_ts, axis=-1, keepdims=True) eig_vals, eig_vecs = np.linalg.eig(np.matmul(xyz_ts,xyz_ts.transpose(0,2,1))) order = np.argsort(np.abs(eig_vals), axis=-1) max_power_ori = eig_vecs[np.arange(len(eig_vecs)), :, order[:, -1]] assert max_power_ori.shape == (xyz_ts.shape[0], xyz_ts.shape[1]) signs = np.sign(np.sum(max_power_ori * source_nn, axis=1, keepdims=True)) signs[signs == 0] = 1.0 max_power_ori *= signs ts = np.squeeze(np.matmul(np.expand_dims(max_power_ori,1),xyz_ts),axis = 1) # Only keep good time points _, times = data.get_data( reject_by_annotation="omit", return_times=True, verbose=0 ) indices = data.time_as_index(times, use_rounding=True) ts = ts[..., indices] # Transform the voxel time courses if transform == "ztrans": log_or_print(f"Applying {transform} to voxel time courses") ts -= np.mean(ts, axis=0, keepdims=True) ts /= np.std(ts, axis=0, keepdims=True) elif transform == "demean": log_or_print(f"Applying {transform} to voxel time courses") ts -= np.mean(ts, axis=0, keepdims=True) return ts
[docs]def calc_noise_cov(data, data_cov_rank, chantypes): """Calculate noise covariance. Parameters ---------- raw : mne.io.Raw Raw object. data_cov_rank : int Rank of the data covariance matrix. chantypes : list List of channel types to include. """ # In MNE, the noise cov is normally obtained from empty room noise # recordings or from a baseline period. Here (if no noise cov is passed in) # we mimic what the osl_normalise_sensor_data.m function in Matlab OSL does, # by computing a diagonal noise cov with the variances set to the mean # variance of each sensor type (e.g. mag, grad, eeg.) log_or_print("*** Calculating noise covariance ***") data = data.pick(chantypes) if isinstance(data, mne.io.Raw): data_cov = mne.compute_raw_covariance(data, method="empirical", rank=data_cov_rank) else: data_cov = mne.compute_covariance(data, method="empirical", rank=data_cov_rank) n_channels = data_cov.data.shape[0] noise_cov_diag = np.zeros(n_channels) for type in chantypes: # Indices of this channel type type_raw = data.copy().pick(type, exclude="bads") inds = [] for chan in type_raw.info["ch_names"]: inds.append(data_cov.ch_names.index(chan)) # Mean variance of channels of this type variance = np.mean(np.diag(data_cov.data)[inds]) noise_cov_diag[inds] = variance log_or_print(f"variance for chantype {type} is {variance}") bads = [b for b in data.info["bads"] if b in data_cov.ch_names] noise_cov = mne.Covariance( noise_cov_diag, data_cov.ch_names, bads, data.info["projs"], nfree=data.n_times, ) return noise_cov
[docs]def morph_surface( subjects_dir, subject, src_from, subject_to="fsaverage", src_to=None, spacing=None, ): """Morph source space to another subject's surface. Parameters ---------- subject : str Subject ID. subjects_dir : str Subjects directory. src_from : mne.SourceSpaces Original source space. src_to : str, mne.SourceSpaces Destination source space. can be 'fsaverage' or a source space. """ # get source spacing from src_to fsaverage_coreg_filenames = freesurfer_utils.get_coreg_filenames(subjects_dir, "fsaverage") if subject_to == "fsaverage" and not op.exists(fsaverage_coreg_filenames['source_space']): # estimate source spacing from src_from if 'spacing' not in src_from.info['command_line']: src_to = freesurfer_utils.make_fsaverage_src(subjects_dir) # use default else: spacing = int(src_from.info['command_line'].split('spacing=')[1].split(', ')[0]) src_to = freesurfer_utils.make_fsaverage_src(subjects_dir, spacing) morph = mne.compute_source_morph( src_from, subject_from=subject, subject_to=subject_to, src_to=src_to, subjects_dir=subjects_dir, ) return morph