Source code for fogpy.lowwatercloud

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2020 Fogpy developers

# This file is part of the fogpy package.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

""" This module implements a class for a 1D low water cloud model.
The approach can be used to determine fog cloud base heights by known
cloud top height and temperature and cloud liquid water path, e.g. from
satellite retrievals.
The implemented approch is based on a publication:


The implementation is based on the following publications:

    * Cermak, J., & Bendix, J. (2011). Detecting ground fog from space–a
      microphysics-based approach. International Journal of Remote Sensing,
      32(12), 3345-3371. doi:10.1016/j.atmosres.2007.11.009
    * Cermak, J., & Bendix, J. (2008). A novel approach to fog/low
      stratus detection using Meteosat 8 data. Atmospheric Research,
      87(3-4), 279-292. doi:10.1016/j.atmosres.2007.11.009
    * Cermak, J. (2006). SOFOS-a new satellite-based operational fog
      observation scheme. (PhD thesis), Philipps-Universität Marburg,
      Marburg, Germany. doi:doi.org/10.17192/z2006.0149

"""

import math
import logging
import time
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import basinhopping
from scipy.optimize import brute

# Configure logger.
logger = logging.getLogger('lowwatercloud')


[docs]class CloudLayer(object): """This class represent a cloud layer - 1D representation of a cloud section from its vertical profile with defined extent and homogenius cloud parameters. The layer is defined by the bottom and top height in the cloud profile""" def __init__(self, bottom, top, lowcloud, add=True): self.bottom = bottom # Bottom height of the cloud layer self.top = top # Top height of the cloud layer self.z = top - (top - bottom) / 2 # Central layer height self.debug = lowcloud.debug # Fix maximum z height to cloud top height from cloud object # Height optimasation can provoke layers above the cth if self.z > lowcloud.cth: self.z = lowcloud.cth # Get temperaure and air presure self.temp = lowcloud.get_moist_adiabatic_lapse_temp(self.z, lowcloud.cth, lowcloud.ctt, True) self.press = lowcloud.get_air_pressure(self.z) # Calculate Vapour pressure and mixing ratio self.psv = lowcloud.get_sat_vapour_pressure(self.temp, lowcloud.vapour_method) self.vmr = lowcloud.get_vapour_mixing_ratio(self.press, self.psv) # Get liquid water mixing ratio if lowcloud.cb_vmr is None: lowcloud.get_cloud_based_vapour_mixing_ratio() self.lmr = lowcloud.get_liquid_mixing_ratio(lowcloud.cb_vmr, self.vmr) # Get layer density self.rho = lowcloud.get_moist_air_density(self.press * 100, self.psv * 100, self.check_temp(self.temp, 'kelvin', self.debug)) # Get layer liquid water density self.lrho = lowcloud.get_liquid_density(self.press * 100, self.check_temp(self.temp, 'celsius')) self.lrho = 1000 # TODO Fix liquid water density method # Get in cloud mixing ratio beta self.beta = lowcloud.get_incloud_mixing_ratio(self.z, lowcloud.cth, lowcloud.cbh) # Get liquid water content self.lwc = lowcloud.get_liquid_water_content(self.z, lowcloud.cth, self.rho, self.lmr, self.beta, lowcloud.upthres, lowcloud.maxlwc) # Get layer effective radius self.reff = lowcloud.get_effective_radius(self.z) # Get layer extinction coefficient self.extinct = lowcloud.get_extinct(self.lwc, self.reff, (self.lrho * 1000.)) # Get visibility self.visibility = lowcloud.get_visibility(self.extinct) # Add cloud layer to low water cloud object if add: lowcloud.layers.append(self) if self.debug: logger.debug('New cloud layer: z: {} m | t: {} °C | p: {} hPa | ' 'psv: {} hPa | vmr: {} g kg-1 | lmr: {} g kg-1 | ' 'beta: {} | rho: {} kg m-3 | lwc: {} g m-3' .format(self.z, self.temp, round(self.press, 3), round(self.psv, 3), round(self.vmr, 3), round(self.lmr, 3), round(self.beta, 3), round(self.rho, 3), round(self.lwc, 3)))
[docs] @classmethod def check_temp(self, temp, unit='celsius', debug=False): """Check for plausible range of temperature value for given unit. Convert if required""" if unit == 'celsius': if temp > 60: result = temp - 273.15 if debug: logger.debug('Temperature {} is in Kelvin. Auto converting' ' to {} °C'.format(temp, result)) else: result = temp elif unit == 'kelvin': if temp <= 60 or temp < 0: result = temp + 273.15 if debug: logger.debug('Temperature {} is in Celsius. ' 'Auto converting to {} K' .format(temp, result)) else: result = temp return(result)
[docs] def get_layer_info(self): print('New cloud layer: z: {} m | t: {} °C | p: {} hPa | ' 'psv: {} hPa | vmr: {} g kg-1 | lmr: {} g kg-1 | ' 'beta: {} | rho: {} kg m-3 | lwc: {} g m-3' .format(self.z, self.temp, round(self.press, 3), round(self.psv, 3), round(self.vmr, 3), round(self.lmr, 3), round(self.beta, 3), round(self.rho, 3), round(self.lwc, 3)))
[docs]class LowWaterCloud(object): """A class to simulate the water content of a low cloud and calculate its meteorological properties. Args: | cth (:obj:`float`): Cloud top height in m. | ctt (:obj:`float`): Cloud top temperature in K. | cwp (:obj:`float`): Cloud water path in kg / m^2. | cbh (:obj:`float`): Cloud base height in m. | reff (:obj:`float`): Droplet effective radius in m. | cbt (:obj:`float`): Cloud base temperature in K. | upthres (:obj:`float`): Top layer thickness with dry air entrainment in m. | lowthres (:obj:`float`): Bottem layer thickness with ground coupling in m. | thickness (:obj:`float`): Layer thickness in m. | debug (:obj:`bool`): Boolean to activate additional debug output. | nodata (:obj:`float`): Provide a specific Nodata value. Default is: -9999. Returns: Calibrated cloud base height in m. """ def __init__(self, cth=None, ctt=None, cwp=None, cbh=0, reff=None, cbt=None, upthres=50., lowthres=75., thickness=10., debug=False, nodata=-9999): self.upthres = upthres # Top layer thickness with dry air entrainment self.lowthres = lowthres # Bottom layer thickness with coupling self.cth = cth # Cloud top height self.ctt = ctt # Cloud top temperature self.cbh = cbh # Cloud base height self.cbt = cbt # Cloud base temperature self.cwp = cwp # Cloud water path self.lwp = None # Dummy for liquid water path calculation self.reff = reff # Droplet effective radius self.layers = [] # List of cloud layers self.vapour_method = "magnus" # Method for saturated vapour pressure self.cb_vmr = None # Water vapour mixing ratio self.thickness = thickness # Layer thickness in m self.debug = debug # Boolean to activate additional output self.nodata = nodata # Specific Nodata value # Get maximal liquid water content underneath cloud top self.maxlwc = None # Raise warnings when thresholds are in conflict with top and # base cloud height if self.cth - self.upthres <= self.cbh: logger.warning("Upper threshold starting level <{}> and cloud base" " <{}> are in conflict" .format(self.cth - self.upthres, self.cbh)) self.upthres = self.cth - self.cbh - 1 logger.info("Reducing upper threshold <{}> to correct conflict" .format(self.upthres)) if self.lowthres >= self.cth: logger.warning("Lower threshold ending level <{}> and cloud top" " <{}> are in conflict" .format(self.lowthres, self.cth)) @property def cbh(self): return self.__cbh @cbh.setter def cbh(self, cbh): # Check conflicts for threshold and cloud base if cbh >= self.cth: self.__cbh = self.cth - 1 self.upthres = self.cth - 1 logger.info("Cloud base <{}> is higher than cloud top <{}>. " "Autmatically reduced to <{}>".format( cbh, self.cth, self.cth - 1)) elif self.cth - self.upthres <= cbh: self.upthres = self.cth - cbh - 1 logger.info("Reducing upper threshold <{}> to correct conflict" .format(self.upthres)) self.__cbh = cbh else: self.__cbh = cbh
[docs] def init_cloud_layers(self, init_cbh, thickness, overwrite=True): """Method to initialize cloud layers and corresponding parameters. the method needs a initial cloud base height and thickness in [m].""" self.cbh = init_cbh self.get_cloud_based_vapour_mixing_ratio() # Get maximal liquid water content underneath cloud top maxlwc_layer = CloudLayer(self.cth - self.upthres - thickness, self.cth - self.upthres + thickness, self, False) self.maxlwc = maxlwc_layer.lwc # Calculate layer properties # Contitional resetting cloud layers if overwrite: self.layers = [] # Cloud base layer CloudLayer(init_cbh - thickness, init_cbh + thickness, self) # Loop over layers layerrange = np.arange(init_cbh, self.cth, thickness) for b in layerrange: CloudLayer(b, b + thickness, self) # Cloud top layer CloudLayer(self.cth - thickness, self.cth + thickness, self) if self.debug: logger.info("Initialize {} cloud layers with {} m thickness" " and {} m cbh" .format(len(self.layers), thickness, init_cbh))
[docs] def get_cloud_base_height(self, start=0, method='basin'): """ Calculate cloud base height [m].""" # Calculate cloud base height self.cbh = self.optimize_cbh(start, method=method, debug=self.debug) return self.cbh
[docs] def get_fog_base_height(self, substitude=False): """This method calculate the fog cloud base height for low clouds with visibilities below 1000 m. Args: | substitude (:obj:`bool`): Optional argument to substitude with cbh if no fbh could be found. Returns: Fog base height """ fog_z = [l.z for l in self.layers if l.visibility is not None and l.visibility <= 1000] if len(fog_z) > 0: # Get lowest heights with visibility treshold self.fbh = min(fog_z) else: if substitude: logger.warning("No fog base height found: Substitude with " "cloud base height: {}".format(self.cbh)) self.fbh = self.cbh else: logger.warning("No fog base height found: Set to NaN") self.fbh = np.nan return self.fbh
[docs] def get_liquid_water_content(self, z, cth, hrho, lmr, beta, thres, maxlwc=None, debug=False): """Calculate liquid water content [g m-3] by air density and liquid water mixing ratio.""" if z > cth - thres: if maxlwc is None: maxlwc_layer = CloudLayer( self.cth - self.upthres - self.thickness, self.cth - self.upthres + self.thickness, self, False) maxlwc = maxlwc_layer.lwc self.maxlwc = maxlwc if self.maxlwc <= 0 and debug: logger.debug( "Maximum liquid water content is zero or negative") lwc = (cth - z) / (thres) * maxlwc # Test if liquid water content is negativ # This occures while optimizing with cbh=cth if lwc < 0: logger.debug("Liquid water content <{}> is negative for " "maximum of <{}> and cbh <{}>" .format(lwc, self.maxlwc, self.cbh)) lwc = 0 # Set lwc to zero else: lwc = (1 - beta) * hrho * lmr return lwc
[docs] @classmethod def get_moist_air_density(self, pa, pv, temp, empiric=False, debug=False): """Calculate air density for humid air with known pressure and water vapour pressure and temperature.""" Rv = 461.495 # Specific gas constant for water vapour J kg-1 K-1 Rd = 287.058 # Specific gas constant for dry air J kg-1 K-1 d_sea = 1.2929 # Density of dry air at sea level if temp <= 60: newtemp = temp + 273.15 if debug: logger.debug('Temperature {} is in Celsius. Auto converting to' ' {} K'.format(temp, newtemp)) temp = newtemp if empiric: hrho = d_sea * (273.15 / temp) * ((pa - 0.3783 * pv) / (1.013 * 10**5)) else: hrho = ((pa - pv) / (Rd * temp)) + (pv / (Rv * temp)) return hrho
[docs] @classmethod def get_moist_adiabatic_lapse_temp(self, z, cth, ctt, convert=False): """Calculate air temperature for height z [K] following a moist adiabatic lapse rate. Requires values for cloud top height and temperature e.g. known from satellite retrievals.""" malr = 0.0065 # ## MALR: Moist adiabatic lapse rate [K m-1] temp = (cth - z) * malr + ctt # Optional convertion to Celsius degrees. if convert: temp = temp - 273.15 return temp
[docs] @classmethod def get_sat_vapour_pressure(self, temp, mode='buck', convert=False, debug=False): """Calculate satured water vapour pressure for temperature [hPa] using different empirical approaches. Options: Buck, Magnus Convert temperatures in K to °C """ if convert: newtemp = temp - 273.15 if self.debug: logger.info("Converting temperature {} K to {} °C" .format(temp, newtemp)) temp = newtemp elif temp > 60: newtemp = temp - 273.15 if debug: logger.debug('Temperature {} is in Kelvin. Auto converting to' ' {} °C'.format(temp, newtemp)) temp = newtemp if mode == 'buck': psv = 0.61121 * math.exp((18.678 - temp / 234.5) * (temp / (257.14 + temp))) elif mode == 'magnus': const1 = 6.1078 if temp > 0: const2 = 17.08085 const3 = 234.175 else: const2 = 17.84362 const3 = 245.425 psv = const1 * math.exp(const2 * temp / (const3 + temp)) # Convert to hPa if mode == 'buck': result = psv * 10 else: result = psv return result
[docs] @classmethod def get_vapour_pressure(self, z, temp): """Calculate water vapour pressure for height z [hPa].""" # TODO Finish implementation wdensity = 0 # Density of water vapour gconst = 461.51 # Gas constante of water vapour in [J kg-1 K-1] # Calculate water vapur pressure vp = wdensity * gconst * temp return vp
[docs] @classmethod def get_air_pressure(self, z, elevation=0): """Calculate ambient air pressure for height z [hPa].""" pa = 100 * ((44331.514 - z) / 11880.516) ** (1 / 0.1902632) return pa / 100
[docs] @classmethod def get_vapour_mixing_ratio(self, pa, pv): """Calculate water vapour mixing ratio for given ambient pressure and water vapour pressure. Also usabale under saturated conditions.""" # Calculate water vapour mixing ratio vmr = 621.97 * pv / (pa - pv) return vmr
[docs] @classmethod def get_liquid_mixing_ratio(self, cb_vmr, vmr, debug=False): """Calculate liquid water mixing ratio for given water vapour mixing ratio in a certain height and the maximum water vapour mixing ratio at cloud base condensation level [g/kg].""" lmr = cb_vmr - vmr if cb_vmr <= vmr and debug: logger.debug("Liquid water mixing ratio will be zero or negative" " for cbvmr: <{}> and vmr: <{}>".format(cb_vmr, vmr)) return lmr
[docs] def get_cloud_based_vapour_mixing_ratio(self, debug=False): # Get temperature and air pressure temp = self.get_moist_adiabatic_lapse_temp(self.cbh, self.cth, self.ctt, True) press = self.get_air_pressure(self.cbh) # Calculate Vapour pressure and mixing ratio psv = self.get_sat_vapour_pressure(temp, self.vapour_method) cb_vmr = self.get_vapour_mixing_ratio(press, psv) self.cb_vmr = cb_vmr if debug: logger.debug("Cloud based vapour mixing ratio: " "<{}> at cloud base <{}>" .format(cb_vmr, self.cbh)) return cb_vmr
[docs] @classmethod def get_incloud_mixing_ratio(self, z, cth, cbh, lowthres=75., upthres=50.): """Calculate in-cloud mixing ratio for given cloud height parameter.""" midbeta = 0.3 * cth / 1000 # Fixed in cloud mixing ratio # Separation in three major cloud layers # Apply fixed value for middle layer if z > cbh + lowthres and z < cth - upthres: beta = midbeta # Apply zero value for upper layer elif z >= cth - upthres: beta = midbeta # Apply linear increase from zero to fixed value in the lower layer elif z <= (cbh + lowthres): beta = (z - cbh) / (lowthres) * midbeta else: beta = midbeta # Default value return beta
[docs] def get_liquid_water_path(self): """Calculate liquid water path for given cloud layers [g m-2].""" z = np.array([l.top - l.bottom for l in self.layers]) lwc = np.array([l.lwc for l in self.layers]) # Get sum of single layer water path lwp = np.sum(z * lwc) self.lwp = lwp return lwp
[docs] def optimize_cbh(self, start, method='basin', debug=False): """Find best fitting cloud base height by comparing calculated liquid water path with given satellite retrieval. Minimization with basinhopping or brute force algorithm from python scipy package.""" cbhbounds = HeightBounds(self.cth - self.upthres, -1000) # Test input data if np.isnan(self.cwp) or np.isnan(self.cth): logger.warning("Input data for cwp: <{}> or cth: <{}> is not" " a number".format(self.cwp, self.cth)) result = np.nan return(result) elif self.cwp == self.nodata or self.cth == self.nodata: logger.warning("Input cwp: <{}> or cth: <{}> in NoData format" .format(self.cwp, self.cth)) result = np.nan return(result) # Choose method if method == 'basin': start_time = int(round(time.time() * 1000)) minimizer_kwargs = {"method": "BFGS", "bounds": (0, self.cth - self.upthres)} ret = basinhopping(self.minimize_cbh, start, T=5.0, minimizer_kwargs=minimizer_kwargs, niter=30, niter_success=5, stepsize=200, accept_test=cbhbounds, seed=42) time_diff = int(round(time.time() * 1000)) - start_time result = float(ret.x[0]) logger.info('Optimized lwp: start cbh: {:.2f}, cth: {:.2f}, ' 'ctt: {:.2f}, observed lwp {:.2f}' ' --> result lwp: {:.2f}, calibrated cbh: {:.2f},' ' elapsed time: {:.2f} ms for {} layers with {} ' 'm thickness' .format(start, float(self.cth), float(self.ctt), float(self.cwp), float(self.lwp), result, time_diff, len(self.layers), self.thickness)) elif method == 'brute': ranges = slice(0, self.cth - self.upthres, 1) ret = brute(self.minimize_cbh, (ranges,), finish=None) result = ret # the minimisation routine self.minimize_cbh calls # self.init_cloud_layers, so in every call the cloud layers get reset; # we don't know if the final call corresponded to the minimum (with # basin hopping it might, with brute force it won't), which was # triggering bug issue #29, therefore re-initialise layers with the # last value self.init_cloud_layers(result, self.thickness, True) self.get_liquid_water_path() # also has side-effect... # Add optional debug output if debug: for l in self.layers: l.get_layer_info() # Set class variable for cloud base height self.cbh = result return(result)
[docs] def minimize_cbh(self, x): """Minimization function for liquid water path.""" x = np.reshape(x, (1,)) self.init_cloud_layers(x[0], self.thickness, True) lwp = self.get_liquid_water_path() diff = abs(lwp - self.cwp) return diff
[docs] def get_liquid_density(self, temp, press): """Calculate the liquid water density in [kg m-3].""" t0 = 0. # Reference temperature in [°C] rho_0 = 999.8 # Density of water for 0°C: [kg/m3] p0 = 1e5 # Air pressure at 0°C in [Pa] beta = 0.000088 # Expansion coefficient of water at 10oC: [m3/m3°C] E = 2.15e9 # Bulk modulus of water: [N/m2] rho = rho_0 / (1 + beta * (temp - t0)) / (1 - (press - p0) / E) return rho
[docs] def get_visibility(self, extinct, contrast=0.02): """Calculate visibility in [m] for given cloud layer. Extinction is directly related to visibility by Koschmieder’s law. """ if extinct is None: return None else: vis = (1 / extinct) * math.log(1 / contrast) # Remove negative visibilities if vis < 0: vis = 0 return vis
[docs] def get_extinct(self, lwc, reff, rho): """Calculate extingtion coeficient [m-1] The extinction therefore is a combination of radiation loss by (diffuse) scattering and molecular absorption. Required are the liquid water content, effective radius and liquid water density TODO: Recheck the unit of liquid water density g or kg? Should be in g """ if reff is None: return None elif lwc == 0: return None else: extinct = 3 * lwc / (2 * reff * rho) return extinct
[docs] def get_effective_radius(self, z): """The droplet effective radius in [um] for each level is computed on the assumptions that reff retrieved at 3.9 μm is the cloud top value, Cloud base reff is at 1 μm and the intermediate values are scaled linearly in between. """ if self.reff is None: return None else: reff = 1e-6 + ((self.reff - 1e-6) / (self.cth - self.cbh)) * (z - self.cbh) return reff
[docs] def plot_lowcloud(self, para, xlabel=None, save=None): """Plotting of selected low water cloud parameters.""" if self.layers == []: logger.info("No layer found. Nothing to plot") heights = [getattr(l, 'z') for l in self.layers] paralist = [getattr(l, para) for l in self.layers] plt.figure() plt.plot(paralist, heights, '-o') plt.ylim((0, max(heights) + 50)) plt.axvline(0, color='grey', ls='--') plt.axhline(self.cth, color='grey', ls='--') plt.axhline(self.cbh, color='grey', ls='--') plt.ylabel('Cloud height z in [m]') if xlabel is not None: plt.xlabel(xlabel) else: plt.xlabel(para) if save is not None: plt.savefig(save) else: plt.show()
[docs]class HeightBounds(object): def __init__(self, xmax=2000, xmin=-1000): self.xmax = xmax self.xmin = xmin def __call__(self, **kwargs): x = kwargs["x_new"] tmax = bool(x <= self.xmax) tmin = bool(x >= self.xmin) return tmax and tmin