#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Some useful utilities...
"""
from __future__ import print_function, absolute_import, division , unicode_literals
import six
import math
import copy
import numpy
import sys
import datetime
import hashlib
import os
from six.moves.urllib.request import urlopen # @UnresolvedImport
from distutils.version import LooseVersion
from footprints import FootprintBase
from bronx.graphics.colormapping import (add_cmap,
get_norm4colorscale)
from bronx.syntax.decorators import nicedeco
from bronx.fancies import loggers
from . import config, epygramError
from .colormapping import register_colormap_from_json
epylog = loggers.getLogger(__name__)
[docs]class RecursiveObject(object):
"""
Generic abstract class implementing useful recursive properties:
- display of object: the *__str__* method returns str(attr) for each of the
object's attributes, with automatical indentation for readability.
- test of (in)equality: the a == b test will be true if a and b have the
same attributes and *a.attr == b.attr* for each attribute.
"""
def _strItem(self, item, reclevel=1):
"""Recursive display of object attributes."""
offset = "".rjust(reclevel * 4)
itemstring = ""
if isinstance(item, FootprintBase):
itemstring += item.__class__.__name__ + " containing:"
for attr in item.__dict__.keys():
if attr == '_attributes':
for i in item.__dict__[attr].keys():
itemstring += "\n" + offset + i + ": " + self._strItem(item.__dict__[attr][i], reclevel + 1)
elif attr not in ('_puredict', '_observer'):
itemstring += "\n" + offset + attr + ": " + self._strItem(item.__dict__[attr], reclevel + 1)
elif isinstance(item, list):
if len(item) > 0:
if isinstance(item[0], RecursiveObject):
itemstring = "[" + repr(item[0]) + ", ... (" + \
str(len(item)) + " objects)]"
else:
itemstring = str(numpy.array(item))
else:
itemstring = "[]"
elif isinstance(item, RecursiveObject):
itemstring += item.__class__.__name__ + " containing:"
for attr in item.__dict__.keys():
itemstring += "\n" + offset + attr + ": " + self._strItem(item.__dict__[attr], reclevel + 1)
elif isinstance(item, dict):
for key in sorted(item.keys()):
itemstring += "\n" + offset + str(key) + ": " + self._strItem(item[key], reclevel + 1)
else:
itemstring = str(item)
return itemstring
def __str__(self):
"""
Recursive display of object: displays each of its attributes
with indentation.
"""
return self._strItem(self)
def __eq__(self, other):
"""Test of equality by recursion on the object's attributes."""
return self.tolerant_equal(other, tolerance=0.)
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
# known issue __eq__/__hash__ must be defined both or none, else inheritance is broken
return object.__hash__(self)
[docs] def tolerant_equal(self, other, tolerance=config.epsilon):
"""
Test of equality by recursion on the object's attributes,
with a **tolerance**.
"""
if self.__class__ == other.__class__ and \
set(self.__dict__.keys()) == set(other.__dict__.keys()):
ok = True
for attr in self.__dict__.keys():
if attr in ('_puredict', '_observer'): # footprints special attributes
continue
else:
if not Comparator.are_equal(self.__dict__[attr],
other.__dict__[attr],
tolerance):
ok = False
break
else:
ok = False
return ok
[docs] def copy(self):
"""Returns a copy of the object."""
return copy.copy(self)
[docs] def deepcopy(self):
"""Returns a deepcopy of the object."""
return copy.deepcopy(self)
[docs] def recursive_diff(self, other):
"""Recursively list what differs from **other**."""
if not self.__eq__(other):
diff = {}
if self.__class__ == other.__class__ and \
set(self.__dict__.keys()) == set(other.__dict__.keys()):
for attr in self.__dict__.keys():
if attr in ('_puredict', '_observer'): # footprints special attributes
continue
else:
if not Comparator.are_equal(self.__dict__[attr],
other.__dict__[attr]):
if all([isinstance(obj, RecursiveObject)
for obj in [self.__dict__[attr],
other.__dict__[attr]]]):
diff[attr] = self.__dict__[attr].diff(other.__dict__[attr])
else:
diff[attr] = Comparator.diff(self.__dict__[attr],
other.__dict__[attr])
return diff
class Comparator(object):
"""Helper to recursively compare objects."""
@classmethod
def _float_are_equal(cls, float1, float2, tolerance):
# tolerance for floats
return nearlyEqual(float1, float2, tolerance)
@classmethod
def _array_are_equal(cls, array1, array2, tolerance):
if tolerance == 0.:
return numpy.all(array1 == array2)
else:
if (array1.dtype == array2.dtype and
array1.dtype in [numpy.dtype(d)
for d in ['float16', 'float32', 'float64']]):
# tolerance for floats
return numpy.all(nearlyEqualArray(array1, array2, tolerance))
else:
return numpy.all(array1 == array2)
@classmethod
def _dict_are_equal(cls, dict1, dict2):
if set(dict1.keys()) == set(dict2.keys()):
ok = True
for k in dict1.keys():
if not cls.are_equal(dict1[k], dict2[k]):
ok = False
break
return ok
else:
return False
@classmethod
def _dict_diff(cls, dict1, dict2):
if not cls.are_equal(dict1, dict2):
diff = {}
if set(dict1.keys()) == set(dict2.keys()):
for k in dict1.keys():
if not cls.are_equal(dict1[k], dict2[k]):
diff[k] = cls.diff(dict1[k], dict2[k])
else:
diff = (str(dict1), str(dict2))
else:
diff = None
return diff
@classmethod
def _list_are_equal(cls, list1, list2):
if len(list1) != len(list2):
return False
else:
ok = True
for i in range(len(list1)):
if not cls.are_equal(list1[i], list2[i]):
ok = False
break
return ok
@classmethod
def are_equal(cls, obj1, obj2, tolerance=0.):
"""Checks equality of objects."""
if isinstance(obj1, float) and isinstance(obj2, float):
return cls._float_are_equal(obj1, obj2, tolerance)
elif isinstance(obj1, numpy.ndarray) and isinstance(obj2, numpy.ndarray):
return cls._array_are_equal(obj1, obj2, tolerance)
elif isinstance(obj1, dict) and isinstance(obj2, dict):
return cls._dict_are_equal(obj1, obj2)
elif isinstance(obj1, list) and isinstance(obj2, list):
return cls._list_are_equal(obj1, obj2)
else:
return obj1 == obj2
# CLEANME: why did I do that ?
"""try:
return obj1.__eq__(obj2)
except AttributeError:
return obj1 == obj2"""
@classmethod
def diff(cls, obj1, obj2):
"""Inspect differences between objects."""
if not cls.are_equal(obj1, obj2):
if isinstance(obj1, dict) and isinstance(obj2, dict):
return cls._dict_diff(obj1, obj2)
else:
return (obj1, obj2)
[docs]class Angle(RecursiveObject):
"""
This class handles an angle.
It enables conversions of units, while saving the original unit and value
of the angle at its construction.
Available units: 'degrees', 'radians', 'cos_sin' (cos, sin),
'DMS' (Degree, Minutes, Seconds).
"""
deg = 'degrees'
rad = 'radians'
trig = 'cos_sin'
dms = 'DMS'
units = set([deg, dms, rad, trig])
def __init__(self, value, unit):
"""
Constructor.
'unit' argument must be among:
- 'degrees',
- 'DMS' - in which case, value is a tuple (degrees, minutes, seconds),
- 'radians',
- 'cos_sin' - in which case, value is a tuple (cos, sin).
"""
if unit in Angle.units:
self.__dict__['_' + unit] = value
if unit in ('degrees', 'radians'):
# get modulo around 0.
if unit == 'degrees':
circle = 360.
elif unit == 'radians':
circle = 2. * math.pi
while self.__dict__['_' + unit] > circle / 2.:
self.__dict__['_' + unit] -= circle
while self.__dict__['_' + unit] < -circle / 2.:
self.__dict__['_' + unit] += circle
else:
raise ValueError("this angle unit is not implemented: " + str(unit))
self._origin_unit = unit
self._origin_value = value
def __eq__(self, other):
"""
Redefinition because of dynamism of buffering new computed values...
"""
if not isinstance(other, Angle):
return False
if abs(self.get('degrees') -
degrees_nearest_mod(other.get('degrees'),
self.get('degrees'))) <= config.epsilon:
ok = True
else:
ok = False
return ok
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self._origin_value) + hash(self._origin_unit)
[docs] def get(self, unit=None):
"""
Returns the angle in the requested unit.
If no unit is supplied, the origin unit is used.
"""
if unit is None:
unit = self._origin_unit # or a default one ?
elif unit in Angle.units:
if not '_' + unit in self.__dict__:
self._compute(unit)
else:
raise ValueError("this angle unit is not implemented: " + str(unit))
return self.__dict__['_' + unit]
def _compute(self, unit):
"""
Compute the angle in the requested unit, from the original one.
See constructor for more details about units.
"""
# conversion to degrees
if unit == Angle.deg:
if self._origin_unit == Angle.rad:
self.__dict__['_' + unit] = math.degrees(self._origin_value)
elif self._origin_unit == Angle.trig:
self.__dict__['_' + unit] = math.degrees(
math.copysign(math.acos(self._origin_value[0]),
self._origin_value[1]))
elif self._origin_unit == Angle.dms:
self.__dict__['_' + unit] = (self._origin_value[0] +
self._origin_value[1] / 60. +
self._origin_value[2] / 3600.)
else:
raise NotImplementedError("conversion from this unit (" +
self._origin_unit +
") is not coded.")
# conversion to radians
elif unit == Angle.rad:
if self._origin_unit == Angle.deg:
self.__dict__['_' + unit] = math.radians(self._origin_value)
elif self._origin_unit == Angle.trig:
self.__dict__['_' + unit] = math.copysign(
math.acos(self._origin_value[0]),
self._origin_value[1])
elif self._origin_unit == Angle.dms:
self.__dict__['_' + unit] = math.radians(self._origin_value[0] +
self._origin_value[1] / 60. +
self._origin_value[2] / 3600.)
else:
raise NotImplementedError("conversion from this unit (" +
self._origin_unit +
") is not coded.")
# conversion to (cos, sin)
elif unit == Angle.trig:
if self._origin_unit == Angle.deg:
self.__dict__['_' + unit] = (math.cos(math.radians(self._origin_value)),
math.sin(math.radians(self._origin_value)))
elif self._origin_unit == Angle.rad:
self.__dict__['_' + unit] = (math.cos(self._origin_value),
math.sin(self._origin_value))
elif self._origin_unit == Angle.dms:
anglerad = math.radians(self._origin_value[0] +
self._origin_value[1] / 60. +
self._origin_value[2] / 3600.)
self.__dict__['_' + unit] = (math.cos(anglerad),
math.sin(anglerad))
else:
raise NotImplementedError("conversion from this unit (" +
self._origin_unit +
") is not coded.")
# conversion to (degrees, minutes, seconds)
elif unit == Angle.dms:
if self._origin_unit == Angle.deg:
decdeg = self._origin_value
elif self._origin_unit == Angle.rad:
decdeg = math.degrees(self._origin_value)
elif self._origin_unit == Angle.trig:
decdeg = math.degrees(math.copysign(math.acos(self._origin_value[0]),
self._origin_value[1]))
else:
raise NotImplementedError("conversion from this unit (" +
self._origin_unit +
") is not coded.")
sign = int(math.copysign(1, decdeg))
decdeg = decdeg * sign
degrees = int(decdeg)
decmin = (decdeg - degrees) * 60.
minutes = int(decmin)
seconds = (decmin - minutes) * 60
self.__dict__['_' + unit] = (degrees * sign, minutes, seconds)
else:
raise NotImplementedError("conversion to this unit (" + unit +
") is not coded.")
# FUNCTIONS #
#############
def is_scalar(x):
"""
Returns True if argument is scalar
"""
return numpy.ndim(x) == 0
def as_numpy_array(x):
"""
Returns a numpy (ma) array
"""
if isinstance(x, list):
missing = set([item.fill_value for item in x if isinstance(item, numpy.ma.masked_array)])
if len(missing) > 0:
return numpy.ma.array(x, copy=False, ndmin=1, subok=True, fill_value=missing.pop() if len(missing) == 1 else None)
else:
return numpy.array(x, copy=False, ndmin=1, subok=True)
else:
return numpy.array(x, copy=False, ndmin=1, subok=True)
[docs]def find_re_in_list(regexp, a_list):
"""
Finds all elements from a list that match a regular expression.
The regexp and the different elements of the list must be of the same type:
- strings
- tuples with the same length
- dictionnaries: all regexp keys must be keys of the list
"""
def check_string_pattern(pattern, element):
import re
if not isinstance(pattern, six.string_types) or \
not isinstance(element, six.string_types):
raise epygramError("pattern and element must be strings in \
check_string_pattern function.")
# protect '.'
mypattern = re.subn('\.', r'\.', pattern)[0]
# change unix '?' to python '.' (any char)
mypattern = mypattern.replace('?', '.')
# change unix '*' to python '.*' (several any char)
mypattern = mypattern.replace('*', '.*')
mypattern += '(?!.)'
return re.match(mypattern, element.strip())
found = []
if isinstance(regexp, six.string_types):
for field in a_list:
if check_string_pattern(regexp, str(field)):
found.append(field)
elif isinstance(regexp, tuple):
for field in a_list:
if not isinstance(field, tuple):
raise epygramError("pattern and elements of the list must be of\
the same type (tuples here).")
if len(regexp) != len(field):
raise epygramError("pattern and elements of the list must be\
tuples of the same length.")
ok = True
for i in range(len(regexp)):
ok = ok and check_string_pattern(str(regexp[i]), str(field[i]))
if ok:
found.append(field)
elif isinstance(regexp, dict):
raise NotImplementedError("the dictionnary type of regexp is not yet\
implemented.")
else:
raise NotImplementedError("this type of regexp is not (yet?)\
implemented.")
return found
[docs]def degrees_nearest_mod(d, ref):
"""Returns the angle(s) **d** in the modulo nearest to **ref**."""
try:
_ = len(d)
scalar = False
except Exception:
d = [d]
scalar = True
d = numpy.array(d)
d_inf = d - 360.
d_sup = d + 360.
result = d_sup
mask = numpy.logical_and(numpy.abs(d - ref) <= numpy.abs(d_sup - ref),
numpy.abs(d - ref) <= numpy.abs(d_inf - ref))
result[mask] = d[mask]
mask = numpy.logical_and(numpy.abs(d_inf - ref) <= numpy.abs(d_sup - ref),
numpy.abs(d_inf - ref) <= numpy.abs(d - ref))
result[mask] = d_inf[mask]
if scalar:
result = result[0]
return result
[docs]def positive_longitude(lon, unit='degrees'):
"""Returns *lon* shifted in [0;360[ or [0;2pi[ (depending on *unit*)."""
if lon < 0.:
if unit == 'degrees':
lon += 360.
elif unit == 'radians':
lon += 2. * numpy.pi
else:
raise NotImplementedError()
return lon
def get_file(url, filename, authorize_cache=True, subst=None):
"""
Get file from url into filename.
If authorize_cache is True and a directory is set for caching
in user preferences, file is first searched in cache directory,
downloaded in the cache directory if missing, then filename is
built as a symlink to the file in cache directory.
This way, filename must be always deleted by the caller.
:param url: url to get
:param filename: filename in which to put the result
:param authorize_cache: authorize to use cache (if possible)
for this request
:param subst: dictionary whose keys are searched in url to
be replaced by corresponding key
example: url='https://a.tile.openstreetmap.org/${z}/${x}/${y}.png'
subst={'${z}': 4, '${x}': 8, '${y}': 5}
will get the file https://a.tile.openstreetmap.org/4/8/5.png
"""
def md5(s):
h = hashlib.md5()
h.update(s)
return h.hexdigest()
# Substitution
actual_url = url
if subst is not None:
for k, v in subst.items():
actual_url = actual_url.replace(str(k), str(v))
if authorize_cache and config.internet_cache_dir is not None:
# Corresponding file name in cache
url_hash = md5(url.encode('UTF8'))
actual_url_hash = md5(actual_url.encode('UTF8'))
if url_hash != actual_url_hash:
directory = os.path.join(config.internet_cache_dir,
url_hash)
if not os.path.exists(directory):
os.mkdir(directory)
cache_filename = os.path.join(directory, actual_url_hash)
else:
cache_filename = os.path.join(config.internet_cache_dir,
url_hash)
# Cache filling
if not os.path.exists(cache_filename):
get_file(actual_url, cache_filename, authorize_cache=False, subst=None)
# Symlink
if os.path.exists(filename):
os.remove(filename)
os.symlink(cache_filename, filename)
else:
with open(filename, 'wb') as f:
f.write(urlopen(actual_url).read())
[docs]def load_cmap(cmap):
"""
Reads and registers the epygram-or-user-colormap called *cmap*,
which must be either in config.epygram_colormaps or
config.usercolormaps.
Works with both old-way (.cmap) and new way (.json).
"""
import matplotlib.pyplot as plt
if cmap not in plt.colormaps() and cmap in config.colormaps:
filename = config.colormaps[cmap]
if filename.endswith('.json'):
return register_colormap_from_json(filename)
else:
epylog.warning(_deprecated_cmap)
with open(filename, 'r') as ocm:
add_cmap(cmap, ocm)
_deprecated_cmap = ' '.join(["the use of '.cmap' user colormaps is deprecated,",
"(and not possible with cartoplot());",
"move to json format, using epygram.moves.cmap2json()"])
formatting_default_widths = (50, 20)
separation_line = '{:-^{width}}'.format('', width=sum(formatting_default_widths) + 1) + '\n'
[docs]def add_meridians_and_parallels_to(bm,
meridians='auto',
parallels='auto',
ax=None,
drawparallels_kwargs=None,
drawmeridians_kwargs=None,
drawequator_kwargs=None,
drawgreenwich_kwargs=None):
"""
Adds meridians and parallels to a basemap instance *bm*.
*meridians* and *parallels* enable to fine-tune the choice of lines to
plot, with either:
- 'auto': automatic scaling to the basemap extents
- 'default': range(0,360,10) and range(-90,90,10)
- a list of values
- a grid step, e.g. 5 to plot each 5 degree.
- None: no one is plot
- *meridian* == 'greenwich' // 'datechange' // 'greenwich+datechange'
*parallel* == 'equator' // 'polarcircles' // 'tropics' or any
combination (,) will plot only these.
:param ax: the ax to be plotted on
:param drawparallels_kwargs: kwargs to be passed to basemap.drawparallels()
:param drawmeridians_kwargs: kwargs to be passed to basemap.drawgreenwich()
:param drawequator_kwargs: draw kwargs to emphasize equator parallel
:param drawgreenwich_kwargs: draw kwargs to emphasize greenwich meridian
"""
try:
parallels = float(parallels)
except (TypeError, ValueError):
parallels = parallels
try:
meridians = float(meridians)
except (TypeError, ValueError):
meridians = meridians
drawparallels_kwargs = ifNone_emptydict(drawparallels_kwargs)
drawmeridians_kwargs = ifNone_emptydict(drawmeridians_kwargs)
drawequator_kwargs = ifNone_emptydict(drawequator_kwargs)
drawgreenwich_kwargs = ifNone_emptydict(drawgreenwich_kwargs)
if bm.projection == 'rotpole':
Delta_lat = bm.ymax - bm.ymin
else:
Delta_lat = bm.latmax - bm.latmin
if parallels == 'auto' or isinstance(parallels, int) or isinstance(parallels, float):
if Delta_lat <= 10:
delta_lat = 1
elif 10 < Delta_lat <= 30:
delta_lat = 5
else:
delta_lat = 10
if isinstance(parallels, int) or isinstance(parallels, float):
delta_lat = parallels
latmin = bm.latmin - bm.latmin % delta_lat
latmax = bm.latmax - bm.latmax % delta_lat + delta_lat
parallels = numpy.arange(latmin, latmax, delta_lat)
elif parallels == 'default':
parallels = numpy.arange(-90, 90, 10)
elif isinstance(parallels, six.string_types) or isinstance(parallels, list):
pl = []
if 'equator' in parallels:
pl.append(0.)
if 'polarcircles' in parallels:
pl.extend([-66.5628, 66.5628])
if 'tropics' in parallels:
pl.extend([-23.4372, 23.4372])
if isinstance(parallels, list):
for p in parallels:
try:
pl.append(float(p))
except (ValueError, TypeError):
pass
pl.sort()
parallels = pl
if bm.projection == 'rotpole':
Delta_lon = bm.xmax - bm.xmin
else:
Delta_lon = bm.lonmax - bm.lonmin
if meridians == 'auto' or isinstance(meridians, int) or isinstance(meridians, float):
if Delta_lon <= 10:
delta_lon = 1
elif 10 < Delta_lon <= 30:
delta_lon = 5
elif 30 < Delta_lon <= 180:
delta_lon = 10
else:
delta_lon = 20
if isinstance(meridians, int) or isinstance(meridians, float):
delta_lon = meridians
lonmin = bm.lonmin - bm.lonmin % delta_lon
lonmax = bm.lonmax - bm.lonmax % delta_lon + delta_lon
meridians = numpy.arange(lonmin, lonmax, delta_lon)
elif meridians == 'default':
meridians = numpy.arange(0, 360, 10)
elif isinstance(meridians, six.string_types) or isinstance(meridians, list):
ml = []
if 'greenwich' in meridians:
ml.append(0.)
if 'datechange' in meridians:
ml.append(180.)
if isinstance(meridians, list):
for m in meridians:
try:
ml.append(float(m))
except (ValueError, TypeError):
pass
ml.sort()
meridians = ml
if parallels is not None:
if bm.projection in ('ortho', 'nsper'):
if 'labels' not in drawparallels_kwargs.keys():
drawparallels_kwargs['labels'] = [False, False, False, False]
else:
if 'labels' not in drawparallels_kwargs.keys():
drawparallels_kwargs['labels'] = [True, False, False, False]
bm.drawparallels(parallels, ax=ax, **drawparallels_kwargs)
if 0. in parallels or 0 in parallels:
if 'dashes' not in drawequator_kwargs.keys():
drawequator_kwargs['dashes'] = [10, 1]
drawequator_kwargs['labels'] = [False] * 4
bm.drawparallels([0], ax=ax, **drawequator_kwargs)
if meridians is not None:
if bm.projection in ('spstere', 'npstere', 'stere'):
if 'labels' not in drawmeridians_kwargs.keys():
drawmeridians_kwargs['labels'] = [True, False, False, True]
elif bm.projection in ('ortho', 'moll', 'nsper'):
if 'labels' not in drawmeridians_kwargs.keys():
drawmeridians_kwargs['labels'] = [False, False, False, False]
else:
if 'labels' not in drawmeridians_kwargs.keys():
drawmeridians_kwargs['labels'] = [False, False, False, True]
bm.drawmeridians(meridians, ax=ax, **drawmeridians_kwargs)
if 0. in meridians or 0 in meridians:
if 'dashes' not in drawgreenwich_kwargs.keys():
drawgreenwich_kwargs['dashes'] = [10, 1]
drawgreenwich_kwargs['labels'] = [False] * 4
bm.drawmeridians([0], ax=ax, **drawgreenwich_kwargs)
def auto_meridians_parallels(geometry,
meridians='auto',
parallels='auto'):
"""
Compute meridians and parallels.
*meridians* and *parallels* enable to fine-tune the choice of lines to
plot, with either:
- 'auto': automatic scaling to the basemap extents
- 'default': range(0,360,10) and range(-90,90,10)
- a list of values
- a grid step, e.g. 5 to plot each 5 degree.
- None: no one is plot
"""
def Delta2delta(Delta):
if Delta <= 10:
delta = 1
elif 10 < Delta <= 30:
delta = 5
elif 30 < Delta <= 180:
delta = 10
else:
delta = 20
return delta
# meridians # TODO: stereopol : all ! + parallels enough ?
if meridians is None or meridians == 0.:
meridians = []
elif meridians == 'default' or (meridians == 'auto' and
('gauss' in geometry.name or
geometry.name == 'polar_stereographic')):
meridians = numpy.arange(0, 370, 10)
elif meridians == 'auto' or isinstance(meridians, int) or isinstance(meridians, float):
minmax = geometry.minmax_ll()
if meridians == 'auto':
delta_lon = Delta2delta(minmax['lonmax'] - minmax['lonmin'])
else:
delta_lon = float(meridians)
lonmin = minmax['lonmin'] - minmax['lonmin'] % delta_lon
lonmax = minmax['lonmax'] - minmax['lonmax'] % delta_lon + 2 * delta_lon
lonmax = min(lonmax, lonmin + 360.) #space-view geometry returns 1.E30 as lonmax
meridians = numpy.arange(lonmin, lonmax, delta_lon)
if max(meridians > 180.):
meridians = meridians - 180. # FIXME: cartopy does not plot meridians > 180°
# parallels
if parallels is None or parallels == 0.:
parallels = []
elif parallels == 'default' or ('gauss' in geometry.name and parallels == 'auto'):
parallels = numpy.arange(-90, 90, 10)
elif parallels == 'auto' or isinstance(parallels, int) or isinstance(parallels, float):
minmax = geometry.minmax_ll()
if parallels == 'auto':
delta_lat = Delta2delta(minmax['latmax'] - minmax['latmin'])
else:
delta_lat = float(parallels)
latmin = minmax['latmin'] - minmax['latmin'] % delta_lat
if latmin <= -90:
latmin += delta_lat
latmax = minmax['latmax'] - minmax['latmax'] % delta_lat + 2 * delta_lat
latmax = min(latmax, 90. )#space-view geometry returns 1.E30 as latmax
if latmax >= 90:
latmax -= delta_lat
parallels = numpy.arange(latmin, latmax, delta_lat)
return list(meridians), list(parallels)
[docs]def nearlyEqual(a, b, epsilon=config.epsilon):
"""
Function to compare floats
http://floating-point-gui.de/errors/comparison/
Float.MIN_NORMAL was replaced by sys.float_info.min
Float.MAX_VALUE was replaced by sys.float_info.max
"""
absA = numpy.abs(a)
absB = numpy.abs(b)
diff = numpy.abs(a - b)
if a == b: # shortcut, handles infinities
return True
elif a == 0 or b == 0 or diff < sys.float_info.min:
# a or b is zero or both are extremely close to it
# relative error is less meaningful here
return diff < (epsilon * sys.float_info.min)
else: # use relative error
return diff / min((absA + absB), sys.float_info.max) < epsilon
nearlyEqualArray = numpy.vectorize(nearlyEqual)
nearlyEqualArray.__doc__ = "Vector version of nearlyEqual()."
[docs]def scale_colormap(cmap, max_val=None):
"""
.. deprecated:: 1.3.9
Creates a matplotlib.colors.BoundaryNorm object tuned for scaled colormaps,
i.e. discrete, irregular colorshades.
:param cmap: name of the colormap, as found in config.colormaps_scaling
:param max_val: if given, replaces the upper bound.
:return: a tuple (norm, scaling), scaling being eventually modified
according to **max_val**
"""
bounds = copy.copy(config.colormaps_scaling.get(cmap, None))
return get_norm4colorscale(bounds, max_val=max_val)
[docs]def restrain_to_index_i_of_dim_d(a, i, d, n=None):
"""
Of an array a[d1, d2, d3, ... dn], returns the array restricted to
index **i** of the dimension **d**.
:param a: the input array
:param i: index in dimension **d**
:param d: the dimension to restrain
:param n: specify *a priori* the number of dimensions of **a**
A more elegant solution would have been the following, but it does
not work when accessing netCDF variable (for which it was necessary)::
indexes = [range(len(self._dimensions[d])) for d in variable.dimensions] # equivalent to [:, :, :, ...]
for k in only.keys():
indexes[variable.dimensions.index(k)] = [only[k]] # restrain to the "only" give
return array[numpy.ix_(*indexes)]
"""
if n is None:
n = len(a.shape)
if n == 1:
ra = a[[i]]
elif n == 2:
if d == 0:
ra = a[[i], :]
else:
ra = a[:, [i]]
elif n == 3:
if d == 0:
ra = a[[i], :, :]
elif d == 1:
ra = a[:, [i], :]
else:
ra = a[:, :, [i]]
elif n == 4:
if d == 0:
ra = a[[i], :, :, :]
elif d == 1:
ra = a[:, [i], :, :]
elif d == 2:
ra = a[:, :, [i], :]
else:
ra = a[:, :, :, [i]]
elif n == 5:
if d == 0:
ra = a[[i], :, :, :, :]
elif d == 1:
ra = a[:, [i], :, :, :]
elif d == 2:
ra = a[:, :, [i], :, :]
elif d == 3:
ra = a[:, :, :, [i], :]
else:
ra = a[:, :, :, :, [i]]
elif n == 6:
if d == 0:
ra = a[[i], :, :, :, :, :]
elif d == 1:
ra = a[:, [i], :, :, :, :]
elif d == 2:
ra = a[:, :, [i], :, :, :]
elif d == 3:
ra = a[:, :, :, [i], :, :]
elif d == 4:
ra = a[:, :, :, :, [i], :]
else:
ra = a[:, :, :, :, :, [i]]
else:
raise NotImplementedError("more than 5 dimensions in array.")
return ra
[docs]def datetimes2fieldvaliditylist(datetimes, basis=None):
"""
Return a FieldValidityList from a list of datetime.datetime instances
(or a single datetime.datetime).
:param basis: can be either
- None (default): basis = validity
- a single datetime.datetime
- a list of the same length as datetimes
"""
from epygram.base import FieldValidityList
if isinstance(datetimes, datetime.datetime):
assert (isinstance(basis, datetime.datetime) or
(isinstance(basis, list) and
isinstance(basis[0], datetime.datetime)) or
basis is None)
if isinstance(basis, list):
fvl = FieldValidityList(date_time=[datetimes for _ in basis], basis=basis)
else:
fvl = FieldValidityList(date_time=datetimes, basis=basis)
elif isinstance(datetimes, list) and isinstance(datetimes[0], datetime.datetime):
assert (isinstance(basis, datetime.datetime) or
basis is None or
(isinstance(basis, list) and
isinstance(basis[0], datetime.datetime) and
len(basis) == len(datetimes)))
if isinstance(basis, datetime.datetime) or basis is None:
basis = [basis for _ in range(len(datetimes))]
fvl = FieldValidityList(date_time=datetimes,
basis=basis)
else:
raise TypeError("'datetimes' must be a datetime.datetime or a list of.")
return fvl
[docs]def ifNone_emptydict(arg):
"""
Transforms a None into a {}.
To be used as workaround for empty dicts in default values of methods.
"""
if arg is None:
arg = {}
return arg
[docs]def set_map_up(bm, ax,
drawrivers=False,
drawcoastlines=True,
drawcountries=True,
meridians='auto',
parallels='auto',
departments=False,
bluemarble=0.0,
background=False,
drawmapboundary_kwargs=None,
fillcontinents_kwargs=None,
drawcoastlines_kwargs=None,
drawcountries_kwargs=None,
drawparallels_kwargs=None,
drawmeridians_kwargs=None,
drawequator_kwargs=None,
drawgreenwich_kwargs=None):
"""Cf. :meth:`H2DField.plotfield` documentation."""
epylog.warning("DEPRECATED: the basemap package is deprecated")
if drawmapboundary_kwargs is None:
drawmapboundary_kwargs = dict(fill_color='lightskyblue')
if fillcontinents_kwargs is None:
fillcontinents_kwargs = dict(color='wheat', lake_color='skyblue',
zorder=0)
drawcoastlines_kwargs = ifNone_emptydict(drawcoastlines_kwargs)
drawcountries_kwargs = ifNone_emptydict(drawcountries_kwargs)
if background:
bm.drawmapboundary(ax=ax, **drawmapboundary_kwargs)
bm.fillcontinents(ax=ax, **fillcontinents_kwargs)
if bluemarble:
bm.bluemarble(alpha=bluemarble, ax=ax)
if drawcoastlines:
bm.drawcoastlines(ax=ax, **drawcoastlines_kwargs)
if departments:
if not hasattr(bm, '_epygram_departments'):
import json
with open(config.installdir + '/data/french_departments.json', 'r') as dp:
depts = json.load(dp)[1]
bm._epygram_departments = depts
else:
depts = bm._epygram_departments
for d in range(len(depts)):
for part in range(len(depts[d])):
dlon = depts[d][part][0]
dlat = depts[d][part][1]
(x, y) = bm(dlon, dlat)
bm.plot(x, y, color=drawcountries_kwargs.get('color', 'k'), ax=ax)
elif drawcountries:
bm.drawcountries(ax=ax, **drawcountries_kwargs)
if drawrivers:
bm.drawrivers(color='blue', ax=ax)
add_meridians_and_parallels_to(bm,
parallels=parallels,
meridians=meridians,
ax=ax,
drawparallels_kwargs=drawparallels_kwargs,
drawmeridians_kwargs=drawmeridians_kwargs,
drawequator_kwargs=drawequator_kwargs,
drawgreenwich_kwargs=drawgreenwich_kwargs)
[docs]def datetimerange(*_, **__):
"""
.. deprecated:: 1.2.11
"""
raise DeprecationWarning("You should use function daterange/daterangex " +
"from bronx.stdtypes.date")
[docs]def fmtfid(fmt, fid):
"""
Given a resource format name **fmt** and a (full) **fid**, returns the key
corresponding the actual format of the field in resource.
(Useful for distinguishing GRIB1/2)
"""
if fmt == 'GRIB':
if 'GRIB1' in fid:
fmtfid = 'GRIB1'
else:
fmtfid = 'GRIB2'
else:
fmtfid = fmt
return fmtfid
def moveaxis(a, source, destination):
"""
Calls numpy.moveaxis(), or if numpy version is too old, emulates for simple
cases.
"""
# CLEANME: workaround for old installs...
if LooseVersion(numpy.__version__) < LooseVersion('1.11.0'):
if source == 0 and destination == -1:
b = numpy.transpose(a, list(range(len(a.shape))[1:]) + [0])
elif source == -1 and destination == 0:
b = numpy.transpose(a, [-1] + list(range(len(a.shape))[:-1]))
else:
raise NotImplementedError('(source, destination) != (0,-1) or (-1,0) with that version of numpy')
else:
b = numpy.moveaxis(a, source, destination)
return b
@nicedeco
def call_before(mtd, hook_mtd):
"""Decorator for methods: call method hook_mtd before actually calling method."""
def hooked(self, *args, **kwargs):
getattr(self, hook_mtd)()
return mtd(self, *args, **kwargs)
return hooked