Source code for soupy.modeling.superquantileRiskMeasureSAA

# Copyright (c) 2023, The University of Texas at Austin 
# & Georgia Institute of Technology
#
# All Rights reserved.
# See file COPYRIGHT for details.
#
# This file is part of the SOUPy package. For more information see
# https://github.com/hippylib/soupy/
#
# SOUPy is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License (as published by the Free
# Software Foundation) version 3.0 dated June 2007.

import logging 
import numpy as np 
import dolfin as dl 
import mpi4py
import scipy.optimize

from hippylib import ParameterList, Random

from .riskMeasure import RiskMeasure
from .variables import STATE, PARAMETER, ADJOINT, CONTROL
from .controlModelHessian import ControlModelHessian

from ..collectives import NullCollective, MultipleSamePartitioningPDEsCollective, \
        MultipleSerialPDEsCollective, allocate_process_sample_sizes
from .smoothPlusApproximation import SmoothPlusApproximationQuartic, SmoothPlusApproximationSoftplus
from .augmentedVector import AugmentedVector



[docs] def sample_superquantile(samples, beta): """ Evaluate superquantile from samples """ quantile = np.percentile(samples, beta * 100) return quantile + np.mean(np.maximum(samples - quantile, 0))/(1-beta)
[docs] def sample_superquantile_by_minimization(samples, beta, epsilon=1e-2): """ Evaluate superquantile from samples by minimization """ quantile = np.percentile(samples, beta * 100) smoothPlus = SmoothPlusApproximationQuartic(epsilon=epsilon) cvar_obj = lambda t : t + np.mean(smoothPlus(samples - t))/(1-beta) minimum = scipy.optimize.fmin(cvar_obj, quantile) quantile = minimum[0] superquantile = cvar_obj(quantile) return quantile, superquantile
[docs] def superquantileRiskMeasureSAASettings(data = {}): data['sample_size'] = [100,'Number of Monte Carlo samples'] data['beta'] = [0.95, 'Quantile value for superquantile'] data['epsilon'] = [0.01, 'Sharpness of smooth plus approximation'] data['seed'] = [1, 'rng seed for sampling'] data['smoothplus_type'] = ['quartic', 'approximation type for smooth plus function'] return ParameterList(data)
[docs] class SuperquantileRiskMeasureSAA(RiskMeasure): """ Risk measure for the sample average approximation of the superquantile risk measure (CVaR) with sample parallelism using MPI """ def __init__(self, control_model, prior, settings = superquantileRiskMeasureSAASettings(), comm_sampler=mpi4py.MPI.COMM_WORLD): """ Constructor: :param control_model: control model containing the :code:`soupy.PDEVariationalControlProblem` and :code:`soupy.ControlQoI` :type control_model: :py:class:`soupy.ControlModel` :param prior: The prior distribution for the random parameter :type prior: :py:class:`hippylib.Prior` :param settings: additional settings given in :code:`superquantileRiskMeasureSAASettings()` :type settings: :py:class:`hippylib.ParameterList` :param comm_sampler: MPI communicator for sample parallelism :type comm_sampler: :py:class:`mpi4py.MPI.Comm` """ self.model = control_model self.prior = prior self.settings = settings self.sample_size = self.settings['sample_size'] self.beta = settings['beta'] if self.settings["smoothplus_type"] == "softplus": self.smoothplus = SmoothPlusApproximationSoftplus(self.settings["epsilon"]) elif self.settings["smoothplus_type"] == "quartic": self.smoothplus = SmoothPlusApproximationQuartic(self.settings["epsilon"]) else: # Default case self.smoothplus = SmoothPlusApproximationQuartic(self.settings["epsilon"]) assert self.sample_size >= comm_sampler.Get_size(), \ "Total samples %d needs to be greater than MPI size %d" %(self.sample_size, comm_sampler.Get_size()) self.comm_sampler = comm_sampler self.sample_size_allprocs = allocate_process_sample_sizes(self.sample_size, self.comm_sampler) self.sample_size_proc = self.sample_size_allprocs[self.comm_sampler.rank] if comm_sampler.Get_size() == 1: self.collective = NullCollective() else: self.collective = MultipleSerialPDEsCollective(self.comm_sampler) # Within process self.q_samples = np.zeros(self.sample_size_proc) # Aggregate components for computing cost, grad, hess self.s_bar = 0 self.sprime_bar = 0 self.sprime_g_bar = self.model.generate_vector(CONTROL) # For sampling self.noise = dl.Vector(self.model.problem.Vh[STATE].mesh().mpi_comm()) # use the mesh mpi comm self.prior.init_vector(self.noise, "noise") rng = Random(seed=self.settings['seed']) # Generate samples for m self.x_mc = [] self.z = self.model.generate_vector(CONTROL) self.zt = AugmentedVector(self.z, copy_vector=False) self.g_mc = [] if mpi4py.MPI.COMM_WORLD.rank == 0: logging.info("Initial sampling of stochastic parameter") # Burn in for parallel sampling n_burn = int(np.sum(self.sample_size_allprocs[:self.comm_sampler.rank])) for i in range(n_burn): rng.normal(1.0, self.noise) # Actual sampling for i in range(self.sample_size_proc): ui = self.model.generate_vector(STATE) mi = self.model.generate_vector(PARAMETER) pi = self.model.generate_vector(ADJOINT) x = [ui, mi, pi, self.z] rng.normal(1.0, self.noise) self.prior.sample(self.noise, mi) self.x_mc.append(x) g = self.model.generate_vector(CONTROL) self.g_mc.append(g) if mpi4py.MPI.COMM_WORLD.rank == 0: logging.info("Done sampling") # Helper variable self.diff_helper= self.model.generate_vector(CONTROL) self.has_adjoint_solve = False self.has_forward_solve = False self.Hi_zhat = self.model.generate_vector(CONTROL) self.control_model_hessian = None
[docs] def generate_vector(self, component = "ALL"): """ If :code:`component` is :code:`STATE`, :code:`PARAMETER`, :code:`ADJOINT`, \ return a vector corresponding to that function space. \ If :code:`component == CONTROL`, return an :py:class:`soupy.AugmentedVector` \ that augments the control variable :code:`z` with a scalar that can be used \ for optimization If :code:`component == "ALL"`, \ Generate the list of vectors :code:`x = [u,m,p,z]`. \ Note that in this case, the :code:`CONTROL` variable will not be augmented \ with the scalar, and can be used directly for methods like :code:`solveFwd`. """ if component == CONTROL: return AugmentedVector(self.model.generate_vector(CONTROL), copy_vector=False) else: return self.model.generate_vector(component)
[docs] def computeComponents(self, zt, order=0, **kwargs): """ Computes the components for the evaluation of the risk measure :param zt: the control variable with the scalar :code:`t` appended :type zt: :py:class:`soupy.AugmentedVector` :param order: Order of the derivatives needed. :code:`0` for cost, :code:`1` for gradient, :code:`2` for Hessian :type order: int :param kwargs: dummy keyword arguments for compatibility """ z = zt.get_vector() t = zt.get_scalar() new_forward_solve = False self.s_bar = 0 self.sprime_bar = 0 self.sprime_g_bar.zero() # Check if a new control variable is used self.diff_helper.zero() self.diff_helper.axpy(1.0, self.z) self.diff_helper.axpy(-1.0, z) diff_norm = np.sqrt(self.diff_helper.inner(self.diff_helper)) # Store zt in class self.zt.zero() self.zt.axpy(1.0, zt) # Check if new forward solve is needed if diff_norm > dl.DOLFIN_EPS or not self.has_forward_solve: # Update control variable (changes all samples) # Ask that new forward and adjoint solves are computed new_forward_solve = True logging.info("Using new forward solve") # Check if a new adjoint solve is needed if new_forward_solve: # Reset the averages for new solve # Make sure previous adjoint solve is no longer valid self.has_adjoint_solve = False new_adjoint_solve = True elif self.has_adjoint_solve: # If it's not a new forward solve, check if we already have an adjoint new_adjoint_solve = False # self.sprime_bar = 0 # self.sprime_g_bar.zero() else: # If we don't already have an adjoint, compute a new one new_adjoint_solve = True # self.sprime_bar = 0 # self.sprime_g_bar.zero() # In this case, self.has_adjoint_solve is already False # Now actually compute the solves for i in range(self.sample_size_proc): x = self.x_mc[i] if new_forward_solve: # Solve state self.model.solveFwd(x[STATE], x) # Compute the averages qi = self.model.cost(x) self.q_samples[i] = qi self.s_bar += self.smoothplus(qi - t)/self.sample_size if order >= 1: if new_adjoint_solve: self.model.solveAdj(x[ADJOINT], x) self.model.evalGradientControl(x, self.g_mc[i]) self.sprime_bar += self.smoothplus.grad(qi - t)/self.sample_size self.sprime_g_bar.axpy(self.smoothplus.grad(qi - t)/self.sample_size, self.g_mc[i]) self.s_bar = self.collective.allReduce(self.s_bar, "SUM") # if order >= 1 and new_adjoint_solve: if order >= 1: self.collective.allReduce(self.sprime_g_bar, "SUM") self.sprime_bar = self.collective.allReduce(self.sprime_bar, "SUM") # We have computed a new adjoint solve # Don't need to alter if it's any other case self.has_adjoint_solve = True # Will always be true after solving self.has_forward_solve = True
[docs] def cost(self): """ Evaluates the value of the risk measure once components have been computed :return: Value of the cost functional .. note:: Assumes :code:`computeComponents` has been called with :code:`order>=0` """ t = self.zt.get_scalar() return t + 1/(1-self.beta) * self.s_bar
[docs] def grad(self, gt): """ Evaluates the gradient of the risk measure once components have been computed :param g: (Dual of) the gradient of the risk measure to store result in :type g: :py:class:`dolfin.Vector` .. note:: Assumes :code:`self.computeComponents` has been called with :code:`order>=1` """ # print("(proc %d) q_bar = %g" %(self.comm_sampler.Get_rank(), self.q_bar)) dzJ_np = self.sprime_g_bar.get_local()/(1-self.beta) dtJ_np = 1 - self.sprime_bar/(1-self.beta) dz_np = np.append(dzJ_np, dtJ_np) gt.set_local(dz_np)
[docs] def hessian(self, zt_hat, Hzt_hat): """ Apply the hessian of the risk measure once components have been computed \ in the direction :code:`zhat` :param zt_hat: Direction for application of Hessian action of the risk measure :type zt_hat: :py:class:`soupy.AugmentedVector` :param Hzt_hat: (Dual of) Result of the Hessian action of the risk measure to store the result in :type Hzt_hat: :py:class:`soupy.AugmentedVector` .. note:: Assumes :code:`self.computeComponents` has been called with :code:`order >= 1` """ if self.control_model_hessian is None: self.control_model_hessian = ControlModelHessian(self.model) Hzt_hat.zero() Hz_hat = Hzt_hat.get_vector() Ht_hat = 0.0 z_hat = zt_hat.get_vector() t_hat = zt_hat.get_scalar() t = self.zt.get_scalar() for i in range(self.sample_size_proc): xi = self.x_mc[i] gi = self.g_mc[i] qi = self.q_samples[i] # Derivatives of the smoothed max approximation sprime_i = self.smoothplus.grad(qi - t) sprimeprime_i = self.smoothplus.hessian(qi - t) self.model.setLinearizationPoint(xi) self.control_model_hessian.mult(z_hat, self.Hi_zhat) gi_inner_zhat = gi.inner(z_hat) # Output of Hessian in z zz_hessian_scale_factor = sprime_i/(1-self.beta)/self.sample_size zz_gradient_scale_factor = sprimeprime_i * gi_inner_zhat/(1-self.beta)/self.sample_size zt_gradient_scale_factor = -sprimeprime_i * t_hat/(1-self.beta)/self.sample_size Hz_hat.axpy(zz_hessian_scale_factor, self.Hi_zhat) Hz_hat.axpy(zz_gradient_scale_factor + zt_gradient_scale_factor, gi) # Output of Hessian in t Ht_hat += sprimeprime_i/(1-self.beta)/self.sample_size * (t_hat - gi_inner_zhat) self.collective.allReduce(Hz_hat, "SUM") Ht_hat_all = self.collective.allReduce(Ht_hat, "SUM") Hzt_hat.set_scalar(Ht_hat_all)
[docs] def gather_samples(self): """ Gather the QoI samples from all processes :return: An array of the sample QoI values :return type: :py:class:`numpy.ndarray` """ q_all = np.zeros(self.sample_size) self.comm_sampler.Allgatherv(self.q_samples, [q_all, self.sample_size_allprocs]) return q_all
[docs] def superquantile(self): """ Evaluate the superquantile using the computed samples :return: Value of the superquantile by sampling .. note:: Assumes :code:`computeComponents` has been called with :code:`order>=0` """ q_all = self.gather_samples() value = sample_superquantile(q_all, self.beta) return value