Source code for gblearn.decomposition

"""Methods for decomposing the SOAP vectors into radial and angular
components for analysis.
"""
from gblearn import msg
import numpy as np
epsilon = 5e-5
"""float: finite precision comparison value for RDF norms.
"""
[docs]def pissnnl(pissnnl, nspecies, vcutoff, nmax=12, lmax=12): """Returns the geometric information for any entries of a pissnnl vector higher than the specified cutoff. Args: pissnnl (numpy.ndarray): vector to examine. nspecies (int): number of species for the SOAP descriptor. vcutoff (float): values higher than this will be returned with geometry information. nmax (int): bandwidth limits for the SOAP descriptor radial basis functions. lmax (int): bandwidth limits for the SOAP descriptor spherical harmonics. Returns: list: of tuples with (index, value, (n_i, s_i), (n_j, s_j), l), where `n_i` is the `n` value of the radial basis function for species `i`, and `s_i` is the value of the species index. """ rs_index = np.zeros((2, nspecies*nmax), int) count = 0 for i in range(1, nspecies+1): for a in range(1, nmax+1): rs_index[:,count] = [a, i] count += 1 ipow = 0 skip = 0 result = [] for ia in range(1, nspecies*nmax+1): for jb in range(1, ia+1): for l in range(lmax+1): if ipow > len(pissnnl):# pragma: no cover skip += 1 ipow += 1 continue if np.abs(pissnnl[ipow]) >= vcutoff: ra = rs_index[0,ia-1] sa = rs_index[1,ia-1] rb = rs_index[0,jb-1] sb = rs_index[1,jb-1] result.append((ipow, pissnnl[ipow], (ra, sa), (rb, sb), l)) ipow += 1 if skip > 0:# pragma: no cover msg.warn("skipped {} entries in the vector.".format(skip)) dmsg = "Considered {} entries in P vector (size={})." msg.info(dmsg.format(ipow, len(pissnnl)), 2) from operator import itemgetter return sorted(result, key=itemgetter(1), reverse=True)
[docs]def fcut(r, rcut, trans_width): """Applies the cutoff function to the coefficients so that the function will go smoothly to zero at the cutoff. Args: r (float): radial distance parameter. rcut (float): values greater than this are identically zero. trans_width(float): the transition to zero at the cutoff happens smoothly across this width. Returns: float: value in [0., 1.] to multiply the coefficient by so that all values fall within the specified cutoff. """ if (isinstance(r, list) or isinstance(r, np.ndarray)) and len(r) > 1: return np.array([fcut(ri, rcut, trans_width) for ri in r]) else: if r > rcut: return 0. elif r > rcut-trans_width: return 0.5*(np.cos(np.pi*(r-rcut+trans_width)/trans_width) + 1) else: return 1.0
[docs]class SOAPDecomposer(object): """Implements decompsition of vectors in the SOAP space allowing for differing `lmax` and `nmax` values in a single session. Uses caching to optimize the decomposition. Args: nspecies (int): number of species for the SOAP descriptor. nmax (int): bandwidth limits for the SOAP descriptor radial basis functions. lmax (int): bandwidth limits for the SOAP descriptor spherical harmonics. rcut (float): local environment finite cutoff parameter. sigma (float): width parameter for the Gaussians on each atom. trans_width (float): distance over which the coefficients in the radial functions are smoothly transitioned to zero. Attributes: alpha (float): constant affecting the width of the Gaussians. Inversely proportional to :attr:`sigma`. rb (numpy.ndarray): positions of radial basis functions in the radial space. partitions (dict): keys are `tuple` of (`l` values, inverse); values are the corresponding indices that form the partition. fcrs (dict): keys are `float` values for radius `r`; values are :func:`fcut` evaluated at `r`. rbexp (list): of `float` values; the `n`-dependent exponential damping factor for radial basis function values. rbsph (dict): keys are `tuple` (n,l,r) where `n` and `l` are the integer radial and spherical basis function indices and `r` is the `float` value at which the spherical bessel function was evaluated. cRs (dict): keys are `float` values of radius `r`; values are the corresponding coefficient matrices returned by :meth:`_c_numeric`. aRs (dict): keys are `tuple` (n,l,r) where `n` and `l` are the integer radial and spherical basis function indices and `r` is the `float` value that the radial functions were evaluated at; values are the corresponding coefficients of the radial basis functions. transformbasis (numpy.ndarray): transformation matrix needed to account for radial basis function positions when calculating the radial function coefficients using linear algebra. """ def __init__(self, nspecies=1, lmax=12, nmax=12, rcut=6.0, sigma=0.5, trans_width=0.5): self.nspecies = nspecies self.lmax = lmax self.nmax = nmax self.rcut = rcut self.sigma = sigma self.trans_width = trans_width self.alpha = 0.5/self.sigma**2 self.rb = self._radial_basis() self.partitions = {} #All of these next dictionaries are used only when caching is enabled to #speed up the calculation. It speeds things up by about 4 or 5 times. self.fcrs = {} self.rbexp = [np.exp(-self.alpha*(self.rb[n-1]**2)) for n in range(1, nmax+1)] self.rbsph = {} self.cRs = {} self.aRs = {} self.transformbasis = None
[docs] def get_params(self): """Returns a dictionary of the constructor parameters needed to re-initialize this decomposer instance. """ return { "nspecies": self.nspecies, "lmax": self.lmax, "nmax": self.nmax, "rcut": self.rcut, "sigma": self.sigma, "trans_width": self.trans_width }
def _init_numeric(self): """Initializes the linear algebra matrices for the radial basis so that the coefficients can be solved using :meth:`cnl`. """ from scipy.special import erf covbasis = np.zeros((self.nmax, self.nmax)) overbasis = np.zeros((self.nmax, self.nmax)) #Get local references to these variables so that we don't need `self` #all over in the overbasis calculation below. alpha = self.alpha rb = self.rb for i in range(self.nmax): for j in range(self.nmax): covbasis[j,i] = np.exp(-alpha * (rb[i] - rb[j])**2) overbasis[j,i] = (np.exp(-alpha*(rb[i]**2+rb[j]**2))*np.sqrt(2.)* alpha**1.5*(rb[i] + rb[j]) + alpha*np.exp(-0.5*alpha*(rb[i] - rb[j])**2)* np.sqrt(np.pi)* (1. + alpha*(rb[i] + rb[j])**2)* (1.0 + erf(np.sqrt(alpha/2.0)*(rb[i]+rb[j])))) overbasis /= np.sqrt(128. * alpha**5) from numpy.linalg import cholesky choloverlap = cholesky(overbasis) for i in range(self.nmax): for j in range(i): choloverlap[j,i] = 0.0 from numpy.linalg import solve self.transformbasis = solve(covbasis, choloverlap) def _radial_basis(self): """Calculates the radial basis using the initialization parameters passed to the class. """ errexp = 10 cutbasis = self.rcut + self.sigma*np.sqrt(2.*errexp*np.log(10.)) spacebasis = cutbasis/self.nmax rbasis = np.zeros(self.nmax) rbasis[0] = 1. for i in range(1, self.nmax): rbasis[i] = rbasis[i-1] + spacebasis return rbasis def _c_numeric(self, rij): """Calculates the matrix of radial basis function coefficients for all `l` and `n` indices up to `self.nmax` and `self.lmax` at once using linear algebra for the specified radial distance `r`. Args: rij (float): distance between atoms `i` and `j` in the structure. """ radial_fun = np.zeros((self.lmax+1, self.nmax)) radial_fun[0,1] = 1.0 #Get local references to these variables so that we don't need `self` #all over in the overbasis calculation below. alpha = self.alpha rb = self.rb for n in range(1, self.nmax+1): argbess = 2*alpha*rb[n-1]*rij ep = np.exp(-alpha*(rij + rb[n-1])**2) em = np.exp(-alpha*(rij - rb[n-1])**2) #In the loops below, msb prefix refers to modified spherical bessel. for l in range(self.lmax+1): if l == 0: if argbess == 0.0: msb_fi_ki_l = np.exp(-alpha*(rb[n-1]**2 + rij**2)) else: #msb_fi_ki_lm = cosh(arg_bess)/arg_bess #msb_fi_ki_l = sinh(arg_bess)/arg_bess msb_fi_ki_lm = 0.5 * (em + ep) / argbess msb_fi_ki_l = 0.5 * (em - ep) / argbess else: if argbess == 0.0: msb_fi_ki_l = 0.0 else: msb_fi_ki_lmm = msb_fi_ki_lm msb_fi_ki_lm = msb_fi_ki_l msb_fi_ki_l = msb_fi_ki_lmm-(2*l-1)*msb_fi_ki_lm/argbess radial_fun[l,n-1] = msb_fi_ki_l #* rb[n-1] fc = fcut(rij, self.rcut, self.trans_width) return np.dot(radial_fun, self.transformbasis)*fc
[docs] def cnl(self, n, l, r, cnum=False, fast=True): """Returns the coefficient of the radial basis function :math:`g_n(r)`. Args: n (int): index of the radial basis function. l (int): index of the spherical harmonic associated with the radial function. r (float): value at which to evaluate the basis function :math:`g_n(r)`. cnum (bool): when True, the function is evaluated numerically using linear algebra instead of the built-in `numpy` functions; can be faster, but the results can have numerical noise issues. fast (bool): when True, caching is used to speed up evaluation of the basis functions between multiple `pissnnl` vectors. """ if cnum: if self.transformbasis is None: self._init_numeric() if r not in self.cRs: self.cRs[r] = self._c_numeric(r) return self.cRs[r][l, n-1] else: from scipy.special import sph_in if fast: if r not in self.fcrs: self.fcrs[r] = 4*np.pi*fcut(r, self.rcut, self.trans_width) if (n, l, r) not in self.rbsph: besseli = sph_in(l, 2*self.alpha*self.rb[n-1]*r)[0][-1] self.rbsph[(n, l, r)] = besseli*self.rb[n-1] if (n, l, r) not in self.aRs: self.aRs[(n, l, r)] = (self.fcrs[r] * self.rbexp[n-1] * np.exp(-self.alpha*(r**2)) * self.rbsph[(n, l, r)]) return self.aRs[(n, l, r)] else: #pragma: no cover #There is no real sense in not using the cached method since the #results are identical. However, we keep it here for reference. return (4*np.pi*fcut(r) * np.exp(-alpha*(self.rb[n-1]**2 + r**2)) * sph_in(l, 2*alpha*self.rb[n-1]*r)[0][-1]*self.rb[n-1])
[docs] def apnl(self, dp, rx, cnum=False, fast=True): """Returns the analytically/numerically derived coefficient for the specified, decomposed pissnnl. Args: dp (tuple): (`int` index in `P`, `float` cnlm, (`int` ni, `int` si), (`int` nj, `int` sj), `int` l). These tuples are calculated in bulk for a `pissnnl` vector using :func:`gblearn.decomposition.pissnnl`. rx (numpy.ndarray): linear space of values to evaluate at. cnum (bool): when True, the function is evaluated numerically using linear algebra instead of the built-in `numpy` functions. See also :meth:`cnl`. fast (bool): when True, caching is used to speed up evaluation of the basis functions between multiple `pissnnl` vectors. """ ni, si = dp[2] nj, sj = dp[3] l = dp[4] return np.array([(self.cnl(ni, l, r, cnum, fast) * self.cnl(nj, l, r, cnum, fast)) for r in rx])
[docs] def RDF(self, dP, rx, fast=True): """Computes the summed radial distribution function for the specified decomposed `P` vector. Args: dp (tuple): (`int` index in `P`, `float` cnlm, (`int` ni, `int` si), (`int` nj, `int` sj), `int` l). These tuples are calculated in bulk for a `P` vector using :func:`gblearn.decomposition.pissnnl`. rx (numpy.ndarray): linear space of values to evaluate at. """ parts = np.zeros((len(dP), len(rx))) for i, dPi in enumerate(dP): w = np.sign(dPi[1])*np.sqrt(np.sqrt(np.abs(dPi[1]))) parts[i,:] = w*self.apnl(dPi, rx, fast=fast) return np.sum(parts, axis=0)
def _renorm_p(self, p): """Returns the re-normalized coefficient of expansion for use in rebuilding the radial and angular distribution functions. """ return np.sign(p)*np.sqrt(np.sqrt(np.abs(p))) def _ang_part(self, dP): """Returns the angular grouping by `l` for the specified P decomposition. """ import pandas as pd dsP = pd.DataFrame(dP, columns=["i", "Pij", "nisi", "njsj", "l"]) dsP["Pij"] = dsP["Pij"].apply(self._renorm_p) return dsP.groupby("l").sum()["Pij"].to_dict()
[docs] def ADF(self, dP, ax): """Calculates the angular distribution function for the given SOAP vector decomposition. Args: dp (tuple): (`int` index in `P`, `float` cnlm, (`int` ni, `int` si), (`int` nj, `int` sj), `int` l). These tuples are calculated in bulk for a `P` vector using :func:`gblearn.decomposition.pissnnl`. ax (numpy.ndarray): linear space of polar angle values to evaluate at. """ from scipy.special import sph_harm ang = self._ang_part(dP) #scipy defines their harmonics to have `theta` be azimuthal, which is #opposite from physics. #we set $m = 0$ so that the azimuthal part doesn't contribute at all. result = np.zeros(len(ax)) for l, p in ang.items(): Ylm = sph_harm(0, l, 0, ax)*np.sqrt(2*l+1) #We are interested in the c* c of this value, which is multiplied #together to get pissnnl. result += p*np.sqrt(np.absolute(Ylm*Ylm.conjugate())) return result
[docs] def decompose(self, P, vcutoff=1e-5): """Decomposes the specified SOAP vector into its radial and angular contributions using :func:`gblearn.decomposition.pissnnl`. Args: P (numpy.ndarray): vector representing the projection of a local atomic environment into the SOAP space. vcutoff (float): values higher than this will be included in the RDF. """ return pissnnl(P, self.nspecies, vcutoff, self.nmax, self.lmax)
[docs] def partition(self, lfilter, inverse=False): """Returns a list of indices for a `P` vector that correspond to the specified `l` values. Args: lfilter (list): of `l` values to include in the resulting indices. inverse (bool): when True, then any `l` values *not* in `lfilter` are included instead of the inverse. Returns: (numpy.ndarray): of `int` indices that have the specified `l` values. """ key = (tuple(lfilter), inverse) if key not in self.partitions: rs_index = np.zeros((2, self.nspecies*self.nmax), int) count = 0 for i in range(1, self.nspecies+1): for a in range(1, self.nmax+1): rs_index[:,count] = [a, i] count += 1 ipow = 0 result = [] for ia in range(1, self.nspecies*self.nmax+1): for jb in range(1, ia+1): for l in range(self.lmax+1): if ((inverse and l not in lfilter) or (not inverse and l in lfilter)): result.append(ipow) ipow += 1 self.partitions[key] = np.array(result) return self.partitions[key]
def _plot_shells(ax, shells, maxy=1.): """Adds the specified shells to axes using a linear colorspace. Args: ax (matplotlib.axes.Axes): axes to plot on. If not specified, a new figure and new axes will be created. shells (list): of `float` values for neighbor shells to be added as vertical lines to the plot. """ if shells is not None: from gblearn.utility import colorspace cycols = colorspace(len(shells)) for shell in shells: ax.plot([shell,shell], [0.0,1.1*maxy], color=next(cycols), lw=2)
[docs]class DF(object): """Represents the Radial or Angluar Distribution Function of a SOAP vector. Args: dP (list): of tuples; result of calling :func:`gblearn.decomposition.pissnnl` on the SOAP vector with :math:`l=0` or :math:`l>0` components set to zero. catom (bool): when True, this DF is for the *central* atom only (i.e., :math:`l=0` components only). x (numpy.ndarray): domain in radial/angular space on which to sample the function. decomposer (SOAPDecomposer): instance used to decompose the SOAP vector; has configuration information for SOAP parameters and caches for rapid evaluation of the basis functions. calculate (bool): when True, the distribution function will be calculated upon initialization. Attributes: df (numpy.ndarray): distribution function values corresponding to :attr:`x`. """ def __init__(self, dP, catom, x, decomposer, radial=True, calculate=True): self.dP = dP self.catom = catom self.x = x self.decomposer = decomposer #We don't necessarily always want to re-calculate on construction. In #that case, the instance is just a useful container for its basic #parameters. if calculate: if radial: self.df = decomposer.RDF(dP, x) else: self.df = decomposer.ADF(dP, x) else: self.df = None self._norm = None """float: :func:`numpy.linalg.norm` of the DF vector. """ self.dtype = "R" if radial else "A" """str: since this could be a radial *or* and angular DF, the type of the distribution function. """ def __eq__(self, other): if isinstance(other, DF): return np.allclose(self.df, other.df) else: return False @property def norm(self): """Calculates the L2 norm of the distribution function values as if they were a vector in some space. """ if self._norm is None: from numpy.linalg import norm as lnorm self._norm = lnorm(self.df) return self._norm def __sub__(self, other): """Determines the distance between two DFs using the dot product metric. """ if not hasattr(other, "dtype") or self.dtype != other.dtype: raise TypeError("Can only calculate distance between two DFs. of " "the same type.") return abs(np.dot(self.df, other.df)/(self.norm*other.norm)-1.)
[docs] @staticmethod def from_file(filename, decomposer=None, x=None): """Restores a DF from file. Args: filename (str): path to the file that was created by :meth:`DF.save`. decomposer (SOAPDecomposer): instance used to decompose the SOAP vector; has configuration information for SOAP parameters and caches for rapid evaluation of the basis functions. x (numpy.ndarray): common domain values shared by a collection; used if the file doesn't explicitly specify the domain values. """ from six.moves.cPickle import load with open(filename, 'rb') as f: data = load(f) return DF.from_dict(data, decomposer, x)
[docs] @staticmethod def from_dict(data, decomposer=None, x=None): """Restores a DF from a serialized dict (i.e., one returned by :meth:`serialize`). Args: data (dict): result of calling :meth:`serialize`. decomposer (SOAPDecomposer): instance used to decompose the SOAP vector; has configuration information for SOAP parameters and caches for rapid evaluation of the basis functions. x (numpy.ndarray): if this DF is constructed from a parent collection, the common domain sample vector to use. """ if x is not None and data["x"] is None: data["x"] = x result = DF(data["dP"], data["catom"], data["x"], decomposer, data["dtype"]=="R", calculate=False) result.df = data["df"] return result
[docs] def serialize(self, withdecomp=True, commonx=None, withdP=True): """Returns a serializable dictionary that represents this DF. Args: withdecomp (bool): when True, include the parameters of the SOAP decomposer in the dict. commonx (numpy.ndarray): xf this DF is part of a collection, the domain values will be the same for every DF; in that case we don't need to include the domain in the serialization. This is the parent collections common radial vector. If the DFs is the same, it won't be serialized. If unspecified, the domain values are serialized. withdP (bool): when True, the large, decomposed SOAP vector values are serialized. Necessary to completely reconstruct the representation, but not necessary for most analysis. """ result = { "dP": self.dP if withdP else None, "catom": self.catom, "x": (None if commonx is not None and np.allclose(commonx, self.x) else self.x), "dtype": self.dtype, "df": self.df } if withdecomp: result["decomposer"] = self.decomposer.get_params() return result
[docs] def save(self, target): """Saves the DF to disk. Args: target (str): path to save the vector to. """ from six.moves.cPickle import dump data = self.serialize() with open(target, 'wb') as f: dump(data, f)
[docs] def same(self, other, epsilon_=None): """Tests whether the specifed DF is similar to this one. Args: other (DF): DF to compare similarity to. epsilon_ (float): override the global (default) value :data:`~gblearn.decomposition.epsilon` for determining if the DFs are similar. Returns: bool: True when both of the DFs are identical within tolerance. """ if epsilon_ is None: return self-other < epsilon else: return self-other < epsilon_
[docs] def plot(self, ax=None, savefile=None, shells=None, opacity=1., color='b', title=None, xlabel=None, ylabel=None): """Plots the DF. Args: ax (matplotlib.axes.Axes): axes to plot on. If not specified, a new figure and new axes will be created. If axes are specifed, *no* `x` or `y` labels or plot titles will be added. savefile (str): path to a file if the figure should be saved. shells (list): of `float` values for neighbor shells to be added as vertical lines to the plot. opacity (float): transparency to use for the DF plot. color: any color option received by :func:`matplotlib.pyplot.plot`. title (str): override the default title of the plot. xlabel (str): override the default (generic, not very helpful) label for the x-axis. ylabel (str): override the default label for the y-axis. """ import matplotlib.pyplot as plt if ax is None: plt.figure() axset = plt else: axset = ax axset.plot(self.x, self.df) if ax is None: if title is None: plt.title("Distribution Function of SOAP Vector") else: plt.title(title) if xlabel is None: plt.xlabel("Distance (unknown units)") else: plt.xlabel(xlabel) if ylabel is None: plt.ylabel("Accumulated Density") else: plt.ylabel(ylabel) _plot_shells(axset, shells, np.max(self.df)) if savefile is not None: plt.savefig(savefile) from gblearn.base import testmode if not testmode:# pragma: no cover plt.show() return axset
[docs]class DFCollection(object): """Represents a collection of Distrubition Functions. A DF can only be added once to a collection (object instance comparison). Two identical DFs (in the physical sense) can belong to the same collection. Args: dfs (list): of :class:`DF` to intialize the collection with. counts (list): of `int` specifying how many DFs of each type are in the collection. For example, `self.counts[3]` says that we have three DFs in the collection that are identical to the DF in `self.dfs[3]`, though we only keep one of them. Attributes: dfs (list): of :class:`DF` to intialize the collection with. counts (list): of `int` specifying how many DFs of each type are in the collection. For example, `self.counts[3]` says that we have three DFs in the collection that are identical to the DF in `self.dfs[3]`, though we only keep one of them. label (str): user-defined label for the collection. tags (dict): user-defined tags to identify this collection in a larger analysis. """ def __init__(self, dfs=None, counts=None): if dfs is None: self.dfs = [] else: self.dfs = dfs if counts is not None: self.counts = counts else: self.counts = [1 for i in range(len(self.dfs))] self._unique = False """bool: when True, this collection has already been tested for uniqueness. Any subsequent additions will be tested for uniqueness before adding them in. """ self._original = len(self.dfs) """int: number of DFs in the original list (before non-unique ones were removed). Useful for statistics. """ self.label = None self.tags = {} self._average = None """numpy.ndarray: averaged distribution function for the entire collection. """ def __eq__(self, other): """Tests the equality of this DFCollection with another. """ if isinstance(other, DFCollection): return all([sdf == odf for sdf, odf in zip(self, other)]) else: return False def __str__(self): title = "{0}DFCollection {1} with {2:d} items" label = "" if self.label is None else "({})".format(self.label) if len(self) > 0: dtype = self[0].dtype[0].upper() else:# pragma: no cover #This is just a sanity check. It should never happen unless someone #is hacking around in the code. dtype = "" return title.format(dtype, label, len(self)) def __repr__(self): return str(self) def __add__(self, other): """Concatenate two collections together. This doesn't enforce any uniqueness constraints. Tags and labels are *not* updated. """ if not isinstance(other, type(self)): raise TypeError("Only DFs of the same type can be combined.") dfs = self.dfs + other.dfs counts = self.counts + other.counts return self.__class__(dfs, counts) def __iadd__(self, other): """Extend this collection with the other one. """ if not isinstance(other, type(self)): raise TypeError("Only DFs of the same type can be combined.") self.dfs.extend(other.dfs) self.counts.extend(other.counts) self._unique = False self._original += other._original if self.label is None: if other.label is not None: self.label = other.label else: if other.label is not None: self.label += "+" + other.label self.tags.update(other.tags) self._average = None return self def __iter__(self): return iter(self.dfs) def __contains__(self, df): return df in self.dfs def __len__(self): return len(self.dfs) def __getitem__(self, index): if isinstance(index, slice): dfs = self.dfs[index] counts = self.counts[index] return self.__class__(dfs, counts) else: return self.dfs[index] def __setitem__(self, index, value): if len(self) > 0: if not isinstance(value, type(self.dfs[0])): emsg = "Only DFs of type {} can be placed in this collection." raise TypeError(emsg.format(type(self))) self.dfs[index] = value self.counts[index] = 1 self._average = None
[docs] @staticmethod def dfs_from_soap(collection, ctype, resolution=75, catom=False): """Creates a DF collection from the specified SOAPVector collection using the :math:`l>0` components. Args: collection (SOAPVectorCollection): collection of SOAP vectors to initialize from. ctype (str): one of ['RDF', 'ADF'], specifies the kind of collection to initialize. resolution (int): number of points to sample in the radial domain. catom (bool): when True, the DFs are constructed using the density of the central atom. """ if ctype not in ["RDF", "ADF"]: raise ValueError("'ctype' must be either 'RDF' or 'ADF'.") from gblearn.base import nprocs if ctype == "RDF": x = np.linspace(0., collection.decomposer.rcut, resolution) else: x = np.linspace(0., np.pi, resolution) if nprocs is not None: from multiprocessing import Pool mpool = Pool(nprocs) compute = [mpool.apply_async(_multiproc_execute, (v, ctype, (x, catom))) for v in collection] dfs = [] for j, df in enumerate(compute): dfs.append(df.get()) else: dfs = [getattr(v, ctype)(x) for v in collection] return dfs
[docs] @staticmethod def from_file(filename): """Restores a DFCollection from file. Args: filename (str): path to the file that was created by :meth:`DF.save`. """ from six.moves.cPickle import load with open(filename, 'rb') as f: data = load(f) return DFCollection.from_dict(data)
[docs] @staticmethod def from_dict(data): """Restores a DFCollection from a serialized dict (i.e., one returned by :meth:`serialize`). Args: data (dict): result of calling :meth:`serialize`. """ dfs = [] decomposer = SOAPDecomposer(**data["decomposer"]) for dfdata in data["dfs"]: dfs.append(DF.from_dict(dfdata, decomposer, data["x"])) if len(dfs) > 0: dtype = dfs[0].dtype else:# pragma: no cover #We default to radial distributions. dtype == "R" if dtype == "R": result = RDFCollection(dfs, data["counts"]) else: result = ADFCollection(dfs, data["counts"]) result.label = data["label"] result.tags = data["tags"] return result
[docs] def serialize(self, withdP=False): """Returns a serializable dictionary that represents this DFCollection. Args: withdP (bool): when True, the large, SOAP decomposition from which the DF was constructed is also included. Wasteful, but necessary to completely represent the DF. """ decomposer = None if len(self) > 0: decomposer = self.dfs[0].decomposer x = self.dfs[0].x dfs = [] for df in self: dfs.append(df.serialize(withdecomp=False, commonx=x, withdP=withdP)) result = { "dfs": dfs, "counts": self.counts, "label": self.label, "tags": self.tags, "x": x } if decomposer is not None: result["decomposer"] = decomposer.get_params() else:# pragma: no cover result["decomposer"] = {} return result
[docs] def save(self, target, withdP=False): """Saves the DF to disk. Args: target (str): path to save the vector to. withdP (bool): when True, the large, SOAP decomposition from which the DF was constructed is also included. Wasteful, but necessary to completely represent the DF. """ from six.moves.cPickle import dump data = self.serialize(withdP=withdP) with open(target, 'wb') as f: dump(data, f)
@property def average(self): """Returns the averaged radial distribution function for the collection. """ if self._average is None: self._average = sum([df.df for df in self])/len(self) return self._average
[docs] def project(self, other, epsilon_=None): """Projects the RDFs in self into the unique RDFs of other. Returns: tuple: (:class:`numpy.ndarray` projection, `list` exceptions); `projection` is the number of each kind of RDF in other with `sum(result) == len(self)`; `exceptions` are indices in `self` for which there was no similar :class:`DF` in other. """ result = np.zeros(len(other), dtype=int) exceptions = [] for si, df in enumerate(self): for oi, bdf in enumerate(other): if df.same(bdf, epsilon_): result[oi] += self.counts[si] break else: exceptions.append(si) return (result, exceptions)
[docs] def refine(self, other, epsilon_=None): """Refines this DFCollection to include all DFs in `other` that are not already in `self`. Also reduces existing DFs in `self` to be unique. Args: other (DFCollection): another collection to augment this one with. """ if not isinstance(other, type(self)): raise TypeError("Can only refine collections of the same type.") #Make a copy of the distribution functions in self; make sure they are #unique to begin with. dfs = self.unique(epsilon_) for io, df in enumerate(other): for xi, xdf in enumerate(dfs): if xdf.same(df, epsilon_): dfs.counts[xi] += other.counts[io] break else: #We don't use :meth:`add` here because we have already #checked uniqueness and we want to keep the number of #identical DFs accurate. dfs.dfs.append(df) dfs.counts.append(other.counts[io]) dfs._original = len(self) + len(other) dfs._unique = True return dfs
[docs] def histogram(self, epsilon_=None, savefile=None, **kwargs): """Determines how many of each kind of DF are in this collection (using the default comparison). Plots the histogram of unique values. This is equivalent to calling :meth:`DFCollection.unique.histogram`. Args: epsilon_ (float): override the global (default) value :data:`~gblearn.decomposition.epsilon` for determining if the DFs are similar. kwargs (dict): arguments will be passed directly to :func:`matplotlib.pyplot.bar` for the plotting. Returns: DFCollection: of the unique values; same result as :meth:`DFCollection.unique`. """ if not self._unique: result = self.unique(epsilon_) else: result = self import matplotlib.pyplot as plt total = sum(result.counts) plt.figure() plt.bar(np.arange(1, len(self)+1), np.array(self.counts, float)/total) htitle = "Histogram of Unique DFs in Collection ({}/{})" plt.title(htitle.format(len(self), total)) plt.xlabel("DF Number") plt.ylabel("Percentage of Identical DFs ") if savefile is not None: plt.savefig(savefile) from gblearn.base import testmode if not testmode:# pragma: no cover plt.show()
[docs] def unique(self, epsilon_=None): """Returns only the unique DFs in this collection. Args: epsilon_ (float): override the global (default) value :data:`~gblearn.decomposition.epsilon` for determining if the DFs are similar. Returns: DFCollection: of unique DFs within the specified tolerance. """ if self._unique: return self if len(self) == 0: #Construct a new, empty collection. return self.__class__() dfs = [self.dfs[0]] counts = [self.counts[0]] for si, df in enumerate(self[1:]): for i, kept in enumerate(dfs): if kept.same(df, epsilon_): counts[i] += self.counts[si+1] break else: dfs.append(df) counts.append(self.counts[si+1]) #We want the return type to be the same as the current instance (radial #or angular). result = self.__class__(dfs, counts) result._unique = True result._original = len(self) return result
[docs] def plot(self, ax=None, savefile=None, shells=None, color='b', title=None, xlabel=None, ylabel=None, withavg=False): """Plots the collection of DFs on the same axes. Opacity of the line is chosen based on how many DFs in the collection are identical to that one (within tolerance). This is accomplished by calling :meth:`DFCollection.histogram`. Args: ax (matplotlib.axes.Axes): axes to plot on. If not specified, a new figure and new axes will be created. savefile (str): path to a file if the figure should be saved. shells (list): of `float` values for neighbor shells to be added as vertical lines to the plot. color: any color option received by :func:`matplotlib.pyplot.plot`. title (str): override the default title of the plot. xlabel (str): override the default (generic, not very helpful) label for the x-axis. ylabel (str): override the default label for the y-axis. withavg (bool): when True, the average DF for the whole collection is also plotted. """ import matplotlib.pyplot as plt if ax is None: plt.figure() axset=plt else: axset=ax cmax = float(max(self.counts)) total = sum(self.counts) nalpha = 0.85 if cmax/total > 0.33 else 0.65 maxy = 1. for di, df in enumerate(self.dfs): alpha=nalpha*self.counts[di]/cmax axset.plot(df.x, df.df, color=color, alpha=alpha) maxy_ = np.max(df.df) if maxy_ > maxy: maxy = maxy_ if withavg and len(self) > 0: x = self.dfs[0].x axset.plot(x, self.average, 'r-') maxy_ = np.max(self.average) if maxy_ > maxy:# pragma: no cover maxy = maxy_ if len(self) > 0: dtype = self.dfs[0].dtype unit = "Ang." if dtype == "R" else "Rad." tstr = "Radial" if dtype == "R" else "Angular" else:# pragma: no cover unit = "unknown units" tstr = "" if ax is None: if title is None: plt.title("{} Distribution Function of Collection".format(tstr)) else: plt.title(title) if xlabel is None: plt.xlabel("Distance ({})".format(unit)) else: plt.xlabel(xlabel) if ylabel is None: plt.ylabel("Accumulated Density") else: plt.ylabel(ylabel) _plot_shells(axset, shells, maxy) if savefile is not None: plt.savefig(savefile) from gblearn.base import testmode if not testmode:# pragma: no cover plt.show() return axset
[docs] def add(self, df): """Add a new DF to the collection. Args: df (DF): instance to add; if it is already in the collection, nothing will happen. """ if not isinstance(df, DF): raise TypeError("Only DF objects can be added to DF collections.") if len(self) > 0 and self[0].dtype != df.dtype: emsg = "Only DFs of type {}DF can be added to this collection." raise TypeError(emsg.format(self[0].dtype)) do_add = False if self._unique: #We need to make sure that the DF is unique before adding it. for xi, xdf in enumerate(self): if not xdf.same(df):# pragma: no cover #I can't seem to get this line to log, no matter what I #throw at it... break else: do_add = True else: if df not in self.dfs: do_add = True if do_add: self.dfs.append(df) self.counts.append(1) self._average = None
[docs] def remove(self, df): """Remove an DF from the collection. Args: rdf (DF): instance to remove; if it isn't already in the collection, nothing will happen. """ if not isinstance(df, DF): return if df in self.dfs: idf = self.dfs.index(df) self.dfs.remove(df) del self.counts[idf] self._average = None
def _multiproc_execute(obj, method, args):# pragma: no cover """Executes the instance method on the given object. Args: obj: object to execute the method on. method (str): name of the method to execute. args (tuple): arguments to pass; method called as obj.method(*args). """ #We don't cover this because it won't necessarily be caught be the coverage #program that is running on the main thread; it may only execute on other #threads. if hasattr(obj, method): return getattr(obj, method)(*args)
[docs]class RDFCollection(DFCollection): """Represents a radial distribution function collection. See also the comments for :class:`DFCollection`. """
[docs] @staticmethod def from_soap(collection, resolution=75, catom=False): """Creates an RDF collection from the specified SOAPVector collection using the :math:`l>0` components. Args: collection (SOAPVectorCollection): collection of SOAP vectors to initialize from. resolution (int): number of points to sample in the radial domain. catom (bool): when True, the DFs are constructed using the density of the central atom. """ dfs = DFCollection.dfs_from_soap(collection, "RDF", resolution, catom) return RDFCollection(dfs)
[docs]class ADFCollection(DFCollection): """Represents a collection of Angular Distrubition Functions. See also the comments for :class:`DFCollection`. """
[docs] @staticmethod def from_soap(collection, resolution=75, catom=False): """Creates an ADF collection from the specified SOAPVector collection using the :math:`l>0` components. Args: collection (SOAPVectorCollection): collection of SOAP vectors to initialize from. resolution (int): number of points to sample in the angular domain. catom (bool): when True, the DFs are constructed using the density of the central atom. """ #We only have degrees of freedom in the polar angle; the azimuthal got #average out to give us rotational invarance. dfs = DFCollection.dfs_from_soap(collection, "ADF", resolution, catom) return ADFCollection(dfs)
[docs]class SOAPVector(object): """Wrapper for a SOAP vector that facilitates decomposition, plotting and analysis in the SOAP space. Args: P (numpy.ndarray): vector representing the projection of a local atomic environment into the SOAP space. decomposer (SOAPDecomposer): instance used to decompose the SOAP vector; has configuration information for SOAP parameters and caches for rapid evaluation of the basis functions. Attributes: dnP (list): of tuples; result of calling :func:`gblearn.decomposition.pissnnl` on the SOAP vector with `l=0` components set to zero. dcP (list): of tuples; result of calling :func:`gblearn.decomposition.pissnnl` on the SOAP vector with `l>0` components set to zero. nP (numpy.ndarray): a copy of `P` with `l=0` components set to zero. cP (numpy.ndarray): a copy of `P` with `l>0` components set to zero. rx (numpy.ndarray): sample points in the radial domain at which :attr:`cRDF` and :attr:`nRDF` were sampled. cRDF (DF): radial distribution function for the central atom. nRDF (DF): radial distribution function for all the neighbors of the central atom. rx (numpy.ndarray): sample points in the angular domain at which :attr:`cADF` and :attr:`nADF` were sampled. cADF (DF): angular distribution function for the central atom. nADF (DF): angular distribution function for all the neighbors of the central atom. """ def __init__(self, P, decomposer): self.P = P self.cP = None self.nP = None self.decomposer = decomposer self.dcP = None self.dnP = None self.rx = None """numpy.ndarray: cached value of the radial values that we evaluated the RDF at; useful if the RDF is requested repeatedly for the same `r` values. """ self.cRDF = None """numpy.ndarray: cached values of the RDF for the cached value in :attr:`rx` with :math:`l=0`. """ self.nRDF = None """numpy.ndarray: cached values of the RDF for the cached value in :attr:`rx` with :math:`l>0`. """ self.ax = None """numpy.ndarray: cached value of the angular values that we evaluated the ADF at; useful if the ADF is requested repeatedly for the same `theta` values. """ self.cADF = None """numpy.ndarray: cached values of the ADF for the cached value in :attr:`ax` with :math:`l=0`. """ self.nADF = None """numpy.ndarray: cached values of the ADF for the cached value in :attr:`ax` with :math:`l>0`. """ def __eq__(self, other): """Tests equality between two SOAP vectors by comparing *only* their P vectors. """ return np.allclose(self.P, other.P)
[docs] def equal(self, other): """Performs a more thorough equality check by also comparing the SOAP decomposers and RDFs. """ Pok = self == other Dok = (self.decomposer.get_params() == other.decomposer.get_params()) if self.rx is not None: if other.rx is not None: Rok = (np.allclose(self.rx, other.rx) and ((self.cRDF is None and other.cRDF is None) or (self.cRDF is not None and other.cRDF is not None and np.allclose(self.cRDF.df, other.cRDF.df))) and ((self.nRDF is None and other.nRDF is None) or (self.nRDF is not None and other.nRDF is not None and np.allclose(self.nRDF.df, other.nRDF.df)))) else:# pragma: no cover Rok = False else: Rok = True if self.ax is not None: if other.ax is not None: Aok = (np.allclose(self.ax, other.ax) and ((self.cADF is None and other.cADF is None) or (self.cADF is not None and other.cADF is not None and np.allclose(self.cADF.df, other.cADF.df))) and ((self.nADF is None and other.nADF is None) or (self.nADF is not None and other.nADF is not None and np.allclose(self.nADF.df, other.nADF.df)))) else:# pragma: no cover Aok = False else: Aok = True return Pok and Dok and Rok and Aok
[docs] @staticmethod def from_element(element, index=0, **kwargs): """Returns the SOAP vector for a pure element. Args: element (str): element name. Must be one of ["Ni", "Cr", "Mg"]. index (int): for multi-atom bases, which atom to represent in the elemental crystal. nmax (int): bandwidth limits for the SOAP descriptor radial basis functions. lmax (int): bandwidth limits for the SOAP descriptor spherical harmonics. rcut (float): local environment finite cutoff parameter. sigma (float): width parameter for the Gaussians on each atom. trans_width (float): distance over which the coefficients in the radial functions are smoothly transitioned to zero. """ from gblearn.elements import pissnnl P = pissnnl(element, **kwargs)[index] decomposer = SOAPDecomposer(1, **kwargs) return SOAPVector(P, decomposer)
[docs] @staticmethod def from_file(filename, decomposer=None, rx=None, ax=None): """Restores a SOAP vector from file. Args: filename (str): path to the file that was created by :meth:`SOAPVector.save`. decomposer (SOAPDecomposer): if this serialization was part of a higher level collection, then the decomposer would not have been included at the item level. For reconstruction, it is passed in by the collection. rx (numpy.ndarray): as for `decomposer`, but with the values in the domain. ax (numpy.ndarray): as for `decomposer`, but with the angular values in the domain. """ from six.moves.cPickle import load with open(filename, 'rb') as f: data = load(f) return SOAPVector.from_dict(data, decomposer, rx, ax)
[docs] @staticmethod def from_dict(data, decomposer_=None, rx=None, ax=None): """Restores a SOAP vector from a serialized dict (i.e., one returned by :meth:`serialize`). Args: data (dict): result of calling :meth:`serialize`. decomposer_ (SOAPDecomposer): if this serialization was part of a higher level collection, then the decomposer would not have been included at the item level. For reconstruction, it is passed in by the collection. rx (numpy.ndarray): as for `decomposer`, but with the values in the domain. ax (numpy.ndarray): as for `decomposer`, but with the angular values in the domain. """ if decomposer_ is not None: decomposer = decomposer_ else: decomposer = SOAPDecomposer(**data["decomposer"]) result = SOAPVector(data["P"], decomposer) result.dcP = data["dcP"] result.dnP = data["dnP"] if rx is not None and data["rx"] is None:# pragma: no cover result.rx = rx else: result.rx = data["rx"] if ax is not None and data["ax"] is None:# pragma: no cover result.ax = ax else: result.ax = data["ax"] if data["cRDF"] is not None: result.cRDF = DF(data["dcP"], True, result.rx, decomposer, calculate=False) result.cRDF.df = data["cRDF"] if data["nRDF"] is not None: result.nRDF = DF(data["dnP"], False, result.rx, decomposer, calculate=False) result.nRDF.df = data["nRDF"] if data["cADF"] is not None: result.cADF = DF(data["dcP"], True, result.ax, decomposer, calculate=False) result.cADF.df = data["cADF"] if data["nADF"] is not None: result.nADF = DF(data["dnP"], False, result.ax, decomposer, calculate=False) result.nADF.df = data["nADF"] return result
[docs] def serialize(self, withdecomp=True, commonrx=None, withdP=True, commonax=None): """Returns a serializable dictionary that represents this SOAP Vector. Args: withdecomp (bool): when True, include the parameters of the SOAP decomposer in the dict. commonrx (numpy.ndarray): xf this DF is part of a collection, the radial values will be the same for every DF; in that case we don't need to include the domain in the serialization. This is the parent collections common radial vector. If the DFs is the same, it won't be serialized. If unspecified, the domain values are serialized. withdP (bool): when True, the large, decomposed SOAP vector values are serialized. Necessary to completely reconstruct the representation, but not necessary for most analysis. commonax (numpy.ndarray): same as for `commonrx` but for the angular domain. """ result = { "P": self.P, "dcP": self.dcP if withdP else None, "dnP": self.dnP if withdP else None, "rx": None, "ax": None, "cRDF": None, "nRDF": None, "cADF": None, "nADF": None } if withdecomp: result["decomposer"] = self.decomposer.get_params() else: result["decomposer"] = {} if self.rx is not None: if commonrx is None or self.rx is not commonrx: result["rx"] = self.rx if self.ax is not None: if commonax is None or self.ax is not commonax: result["ax"] = self.ax if self.cRDF is not None: result["cRDF"] = self.cRDF.df if self.nRDF is not None: result["nRDF"] = self.nRDF.df if self.cADF is not None: result["cADF"] = self.cADF.df if self.nADF is not None: result["nADF"] = self.nADF.df return result
[docs] def save(self, target): """Saves the SOAP vector to disk so that RDF and decomposition operations don't have to be re-executed later. Args: target (str): path to save the vector to. """ from six.moves.cPickle import dump data = self.serialize() with open(target, 'wb') as f: dump(data, f)
def _get_DF(self, x, dfname, xname, catom=False): """Gets the specified distribution function, taking the cache into account. Args: x (numpy.ndarray): values in the radial or angular domain to sample at. dfname (str): name of the attribute on `self` corresponding to the cached distribution function to consider before constructing a new one. *Modified* by this routine if a DF is constructed. xname (str): name of attribute on `self` corresponding to cached sample values to consider before constructing a new :class:`DF`. *Modified* by this routine if a DF is constructed. catom (bool): when True, consider the central atom's distribution function. """ from numpy import allclose cachex = getattr(self, xname) cacheDF = getattr(self, dfname) if (cachex is not None and cachex.shape == x.shape and allclose(x, cachex)): if cacheDF is not None: return cacheDF else: setattr(self, xname, x) setattr(self, dfname, None) P = dP = None if catom: P = self.cP dP = self.dcP else: P = self.nP dP = self.dnP if P is None: lrest = self.decomposer.partition([0], catom) P = self.P.copy() P[lrest] = 0. if catom: self.cP = P else: self.nP = P if dP is None: dP = self.decomposer.decompose(P) if catom: self.dcP = dP else: self.dnP = dP setattr(self, dfname, DF(dP, catom, x, self.decomposer, xname=="rx")) return getattr(self, dfname)
[docs] def ADF(self, ax, catom=False): """Returns the angular distribution function for the SOAP vector. Args: ax (numpy.ndarray): domain in the angular space to evaluate the ADF at. catom (bool): when True, the ADF of the central atom is returned; otherwise, the central atom is ignored and the neighbor ADF is returned. """ if catom: return self._get_DF(ax, "cADF", "ax", catom=True) else: return self._get_DF(ax, "nADF", "ax", catom=False)
[docs] def RDF(self, rx, catom=False): """Returns the *total* radial distribution function for the SOAP vector. Args: rx (numpy.ndarray): domain in the radial space to evaluate the RDF at. catom (bool): when True, the RDF of the central atom is returned; otherwise, the central atom is ignored and the neighbor RDF is returned. """ if catom: return self._get_DF(rx, "cRDF", "rx", catom=True) else: return self._get_DF(rx, "nRDF", "rx", catom=False)
[docs]class SOAPVectorCollection(object): """Represents a collection of SOAP vectors that are logically combined (for example the local environments at a grain boundary are all part of the grain boundary). The collection allows aggregate quantities to be calculated more easily. Args: Pij (numpy.ndarray): a matrix of SOAP vectors, where each *row* is a vector. decomposer (SOAPDecomposer): instance used to decompose the SOAP vectors; has configuration information for SOAP parameters and caches for rapid evaluation of the basis functions. calculate (bool): when True, the SOAPVectors are calculated based on `P`; otherwise, they are left as an empty list. kwargs (dict): arguments that can be passed to :class:`SOAPDecomposer` constuctor. Examples: Create a collection of SOAP vectors and then collapse them into a set of unique ones (an instance of :class:`RDFCollection`). >>> from gblearn.decomposition import SOAPVectorCollection as SVC >>> from numpy import load >>> P = load("pissnnl.npy") >>> svc = SVC(P, lmax=18, nmax=18) >>> urdfs = svc.unique(rdf=True) """ def __init__(self, Pij=None, decomposer=None, calculate=True, **kwargs): self.P = Pij if decomposer is None: self.decomposer = SOAPDecomposer(**kwargs) else: self.decomposer = decomposer if self.P is not None and calculate: self.vectors = [SOAPVector(P, self.decomposer) for P in self.P] else: self.vectors = [] self._rdfs = {} """dict: keys are `int` sampling resolutions; values are *unique* RDFs (:class:`RDFCollection`) in this :class:`SOAPVectorCollection`. """ self._adfs = {} """dict: keys are `int` sampling resolutions; values are *unique* ADFs (:class:`ADFCollection`) in this :class:`SOAPVectorCollection`. """ def __eq__(self, other): return all([a==b for a, b in zip(self.vectors, other.vectors)])
[docs] def equal(self, other): """Performs a more rigorous equality test for two collections by *also* comparing the radial and angular distribution functions. """ return (self == other and self._rdfs == other._rdfs and self._adfs == other._adfs)
def __iter__(self): return iter(self.vectors) def __contains__(self, value): return value in self.vectors def __len__(self): return len(self.vectors) def __getitem__(self, index): if isinstance(index, slice): result = SOAPVectorCollection(decomposer=self.decomposer) result.vectors = self.vectors[index] result.P = self.P[index,:] if len(self._rdfs) > 0: self._rdfs = {r: dfs[index] for r, dfs in self._rdfs.items()} if len(self._adfs) > 0: self._adfs = {r: dfs[index] for r, dfs in self._adfs.items()} return result else: return self.vectors[index] def __setitem__(self, index, value): if not isinstance(value, SOAPVector): raise TypeError("Only SOAPVectors can be placed in an SOAP " "vector collection.") self.vectors[index] = value
[docs] @staticmethod def from_file(filename): """Restores a SOAPVectorCollection from file. Args: filename (str): path to the file that was created by :meth:`SOAPVectorCollection.save`. """ from six.moves.cPickle import load with open(filename, 'rb') as f: data = load(f) return SOAPVectorCollection.from_dict(data)
[docs] @staticmethod def from_dict(data): """Restores a SOAPVectorCollection from a serialized dict (i.e., one returned by :meth:`serialize`). Args: data (dict): result of calling :meth:`serialize`. """ vecs = [] decomposer = SOAPDecomposer(**data["decomposer"]) for vec in data["vectors"]: nvec = SOAPVector.from_dict(vec, decomposer, data["rx"], data["ax"]) vecs.append(nvec) result = SOAPVectorCollection(data["P"], decomposer, calculate=False) result.vectors = vecs result._rdfs = {r: RDFCollection.from_dict(d) for (r, d) in data["RDFs"].items()} result._adfs = {r: ADFCollection.from_dict(d) for (r, d) in data["ADFs"].items()} return result
[docs] def serialize(self, withdP=False): """Returns a serializable dictionary that represents this :class:`SOAPVectorCollection`. Args: withdP (bool): when True, the large, SOAP decomposition from which the DFs are constructed is also included. Wasteful, but necessary to completely represent the DFs. """ if len(self) > 0: rx = self.vectors[0].rx ax = self.vectors[0].ax else: rx = None ax = None vecs = [] for vec in self.vectors: vecs.append(vec.serialize(False, rx, withdP, ax)) result = { "decomposer": self.decomposer.get_params(), "vectors": vecs, "P": self.P, "RDFs": {r: dfcol.serialize(withdP=withdP) for r, dfcol in self._rdfs.items()}, "ADFs": {r: dfcol.serialize(withdP=withdP) for r, dfcol in self._adfs.items()}, "rx": rx, "ax": ax } return result
[docs] def save(self, target, withdP=False): """Saves the DF to disk. Args: target (str): path to save the vector to. withdP (bool): when True, the large, SOAP decomposition from which the DF was constructed is also included. Wasteful, but necessary to completely represent the DF. """ from six.moves.cPickle import dump data = self.serialize(withdP=withdP) with open(target, 'wb') as f: dump(data, f)
[docs] def RDFs(self, resolution=75, catom=False): """Returns the set of *unique* radial distribution functions for this collection. Args: resolution (int): number of points to sample in the radial domain. catom (bool): when True, the DFs are constructed using the density of the central atom. """ if resolution not in self._rdfs: self._rdfs[resolution] = RDFCollection.from_soap(self, resolution, catom) return self._rdfs[resolution]
[docs] def ADFs(self, resolution=100, catom=False): """Returns the set of *unique* angular distribution functions for this collection. Args: resolution (int): number of points to sample in the angular domain. catom (bool): when True, the DFs are constructed using the density of the central atom. """ if resolution not in self._adfs: self._adfs[resolution] = ADFCollection.from_soap(self, resolution, catom) return self._adfs[resolution]