Source code for PyFields.core.vectorfield

# -*- coding: utf-8 -*-
#!/bin/env python3

# Copyright (C) 2018 Gaby Launay

# Author: Gaby Launay  <gaby.launay@tutanota.com>
# URL: https://github.com/galaunay/pyfields
# Version: 0.1

# This file is part of PyFields

# PyFields is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.

# PyFields is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import copy

import matplotlib.pyplot as plt
import numpy as np

import unum
from . import field as fld, scalarfield as sf
from ..utils import make_unit

# TODO: check for remaining 'ScalarField'


[docs]class VectorField(fld.Field): """ Class representing a vector field (2D field, with two components on each point). """ def __init__(self, axis_x, axis_y, comp_x, comp_y, mask=None, unit_x="", unit_y="", unit_values=""): """ Object representing a vector field (2D field, with two component on each point). Parameters ---------- axis_x : nx1 array x axis values. axis_y : mx1 array y axis values. comp_x : array or masked array Values of the x component at the axis points comp_y : array or masked array Values of the y component at the axis points unit_x, unit_y : String or Unum object, optional x and y axis units. unit_values : String or Unum object, optional Field values unit. """ # build axis system fld.Field.__init__(self, axis_x=axis_x, axis_y=axis_y, unit_x=unit_x, unit_y=unit_y) self.__mask = None self.__comp_x = None self.__comp_y = None # store values properties self.comp_x = np.array(comp_x) self.comp_y = np.array(comp_y) if self.comp_y.shape != self.comp_x.shape: raise ValueError("'comp_x' and 'comp_y' should have the" " same dimensions") if self.comp_x.shape[0] != len(self.axis_x) or \ self.comp_x.shape[1] != len(self.axis_y): raise ValueError('Incoherent shapes, axis sizes are {}, {},' 'but comp_x size is {}' .format(len(self.axis_x), len(self.axis_y), self.comp_x.shape)) self.unit_values = unit_values # store mask (if necessary) if mask is None: nans = np.logical_or(np.isnan(comp_x), np.isnan(comp_y)) if np.any(nans): mask = nans else: mask = False else: try: mask = np.array(mask, dtype=bool) except TypeError: raise TypeError("'mask' should be a boolean or an array of" "boolean, but is currently {}" .format(mask)) if mask.shape != () and np.any(mask.shape != self.shape): raise ValueError("'mask' should be of the same size as the" " axis system: {}," " but is currently of size {}" .format(self.shape, mask.shape)) nans = np.logical_or(np.isnan(comp_x), np.isnan(comp_y)) mask = np.logical_or(mask, nans) self.mask = mask def __eq__(self, another): if not isinstance(another, VectorField): return False if not super().__eq__(another): return False if not np.all(self.mask == another.mask): return False if not np.all(self.comp_x[~self.mask] == another.comp_x[~another.mask]): return False if not np.all(self.comp_y[~self.mask] == another.comp_y[~another.mask]): return False try: self.unit_values == another.unit_values except unum.IncompatibleUnitsError: return False return True def __neg__(self): tmpvf = self.copy() tmpvf.comp_x = -tmpvf.comp_x tmpvf.comp_y = -tmpvf.comp_y return tmpvf def __add__(self, otherone): # if we add with a VectorField object if isinstance(otherone, VectorField): # test unities system self.unit_values + otherone.unit_values self.unit_x + otherone.unit_x self.unit_y + otherone.unit_y # identical shape and axis if super().__eq__(otherone): tmpvf = self.copy() fact = otherone.unit_values/self.unit_values tmpvf.comp_x += otherone.comp_x*fact.asNumber() tmpvf.comp_y += otherone.comp_y*fact.asNumber() tmpvf.mask = np.logical_or(self.mask, otherone.mask) # different shape, partially same axis else: # getting shared points new_ind_x = np.array([val in otherone.axis_x for val in self.axis_x]) new_ind_y = np.array([val in otherone.axis_y for val in self.axis_y]) new_ind_xo = np.array([val in self.axis_x for val in otherone.axis_x]) new_ind_yo = np.array([val in self.axis_y for val in otherone.axis_y]) if not np.any(new_ind_x) or not np.any(new_ind_y): raise ValueError("Incompatible shapes") new_ind_Y, new_ind_X = np.meshgrid(new_ind_y, new_ind_x) new_ind_comp = np.logical_and(new_ind_X, new_ind_Y) new_ind_Yo, new_ind_Xo = np.meshgrid(new_ind_yo, new_ind_xo) new_ind_compo = np.logical_and(new_ind_Xo, new_ind_Yo) # getting new axis and values new_axis_x = self.axis_x[new_ind_x] new_axis_y = self.axis_y[new_ind_y] fact = otherone.unit_values/self.unit_values new_comp_x = (self.comp_x[new_ind_comp] + otherone.comp_x[new_ind_compo] * fact.asNumber()) new_comp_y = (self.comp_y[new_ind_comp] + otherone.comp_y[new_ind_compo] * fact.asNumber()) new_comp_x = new_comp_x.reshape((len(new_axis_x), len(new_axis_y))) new_comp_y = new_comp_y.reshape((len(new_axis_x), len(new_axis_y))) new_mask = np.logical_or(self.mask[new_ind_comp], otherone.mask[new_ind_compo]) new_mask = new_mask.reshape((len(new_axis_x), len(new_axis_y))) # creating sf tmpvf = VectorField(new_axis_x, new_axis_y, new_comp_x, new_comp_y, mask=new_mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmpvf # if we add with a unit object elif isinstance(otherone, unum.Unum): try: self.unit_values + otherone except unum.IncompatibleUnitsError as m: raise ValueError("Units don't match: {}".format(m)) tmpsf = self.copy() tmpsf.comp_x += (otherone/self.unit_values).asNumber() tmpsf.comp_y += (otherone/self.unit_values).asNumber() return tmpsf else: try: tmpsf = self.copy() tmpsf.comp_x += otherone tmpsf.comp_y += otherone return tmpsf except TypeError: raise TypeError("You can only add a vectorfield " "with others vectorfields or numbers," "not with {}".format(otherone)) def __radd__(self, obj): return self.__add__(obj) def __sub__(self, obj): return self.__add__(-obj) def __rsub__(self, obj): return self.__neg__() + obj def __truediv__(self, obj): # units object if isinstance(obj, unum.Unum): tmpvf = self.copy() unit_values = tmpvf.unit_values / obj tmpvf.comp_x *= unit_values.asNumber() tmpvf.comp_y *= unit_values.asNumber() unit_values /= unit_values.asNumber() tmpvf.unit_values = unit_values return tmpvf # other vectorfield if isinstance(obj, VectorField): if np.any(self.axis_x != obj.axis_x)\ or np.any(self.axis_y != obj.axis_y)\ or self.unit_x != obj.unit_x\ or self.unit_y != obj.unit_y: raise ValueError("Fields are not consistent") tmpsf = self.copy() filt_nan = np.logical_and(obj.comp_x != 0, obj.comp_y != 0) comp_x = np.zeros(shape=self.comp_x.shape) comp_x[filt_nan] = self.comp_x[filt_nan]/obj.comp_x[filt_nan] comp_y = np.zeros(shape=self.comp_y.shape) comp_y[filt_nan] = self.comp_y[filt_nan]/obj.comp_y[filt_nan] mask = np.logical_or(self.mask, obj.mask) mask = np.logical_or(mask, np.logical_not(filt_nan)) unit = self.unit_values / obj.unit_values tmpsf.comp_x = comp_x*unit.asNumber() tmpsf.comp_y = comp_y*unit.asNumber() tmpsf.mask = mask tmpsf.unit_values = unit/unit.asNumber() return tmpsf # array try: obj[0] obj = np.array(obj, subok=True) if not obj.shape == self.shape: raise ValueError() tmpsf = self.copy() mask = np.logical_or(self.mask, obj == 0) not_mask = np.logical_not(mask) comp_x = tmpsf.comp_x comp_y = tmpsf.comp_y comp_x[not_mask] /= obj[not_mask] comp_y[not_mask] /= obj[not_mask] tmpsf.comp_x = comp_x tmpsf.comp_y = comp_y tmpsf.mask = mask return tmpsf except TypeError: pass # number try: tmpsf = self.copy() tmpsf.comp_x /= obj tmpsf.comp_y /= obj return tmpsf except: pass # else... raise TypeError("Unsupported operation between {} and a " "VectorField object".format(type(obj))) def __rtruediv__(self, obj): tmpsf = self.copy() # units object if isinstance(obj, unum.Unum): tmpsf.comp_x = obj.asNumber()/tmpsf.comp_x tmpsf.comp_y = obj.asNumber()/tmpsf.comp_y tmpsf.unit_values = obj/obj.asNumber()/tmpsf.unit_values return tmpsf try: obj[0] obj = np.array(obj, subok=True) if not obj.shape == self.shape: raise ValueError() mask = np.logical_or(self.mask, obj == 0) not_mask = np.logical_not(mask) comp_x = tmpsf.comp_x comp_y = tmpsf.comp_y comp_x[not_mask] = obj[not_mask] / tmpsf.comp_x[not_mask] comp_y[not_mask] = obj[not_mask] / tmpsf.comp_y[not_mask] tmpsf.comp_x = comp_x tmpsf.comp_y = comp_y tmpsf.mask = mask return tmpsf except TypeError: pass # number try: tmpsf.comp_x = obj/tmpsf.comp_x tmpsf.comp_y = obj/tmpsf.comp_y tmpsf.unit_values = 1/tmpsf.unit_values return tmpsf except: raise TypeError("Unsupported operation between {} and a " "VectorField object".format(type(obj))) def __mul__(self, obj): # units if isinstance(obj, unum.Unum): tmpsf = self.copy() tmpsf.comp_x *= obj.asNumber() tmpsf.comp_y *= obj.asNumber() tmpsf.unit_values *= obj/obj.asNumber() tmpsf.mask = self.mask return tmpsf # sclarfield if isinstance(obj, VectorField): if np.any(self.axis_x != obj.axis_x)\ or np.any(self.axis_y != obj.axis_y)\ or self.unit_x != obj.unit_x\ or self.unit_y != obj.unit_y: raise ValueError("Fields are not consistent") tmpsf = self.copy() comp_x = self.comp_x * obj.comp_x comp_y = self.comp_y * obj.comp_y mask = np.logical_or(self.mask, obj.mask) unit = self.unit_values * obj.unit_values tmpsf.comp_x = comp_x*unit.asNumber() tmpsf.comp_y = comp_y*unit.asNumber() tmpsf.mask = mask tmpsf.unit_values = unit/unit.asNumber() return tmpsf # array try: obj[0] obj = np.array(obj, subok=True) if not obj.shape == self.shape: raise ValueError() tmpsf = self.copy() mask = self.mask not_mask = np.logical_not(mask) comp_x = tmpsf.comp_x comp_y = tmpsf.comp_y comp_x[not_mask] *= obj[not_mask] comp_y[not_mask] *= obj[not_mask] tmpsf.comp_x = comp_x tmpsf.comp_y = comp_y tmpsf.mask = mask return tmpsf except TypeError: pass # numbers try: tmpsf = self.copy() tmpsf.comp_x *= obj tmpsf.comp_y *= obj tmpsf.mask = self.mask return tmpsf except: raise TypeError("Unsupported operation between {} and a " "VectorField object".format(type(obj))) __rmul__ = __mul__ def __abs__(self): tmpsf = self.copy() tmpsf.comp_x = np.abs(tmpsf.comp_x) tmpsf.comp_y = np.abs(tmpsf.comp_y) return tmpsf def __pow__(self, number): tmpsf = self.copy() tmpsf.comp_x[np.logical_not(tmpsf.mask)] \ = np.power(tmpsf.comp_x[np.logical_not(tmpsf.mask)], number) tmpsf.comp_y[np.logical_not(tmpsf.mask)] \ = np.power(tmpsf.comp_y[np.logical_not(tmpsf.mask)], number) tmpsf.mask = self.mask tmpsf.unit_values = np.power(tmpsf.unit_values, number) return tmpsf def __iter__(self): for ij, xy in fld.Field.__iter__(self): i = ij[0] j = ij[1] if not self.mask[i, j]: yield ij, xy, self.comp_x[i, j], self.comp_y[i, j] def __repr__(self): return self.get_props() @property def comp_x(self): comp_x = self.__comp_x.copy() if self.__mask is not None: try: comp_x[self.mask] = np.nan except ValueError: comp_x[self.mask] = 0 return comp_x @comp_x.setter def comp_x(self, new_comp_x): new_comp_x = np.asarray(new_comp_x) if self.shape[0] == new_comp_x.shape[0]\ and self.shape[1] == new_comp_x.shape[1]: self.__comp_x = new_comp_x else: raise ValueError("'comp_x' should have the same shape as the " "axis system: {}, not {}." .format(self.shape, new_comp_x.shape)) @comp_x.deleter def comp_x(self): raise Exception("Can't delete that") @property def comp_x_as_sf(self): tmp_vf = sf.ScalarField(self.axis_x, self.axis_y, self.comp_x, mask=self.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_vf @property def comp_y(self): comp_y = self.__comp_y.copy() if self.__mask is not None: try: comp_y[self.mask] = np.nan except ValueError: comp_y[self.mask] = 0 return comp_y @comp_y.setter def comp_y(self, new_comp_y): new_comp_y = np.asarray(new_comp_y) if self.shape[0] == new_comp_y.shape[0]\ and self.shape[1] == new_comp_y.shape[1]: self.__comp_y = new_comp_y else: raise ValueError("'comp_y' should have the same shape as the " "axis system: {}, not {}." .format(self.shape, new_comp_y.shape)) @comp_y.deleter def comp_y(self): raise Exception("Can't delete that") @property def comp_y_as_sf(self): tmp_vf = sf.ScalarField(self.axis_x, self.axis_y, self.comp_y, mask=self.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_vf @property def dtype(self): return (self.comp_x.dtype, self.comp_y.dtype) @property def mask(self): return np.logical_or(self.__mask, np.logical_or( np.isnan(self.__comp_x), np.isnan(self.__comp_y))) @mask.setter def mask(self, new_mask): # check 'new_mask' coherence if isinstance(new_mask, (bool, np.bool)): fill_value = new_mask new_mask = np.empty(self.shape, dtype=bool) new_mask.fill(fill_value) else: try: new_mask = np.asarray(new_mask, dtype=bool) except TypeError: raise TypeError("'mask' should be a boolean or an array" "of booleans, but is currently {}" .format(new_mask)) if self.shape[0] != new_mask.shape[0]\ or self.shape[1] != new_mask.shape[1]: raise ValueError("'mask' should be of the same size as the " "axis system: {}, but is currently: {}" .format(self.shape, new_mask.shape)) self.__mask = new_mask @mask.deleter def mask(self): raise Exception("Can't delete that") @property def mask_as_sf(self): tmp_vf = sf.ScalarField(self.axis_x, self.axis_y, self.mask, mask=None, unit_x=self.unit_x, unit_y=self.unit_y, unit_values='') return tmp_vf @property def unit_values(self): return self.__unit_values @unit_values.setter def unit_values(self, new_unit_values): if isinstance(new_unit_values, unum.Unum): if np.isclose(new_unit_values.asNumber(), 1): self.__unit_values = new_unit_values else: raise ValueError('New values unit is not 1') else: try: self.__unit_values = make_unit(new_unit_values) except TypeError: raise TypeError('Unrecognized unity representation.') @unit_values.deleter def unit_values(self): raise Exception("Can't delete that.") @property def min(self): return np.min(self.magnitude[~self.mask]) @property def max(self): return np.max(self.magnitude[~self.mask]) @property def mean(self): return np.mean(self.magnitude[~self.mask]) @property def std(self): return np.std(self.magnitude[~self.mask]) @property def magnitude(self): """ Return a scalar field with the velocity field magnitude. """ comp_x, comp_y = self.comp_x, self.comp_y mask = self.mask values = (comp_x**2 + comp_y**2)**(.5) values[mask] = np.NaN return values @property def magnitude_as_sf(self): """ Return a scalarfield with the velocity field magnitude. """ tmp_vf = sf.ScalarField(self.axis_x, self.axis_y, self.magnitude, mask=self.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_vf @property def theta(self): """ Return a scalar field with the vector angle (in reference of the unit_y vector [1, 0]). Parameters ---------- low_velocity_filter : number If not zero, points where V < Vmax*low_velocity_filter are masked. Returns ------- theta_sf : sf.ScalarField object Contening theta field. """ # get data comp_x, comp_y = self.comp_x, self.comp_y not_mask = np.logical_not(self.mask) theta = np.zeros(self.shape) # getting angle norm = self.magnitude not_mask = np.logical_and(not_mask, norm != 0) theta[not_mask] = comp_x[not_mask]/norm[not_mask] theta[not_mask] = np.arccos(theta[not_mask]) tmp_comp_y = comp_y.copy() tmp_comp_y[~not_mask] = 0 sup_not_mask = tmp_comp_y < 0 theta[sup_not_mask] = 2*np.pi - theta[sup_not_mask] return theta @property def theta_as_sf(self): """ Return a scalarfield with the velocity field angles. """ tmp_vf = sf.ScalarField(self.axis_x, self.axis_y, self.theta, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_vf
[docs] def get_props(self): """ Print the VectorField main properties """ text = "Shape: {}\n".format(self.shape) unit_x = self.unit_x.strUnit() text += "Axe x: [{}..{}]{}\n".format(self.axis_x[0], self.axis_x[-1], unit_x) unit_y = self.unit_y.strUnit() text += "Axe y: [{}..{}]{}\n".format(self.axis_y[0], self.axis_y[-1], unit_y) unit_values = self.unit_values.strUnit() xmin = np.min(self.comp_x[~self.mask]) xmax = np.max(self.comp_x[~self.mask]) ymin = np.min(self.comp_y[~self.mask]) ymax = np.max(self.comp_y[~self.mask]) text += "Comp x: [{}..{}]{}\n".format(xmin, xmax, unit_values) text += "Comp y: [{}..{}]{}\n".format(ymin, ymax, unit_values) nmb_mask = np.sum(self.mask) nmb_tot = self.shape[0]*self.shape[1] text += "Masked values: {}/{}".format(nmb_mask, nmb_tot) return text
[docs] def get_value(self, x, y, ind=False, unit=False): """ Return the vector field components on the point (x, y). Parameters ---------- x, y: numbers Positions where to get the components. ind: boolean If 'True', x and y are indices, else (default), x and y are in axis units unit: boolean If 'True', also return the components unities. (default to False) """ return np.array([self.comp_x_as_sf.get_value(x, y, ind=ind, unit=unit), self.comp_y_as_sf.get_value(x, y, ind=ind, unit=unit)])
[docs] def get_profile(self, x=None, y=None, component='x', ind=False, interp='linear'): """ Return a profile of the vector field, at the given position. If position is an interval, the fonction return an average profile in this interval. Parameters ---------- x, y: numbers or 2x1 array of numbers Position of the wanted profile. component: string in ['x', 'y'] Component to get a profile from. (default to 'x') ind : boolean If 'True', position has to be given in indices If 'False' (default), position has to be given in axis unit. interp : string in ['nearest', 'linear'] if 'nearest', get the profile at the nearest position on the grid, if 'linear', use linear interpolation to get the profile at the exact position Returns ------- profile : prof.Profile object Wanted profile """ if component == 'x': return self.comp_x_as_sf.get_profile(x, y, ind, interp=interp) elif component == 'y': return self.comp_y_as_sf.get_profile(x, y, ind, interp=interp) else: raise TypeError("'component' must be 'x' or 'y'")
[docs] def get_histograms(self, bins=200, cum=False, normalized=False): """ Return a vectorfield component histograms. Parameters ========== cum: boolean If True, get a cumulative histogram. normalized: boolean If True, normalize the histogram. bins: integer Number of bins (default to 200). Returns ======= histx, histy: Profile objects. Histograms. """ histx = self.comp_x_as_sf.get_histogram(bins=bins, cum=cum, normalized=normalized) histy = self.comp_y_as_sf.get_histogram(bins=bins, cum=cum, normalized=normalized) return histx, histy
[docs] def get_interpolators(self, interp="linear"): """ Return the field interpolators. Parameters ---------- kind : {‘linear’, ‘cubic’, ‘quintic’}, optional The kind of spline interpolation to use. Default is ‘linear’. Returns ------- interpx, interpy Interpolators for each components Example ------- >>> interp = SF.get_interpolator(interp='linear') >>> print(interp(4, 5.3)) ... [34.3] >>> print(interp([3, 4, 5], 5.3)) ... [23, 34.3, 54] """ interpx = self.comp_x_as_sf.get_interpolator(interp=interp) interpy = self.comp_y_as_sf.get_interpolator(interp=interp) return interpx, interpy
[docs] def copy(self): """ Return a copy of the vectorfield. """ return copy.deepcopy(self)
[docs] def get_norm(self, norm=2): """ Return the field norm. Parameters ---------- norm: positive integer Norm order. Returns ------- norm : number Norm. """ values = np.concatenate((self.comp_x[~self.mask], self.comp_y[~self.mask])) res = (np.sum(np.abs(values)**norm))**(1./norm) return res
[docs] def scale(self, scalex=None, scaley=None, scalev=None, inplace=False): """ Scale the VectorField. Parameters ---------- scalex, scaley, scalev : numbers or Unum objects Scale for the axis and the values. inplace : boolean . """ if inplace: tmp_f = self else: tmp_f = self.copy() # xy revx, revy = fld.Field.scale(tmp_f, scalex=scalex, scaley=scaley, inplace=True, output_reverse=True) # v if scalev is None: pass elif isinstance(scalev, unum.Unum): new_unit = tmp_f.unit_values*scalev fact = new_unit.asNumber() new_unit /= fact tmp_f.unit_values = new_unit tmp_f.comp_x *= fact tmp_f.comp_y *= fact else: tmp_f.comp_x *= scalev tmp_f.comp_y *= scalev if revx and revy: tmp_f.comp_x = -tmp_f.comp_x[::-1, ::-1] tmp_f.comp_y = -tmp_f.comp_y[::-1, ::-1] elif revx: tmp_f.comp_x = -tmp_f.comp_x[::-1, :] tmp_f.comp_y = tmp_f.comp_y[::-1, :] elif revy: tmp_f.comp_x = tmp_f.comp_x[:, ::-1] tmp_f.comp_y = -tmp_f.comp_y[:, ::-1] # returning if not inplace: return tmp_f
[docs] def rotate(self, angle, inplace=False): """ Rotate the vector field. Parameters ---------- angle : integer Angle in degrees (signe in trigonometric convention). In order to preserve the orthogonal grid, only multiples of 90° are accepted. inplace : boolean, optional If True, the field is rotated in place, else (default), a rotated copy is returned. Returns ------- rotated_field : Field object Rotated field. """ # get data if inplace: tmp_field = self else: tmp_field = self.copy() # normalize angle angle = angle % 360 # rotate the parent fld.Field.rotate(tmp_field, angle, inplace=True) # rotate nmb_rot90 = int(angle/90) mask = tmp_field.mask comp_x = tmp_field.comp_x comp_y = tmp_field.comp_y tmp_field.__comp_x = np.rot90(comp_x, nmb_rot90) tmp_field.__comp_y = np.rot90(comp_y, nmb_rot90) tmp_field.__mask = np.rot90(mask, nmb_rot90) # returning if not inplace: return tmp_field
[docs] def change_unit(self, axis, new_unit): """ Change the unit of an axis. Parameters ---------- axis : string 'y' for changing the profile y axis unit 'x' for changing the profile x axis unit 'values' or changing values unit new_unit : Unum.unit object or string The new unit. """ if not isinstance(new_unit, unum.Unum): new_unit = make_unit(new_unit) if axis not in ['x', 'y', 'values']: raise TypeError("'axis' should be 'x', 'y', or 'values'") if axis == 'x': fld.Field.change_unit(self, axis, new_unit) elif axis == 'y': fld.Field.change_unit(self, axis, new_unit) elif axis == 'values': old_unit = self.unit_values new_unit = old_unit.asUnit(new_unit) fact = new_unit.asNumber() self.comp_x *= fact self.comp_y *= fact self.unit_values = new_unit/fact else: raise ValueError()
[docs] def crop(self, intervx=None, intervy=None, ind=False, inplace=False): """ Crop the vector field. Parameters ---------- intervx : array, optional Wanted interval along x. intervy : array, optional Wanted interval along y. ind : boolean, optional If 'True', intervals are understood as indices along axis. If 'False' (default), intervals are understood in axis units. inplace : boolean, optional If 'True', the field is croped in place, else (default), a copy is returned. """ if inplace: comp_x = self.comp_x comp_y = self.comp_y mask = self.mask indmin_x, indmax_x, indmin_y, indmax_y = \ fld.Field.crop(self, intervx, intervy, full_output=True, ind=ind, inplace=True) self.__comp_x = comp_x[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] self.__comp_y = comp_y[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] self.__mask = mask[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] else: indmin_x, indmax_x, indmin_y, indmax_y, cropfield = \ fld.Field.crop(self, intervx=intervx, intervy=intervy, full_output=True, ind=ind, inplace=False) cropfield.__comp_x = self.comp_x[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] cropfield.__comp_y = self.comp_y[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] cropfield.__mask = self.mask[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] return cropfield
[docs] def extend(self, nmb_left=0, nmb_right=0, nmb_up=0, nmb_down=0, value=None, inplace=False, ind=True): """ Add columns and/or lines of masked values to the field. Parameters ---------- nmb_left, nmb_right, nmb_up, nmb_down : integers Number of lines/columns to add in each direction. inplace : bool If 'True', extend the field in place, else (default), return an extended copy of the field. Returns ------- Extended_field : Field object Extended field. Note ---- If the axis values are not equally spaced, a linear extrapolation is used to obtain the new axis values. """ if inplace: tmp_vf = self else: tmp_vf = self.copy() if not ind: dx = self.dx dy = self.dy nmb_left = np.ceil(nmb_left/dx) nmb_right = np.ceil(nmb_right/dx) nmb_up = np.ceil(nmb_up/dy) nmb_down = np.ceil(nmb_down/dy) ind = True # check params if not (isinstance(nmb_left, int) or nmb_left % 1 == 0): raise TypeError() if not (isinstance(nmb_right, int) or nmb_right % 1 == 0): raise TypeError() if not (isinstance(nmb_up, int) or nmb_up % 1 == 0): raise TypeError() if not (isinstance(nmb_down, int) or nmb_down % 1 == 0): raise TypeError() nmb_left = int(nmb_left) nmb_right = int(nmb_right) nmb_up = int(nmb_up) nmb_down = int(nmb_down) if np.any(np.array([nmb_left, nmb_right, nmb_up, nmb_down]) < 0): raise ValueError() # used herited method to extend the field fld.Field.extend(tmp_vf, nmb_left=nmb_left, nmb_right=nmb_right, nmb_up=nmb_up, nmb_down=nmb_down, inplace=True) new_shape = tmp_vf.shape # extend the value ans mask if value is None: new_comp_x = np.zeros(new_shape, dtype=float) new_comp_y = np.zeros(new_shape, dtype=float) new_mask = np.ones(new_shape, dtype=bool) else: new_comp_x = np.ones(new_shape, dtype=float)*value new_comp_y = np.ones(new_shape, dtype=float)*value new_mask = np.zeros(new_shape, dtype=bool) if nmb_right == 0: slice_x = slice(nmb_left, new_comp_x.shape[0] + 2) else: slice_x = slice(nmb_left, -nmb_right) if nmb_up == 0: slice_y = slice(nmb_down, new_comp_x.shape[1] + 2) else: slice_y = slice(nmb_down, -nmb_up) new_comp_x[slice_x, slice_y] = self.comp_x new_comp_y[slice_x, slice_y] = self.comp_y new_mask[slice_x, slice_y] = self.mask tmp_vf.__comp_x = new_comp_x tmp_vf.__comp_y = new_comp_y tmp_vf.__mask = new_mask # return if not inplace: return tmp_vf
[docs] def crop_masked_border(self, hard=False, inplace=False): """ Crop the masked border of the field in place or not. Parameters ---------- hard : boolean, optional If 'True', partially masked border are croped as well. """ # if inplace: tmp_vf = self else: tmp_vf = self.copy() # checking masked values presence mask = tmp_vf.mask if not np.any(mask): return None # hard cropping if hard: # remove trivial borders tmp_vf.crop_masked_border(hard=False, inplace=True) # until there is no more masked values while np.any(tmp_vf.mask): # getting number of masked value on each border bd1 = np.sum(tmp_vf.mask[0, :]) bd2 = np.sum(tmp_vf.mask[-1, :]) bd3 = np.sum(tmp_vf.mask[:, 0]) bd4 = np.sum(tmp_vf.mask[:, -1]) # getting more masked border more_masked = np.argmax([bd1, bd2, bd3, bd4]) # deleting more masked border if more_masked == 0: len_x = len(tmp_vf.axis_x) tmp_vf.crop(intervx=[1, len_x], ind=True, inplace=True) elif more_masked == 1: len_x = len(tmp_vf.axis_x) tmp_vf.crop(intervx=[0, len_x - 2], ind=True, inplace=True) elif more_masked == 2: len_y = len(tmp_vf.axis_y) tmp_vf.crop(intervy=[1, len_y], ind=True, inplace=True) elif more_masked == 3: len_y = len(tmp_vf.axis_y) tmp_vf.crop(intervy=[0, len_y - 2], ind=True, inplace=True) # soft cropping else: axis_x_m = np.logical_not(np.all(mask, axis=1)) axis_y_m = np.logical_not(np.all(mask, axis=0)) axis_x_min = np.where(axis_x_m)[0][0] axis_x_max = np.where(axis_x_m)[0][-1] axis_y_min = np.where(axis_y_m)[0][0] axis_y_max = np.where(axis_y_m)[0][-1] tmp_vf.crop([axis_x_min, axis_x_max], [axis_y_min, axis_y_max], ind=True, inplace=True) # returning if not inplace: return tmp_vf
[docs] def fill(self, kind='linear', value=0., inplace=False, reduce_tri=True, crop=False): """ Fill the masked parts of the vector field. Parameters ---------- kind : string, optional Type of algorithm used to fill. 'value' : fill with the given value 'nearest' : fill with the nearest value 'linear' (default): fill using linear interpolation (Delaunay triangulation) 'cubic' : fill using cubic interpolation (Delaunay triangulation) value : number Value used to fill (for kind='value'). inplace : boolean, optional If 'True', fill the vector field in place. If 'False' (default), return a filled version of the field. reduce_tri : boolean, optional If 'True', treatment is used to reduce the triangulation effort (faster when a lot of masked values) If 'False', no treatment (faster when few masked values) crop : boolean, optional If 'True', SF borders are croped before filling. """ if inplace: tmp_f = self else: tmp_f = self.copy() # vx = self.comp_x_as_sf vy = self.comp_y_as_sf vx.fill(kind=kind, value=value, inplace=True, reduce_tri=reduce_tri, crop=crop) vy.fill(kind=kind, value=value, inplace=True, reduce_tri=reduce_tri, crop=crop) tmp_f.comp_x = vx.values tmp_f.comp_y = vy.values tmp_f.mask = False # return tmp_f
[docs] def smooth(self, tos='uniform', size=None, inplace=False, **kw): """ Smooth the vectorfield. Warning : fill up the field (should be used carefully with masked field borders) Parameters ---------- tos : string, optional Type of smoothing, can be 'uniform' (default) or 'gaussian' (See ndimage module documentation for more details) size : number, optional Size of the smoothing (is radius for 'uniform' and sigma for 'gaussian') in indice number. Default is 3 for 'uniform' and 1 for 'gaussian'. inplace : boolean, optional If True, Field is smoothed in place, else, the smoothed field is returned. kw : dic Additional parameters for ndimage methods (See ndimage documentation) """ if inplace: tmp_f = self else: tmp_f = self.copy() # vx = self.comp_x_as_sf vy = self.comp_y_as_sf vx.smooth(tos=tos, size=size, inplace=True, **kw) vy.smooth(tos=tos, size=size, inplace=True, **kw) tmp_f.comp_x = vx.values tmp_f.comp_y = vy.values tmp_f.mask = False # return return tmp_f
[docs] def make_evenly_spaced(self, interp="linear", res=1, inplace=False): """ Use interpolation to make the field evenly spaced. Parameters ---------- interp : {‘linear’, ‘cubic’, ‘quintic’}, optional The kind of spline interpolation to use. Default is ‘linear’. res : number Resolution of the resulting field. A value of 1 meaning a spatial resolution equal to the smallest space along the two axis for the initial field. A value of 2 means half this resolution. inplace: boolean If True, modify the vector field in place, else, return a modified version of it. """ if inplace: tmp_f = self else: tmp_f = self.copy() # get data vx = self.comp_x_as_sf vy = self.comp_y_as_sf vx.make_evenly_spaced(interp=interp, res=res, inplace=True) vy.make_evenly_spaced(interp=interp, res=res, inplace=True) tmp_f.__init__(vx.axis_x, vx.axis_y, vx.values, vy.values, mask=np.logical_or(vx.mask, vy.mask), unit_x=vx.unit_x, unit_y=vx.unit_y, unit_values=vx.unit_values) # return return tmp_f
[docs] def reduce_resolution(self, fact, inplace=False): """ Reduce the spatial resolution of the vector field by a factor 'fact'. Parameters ---------- fact : int Reducing factor. inplace : boolean, optional . """ if inplace: tmp_vf = self else: tmp_vf = self.copy() # fact = int(fact) if fact < 1: raise ValueError() if fact == 1: if inplace: pass else: return tmp_vf if fact % 2 == 0: pair = True else: pair = False # get new axis axis_x = tmp_vf.axis_x axis_y = tmp_vf.axis_y if pair: new_axis_x = (axis_x[np.arange(fact/2 - 1, len(axis_x) - fact/2, fact, dtype=int)] + axis_x[np.arange(fact/2, len(axis_x) - fact/2 + 1, fact, dtype=int)])/2. new_axis_y = (axis_y[np.arange(fact/2 - 1, len(axis_y) - fact/2, fact, dtype=int)] + axis_y[np.arange(fact/2, len(axis_y) - fact/2 + 1, fact, dtype=int)])/2. else: new_axis_x = axis_x[np.arange((fact - 1)/2, len(axis_x) - (fact - 1)/2, fact, dtype=int)] new_axis_y = axis_y[np.arange((fact - 1)/2, len(axis_y) - (fact - 1)/2, fact, dtype=int)] # get new values comp_x = tmp_vf.comp_x comp_y = tmp_vf.comp_y mask = tmp_vf.mask if pair: inds_x = np.arange(fact/2, len(axis_x) - fact/2 + 1, fact, dtype=int) inds_y = np.arange(fact/2, len(axis_y) - fact/2 + 1, fact, dtype=int) new_comp_x = np.zeros((len(inds_x), len(inds_y))) new_comp_y = np.zeros((len(inds_x), len(inds_y))) new_mask = np.zeros((len(inds_x), len(inds_y))) for i in np.arange(len(inds_x)): intervx = slice(inds_x[i] - int(fact/2), inds_x[i] + int(fact/2)) for j in np.arange(len(inds_y)): intervy = slice(inds_y[j] - int(fact/2), inds_y[j] + int(fact/2)) if np.all(mask[intervx, intervy]): new_mask[i, j] = True new_comp_x[i, j] = 0. new_comp_y[i, j] = 0. else: new_comp_x[i, j] = np.mean(comp_x[intervx, intervy] [~mask[intervx, intervy]]) new_comp_y[i, j] = np.mean(comp_y[intervx, intervy] [~mask[intervx, intervy]]) else: inds_x = np.arange((fact - 1)/2, len(axis_x) - (fact - 1)/2, fact) inds_y = np.arange((fact - 1)/2, len(axis_y) - (fact - 1)/2, fact) new_comp_x = np.zeros((len(inds_x), len(inds_y))) new_comp_y = np.zeros((len(inds_x), len(inds_y))) new_mask = np.zeros((len(inds_x), len(inds_y))) for i in np.arange(len(inds_x)): intervx = slice(int(inds_x[i] - (fact - 1)/2), int(inds_x[i] + (fact - 1)/2 + 1)) for j in np.arange(len(inds_y)): intervy = slice(int(inds_y[j] - (fact - 1)/2), int(inds_y[j] + (fact - 1)/2 + 1)) print(intervx, intervy) if np.all(mask[intervx, intervy]): new_mask[i, j] = True new_comp_x[i, j] = 0. new_comp_y[i, j] = 0. else: new_comp_x[i, j] = np.mean(comp_x[intervx, intervy] [~mask[intervx, intervy]]) new_comp_y[i, j] = np.mean(comp_y[intervx, intervy] [~mask[intervx, intervy]]) # returning tmp_vf.__init__(new_axis_x, new_axis_y, new_comp_x, new_comp_y, mask=new_mask, unit_x=tmp_vf.unit_x, unit_y=tmp_vf.unit_y, unit_values=tmp_vf.unit_values) if not inplace: return tmp_vf
[docs] def display(self, component=None, kind=None, annotate=True, **plotargs): """ Display the vector field. Parameters ---------- component : string, optional Component to display, can be 'V', 'x', 'y', 'magnitude' or 'mask' kind : string, optinnal can be 'quiver', 'stream', 'imshow', 'contour', 'contourf' annotate: boolean If True (default) add label and legedn to the graph. **plotargs : dict Arguments passed to the plotting function. Returns ------- fig : figure reference Reference to the displayed figure. """ # default if component is None: if kind in ['quiver', 'stream', None]: component = 'V' else: component = 'magnitude' if kind is None: if component in ['V']: kind = 'quiver' else: kind = 'imshow' # check if component not in ['V', 'x', 'y', 'magnitude', 'mask']: raise ValueError("'component' must be 'x', 'y', 'magnitude', " "'V' or 'mask', not {}".format(component)) if kind not in ['quiver', 'stream', 'imshow', 'contour', 'contourf']: raise ValueError("'kind' must be 'quiver', 'stream', " "'imshow', 'contour', or 'contourf'," " not {}".format(kind)) if component == 'V': if kind in ['imshow', 'contour', 'contourf']: raise ValueError('Can plot a {} with component {}' .format(kind, component)) else: if kind in ['quiver', 'stream']: raise ValueError('Can plot a {} with component {}' .format(kind, component)) # use scalarfield to display if component == "x": return self.comp_x_as_sf.display(component='values', kind=kind, annotate=annotate, **plotargs) if component == "y": return self.comp_y_as_sf.display(component='values', kind=kind, annotate=annotate, **plotargs) if component == "magnitude": return self.magnitude_as_sf.display(component='values', kind=kind, annotate=annotate, **plotargs) if component == "mask": return self.comp_x_as_sf.display(component='mask', kind=kind, annotate=annotate, **plotargs) # getting datas axis_x, axis_y = self.axis_x, self.axis_y unit_x, unit_y = self.unit_x, self.unit_y X, Y = np.meshgrid(self.axis_y, self.axis_x) # quiver if kind == 'quiver': if 'color' in list(plotargs.keys()): C = plotargs.pop('color') if 'c' in plotargs.keys(): plotargs.pop('c') else: C = self.magnitude.transpose() if 'c' in plotargs.keys(): plotargs.pop('c') plot = plt.quiver(axis_x, axis_y, self.comp_x.transpose(), self.comp_y.transpose(), C, **plotargs) if kind == 'stream': # set adptative linewidth if 'lw' in list(plotargs.keys()): tmp_lw = plotargs.pop('lw') elif 'linewidth' in list(plotargs.keys()): tmp_lw = plotargs.pop('linewidth') else: tmp_lw = 1 if np.array(tmp_lw).shape != (): pass else: magn = self.magnitude magn[np.isnan(magn)] = 0 tmp_lw *= 0.1 + 0.9*magn/np.max(magn) tmp_lw = tmp_lw.transpose() # set color if 'color' in list(plotargs.keys()): color = plotargs.pop('color') if 'c' in plotargs.keys(): plotargs.pop('c') else: color = self.magnitude.transpose() if 'c' in plotargs.keys(): plotargs.pop('c') # plot Vx = self.comp_x.transpose() Vy = self.comp_y.transpose() print(axis_x) print(axis_y) print(Vx) print(Vy) print(color) plot = plt.streamplot(axis_x, axis_y, Vx, Vy, color=color, linewidth=tmp_lw, **plotargs) # Annotate quiver/streams if annotate: # labels if unit_x.strUnit() == "[]": plt.xlabel("x") else: plt.xlabel("x " + unit_x.strUnit()) if unit_y.strUnit() == "[]": plt.ylabel("y") else: plt.ylabel("y " + unit_y.strUnit()) # title plt.title("") # colorbar try: if kind == 'stream': cb = plt.colorbar(plot.lines) else: cb = plt.colorbar(plot) if self.unit_values.strUnit() == "[]": cb.set_label("Magnitude") else: cb.set_label(self.unit_values.strUnit()) except TypeError: pass # search for limits in case of masked field if component != 'mask': mask = self.mask for i in np.arange(len(self.axis_x)): if not np.all(mask[i, :]): break xmin = self.axis_x[i] for i in np.arange(len(self.axis_x) - 1, -1, -1): if not np.all(mask[i, :]): break xmax = self.axis_x[i] for i in np.arange(len(self.axis_y)): if not np.all(mask[:, i]): break ymin = self.axis_y[i] for i in np.arange(len(self.axis_y) - 1, -1, -1): if not np.all(mask[:, i]): break ymax = self.axis_y[i] plt.xlim([xmin, xmax]) plt.ylim([ymin, ymax]) else: raise ValueError("'component' must be 'x', 'y', 'magnitude', " "'V' or 'mask'") return plt.gca()