localizationSpectral_mod sourceΒΆ

   1MODULE localizationSpectral_mod
   2  ! MODULE localizationSpectral_mod (prefix='lsp' category='2. B and R matrices')
   3  !
   4  !:Purpose:  To compute localized 3D gridpoint amplitude fields for each
   5  !           ensemble member from a given (1D) control vector of
   6  !           SPECTRAL ELEMENTS
   7  !
   8  use midasMpi_mod
   9  use utilities_mod
  10  use globalSpectralTransform_mod
  11  use lamSpectralTransform_mod
  12  use localizationFunction_mod
  13  use horizontalCoord_mod
  14  use earthConstants_mod
  15  use ensembleStatevector_mod
  16  implicit none
  17  save
  18  private
  19
  20  ! public derived type
  21  public :: struct_lsp
  22  ! public procedures
  23  public :: lsp_setup, lsp_Lsqrt, lsp_LsqrtAd, lsp_finalize
  24  public :: lsp_reducetompilocal, lsp_reducetompilocal_r4
  25  public :: lsp_expandtompiglobal, lsp_expandtompiglobal_r4
  26
  27  type :: struct_lsp
  28     logical             :: initialized = .false.
  29     real(8),allocatable :: LhorizSqrt(:,:)
  30     real(8),allocatable :: LvertSqrt(:,:)
  31     type(struct_lst)    :: lst    ! Spectral transform Parameters
  32     real(8)             :: dlon
  33     integer             :: gstID
  34     integer, pointer    :: ilaList_mpiglobal(:)
  35     integer, pointer    :: ilaList_mpilocal(:)
  36     integer             :: cvDim_mpilocal
  37     integer             :: cvDim_mpiglobal
  38     integer             :: nla_mpilocal
  39     integer             :: nla_mpiglobal
  40     integer             :: ntrunc
  41     integer             :: nphase
  42     integer             :: ni, nj
  43     integer             :: myLatBeg, myLatEnd
  44     integer             :: myLonBeg, myLonEnd
  45     integer             :: nEnsOverDimension, nEns
  46     integer             :: nLev
  47     integer             :: mymBeg, mymEnd, mymSkip
  48     integer             :: mynBeg, mynEnd, mynSkip
  49     logical             :: global
  50  end type struct_lsp
  51
  52  real(8), parameter :: rsq2 = sqrt(2.0d0)
  53
  54  logical, parameter :: verbose = .false.
  55
  56CONTAINS
  57
  58  !--------------------------------------------------------------------------
  59  ! lsp_setup
  60  !--------------------------------------------------------------------------
  61  SUBROUTINE lsp_setup(hco_loc, nEns, nLev, vertLocation, ntrunc, locType, &
  62                       locMode, horizLengthScale1, horizLengthScale2, vertLengthScale, &
  63                       cvDim_out, lsp, nEnsOverDimension_out)
  64    implicit none
  65
  66    ! Arguments:
  67    type(struct_lsp), pointer, intent(out) :: lsp
  68    type(struct_hco), pointer, intent(in)  :: hco_loc
  69    integer,                   intent(in)  :: nEns
  70    integer,                   intent(in)  :: nLev
  71    integer,                   intent(in)  :: nTrunc
  72    real(8),                   intent(in)  :: vertLocation(nLev)
  73    real(8),                   intent(in)  :: horizLengthScale1
  74    real(8),                   intent(in)  :: horizLengthScale2
  75    real(8),                   intent(in)  :: vertLengthScale
  76    character(len=*),          intent(in)  :: locType
  77    character(len=*),          intent(in)  :: locMode
  78    integer,                   intent(out) :: cvDim_out
  79    integer,                   intent(out) :: nEnsOverDimension_out
  80
  81    ! Locals:
  82    integer :: latPerPE, latPerPEmax, lonPerPE, lonPerPEmax, mymCount, mynCount, mIndex, nIndex, maxMyNla
  83    integer :: myMemberBeg, myMemberEnd, myMemberCount, maxMyMemberCount, ierr
  84
  85    if (verbose) write(*,*) 'Entering lsp_Setup'
  86
  87    !
  88    !- 1.  Allocation
  89    !
  90    if (.not. associated(lsp)) then
  91      allocate(lsp)
  92    else
  93      call utl_abort('lsp_setup: supplied lsp must be null')
  94    endif
  95
  96    !
  97    !- 2.  Settings
  98    !
  99    
 100    !- 2.1 Mode
 101    if ( trim(locType) == 'spectral') then
 102       if (mmpi_myid == 0) write(*,*)
 103       if (mmpi_myid == 0) write(*,*) 'lsp_setup: LocType = ', trim(locType)
 104    else
 105       write(*,*)
 106       write(*,*) 'locType = ', trim(locType)
 107       call utl_abort('lsp_setup: unknown locType')
 108    end if
 109
 110    !- 2.2 Ensemble members and Levels
 111    lsp%nEns=nEns
 112    lsp%nLev=nLev
 113 
 114    !- 2.3 Horizontal grid
 115    lsp%ni   = hco_loc%ni
 116    lsp%nj   = hco_loc%nj
 117
 118    call mmpi_setup_latbands(lsp%nj, latPerPE, latPerPEmax, lsp%myLatBeg, lsp%myLatEnd)
 119    call mmpi_setup_lonbands(lsp%ni, lonPerPE, lonPerPEmax, lsp%myLonBeg, lsp%myLonEnd)
 120
 121    lsp%global = hco_loc%global
 122    if (lsp%global) then
 123      if (mmpi_myid == 0) write(*,*)
 124      if (mmpi_myid == 0) write(*,*) 'lsp_setup: GLOBAL mode activated'
 125    else
 126      if (mmpi_myid == 0) write(*,*)
 127      if (mmpi_myid == 0) write(*,*) 'lsp_setup: LAM mode activated'
 128    end if
 129
 130    !- 2.4 Spectral Transform
 131    lsp%nTrunc=nTrunc
 132    lsp%dlon = hco_loc%dlon
 133
 134    call mmpi_setup_levels(lsp%nEns,myMemberBeg,myMemberEnd,myMemberCount)
 135    call rpn_comm_allreduce(myMemberCount, maxMyMemberCount, &
 136                              1,"MPI_INTEGER","mpi_max","GRID",ierr)
 137    nEnsOverDimension_out     = mmpi_npex * maxMyMemberCount
 138    lsp%nEnsOverDimension = nEnsOverDimension_out
 139
 140    if (lsp%global) then
 141       ! Global Mode
 142       lsp%nphase = 2
 143       lsp%nla_mpiglobal = (lsp%ntrunc+1)*(lsp%ntrunc+2)/2
 144       
 145       lsp%gstID = gst_setup(lsp%ni,lsp%nj,lsp%ntrunc,lsp%nEnsOverDimension)
 146       if (mmpi_myid == 0) write(*,*) 'lsp_setup: returned value of gstID = ',lsp%gstID
 147
 148    else
 149       ! LAM mode
 150       call lst_Setup(lsp%lst,                                          & ! OUT
 151                      lsp%ni, lsp%nj, lsp%dlon, lsp%ntrunc, & ! IN
 152                      'LatLonMN', maxlevels_opt=lsp%nEnsOverDimension,  & ! IN
 153                       gridDataOrder_opt='kij')                               ! IN
 154
 155       lsp%nphase       = lsp%lst%nphase
 156       lsp%nla_mpilocal = lsp%lst%nla
 157       
 158    end if
 159
 160    !- 2.5 Distribute control vector over mpi processes according to member index and m
 161    call mmpi_setup_m(lsp%ntrunc,lsp%mymBeg,lsp%mymEnd,lsp%mymSkip,mymCount)
 162    call mmpi_setup_n(lsp%ntrunc,lsp%mynBeg,lsp%mynEnd,lsp%mynSkip,mynCount)
 163
 164    if (lsp%global) then
 165       ! compute arrays to facilitate conversions between ila_mpilocal and ila_mpiglobal
 166       call gst_ilaList_mpiglobal(lsp%ilaList_mpiglobal,lsp%nla_mpilocal,maxMyNla,lsp%gstID, &
 167                                  lsp%mymBeg,lsp%mymEnd,lsp%mymSkip,lsp%mynBeg, &
 168                                  lsp%mynEnd,lsp%mynSkip)
 169       call gst_ilaList_mpilocal(lsp%ilaList_mpilocal,lsp%gstID,lsp%mymBeg,lsp%mymEnd, &
 170                                 lsp%mymSkip,lsp%mynBeg,lsp%mynEnd,lsp%mynSkip)
 171       write(*,*) 'lsp_setup: nla_mpiglobal, nla_mpilocal, maxMyNla = ', lsp%nla_mpiglobal, &
 172            lsp%nla_mpilocal, maxMyNla
 173    end if
 174
 175    !- 2.6 Localization
 176    call lfn_setup('FifthOrder') ! IN
 177
 178    call setupLocalizationMatrices(lsp, horizLengthScale1, horizLengthScale2, & ! IN
 179                                   vertLengthScale, vertLocation, locMode)  ! IN
 180
 181    if (lsp%global) then
 182       lsp%cvDim_mpiglobal = (lsp%ntrunc+1)*(lsp%ntrunc+1)*lsp%nLev*lsp%nEns
 183       lsp%cvDim_mpilocal  = 0
 184       
 185       do mIndex = lsp%mymBeg, lsp%mymEnd, lsp%mymSkip
 186          do nIndex = lsp%mynBeg, lsp%mynEnd, lsp%mynSkip
 187             if (mIndex.le.nIndex) then
 188                if (mIndex == 0) then
 189                   ! controlVector only contains real part for mIndex=0
 190                   lsp%cvDim_mpilocal = lsp%cvDim_mpilocal + 1*lsp%nLev*lsp%nEns
 191                else
 192                   ! controlVector contains real and imag parts for mIndex>0
 193                   lsp%cvDim_mpilocal = lsp%cvDim_mpilocal + 2*lsp%nLev*lsp%nEns
 194                end if
 195             end if
 196          end do
 197       end do
 198    else
 199       lsp%cvDim_mpiglobal = lsp%lst%nlaGlobal * lsp%nphase * lsp%nLev * lsp%nEns
 200       lsp%cvDim_mpilocal  = lsp%nla_mpilocal      * lsp%nphase * lsp%nLev * lsp%nEns
 201       write(*,*) 'cvDim_mpiglobal ', lsp%cvDim_mpiglobal, lsp%lst%nlaGlobal, &
 202            lsp%nphase, lsp%nLev, lsp%nEns
 203       write(*,*) 'cvDim_mpilocal  ', lsp%cvDim_mpilocal, lsp%nla_mpilocal, &
 204            lsp%nphase, lsp%nLev, lsp%nEns
 205    end if
 206    cvDim_out = lsp%cvDim_mpilocal
 207
 208    !
 209    !- 3.  Ending
 210    !
 211    lsp%initialized = .true.
 212
 213  END SUBROUTINE lsp_setup
 214
 215  !--------------------------------------------------------------------------
 216  ! setupLocalizationMatrices
 217  !--------------------------------------------------------------------------
 218  SUBROUTINE setupLocalizationMatrices(lsp,horizLengthScale1,horizLengthScale2 ,vertLengthScale,&
 219                                       vertLocation,localizationMode)
 220    implicit none
 221
 222    ! Arguments:
 223    type(struct_lsp), pointer, intent(inout) :: lsp
 224    real(8),                   intent(in)    :: horizLengthScale1
 225    real(8),                   intent(in)    :: horizLengthScale2 
 226    real(8),                   intent(in)    :: vertLengthScale
 227    real(8),                   intent(in)    :: vertLocation(lsp%nLev)
 228    character(len=*),          intent(in)    :: localizationMode
 229
 230    ! Locals:
 231    real(8)  :: zr,zcorr
 232    integer :: levIndex,levIndex1,levIndex2,ierr
 233    real(8) :: horizLengthScaleAll(lsp%nLev)
 234
 235    if (verbose) write(*,*) 'Entering setupLocalizationMatrices'
 236
 237    !
 238    !- 1.  Allocation
 239    !
 240    allocate(lsp%LhorizSqrt(0:lsp%nTrunc,lsp%nLev),stat=ierr)
 241    if (ierr.ne.0 ) then
 242       write(*,*) 'lsp_setup: Problem allocating memory! id=9',ierr
 243       call utl_abort('setupLocalizationMatrices')
 244    end if
 245    
 246    allocate(lsp%LvertSqrt(lsp%nLev,lsp%nLev),stat=ierr)
 247    if (ierr.ne.0 ) then
 248       write(*,*) 'bmatrixEnsemble: Problem allocating memory! id=10',ierr
 249       call utl_abort('setupLocalizationMatrices')
 250    end if
 251    
 252    !
 253    !- 2.  Compute HORIZONTAL localization correlation matrix
 254    !
 255
 256    !  2.1 Determine localization length scale for each vertical level
 257    if ( trim(localizationMode) == 'LevelDependent' .and. horizLengthScale2 > 0.0d0 ) then
 258      ! vertically varying horizontal localization (linear in log P)
 259      do levIndex = 1, lsp%nLev
 260        horizLengthScaleAll(levIndex) = ( horizLengthScale1*( vertLocation(levIndex) - &
 261                                                              vertLocation(1       ) ) +    &
 262                                          horizLengthScale2*( vertLocation(lsp%nLev   ) - &
 263                                                              vertLocation(levIndex) ) ) /  &
 264                                         ( vertLocation(lsp%nLev)-vertLocation(1) )
 265        if (mmpi_myid == 0) then
 266          write(*,*) 'loc: localization length scale (',levIndex,') = ',horizLengthScaleAll(levIndex)
 267        end if
 268      end do
 269    else
 270      ! vertically constant horizontal localization
 271       horizLengthScaleAll(:) = horizLengthScale1
 272    end if
 273    
 274    !- 2.2 Compute the matrix
 275    if (lsp%global) then
 276       call setupGlobalSpectralHLoc(lsp,horizLengthScaleAll) ! IN
 277    else
 278       call setupLamSpectralHLoc(lsp,horizLengthScaleAll) ! IN
 279    end if
 280
 281    !
 282    !- 3.  Compute VERTICAL localization correlation matrix
 283    !
 284    if (vertLengthScale > 0.0d0 .and. lsp%nLev > 1) then
 285
 286      !-  3.1 Calculate 5'th order function
 287      do levIndex1 = 1, lsp%nLev
 288        do levIndex2 = 1, lsp%nLev
 289          ZR = abs(vertLocation(levIndex2) - vertLocation(levIndex1))
 290          zcorr = lfn_response(zr,vertLengthScale)
 291          lsp%LvertSqrt(levIndex1,levIndex2) = zcorr
 292        end do
 293      end do
 294
 295      !- 3.2 Compute sqrt of the matrix
 296      if (vertLengthScale > 0.0d0) then
 297        call utl_matSqrt(lsp%LvertSqrt(1,1),lsp%nLev,1.0d0,.false.)
 298      end if
 299
 300    else
 301      lsp%LvertSqrt(:,:) = 1.d0 ! no vertical localization
 302    end if
 303
 304  END SUBROUTINE setupLocalizationMatrices
 305
 306  !--------------------------------------------------------------------------
 307  ! setupGlobalSpectralHLoc
 308  !--------------------------------------------------------------------------
 309  SUBROUTINE setupGlobalSpectralHLoc(lsp, local_length)
 310    implicit none
 311
 312    ! Arguments:
 313    type(struct_lsp), pointer, intent(inout) :: lsp
 314    real(8),                   intent(in)    :: local_length(lsp%nLev)
 315
 316    ! Locals:
 317    real(8) ::   zlc,zr,zpole,zcorr
 318    ! NOTE: arrays passed to spectral transform are dimensioned as follows
 319    !       gd: lat/lon tiles and sp: member index
 320    real(8) :: sp_mpilocal(lsp%nla_mpilocal,lsp%nphase,lsp%nLev)
 321    real(8) :: sp_mpiglobal(lsp%nla_mpiglobal,lsp%nphase,lsp%nLev)
 322    real(8) :: sp_mympiglobal(lsp%nla_mpiglobal,lsp%nphase,lsp%nLev)
 323    real(8) :: zgd_gst(lsp%myLonBeg:lsp%myLonEnd,lsp%myLatBeg:lsp%myLatEnd,lsp%nLev)
 324    integer :: nIndex,latIndex,lonIndex,levIndex,nsize,ierr
 325    integer :: ila_mpiglobal,jla_mpilocal,gstID2
 326
 327    if (local_length(1).gt.0.0d0) then
 328
 329      gstID2 = gst_setup(lsp%ni,lsp%nj,lsp%ntrunc,lsp%nLev)
 330
 331      zgd_gst(:,:,:) = 0.0d0
 332      do levIndex = 1, lsp%nLev
 333        ! Calculate 5th Order Correlation Functions in Physical Space
 334        zlc = 1000.0d0*local_length(levIndex)
 335        do latIndex = lsp%myLatBeg, lsp%myLatEnd
 336          zr = ec_ra * acos(gst_getrmu(latIndex,gstID2))
 337          zcorr = lfn_response(zr,zlc)
 338          do lonIndex = lsp%myLonBeg, lsp%myLonEnd
 339            zgd_gst(lonIndex,latIndex,levIndex) = zcorr
 340          end do
 341        end do
 342      end do
 343
 344      ! Transform to spectral space
 345      sp_mpilocal(:,:,:) = 0.0d0
 346      call gst_setID(gstID2)
 347      call gst_reespe(sp_mpilocal,zgd_gst)
 348
 349      ! Make mpiglobal in spectral space
 350      sp_mympiglobal(:,:,:) = 0.0d0
 351      do jla_mpilocal = 1, lsp%nla_mpilocal
 352        ila_mpiglobal = lsp%ilaList_mpiglobal(jla_mpilocal)
 353        sp_mympiglobal(ila_mpiglobal,:,:) = sp_mpilocal(jla_mpilocal,:,:)
 354      end do
 355      nsize = lsp%nla_mpiglobal*lsp%nphase*lsp%nLev
 356      sp_mpiglobal(:,:,:) = 0.0d0
 357      call rpn_comm_allreduce(sp_mympiglobal,sp_mpiglobal,nsize,"mpi_double_precision","mpi_sum","GRID",ierr)
 358      
 359      do levIndex = 1, lsp%nLev
 360        do nIndex = 0, lsp%ntrunc
 361          lsp%LhorizSqrt(nIndex,levIndex) = sp_mpiglobal(nIndex+1,1,levIndex)
 362        end do
 363      end do
 364
 365      ! Make sure it's one at the pole
 366      do levIndex = 1, lsp%nLev
 367        do  nIndex = 0, lsp%ntrunc
 368          lsp%LhorizSqrt(nIndex,levIndex) = abs(lsp%LhorizSqrt(nIndex,levIndex))
 369        end do
 370      end do
 371      do levIndex = 1, lsp%nLev
 372        zpole = 0.d0
 373        do  nIndex = 0, lsp%ntrunc
 374          zpole = zpole + lsp%LhorizSqrt(nIndex,levIndex)*sqrt((2.d0*nIndex+1.d0)/2.d0)
 375        end do
 376        if (zpole.le.0.d0) then
 377          write(*,*)'POLE VALUE NEGATIVE IN setupGlobalSpectralHLoc levIndex=',levIndex
 378          call utl_abort('setupGlobalSpectralHLoc')
 379        end if
 380        do nIndex = 0, lsp%ntrunc
 381          lsp%LhorizSqrt(nIndex,levIndex) = lsp%LhorizSqrt(nIndex,levIndex)/zpole
 382        end do
 383      end do
 384
 385      ! Convert back to correlations and take sqrt
 386      do levIndex = 1, lsp%nLev
 387        do nIndex = 0, lsp%ntrunc
 388          lsp%LhorizSqrt(nIndex,levIndex) = sqrt( 0.5d0*lsp%LhorizSqrt(nIndex,levIndex) * &
 389                                                       ((2.0d0/(2.0d0*nIndex+1.0d0))**0.5d0) )
 390        end do
 391      end do
 392
 393    else
 394
 395       ! NO HORIZONTAL LOCALIZATION, set lsp%LhorizSqrt to 1.0 for wavenumber 0
 396       do levIndex = 1, lsp%nLev
 397          lsp%LhorizSqrt(:,levIndex) = 0.0d0
 398          lsp%LhorizSqrt(0,levIndex) = 1.0d0
 399       end do
 400    end if
 401
 402  END SUBROUTINE setupGlobalSpectralHLoc
 403
 404  !--------------------------------------------------------------------------
 405  ! setupLamSpectralHLoc
 406  !--------------------------------------------------------------------------
 407  SUBROUTINE setupLamSpectralHLoc(lsp, local_length)
 408    implicit none
 409
 410    ! Arguments:
 411    type(struct_lsp), pointer, intent(inout) :: lsp
 412    real(8),                   intent(in)    :: local_length(lsp%nLev)
 413
 414    ! Locals:
 415    real(8), allocatable :: sp(:,:,:)
 416    real(8), allocatable :: gd(:,:,:)
 417    real(8), allocatable :: SumWeight(:)
 418    real(8) :: sum
 419    type(struct_lst)     :: lst_hloc    ! Spectral transform Parameters
 420    integer :: k, p, e, ila, totwvnb
 421    character(len=19)   :: kind
 422
 423    if ( local_length(1) > 0.d0 ) then
 424      !
 425      !- 1. Enforce HORIZONTAL LOCALIZATION
 426      !
 427
 428      !- 1.1 Setup a non-MPI spectral transform
 429      call lst_Setup(lst_hloc,                             & ! OUT
 430                     lsp%ni, lsp%nj, lsp%dlon, & ! IN
 431                     lsp%ntrunc, 'NoMpi')                ! IN
 432
 433      !- 1.2 Create a correlation function in physical space
 434      allocate (gd(lsp%ni,lsp%nj,lsp%nLev))
 435
 436      call lfn_CreateBiPerFunction( gd,                                  & ! OUT
 437                                    local_length, lsp%dlon,          & ! IN
 438                                    lsp%ni, lsp%nj, lsp%nLev ) ! IN
 439
 440      !- 1.3 Transform to spectral space
 441      allocate (sp(lst_hloc%nla, lsp%nphase, lsp%nLev))
 442
 443      kind = 'GridPointToSpectral'
 444      call lst_VarTransform(lst_hloc,           & ! IN
 445                            sp,                 & ! OUT
 446                            gd,                 & ! IN
 447                            kind, lsp%nLev)   ! IN
 448 
 449      !- 1.4 Compute band mean
 450      allocate(SumWeight(0:lsp%nTrunc))
 451      SumWeight  (:)  = 0.d0
 452
 453      lsp%LhorizSqrt(:,:) = 0.d0
 454      do totwvnb = 0, lsp%ntrunc
 455         do e = 1, lst_hloc%nePerK(totwvnb)
 456            ila = lst_hloc%ilaFromEK(e,totwvnb)
 457            do p = 1, lst_hloc%nphase
 458               SumWeight(totwvnb) = SumWeight(totwvnb) + lst_hloc%Weight(ila)
 459               do k = 1, lsp%nLev
 460                  lsp%LhorizSqrt(totwvnb,k) = lsp%LhorizSqrt(totwvnb,k) + &
 461                                                             lst_hloc%Weight(ila) * abs(sp(ila,p,k))
 462                end do
 463             end do
 464         end do
 465      end do
 466
 467      do totwvnb = 0, lsp%ntrunc
 468         if (SumWeight(totwvnb) /= 0.d0) then
 469            lsp%LhorizSqrt(totwvnb,:) = lsp%LhorizSqrt(totwvnb,:) / SumWeight(totwvnb)
 470         else
 471            lsp%LhorizSqrt(totwvnb,:) = 0.d0
 472         end if
 473      end do
 474
 475      deallocate(SumWeight)
 476
 477      !- 1.5 Normalization to one of correlation function from spectral densities: Part 1
 478      !$OMP PARALLEL DO PRIVATE (totwvnb,k,sum)
 479      do k = 1, lsp%nLev
 480         sum = 0.0d0
 481         do totwvnb = 0, lsp%ntrunc
 482            sum = sum + real(totwvnb,8) * lsp%LhorizSqrt(totwvnb,k)
 483         end do
 484         do totwvnb = 0, lsp%ntrunc
 485            if ( sum /= 0.0d0 ) then
 486               lsp%LhorizSqrt(totwvnb,k) = lsp%LhorizSqrt(totwvnb,k) / sum
 487            else
 488               lsp%LhorizSqrt(totwvnb,k) = 0.d0
 489            end if
 490         end do
 491      end do
 492      !$OMP END PARALLEL DO
 493
 494      !- 1.6 Normalization to one of correlation function from spectral densities: Part 2
 495
 496      !- 1.6.1 Spectral transform of a delta function (at the center of the domain)
 497      gd(:,:,:) = 0.d0
 498      gd(lsp%ni/2,lsp%nj/2,:) = 1.d0
 499
 500      kind = 'GridPointToSpectral'
 501      call lst_VarTransform(lst_hloc,           & ! IN
 502                            sp,                 & ! OUT
 503                            gd,                 & ! IN
 504                            kind, lsp%nLev )  ! IN
 505
 506      !- 1.6.2 Apply the correlation function
 507      !$OMP PARALLEL DO PRIVATE (totwvnb,e,ila,p,k)
 508      do totwvnb = 0, lsp%ntrunc
 509         do e = 1, lst_hloc%nePerK(totwvnb)
 510            ila = lst_hloc%ilaFromEK(e,totwvnb)
 511            do p = 1, lsp%nphase
 512               do k = 1, lsp%nLev
 513                  sp(ila,p,k) = sp(ila,p,k) * lsp%LhorizSqrt(totwvnb,k) * &
 514                                lst_hloc%NormFactor(ila,p) * lst_hloc%NormFactorAd(ila,p)
 515               end do
 516            end do
 517         end do
 518      end do
 519      !$OMP END PARALLEL DO
 520
 521      !- 1.6.3 Move back to physical space
 522      kind = 'SpectralToGridPoint'
 523      call lst_VarTransform(lst_hloc,           & ! IN
 524                            sp,                 & ! IN
 525                            gd,                 & ! OUT
 526                            kind, lsp%nLev )  ! IN
 527
 528      !- 1.6.4 Normalize to 1
 529      do k = 1, lsp%nLev
 530         if ( gd(lsp%ni/2,lsp%nj/2,k) <= 0.d0 ) then
 531            write(*,*) 'setupLamSpectralHLoc: Problem in normalization ',gd(lsp%ni/2,lsp%nj/2,k)
 532            call utl_abort('setupLamSpectralHLoc')
 533         end if
 534         if ( mmpi_myid == 0 ) then
 535           write(*,*) 'setupLamSpectralHLoc: Normalization factor = ', k, gd(lsp%ni/2,lsp%nj/2,k), 1.d0 / gd(lsp%ni/2,lsp%nj/2,k)
 536         end if
 537         lsp%LhorizSqrt(:,k) = lsp%LhorizSqrt(:,k) / gd(lsp%ni/2,lsp%nj/2,k)
 538      end do
 539
 540      !- 1.7 Take sqrt
 541      lsp%LhorizSqrt(:,:) = sqrt(lsp%LhorizSqrt(:,:))
 542
 543      deallocate(sp)
 544      deallocate(gd)
 545
 546    else
 547      !
 548      !- 2. NO HORIZONTAL LOCALIZATION: set ensLocal%LhorizSqrt to 1.0 for wavenumber 0
 549      !
 550      lsp%LhorizSqrt(:,:) = 0.0d0
 551      lsp%LhorizSqrt(0,:) = 1.0d0
 552    end if
 553
 554  END SUBROUTINE setupLamSpectralHLoc
 555
 556  !--------------------------------------------------------------------------
 557  ! lsp_Lsqrt
 558  !--------------------------------------------------------------------------
 559  SUBROUTINE lsp_Lsqrt(lsp, controlVector, ensAmplitude, stepIndex)
 560    implicit none
 561
 562    ! Arguments:
 563    type(struct_lsp), pointer, intent(inout) :: lsp
 564    integer,                   intent(in)    :: stepIndex
 565    real(8),                   intent(in)    :: controlVector(lsp%cvDim_mpilocal)
 566    type(struct_ens),          intent(inout) :: ensAmplitude
 567
 568    ! Locals:
 569    integer :: levIndex1,levIndex2,jla,p,memberIndex
 570    integer :: levIndex, latIndex
 571    character(len=19) :: kind
 572    real(8) ,allocatable :: sp_hLoc(:,:,:,:),sp_vhLoc(:,:,:,:) 
 573    real(8), pointer :: ensAmplitude_oneLev(:,:,:,:)
 574
 575    allocate(sp_vhLoc(lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEnsOverDimension))
 576    allocate(sp_hLoc (lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEns             ))
 577
 578    if (verbose) write(*,*) 'Entering lsp_Lsqrt'
 579    call lsp_check(lsp)
 580
 581    !
 582    !- 1.  Horizontal Localization
 583    !
 584    if (lsp%global) then
 585       call globalSpectralHLoc( lsp,           & ! IN
 586                                sp_hLoc,      & ! OUT
 587                                controlVector ) ! IN
 588    else
 589       call lamSpectralHLoc( lsp,           & ! IN
 590                             sp_hLoc,      & ! OUT
 591                             controlVector ) ! IN
 592    end if
 593
 594    !
 595    !- 2.  Vertical localization
 596    !
 597    if (lsp%nEnsOverDimension > lsp%nEns) then
 598       sp_vhLoc(:,:,:,lsp%nEns+1:lsp%nEnsOverDimension) = 0.0d0
 599    end if
 600
 601    !$OMP PARALLEL DO PRIVATE (memberIndex,levIndex1,levIndex2,p,jla)
 602    do memberIndex = 1, lsp%nEns
 603      sp_vhLoc(:,:,:,memberIndex) = 0.0d0
 604      do levIndex1 = 1, lsp%nLev
 605        do levIndex2 = 1, lsp%nLev
 606          do p = 1, lsp%nphase
 607            do jla = 1, lsp%nla_mpilocal
 608              sp_vhLoc(jla,p,levIndex1,memberIndex) = sp_vhLoc(jla,p,levIndex1,memberIndex) +  &
 609                          lsp%LvertSqrt(levIndex1,levIndex2)*sp_hLoc(jla,p,levIndex2,memberIndex)
 610            end do
 611          end do
 612        end do
 613      end do
 614    end do
 615    !$OMP END PARALLEL DO
 616
 617    !
 618    !- 3.  Transform to gridpoint space all ensemble amplitudes
 619    !
 620    do levIndex = 1, lsp%nLev ! loop over all levels for the amplitudes
 621
 622      ensAmplitude_oneLev => ens_getOneLev_r8(ensAmplitude,levIndex)
 623
 624      !$OMP PARALLEL DO PRIVATE (latIndex)
 625       do latIndex = lsp%myLatBeg, lsp%myLatEnd
 626          ensAmplitude_oneLev(:,stepIndex,:,latIndex) = 0.0d0
 627       end do
 628       !$OMP END PARALLEL DO
 629
 630       if (lsp%global) then
 631          call gst_setID(lsp%gstID)
 632          call gst_speree_kij(sp_vhLoc(:,:,levIndex,:),ensAmplitude_oneLev(:,stepIndex,:,:))
 633       else
 634          kind = 'SpectralToGridPoint'
 635          call lst_VarTransform(lsp%lst,                          & ! IN
 636                                sp_vhLoc(:,:,levIndex,:),             & ! IN
 637                                ensAmplitude_oneLev(:,stepIndex,:,:), & ! OUT
 638                                kind, lsp%nEnsOverDimension)        ! IN
 639       end if
 640
 641    end do ! Loop on levels
 642
 643    deallocate(sp_hLoc )
 644    deallocate(sp_vhLoc)
 645    
 646  END SUBROUTINE lsp_Lsqrt
 647
 648  !--------------------------------------------------------------------------
 649  ! globalSpectralHLoc
 650  !--------------------------------------------------------------------------
 651  SUBROUTINE globalSpectralHLoc(lsp, sp_all, controlVector)
 652    implicit none
 653
 654    ! Arguments:
 655    type(struct_lsp), pointer, intent(inout) :: lsp
 656    real(8),                   intent(in)    :: controlVector(lsp%cvDim_mpilocal)
 657    real(8),                   intent(out)   :: sp_all(lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEns)
 658
 659    ! Locals:
 660    integer :: levIndex, mIndex, nIndex, ila_mpilocal, ila_mpiglobal, dimIndex, memberIndex 
 661
 662    if (verbose) write(*,*) 'Entering globalSpectralHloc'
 663    call lsp_check(lsp)
 664
 665    dimIndex = 0
 666
 667    do memberIndex = 1, lsp%nEns
 668
 669      do levIndex = 1, lsp%nLev
 670        do mIndex = lsp%mymBeg, lsp%mymEnd, lsp%mymSkip
 671          do nIndex = lsp%mynBeg, lsp%mynEnd, lsp%mynSkip
 672            if (mIndex .le. nIndex) then
 673
 674              ila_mpiglobal = gst_getnind(mIndex,lsp%gstID) + nIndex - mIndex
 675              ila_mpilocal  = lsp%ilaList_mpilocal(ila_mpiglobal)
 676              if (mIndex == 0) then
 677                ! controlVector only contain real part for mIndex=0
 678                dimIndex = dimIndex + 1
 679                sp_all(ila_mpilocal,1,levIndex,memberIndex) = controlVector(dimIndex)*lsp%LhorizSqrt(nIndex,levIndex)*rsq2
 680                sp_all(ila_mpilocal,2,levIndex,memberIndex) = 0.0d0
 681              else
 682                ! controlVector contains real and imag parts for mIndex>0
 683                dimIndex = dimIndex + 1
 684                sp_all(ila_mpilocal,1,levIndex,memberIndex) = controlVector(dimIndex)*lsp%LhorizSqrt(nIndex,levIndex)
 685                dimIndex = dimIndex + 1
 686                sp_all(ila_mpilocal,2,levIndex,memberIndex) = controlVector(dimIndex)*lsp%LhorizSqrt(nIndex,levIndex)
 687              end if
 688
 689            end if
 690          end do
 691        end do
 692      end do
 693
 694      if (dimIndex.gt.lsp%cvDim_mpilocal) then
 695        write(*,*) 'loc globalSpectralHLoc: dimIndex > cvDim_mpilocal! ',dimIndex,memberIndex,lsp%cvDim_mpilocal
 696        call utl_abort('globalSpectralHLoc')
 697      end if
 698
 699    end do
 700
 701  END SUBROUTINE globalSpectralHLoc
 702
 703  !--------------------------------------------------------------------------
 704  ! lamSpectralHLoc
 705  !--------------------------------------------------------------------------
 706  SUBROUTINE lamSpectralHLoc(lsp, sp_all, controlVector)
 707    implicit none
 708
 709    ! Arguments:
 710    type(struct_lsp), pointer, intent(inout) :: lsp
 711    real(8),                   intent(in)    :: controlVector(lsp%cvDim_mpilocal)
 712    real(8),                   intent(out)   :: sp_all(lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEns)
 713
 714    ! Locals:
 715    integer :: levIndex,jla, dimIndex, memberIndex, p 
 716
 717    if (verbose) write(*,*) 'Entering lamSpectralHloc'
 718    call lsp_check(lsp)
 719
 720    !
 721    !- Reshape + Horizontal localization + Scaling (parseval)
 722    !
 723    dimIndex = 0
 724
 725    do memberIndex = 1, lsp%nEns
 726
 727       do levIndex = 1, lsp%nLev
 728         do jla = 1, lsp%nla_mpilocal
 729            do p = 1, lsp%nphase
 730              dimIndex = dimIndex + 1
 731              sp_all(jla,p,levIndex,memberIndex) = controlVector(dimIndex)           * &
 732                                                lsp%LhorizSqrt(lsp%lst%k(jla),levIndex) * &
 733                                                lsp%lst%NormFactor(jla,p)
 734            end do
 735         end do
 736       end do
 737       if (dimIndex > lsp%cvDim_mpilocal ) then
 738          write(*,*) 'loc lamSpectralHLoc: dimIndex > cvDim! ',dimIndex,memberIndex,lsp%cvDim_mpilocal
 739          call utl_abort('lamSpectralHLoc')
 740       end if
 741
 742    end do
 743
 744  END SUBROUTINE lamSpectralHLoc
 745  
 746  !--------------------------------------------------------------------------
 747  ! spectralLocalizationSqrtAd
 748  !--------------------------------------------------------------------------
 749  SUBROUTINE lsp_LsqrtAd(lsp, ensAmplitude, controlVector, stepIndex)
 750    implicit none
 751
 752    ! Arguments:
 753    type(struct_lsp), pointer, intent(inout) :: lsp
 754    integer,                   intent(in)    :: stepIndex
 755    real(8),                   intent(out)   :: controlVector(lsp%cvDim_mpilocal)
 756    type(struct_ens),          intent(in)    :: ensAmplitude
 757
 758    ! Locals:
 759    integer :: levIndex1,levIndex2,jla,memberIndex,p
 760    integer :: levIndex
 761    character(len=19) :: kind
 762    real(8) ,allocatable :: sp_hLoc(:,:,:,:),sp_vhLoc(:,:,:,:)
 763    real(8), pointer :: ensAmplitude_oneLev(:,:,:,:)
 764
 765    if (verbose) write(*,*) 'Entering lsp_LsqrtAd'
 766    call lsp_check(lsp)
 767
 768    allocate(sp_vhLoc(lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEnsOverDimension))
 769    allocate(sp_hLoc (lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEns             ))
 770
 771    !
 772    !- 3.  Transform to gridpoint space all ensemble amplitudes
 773    !
 774    do levIndex = 1, lsp%nLev ! loop over all levels for the amplitudes
 775
 776      ensAmplitude_oneLev => ens_getOneLev_r8(ensAmplitude,levIndex)
 777
 778      sp_vhLoc(:,:,levIndex,:) = 0.d0 ! needed, not everything is set
 779
 780      if (lsp%global) then
 781        call gst_setID(lsp%gstID)
 782        call gst_speree_kij_ad(sp_vhLoc(:,:,levIndex,:),ensAmplitude_oneLev(:,stepIndex,:,:))
 783      else
 784        kind = 'GridPointToSpectral'
 785        call lst_VarTransform(lsp%lst,                          & ! IN
 786                              sp_vhLoc(:,:,levIndex,:),             & ! OUT
 787                              ensAmplitude_oneLev(:,stepIndex,:,:), & ! IN
 788                              kind, lsp%nEnsOverDimension )       ! IN
 789        end if
 790
 791     end do
 792
 793     !
 794     !- 2.  Vertical Localization
 795     !
 796     !$OMP PARALLEL DO PRIVATE (memberIndex,levIndex1,levIndex2,p,jla)
 797     do memberIndex = 1, lsp%nEns
 798        sp_hLoc(:,:,:,memberIndex) = 0.0d0
 799        do levIndex1 = 1, lsp%nLev
 800           do levIndex2 = 1, lsp%nLev
 801              do p = 1, lsp%nphase
 802                 do jla = 1, lsp%nla_mpilocal
 803                    sp_hLoc(jla,p,levIndex2,memberIndex) = sp_hLoc(jla,p,levIndex2,memberIndex) +  & 
 804                         lsp%LvertSqrt(levIndex2,levIndex1)*sp_vhLoc(jla,p,levIndex1,memberIndex)
 805                 end do
 806              end do
 807           end do
 808        end do
 809     end do
 810     !$OMP END PARALLEL DO
 811
 812    !
 813    !- 1.  Horizontal Localization
 814    !
 815    if (lsp%global) then
 816      call globalSpectralHLocAd( lsp, sp_hLoc,  & ! IN
 817                                 controlVector ) ! OUT
 818    else
 819      call lamSpectralHLocAd( lsp, sp_hLoc,  & ! IN
 820                              controlVector ) ! OUT
 821    end if
 822
 823    deallocate(sp_hLoc )
 824    deallocate(sp_vhLoc)
 825
 826  END SUBROUTINE lsp_LsqrtAd
 827
 828  !--------------------------------------------------------------------------
 829  ! globalSpectralHLocAd
 830  !--------------------------------------------------------------------------
 831  SUBROUTINE globalSpectralHLocAd(lsp, sp_all, controlVector)
 832    implicit none
 833
 834    ! Arguments:
 835    type(struct_lsp), pointer, intent(inout) :: lsp
 836    real(8),                   intent(out)   :: controlVector(lsp%cvDim_mpilocal)
 837    real(8),                   intent(in)    :: sp_all(lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEns)
 838
 839    ! Locals:
 840    integer :: levIndex, mIndex, nIndex, ila_mpilocal, ila_mpiglobal, dimIndex, memberIndex 
 841
 842    if (verbose) write(*,*) 'Entering globalSpectralHLocAd'
 843    call lsp_check(lsp)
 844
 845    dimIndex = 0
 846
 847    do memberIndex = 1, lsp%nEns
 848
 849       do levIndex = 1, lsp%nLev
 850          do mIndex = lsp%mymBeg, lsp%mymEnd, lsp%mymSkip
 851            do nIndex = lsp%mynBeg, lsp%mynEnd, lsp%mynSkip
 852              if (mIndex .le. nIndex) then
 853
 854                ila_mpiglobal = gst_getnind(mIndex,lsp%gstID) + nIndex - mIndex
 855                ila_mpilocal  = lsp%ilaList_mpilocal(ila_mpiglobal)
 856                if (mIndex == 0) then
 857                  ! controlVector only contain real part for mIndex=0
 858                  dimIndex = dimIndex + 1
 859                  controlVector(dimIndex) = controlVector(dimIndex) +  &
 860                                            sp_all(ila_mpilocal,1,levIndex,memberIndex)*lsp%LhorizSqrt(nIndex,levIndex)*rsq2
 861                else
 862                  ! controlVector contains real and imag parts for mIndex>0
 863                  dimIndex = dimIndex + 1
 864                  controlVector(dimIndex) = controlVector(dimIndex) +  &
 865                                            sp_all(ila_mpilocal,1,levIndex,memberIndex)*lsp%LhorizSqrt(nIndex,levIndex)*2.0d0
 866                  dimIndex = dimIndex + 1
 867                  controlVector(dimIndex) = controlVector(dimIndex) +  &
 868                                            sp_all(ila_mpilocal,2,levIndex,memberIndex)*lsp%LhorizSqrt(nIndex,levIndex)*2.0d0
 869                end if
 870
 871             end if
 872           end do
 873         end do
 874       end do
 875
 876       if (dimIndex.gt.lsp%cvDim_mpilocal) then
 877          write(*,*) 'loc globalSpectralHLocAd: dimIndex > cvDim_mpilocal! ',dimIndex,memberIndex,lsp%cvDim_mpilocal
 878          call utl_abort('globalSpectralHLocAd')
 879       end if
 880    
 881    end do
 882
 883  END SUBROUTINE globalSpectralHLocAd
 884
 885  !--------------------------------------------------------------------------
 886  ! lamSpectralHLocAd
 887  !--------------------------------------------------------------------------
 888  SUBROUTINE lamSpectralHLocAd(lsp, sp_all, controlVector)
 889    implicit none
 890
 891    ! Arguments:
 892    type(struct_lsp), pointer, intent(inout) :: lsp
 893    real(8),                   intent(out)   :: controlVector(lsp%cvDim_mpilocal)
 894    real(8),                   intent(in)    :: sp_all(lsp%nla_mpilocal,lsp%nphase,lsp%nLev,lsp%nEns)
 895
 896    ! Locals:
 897    integer :: jla, levIndex, dimIndex, memberIndex, p
 898
 899    if (verbose) write(*,*) 'Entering lamSpectralHLocAd'
 900    call lsp_check(lsp)
 901
 902    !
 903    !- Reshape + Horizontal localization + Scaling (parseval)
 904    !
 905    dimIndex = 0
 906
 907    do memberIndex = 1, lsp%nEns
 908
 909       do levIndex = 1, lsp%nLev
 910         do jla = 1, lsp%nla_mpilocal
 911           do p = 1, lsp%nphase
 912             dimIndex = dimIndex + 1
 913             controlVector(dimIndex) = controlVector(dimIndex) +           &
 914                                       ( sp_all(jla,p,levIndex,memberIndex)  * &
 915                                         lsp%LhorizSqrt(lsp%lst%k(jla),levIndex) * &
 916                                         lsp%lst%NormFactorAd(jla,p)    )
 917           end do
 918         end do
 919       end do
 920       if (dimIndex > lsp%cvDim_mpilocal ) then
 921          write(*,*) 'BEN: lamSpectralHLocAD: dimIndex > cvDim! ',dimIndex, memberIndex, lsp%cvDim_mpilocal
 922          call utl_abort('lamSpectralHLocAd')
 923       end if
 924
 925    end do
 926
 927  END SUBROUTINE lamSpectralHLocAd
 928
 929  !--------------------------------------------------------------------------
 930  ! lsp_finalize
 931  !--------------------------------------------------------------------------
 932  SUBROUTINE lsp_finalize(lsp)
 933    implicit none
 934
 935    ! Arguments:
 936    type(struct_lsp), pointer, intent(inout) :: lsp
 937
 938    if (verbose) write(*,*) 'Entering lsp_finalize'
 939    call lsp_check(lsp)
 940
 941    deallocate(lsp%LhorizSqrt)
 942    deallocate(lsp%LvertSqrt )
 943
 944  end SUBROUTINE lsp_finalize
 945
 946  !--------------------------------------------------------------------------
 947  ! lsp_reduceToMPILocal
 948  !--------------------------------------------------------------------------
 949  SUBROUTINE lsp_reduceToMPILocal(lsp,cv_mpilocal,cv_mpiglobal)
 950    implicit none
 951
 952    ! Arguments:
 953    type(struct_lsp), pointer, intent(inout) :: lsp
 954    real(8),                   intent(out)   :: cv_mpilocal(lsp%cvDim_mpilocal)
 955    real(8),                   intent(in)    :: cv_mpiglobal(:)
 956
 957    ! Locals:
 958    real(8), allocatable :: cv_allmaxmpilocal(:,:)
 959    integer, allocatable :: cvDim_allMpilocal(:), displs(:)
 960    integer, allocatable :: ilaGlobal(:), allnlaLocal(:)
 961    integer, allocatable :: allilaGlobal(:,:)
 962    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
 963    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
 964    integer :: jproc, cvDim_maxmpilocal
 965    integer :: dimIndex_mpilocal, dimIndex_mpiglobal, ila_mpilocal, ila_mpiglobal
 966    integer :: mIndex, nIndex, memberIndex, levIndex, ierr, p, nlaMax
 967
 968    if (verbose) write(*,*) 'Entering lsp_reduceToMPILocal'
 969    call lsp_check(lsp)
 970
 971    call rpn_comm_allreduce(lsp%cvDim_mpilocal, cvDim_maxmpilocal, &
 972         1,"MPI_INTEGER","MPI_MAX","GRID",ierr)
 973
 974    allocate(cvDim_allMpiLocal(mmpi_nprocs))
 975    call rpn_comm_allgather(lsp%cvDim_mpiLocal   ,1,"mpi_integer",       &
 976                            cvDim_allMpiLocal,1,"mpi_integer","GRID",ierr)
 977
 978    ! assign part of mpiglobal vector from current mpi process
 979
 980    if (lsp%global) then
 981
 982      ! Global
 983
 984      allocate(allnBeg(mmpi_nprocs))
 985      call rpn_comm_allgather(lsp%mynBeg,1,"mpi_integer",       &
 986                              allnBeg,1,"mpi_integer","GRID",ierr)
 987      allocate(allnEnd(mmpi_nprocs))
 988      call rpn_comm_allgather(lsp%mynEnd,1,"mpi_integer",       &
 989                              allnEnd,1,"mpi_integer","GRID",ierr)
 990      allocate(allnSkip(mmpi_nprocs))
 991      call rpn_comm_allgather(lsp%mynSkip,1,"mpi_integer",       &
 992                              allnSkip,1,"mpi_integer","GRID",ierr)
 993
 994      allocate(allmBeg(mmpi_nprocs))
 995      call rpn_comm_allgather(lsp%mymBeg,1,"mpi_integer",       &
 996                              allmBeg,1,"mpi_integer","GRID",ierr)
 997      allocate(allmEnd(mmpi_nprocs))
 998      call rpn_comm_allgather(lsp%mymEnd,1,"mpi_integer",       &
 999                              allmEnd,1,"mpi_integer","GRID",ierr)
