#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains the class that handle a Horizontal 2D field.
"""
import copy
import numpy
from footprints import FPDict, FPList, proxy as fpx
from epygram import config, epygramError
from epygram.util import write_formatted
from epygram.base import Field, FieldSet, FieldValidity, FieldValidityList, Resource
from epygram.geometries import D3Geometry, SpectralGeometry
[docs]class D3CommonField(Field):
"""
3-Dimensions common field class.
"""
_collector = ('field',)
_abstract = True
_footprint = dict(
attr=dict(
structure=dict(
values=set(['3D']))
)
)
@property
[docs] def spectral(self):
"""Returns True if the field is spectral."""
return self.spectral_geometry != None
##############
# ABOUT DATA #
##############
[docs] def getvalue_ll(self, lon=None, lat=None, level=None, validity=None,
interpolation='nearest',
neighborinfo=False,
one=True,
external_distance=None):
"""
Returns the value of the field on point of coordinates (*lon, lat, level*): \n
- if *interpolation == 'nearest'* (default), returns the value of the
nearest neighboring gridpoint;
- if *interpolation == 'linear'*, computes and returns the field value
with linear spline interpolation;
- if *interpolation == 'cubic'*, computes and returns the field value
with cubic spline interpolation.
*level* is the True level not the index of the level. Depending on the
vertical coordinate, it could be expressed in Pa, m.
*validity* is a FieldValidity or a FieldValidityList instance
If *neighborinfo* is set to **True**, returns a tuple
*(value, (lon, lat))*, with *(lon, lat)* being the actual coordinates
of the neighboring gridpoint (only for *interpolation == 'nearest'*).
*lon* and *lat* may be longer than 1.
If *one* is False and len(lon) is 1, returns [value] instead of value.
*external_distance* can be a dict containing the target point value
and an external field on the same grid as self, to which the distance
is computed within the 4 horizontally nearest points; e.g.
{'target_value':4810, 'external_field':an_H2DField_with_same_geometry}.
If so, the nearest point is selected with
distance = |target_value - external_field.data|
Warning: for interpolation on Gauss geometries, requires the
:mod:`pyproj` module.
"""
if isinstance(validity, FieldValidity):
myvalidity = FieldValidityList(validity)
else:
myvalidity = validity
if self.spectral:
raise epygramError("field must be gridpoint to get value of a" + \
" lon/lat point.")
if len(self.validity) > 1 and myvalidity is None:
raise epygramError("*validity* is mandatory when there are several validities")
if self.geometry.datashape['k'] and level is None:
raise epygramError("*level* is mandatory when field has a vertical coordinate")
if (self.geometry.datashape['j'] or self.geometry.datashape['i']) and \
(lon is None or lat is None):
raise epygramError("*lon* and *lat* are mandatory when field has an horizontal extension")
maxsize = numpy.array([numpy.array(dim).size for dim in [lon, lat, level] if dim is not None]).max()
if myvalidity is not None:
maxsize = max(maxsize, len(myvalidity))
#We look for indexes for vertical and time coordinates (no interpolation)
if myvalidity is None:
my_t = numpy.zeros(maxsize, dtype=int)
else:
my_t = []
for v in myvalidity:
for t in range(len(self.validity)):
if v == self.validity[t]:
my_t.append(t)
break
my_t = numpy.array(my_t)
if my_t.size != maxsize:
if my_t.size != 1:
raise epygramError("validity must be uniq or must have the same length as other indexes")
my_t = numpy.array([my_t.item()] * maxsize)
if len(self.geometry.vcoordinate.levels) != len(set(self.geometry.vcoordinate.levels)):
raise epygramError('Some levels are represented twice in levels list.')
if level is None:
my_k = numpy.zeros(maxsize, dtype=int)
else:
my_level = numpy.array(level)
if my_level.size == 1:
my_level = numpy.array([my_level.item()])
my_k = []
for l in my_level:
my_k.append(self.geometry.vcoordinate.levels.index(l))
my_k = numpy.array(my_k)
if my_k.size != maxsize:
if my_k.size != 1:
raise epygramError("k must be scalar or must have the same length as other indexes")
my_k = numpy.array([my_k.item()] * maxsize)
my_lon = numpy.array(lon)
if my_lon.size == 1:
my_lon = numpy.array([my_lon.item()])
my_lat = numpy.array(lat)
if my_lat.size == 1:
my_lat = numpy.array([my_lat.item()])
if my_lon.size != maxsize or my_lat.size != maxsize:
raise epygramError("lon and lat must have the same length and the same length as level and validity")
if interpolation == 'nearest':
(ri, rj) = self.geometry.nearest_points(lon, lat, 'nearest',
external_distance=external_distance)
value = self.getvalue_ij(ri, rj, my_k, my_t, one=one)
if neighborinfo:
(lon, lat) = self.geometry.ij2ll(ri, rj)
if numpy.shape(lon) in ((1,), ()):
lon = float(lon)
lat = float(lat)
value = (value, (lon, lat))
elif interpolation in ('linear', 'cubic'):
from scipy.interpolate import interp1d, interp2d
nvalue = numpy.zeros(maxsize)
for n in range(maxsize):
if maxsize > 1:
lonn = lon[n]
latn = lat[n]
my_kn = my_k[n]
my_tn = my_t[n]
else:
#FIXME: OK ? looks like. Seb to check
lonn = my_lon.item()
latn = my_lat.item()
#lonn = lon.item()
#latn = lat.item()
my_kn = my_k.item()
my_tn = my_t.item()
points = self.geometry.nearest_points(lonn, latn,
interpolation)
lons = [self.geometry.ij2ll(i, j)[0] for (i, j) in points]
lats = [self.geometry.ij2ll(i, j)[1] for (i, j) in points]
values = [self.getvalue_ij(i, j, my_kn, my_tn, one=True) for (i, j) in points]
if self.geometry.name == 'academic' and \
(self.geometry.dimensions['X'] == 1 or \
self.geometry.dimensions['Y'] == 1):
if self.geometry.dimensions['X'] == 1:
f = interp1d(lats, values, kind=interpolation)
value = f(latn)
else:
f = interp1d(lons, values, kind=interpolation)
value = f(lonn)
else:
f = interp2d(lons, lats, values, kind=interpolation)
value = f(lonn, latn)
nvalue[n] = value
value = nvalue
if one:
try:
value = float(value)
except (ValueError, TypeError):
pass
return copy.copy(value)
[docs] def extract_subdomain(self, geometry, interpolation='nearest',
external_distance=None,
exclude_extralevels=True):
"""
Extracts a subdomain from a field, given a new geometry.
Args: \n
- *geometry* defines the geometry on which extract data
- *interpolation* defines the interpolation function used to compute
the profile at requested lon/lat from the fields grid:
- if 'nearest' (default), extracts profile at the horizontal nearest
neighboring gridpoint;
- if 'linear', computes profile with horizontal linear interpolation;
- if 'cubic', computes profile with horizontal cubic interpolation.
- *external_distance* can be a dict containing the target point value
and an external field on the same grid as self, to which the distance
is computed within the 4 horizontally nearest points; e.g.
{'target_value':4810, 'external_field':an_H2DField_with_same_geometry}.
If so, the nearest point is selected with
distance = |target_value - external_field.data|
- *exclude_extralevels* if True levels with no physical meaning are
suppressed.
"""
# build subdomain fid
subdomainfid = {key:(FPDict(value) if type(value) == type(dict()) else value) for (key, value) in self.fid.iteritems()}
# build vertical geometry
kwargs_vcoord = {'structure':'V',
'typeoffirstfixedsurface': self.geometry.vcoordinate.typeoffirstfixedsurface,
'position_on_grid': self.geometry.vcoordinate.position_on_grid,
}
if self.geometry.vcoordinate.typeoffirstfixedsurface == 119:
kwargs_vcoord['grid'] = copy.copy(self.geometry.vcoordinate.grid)
kwargs_vcoord['levels'] = copy.copy(self.geometry.vcoordinate.levels)
elif self.geometry.vcoordinate.typeoffirstfixedsurface == 118:
kwargs_vcoord['grid'] = copy.copy(self.geometry.vcoordinate.grid)
kwargs_vcoord['levels'] = copy.copy(self.geometry.vcoordinate.levels)
#Suppression of levels above or under physical domain
if exclude_extralevels:
for level in kwargs_vcoord['levels']:
if level < 1 or level > len(self.geometry.vcoordinate.grid['gridlevels']) - 1:
kwargs_vcoord['levels'].remove(level)
kwargs_vcoord['levels']
elif self.geometry.vcoordinate.typeoffirstfixedsurface in [100, 103, 109]:
kwargs_vcoord['levels'] = copy.copy(self.geometry.vcoordinate.levels)
else:
raise NotImplementedError("type of first surface level: " + str(self.geometry.vcoordinate.typeoffirstfixedsurface))
if geometry.vcoordinate.typeoffirstfixedsurface not in [255, kwargs_vcoord['typeoffirstfixedsurface']]:
raise epygramError("extract_subdomain cannot change vertical coordinate.")
if geometry.vcoordinate.position_on_grid not in ['__unknown__', kwargs_vcoord['position_on_grid']]:
raise epygramError("extract_subdomain cannot change position on vertical grid.")
if geometry.vcoordinate.grid != {} and geometry.vcoordinate.grid != kwargs_vcoord['grid']:
#One could check if requested grid is a subsample of field grid
raise epygramError("extract_subdomain cannot change vertical grid")
if geometry.vcoordinate.levels != []:
for level in geometry.vcoordinate.levels:
if level not in kwargs_vcoord['levels']:
raise epygramError("extract_subdomain cannot do vertical interpolations.")
kwargs_vcoord['levels'] = geometry.levels
vcoordinate = fpx.geometry(**kwargs_vcoord)
# build geometry
kwargs_geom = {'structure': geometry.structure,
'name': geometry.name,
'grid': dict(geometry.grid), #do not remove dict(), it is usefull for unstructured grid
'dimensions': copy.copy(geometry.dimensions),
'vcoordinate': vcoordinate,
'position_on_horizontal_grid': 'center'
}
if geometry.projected_geometry:
kwargs_geom['projection'] = copy.copy(geometry.projection)
kwargs_geom['projtool'] = geometry.projtool
kwargs_geom['geoid'] = geometry.geoid
if geometry.position_on_horizontal_grid not in [None, '__unknown__', kwargs_geom['position_on_horizontal_grid']]:
raise epygramError("extract_subdomain cannot deal with position_on_horizontal_grid other than 'center'")
newgeometry = fpx.geometry(**kwargs_geom)
# location & interpolation
lons, lats = newgeometry.get_lonlat_grid()
for (lon, lat) in numpy.nditer([lons, lats]):
if not self.geometry.point_is_inside_domain_ll(lon, lat):
raise ValueError("point (" + str(lon) + ", " + \
str(lat) + ") is out of field domain.")
comment = None
if interpolation == 'nearest':
if lons.size == 1:
true_loc = self.geometry.ij2ll(*self.geometry.nearest_points(lons, lats,
'nearest',
external_distance=external_distance))
distance = self.geometry.distance((float(lons), float(lats)),
(float(true_loc[0]),
float(true_loc[1])))
az = self.geometry.azimuth((float(lons), float(lats)),
(float(true_loc[0]),
float(true_loc[1])))
if -22.5 < az <= 22.5:
direction = 'N'
elif -77.5 < az <= -22.5:
direction = 'NW'
elif -112.5 < az <= -77.5:
direction = 'W'
elif -157.5 < az <= -112.5:
direction = 'SW'
elif -180.5 <= az <= -157.5 or 157.5 < az <= 180.:
direction = 'S'
elif 22.5 < az <= 77.5:
direction = 'NE'
elif 77.5 < az <= 112.5:
direction = 'E'
elif 112.5 < az <= 157.5:
direction = 'SE'
gridpointstr = "(" + \
'{:.{precision}{type}}'.format(float(true_loc[0]),
type='F',
precision=4) + \
", " + \
'{:.{precision}{type}}'.format(float(true_loc[1]),
type='F',
precision=4) + \
")"
comment = "Profile @ " + str(int(distance)) + "m " + \
direction + " from " + str((float(lons), float(lats))) + \
"\n" + "( = nearest gridpoint: " + gridpointstr + ")"
elif interpolation in ('linear', 'cubic'):
if interpolation == 'linear':
interpstr = 'linearly'
elif interpolation == 'cubic':
interpstr = 'cubically'
if lons.size == 1:
comment = "Profile " + interpstr + " interpolated @ " + \
str((float(lons), float(lats)))
else:
raise NotImplementedError(interpolation + "interpolation.")
# Values
shape = [] if len(self.validity) == 1 else [len(self.validity)]
shape += [len(newgeometry.vcoordinate.levels)] + list(lons.shape)
data = numpy.ndarray(tuple(shape))
for t in range(len(self.validity)):
for k in range(len(newgeometry.vcoordinate.levels)):
level = newgeometry.vcoordinate.levels[k]
pos = (k) if len(self.validity) == 1 else (t, k)
data[pos] = self.getvalue_ll(lons, lats, level, self.validity[t],
interpolation=interpolation,
external_distance=external_distance)
# Field
newfield = fpx.field(fid=FPDict(subdomainfid),
structure=newgeometry.structure,
geometry=newgeometry,
validity=self.validity,
processtype=self.processtype,
comment=comment)
newfield.setdata(newgeometry.reshape_data(data, len(self.validity)))
return newfield
###################
# PRE-APPLICATIVE #
###################
# (but useful and rather standard) !
# [so that, subject to continuation through updated versions,
# including suggestions/developments by users...]
def plotfield(self):
raise NotImplementedError("plot of 3D field is not implemented")
[docs] def stats(self, subzone=None):
"""
Computes some basic statistics on the field, as a dict containing:
{'min', 'max', 'mean', 'std', 'quadmean', 'nonzero'}.
See each of these methods for details.
- *subzone*: optional, among ('C', 'CI'), for LAM fields only, plots
the data resp. on the C or C+I zone. \n
Default is no subzone, i.e. the whole field.
"""
return {'min':self.min(subzone=subzone),
'max':self.max(subzone=subzone),
'mean':self.mean(subzone=subzone),
'std':self.std(subzone=subzone),
'quadmean':self.quadmean(subzone=subzone),
'nonzero':self.nonzero(subzone=subzone)}
[docs] def min(self, subzone=None):
"""Returns the minimum value of data."""
data = self.getdata(subzone=subzone)
return numpy.ma.masked_outside(data,
- config.mask_outside,
config.mask_outside).min()
[docs] def max(self, subzone=None):
"""Returns the maximum value of data."""
data = self.getdata(subzone=subzone)
return numpy.ma.masked_outside(data,
- config.mask_outside,
config.mask_outside).max()
[docs] def mean(self, subzone=None):
"""Returns the mean value of data."""
data = self.getdata(subzone=subzone)
return numpy.ma.masked_outside(data,
- config.mask_outside,
config.mask_outside).mean()
[docs] def std(self, subzone=None):
"""Returns the standard deviation of data."""
data = self.getdata(subzone=subzone)
return numpy.ma.masked_outside(data,
- config.mask_outside,
config.mask_outside).std()
[docs] def quadmean(self, subzone=None):
"""Returns the quadratic mean of data."""
data = self.getdata(subzone=subzone)
return numpy.sqrt((numpy.ma.masked_outside(data,
- config.mask_outside,
config.mask_outside) ** 2).mean())
[docs] def nonzero(self, subzone=None):
"""
Returns the number of non-zero values (whose absolute
value > config.epsilon).
"""
data = self.getdata(subzone=subzone)
return numpy.count_nonzero(abs(numpy.ma.masked_outside(data,
- config.mask_outside,
config.mask_outside)) > config.epsilon)
[docs] def dctspectrum(self, k=None, subzone=None):
"""
Returns the DCT spectrum of the field, as a
:class:`epygram.spectra.Spectrum` instance.
*k* is the level index to use to compute de DCT
"""
import epygram.spectra as esp
if k == None and self.geometry.datashape['k']:
raise epygramError("One must choose the level for a 3D field.")
if (not self.geometry.datashape['k']) and k not in [None, 0]:
raise epygramError("k must be None or 0 for a 2D field.")
if k == None:
my_k = 0
else:
my_k = k
if len(self.validity) > 1:
#One must add a validity argument to enable feature
raise NotImplementedError("dctspectrum not yet implemented for multi validities fields.")
field2d = self.getlevel(k=my_k)
variances = esp.dctspectrum(field2d.geometry.reshape_data(field2d.getdata(subzone=subzone),
len(field2d.validity), d4=True)[0, 0])
spectrum = esp.Spectrum(variances[1:],
name=str(self.fid),
resolution=self.geometry.grid['X_resolution'] / 1000.,
mean2=variances[0])
return spectrum
[docs] def global_shift_center(self, longitude_shift):
"""
For global RegLLGeometry grids only !
Shifts the center of the geometry (and the data accordingly) by
*longitude_shift* (in degrees). *longitude_shift* has to be a multiple
of the grid's resolution in longitude.
"""
if self.geometry.name != 'regular_lonlat':
raise epygramError("only for regular lonlat geometries.")
self.geometry.global_shift_center(longitude_shift)
n = longitude_shift / self.geometry.grid['X_resolution'].get('degrees')
data4d = self.geometry.reshape_data(self.getdata(), len(self.validity), d4=True)
for t in range(len(self.validity)):
for k in range(len(self.data)):
data4d[t, k] = numpy.concatenate((data4d[t, k, :, n:], data4d[t, k, :, 0:n]),
axis=1)
self.setdata(self.geometry.reshape_data(data4d, len(self.validity)))
[docs] def what(self, out,
validity=True,
vertical_geometry=True,
cumulativeduration=True,
arpifs_var_names=False,
fid=True):
"""
Writes in file a summary of the field.
Args: \n
- *out*: the output open file-like object (duck-typing: *out*.write()
only is needed).
- *vertical_geometry*: if True, writes the validity of the
field.
- *vertical_geometry*: if True, writes the vertical geometry of the
field.
- *cumulativeduration*: if False, not written.
- *arpifs_var_names*: if True, prints the equivalent 'arpifs' variable
names.
- *fid*: if True, prints the fid.
"""
if self.spectral:
spectral_geometry = self.spectral_geometry.truncation
else:
spectral_geometry = None
write_formatted(out, "Kind of producting process", self.processtype)
if validity:
self.validity.what(out, cumulativeduration=cumulativeduration)
self.geometry.what(out,
vertical_geometry=vertical_geometry,
arpifs_var_names=arpifs_var_names,
spectral_geometry=spectral_geometry)
if fid:
for key in self.fid:
write_formatted(out, "fid " + key, self.fid[key])
#############
# OPERATORS #
#############
def _check_operands(self, other):
"""
Internal method to check compatibility of terms in operations on fields.
"""
if isinstance(other, self.__class__):
if self.spectral != other.spectral:
raise epygramError("cannot operate a spectral field with a" + \
" non-spectral field.")
if self.geometry.dimensions != other.geometry.dimensions:
raise epygramError("operations on fields cannot be done if" + \
" fields do not share their gridpoint" + \
" dimensions.")
if self.spectral_geometry != other.spectral_geometry:
raise epygramError("operations on fields cannot be done if" + \
" fields do not share their spectral" + \
" geometry.")
else:
super(D3CommonField, self)._check_operands(other)
def __add__(self, other):
"""
Definition of addition, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'+'} and null validity.
"""
newfield = self._add(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry)
return newfield
def __mul__(self, other):
"""
Definition of multiplication, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'*'} and null validity.
"""
newfield = self._mul(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry)
return newfield
def __sub__(self, other):
"""
Definition of substraction, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'-'} and null validity.
"""
newfield = self._sub(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry)
return newfield
def __div__(self, other):
"""
Definition of division, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'/'} and null validity.
"""
newfield = self._div(other,
structure=self.structure,
geometry=self.geometry,
spectral_geometry=self.spectral_geometry)
return newfield
[docs]class D3Field(D3CommonField):
"""
3-Dimensions field class.
A field is defined by its identifier 'fid',
its data, its geometry (gridpoint and optionally spectral),
and its validity.
The natural being of a field is gridpoint, so that:
a field always has a gridpoint geometry, but it has a spectral geometry only
in case it is spectral.
"""
_collector = ('field',)
_footprint = dict(
attr=dict(
structure=dict(
values=set(['3D'])),
geometry=dict(type=D3Geometry),
validity=dict(
type=FieldValidityList,
optional=True,
access='rwx',
default=FieldValidityList()),
spectral_geometry=dict(
type=SpectralGeometry,
optional=True),
processtype=dict(
optional=True,
info="Generating process.")
)
)
##############
# ABOUT DATA #
##############
[docs] def sp2gp(self):
"""
Transforms the spectral field into gridpoint, according to its spectral
geometry. Replaces data in place.
The spectral transform subroutine is actually included in the spectral
geometry's *sp2gp()* method.
"""
if len(self.validity) != 1:
raise NotImplementedError("sp2gp not implemented for fields with several validities")
if self.spectral:
if len(self.data.shape) != (2 if self.geometry.datashape['k'] else 1):
raise epygramError("spectral data must be 2D for a 3D field and 1D for a 2D field")
if self.geometry.rectangular_grid:
# LAM
gpdims = {}
for dim in ['X', 'Y', 'X_CIzone', 'Y_CIzone']:
gpdims[dim] = self.geometry.dimensions[dim]
else:
# global
gpdims = {}
for dim in ['lat_number', 'lon_number_by_lat']:
gpdims[dim] = self.geometry.dimensions[dim]
if self.geometry.datashape['k']:
dataList = []
for k in range(len(self.data)):
data2d = self.geometry.reshape_data(self.spectral_geometry.sp2gp(self.data[k], gpdims),
len(self.validity))
dataList.append(data2d)
result = numpy.array(dataList)
else:
result = self.geometry.reshape_data(self.spectral_geometry.sp2gp(self.data, gpdims),
len(self.validity))
self._attributes['spectral_geometry'] = None
self.setdata(result)
[docs] def gp2sp(self, spectral_geometry):
"""
Transforms the gridpoint field into spectral space, according to the
*spectral geometry* mandatorily passed as argument. Replaces data in
place.
The spectral transform subroutine is actually included in the spectral
geometry's *gp2sp()* method.
"""
if len(self.validity) != 1:
raise NotImplementedError("sp2gp not implemented for fields with several validities")
if not self.spectral:
if len(self.data.shape) != (3 if self.geometry.datashape['k'] else 2):
raise epygramError("spectral data must be 3D for a 3D field and 2D for a 2D field")
if not isinstance(spectral_geometry, SpectralGeometry):
raise epygramError("a spectral geometry (SpectralGeometry" + \
" instance) must be passed as argument" + \
" to gp2sp()")
if self.geometry.rectangular_grid:
# LAM
if self.geometry.grid['LAMzone'] != 'CIE':
raise epygramError("this field is not bi-periodicized:" + \
" it cannot be transformed into" + \
" spectral space.")
gpdims = {}
for dim in ['X', 'Y', 'X_CIzone', 'Y_CIzone']:
gpdims[dim] = self.geometry.dimensions[dim]
if self.geometry.datashape['k']:
dataList = []
for k in range(len(self.data)):
data1d = self.data[k].flatten()
dataList.append(spectral_geometry.gp2sp(data1d, gpdims))
result = numpy.array(dataList)
else:
data1d = self.data.flatten()
result = spectral_geometry.gp2sp(data1d, gpdims)
else:
# global
gpdims = {}
for dim in ['lat_number', 'lon_number_by_lat']:
gpdims[dim] = self.geometry.dimensions[dim]
if self.geometry.datashape['k']:
dataList = []
for k in range(len(self.data)):
data1d = self.data[k].compressed()
dataList.append(spectral_geometry.gp2sp(data1d, gpdims))
result = numpy.array(dataList)
else:
data1d = self.data.compressed()
result = spectral_geometry.gp2sp(data1d, gpdims)
self._attributes['spectral_geometry'] = spectral_geometry
self.setdata(result)
[docs] def getdata(self, subzone=None):
"""
Returns the field data, with 3D shape if the field is not spectral,
2D if spectral.
- *subzone*: optional, among ('C', 'CI'), for LAM fields only, returns
the data resp. on the C or C+I zone.
Default is no subzone, i.e. the whole field.
Shape of 3D data: \n
- Rectangular grids:\n
grid[k,0,0] is SW, grid[k,-1,-1] is NE \n
grid[k,0,-1] is SE, grid[k,-1,0] is NW \n
with k the level
- Gauss grids:\n
grid[k,0,:Nj] is first (Northern) band of latitude, masked after
Nj = number of longitudes for latitude j \n
grid[k,-1,:Nj] is last (Southern) band of latitude (idem). \n
with k the level
"""
if not self.spectral and subzone and \
self.geometry.grid.get('LAMzone') is not None:
data = self.geometry.extract_subzone(self.data, len(self.validity), subzone)
else:
data = self.data
return data
[docs] def setdata(self, data):
"""
Sets field data, checking *data* to have the good shape according to geometry.
"""
dimensions = 0
if len(self.validity) > 1:
dimensions += 1
if self.geometry.datashape['k']:
dimensions += 1
if self.spectral:
dimensions += 1
dataType = "spectral"
else:
if self.geometry.datashape['j']:
dimensions += 1
if self.geometry.datashape['i']:
dimensions += 1
dataType = "gridpoint"
if len(numpy.shape(data)) != dimensions:
raise epygramError(dataType + " data must be " + str(dimensions) + "D array.")
super(D3Field, self).setdata(data)
[docs] def select_subzone(self, subzone):
"""
If a LAMzone defines the field, select only the *subzone* from it.
*subzone* among ('C', 'CI').
Warning: modifies the field and its geometry in place !
"""
if self.geometry.grid.get('LAMzone') is not None:
data = self.getdata(subzone=subzone)
self._attributes['geometry'] = self.geometry.select_subzone(subzone)
self.setdata(data)
[docs] def getvalue_ij(self, i=None, j=None, k=None, t=None,
one=True):
"""
Returns the value of the field on point of indices (*i, j, k, t*).
Take care (*i, j, k, t*) is python-indexing, ranging from 0 to dimension - 1.
*k* is the index of the level (not a value in Pa or m...)
*t* is the index of the temporal dimension (not a validity object)
*k* and *t* can be scalar even if *i* and *j* are arrays.
If *one* is False, returns [value] instead of value.
"""
if len(self.validity) > 1 and t is None:
raise epygramError("*t* is mandatory when there are several validities")
if self.geometry.datashape['k'] and k is None:
raise epygramError("*k* is mandatory when field has a vertical coordinate")
if self.geometry.datashape['j'] and j is None:
raise epygramError("*j* is mandatory when field has a two horizontal dimensions")
if self.geometry.datashape['i'] and j is None:
raise epygramError("*i* is mandatory when field has one horizontal dimension")
if not self.geometry.point_is_inside_domain_ij(i, j):
raise ValueError("point is out of field domain.")
maxsize = numpy.array([numpy.array(dim).size for dim in [i, j, k, t] if dim is not None]).max()
if t is None:
my_t = numpy.zeros(maxsize, dtype=int)
else:
my_t = numpy.array(t)
if my_t.size != maxsize:
if my_t.size != 1:
raise epygramError("t must be scalar or must have the same length as other indexes")
my_t = numpy.array([my_t.item()] * maxsize)
if k is None:
my_k = numpy.zeros(maxsize, dtype=int)
else:
my_k = numpy.array(k)
if my_k.size != maxsize:
if my_k.size != 1:
raise epygramError("k must be scalar or must have the same length as other indexes")
my_k = numpy.array([my_k.item()] * maxsize)
if j is None:
my_j = numpy.zeros(maxsize, dtype=int)
else:
my_j = numpy.array(j)
if my_j.size != maxsize:
raise epygramError("j must have the same length as other indexes")
if i is None:
my_i = numpy.zeros(maxsize, dtype=int)
else:
my_i = numpy.array(i)
if my_i.size != maxsize:
raise epygramError("i must have the same length as other indexes")
value = numpy.copy(self.geometry.reshape_data(self.data, len(self.validity), d4=True)[my_t, my_k, my_j, my_i])
if value.size == 1 and one:
value = value.item()
return value
[docs] def getlevel(self, level=None, k=None):
"""
Returns a level of the field as a new field.
*level* is the requested level expressed in coordinate value (Pa, m...)
*k* is the index of the requested level
"""
if k == None and level == None:
raise epygramError("You must give k or level.")
if k != None and level != None:
raise epygramError("You cannot give, at the same time, k and level")
if level != None:
if level not in self.geometry.vcoordinate.levels:
raise epygramError("The requested level does not exist.")
my_k = self.geometry.vcoordinate.levels.index(level)
else:
my_k = k
if self.structure == '3D':
newstructure = 'H2D'
elif self.structure == 'V2D':
newstructure = 'H2D'
elif self.structure == 'V1D':
newstructure = 'Point'
else:
raise epygramError("It's not possible to extract a level from a " + self.structure + " field.")
kwargs_vcoord = {'structure': 'V',
'typeoffirstfixedsurface': self.geometry.vcoordinate.typeoffirstfixedsurface,
'position_on_grid': self.geometry.vcoordinate.position_on_grid,
'levels':[level]
}
if self.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
kwargs_vcoord['grid'] = copy.copy(self.geometry.vcoordinate.grid)
newvcoordinate = fpx.geometry(**kwargs_vcoord)
kwargs_geom = {'structure':newstructure,
'name': self.geometry.name,
'grid': dict(self.geometry.grid),
'dimensions': copy.copy(self.geometry.dimensions),
'vcoordinate': newvcoordinate,
'position_on_horizontal_grid': self.geometry.position_on_horizontal_grid
}
if self.geometry.projected_geometry:
kwargs_geom['projection'] = copy.copy(self.geometry.projection)
kwargs_geom['projtool'] = self.geometry.projtool
kwargs_geom['geoid'] = self.geometry.geoid
newgeometry = fpx.geometry(**kwargs_geom)
generic_fid = self.fid['generic']
generic_fid['level'] = level
kwargs_field = {'structure':newstructure,
'validity':self.validity.copy(),
'processtype':self.processtype,
'geometry':newgeometry,
'fid':{'generic':generic_fid}}
if self.spectral_geometry is not None:
kwargs_field['spectral_geometry'] = self.spectral_geometry.copy()
newfield = fpx.field(**kwargs_field)
newfield.setdata(self.data[my_k, :, :])
return newfield
[docs]class D3VirtualField(D3CommonField):
"""
3-Dimensions field class.
A field is defined by its identifier 'fid',
Data is taken from other fields.
"""
_collector = ('field',)
_footprint = dict(
attr=dict(
structure=dict(
values=set(['3D'])),
fieldset=dict(
type=FieldSet,
optional=True,
default=FieldSet()),
resource=dict(
type=Resource,
optional=True),
resource_fids=dict(
type=FPList,
optional=True,
default=FPList()),
)
)
def __init__(self, *args, **kwargs):
"""
Constructor. See its footprint for arguments.
"""
super(D3VirtualField, self).__init__(*args, **kwargs)
if self.fieldset != FieldSet():
if self.resource is not None or self.resource_fids != FPList():
raise epygramError("You cannot set fieldset and (resource or resource_fids) at the same time.")
raise NotImplementedError("D3VirtualField from a fieldset is not yet implemented")
else:
self._lastlevelread = None
if self.resource is None or self.resource_fids == FPList():
raise epygramError("If you do not set fieldset, you need to provide resource and resource_fids.")
fidlist = self.resource.find_fields_in_resource(seed=self.resource_fids,
fieldtype=['H2D', '3D'])
if len(fidlist) == 0:
raise epygramError("There is no field in resource matching with resource_fids")
first = True
self._fidList = []
levelList = []
for fid in fidlist:
field = self.resource.readfield(fid, getdata=False)
if field.structure != 'H2D':
raise epygramError("3D virtual fields must be build from H2D fields only")
if first:
self._geometry = field.geometry.copy()
self._validity = field.validity.copy()
if field.spectral_geometry is not None:
self._spectral_geometry = field.spectral_geometry.copy()
else:
self._spectral_geometry = None
self._processtype = field.processtype
else:
if self._geometry.structure != field.geometry.structure or \
self._geometry.name != field.geometry.name or \
self._geometry.grid != field.geometry.grid or \
self._geometry.dimensions != field.geometry.dimensions or \
self._geometry.position_on_horizontal_grid != field.geometry.position_on_horizontal_grid:
raise epygramError("All H2D fields must share the horizontal geometry")
if self._geometry.projected_geometry or field.geometry.projected_geometry:
if self._geometry.projection != field.geometry.projection or \
self._geometry.geoid != field.geometry.geoid:
raise epygramError("All H2D fields must share the geometry projection")
if self._geometry.vcoordinate.typeoffirstfixedsurface != field.geometry.vcoordinate.typeoffirstfixedsurface or \
self._geometry.vcoordinate.position_on_grid != field.geometry.vcoordinate.position_on_grid:
raise epygramError("All H2D fields must share the vertical geometry")
if self._geometry.vcoordinate.grid is not None or field.geometry.vcoordinate.grid is not None:
if self._geometry.vcoordinate.grid != field.geometry.vcoordinate.grid:
raise epygramError("All H2D fields must share the vertical grid")
if self._validity != field.validity:
raise epygramError("All H2D fields must share the validity")
if self._spectral_geometry != field.spectral_geometry:
raise epygramError("All H2D fields must share the spectral geometry")
if self._processtype != field.processtype:
raise epygramError("All H2D fields must share the sprocesstype")
if len(field.geometry.vcoordinate.levels) != 1:
raise epygramError("H2D fields must have only one level")
if field.geometry.vcoordinate.levels[0] in levelList:
raise epygramError("This level have already been found")
levelList.append(field.geometry.vcoordinate.levels[0])
self._fidList.append(fid)
kwargs_vcoord = dict(structure='V',
typeoffirstfixedsurface=self._geometry.vcoordinate.typeoffirstfixedsurface,
position_on_grid=self._geometry.vcoordinate.position_on_grid)
if self._geometry.vcoordinate.grid is not None:
kwargs_vcoord['grid'] = self._geometry.vcoordinate.grid
kwargs_vcoord['levels'] = sorted(levelList) #TOBECHECKED: sorted
self._geometry.vcoordinate = fpx.geometry(**kwargs_vcoord)
self._spgpOpList = []
@property
def geometry(self):
return self._geometry
@property
def validity(self):
return self._validity
@property
def spectral_geometry(self):
return self._spectral_geometry
@property
def processtype(self):
return self._processtype
##############
# ABOUT DATA #
##############
[docs] def sp2gp(self):
"""
Transforms the spectral field into gridpoint, according to its spectral
geometry. Replaces data in place.
The spectral transform subroutine is actually included in the spectral
geometry's *sp2gp()* method.
"""
self._spgpOpList.append(('sp2gp', {}))
self._spectral_geometry = None
[docs] def gp2sp(self, spectral_geometry):
"""
Transforms the gridpoint field into spectral space, according to the
*spectral geometry* mandatorily passed as argument. Replaces data in
place.
The spectral transform subroutine is actually included in the spectral
geometry's *gp2sp()* method.
"""
self._spgpOpList.append(('gp2sp', {'spectral_geometry':spectral_geometry}))
[docs] def getdata(self, subzone=None):
"""
Returns the field data, with 3D shape if the field is not spectral,
2D if spectral.
- *subzone*: optional, among ('C', 'CI'), for LAM fields only, returns
the data resp. on the C or C+I zone.
Default is no subzone, i.e. the whole field.
Shape of 3D data: \n
- Rectangular grids:\n
grid[k,0,0] is SW, grid[k,-1,-1] is NE \n
grid[k,0,-1] is SE, grid[k,-1,0] is NW \n
with k the level
- Gauss grids:\n
grid[k,0,:Nj] is first (Northern) band of latitude, masked after
Nj = number of longitudes for latitude j \n
grid[k,-1,:Nj] is last (Southern) band of latitude (idem). \n
with k the level
"""
dataList = []
for k in range(len(self.geometry.vcoordinate.levels)):
dataList.append(self.getlevel(k).getdata(subzone=subzone))
return numpy.array(dataList)
[docs] def setdata(self, data):
"""
Sets field data, checking *data* to have the good shape according to geometry.
"""
raise epygramError("setdata cannot be implemented on virtual fields")
[docs] def getvalue_ij(self, i=None, j=None, k=None, t=None,
one=True):
"""
Returns the value of the field on point of indices (*i, j, k, t*).
Take care (*i, j, k, t*) is python-indexing, ranging from 0 to dimension - 1.
*k* is the index of the level (not a value in Pa or m...)
*t* is the index of the temporal dimension (not a validity object)
*k* and *t* can be scalar even if *i* and *j* are arrays.
If *one* is False, returns [value] instead of value.
"""
if len(self.validity) > 1 and t is None:
raise epygramError("*t* is mandatory when there are several validities")
if self.geometry.datashape['k'] and k is None:
raise epygramError("*k* is mandatory when field has a vertical coordinate")
if self.geometry.datashape['j'] and j is None:
raise epygramError("*j* is mandatory when field has a two horizontal dimensions")
if self.geometry.datashape['i'] and j is None:
raise epygramError("*i* is mandatory when field has one horizontal dimension")
if not self.geometry.point_is_inside_domain_ij(i, j):
raise ValueError("point is out of field domain.")
maxsize = numpy.array([numpy.array(dim).size for dim in [i, j, k, t] if dim is not None]).max()
if t is None:
my_t = numpy.zeros(maxsize, dtype=int)
else:
my_t = numpy.array(t)
if my_t.size != maxsize:
if my_t.size != 1:
raise epygramError("t must be scalar or must have the same length as other indexes")
my_t = numpy.array([my_t.item()] * maxsize)
if k is None:
my_k = numpy.zeros(maxsize, dtype=int)
else:
my_k = numpy.array(k)
if my_k.size != maxsize:
if my_k.size != 1:
raise epygramError("k must be scalar or must have the same length as other indexes")
my_k = numpy.array([my_k.item()] * maxsize)
if j is None:
my_j = numpy.zeros(maxsize, dtype=int)
else:
my_j = numpy.array(j)
if my_j.size != maxsize:
raise epygramError("j must have the same length as other indexes")
if i is None:
my_i = numpy.zeros(maxsize, dtype=int)
else:
my_i = numpy.array(i)
if my_i.size != maxsize:
raise epygramError("i must have the same length as other indexes")
value = []
for x in range(my_k.size):
field2d = self.getlevel(k=my_k[x] if my_k.size > 1 else my_k.item())
if my_t.size == 1:
pos = (my_t.item(), 0, my_j.item(), my_i.item())
else:
pos = (my_t[x], 0, my_j[x], my_i[x])
value.append(field2d.geometry.reshape_data(field2d.data, len(self.validity), d4=True)[pos])
value = numpy.array(value)
if value.size == 1 and one:
value = value.item()
return value
[docs] def getlevel(self, level=None, k=None):
"""
Returns a level of the field as a new field.
*level* is the requested level expressed in coordinate value (Pa, m...)
*k* is the index of the requested level
"""
if k == None and level == None:
raise epygramError("You must give k or level.")
if k != None and level != None:
raise epygramError("You cannot give, at the same time, k and level")
if level != None:
if level not in self.geometry.vcoordinate.levels:
raise epygramError("The requested level does not exist.")
my_k = self.geometry.vcoordinate.levels.index(level)
else:
my_k = k
if self._lastlevelread != my_k:
self._lastread = self.resource.readfield(self._fidList[my_k])
self._lastlevelread = my_k
result = self._lastread.deepcopy()
for op, kwargs in self._spgpOpList:
if op == 'sp2gp':
result.sp2gp(**kwargs)
elif op == 'gp2sp':
result.gp2sp(**kwargs)
else:
raise epygramError("operation not known")
return result