Source code for qnet.misc.trajectory_data

import hashlib
import uuid
import os
import re
import json
from collections import OrderedDict

import numpy as np

[docs]class TrajectoryParserError(Exception): """Exception raised if a TrajectoryData file is malformed""" pass
[docs]class TrajectoryData(object): """Tabular data of expectation values for one or more trajectories. Multiple TrajectoryData objects can be combined with the `extend` method, in order to accumulate averages over an arbitrary number o trajectories. As much as possible, it is checked that all trajectories are statistically independent. A record is kept to ensure exact reproducability. :attribute ID: A unique ID for the current state of the TrajectoryData (read-only). See property documentation below :type ID: str :attribute table: A table (OrderedDict of column names to numpy arrays) that contains four column for every known operator (real/imaginary part of the expectation value, real/imaginary part of the variance). Note that the `table` attribute can easily be converted to a pandas DataFrame (``DataFrame(data=traj.table)``). The `table` attribute should be considered read-only. :type table: OrderedDict(str) => numpy array :attribute dt: Time step between data points :type dt: float :attribute nt: Number of time steps / data points :type nt: int :attribute operators: An iterator of the operator names. The column names in the `table` attribute derive from these. Assuming "X" is one of the operator names, there will be four keys in `table`: "Re[<X>]", "Im[<X>]", "Re[var(X)]", "Im[var(X)]" :type operators: list of str :attribute record: A copy of the complete record of how the averaged expectation values for all operators were obtained. See indepth discussion in the property documentation below. :type record: OrderedDic(str) => tuple(int, int, list) :attribute col_width: width of the data columns when writing out data. Defaults to 25 (allowing to full double precision). Note that operator names may be at most of length `col_width-10` :type col_width: int """ # When instantiating with the from_qsd_data class method, we want the # ID to depend uniquely on the read data files (via their md5 hash). # RFC4122 accounts for a situation like this with a "UUID3" that requires # the combination of a namespace and a name. We define a completely # arbitrary namespace UUID here for this purpose. _uuid_namespace = uuid.UUID('c84069eb-cf80-48a6-9584-74b7f2c742c1') _prec_dt = 1.0e-6 # how close dt's have to be to be equal col_width = 25 # width of columns when writing _col_padding = 10 # space needed in col header besides operator name _rx = { 'op_name': re.compile(r'^[\x20-\x7E]+$'), # ascii w/o control chars 'head_ID': re.compile(r'# QNET Trajectory Data ID\s+' r'(?P<ID>[a-f\d-]{36})'), 'record': re.compile(r'# Record\s+(?P<ID>[a-f\d-]{36})\s*' r'\(seed (?P<seed>\d+)\):' r'\s*(?P<n_traj>\d+)' r'(\s*(?P<ops>\[.*\]))?$'), 'header': re.compile(r'#\s+t') } def __init__(self, ID, dt, seed, n_trajectories, data): """Initialize a new TrajectoryData instance :param ID: A unique, RFC 4122 compliant identifier (as generated by the `new_id` class method) :type ID: str :param dt: Time step between data points (>0) :type dt: float :param seed: The random number generator seed on which the data is based :type seed: int :param n_trajectories: The number of trajectories from which the data is averaged (It is assumed that the random number generator was seeded with the given seed, and then the given number of trajectories were calculated *sequentially*) :type n_trajectories: int :param data: dictionary (preferably OrderedDict) of expectation value data. The value of `data[operator_name]` must be a tuple of four numpy arrays (real part of expectation value, imaginary part of expectation value, real part of standard deviation, imaginary part of standard deviation). The operator names must contain only ASCII characters and must be shorter than `col_width - 10`. :type data: dict(str) => tuple of arrays :raises ValueError: if `ID` is not RFC 4122 compliant, `dt` is an invalid or non-positive float, data does not follow the correct structure """ # seed and n_trajectories may be passed None without raising an error, # assuming the _record is manually set immediately after instantiation self._ID = str(uuid.UUID(ID)) # self.ID = ID, with validation self.table = OrderedDict() try: self._dt = float(dt) except TypeError: raise ValueError("dt must be a float with value >0") if self.dt <= 0.0: raise ValueError("dt must be a value >0") self._operators = [] for op, (re_exp, im_exp, re_var, im_var) in data.items(): op = str(op).strip() if len(op) > self.col_width-self._col_padding: self.col_width = len(op) + self._col_padding self._check_op_name(op) self._operators.append(op) self._nt = len(re_exp) # assumed valid for all (check below) re_exp_lb, im_exp_lb, re_var_lb, im_var_lb \ = self._operator_cols(op) self.table[re_exp_lb] = np.array(re_exp, dtype=np.float64) self.table[im_exp_lb] = np.array(im_exp, dtype=np.float64) self.table[re_var_lb] = np.array(re_var, dtype=np.float64) self.table[im_var_lb] = np.array(im_var, dtype=np.float64) for col in self.table: if len(self.table[col]) != self.nt: raise ValueError("All columns must be of length nt") record_ops = list(self._operators) self._record = OrderedDict([ (self.ID, (seed, n_trajectories, record_ops)), ]) @classmethod def _check_op_name(cls, op): """Raise a ValueError if op is not a valid operator name (with at most `max_len` characters if `max_len > 0`)""" if not cls._rx['op_name'].match(op): raise ValueError(("Operator name '%s' contains invalid " "characters") % op) brackets = 0 for letter in op: if letter == '[': brackets += 1 if letter == ']': brackets -= 1 if brackets != 0: raise ValueError(("Operator name '%s' contains unbalanced " "brackets") % op) def __eq__(self, other): return self.ID == other.ID def __hash__(self): return hash(self.ID)
[docs] def copy(self): """Return a (deep) copy of the current object""" data = OrderedDict() for op in self._operators: cols = self._operator_cols(op) data[op] = tuple([self.table[col] for col in cols]) new = self.__class__(ID=self.ID, dt=self.dt, seed=None, n_trajectories=None, data=data) new._record = self._record.copy() return new
@classmethod def _operator_cols(cls, op): """Return the four column names holding the data for the given operator name :raises ValueError: if invalid operator name is given """ cls._check_op_name(op) return ['Re[<'+op+'>]', 'Im[<'+op+'>]', 'Re[var('+op+')]', 'Im[var('+op+')]'] @staticmethod def _parse_header_line(line, strip=False): """Split up the header from a data file (see read method)""" fields = [] field = '' bracket_level = 0 for letter in line: field += letter if letter == '[': bracket_level += 1 elif letter == ']': bracket_level -= 1 if bracket_level < 0: raise TrajectoryParserError( "Invalid header line (unbalanced brackets): '%s'" % line.strip()) if bracket_level == 0: if strip: field = field.strip() fields.append(field); field = '' elif len(fields) == 0: if letter == 't': fields.append(field); field = '' if len(field.strip()) > 0: raise TrajectoryParserError( "Invalid header line (trailing characters): '%s'" % line.strip()) return fields @classmethod def _get_op_names_from_header_line(cls, line): """Return list of operator names based on the data file header (see read method). Raises TrajectoryParserError if any operator names are invalid or the header is in any other way invalid """ # we assume that column names follow cls._operator_cols(op) ops = [] cols = cls._parse_header_line(line, strip=True) for col_name in cols: if col_name.startswith("Re[<"): op = col_name[4:-2] try: cls._check_op_name(op) except ValueError as exc: raise TrajectoryParserError(exc) ops.append(op) if len(ops) == 0: raise TrajectoryParserError("Malformed header, cannot extract " "operator names") if (4*len(ops)+1) != len(cols): raise TrajectoryParserError( "Unexpected number of columns for %d operator(s)" % len(ops)) return ops
[docs] @classmethod def read(cls, filename): """Read in TrajectoryData from the given filename. The file must be in the format generated by the `write` method. :raises TrajectoryParserError: if the file has an incorrect format """ ID = None dt = None operators = [] record = OrderedDict() col_width = cls.col_width with open(filename) as in_fh: # process the header only for line in in_fh: if not line.startswith("#"): break # done with header m = cls._rx['head_ID'].match(line) if m: ID = m.group('ID') m = cls._rx['record'].match(line) if m: record_ID = m.group('ID') seed = int(m.group('seed')) n_traj = int(m.group('n_traj')) record_ops = 'ALL' # temporary placeholder if m.group('ops') is not None: record_ops = json.loads(m.group('ops')) record[record_ID] = (seed, n_traj, record_ops) m = cls._rx['header'].match(line) if m: try: operators = cls._get_op_names_from_header_line(line) except TrajectoryParserError as exc: raise TrajectoryParserError("File %s: %s" % (filename, exc)) # We assume that the regex is written in such a way that it # matches exactly the first column header (time grid), such # that we can derive the general column width from the # extent of the match (all columns have the same width) col_width = m.end() if ID is None: raise TrajectoryParserError("File %s does not define an ID" % filename) if len(record) == 0: raise TrajectoryParserError("File %s does not contain a record" % filename) if len(operators) == 0: raise TrajectoryParserError(("File %s does not contain a header, " "or missing time grid") % filename) for (record_ID, (seed, n_traj, record_ops)) in record.items(): if record_ops == 'ALL': # temporary placeholder, see above record[record_ID] = (seed, n_traj, list(operators)) # process the actual data try: table = np.loadtxt(filename, dtype=np.float64, comments='#') except ValueError as exc: raise TrajectoryParserError("Could not read data in %s: %s" % (filename, str(exc))) if len(table) == 0: raise TrajectoryParserError("File %s contains no data" % filename) if len(table.shape) < 2: raise TrajectoryParserError("Too few rows in %s (minimum 2)" % filename) n_cols = table.shape[1] if n_cols != (4*len(operators)+1): raise TrajectoryParserError(("In file %s, the number of data " "columns differs from the number indicated in the header") % filename) dt = table[1,0] - table[0,0] data = OrderedDict() # data in the format that __init__ accepts i = 0 # counter for column in table for op in operators: op_cols = [] for col in cls._operator_cols(op): i += 1 op_cols.append(np.array(table[:,i])) data[op] = tuple(op_cols) traj = cls(ID, dt=dt, seed=None, n_trajectories=None, data=data) traj._record = record if col_width != traj.col_width: traj.col_width = col_width return traj
[docs] @classmethod def from_qsd_data(cls, operators, seed, workdir='.'): """Instantiate from one or more QSD output files specified as values of the dictionary `operators` Each QSD output file must have the following structure: * The first line must start with the string "Number_of_Trajectories", followed by an integer (separated by whitespace) * All following lines must contain five floating point numbers (time, real/imaginary part of expectation value, and real/imaginary part of variance), separated by whitespace. All QSD output files must contain the same number of lines, specify the same number of trajectories, and use the same time grid values (first column). It is the user's responsibility to ensure that all out output files were indeed generated in a single QSD run using the specified initial seed for the random number generator. :param operators: dictionary (preferrably OrderedDict) of operator name to filename. The filenames are relative to the `workdir`. Each filename must contain data in the format described above :type operators: dict(str) => str :param seed: The seed to the random number generator that was used to produce the data file :type seed: int :param workdir: directory to which the filenames in `operators` are relative to 'type workdir: str :raises ValueError: if any of the datafiles do not have the correct format or are inconsistent Note: Remember that is is vitally important that all quantum trajectories that go into an average are statistically independent. The TrajectoryData class tries as much as possible to ensure this, by refusing to combine indentical IDs, or trajectories originating from the same seed. To this end, in the `from_qsd_data` method, the ID of the instantiated object will depend uniquely on the collective data read from the QSD output files.""" md5s = [] # MD5 hash of all files (in order to generate ID) data = OrderedDict(); n_trajectories = None dt = None if len(operators) == 0: raise ValueError("Must give at least one mapping " "operator_name => file in operators dic") for (operator_name, file_name) in operators.items(): file_name = os.path.join(workdir, file_name) md5s.append(hashlib.md5(open(file_name,'rb').read()).hexdigest()) with open(file_name) as in_fh: header = in_fh.readline() m = re.match(r'Number_of_Trajectories\s*(\d+)', header) if m: file_n_trajectories = int(m.group(1)) else: raise ValueError(("First line in %s must contain the " "Number_of_Trajectories")%file_name) (tgrid, re_exp, im_exp, re_var, im_var) \ = np.genfromtxt(file_name, dtype=np.float64, skip_header=1, unpack=True) data[operator_name] = (re_exp, im_exp, re_var, im_var) if len(tgrid) < 2: raise ValueError(("File %s does not contain sufficient " "data (at least two rows)") % file_name) file_dt = tgrid[1] - tgrid[0] if dt is None: dt = file_dt else: if abs(dt-file_dt) > cls._prec_dt: raise ValueError(("dt in file %s inconsistent with dt " "in other files")%file_name) if n_trajectories is None: n_trajectories = file_n_trajectories else: if n_trajectories != file_n_trajectories: raise ValueError(("number of trajectories in file %s " "does not match number of trajectories " "in other files")%file_name) ID = cls.new_id(name="".join(sorted(md5s))) return cls(ID, dt, seed, n_trajectories, data)
[docs] @classmethod def new_id(cls, name=None): """Generate a new unique identifier, as a string. The identifier will be RFC 4122 compliant. If name is None, the resulting ID will be random. Otherwise, name must be a string that the ID will depend on. That is, calling `new_id` repeatedly with the same `name` will result in identical IDs. """ if name is None: return str(uuid.uuid4()) else: return str(uuid.uuid3(cls._uuid_namespace, name))
@property def ID(self): """A unique RFC 4122 complient identifier. The identifier changes whenever the class data is modified (via the `extend` method). Two instances of TrajectoryData with the same ID are assumed to be identical """ return self._ID @property def record(self): """A copy of the full trajectory record, i.e. a history of calls to the `extend` method. Its purpose is to ensure that the data is completely reproducible. This entails storing the seed to the random number generator for all sets of trajectories. The record is an OrderedDict that maps the original ID of any TrajectoryData instance combined via `extend` to a tuple ``(seed, n_trajectories, ops)``, where ``seed`` is the seed to the random number generator that was used to calculate a specific set of trajectories (sequentially), ``n_trajectories`` are the number of trajectories in that dataset, and ``ops`` is a list of operator names for which expectation values were calculated. This may be the complete list of operators in the `operators` attribute, or a subset of those operators (Not all trajectories have to include data for all operators). For example, let's assume we have used the ``QSDCodeGen`` class to set up a QSD propagation. Two observables 'X1', 'X2', have been set up to be written to file 'X1.out', and 'X2.out'. The `QSDCodeGen.set_trajectories` method has been called with `n_trajectories=10`, after which a call to `QSDCodeGen.run` with argument `seed=SEED1`, performed a sequential propagation of 10 trajectories, with the averaged expectation values written to the output files. This data may now be read into a new `TrajectoryData` instance `traj` via the `from_qsd_data` class method (with `seed=SEED1`). The newly created instance (with, let's say, ``ID='8d102e4b-...'``) will have one entry in its record: '8d102e4b-...': (SEED1, 10, ['X1', 'X2']) Now, let's say we add a new observable 'A2' (output file 'A2.out') for the `QSDCodeGen` (in addition to the existing observables X1, X2), and run the `QSDCodeGen.run` method again, with a new seed `SEED2`. We then update `traj` with a call such as traj.extend(TrajectoryData.from_qsd_data( {'X1':'X1.out', 'X2':'X2.out', 'A2':'A2.out'}, SEED2) The record will now have an additional entry, e.g. 'd9831647-...': (SEED2, 10, ['X1', 'X2', 'A2']) `traj.table` will contain the averaged expectation values (average over 20 trajectories for 'X1', 'X2', and 10 trajectories for 'A2'). The record tells use that to reproduce this table, 10 sequential trajectories starting from SEED1 must be performed for X1, X2, followed by another 10 trajectories for X1, X2, A2 starting from SEED2. """ return self._record.copy() @property def operators(self): """Iterator over all operators""" return iter(self._operators) @property def record_IDs(self): """Set of all IDs in the record""" return set(self._record.keys()) @property def dt(self): """Time step between data points""" return self._dt @property def nt(self): """Number of time steps / data points""" return self._nt @property def shape(self): """"Tuple (n_row, n_cols) for the data in self.table. The time grid is included in the column count""" return (self.nt, len(self.table.keys())+1) @property def record_seeds(self): """Set of all random number generator seeds in the record""" return set([seed for (seed, ntraj, op) in self._record.values()]) @property def tgrid(self): """Time grid, as numpy array""" return np.array(range(self.nt)) * self._dt def __str__(self): return self.to_str(show_rows=6) def __repr__(self): return "TrajectoryData(ID=%s)" % self.ID def _data_line(self, index, fmt): line = fmt % (self._dt*index) for col in self.table.keys(): line += fmt % self.table[col][index] return line
[docs] def to_str(self, show_rows=-1): """Generate full string represenation of TrajectoryData :param show_rows: If given > 0, maximum number of data rows to show. If there are more rows, they will be indicated by an ellipsis ('...') :type show_rows: int :raises ValueError: if any operator name is too long to generate a label that fits in the limit given by the `col_width` class attribute """ lines = [] w = self.col_width prec = min(self.col_width - 9, 16) # 16 = max. double precision fmt = "%{width:d}.{prec:d}e".format(width=w, prec=prec) ellipsis = "...".center(w) lines.append("# QNET Trajectory Data ID %s" % self.ID) for (ID, (seed, n_traj, ops)) in self._record.items(): if set(ops) == set(self._operators): lines.append("# Record %s (seed %d): %d" % (ID, seed, n_traj)) else: lines.append("# Record %s (seed %d): %d %s" % (ID, seed, n_traj, json.dumps(ops))) header = ("#%{width:d}s".format(width=w-1)) % 't' for col in self.table.keys(): if len(col) >= self.col_width: raise ValueError(("Column name '%s' must be shorter than max " "column width %d") % (col, self.col_width)) header += ("%{width:d}s".format(width=w)) % col lines.append(header) nt = self.nt if (show_rows < 0) or (show_rows >= nt): show_row_indices_1 = range(nt) show_row_indices_2 = [] else: show_row_indices_1 = range(show_rows//2) show_row_indices_2 = range(nt - ((show_rows//2)+(show_rows%2)), nt) # if show_rows is odd, we show the first show_rows//2 rows, and the # last (show_rows//2)+1 rows. for i in show_row_indices_1: lines.append(self._data_line(i, fmt)) if len(show_row_indices_2) > 0: lines.append((ellipsis * int(len(self.table)+1)).rstrip()) for i in show_row_indices_2: lines.append(self._data_line(i, fmt)) return "\n".join(lines)
[docs] def write(self, filename): """Write data to a text file. The TrajectoryData may later be restored by the `read` class method from the same file""" with open(filename, 'w') as out_fh: out_fh.write(self.to_str())
[docs] def n_trajectories(self, operator): "Return the total number of trajectories for the given operator" n_total = 0 for ID, (seed, n_traj, ops) in self._record.items(): if operator in ops: n_total += n_traj return n_total
[docs] def extend(self, other): """Extend data with another Trajectory data set, averaging the expectation values. Equivalently to ``traj1.extend(traj2)``, the syntax ``traj1 += traj2`` may be used. :raises ValueError: if data in self and other are incompatible """ err_msg = "TrajectoryData may only be extended by completely "\ "disjunct other TrajectoryData object" if not self.record_IDs.isdisjoint(other.record_IDs): raise ValueError("%s: Repeated ID"%err_msg) if not self.record_seeds.isdisjoint(other.record_seeds): if not set(self.operators).isdisjoint(set(other.operators)): raise ValueError("%s: Repeated seed"%err_msg) if not abs(self.dt - other.dt) < self._prec_dt: raise ValueError("Extending TrajectoryData does not match dt") for op in self._operators: if op in other._operators: n_self = self.n_trajectories(op) n_other = other.n_trajectories(op) for col in self._operator_cols(op): self.table[col] *= n_self self.table[col] += n_other * other.table[col] self.table[col] /= float(n_self + n_other) # we may also have to account for other containing new operators for op in other._operators: if op not in self._operators: self._operators.append(op) for col in self._operator_cols(op): self.table[col] = other.table[col].copy() self._record.update(other._record) self._ID = self.new_id(name="".join(sorted([self.ID, other.ID])))
def __add__(self, other): combined = self.copy() combined.extend(other) return combined def __iadd__(self, other): self.extend(other) return self