#!/usr/bin/env python3
"""
Phys2bids is a python3 library meant to set physiological files in BIDS standard.
It was born for AcqKnowledge files (BIOPAC), and at the moment it supports
``.acq`` files and ``.txt`` files obtained by LabChart (ADInstruments).
It requires python 3.6 or above, as well as the modules:
- `numpy`
- `matplotlib`
In order to process ``.acq`` files, it needs `bioread`, an excellent module
that can be found at `this link`_
The project is under development.
At the very moment, it assumes:
- the input file is from one individual scan, not one session with multiple scans.
.. _this link:
https://github.com/uwmadison-chm/bioread
Copyright 2019, The Phys2BIDS community.
Please scroll to bottom to read full license.
"""
import datetime
import logging
import os
import sys
from copy import deepcopy
from shutil import copy as cp
import numpy as np
from phys2bids import _version, bids, utils, viz
from phys2bids.cli.run import _get_parser
from phys2bids.physio_obj import BlueprintOutput
from phys2bids.slice4phys import slice4phys
from . import __version__
from .due import Doi, due
LGR = logging.getLogger(__name__)
LGR.setLevel(logging.INFO)
def print_summary(filename, ntp_expected, ntp_found, samp_freq, time_offset, outfile):
    """
    Print a summary onscreen and in file with information on the files.

    Parameters
    ----------
    filename: str
        Name of the input of phys2bids.
    ntp_expected: int
        Number of expected timepoints, as defined by user.
    ntp_found: int
        Number of timepoints found with the automatic process.
    samp_freq: float
        Frequency of sampling for the output file.
    time_offset: float
        Difference between beginning of file and first TR.
    outfile: str or path
        Fullpath to output file.

    Notes
    -----
    Outcome:
    summary: str
        Prints the summary on screen
    outfile: .log file
        File containing summary
    """
    # Time 0 is the first trigger, so sampling started |time_offset| before it.
    start_time = -time_offset
    summary = (
        f"\n------------------------------------------------\n"
        f"Filename: {filename}\n"
        f"\n"
        f"Timepoints expected: {ntp_expected}\n"
        f"Timepoints found: {ntp_found}\n"
        f"Sampling Frequency: {samp_freq} Hz\n"
        f"Sampling started at: {start_time:.4f} s\n"
        f"Tip: Time 0 is the time of first trigger\n"
        f"------------------------------------------------\n"
    )
    LGR.info(summary)
    utils.write_file(outfile, ".log", summary)
def print_json(outfile, samp_freq, time_offset, ch_name):
    """
    Print the json required by BIDS format.

    Parameters
    ----------
    outfile: str or path
        Fullpath to output file.
    samp_freq: float
        Frequency of sampling for the output file.
    time_offset: float
        Difference between beginning of file and first TR.
    ch_name: list of str
        List of channel names, as specified by BIDS format.

    Notes
    -----
    Outcome:
    outfile: .json file
        File containing information for BIDS.
    """
    # BIDS wants StartTime relative to the first trigger, hence the sign flip.
    metadata = {
        "SamplingFrequency": samp_freq,
        "StartTime": round(-time_offset, 4),
        "Columns": ch_name,
    }
    utils.write_json(outfile, metadata, indent=4, sort_keys=False)
