#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains classes for netCDF4 resource.
"""
__all__ = ['netCDF']
import datetime
import dateutil
import copy
import json
import footprints
from footprints import proxy as fpx, FPDict
from epygram import config, epygramError, util
from epygram.base import FieldValidity
from epygram.resources import FileResource
from epygram.fields import H2DField
import netCDF4
epylog = footprints.loggers.getLogger(__name__)
[docs]class netCDF(FileResource):
"""Class implementing all specificities for netCDF (4) resource format."""
_footprint = dict(
attr=dict(
format=dict(
values=set(['netCDF']),
default='netCDF'),
behaviour=dict(
type=FPDict,
optional=True,
default=config.netCDF_default_behaviour)
)
)
def __init__(self, *args, **kwargs):
"""Constructor. See its footprint for arguments."""
self.isopen = False
super(netCDF, self).__init__(*args, **kwargs)
if self.openmode in ('r', 'a'):
try:
guess = netCDF4.Dataset(self.container.abspath, self.openmode)
except RuntimeError:
raise IOError("this resource is not a netCDF one.")
finally:
guess.close()
behaviour = copy.copy(config.netCDF_default_behaviour)
behaviour.update(self.behaviour)
self._attributes['behaviour'] = behaviour
if not self.fmtdelayedopen:
self.open()
[docs] def open(self, openmode=None):
"""
Opens a netCDF and initializes some attributes.
- *openmode*: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(netCDF, self).open(openmode=openmode)
self._nc = netCDF4.Dataset(self.container.abspath, self.openmode)
self.isopen = True
[docs] def close(self):
"""
Closes a netCDF.
"""
if hasattr(self, '_nc'):
self._nc.close()
self.isopen = False
[docs] def variables_number(self):
"""Return the number of variables in resource."""
return len(self._variables)
def _listfields(self):
"""Returns the fid list of the fields inside the resource."""
return self._variables.keys()
@FileResource._openbeforedelayed
[docs] def readfield(self, fid,
getdata=True):
"""
Reads one field, given its netCDF name, and returns a Field instance.
Args: \n
- *fid*: netCDF field identifier
- *getdata*: if *False*, only metadata are read, the field do not
contain data.
"""
if self.openmode == 'w':
raise epygramError("cannot read fields in resource if with" + \
" openmode == 'w'.")
assert fid in self.listfields(), ' '.join(["field",
fid,
"not found in resource."])
field_kwargs = {'fid':{'netCDF':fid}}
# geometry
dimensions = {d:len(self._dimensions[d]) for d in self._variables[fid].dimensions}
geometryname = 'unstructured'
if set(self._variables[fid].dimensions) == set(self.behaviour['H2D_dimensions_names']):
read_as_miscfield = False
# this is a H2D field
structure = 'H2D'
lons = self._variables[self.behaviour['variable_name_for_longitudes']][:, :]
lats = self._variables[self.behaviour['variable_name_for_latitudes']][:, :]
lons_dim = self._variables[self.behaviour['variable_name_for_longitudes']].dimensions[:]
var_dim = [self._variables[fid].dimensions[i:i + len(lons_dim)]
for i in range(0, len(self._variables[fid].dimensions) - len(lons_dim) + 1)]
assert lons_dim in var_dim, \
"lons/lats and variable " + fid + " dimensions mismatch"
i = var_dim.index(lons_dim)
grid = {'longitudes':lons,
'latitudes':lats}
else:
epylog.warning("unable to assume geometry of field. Read as MiscField.")
read_as_miscfield = True
# validity
if self.behaviour.get('variable_name_for_validity') in self._variables[fid].dimensions:
# temporal dimension
_validity = self._variables[self.behaviour['variable_name_for_validity']]
raise NotImplementedError('temporal dimension of field: not yet !')
elif 'validity' in self._variables[fid].ncattrs():
# validity stored as an attribute of variable (as in writefield !)
try:
_validity = json.loads(self._variables[fid].validity)
_validity['basis'] = dateutil.parser.parse(_validity['basis'])
_validity['date_time'] = dateutil.parser.parse(_validity['date_time'])
if _validity.get('cumulativeduration') is not None:
_validity['cumulativeduration'] = datetime.timedelta(seconds=float(_validity['cumulativeduration']))
except (KeyError, ValueError):
epylog.warning("unable to decode validity attribute.")
validity = FieldValidity()
else:
validity = FieldValidity(**_validity)
# build field
if not read_as_miscfield:
field_kwargs['validity'] = validity
kwargs_geom = {'structure':structure,
'name':geometryname,
'grid':grid,
'dimensions':dimensions,
'vcoordinate':{'structure':'V',
'typeoffirstfixedsurface':255,
'levels':[0]},
'position_on_horizontal_grid':'center'}
geometry = fpx.geometry(**kwargs_geom)
field_kwargs['geometry'] = geometry
field_kwargs['structure'] = structure
comment = {}
for a in self._variables[fid].ncattrs():
if read_as_miscfield or (not read_as_miscfield and a != 'validity'):
comment.update({a:self._variables[fid].getncattr(a)})
comment = json.dumps(comment)
if comment != '{}':
field_kwargs['comment'] = comment
field = fpx.field(**field_kwargs)
if getdata:
field.setdata(self._variables[fid][...])
return field
[docs] def writefield(self, field, compression=4, metadata={}):
"""
Write a field in resource.
Args:\n
- *compression* ranges from 1 (low compression, fast writing)
to 9 (high compression, slow writing). 0 is no compression.
- *metadata* can be filled by any meta-data, that will be stored
as attribute of the netCDF variable.
"""
vartype = 'f8'
if isinstance(field, H2DField):
# dimensions
dims = (k for k in field.geometry.dimensions.keys() if len(k) == 1)
if self.behaviour['transpose_data_ordering']:
dims = sorted(dims, reverse=False)
else:
dims = sorted(dims, reverse=True)
for k in dims:
if k not in self._dimensions:
self._nc.createDimension(k, size=field.geometry.dimensions[k])
else:
assert len(self._dimensions[k]) == field.geometry.dimensions[k], \
"dimensions mismatch: " + k + ": " + \
str(self._dimensions[k]) + " != " + str(field.geometry.dimensions[k])
# geometry (lons/lats)
(lons, lats) = field.geometry.get_lonlat_grid()
if self.behaviour['transpose_data_ordering']:
lons = lons.transpose()
lats = lats.transpose()
if self.behaviour['variable_name_for_longitudes'] in self._variables:
lons_ok = lons.shape == self._variables[self.behaviour['variable_name_for_longitudes']].shape
assert lons_ok, "dimensions mismatch: lons grid."
else:
lons_var = self._nc.createVariable(self.behaviour['variable_name_for_longitudes'],
vartype, dims)
lons_var[...] = lons
if self.behaviour['variable_name_for_latitudes'] in self._variables:
lats_ok = lats.shape == self._variables[self.behaviour['variable_name_for_latitudes']].shape
assert lats_ok, "dimensions mismatch: lats grid."
else:
lats_var = self._nc.createVariable(self.behaviour['variable_name_for_latitudes'],
vartype, dims)
lats_var[...] = lats
# validity
if len(field.validity) == 1:
validity = {'basis':None, 'date_time':None}
if field.validity[0].getbasis() is not None:
validity['basis'] = field.validity[0].getbasis().isoformat()
if field.validity[0].get() is not None:
validity['date_time'] = field.validity[0].get().isoformat()
if field.validity[0].cumulativeduration() is not None:
validity['cumulativeduration'] = str(field.validity[0].cumulativeduration().total_seconds())
elif len(field.validity) > 1:
raise NotImplementedError("not yet !")
#TODO: create a 'time' dimension
#if 'time' not in self._dimensions:
# self._nc.createDimension('time', size=None)
# for v in field.validity:
# self._nc.dimensions['time'].append(v.get('IntStr'))
#else:
# # check that time dimension is compatible ?
# raise NotImplementedError("not yet !")
# create variable
var = self._nc.createVariable(util.linearize(str(field.fid.get('netCDF', field.fid))),
vartype, dims,
zlib=bool(compression), complevel=compression)
# set metadata
if len(field.validity) == 1:
var.validity = json.dumps(validity)
if field.comment is not None:
metadata.update(field.comment)
for k, v in metadata.items():
setattr(var, k, v)
# set data
if self.behaviour['transpose_data_ordering']:
data = field.data.transpose()
else:
data = field.data
if 'gauss' in field.geometry.name:
fill_value = -999999.9
var.missing_value = fill_value
data = data.filled(fill_value)
var[...] = data
else:
raise NotImplementedError("not yet !")
[docs] def behave(self, **kwargs):
"""
Set-up the given arguments in self.behaviour, for the purpose of
building fields from netCDF.
"""
self.behaviour.update(kwargs)
@property
@FileResource._openbeforedelayed
def _dimensions(self):
return self._nc.dimensions
@property
@FileResource._openbeforedelayed
def _variables(self):
return self._nc.variables