"""Beamforming.
"""
# Authors: Mark Woolrich <mark.woolrich@ohba.ox.ac.uk>
# Chetan Gohil <chetan.gohil@psych.ox.ac.uk>
import os
import os.path as op
import numpy as np
import matplotlib.pyplot as plt
import mne
from mne import read_forward_solution, Covariance, compute_covariance, compute_raw_covariance
from mne.rank import compute_rank
from mne.minimum_norm.inverse import _check_depth, _prepare_forward, _get_vertno
from mne.source_estimate import _get_src_type
from mne.forward import _subject_from_forward
from mne.forward.forward import is_fixed_orient
from mne.beamformer import read_beamformer
from mne.beamformer._lcmv import _apply_lcmv
from mne.beamformer._compute_beamformer import _reduce_leadfield_rank, _sym_inv_sm, Beamformer
from mne.minimum_norm.inverse import _check_reference
from mne.utils import (
_check_channels_spatial_filter,
_check_one_ch_type,
_check_info_inv,
_check_option,
_reg_pinv,
_pl,
_sym_mat_pow,
_check_src_normal,
check_version,
verbose,
warn,
)
from mne.utils import logger as mne_logger
try:
from mne._fiff.meas_info import _simplify_info
from mne._fiff.pick import pick_channels_cov, pick_info
from mne._fiff.proj import make_projector
except ImportError:
# Depreciated in mne 1.6
from mne.io.meas_info import _simplify_info
from mne.io.pick import pick_channels_cov, pick_info
from mne.io.proj import make_projector
from osl_ephys.source_recon import rhino
from osl_ephys.source_recon.rhino import utils as rhino_utils
from osl_ephys.utils.logger import log_or_print
[docs]def make_lcmv(
subjects_dir,
subject,
data,
chantypes=None,
data_cov=None,
noise_cov=None,
reg=0,
label=None,
pick_ori="max-power-pre-weight-norm",
rank="info",
noise_rank="info",
weight_norm="unit-noise-gain-invariant",
reduce_rank=True,
depth=None,
inversion="matrix",
verbose=None,
save_filters=False,
):
"""Compute LCMV spatial filter.
Modified version of mne.beamformer.make_lcmv.
Parameters
----------
subjects_dir : str
Directory to find subject directories in.
subject : str
Subject name.
data : instance of mne.Raw | mne.Epochs
The measurement data to specify the channels to include. Bad channels in info['bads'] are not used. Will also be used to calculate data_cov.
chantypes : list
List of channel types to use to calculate the noise covariance. E.g. ['eeg'], ['mag', 'grad'], ['eeg', 'mag', 'grad'].
data_cov : instance of mne.Covariance | None
The noise covariance matrix used to whiten. If None will be computed from dat.
noise_cov : instance of mne.Covariance | None
The noise covariance matrix used to whiten. If None will be computed from dat as a diagonal matrix with variances set to the average of all sensors of that type.
reg : float
The regularization for the whitened data covariance.
label : instance of Label
Restricts the LCMV solution to a given label.
pick_ori : None | 'normal' | 'max-power' | max-power-pre-weight-norm
The source orientation to compute the beamformer in.
rank : dict | None | 'full' | 'info'
This controls the rank computation that can be read from the measurement info or estimated from the data. When a noise covariance is used for whitening, this should reflect the rank of that covariance, otherwise amplification of noise components can occur in whitening (e.g., often during source localization).
``None``
The rank will be estimated from the data after proper scaling of different channel types.
``'info'``
The rank is inferred from info. If data have been processed with Maxwell filtering, the Maxwell filtering header is used. Otherwise, the channel counts themselves are used. In both cases, the number of projectors is subtracted from the (effective) number of channels in the data. For example, if Maxwell filtering reduces the rank to 68, with two projectors the returned value will be 66.
``'full'``
The rank is assumed to be full, i.e. equal to the number of good channels. If a Covariance is passed, this can make sense if it has been (possibly improperly) regularized without taking into account the true data rank.
dict
Calculate the rank only for a subset of channel types, and explicitly specify the rank for the remaining channel types. This can be extremely useful if you already know the rank of (part of) your data, for instance in case you have calculated it earlier.
This parameter must be a dictionary whose keys correspond to channel types in the data (e.g. 'meg', 'mag', 'grad', 'eeg'), and whose values are integers representing the respective ranks. For example, {'mag': 90, 'eeg': 45} will assume a rank of 90 and 45 for magnetometer data and EEG data, respectively.
The ranks for all channel types present in the data, but not specified in the dictionary will be estimated empirically. That is, if you passed a dataset containing magnetometer, gradiometer, and EEG data together with the dictionary from the previous example, only the gradiometer rank would be determined, while the specified magnetometer and EEG ranks would be taken for granted.
The default is ``'info'``.
noise_rank : dict | None | 'full' | 'info'
This controls the rank computation that can be read from the measurement info or estimated from the data. When a noise covariance is used for whitening, this should reflect the rank of that covariance, otherwise amplification of noise components can occur in whitening (e.g., often during source localization).
weight_norm : None | 'unit-noise-gain' | 'unit-noise-gain-invariant' | 'nai'
The weight normalization scheme to use.
reduce_rank : bool
Whether to reduce the rank by one during computation of the filter.
depth : None | float | dict
How to weight (or normalize) the forward using a depth prior (see MNE docs).
inversion : 'matrix' | 'single'
The inversion scheme to compute the weights.
save_filters : bool
Should we save the LCMV beamforming filter?
Returns
-------
filters : instance of mne.beamformer.Beamformer
Dictionary containing filter weights from LCMV beamformer. See MNE docs.
"""
log_or_print("*** RUNNING OSL MAKE LCMV ***")
rhino_files = rhino_utils.get_rhino_filenames(subjects_dir, subject)
beamforming_files = get_beamforming_filenames(subjects_dir, subject)
# load forward solution
fwd_fname = rhino_files["fwd_model"]
fwd = read_forward_solution(fwd_fname)
if data_cov is None:
# Note that if chantypes are meg, eeg; and meg includes mag, grad then compute_covariance will project data separately for meg and eeg to reduced
# rank subspace (i.e. mag and grad will be combined together inside the meg subspace, eeg will haeve a separate subspace). I.e. data will become
# (ntpts x (rank_meg + rank_eeg)) and cov will be (rank_meg + rank_eeg) x (rank_meg + rank_eeg) and include correlations between eeg and meg subspaces.
# The output data_cov is cov projected back onto the indivdual sensor types mag, grad, eeg.
#
# Prior to computing anything, including the subspaces each of mag, grad, eeg are scaled so that they are on comparable scales to aid mixing in the
# subspace and improve numerical stability. This is equivalent to what the osl_normalise_sensor_data.m function in Matlab OSL is trying to do.
# Note that in the output data_cov the scalings have been undone.
if isinstance(data, mne.io.Raw):
data_cov = compute_raw_covariance(data, method="empirical", rank=rank)
else:
data_cov = compute_covariance(data, method="empirical", rank=rank)
if noise_cov is None:
if chantypes is None:
raise ValueError("chantypes must be passed.")
# Calculate noise covariance matrix
#
# Later this will be inverted and used to whiten the data AND the lead fields as part of the source recon. See:
#
# https://www.sciencedirect.com/science/article/pii/S1053811914010325?via%3Dihub
#
# In MNE, the noise cov is normally obtained from empty room noise recordings or from a baseline period. Here (if no noise cov is passed in) we
# mimic what the osl_normalise_sensor_data.m function in Matlab OSL does, by computing a diagonal noise cov with the variances set to the mean
# variance of each sensor type (e.g. mag, grad, eeg.)
n_channels = data_cov.data.shape[0]
noise_cov_diag = np.zeros(n_channels)
for type in chantypes:
# Indices of this channel type
type_data = data.copy().pick(type, exclude="bads")
inds = []
for chan in type_data.info["ch_names"]:
inds.append(data_cov.ch_names.index(chan))
# Mean variance of channels of this type
variance = np.mean(np.diag(data_cov.data)[inds])
noise_cov_diag[inds] = variance
log_or_print("variance for chantype {} is {}".format(type, variance))
bads = [b for b in data.info["bads"] if b in data_cov.ch_names]
noise_cov = Covariance(noise_cov_diag, data_cov.ch_names, bads, data.info["projs"], nfree=1e10)
filters = _make_lcmv(
data.info,
fwd,
data_cov,
noise_cov=noise_cov,
reg=reg,
pick_ori=pick_ori,
weight_norm=weight_norm,
rank=rank,
noise_rank=noise_rank,
reduce_rank=reduce_rank,
verbose=verbose,
)
if save_filters:
log_or_print(f"saving {beamforming_files['filters_file']}")
filters.save(beamforming_files["filters_file"], overwrite=True)
log_or_print("*** OSL MAKE LCMV COMPLETE ***")
return filters
[docs]def make_plots(subjects_dir, subject, filters, data):
"""Plot LCMV beamforming filters.
Parameters
----------
subjects_dir : string
Directory containing the subject directories.
subject : string
Subject name.
filters : mne.beamformer.Beamformer
Filters to plot.
data : mne.Raw or mne.Epochs
Data used to create the filters.
Returns
-------
filters_cov_plot : str
Path to covariance plot.
filters_svd_plot : str
Path to eigenspectrum plot.
"""
beamforming_files = get_beamforming_filenames(subjects_dir, subject)
fig_cov, fig_svd = filters["data_cov"].plot(data.info, show=False, verbose=False)
fig_cov.savefig(beamforming_files["filters_plot_cov"], dpi=150)
fig_svd.savefig(beamforming_files["filters_plot_svd"], dpi=150)
plt.close("all")
return beamforming_files["filters_plot_cov"], beamforming_files["filters_plot_svd"]
[docs]def load_lcmv(subjects_dir, subject):
"""Load LCMV beamforming filters.
Parameters
----------
subjects_dir : string
Directory containing the subject directories.
subject : string
Subject name.
Returns
-------
filters : instance of mne.beamformer.Beamformer
Dictionary containing filter weights from LCMV beamformer. See MNE docs.
"""
filenames = get_beamforming_filenames(subjects_dir, subject)
log_or_print(f"loading {filenames['filters_file']}")
return read_beamformer(filenames["filters_file"])
[docs]def apply_lcmv(data, filters, reject_by_annotation="omit"):
"""Apply a LCMV filter to an MNE Raw or Epochs object.
Parameters
----------
data : instance of :py:class:`mne.io.Raw` or :py:class:`mne.Epochs`
The data to apply the LCMV filter to.
filters : instance of :py:class:`mne.beamformer.Beamformer`
The LCMV filter to apply.
reject_by_annotation : str | list of str | None
If string, the annotation description to use to reject epochs.
If list of str, the annotation descriptions to use to reject epochs.
If None, do not reject epochs.
Returns
-------
:py:class:`mne.SourceEstimate`
"""
log_or_print("beamforming.apply_lcmv")
if isinstance(data, mne.io.Raw):
return apply_lcmv_raw(data, filters, reject_by_annotation)
else:
return mne.beamformer.apply_lcmv_epochs(data, filters)
[docs]def apply_lcmv_raw(raw, filters, reject_by_annotation="omit"):
"""Modified version of mne.beamformer.apply_lcmv_raw.
Parameters
----------
raw : instance of :py:class:`mne.io.Raw`
The raw data to apply the LCMV filter to.
filters : instance of :py:class:`mne.beamformer.Beamformer`
The LCMV filter to apply.
reject_by_annotation : str | list of str | None
If string, the annotation description to use to reject epochs.
If list of str, the annotation descriptions to use to reject epochs.
If None, do not reject epochs.
Returns
-------
:py:class:`mne.SourceEstimate`
"""
_check_reference(raw)
# Get data from the mne.Raw object
data, times = raw.get_data(return_times=True, reject_by_annotation=reject_by_annotation)
# Select channels
chan_inds = _check_channels_spatial_filter(raw.ch_names, filters)
data = data[chan_inds]
# Apply LCMV beamformer
stc = _apply_lcmv(data=data, filters=filters, info=raw.info, tmin=times[0])
return next(stc)
[docs]def get_recon_timeseries(subjects_dir, subject, coord_mni, recon_timeseries_head):
"""Gets the reconstructed time series nearest to the passed coordinates in MNI space.
Parameters
----------
subjects_dir : string
Directory to find subject directories in.
subject : string
Subject name.
coord_mni : (3,) numpy.ndarray
3D coordinate in MNI space to get timeseries for
recon_timeseries_head : (ndipoles, ntpts) np.array
Reconstructed time courses in head (polhemus) space. Assumes that the dipoles are the same (and in the same order) as those in the forward model, rhino_files['fwd_model'].
Returns
-------
recon_timeseries : numpy.ndarray
The timecourse in recon_timeseries_head nearest to coord_mni.
"""
rhino_files = rhino_utils.get_rhino_filenames(subjects_dir, subject)
surfaces_filenames = rhino_files["surf"]
coreg_filenames = rhino_files["coreg"]
# Get coord_mni in mri space
mni_mri_t = rhino_utils.read_trans(surfaces_filenames["mni_mri_t_file"])
coord_mri = rhino_utils.xform_points(mni_mri_t["trans"], coord_mni)
# Get hold of coords of points reconstructed to. Note, MNE forward model is done in head space in metres. RHINO does everything in mm
fwd = read_forward_solution(rhino_files["fwd_model"])
vs = fwd["src"][0]
recon_coords_head = vs["rr"][vs["vertno"]] * 1000 # in mm
# Convert coords_head from head to mri space to get index of reconstructed coordinate nearest to coord_mni
head_scaledmri_t = rhino_utils.read_trans(coreg_filenames["head_scaledmri_t_file"])
recon_coords_scaledmri = rhino_utils.xform_points(head_scaledmri_t["trans"], recon_coords_head.T).T
recon_index, d = rhino_utils.closest_node(coord_mri.T, recon_coords_scaledmri)
recon_timeseries = np.abs(recon_timeseries_head[recon_index, :]).T
return recon_timeseries
[docs]def get_lcmv_weights(
subjects_dir,
subject,
spatial_resolution=None,
reference_brain="mni",
verbose=None,
):
"""Get LCMV beamformer weights.
Parameters
----------
subjects_dir : str
Directory to find subject directories in.
subject : str
Subject name.
spatial_resolution : int
Resolution to use for the reference brain in mm (must be an integer, or will be cast to nearest int). If None, then the gridstep used in rhino_files['fwd_model'] is used.
reference_brain : str
'mni' indicates that the reference_brain is the stdbrain in MNI space.
'mri' indicates that the reference_brain is the subject's sMRI in the scaled native/mri space.
'unscaled_mri' indicates that the reference_brain is the subject's sMRI in unscaled native/mri space.
Note that Scaled/unscaled relates to the allow_smri_scaling option in coreg. If allow_scaling was False, then the unscaled MRI will be the same as the scaled MRI.
verbose : bool
If True, print out more information.
Returns
-------
weights : (ndipoles, nsensors) numpy.ndarray
Beamformer weights resampled on the reference brain grid.
coords : (3, ndipoles) numpy.ndarray
Array of coordinates (in mm) of dipoles in weights in "reference_brain" space.
"""
rhino_files = rhino_utils.get_rhino_filenames(subjects_dir, subject)
# Load forward model and LCMV beamformer
fwd = read_forward_solution(rhino_files["fwd_model"], verbose=verbose)
filters = load_lcmv(subjects_dir, subject)
# Weights in head space
weights = filters["weights"]
# Account for whether the sensor data was being whitened
whitener = filters["whitener"]
if whitener is not None:
weights = weights.dot(whitener)
# Transform to a different coordinate space
weights, _, coords, _ = transform_recon_timeseries(
subjects_dir=subjects_dir,
subject=subject,
recon_timeseries=weights,
spatial_resolution=spatial_resolution,
reference_brain=reference_brain,
)
return weights, coords
[docs]def get_leadfields(
subjects_dir,
subject,
spatial_resolution=None,
reference_brain="mni",
orientation="max-dim",
verbose=None,
):
"""Get leadfields from a forward model.
Parameters
----------
subjects_dir : str
Directory to find subject directories in.
subject : str
Subject name.
spatial_resolution : int
Resolution to use for the reference brain in mm (must be an integer, or will be cast to nearest int). If None, then the gridstep used in rhino_files['fwd_model'] is used.
reference_brain : str
'mni' indicates that the reference_brain is the stdbrain in MNI space.
'mri' indicates that the reference_brain is the subject's sMRI in the scaled native/mri space.
'unscaled_mri' indicates that the reference_brain is the subject's sMRI in unscaled native/mri space.
Note that Scaled/unscaled relates to the allow_smri_scaling option in coreg. If allow_scaling was False, then the unscaled MRI will be the same as the scaled MRI.
orientation : str
How should we reduce the 3 leadfield dimensions into a scaler? Options:
'l2-norm' takes the L2 norm across the xyz dimensions.
'max-dim' takes the leadfield with the highest value across the xyz dimensions.
'max-power' projects the leadfield onto the eigenvector with the smallest eigenvalue, this is equivalent to the maximum power orientation.
verbose : bool
If True, print out more information.
Returns
-------
leadfield : (nsensors, ndipoles) numpy.ndarray
Lead fields resampled on the reference brain grid.
coords : (3, ndipoles) numpy.ndarray
Array of coordinates (in mm) of dipoles in leadfield_out in "reference_brain" space.
"""
rhino_files = rhino_utils.get_rhino_filenames(subjects_dir, subject)
# ------------------------
# Leadfields in head space
# Load the leadfields
fwd = read_forward_solution(rhino_files["fwd_model"], verbose=verbose)
L = fwd['sol']['data']
L = L.reshape((L.shape[0], -1, 3))
# Reduce the 3 dimensions (x,y,z) to a scalar
if orientation == "l2-norm":
leadfields = np.linalg.norm(L, axis=-1)
elif orientation == "max-dim":
L_max = np.max(L, axis=-1)
L_min = np.min(L, axis=-1)
L_max[abs(L_min) > L_max] = L_min[abs(L_min) > L_max]
leadfields = L_max
elif orientation == "max-power":
# Get the orientation for maximum power as the eigenvector with the smallest eigenvalue
LT_L = np.matmul(L.transpose(1, 2, 0), L.transpose((1, 0, 2)))
eig_vals, eig_vecs = np.linalg.eig(LT_L)
order = np.argsort(np.abs(eig_vals), axis=-1)
max_power_ori = eig_vecs[np.arange(len(eig_vecs)), :, order[:, 0]]
# Fix the sign
source_nn = fwd["source_nn"][2::3]
signs = np.sign(np.sum(max_power_ori * source_nn, axis=1, keepdims=True))
signs[signs == 0] = 1.0
max_power_ori *= signs
# Leadfield for max power orientation
leadfields = np.sum(L * max_power_ori[None, ...], axis=-1)
else:
raise ValueError("orientation should be 'l2-norm', 'max' or 'max-power'.")
# Transform to a different coordinate space
leadfields, _, coords, _ = transform_recon_timeseries(
subjects_dir=subjects_dir,
subject=subject,
recon_timeseries=leadfields.T, # pretend sensor dimension is time
spatial_resolution=spatial_resolution,
reference_brain=reference_brain,
)
return leadfields.T, coords
@verbose
[docs]def _make_lcmv(
info,
forward,
data_cov,
reg=0.05,
noise_cov=None,
label=None,
pick_ori=None,
rank="info",
noise_rank="info",
weight_norm="unit-noise-gain-invariant",
reduce_rank=False,
depth=None,
inversion="matrix",
verbose=None,
):
"""Compute LCMV spatial filter.
Modified version of mne.beamformer._make_lcmv.
Parameters
----------
info : instance of :py:class:`mne.Info`
The measurement info to specify the channels to include.
forward : instance of :py:class:`mne.Forward`
The forward solution.
data_cov : instance of :py:class:`mne.Covariance`
The data covariance object.
reg : float
The regularization for the whitened data covariance.
noise_cov : instance of :py:class:`mne.Covariance`
The noise covariance object.
label : instance of :py:class:`mne.Label`
Restricts the LCMV solution to a given label.
pick_ori : None | 'normal' | 'max-power' | max-power-pre-weight-norm
The source orientation to compute the beamformer in.
rank : dict | None | 'full' | 'info'
This controls the rank computation that can be read from the measurement info or estimated from the data. When a noise covariance is used for whitening, this should reflect the rank of that covariance, otherwise amplification of noise components can occur in whitening (e.g., often during source localization).
``None``
The rank will be estimated from the data after proper scaling of different channel types.
``'info'``
The rank is inferred from info. If data have been processed with Maxwell filtering, the Maxwell filtering header is used. Otherwise, the channel counts themselves are used. In both cases, the number of projectors is subtracted from the (effective) number of channels in the data. For example, if Maxwell filtering reduces the rank to 68, with two projectors the returned value will be 66.
``'full'``
The rank is assumed to be full, i.e. equal to the number of good channels. If a Covariance is passed, this can make sense if it has been (possibly improperly) regularized without taking into account the true data rank.
dict
Calculate the rank only for a subset of channel types, and explicitly specify the rank for the remaining channel types. This can be extremely useful if you already know the rank of (part of) your data, for instance in case you have calculated it earlier.
This parameter must be a dictionary whose keys correspond to channel types in the data (e.g. 'meg', 'mag', 'grad', 'eeg'), and whose values are integers representing the respective ranks. For example, {'mag': 90, 'eeg': 45} will assume a rank of 90 and 45 for magnetometer data and EEG data, respectively.
The ranks for all channel types present in the data, but not specified in the dictionary will be estimated empirically. That is, if you passed a dataset containing magnetometer, gradiometer, and EEG data together with the dictionary from the previous example, only the gradiometer rank would be determined, while the specified magnetometer and EEG ranks would be taken for granted.
The default is ``'info'``.
noise_rank : dict | None | 'full' | 'info'
This controls the rank computation that can be read from the measurement info or estimated from the data. When a noise covariance is used for whitening, this should reflect the rank of that covariance, otherwise amplification of noise components can occur in whitening (e.g., often during source localization).
weight_norm : None | 'unit-noise-gain' | 'nai'
The weight normalization scheme to use.
reduce_rank : bool
Whether to reduce the rank by one during computation of the filter.
depth : None | float | dict
How to weight (or normalize) the forward using a depth prior (see Notes).
inversion : 'matrix' | 'single'
The inversion scheme to compute the weights.
verbose : bool, str, int, or None
If not None, override default verbose level (see mne.verbose).
Returns
-------
filters : instance of :py:class:`mne.beamformer.Beamformer`
Dictionary containing filter weights from LCMV beamformer. See MNE docs.
"""
# Code that is different to mne.beamformer.make_lcmv is labelled with MWW
# Check number of sensor types present in the data and ensure a noise cov
info = _simplify_info(info)
noise_cov, _, allow_mismatch = _check_one_ch_type("lcmv", info, forward, data_cov, noise_cov)
# NOTE: we need this extra picking step (can't just rely on minimum norm's because there can be a mismatch.
# Should probably add an extra arg to _prepare_beamformer_input at some point (later)
picks = _check_info_inv(info, forward, data_cov, noise_cov)
info = pick_info(info, picks)
data_rank = compute_rank(data_cov, rank=rank, info=info)
noise_rank = compute_rank(noise_cov, rank=noise_rank, info=info)
#for key in data_rank:
# if (
# key not in noise_rank or data_rank[key] != noise_rank[key]
# ) and not allow_mismatch:
# raise ValueError(
# "%s data rank (%s) did not match the noise "
# "rank (%s)" % (key, data_rank[key], noise_rank.get(key, None))
# )
# del noise_rank
rank = data_rank
mne_logger.info("Making LCMV beamformer with data cov rank {}".format(rank))
mne_logger.info("Making LCMV beamformer with noise cov rank {}".format(noise_rank))
del data_rank
depth = _check_depth(depth, "depth_sparse")
if inversion == "single":
depth["combine_xyz"] = False
is_free_ori, info, proj, vertno, G, whitener, nn, orient_std = _prepare_beamformer_input(
info, forward, label, pick_ori, noise_cov=noise_cov, rank=noise_rank, pca=False, **depth,
)
ch_names = list(info["ch_names"])
data_cov = pick_channels_cov(data_cov, include=ch_names)
Cm = data_cov._get_square()
if "estimator" in data_cov:
del data_cov["estimator"]
rank_int = sum(rank.values())
del rank
# Compute spatial filter
n_orient = 3 if is_free_ori else 1
W, max_power_ori = _compute_beamformer(
G,
Cm,
reg,
n_orient,
weight_norm,
pick_ori,
reduce_rank,
rank_int,
inversion=inversion,
nn=nn,
orient_std=orient_std,
whitener=whitener,
)
# Get src type to store with filters for _make_stc
src_type = _get_src_type(forward["src"], vertno)
# Get subject to store with filters
subject_from = _subject_from_forward(forward)
# Is the computed beamformer a scalar or vector beamformer?
is_free_ori = is_free_ori if pick_ori in [None, "vector"] else False
is_ssp = bool(info["projs"])
filters = Beamformer(
kind="LCMV",
weights=W,
data_cov=data_cov,
noise_cov=noise_cov,
whitener=whitener,
weight_norm=weight_norm,
pick_ori=pick_ori,
ch_names=ch_names,
proj=proj,
is_ssp=is_ssp,
vertices=vertno,
is_free_ori=is_free_ori,
n_sources=forward["nsource"],
src_type=src_type,
source_nn=forward["source_nn"].copy(),
subject=subject_from,
rank=rank_int,
max_power_ori=max_power_ori,
inversion=inversion,
)
return filters
[docs]def voxel_timeseries(
subjects_dir,
subject,
preproc_file,
chantypes,
freq_range=None,
spatial_resolution=None,
reference_brain="mni",
reject_by_annotation=None,
):
"""Get the voxel time series of beamformed data.
Parameters
----------
subjects_dir : str
Directory to find subject directories in.
subject : str
Subject name.
preproc_file : str
Path to the preprocessed fif file.
chantypes : list of str
Channel types to use in beamforming.
freq_range : list
Lower and upper band to bandpass filter before beamforming. If None, no filtering is done.
spatial_resolution : int
Resolution for beamforming to use for the reference brain in mm (must be an integer, or will be cast to nearest int).
If None, then the gridstep used in rhino_files['fwd_model'] is used.
reference_brain : str
'mni' indicates that the reference_brain is the stdbrain in MNI space.
'mri' indicates that the reference_brain is the subject's sMRI in the scaled native/mri space.
'unscaled_mri' indicates that the reference_brain is the subject's sMRI in unscaled native/mri space.
Note that Scaled/unscaled relates to the allow_smri_scaling option in coreg. If allow_scaling was False, then the unscaled MRI will be the same as the scaled MRI.
reject_by_annotation : str
Argument passed to .get_data() if the preproc file contains an MNE Raw object.
Returns
-------
voxel_data : np.ndarray
Voxel time series in (voxels, time) format.
voxel_coords : np.ndarray
Voxel coordinates in MNI space.
"""
if isinstance(preproc_file, str):
# Load sensor data
if "raw.fif" in preproc_file:
# Load preprocessed data
data = mne.io.read_raw_fif(preproc_file, preload=True)
elif "epo.fif" in preproc_file:
# Load epoched data
data = mne.read_epochs(preproc_file, preload=True)
else:
raise ValueError("fif file must end in 'raw.fif' or 'epo.fif'.")
else:
# Assume an mne Raw or Epochs object has been passed
data = preproc_file
log_or_print(f"using chantypes: {chantypes}")
if isinstance(chantypes, str):
chantypes = [chantypes]
data.pick(chantypes)
# Bandpass filter
if freq_range is not None:
log_or_print(f"bandpass filtering: {freq_range}")
data = data.filter(l_freq=freq_range[0], h_freq=freq_range[1], method="iir", iir_params={"order": 5, "ftype": "butter"})
# Load the beamformer and apply
filters = load_lcmv(subjects_dir, subject)
bf_data = apply_lcmv(data, filters, reject_by_annotation)
# Transform to MNI space
if isinstance(data, mne.io.Raw):
bf_data = bf_data.data
else:
bf_data = np.transpose([bf.data for bf in bf_data], axes=[1, 2, 0])
voxel_timeseries, _, voxel_coords, _ = transform_recon_timeseries(
subjects_dir=subjects_dir,
subject=subject,
recon_timeseries=bf_data,
spatial_resolution=spatial_resolution,
reference_brain=reference_brain,
)
return voxel_timeseries, voxel_coords