@due.dcite(
    Doi("10.5281/zenodo.3470091"),
    path="phys2bids",
    description="Conversion of physiological trace data to BIDS format",
    version=__version__,
    cite_module=True,
)
@due.dcite(
    Doi("10.1038/sdata.2016.44"),
    path="phys2bids",
    description="The BIDS specification",
    cite_module=True,
)
def phys2bids(
    filename,
    info=False,
    indir=".",
    outdir=".",
    heur_file=None,
    sub=None,
    ses=None,
    chtrig=0,
    chsel=None,
    num_timepoints_expected=None,
    tr=None,
    thr=None,
    pad=9,
    ch_name=None,
    yml="",
    debug=False,
    quiet=False,
):
    """
    Run main workflow of phys2bids.

    Runs the parser, does some checks on input, then imports
    the right interface file to read the input. If only info is required,
    it returns a summary onscreen.
    Otherwise, it operates on the input to return a .tsv.gz file, possibly
    in BIDS format.

    Parameters
    ----------
    filename: str
        Name of the physiological input file.
    info: bool, optional
        If True, stop after printing and plotting info about the input file.
    indir: str, optional
        Folder containing the input file.
    outdir: str, optional
        Folder in which the output is written.
    heur_file: str, optional
        Path to a heuristic (.py) file used for BIDS naming.
    sub: str, optional
        Subject label; required (together with heur_file) for BIDS output.
    ses: str, optional
        Session label.
    chtrig: int, optional
        Index of the trigger channel (channel indexing starts at 0).
    chsel: list of int, optional
        Indexes of the channels to keep; all others are dropped.
    num_timepoints_expected: int or list of int, optional
        Expected number of trigger timepoints, one entry per run.
    tr: float or list of float, optional
        Repetition time(s) of the acquisition(s).
    thr: float, optional
        Threshold used by the trigger detection.
    pad: int, optional
        Padding applied around each run when slicing multi-run recordings
        (presumably seconds — confirm against slice4phys).
    ch_name: list of str, optional
        New names for the channels, as specified by BIDS format.
    yml: str, optional
        Path to a yaml file with subject info for participants.tsv.
    debug: bool, optional
        Enable debug-level logging.
    quiet: bool, optional
        Only log warnings and errors.

    Raises
    ------
    RuntimeError
        If invalid input data is provided or the trigger count doesn't match.
    NotImplementedError
        If the file extension is not supported yet.
    """
    # Normalize the (previously mutable) default to avoid cross-call sharing.
    if ch_name is None:
        ch_name = []
    # Check options to make them internally coherent pt. I
    # #!# This can probably be done while parsing?
    outdir = os.path.abspath(outdir)
    os.makedirs(outdir, exist_ok=True)
    os.makedirs(os.path.join(outdir, "code"), exist_ok=True)
    conversion_path = os.path.join(outdir, "code", "conversion")
    os.makedirs(conversion_path, exist_ok=True)
    # Create logfile name
    basename = "phys2bids_"
    extension = "tsv"
    isotime = datetime.datetime.now().strftime("%Y-%m-%dT%H%M%S")
    logname = os.path.join(conversion_path, (basename + isotime + "." + extension))
    # Set logging format
    log_formatter = logging.Formatter(
        "%(asctime)s\t%(name)-12s\t%(levelname)-8s\t%(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
    )
    # Set up logging file and open it for writing
    log_handler = logging.FileHandler(logname)
    log_handler.setFormatter(log_formatter)
    sh = logging.StreamHandler()
    if quiet:
        logging.basicConfig(
            level=logging.WARNING,
            handlers=[log_handler, sh],
            format="%(levelname)-10s %(message)s",
        )
    elif debug:
        logging.basicConfig(
            level=logging.DEBUG, handlers=[log_handler, sh], format="%(levelname)-10s %(message)s"
        )
    else:
        logging.basicConfig(
            level=logging.INFO, handlers=[log_handler, sh], format="%(levelname)-10s %(message)s"
        )
    version_number = _version.get_versions()["version"]
    LGR.info(f"Currently running phys2bids version {version_number}")
    LGR.info(f"Input file is {filename}")
    # Save call.sh so the conversion can be reproduced later.
    arg_str = " ".join(sys.argv[1:])
    call_str = f"phys2bids {arg_str}"
    # Context manager guarantees the handle is closed even on write errors.
    with open(os.path.join(conversion_path, "call.sh"), "a") as f:
        f.write(f"#!/bin/bash \n{call_str}")
    # Check options to make them internally coherent pt. II
    # #!# This can probably be done while parsing?
    indir = os.path.abspath(indir)
    if chtrig and chtrig < 0:
        raise RuntimeError("Wrong trigger channel. Channel indexing starts with 0!")
    utils.check_ge(filename, indir)
    filename, ftype = utils.check_input_type(filename, indir)
    if heur_file:
        heur_file = utils.check_input_ext(heur_file, ".py")
        utils.check_file_exists(heur_file)
    infile = os.path.join(indir, filename)
    utils.check_file_exists(infile)
    # Normalize scalar user input into per-run lists.
    if isinstance(num_timepoints_expected, int):
        num_timepoints_expected = [num_timepoints_expected]
    if isinstance(tr, (int, float)):
        tr = [tr]
    if tr is not None and num_timepoints_expected is not None:
        # If tr and ntp were specified, check that tr is either length one or ntp.
        if len(num_timepoints_expected) != len(tr) and len(tr) != 1:
            raise RuntimeError(
                "Number of sequence types listed with TR "
                "doesn't match expected number of runs in "
                "the session"
            )
    # Read file!
    LGR.info(f"Reading the file {infile}")
    if ftype == "acq":
        from phys2bids.io import load_acq

        phys_in = load_acq(infile, chtrig)
    elif ftype == "txt":
        from phys2bids.io import load_txt

        phys_in = load_txt(infile, chtrig)
    elif ftype == "mat":
        from phys2bids.io import load_mat

        phys_in = load_mat(infile, chtrig)
    elif ftype == "gep":
        from phys2bids.io import load_gep

        phys_in = load_gep(infile)
    else:
        # Defensive: check_input_type should already reject unknown extensions,
        # but without this branch phys_in would be undefined below.
        raise NotImplementedError(f"Unsupported file type: {ftype}")
    LGR.info("Checking that units of measure are BIDS compatible")
    for index, unit in enumerate(phys_in.units):
        phys_in.units[index] = bids.bidsify_units(unit)
    LGR.info("Reading infos")
    phys_in.print_info(filename)
    # #!# Here the function viz.plot_channel should be called
    viz.plot_all(
        phys_in.ch_name, phys_in.timeseries, phys_in.units, phys_in.freq, infile, conversion_path
    )
    # If only info were asked, end here.
    if info:
        return
    # The next few lines remove the undesired channels from phys_in.
    if chsel:
        LGR.info("Dropping unselected channels")
        # Always keep the time channel (index 0).
        chsel.insert(0, 0)
        if phys_in.trigger_idx not in chsel:
            LGR.warning(
                f"The selected channels {tuple(chsel)} do not "
                f"contain the trigger channel ({phys_in.trigger_idx}). "
                f"Adding channel {phys_in.trigger_idx} to the selection."
            )
            # trigger_idx is a single int index: append it
            # (extend() on an int would raise TypeError).
            chsel.append(phys_in.trigger_idx)
        chsel.sort()
        # Delete from the highest index down so earlier indexes stay valid.
        for i in range(phys_in.ch_amount - 1, 0, -1):
            if i not in chsel:
                phys_in.delete_at_index(i)
        # Update trigger index
        phys_in.trigger_idx = chsel.index(phys_in.trigger_idx)
    # If requested, change channel names.
    if ch_name:
        LGR.info("Renaming channels with given names")
        phys_in.rename_channels(ch_name)
    # Checking acquisition type via user's input
    if tr is not None and num_timepoints_expected is not None:
        # Multi-run acquisition type section
        # Check list length, more than 1 means multi-run
        if len(num_timepoints_expected) > 1:
            # if multi-run of same sequence type, pad list with ones
            # and multiply array with user's input
            if len(tr) == 1:
                tr = np.ones(len(num_timepoints_expected)) * tr[0]
            # Sum of values in ntp_list should be equivalent to num_timepoints_found
            phys_in.check_trigger_amount(
                thr=thr, num_timepoints_expected=sum(num_timepoints_expected), tr=1
            )
            # Check that sum of tp expected is equivalent to num_timepoints_found,
            # if it passes call slice4phys
            if phys_in.num_timepoints_found != sum(num_timepoints_expected):
                raise RuntimeError(
                    "The number of triggers found is different "
                    "than expected. Better stop now than break "
                    "something."
                )
            # slice the recording based on user's entries
            # !!! ATTENTION: PHYS_IN GETS OVERWRITTEN AS DICTIONARY
            phys_in = slice4phys(phys_in, num_timepoints_expected, tr, phys_in.thr, pad)
            # returns a dictionary in the form {take_idx: phys_in[startpoint, endpoint]}
            # save a figure for each take | give the right acquisition parameters for takes
            fileprefix = os.path.join(
                conversion_path, os.path.splitext(os.path.basename(filename))[0]
            )
            for i, take in enumerate(phys_in.keys()):
                plot_fileprefix = f"{fileprefix}_{take:02d}"
                viz.export_trigger_plot(
                    phys_in[take],
                    phys_in[take].trigger_idx,
                    plot_fileprefix,
                    tr[i],
                    num_timepoints_expected[i],
                    filename,
                    sub,
                    ses,
                )
        # Single take acquisition type, or : nothing to split workflow
        else:
            # Run analysis on trigger channel to get first timepoint
            # and the time offset.
            phys_in.check_trigger_amount(thr, num_timepoints_expected[0], tr[0])
            # save a figure of the trigger
            fileprefix = os.path.join(
                conversion_path, os.path.splitext(os.path.basename(filename))[0]
            )
            viz.export_trigger_plot(
                phys_in,
                phys_in.trigger_idx,
                fileprefix,
                tr[0],
                num_timepoints_expected[0],
                filename,
                sub,
                ses,
            )
            # Reassign phys_in as dictionary
            # !!! ATTENTION: PHYS_IN GETS OVERWRITTEN AS DICTIONARY
            phys_in = {1: phys_in}
    else:
        LGR.warning(
            "Skipping trigger pulse count. If you want to run it, "
            'call phys2bids using both "-ntp" and "-tr" arguments'
        )
        # !!! ATTENTION: PHYS_IN GETS OVERWRITTEN AS DICTIONARY
        phys_in = {1: phys_in}
    # The next few lines create a dictionary of different BlueprintInput
    # objects, one for each unique frequency for each take in phys_in
    # they also save the amount of runs and unique frequencies
    take_amount = len(phys_in)
    uniq_freq_list = set(phys_in[1].freq)
    freq_amount = len(uniq_freq_list)
    if freq_amount > 1:
        LGR.info(f"Found {freq_amount} different frequencies in input!")
    if take_amount > 1:
        LGR.info(f"Found {take_amount} different takes in input!")
    LGR.info(f"Preparing {freq_amount*take_amount} output files.")
    # Create phys_out dict that will have a blueprint object for each different frequency
    phys_out = {}
    if heur_file is not None and sub is not None:
        LGR.info(f"Preparing BIDS output using {heur_file}")
        # If heuristics are used, init a dict of arguments to pass to use_heuristic
        heur_args = {
            "heur_file": heur_file,
            "sub": sub,
            "ses": ses,
            "filename": filename,
            "outdir": outdir,
            "take": "",
            "record_label": "",
        }
        # Generate participants.tsv file if it doesn't exist already.
        # Update the file if the subject is not in the file.
        # Do not update if the subject is already in the file.
        bids.participants_file(outdir, yml, sub)
        # Generate dataset_description.json file if it doesn't exist already.
        bids.dataset_description_file(outdir)
        # Generate README file if it doesn't exist already.
        bids.readme_file(outdir)
        # Keep a copy of the heuristic used alongside the conversion log.
        cp(
            heur_file,
            os.path.join(
                conversion_path, os.path.splitext(os.path.basename(heur_file))[0] + ".py"
            ),
        )
    elif heur_file is not None and sub is None:
        LGR.warning(
            'While "-heur" was specified, option "-sub" was not.\n' "Skipping BIDS formatting."
        )
    # Export a (set of) phys_out for each element in phys_in
    # run keys start from 1 (human friendly)
    for take in phys_in.keys():
        for uniq_freq in uniq_freq_list:
            # Initialise the key for the (possibly huge amount of) dictionary entries
            key = f"{take}_{uniq_freq}"
            # copy the phys_in object to the new dict entry
            phys_out[key] = deepcopy(phys_in[take])
            # this counter will take into account how many channels are eliminated
            count = 0
            # for each channel in the original phys_in object
            # take the frequency
            for idx, i in enumerate(phys_in[take].freq):
                # if that frequency is different than the frequency of the phys_obj entry
                if i != uniq_freq:
                    # eliminate that channel from the dict since we only want channels
                    # with the same frequency
                    phys_out[key].delete_at_index(idx - count)
                    # take into account the elimination so in the next eliminated channel we
                    # eliminate correctly
                    count += 1
            # Also create a BlueprintOutput object for each unique frequency found.
            # Populate it with the corresponding blueprint input and replace it
            # in the dictionary.
            # Add time channel in the proper frequency.
            if uniq_freq != phys_in[take].freq[0]:
                phys_out[key].ch_name.insert(0, phys_in[take].ch_name[0])
                phys_out[key].units.insert(0, phys_in[take].units[0])
                phys_out[key].timeseries.insert(
                    0,
                    np.linspace(
                        phys_in[take].timeseries[0][0],
                        phys_in[take].timeseries[0][-1],
                        num=phys_out[key].timeseries[0].shape[0],
                    ),
                )
            # Add trigger channel in the proper frequency.
            if uniq_freq != phys_in[take].freq[phys_in[take].trigger_idx]:
                phys_out[key].ch_name.insert(1, phys_in[take].ch_name[phys_in[take].trigger_idx])
                phys_out[key].units.insert(1, phys_in[take].units[phys_in[take].trigger_idx])
                phys_out[key].timeseries.insert(
                    1,
                    np.interp(
                        phys_out[key].timeseries[0],
                        phys_in[take].timeseries[0],
                        phys_in[take].timeseries[phys_in[take].trigger_idx],
                    ),
                )
            phys_out[key] = BlueprintOutput.init_from_blueprint(phys_out[key])
        # Preparing output parameters: name and folder.
        for uniq_freq in uniq_freq_list:
            key = f"{take}_{uniq_freq}"
            # If possible, prepare bids renaming.
            if heur_file is not None and sub is not None:
                # Add take info to heur_args if more than one take is present
                if take_amount > 1:
                    heur_args["take"] = f"{take:02d}"
                # Append "recording-freq" to filename if more than one freq
                if freq_amount > 1:
                    heur_args["record_label"] = f"{uniq_freq:.0f}Hz"
                phys_out[key].filename = bids.use_heuristic(**heur_args)
                # If any filename exists already because of multi-take, append labels
                # But warn about the non-validity of this BIDS-like name.
                if take_amount > 1:
                    for n, k in enumerate(phys_out.keys()):
                        if k != key and phys_out[key].filename == phys_out[k].filename:
                            phys_out[key].filename = (
                                f"{phys_out[key].filename}" f"_take-{take:02d}"
                            )
                            LGR.warning(
                                "Identified multiple outputs with the same name.\n"
                                "Appending fake label to avoid overwriting.\n"
                                "!!! ATTENTION !!! the output is not BIDS compliant.\n"
                                "Please check heuristics to solve the problem."
                            )
            else:
                phys_out[key].filename = os.path.join(
                    outdir, os.path.splitext(os.path.basename(filename))[0]
                )
                # Append "take" to filename if more than one take
                if take_amount > 1:
                    phys_out[key].filename = f"{phys_out[key].filename}_{take:02d}"
                # Append "freq" to filename if more than one freq
                if freq_amount > 1:
                    phys_out[key].filename = f"{phys_out[key].filename}_{uniq_freq:.0f}Hz"
            LGR.info(f"Exporting files for take {take} freq {uniq_freq}")
            np.savetxt(
                phys_out[key].filename + ".tsv.gz",
                phys_out[key].timeseries,
                fmt="%.8e",
                delimiter="\t",
            )
            print_json(
                phys_out[key].filename,
                phys_out[key].freq,
                phys_out[key].start_time,
                phys_out[key].ch_name,
            )
            print_summary(
                filename,
                num_timepoints_expected,
                phys_in[take].num_timepoints_found,
                uniq_freq,
                phys_out[key].start_time,
                os.path.join(
                    conversion_path, os.path.splitext(os.path.basename(phys_out[key].filename))[0]
                ),
            )
def _main(argv=None):
    """Entry point: parse command-line options and dispatch to phys2bids."""
    options = vars(_get_parser().parse_args(argv))
    phys2bids(**options)
# Run the workflow when the module is executed directly (not imported).
if __name__ == "__main__":
    _main(sys.argv[1:])
"""
Copyright 2019, The Phys2BIDS community.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""