from __future__ import print_function
import os
import numpy as np
import uuid
import hashlib
import sys
import getopt
from collections import namedtuple, OrderedDict, defaultdict
from wheezy.template import Engine, CoreExtension, DictLoader
from pyimzml.compression import NoCompression, ZlibCompression
IMZML_TEMPLATE = """\
@require(uuid, sha1sum, mz_data_type, int_data_type, run_id, spectra, mode, obo_codes, obo_names, mz_compression, int_compression, polarity, spec_type, scan_direction, scan_pattern, scan_type, line_scan_direction)
<?xml version="1.0" encoding="ISO-8859-1"?>
<mzML xmlns="http://psi.hupo.org/ms/mzml" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://psi.hupo.org/ms/mzml http://psidev.info/files/ms/mzML/xsd/mzML1.1.0_idx.xsd" version="1.1">
<cvList count="2">
<cv uri="http://psidev.cvs.sourceforge.net/*checkout*/psidev/psi/psi-ms/mzML/controlledVocabulary/psi-ms.obo" fullName="Proteomics Standards Initiative Mass Spectrometry Ontology" id="MS" version="3.65.0"/>
<cv uri="http://obo.cvs.sourceforge.net/*checkout*/obo/obo/ontology/phenotype/unit.obo" fullName="Unit Ontology" id="UO" version="12:10:2011"/>
</cvList>
<fileDescription>
<fileContent>
<cvParam cvRef="MS" accession="MS:1000579" name="MS1 spectrum" value=""/>
@if spec_type=='centroid':
<cvParam cvRef="MS" accession="MS:1000127" name="centroid spectrum" value=""/>
@elif spec_type=='profile':
<cvParam cvRef="MS" accession="MS:1000128" name="profile spectrum" value=""/>
@end
<cvParam cvRef="IMS" accession="IMS:@obo_codes[mode]" name="@mode" value=""/>
<cvParam cvRef="IMS" accession="IMS:1000080" name="universally unique identifier" value="@uuid"/>
<cvParam cvRef="IMS" accession="IMS:1000091" name="ibd SHA-1" value="@sha1sum"/>
</fileContent>
</fileDescription>
<referenceableParamGroupList count="4">
<referenceableParamGroup id="mzArray">
<cvParam cvRef="MS" accession="MS:@obo_codes[mz_compression]" name="@mz_compression" value=""/>
<cvParam cvRef="MS" accession="MS:1000514" name="m/z array" unitCvRef="MS" unitAccession="MS:1000040" unitName="m/z"/>
<cvParam cvRef="MS" accession="MS:@obo_codes[mz_data_type]" name="@mz_data_type" value=""/>
<cvParam cvRef="IMS" accession="IMS:1000101" name="external data" value="true"/>
</referenceableParamGroup>
<referenceableParamGroup id="intensityArray">
<cvParam cvRef="MS" accession="MS:@obo_codes[int_data_type]" name="@int_data_type" value=""/>
<cvParam cvRef="MS" accession="MS:1000515" name="intensity array" unitCvRef="MS" unitAccession="MS:1000131" unitName="number of detector counts"/>
<cvParam cvRef="MS" accession="MS:@obo_codes[int_compression]" name="@int_compression" value=""/>
<cvParam cvRef="IMS" accession="IMS:1000101" name="external data" value="true"/>
</referenceableParamGroup>
<referenceableParamGroup id="scan1">
<cvParam cvRef="MS" accession="MS:1000093" name="increasing m/z scan"/>
<cvParam cvRef="MS" accession="MS:1000512" name="filter string" value=""/>
</referenceableParamGroup>
<referenceableParamGroup id="spectrum1">
<cvParam cvRef="MS" accession="MS:1000579" name="MS1 spectrum" value=""/>
<cvParam cvRef="MS" accession="MS:1000511" name="ms level" value="0"/>
@if spec_type=='centroid':
<cvParam cvRef="MS" accession="MS:1000127" name="centroid spectrum" value=""/>
@elif spec_type=='profile':
<cvParam cvRef="MS" accession="MS:1000128" name="profile spectrum" value=""/>
@end
@if polarity=='positive':
<cvParam cvRef="MS" accession="MS:1000130" name="positive scan" value=""/>
@elif polarity=='negative':
<cvParam cvRef="MS" accession="MS:1000129" name="negative scan" value=""/>
@end
</referenceableParamGroup>
</referenceableParamGroupList>
<softwareList count="1">
<software id="pyimzml" version="0.0001">
<cvParam cvRef="MS" accession="MS:1000799" name="custom unreleased software tool" value="pyimzml exporter"/>
</software>
</softwareList>
<scanSettingsList count="1">
<scanSettings id="scanSettings1">
<cvParam cvRef="IMS" accession="IMS:@obo_codes[scan_direction]" name="@obo_names[scan_direction]"/>
<cvParam cvRef="IMS" accession="IMS:@obo_codes[scan_pattern]" name="@obo_names[scan_pattern]"/>
<cvParam cvRef="IMS" accession="IMS:@obo_codes[scan_type]" name="@obo_names[scan_type]"/>
<cvParam cvRef="IMS" accession="IMS:@obo_codes[line_scan_direction]" name="@obo_names[line_scan_direction]"/>
<cvParam cvRef="IMS" accession="IMS:1000042" name="max count of pixels x" value="@{(max(s.coords[0] for s in spectra))!!s}"/>
<cvParam cvRef="IMS" accession="IMS:1000043" name="max count of pixels y" value="@{(max(s.coords[1] for s in spectra))!!s}"/>
</scanSettings>
</scanSettingsList>
<instrumentConfigurationList count="1">
<instrumentConfiguration id="IC1">
</instrumentConfiguration>
</instrumentConfigurationList>
<dataProcessingList count="1">
<dataProcessing id="export_from_pyimzml">
<processingMethod order="0" softwareRef="pyimzml">
<cvParam cvRef="MS" accession="MS:1000530" name="file format conversion" value="Output to imzML"/>
</processingMethod>
</dataProcessing>
</dataProcessingList>
<run defaultInstrumentConfigurationRef="IC1" id="@run_id">
<spectrumList count="@{len(spectra)!!s}" defaultDataProcessingRef="export_from_pyimzml">
@for index, s in enumerate(spectra):
<spectrum defaultArrayLength="0" id="spectrum=@{(index+1)!!s}" index="@{(index+1)!!s}">
<referenceableParamGroupRef ref="spectrum1"/>
<cvParam cvRef="MS" accession="MS:1000528" name="lowest observed m/z" value="@{s.mz_min!!s}" unitCvRef="MS" unitAccession="MS:1000040" unitName="m/z"/>
<cvParam cvRef="MS" accession="MS:1000527" name="highest observed m/z" value="@{s.mz_max!!s}" unitCvRef="MS" unitAccession="MS:1000040" unitName="m/z"/>
<cvParam cvRef="MS" accession="MS:1000504" name="base peak m/z" value="@{s.mz_base!!s}" unitCvRef="MS" unitAccession="MS:1000040" unitName="m/z"/>
<cvParam cvRef="MS" accession="MS:1000505" name="base peak intensity" value="@{s.int_base!!s}" unitCvRef="MS" unitAccession="MS:1000131" unitName="number of counts"/>
<cvParam cvRef="MS" accession="MS:1000285" name="total ion current" value="@{s.int_tic!!s}"/>
<scanList count="1">
<cvParam accession="MS:1000795" cvRef="MS" name="no combination"/>
<scan instrumentConfigurationRef="instrumentConfiguration0">
<referenceableParamGroupRef ref="scan1"/>
<cvParam accession="IMS:1000050" cvRef="IMS" name="position x" value="@{s.coords[0]!!s}"/>
<cvParam accession="IMS:1000051" cvRef="IMS" name="position y" value="@{s.coords[1]!!s}"/>
@if len(s.coords) == 3:
<cvParam accession="IMS:1000052" cvRef="IMS" name="position z" value="@{s.coords[2]!!s}"/>
@end
@if s.userParams:
@for up in s.userParams:
<userParam name="@up['name']" value="@up['value']"/>
@end
@end
</scan>
</scanList>
<binaryDataArrayList count="2">
<binaryDataArray encodedLength="0">
<referenceableParamGroupRef ref="mzArray"/>
<cvParam accession="IMS:1000103" cvRef="IMS" name="external array length" value="@{s.mz_len!!s}"/>
<cvParam accession="IMS:1000104" cvRef="IMS" name="external encoded length" value="@{s.mz_enc_len!!s}"/>
<cvParam accession="IMS:1000102" cvRef="IMS" name="external offset" value="@{s.mz_offset!!s}"/>
<binary/>
</binaryDataArray>
<binaryDataArray encodedLength="0">
<referenceableParamGroupRef ref="intensityArray"/>
<cvParam accession="IMS:1000103" cvRef="IMS" name="external array length" value="@{s.int_len!!s}"/>
<cvParam accession="IMS:1000104" cvRef="IMS" name="external encoded length" value="@{s.int_enc_len!!s}"/>
<cvParam accession="IMS:1000102" cvRef="IMS" name="external offset" value="@{s.int_offset!!s}"/>
<binary/>
</binaryDataArray>
</binaryDataArrayList>
</spectrum>
@end
</spectrumList>
</run>
</mzML>
"""
class _MaxlenDict(OrderedDict):
def __init__(self, *args, **kwargs):
self.maxlen = kwargs.pop('maxlen', None)
OrderedDict.__init__(self, *args, **kwargs)
def __setitem__(self, key, value):
if self.maxlen is not None and len(self) >= self.maxlen:
self.popitem(0) #pop oldest
OrderedDict.__setitem__(self, key, value)
_Spectrum = namedtuple('_Spectrum', 'coords mz_len mz_offset mz_enc_len int_len int_offset int_enc_len mz_min mz_max mz_base int_base int_tic userParams') #todo: change named tuple to dict and parse xml template properly (i.e. remove hardcoding so parameters can be optional)
[docs]class ImzMLWriter(object):
"""
Create an imzML+ibd file.
:param output_filename:
is used to make the base name by removing the extension (if any).
two files will be made by adding ".ibd" and ".imzML" to the base name
:param intensity_dtype:
The numpy data type to use for saving intensity values
:param mz_dtype:
The numpy data type to use for saving mz array values
:param mode:
* "continuous" mode will save the first mz array only
* "processed" mode save every mz array separately
* "auto" mode writes only mz arrays that have not already been written
:param intensity_compression:
How to compress the intensity data before saving
must be an instance of :class:`~pyimzml.compression.NoCompression` or :class:`~pyimzml.compression.ZlibCompression`
:param mz_compression:
How to compress the mz array data before saving
"""
def __init__(self, output_filename,
mz_dtype=np.float64, intensity_dtype=np.float32, mode="auto", spec_type="centroid",
scan_direction="top_down", line_scan_direction="line_left_right", scan_pattern="one_way", scan_type="horizontal_line",
mz_compression=NoCompression(), intensity_compression=NoCompression(),
polarity=None):
self.mz_dtype = mz_dtype
self.intensity_dtype = intensity_dtype
self.mode = mode
self.spec_type = spec_type
self.mz_compression = mz_compression
self.intensity_compression = intensity_compression
self.run_id = os.path.splitext(output_filename)[0]
self.filename = self.run_id + ".imzML"
self.ibd_filename = self.run_id + ".ibd"
self.xml = open(self.filename, 'w')
self.ibd = open(self.ibd_filename, 'wb+')
self.sha1 = hashlib.sha1()
self.uuid = uuid.uuid4()
self.scan_direction = scan_direction
self.scan_pattern = scan_pattern
self.scan_type = scan_type
self.line_scan_direction = line_scan_direction
self._write_ibd(self.uuid.bytes)
self.wheezy_engine = Engine(loader=DictLoader({'imzml': IMZML_TEMPLATE}), extensions=[CoreExtension()])
self.imzml_template = self.wheezy_engine.get_template('imzml')
self.spectra = []
self.first_mz = None
self.hashes = defaultdict(list) # mz_hash -> list of mz_data (disk location)
self.lru_cache = _MaxlenDict(maxlen=10) # mz_array (as tuple) -> mz_data (disk location)
self._setPolarity(polarity)
@staticmethod
def _np_type_to_name(dtype):
if dtype.__name__.startswith('float'):
return "%s-bit float" % dtype.__name__[5:]
elif dtype.__name__.startswith('int'):
return "%s-bit integer" % dtype.__name__[3:]
def _setPolarity(self, polarity):
if polarity:
if polarity.lower() in ['positive', 'negative']:
self.polarity = polarity.lower()
else:
raise ValueError('value for polarity must be one of "positive", "negative". Received: {}'.format(polarity))
else:
self.polarity = ""
def _write_xml(self):
spectra = self.spectra
mz_data_type = self._np_type_to_name(self.mz_dtype)
int_data_type = self._np_type_to_name(self.intensity_dtype)
obo_codes = {"32-bit integer": "1000519",
"16-bit float": "1000520",
"32-bit float": "1000521",
"64-bit integer": "1000522",
"64-bit float": "1000523",
"continuous": "1000030",
"processed": "1000031",
"zlib compression": "1000574",
"no compression": "1000576",
"line_bottom_up": "1000492",
"line_left_right": "1000491",
"line_right_left": "1000490",
"line_top_down": "1000493",
"bottom_up": "1000400",
"left_right": "1000402",
"right_left": "1000403",
"top_down": "1000401",
"meandering": "1000410",
"one_way": "1000411",
"random_access": "1000412",
"horizontal_line": "1000480",
"vertical_line": "1000481"}
obo_names = {"line_bottom_up": "linescan bottom up",
"line_left_right": "linescan left right",
"line_right_left": "linescan right left",
"line_top_down": "linescan top down",
"bottom_up": "bottom up",
"left_right": "left right",
"right_left": "right left",
"top_down": "top down",
"meandering": "meandering",
"one_way": "one way",
"random_access": "random access",
"horizontal_line": "horizontal line scan",
"vertical_line": "vertical line scan"}
uuid = ("{%s}" % self.uuid).upper()
sha1sum = self.sha1.hexdigest().upper()
run_id = self.run_id
if self.mode == 'auto':
mode = "processed" if len(self.lru_cache) > 1 else "continuous"
else:
mode = self.mode
spec_type = self.spec_type
mz_compression = self.mz_compression.name
int_compression = self.intensity_compression.name
polarity = self.polarity
scan_direction = self.scan_direction
scan_pattern = self.scan_pattern
scan_type = self.scan_type
line_scan_direction = self.line_scan_direction
self.xml.write(self.imzml_template.render(locals()))
def _write_ibd(self, bytes):
self.ibd.write(bytes)
self.sha1.update(bytes)
return len(bytes)
def _encode_and_write(self, data, dtype=np.float32, compression=NoCompression()):
data = np.asarray(data, dtype=dtype)
offset = self.ibd.tell()
bytes = data.tobytes()
bytes = compression.compress(bytes)
return offset, data.shape[0], self._write_ibd(bytes)
def _read_mz(self, mz_offset, mz_len, mz_enc_len):
'''reads a mz array from the currently open ibd file'''
self.ibd.seek(mz_offset)
data = self.ibd.read(mz_enc_len)
self.ibd.seek(0, 2)
data = self.mz_compression.decompress(data)
return tuple(np.fromstring(data, dtype=self.mz_dtype))
def _get_previous_mz(self, mzs):
'''given an mz array, return the mz_data (disk location)
if the mz array was not previously written, write to disk first'''
mzs = tuple(mzs) # must be hashable
if mzs in self.lru_cache:
return self.lru_cache[mzs]
# mz not recognized ... check hash
mz_hash = "%s-%s-%s" % (hash(mzs), sum(mzs), len(mzs))
if mz_hash in self.hashes:
for mz_data in self.hashes[mz_hash]:
test_mz = self._read_mz(*mz_data)
if mzs == test_mz:
self.lru_cache[test_mz] = mz_data
return mz_data
# hash not recognized
# must be a new mz array ... write it, add it to lru_cache and hashes
mz_data = self._encode_and_write(mzs, self.mz_dtype, self.mz_compression)
self.hashes[mz_hash].append(mz_data)
self.lru_cache[mzs] = mz_data
return mz_data
[docs] def addSpectrum(self, mzs, intensities, coords, userParams=[]):
"""
Add a mass spectrum to the file.
:param mz:
mz array
:param intensities:
intensity array
:param coords:
* 2-tuple of x and y position OR
* 3-tuple of x, y, and z position
note some applications want coords to be 1-indexed
"""
# must be rounded now to allow comparisons to later data
# but don't waste CPU time in continuous mode since the data will not be used anyway
if self.mode != "continuous" or self.first_mz is None:
mzs = self.mz_compression.rounding(mzs)
intensities = self.intensity_compression.rounding(intensities)
if self.mode == "continuous":
if self.first_mz is None:
self.first_mz = self._encode_and_write(mzs, self.mz_dtype, self.mz_compression)
mz_data = self.first_mz
elif self.mode == "processed":
mz_data = self._encode_and_write(mzs, self.mz_dtype, self.mz_compression)
elif self.mode == "auto":
mz_data = self._get_previous_mz(mzs)
else:
raise TypeError("Unknown mode: %s" % self.mode)
mz_offset, mz_len, mz_enc_len = mz_data
int_offset, int_len, int_enc_len = self._encode_and_write(intensities, self.intensity_dtype, self.intensity_compression)
mz_min = np.min(mzs)
mz_max = np.max(mzs)
ix_max = np.argmax(intensities)
mz_base = mzs[ix_max]
int_base = intensities[ix_max]
int_tic = np.sum(intensities)
s = _Spectrum(coords, mz_len, mz_offset, mz_enc_len, int_len, int_offset, int_enc_len, mz_min, mz_max, mz_base, int_base, int_tic, userParams)
self.spectra.append(s)
[docs] def close(self): # 'close' is a more common use for this
"""
Writes the XML file and closes all files.
Will be called automatically if ``with``-pattern is used.
"""
self.finish()
[docs] def finish(self):
'''alias of close()'''
self.ibd.close()
self._write_xml()
self.xml.close()
def __enter__(self):
return self
def __exit__(self, exc_t, exc_v, trace):
if exc_t is None:
self.finish()
else:
self.ibd.close()
self.xml.close()
def _main(argv):
from pyimzml.ImzMLParser import ImzMLParser
inputfile = ''
outputfile = ''
try:
opts, args = getopt.getopt(argv,"hi:o:",["ifile=","ofile="])
except getopt.GetoptError:
print('test.py -i <inputfile> -o <outputfile>')
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print('test.py -i <inputfile> -o <outputfile>')
sys.exit()
elif opt in ("-i", "--ifile"):
inputfile = arg
elif opt in ("-o", "--ofile"):
outputfile = arg
if inputfile == '':
print('test.py -i <inputfile> -o <outputfile>')
raise IOError('input file not specified')
if outputfile=='':
outputfile=inputfile+'.imzML'
imzml = ImzMLParser(inputfile)
spectra = []
with ImzMLWriter(outputfile, mz_dtype=np.float32, intensity_dtype=np.float32) as writer:
for i, coords in enumerate(imzml.coordinates):
mzs, intensities = imzml.getspectrum(i)
writer.addSpectrum(mzs, intensities, coords)
spectra.append((mzs, intensities, coords))
imzml = ImzMLParser(outputfile)
spectra2 = []
for i, coords in enumerate(imzml.coordinates):
mzs, intensities = imzml.getspectrum(i)
spectra2.append((mzs, intensities, coords))
print(spectra[0] == spectra2[0])
if __name__ == '__main__':
_main(sys.argv[1:])