1000      allocate(allmSkip(mmpi_nprocs))
1001      call rpn_comm_allgather(lsp%mymSkip,1,"mpi_integer",       &
1002                              allmSkip,1,"mpi_integer","GRID",ierr)
1003
1004
1005      if (mmpi_myid == 0) then
1006
1007        allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1008
1009        !$OMP PARALLEL DO PRIVATE(jproc,dimIndex_mpilocal,memberIndex,levIndex,mIndex,nIndex,ila_mpiglobal,dimIndex_mpiglobal)
1010        do jproc = 0, (mmpi_nprocs-1)
1011          cv_allmaxmpilocal(:,jproc+1) = 0.d0
1012
1013          dimIndex_mpilocal = 0
1014          do memberIndex = 1, lsp%nEns
1015
1016            do levIndex = 1, lsp%nLev
1017              do mIndex = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
1018                do nIndex = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
1019
1020                  if (mIndex.le.nIndex) then
1021
1022                    ! figure out index into global control vector
1023                    ila_mpiglobal = gst_getNIND(mIndex,lsp%gstID) + nIndex - mIndex
1024                    if (mIndex == 0) then
1025                      ! for mIndex=0 only real part
1026                      dimIndex_mpiglobal = ila_mpiglobal
1027                    else
1028                      ! for mIndex>0 both real and imaginary part
1029                      dimIndex_mpiglobal = 2*ila_mpiglobal-1 - (lsp%ntrunc+1)
1030                    end if
1031                    ! add offset for level
1032                    dimIndex_mpiglobal = dimIndex_mpiglobal + (levIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)
1033                    ! add offset for member index
1034                    dimIndex_mpiglobal = dimIndex_mpiglobal + (memberIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)*lsp%nLev
1035
1036                    if (mIndex == 0) then
1037                      ! controlVector only contain real part for mIndex=0
1038                      dimIndex_mpilocal = dimIndex_mpilocal + 1
1039                      cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal)
1040                    else
1041                      ! controlVector contains real and imag parts for mIndex>0
1042                      dimIndex_mpilocal = dimIndex_mpilocal + 1
1043                      cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal)
1044                      dimIndex_mpilocal = dimIndex_mpilocal + 1
1045                      cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal+1)
1046                    end if
1047                           
1048                    if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1049                      write(*,*)
1050                      write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1051                      write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1052                      call utl_abort('lsp_reduceToMPILocal')
1053                    end if
1054                    if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1055                      write(*,*)
1056                      write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1057                      write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1058                      call utl_abort('lsp_reduceToMPILocal')
1059                    end if
1060 
1061                  end if
1062                end do
1063              end do
1064            end do
1065                
1066          end do
1067
1068        end do ! procs
1069        !$OMP END PARALLEL DO
1070
1071      else
1072        allocate(cv_allmaxmpilocal(1,1))
1073      end if
1074
1075      deallocate(allnBeg)
1076      deallocate(allnEnd)
1077      deallocate(allnSkip)
1078      deallocate(allmBeg)
1079      deallocate(allmEnd)
1080      deallocate(allmSkip)
1081
1082    else
1083       
1084      ! LAM
1085      call rpn_comm_allreduce(lsp%lst%nla,nlaMax,1,"mpi_integer","mpi_max","GRID",ierr)
1086
1087      if (mmpi_myid == 0) then
1088        allocate(allnlaLocal(mmpi_nprocs))
1089        allocate(allilaGlobal(nlaMax,mmpi_nprocs))
1090      else
1091        allocate(allnlaLocal(1))
1092        allocate(allilaGlobal(1,1))
1093      end if
1094      
1095      allocate(ilaGlobal(nlaMax))
1096      ilaGlobal(:)             = -1
1097      ilaGlobal(1:lsp%lst%nla) = lsp%lst%ilaGlobal(:)
1098      
1099      call rpn_comm_gather(lsp%lst%nla, 1, "mpi_integer",       &
1100                           allnlaLocal, 1, "mpi_integer", 0, "GRID", ierr)
1101      call rpn_comm_gather(ilaGlobal   , nlaMax, "mpi_integer",       &
1102                           allilaGlobal, nlaMax, "mpi_integer",0 ,"GRID", ierr)
1103
1104      deallocate(ilaGlobal)
1105
1106      if (mmpi_myid == 0) then
1107
1108        allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1109
1110        do jproc = 0, (mmpi_nprocs-1)
1111          cv_allmaxmpilocal(:,jproc+1) = 0.d0
1112          do memberIndex = 1, lsp%nEns
1113            do levIndex = 1, lsp%nLev
1114              do ila_mpilocal = 1, allnlaLocal(jproc+1)
1115                do p = 1, lsp%lst%nphase
1116
1117                  dimIndex_mpilocal = ( (levIndex-1) * lsp%nEns * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1118                                      ( (memberIndex-1) * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1119                                                            ( (ila_mpilocal-1) * lsp%lst%nphase ) + p
1120
1121                  ila_mpiglobal = allilaGlobal(ila_mpilocal,jproc+1)
1122                  if ( ila_mpiglobal <= 0 ) then 
1123                    write(*,*) 'lsp_reduceToMPILocal: invalid ila_mpiglobal index ', ila_mpiglobal
1124                    call utl_abort('lsp_reduceToMPILocal')
1125                  end if
1126                  dimIndex_mpiglobal = ( (levIndex-1) * lsp%nEns * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1127                                       ( (memberIndex-1) * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1128                                                         ( (ila_mpiglobal-1) * lsp%lst%nphase ) + p
1129  
1130                  if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1131                    write(*,*)
1132                    write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1133                    write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1134                    call utl_abort('lsp_reduceToMPILocal')
1135                  end if
1136                  if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1137                    write(*,*)
1138                    write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1139                    write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1140                    call utl_abort('lsp_reduceToMPILocal')
1141                  end if
1142                  
1143                  cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal)
1144
1145                end do
1146              end do
1147            end do
1148          end do
1149        end do
1150      else
1151        allocate(cv_allmaxmpilocal(1,1))
1152      end if
1153
1154      deallocate(allnlaLocal)
1155      deallocate(allilaGlobal) 
1156      
1157    end if
1158
1159    !- Distribute
1160    allocate(displs(mmpi_nprocs))
1161    do jproc = 0, (mmpi_nprocs-1)
1162      displs(jproc+1) = jproc*cvDim_maxMpiLocal ! displacement wrt cv_allMaxMpiLocal from which
1163                                                ! to take the outgoing data to process jproc
1164    end do
1165
1166    call rpn_comm_scatterv(cv_allMaxMpiLocal, cvDim_allMpiLocal, displs, "mpi_double_precision", &
1167                           cv_mpiLocal, lsp%cvDim_mpiLocal, "mpi_double_precision", &
1168                           0, "GRID", ierr)
1169
1170    deallocate(displs) 
1171    deallocate(cv_allMaxMpiLocal)
1172    deallocate(cvDim_allMpiLocal)
1173   
1174  END SUBROUTINE Lsp_reduceToMPILocal
1175
1176  !--------------------------------------------------------------------------
1177  ! lsp_reduceToMPILocal_r4
1178  !--------------------------------------------------------------------------
1179  SUBROUTINE lsp_reduceToMPILocal_r4(lsp,cv_mpilocal,cv_mpiglobal)
1180    implicit none
1181
1182    ! Arguments:
1183    type(struct_lsp), pointer, intent(inout) :: lsp
1184    real(4),                   intent(out)   :: cv_mpilocal(lsp%cvDim_mpilocal)
1185    real(4),                   intent(in)    :: cv_mpiglobal(:)
1186
1187    ! Locals:
1188    real(4), allocatable :: cv_allmaxmpilocal(:,:)
1189    integer, allocatable :: cvDim_allMpilocal(:), displs(:)
1190    integer, allocatable :: ilaGlobal(:), allnlaLocal(:)
1191    integer, allocatable :: allilaGlobal(:,:)
1192    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
1193    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
1194    integer :: jproc, cvDim_maxmpilocal
1195    integer :: dimIndex_mpilocal, dimIndex_mpiglobal, ila_mpilocal, ila_mpiglobal
1196    integer :: mIndex, nIndex, memberIndex, levIndex, ierr, p, nlaMax
1197
1198    if (verbose) write(*,*) 'Entering lsp_reduceToMPILocal_r4'
1199    call lsp_check(lsp)
1200
1201    call rpn_comm_allreduce(lsp%cvDim_mpilocal, cvDim_maxmpilocal, &
1202         1,"MPI_INTEGER","MPI_MAX","GRID",ierr)
1203
1204    allocate(cvDim_allMpiLocal(mmpi_nprocs))
1205    call rpn_comm_allgather(lsp%cvDim_mpiLocal   ,1,"mpi_integer",       &
1206                            cvDim_allMpiLocal,1,"mpi_integer","GRID",ierr)
1207
1208    ! assign part of mpiglobal vector from current mpi process
1209
1210    if (lsp%global) then
1211
1212      ! Global
1213
1214      allocate(allnBeg(mmpi_nprocs))
1215      call rpn_comm_allgather(lsp%mynBeg,1,"mpi_integer",       &
1216                              allnBeg,1,"mpi_integer","GRID",ierr)
1217      allocate(allnEnd(mmpi_nprocs))
1218      call rpn_comm_allgather(lsp%mynEnd,1,"mpi_integer",       &
1219                              allnEnd,1,"mpi_integer","GRID",ierr)
1220      allocate(allnSkip(mmpi_nprocs))
1221      call rpn_comm_allgather(lsp%mynSkip,1,"mpi_integer",       &
1222                              allnSkip,1,"mpi_integer","GRID",ierr)
1223
1224      allocate(allmBeg(mmpi_nprocs))
1225      call rpn_comm_allgather(lsp%mymBeg,1,"mpi_integer",       &
1226                              allmBeg,1,"mpi_integer","GRID",ierr)
1227      allocate(allmEnd(mmpi_nprocs))
1228      call rpn_comm_allgather(lsp%mymEnd,1,"mpi_integer",       &
1229                              allmEnd,1,"mpi_integer","GRID",ierr)
1230      allocate(allmSkip(mmpi_nprocs))
1231      call rpn_comm_allgather(lsp%mymSkip,1,"mpi_integer",       &
1232                              allmSkip,1,"mpi_integer","GRID",ierr)
1233
1234
1235      if (mmpi_myid == 0) then
1236
1237        allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1238
1239        !$OMP PARALLEL DO PRIVATE(jproc,dimIndex_mpilocal,memberIndex,levIndex,mIndex,nIndex,ila_mpiglobal,dimIndex_mpiglobal)
1240        do jproc = 0, (mmpi_nprocs-1)
1241          cv_allmaxmpilocal(:,jproc+1) = 0.d0
1242
1243          dimIndex_mpilocal = 0
1244          do memberIndex = 1, lsp%nEns
1245
1246            do levIndex = 1, lsp%nLev
1247              do mIndex = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
1248                do nIndex = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
1249
1250                  if (mIndex.le.nIndex) then
1251
1252                    ! figure out index into global control vector
1253                    ila_mpiglobal = gst_getNIND(mIndex,lsp%gstID) + nIndex - mIndex
1254                    if (mIndex == 0) then
1255                      ! for mIndex=0 only real part
1256                      dimIndex_mpiglobal = ila_mpiglobal
1257                    else
1258                      ! for mIndex>0 both real and imaginary part
1259                      dimIndex_mpiglobal = 2*ila_mpiglobal-1 - (lsp%ntrunc+1)
1260                    end if
1261                    ! add offset for level
1262                    dimIndex_mpiglobal = dimIndex_mpiglobal + (levIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)
1263                    ! add offset for member index
1264                    dimIndex_mpiglobal = dimIndex_mpiglobal + (memberIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)*lsp%nLev
1265                            
1266                    if (mIndex == 0) then
1267                      ! controlVector only contain real part for mIndex=0
1268                      dimIndex_mpilocal = dimIndex_mpilocal + 1
1269                      cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal)
1270                    else
1271                      ! controlVector contains real and imag parts for mIndex>0
1272                      dimIndex_mpilocal = dimIndex_mpilocal + 1
1273                      cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal)
1274                      dimIndex_mpilocal = dimIndex_mpilocal + 1
1275                      cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal+1)
1276                    end if
1277                           
1278                    if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1279                      write(*,*)
1280                      write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1281                      write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1282                      call utl_abort('lsp_reduceToMPILocal')
1283                    end if
1284                    if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1285                      write(*,*)
1286                      write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1287                      write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1288                      call utl_abort('lsp_reduceToMPILocal')
1289                    end if
1290 
1291                  end if
1292                end do
1293              end do
1294            end do
1295                
1296          end do
1297
1298        end do ! procs
1299        !$OMP END PARALLEL DO
1300
1301      else
1302        allocate(cv_allmaxmpilocal(1,1))
1303      end if
1304
1305      deallocate(allnBeg)
1306      deallocate(allnEnd)
1307      deallocate(allnSkip)
1308      deallocate(allmBeg)
1309      deallocate(allmEnd)
1310      deallocate(allmSkip)
1311
1312    else
1313       
1314      ! LAM
1315      call rpn_comm_allreduce(lsp%lst%nla,nlaMax,1,"mpi_integer","mpi_max","GRID",ierr)
1316
1317      if (mmpi_myid == 0) then
1318        allocate(allnlaLocal(mmpi_nprocs))
1319        allocate(allilaGlobal(nlaMax,mmpi_nprocs))
1320      else
1321        allocate(allnlaLocal(1))
1322        allocate(allilaGlobal(1,1))
1323      end if
1324      
1325      allocate(ilaGlobal(nlaMax))
1326      ilaGlobal(:)             = -1
1327      ilaGlobal(1:lsp%lst%nla) = lsp%lst%ilaGlobal(:)
1328      
1329      call rpn_comm_gather(lsp%lst%nla, 1, "mpi_integer",       &
1330                           allnlaLocal, 1, "mpi_integer", 0, "GRID", ierr)
1331      call rpn_comm_gather(ilaGlobal   , nlaMax, "mpi_integer",       &
1332                           allilaGlobal, nlaMax, "mpi_integer",0 ,"GRID", ierr)
1333
1334      deallocate(ilaGlobal)
1335
1336      if (mmpi_myid == 0) then
1337
1338        allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1339
1340        do jproc = 0, (mmpi_nprocs-1)
1341          cv_allmaxmpilocal(:,jproc+1) = 0.d0
1342          do memberIndex = 1, lsp%nEns
1343            do levIndex = 1, lsp%nLev
1344              do ila_mpilocal = 1, allnlaLocal(jproc+1)
1345                do p = 1, lsp%lst%nphase
1346
1347                  dimIndex_mpilocal = ( (levIndex-1) * lsp%nEns * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1348                                      ( (memberIndex-1) * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1349                                                            ( (ila_mpilocal-1) * lsp%lst%nphase ) + p
1350
1351                  ila_mpiglobal = allilaGlobal(ila_mpilocal,jproc+1)
1352                  if ( ila_mpiglobal <= 0 ) then 
1353                    write(*,*) 'lsp_reduceToMPILocal: invalid ila_mpiglobal index ', ila_mpiglobal
1354                    call utl_abort('lsp_reduceToMPILocal')
1355                  end if
1356                  dimIndex_mpiglobal = ( (levIndex-1) * lsp%nEns * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1357                                       ( (memberIndex-1) * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1358                                                         ( (ila_mpiglobal-1) * lsp%lst%nphase ) + p
1359  
1360                  if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1361                    write(*,*)
1362                    write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1363                    write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1364                    call utl_abort('lsp_reduceToMPILocal')
1365                  end if
1366                  if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1367                    write(*,*)
1368                    write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1369                    write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1370                    call utl_abort('lsp_reduceToMPILocal')
1371                  end if
1372                  
1373                  cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1) = cv_mpiglobal(dimIndex_mpiglobal)
1374
1375                end do
1376              end do
1377            end do
1378          end do
1379        end do
1380      else
1381        allocate(cv_allmaxmpilocal(1,1))
1382      end if
1383
1384      deallocate(allnlaLocal)
1385      deallocate(allilaGlobal) 
1386      
1387    end if
1388
1389    !- Distribute
1390    allocate(displs(mmpi_nprocs))
1391    do jproc = 0, (mmpi_nprocs-1)
1392      displs(jproc+1) = jproc*cvDim_maxMpiLocal ! displacement wrt cv_allMaxMpiLocal from which
1393                                                ! to take the outgoing data to process jproc
1394    end do
1395
1396    call rpn_comm_scatterv(cv_allMaxMpiLocal, cvDim_allMpiLocal, displs, "mpi_real4", &
1397                           cv_mpiLocal, lsp%cvDim_mpiLocal, "mpi_real4", &
1398                           0, "GRID", ierr)
1399
1400    deallocate(displs) 
1401    deallocate(cv_allMaxMpiLocal)
1402    deallocate(cvDim_allMpiLocal)
1403   
1404  END SUBROUTINE Lsp_reduceToMPILocal_r4
1405
1406  !--------------------------------------------------------------------------
1407  ! lsp_expandToMPIGlobal
1408  !--------------------------------------------------------------------------
1409  SUBROUTINE lsp_expandToMPIGlobal(lsp,cv_mpilocal,cv_mpiglobal)
1410    implicit none
1411
1412    ! Arguments:
1413    type(struct_lsp), pointer, intent(inout) :: lsp
1414    real(8),                   intent(in)    :: cv_mpilocal(lsp%cvDim_mpilocal)
1415    real(8),                   intent(out)   :: cv_mpiglobal(:)
1416
1417    ! Locals:
1418    real(8), allocatable :: cv_maxmpilocal(:)
1419    real(8), pointer     :: cv_allmaxmpilocal(:,:) => null()
1420    integer, allocatable :: cvDim_allMpilocal(:)
1421    integer, allocatable :: ilaGlobal(:), allnlaLocal(:)
1422    integer, allocatable :: allilaGlobal(:,:)
1423    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
1424    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
1425    integer :: dimIndex_mpilocal, dimIndex_mpiglobal, ila_mpiglobal, ila_mpilocal, cvDim_maxmpilocal
1426    integer :: mIndex, nIndex, jproc, memberIndex, levIndex, ierr, p, nlaMax
1427
1428    if (verbose) write(*,*) 'Entering lsp_expandToMPIGlobal'
1429    call lsp_check(lsp)
1430
1431    !
1432    !- 1.  Gather all local control vectors onto mpi task 0
1433    !
1434    allocate(cvDim_allMpiLocal(mmpi_nprocs))
1435    call rpn_comm_allgather(lsp%cvDim_mpiLocal   ,1,"mpi_integer",       &
1436                            cvDim_allMpiLocal,1,"mpi_integer","GRID",ierr)
1437
1438    call rpn_comm_allreduce(lsp%cvDim_mpilocal,cvDim_maxmpilocal,1,"mpi_integer","mpi_max","GRID",ierr)
1439
1440    allocate(cv_maxmpilocal(cvDim_maxmpilocal))
1441
1442    cv_maxmpilocal(:) = 0.0d0
1443    cv_maxmpilocal(1:lsp%cvDim_mpilocal) = cv_mpilocal(1:lsp%cvDim_mpilocal)
1444
1445    nullify(cv_allmaxmpilocal)
1446    if (mmpi_myid == 0) then
1447       allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1448    else
1449       allocate(cv_allmaxmpilocal(1,1))
1450    end if
1451    call rpn_comm_gather(cv_maxmpilocal,    cvDim_maxmpilocal, "mpi_double_precision",  &
1452                         cv_allmaxmpilocal, cvDim_maxmpilocal, "mpi_double_precision", 0, "GRID", ierr )
1453
1454    deallocate(cv_maxmpilocal)
1455
1456    !
1457    !- 2.  Reorganize gathered mpilocal control vectors into the mpiglobal control vector
1458    !
1459    if (lsp%global) then
1460
1461       ! Global
1462       if (mmpi_myid == 0) then
1463          allocate(allnBeg(mmpi_nprocs))
1464          allocate(allnEnd(mmpi_nprocs))
1465          allocate(allnSkip(mmpi_nprocs))
1466          allocate(allmBeg(mmpi_nprocs))
1467          allocate(allmEnd(mmpi_nprocs))
1468          allocate(allmSkip(mmpi_nprocs))
1469       else
1470          allocate(allnBeg(1))
1471          allocate(allnEnd(1))
1472          allocate(allnSkip(1))
1473          allocate(allmBeg(1))
1474          allocate(allmEnd(1))
1475          allocate(allmSkip(1))
1476       end if
1477
1478       call rpn_comm_gather(lsp%mynBeg  ,1,"mpi_integer",       &
1479                            allnBeg ,1,"mpi_integer",0,"GRID",ierr)
1480       call rpn_comm_gather(lsp%mynEnd  ,1,"mpi_integer",       &
1481                            allnEnd ,1,"mpi_integer",0,"GRID",ierr)
1482       call rpn_comm_gather(lsp%mynSkip ,1,"mpi_integer",       &
1483                            allnSkip,1,"mpi_integer",0,"GRID",ierr)
1484
1485       call rpn_comm_gather(lsp%mymBeg  ,1,"mpi_integer",       &
1486                            allmBeg ,1,"mpi_integer",0,"GRID",ierr)
1487       call rpn_comm_gather(lsp%mymEnd  ,1,"mpi_integer",       &
1488                            allmEnd ,1,"mpi_integer",0,"GRID",ierr)
1489       call rpn_comm_gather(lsp%mymSkip ,1,"mpi_integer",       &
1490                            allmSkip,1,"mpi_integer",0,"GRID",ierr)
1491
1492       ! reorganize gathered mpilocal control vectors into the mpiglobal control vector
1493       if (mmpi_myid == 0) then
1494         cv_mpiglobal(:) = 0.0d0
1495
1496         !$OMP PARALLEL DO PRIVATE(jproc,dimIndex_mpilocal,memberIndex,levIndex,mIndex,nIndex,ila_mpiglobal,dimIndex_mpiglobal)
1497         do jproc = 0, (mmpi_nprocs-1)
1498           dimIndex_mpilocal = 0
1499           do memberIndex = 1, lsp%nEns
1500
1501             do levIndex = 1, lsp%nLev
1502               do mIndex = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
1503                 do nIndex = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
1504                   if (mIndex.le.nIndex) then
1505
1506                     ! figure out index into global control vector
1507                     ila_mpiglobal = gst_getNIND(mIndex,lsp%gstID) + nIndex - mIndex
1508                     if (mIndex == 0) then
1509                       ! for mIndex=0 only real part
1510                       dimIndex_mpiglobal = ila_mpiglobal
1511                     else
1512                       ! for mIndex>0 both real and imaginary part
1513                       dimIndex_mpiglobal = 2*ila_mpiglobal-1 - (lsp%ntrunc+1)
1514                     end if
1515                     ! add offset for level
1516                     dimIndex_mpiglobal = dimIndex_mpiglobal + (levIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)
1517                     ! add offset for member index
1518                     dimIndex_mpiglobal = dimIndex_mpiglobal + (memberIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)*lsp%nLev
1519
1520                     ! index into local control vector
1521                     if (mIndex == 0) then
1522                       ! only real component for mIndex=0
1523                       dimIndex_mpilocal = dimIndex_mpilocal + 1
1524                       cv_mpiglobal(dimIndex_mpiglobal) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1525                     else
1526                       ! both real and imaginary components for mIndex>0
1527                       dimIndex_mpilocal = dimIndex_mpilocal + 1
1528                       cv_mpiglobal(dimIndex_mpiglobal) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1529                       dimIndex_mpilocal = dimIndex_mpilocal + 1
1530                       cv_mpiglobal(dimIndex_mpiglobal+1) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1531                     end if
1532
1533                     if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1534                        write(*,*)
1535                        write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1536                        write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1537                        call utl_abort('lsp_expandToMPIGlobal')
1538                     end if
1539                     if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1540                        write(*,*)
1541                        write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1542                        write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1543                        call utl_abort('lsp_expandToMPIGlobal')
1544                     end if
1545
1546                   end if
1547                 end do
1548               end do
1549             end do
1550           end do
1551         end do ! jproc
1552         !$OMP END PARALLEL DO
1553
1554      end if ! myid == 0 
1555
1556      deallocate(allnBeg)
1557      deallocate(allnEnd)
1558      deallocate(allnSkip)
1559      deallocate(allmBeg)
1560      deallocate(allmEnd)
1561      deallocate(allmSkip)
1562
1563    else
1564
1565      ! LAM
1566       call rpn_comm_allreduce(lsp%lst%nla,nlaMax,1,"mpi_integer","mpi_max","GRID",ierr)
1567
1568       if (mmpi_myid == 0) then
1569          allocate(allnlaLocal(mmpi_nprocs))
1570          allocate(allilaGlobal(nlaMax,mmpi_nprocs))
1571       else
1572          allocate(allnlaLocal(1))
1573          allocate(allilaGlobal(1,1))
1574       end if
1575
1576       allocate(ilaGlobal(nlaMax))
1577       ilaGlobal(:)             = -1
1578       ilaGlobal(1:lsp%lst%nla) = lsp%lst%ilaGlobal(:)
1579
1580       call rpn_comm_gather(lsp%lst%nla, 1, "mpi_integer",       &
1581                            allnlaLocal, 1, "mpi_integer", 0, "GRID", ierr)
1582       call rpn_comm_gather(ilaGlobal   , nlaMax, "mpi_integer",       &
1583                            allilaGlobal, nlaMax, "mpi_integer",0 ,"GRID", ierr)
1584
1585       deallocate(ilaGlobal)
1586
1587       if (mmpi_myid == 0) then
1588          cv_mpiglobal(:) = 0.0d0
1589
1590          do jproc = 0, (mmpi_nprocs-1)
1591             do memberIndex = 1, lsp%nEns
1592                do levIndex = 1, lsp%nLev
1593                   do ila_mpilocal = 1, allnlaLocal(jproc+1)
1594                      do p = 1, lsp%lst%nphase
1595
1596                         dimIndex_mpilocal = ( (levIndex-1) * lsp%nEns * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1597                                         ( (memberIndex-1) * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1598                                                      ( (ila_mpilocal-1) * lsp%lst%nphase ) + p
1599
1600                         ila_mpiglobal = allilaGlobal(ila_mpilocal,jproc+1)
1601                         if ( ila_mpiglobal <= 0 ) then 
1602                            write(*,*) 'lsp_expandToMPIGlobal: invalid ila_mpiglobal index ', ila_mpiglobal
1603                            call utl_abort('lsp_expandToMPIGlobal')
1604                         end if
1605
1606                         dimIndex_mpiglobal = ( (levIndex-1) * lsp%nEns * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1607                                          ( (memberIndex-1) * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1608                                                            ( (ila_mpiglobal-1) * lsp%lst%nphase ) + p
1609
1610                         if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1611                            write(*,*)
1612                            write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1613                            write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1614                            call utl_abort('lsp_expandToMPIGlobal')
1615                         end if
1616                         if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1617                            write(*,*)
1618                            write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1619                            write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1620                            call utl_abort('lsp_expandToMPIGlobal')
1621                         end if
1622
1623                         cv_mpiglobal(dimIndex_mpiglobal) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1624                         
1625                      end do
1626                   end do
1627                end do
1628             end do
1629          end do
1630
1631       end if
1632
1633       deallocate(allnlaLocal)
1634       deallocate(allilaGlobal)
1635
1636    end if
1637
1638    deallocate(cv_allmaxmpilocal)
1639    deallocate(cvDim_allMpiLocal)
1640
1641  end SUBROUTINE Lsp_expandToMPIGlobal
1642
1643  !--------------------------------------------------------------------------
1644  ! lsp_expandToMPIGlobal_r4
1645  !--------------------------------------------------------------------------
1646  SUBROUTINE lsp_expandToMPIGlobal_r4(lsp,cv_mpilocal,cv_mpiglobal)
1647    implicit none
1648
1649    ! Arguments:
1650    type(struct_lsp), pointer, intent(inout) :: lsp
1651    real(4),                   intent(in)    :: cv_mpilocal(lsp%cvDim_mpilocal)
1652    real(4),                   intent(out)   :: cv_mpiglobal(:)
1653
1654    ! Locals:
1655    real(4), allocatable :: cv_maxmpilocal(:)
1656    real(4), pointer     :: cv_allmaxmpilocal(:,:) => null()
1657    integer, allocatable :: cvDim_allMpilocal(:)
1658    integer, allocatable :: ilaGlobal(:), allnlaLocal(:)
1659    integer, allocatable :: allilaGlobal(:,:)
1660    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
1661    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
1662    integer :: dimIndex_mpilocal, dimIndex_mpiglobal, ila_mpiglobal, ila_mpilocal, cvDim_maxmpilocal
1663    integer :: mIndex, nIndex, jproc, memberIndex, levIndex, ierr, p, nlaMax
1664
1665    if (verbose) write(*,*) 'Entering lsp_expandToMPIGlobal_r4'
1666    call lsp_check(lsp)
1667
1668    !
1669    !- 1.  Gather all local control vectors onto mpi task 0
1670    !
1671    allocate(cvDim_allMpiLocal(mmpi_nprocs))
1672    call rpn_comm_allgather(lsp%cvDim_mpiLocal   ,1,"mpi_integer",       &
1673                            cvDim_allMpiLocal,1,"mpi_integer","GRID",ierr)
1674
1675    call rpn_comm_allreduce(lsp%cvDim_mpilocal,cvDim_maxmpilocal,1,"mpi_integer","mpi_max","GRID",ierr)
1676
1677    allocate(cv_maxmpilocal(cvDim_maxmpilocal))
1678
1679    cv_maxmpilocal(:) = 0.0d0
1680    cv_maxmpilocal(1:lsp%cvDim_mpilocal) = cv_mpilocal(1:lsp%cvDim_mpilocal)
1681
1682    nullify(cv_allmaxmpilocal)
1683    if (mmpi_myid == 0) then
1684       allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1685    else
1686       allocate(cv_allmaxmpilocal(1,1))
1687    end if
1688    call rpn_comm_gather(cv_maxmpilocal,    cvDim_maxmpilocal, "mpi_real4",  &
1689                         cv_allmaxmpilocal, cvDim_maxmpilocal, "mpi_real4", 0, "GRID", ierr )
1690
1691    deallocate(cv_maxmpilocal)
1692
1693    !
1694    !- 2.  Reorganize gathered mpilocal control vectors into the mpiglobal control vector
1695    !
1696    if (lsp%global) then
1697
1698       ! Global
1699       if (mmpi_myid == 0) then
1700          allocate(allnBeg(mmpi_nprocs))
1701          allocate(allnEnd(mmpi_nprocs))
1702          allocate(allnSkip(mmpi_nprocs))
1703          allocate(allmBeg(mmpi_nprocs))
1704          allocate(allmEnd(mmpi_nprocs))
1705          allocate(allmSkip(mmpi_nprocs))
1706       else
1707          allocate(allnBeg(1))
1708          allocate(allnEnd(1))
1709          allocate(allnSkip(1))
1710          allocate(allmBeg(1))
1711          allocate(allmEnd(1))
1712          allocate(allmSkip(1))
1713       end if
1714
1715       call rpn_comm_gather(lsp%mynBeg  ,1,"mpi_integer",       &
1716                            allnBeg ,1,"mpi_integer",0,"GRID",ierr)
1717       call rpn_comm_gather(lsp%mynEnd  ,1,"mpi_integer",       &
1718                            allnEnd ,1,"mpi_integer",0,"GRID",ierr)
1719       call rpn_comm_gather(lsp%mynSkip ,1,"mpi_integer",       &
1720                            allnSkip,1,"mpi_integer",0,"GRID",ierr)
1721
1722       call rpn_comm_gather(lsp%mymBeg  ,1,"mpi_integer",       &
1723                            allmBeg ,1,"mpi_integer",0,"GRID",ierr)
1724       call rpn_comm_gather(lsp%mymEnd  ,1,"mpi_integer",       &
1725                            allmEnd ,1,"mpi_integer",0,"GRID",ierr)
1726       call rpn_comm_gather(lsp%mymSkip ,1,"mpi_integer",       &
1727                            allmSkip,1,"mpi_integer",0,"GRID",ierr)
1728
1729       ! reorganize gathered mpilocal control vectors into the mpiglobal control vector
1730       if (mmpi_myid == 0) then
1731         cv_mpiglobal(:) = 0.0d0
1732
1733         !$OMP PARALLEL DO PRIVATE(jproc,dimIndex_mpilocal,memberIndex,levIndex,mIndex,nIndex,ila_mpiglobal,dimIndex_mpiglobal)
1734         do jproc = 0, (mmpi_nprocs-1)
1735           dimIndex_mpilocal = 0
1736           do memberIndex = 1, lsp%nEns
1737
1738             do levIndex = 1, lsp%nLev
1739               do mIndex = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
1740                 do nIndex = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
1741                   if (mIndex.le.nIndex) then
1742
1743                     ! figure out index into global control vector
1744                     ila_mpiglobal = gst_getNIND(mIndex,lsp%gstID) + nIndex - mIndex
1745                     if (mIndex == 0) then
1746                       ! for mIndex=0 only real part
1747                       dimIndex_mpiglobal = ila_mpiglobal
1748                     else
1749                       ! for mIndex>0 both real and imaginary part
1750                       dimIndex_mpiglobal = 2*ila_mpiglobal-1 - (lsp%ntrunc+1)
1751                     end if
1752                     ! add offset for level
1753                     dimIndex_mpiglobal = dimIndex_mpiglobal + (levIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)
1754                     ! add offset for member index
1755                     dimIndex_mpiglobal = dimIndex_mpiglobal + (memberIndex-1) * (lsp%ntrunc+1)*(lsp%ntrunc+1)*lsp%nLev
1756
1757                     ! index into local control vector
1758                     if (mIndex == 0) then
1759                       ! only real component for mIndex=0
1760                       dimIndex_mpilocal = dimIndex_mpilocal + 1
1761                       cv_mpiglobal(dimIndex_mpiglobal) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1762                     else
1763                       ! both real and imaginary components for mIndex>0
1764                       dimIndex_mpilocal = dimIndex_mpilocal + 1
1765                       cv_mpiglobal(dimIndex_mpiglobal) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1766                       dimIndex_mpilocal = dimIndex_mpilocal + 1
1767                       cv_mpiglobal(dimIndex_mpiglobal+1) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1768                     end if
1769
1770                     if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1771                        write(*,*)
1772                        write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1773                        write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1774                        call utl_abort('lsp_expandToMPIGlobal')
1775                     end if
1776                     if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1777                        write(*,*)
1778                        write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1779                        write(*,*) '       proc, levIndex, nIndex, mIndex = ',jproc, levIndex, nIndex, mIndex
1780                        call utl_abort('lsp_expandToMPIGlobal')
1781                     end if
1782
1783                   end if
1784                 end do
1785               end do
1786             end do
1787           end do
1788         end do ! jproc
1789         !$OMP END PARALLEL DO
1790
1791      end if ! myid == 0 
1792
1793      deallocate(allnBeg)
1794      deallocate(allnEnd)
1795      deallocate(allnSkip)
1796      deallocate(allmBeg)
1797      deallocate(allmEnd)
1798      deallocate(allmSkip)
1799
1800    else
1801
1802      ! LAM
1803       call rpn_comm_allreduce(lsp%lst%nla,nlaMax,1,"mpi_integer","mpi_max","GRID",ierr)
1804
1805       if (mmpi_myid == 0) then
1806          allocate(allnlaLocal(mmpi_nprocs))
1807          allocate(allilaGlobal(nlaMax,mmpi_nprocs))
1808       else
1809          allocate(allnlaLocal(1))
1810          allocate(allilaGlobal(1,1))
1811       end if
1812
1813       allocate(ilaGlobal(nlaMax))
1814       ilaGlobal(:)             = -1
1815       ilaGlobal(1:lsp%lst%nla) = lsp%lst%ilaGlobal(:)
1816
1817       call rpn_comm_gather(lsp%lst%nla, 1, "mpi_integer",       &
1818                            allnlaLocal, 1, "mpi_integer", 0, "GRID", ierr)
1819       call rpn_comm_gather(ilaGlobal   , nlaMax, "mpi_integer",       &
1820                            allilaGlobal, nlaMax, "mpi_integer",0 ,"GRID", ierr)
1821
1822       deallocate(ilaGlobal)
1823
1824       if (mmpi_myid == 0) then
1825          cv_mpiglobal(:) = 0.0d0
1826
1827          do jproc = 0, (mmpi_nprocs-1)
1828             do memberIndex = 1, lsp%nEns
1829                do levIndex = 1, lsp%nLev
1830                   do ila_mpilocal = 1, allnlaLocal(jproc+1)
1831                      do p = 1, lsp%lst%nphase
1832
1833                         dimIndex_mpilocal = ( (levIndex-1) * lsp%nEns * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1834                                         ( (memberIndex-1) * allnlaLocal(jproc+1) * lsp%lst%nphase ) + &
1835                                                      ( (ila_mpilocal-1) * lsp%lst%nphase ) + p
1836
1837                         ila_mpiglobal = allilaGlobal(ila_mpilocal,jproc+1)
1838                         if ( ila_mpiglobal <= 0 ) then 
1839                            write(*,*) 'lsp_expandToMPIGlobal: invalid ila_mpiglobal index ', ila_mpiglobal
1840                            call utl_abort('lsp_expandToMPIGlobal')
1841                         end if
1842
1843                         dimIndex_mpiglobal = ( (levIndex-1) * lsp%nEns * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1844                                          ( (memberIndex-1) * lsp%lst%nlaGlobal * lsp%lst%nphase ) + &
1845                                                            ( (ila_mpiglobal-1) * lsp%lst%nphase ) + p
1846
1847                         if (dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)) then
1848                            write(*,*)
1849                            write(*,*) 'ERROR: dimIndex_mpilocal > cvDim_allMpiLocal(jproc+1)', dimIndex_mpilocal, cvDim_allMpiLocal(jproc+1)
1850                            write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1851                            call utl_abort('lsp_expandToMPIGlobal')
1852                         end if
1853                         if (dimIndex_mpiglobal > lsp%cvDim_mpiglobal) then
1854                            write(*,*)
1855                            write(*,*) 'ERROR: dimIndex_mpiglobal > cvDim_mpiglobal', dimIndex_mpiglobal, lsp%cvDim_mpiglobal
1856                            write(*,*) '       proc, memberIndex, levIndex, ila, p = ',jproc,memberIndex,levIndex,ila_mpilocal,p
1857                            call utl_abort('lsp_expandToMPIGlobal')
1858                         end if
1859
1860                         cv_mpiglobal(dimIndex_mpiglobal) = cv_allmaxmpilocal(dimIndex_mpilocal,jproc+1)
1861                         
1862                      end do
1863                   end do
1864                end do
1865             end do
1866          end do
1867
1868       end if
1869
1870       deallocate(allnlaLocal)
1871       deallocate(allilaGlobal)
1872
1873    end if
1874
1875    deallocate(cv_allmaxmpilocal)
1876    deallocate(cvDim_allMpiLocal)
1877
1878  end SUBROUTINE lsp_expandToMPIGlobal_r4
1879
1880  !--------------------------------------------------------------------------
1881  !   LSP_CHECK
1882  !--------------------------------------------------------------------------
1883  subroutine lsp_check(lsp)
1884    implicit none
1885
1886    ! Arguments:
1887    type(struct_lsp), pointer, intent(in) :: lsp
1888
1889    if ( .not. lsp%initialized) then
1890       call utl_abort('lsp_check: structure not initialized')
1891    end if
1892
1893  end subroutine lsp_check
1894
1895end module localizationSpectral_mod