Identification by Accurate Mass

Example workflow for processing a set of mzML files (defined in the files variable), including centroiding, feature detection, feature linking, and accurate mass search. The results are compiled into a pandas DataFrame, where features are filtered (by missing values and quality) and the remaining missing values are imputed. Compounds detected during accurate mass search are annotated in the resulting DataFrame.

Imports and mzML file path

import os
import shutil
import requests

import pandas as pd

from pyopenms import *

import numpy as np

from sklearn.impute import KNNImputer
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline

import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt

# set the path to your mzML files, or leave as is to use the example data
files = os.path.join(os.getcwd(), "IdByMz_Example")

Download Example Data

Execute this cell only for the example workflow.

if not os.path.isdir(files):
    os.mkdir(files)

base = 'https://abibuilder.cs.uni-tuebingen.de/archive/openms/Tutorials/Data/latest/Example_Data/Metabolomics/'
urls = ['datasets/2012_02_03_PStd_050_1.mzML',
        'datasets/2012_02_03_PStd_050_2.mzML',
        'datasets/2012_02_03_PStd_050_3.mzML',
        'databases/PositiveAdducts.tsv',
        'databases/NegativeAdducts.tsv',
        'databases/HMDBMappingFile.tsv',
        'databases/HMDB2StructMapping.tsv']

for url in urls:
    request = requests.get(base + url, allow_redirects=True)
    with open(os.path.join(files, os.path.basename(url)), "wb") as f:
        f.write(request.content)

Centroiding

If the files are already centroided, this step can be omitted.

in: path to MS data (files)

out: path to centroided mzML files in a subfolder ‘centroid’ (files)

if os.path.exists(os.path.join(files, "centroid")):
    shutil.rmtree(os.path.join(files, "centroid"))
os.mkdir(os.path.join(files, "centroid"))

for file in os.listdir(files):

    if file.endswith(".mzML"):
        exp_raw = MSExperiment()
        MzMLFile().load(os.path.join(files, file), exp_raw)
        exp_centroid = MSExperiment()

        PeakPickerHiRes().pickExperiment(
            exp_raw, exp_centroid, True
        )  # the boolean flag enables a spectrum type check; already centroided input raises an error

        MzMLFile().store(os.path.join(files, "centroid", file), exp_centroid)
        del exp_raw

files = os.path.join(files, "centroid")

Feature Detection

in: path to centroided mzML files (files)

out: list with FeatureMaps (feature_maps)

feature_maps = []

for file in os.listdir(files):

    if file.endswith(".mzML"):
        exp = MSExperiment()
        MzMLFile().load(os.path.join(files, file), exp)

        exp.sortSpectra(True)

        mass_traces = []
        mtd = MassTraceDetection()
        mtd_params = mtd.getDefaults()
        mtd_params.setValue(
            "mass_error_ppm", 5.0
        )  # set according to your instrument mass error
        mtd_params.setValue(
            "noise_threshold_int", 1000.0
        )  # adjust to noise level in your data
        mtd.setParameters(mtd_params)
        mtd.run(exp, mass_traces, 0)  # third argument: maximum number of traces, 0 = no limit

        mass_traces_split = []
        mass_traces_final = []
        epd = ElutionPeakDetection()
        epd_params = epd.getDefaults()
        epd_params.setValue("width_filtering", "fixed")
        epd.setParameters(epd_params)
        epd.detectPeaks(mass_traces, mass_traces_split)

        if epd.getParameters().getValue("width_filtering") == "auto":
            epd.filterByPeakWidth(mass_traces_split, mass_traces_final)
        else:
            mass_traces_final = mass_traces_split

        feature_map = FeatureMap()
        feat_chrom = []
        ffm = FeatureFindingMetabo()
        ffm_params = ffm.getDefaults()
        ffm_params.setValue("isotope_filtering_model", "none")
        ffm_params.setValue(
            "remove_single_traces", "true"
        )  # set false to keep features with only one mass trace
        ffm_params.setValue("mz_scoring_by_elements", "false")
        ffm_params.setValue("report_convex_hulls", "true")
        ffm.setParameters(ffm_params)
        ffm.run(mass_traces_final, feature_map, feat_chrom)

        feature_map.setUniqueIds()
        feature_map.setPrimaryMSRunPath([file[:-5].encode()])  # strip the ".mzML" extension

        feature_maps.append(feature_map)
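
To quickly sanity-check this step, the number of detected features per file can be printed (a usage sketch relying on the "spectra_data" meta value set above):

for fm in feature_maps:
    print(fm.getMetaValue("spectra_data")[0].decode(), ":", fm.size(), "features")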

Feature Map Retention Time Alignment

in: unaligned feature maps (feature_maps)

out: feature maps aligned on the first feature map in the list (feature_maps)

# get the index of the feature map with the highest number of features, used as alignment reference
ref_index = max(range(len(feature_maps)), key=lambda i: feature_maps[i].size())

aligner = MapAlignmentAlgorithmPoseClustering()

aligner.setReference(feature_maps[ref_index])

for feature_map in feature_maps[:ref_index] + feature_maps[ref_index + 1:]:
    trafo = TransformationDescription()
    aligner.align(feature_map, trafo)
    transformer = MapAlignmentTransformer()
    transformer.transformRetentionTimes(
        feature_map, trafo, True
    )  # store original RT as meta value

Visualization of RTs before and after alignment

fmaps = (
    [feature_maps[ref_index]] + feature_maps[:ref_index] +
    feature_maps[ref_index + 1:]
)

fig = plt.figure(figsize=(10, 5))

ax = fig.add_subplot(1, 2, 1)
ax.set_title("consensus map before alignment")
ax.set_ylabel("m/z")
ax.set_xlabel("RT")

# use alpha value to display feature intensity
ax.scatter(
    [f.getRT() for f in fmaps[0]],
    [f.getMZ() for f in fmaps[0]],
    alpha=np.asarray([f.getIntensity() for f in fmaps[0]])
    / max([f.getIntensity() for f in fmaps[0]]),
)

for fm in fmaps[1:]:
    ax.scatter(
        [f.getMetaValue("original_RT") for f in fm],
        [f.getMZ() for f in fm],
        alpha=np.asarray([f.getIntensity() for f in fm])
        / max([f.getIntensity() for f in fm]),
    )

ax = fig.add_subplot(1, 2, 2)
ax.set_title("consensus map after alignment")
ax.set_xlabel("RT")

for fm in fmaps:
    ax.scatter(
        [f.getRT() for f in fm],
        [f.getMZ() for f in fm],
        alpha=np.asarray([f.getIntensity() for f in fm])
        / max([f.getIntensity() for f in fm]),
    )

fig.tight_layout()
fig.legend(
    [fmap.getMetaValue("spectra_data")[0].decode() for fmap in fmaps],
    loc="lower center",
)
# in some cases get file name elsewhere, e.g. fmap.getDataProcessing()[0].getMetaValue('parameter: out')
fig.show()

Feature Linking

in: list with FeatureMaps (feature_maps)

out: ConsensusMap (consensus_map)

feature_grouper = FeatureGroupingAlgorithmQT()

consensus_map = ConsensusMap()
file_descriptions = consensus_map.getColumnHeaders()

for i, feature_map in enumerate(feature_maps):
    file_description = file_descriptions.get(i, ColumnHeader())
    file_description.filename = feature_map.getMetaValue("spectra_data")[0].decode()
    file_description.size = feature_map.size()
    file_description.unique_id = feature_map.getUniqueId()
    file_descriptions[i] = file_description

consensus_map.setColumnHeaders(file_descriptions)
feature_grouper.group(feature_maps, consensus_map)
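
As a quick check, the number of consensus features after grouping can be printed (usage sketch):

print(consensus_map.size(), "consensus features")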

ConsensusMap to pandas DataFrame

in: ConsensusMap (consensus_map)

out: DataFrame with RT, mz and quality from ConsensusMap (cm_df)

intensities = consensus_map.get_intensity_df()

meta_data = consensus_map.get_metadata_df()[["RT", "mz", "quality"]]

cm_df = pd.concat([meta_data, intensities], axis=1)
cm_df.reset_index(drop=True, inplace=True)
cm_df

Data Filtering and Imputation

in: unfiltered ConsensusMap DataFrame (cm_df)

out: features below the minimum quality or with too many missing values removed; remaining missing values imputed with the KNN algorithm (cm_df)

allowed_missing_values = 1
min_feature_quality = 0.8
n_nearest_neighbours = 2

# drop features that have more than the allowed number of missing values or are below the minimum feature quality
to_drop = []

for i, row in cm_df.iterrows():
    if (
        row.isna().sum() > allowed_missing_values
        or row["quality"] < min_feature_quality
    ):
        to_drop.append(i)

cm_df.drop(index=cm_df.index[to_drop], inplace=True)

# Data imputation with KNN
imputer = Pipeline(
    [
        ("imputer", KNNImputer(n_neighbors=2)),
        (
            "pandarizer",
            FunctionTransformer(lambda x: pd.DataFrame(
                x, columns=cm_df.columns)),
        ),
    ]
)

cm_df = imputer.fit_transform(cm_df)
cm_df
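
Accurate Mass Search

The annotation step below expects an AccurateMassSearch DataFrame (ams_df), which still needs to be created. The following is a minimal sketch of this step, assuming the HMDB mapping and adduct files downloaded above (note that files now points to the 'centroid' subfolder, so the database files sit one directory up) and the standard mzTab small molecule section layout (SMH header line, SML rows). Adjust ionization_mode and mass_error_value to your data.

in: ConsensusMap (consensus_map) and paths to the database files

out: DataFrame with one row per putative identification (ams_df)

ams = AccurateMassSearchEngine()

db_dir = os.path.dirname(files)  # database files were downloaded next to the raw mzML files

ams_params = ams.getParameters()
ams_params.setValue("ionization_mode", "positive")  # "positive", "negative" or "auto"
ams_params.setValue("mass_error_value", 5.0)  # set according to your instrument mass error
ams_params.setValue("db:mapping", [os.path.join(db_dir, "HMDBMappingFile.tsv")])
ams_params.setValue("db:struct", [os.path.join(db_dir, "HMDB2StructMapping.tsv")])
ams_params.setValue("positive_adducts", os.path.join(db_dir, "PositiveAdducts.tsv"))
ams_params.setValue("negative_adducts", os.path.join(db_dir, "NegativeAdducts.tsv"))
ams.setParameters(ams_params)
ams.init()

mztab = MzTab()
ams.run(consensus_map, mztab)
MzTabFile().store(os.path.join(files, "ids.mzTab"), mztab)

# parse the small molecule section of the stored mzTab file into a DataFrame;
# retention_time, exp_mass_to_charge and description (used below) are
# standard mzTab small molecule columns
header, rows = [], []
with open(os.path.join(files, "ids.mzTab")) as f:
    for line in f:
        if line.startswith("SMH"):
            header = line.rstrip("\n").split("\t")
        elif line.startswith("SML"):
            rows.append(line.rstrip("\n").split("\t"))

ams_df = pd.DataFrame(rows, columns=header)
ams_df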

Annotate features with identified compounds

in: ConsensusMap DataFrame without identifications (cm_df) and AccurateMassSearch DataFrame (ams_df)

out: ConsensusMap DataFrame with new identifications column (id_df)

id_df = cm_df.copy()  # work on a copy so the filtered DataFrame is not modified in place

id_df["identifications"] = pd.Series(
    ["" for x in range(len(id_df.index))])

for rt, mz, description in zip(
    ams_df["retention_time"], ams_df["exp_mass_to_charge"], ams_df["description"]
):
    indices = id_df.index[
        np.isclose(id_df["mz"], float(mz), atol=1e-05)
        & np.isclose(id_df["RT"], float(rt), atol=1e-05)
    ].tolist()
    for index in indices:
        if description != "null":
            id_df.loc[index, "identifications"] += description + ";"
id_df["identifications"] = [
    item[:-1] if ";" in item else "" for item in id_df["identifications"]
]
id_df.to_csv(os.path.join(files, "result.tsv"), sep="\t", index=False)
id_df

Visualize consensus features with identifications

fig = px.scatter(id_df, x="RT", y="mz", hover_name="identifications")
fig.update_layout(title="Consensus features with identifications (hover)")
fig.show()