Source code for soupy.optimization.bfgs

# Copyright (c) 2023, The University of Texas at Austin 
# & Georgia Institute of Technology
#
# All Rights reserved.
# See file COPYRIGHT for details.
#
# This file is part of the SOUPy package. For more information see
# https://github.com/hippylib/soupy/
#
# SOUPy is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License (as published by the Free
# Software Foundation) version 3.0 dated June 2007.

import math
import numpy as np
import dolfin as dl
import sys
import os

import hippylib as hp
from ..modeling.variables import CONTROL


[docs] def BFGSoperator_ParameterList(): parameters = {} parameters["BFGS_damping"] = [0.2, "Damping of BFGS"] parameters["memory_limit"] = [np.inf, "Number of vectors to store in limited memory BFGS"] return hp.ParameterList(parameters)
[docs] def BFGS_ParameterList(): parameters = {} parameters["rel_tolerance"] = [1e-6, "we converge when sqrt(g,g)/sqrt(g_0,g_0) <= rel_tolerance"] parameters["abs_tolerance"] = [1e-12, "we converge when sqrt(g,g) <= abs_tolerance"] parameters["gdm_tolerance"] = [1e-12, "we converge when (g,dm) <= gdm_tolerance"] parameters["max_iter"] = [500, "maximum number of iterations"] parameters["c_armijo"] = [1e-4, "Armijo constant for sufficient reduction"] parameters["max_backtracking_iter"] = [25, "Maximum number of backtracking iterations"] parameters["print_level"] = [0, "Control verbosity of printing screen"] parameters["BFGS_op"] = [BFGSoperator_ParameterList(), "BFGS operator"] return hp.ParameterList(parameters)
[docs] class RescaledIdentity(object): """ Default operator for :code:`H0inv`, corresponds to applying :math:`d0 I` """ def __init__(self, init_vector=None): self.d0 = 1.0 self._init_vector = init_vector
[docs] def init_vector(self, x, dim): if self._init_vector: self._init_vector(x,dim) else: raise
[docs] def solve(self, x, b): """ Applies the operator :math:`d0 I` :param x: solution vector :type x: :py:class:`dolfin.Vector` :param b: right-hand side vector :type b: :py:class:`dolfin.Vector` """ # print("\t\t WITHIN IDENTITY") # print("\t\t", self.d0) # print("\t\t", x, x.get_local()) # print("\t\t", b, b.get_local()) x.zero() x.axpy(self.d0, b)
[docs] class BFGS_operator: def __init__(self, parameters=BFGSoperator_ParameterList()): self.S, self.Y, self.R = [],[],[] self.H0inv = None self.help = None self.help2 = None self.update_scaling = True self.parameters = parameters
[docs] def set_H0inv(self, H0inv): """ Set user-defined operator corresponding to :code:`H0inv` :param H0inv: Fenics operator with method :code:`solve()` """ self.H0inv = H0inv
[docs] def solve(self, x, b): """ Solve system: :math:`H_{\mathrm{bfgs}} x = b` where :math:`H_{\mathrm{bfgs}}` is the approximation to the Hessian build by BFGS. That is, we apply :math:`x = (H_{\mathrm{bfgs}})^{-1} b = H_k b` where :math:`H_k` matrix is BFGS approximation to the inverse of the Hessian. Computation done via double-loop algorithm. :param x: The solution to the system :type x: :py:class:`dolfin.Vector` :param b: The right-hand side of the system :type b: :py:class:`dolfin.Vector` """ A = [] if self.help is None: self.help = b.copy() else: # print("\t WITHIN BFGSop solve, before assignment") # print("\t", x, x.get_local()) # print("\t", self.help, self.help.get_local()) self.help.zero() self.help.axpy(1.0, b) for s, y, r in zip(reversed(self.S), reversed(self.Y), reversed(self.R)): a = r * s.inner(self.help) A.append(a) self.help.axpy(-a, y) # print("\t WITHIN BFGSop solve, near solve") # print("\t", x, x.get_local()) # print("\t", self.help, self.help.get_local()) self.H0inv.solve(x, self.help) # x = H0 * x_copy for s, y, r, a in zip(self.S, self.Y, self.R, reversed(A)): b = r * y.inner(x) x.axpy(a - b, s)
[docs] def update(self, s, y): """ Update BFGS operator with most recent gradient update. To handle potential break from secant condition, update done via damping :param s: The update in medium parameters :type s: :py:class:`dolfin.Vector` :param y: The update in gradient :type y: :py:class:`dolfin.Vector` """ damp = self.parameters["BFGS_damping"] memlim = self.parameters["memory_limit"] if self.help2 is None: self.help2 = y.copy() else: self.help2.zero() sy = s.inner(y) # print("\t Within update") # print("\t", y, y.get_local()) # print("\t", self.help2, self.help2.get_local()) self.solve(self.help2, y) yHy = y.inner(self.help) theta = 1.0 if sy < damp*yHy: theta = (1.0-damp)*yHy/(yHy-sy) s *= theta s.axpy(1-theta, self.help2) sy = s.inner(y) assert(sy > 0.) rho = 1./sy self.S.append(s.copy()) self.Y.append(y.copy()) self.R.append(rho) # if L-BFGS if len(self.S) > memlim: self.S.pop(0) self.Y.pop(0) self.R.pop(0) self.update_scaling = True # re-scale H0 based on earliest secant information if hasattr(self.H0inv, "d0") and self.update_scaling: s0 = self.S[0] y0 = self.Y[0] d0 = s0.inner(y0) / y0.inner(y0) self.H0inv.d0 = d0 self.update_scaling = False return theta
[docs] class BFGS: """ Implement BFGS technique with backtracking inexact line search and damped updating \ See `Nocedal & Wright (06), ch.6.2, ch.7.3, ch.18.3` \ The user must provide a :py:class:`ControlCostFunctional` that describes the forward problem, \ cost functional, and all the \ derivatives for the gradient and the Hessian. """ termination_reasons = [ "Maximum number of Iteration reached", #0 "Norm of the gradient less than tolerance", #1 "Maximum number of backtracking reached", #2 "Norm of (g, da) less than tolerance" #3 ] def __init__(self, cost_functional, parameters=BFGS_ParameterList()): """ Initialize the BFGS solver. Type :code:`BFGS_ParameterList().showMe()` for default parameters and their description :param cost_functional: The cost functional :param parameters: The parameters for the BFGS solver """ self.cost_functional = cost_functional self.parameters = parameters self.it = 0 self.converged = False self.ncalls = 0 self.reason = 0 self.final_grad_norm = 0 self.BFGSop = BFGS_operator(self.parameters["BFGS_op"])
[docs] def solve(self, z, H0inv=RescaledIdentity(), box_bounds=None, constraint_projection=None): """ Solve the constrained optimization problem with initial guess :code:`z` :param z: The initial guess :type z: :py:class:`dolfin.Vector` :param H0inv: Initial approximation of the inverse of the Hessian. Has optional method :code:`update(x)` that will update the operator :param box_bounds: Bound constraint. A list with two entries (min and max). Can be either a scalar value or a :code:`dolfin.Vector` of the same size as :code:`z` :type box_bounds: list :param constraint_projection: Alternative projectable constraint :type constraint_projection: :py:class:`ProjectableConstraint` :return: The optimization solution :code:`z` and a dictionary of information .. note:: The input :code:`z` will be overwritten """ if box_bounds is not None: if hasattr(box_bounds[0], "get_local"): param_min = box_bounds[0].get_local() #Assume it is a dolfin vector else: param_min = box_bounds[0]*np.ones_like(z.get_local()) #Assume it is a scalar if hasattr(box_bounds[1], "get_local"): param_max = box_bounds[1].get_local() #Assume it is a dolfin vector else: param_max = box_bounds[1]*np.ones_like(z.get_local()) #Assume it is a scalar rel_tol = self.parameters["rel_tolerance"] abs_tol = self.parameters["abs_tolerance"] max_iter = self.parameters["max_iter"] # ls_list = self.parameters[self.parameters["globalization"]] c_armijo = self.parameters["c_armijo"] max_backtracking_iter = self.parameters["max_backtracking_iter"] print_level = self.parameters["print_level"] self.BFGSop.parameters["BFGS_damping"] = self.parameters["BFGS_op"]["BFGS_damping"] self.BFGSop.parameters["memory_limit"] = self.parameters["BFGS_op"]["memory_limit"] self.BFGSop.set_H0inv(H0inv) # Initialize vectors zhat = self.cost_functional.generate_vector(CONTROL) g = self.cost_functional.generate_vector(CONTROL) dz = self.cost_functional.generate_vector(CONTROL) z_star = self.cost_functional.generate_vector(CONTROL) self.it = 0 self.converged = False self.ncalls += 1 costs = [] cost_old = self.cost_functional.cost(z, order=1) costs.append(cost_old) gradnorms = [] n_backtracks = [] if print_level >= 0: print( "\n{0:3} {1:15} {2:15} {3:15} {4:15}".format( "It", "cost", "||g||L2", "||dz||L2", "alpha") ) print( "{0:3d} {1:15e}".format( self.it, cost_old)) while (self.it < max_iter) and (self.converged == False): if hasattr(self.BFGSop.H0inv, "setPoint"): self.BFGSop.H0inv.setPoint(z) g_old = g.copy() gradnorm = self.cost_functional.grad(g) if gradnorm is None: gradnorm = np.sqrt(g.inner(g)) gradnorms.append(gradnorm) # Update BFGS if self.it > 0: s = zhat * alpha y = g - g_old theta = self.BFGSop.update(s, y) else: gradnorm_ini = gradnorm tol = max(abs_tol, gradnorm_ini*rel_tol) print("Tolerance: ", tol) theta = 1.0 # check if solution is reached if (gradnorm < tol) and (self.it > 0): self.converged = True self.reason = 1 break self.it += 1 self.BFGSop.solve(zhat, -g) # backtracking line-search alpha = 1.0 descent = False n_backtrack = 0 g_zhat = g.inner(zhat) while not descent and n_backtrack < max_backtracking_iter: # Update the optimization variable z_star.zero() z_star.axpy(1.0, z) z_star.axpy(alpha, zhat) if box_bounds is not None: z_star.set_local(np.maximum(z_star.get_local(), param_min)) z_star.set_local(np.minimum(z_star.get_local(), param_max)) if constraint_projection is not None: constraint_projection.project(z_star) # compute the cost cost_new = self.cost_functional.cost(z_star, order=0) if print_level >= 1: print("\tBacktracking cost for step size %g: \t%g" %(alpha, cost_new)) # Check if armijo conditions are satisfied if (cost_new < cost_old + alpha * c_armijo * g_zhat) or (-g_zhat <= self.parameters["gdm_tolerance"]): descent = True else: n_backtrack += 1 alpha *= 0.5 n_backtracks.append(n_backtrack) # Compute size of step dz.zero() dz.axpy(1.0, z_star) dz.axpy(-1.0, z) dz_norm = np.sqrt(dz.inner(dz)) # Update z and compute cost z.zero() z.axpy(1.0, z_star) cost_old = self.cost_functional.cost(z, order=1) costs.append(cost_old) if print_level >= 0: print( "{:3d} {:15e} {:15e} {:14e} {:14e} {:14e}".format( self.it, cost_old, dz_norm, gradnorm, alpha, theta)) if n_backtrack == max_backtracking_iter: self.converged = False self.reason = 2 break if -g_zhat <= self.parameters["gdm_tolerance"]: self.converged = True self.reason = 3 break result = dict() result["costs"] = costs result["gradnorms"] = gradnorms result["n_backtracks"] = n_backtracks result["termination_reason"] = BFGS.termination_reasons[self.reason] if print_level >= 0: print(BFGS.termination_reasons[self.reason]) self.final_grad_norm = gradnorm self.final_cost = cost_old return z, result