bMatrixChem_mod sourceΒΆ

   1module bMatrixChem_mod 
   2  ! MODULE bMatrixChem_mod (prefix='bchm' category='2. B and R matrices')
   3  !
   4  !:Purpose:  Contains routines involving the application of
   5  !           background-error covariance matrix(ces). Matrix based on
   6  !           horizontally homogeneous/isotropic correlations. This module
   7  !           includes the transform from control vector (spectral space) to 
   8  !           analysis increments, related utilites, and the transform's adjoint.
   9  !
  10  !           Based on elements of bmatrixHI_mod.ftn90
  11  !
  12  !:Comments:
  13  !
  14  !   1. Covariances uncoupled from weather variable.
  15  !
  16  !   2. One could potentially make public the functions/routines which are
  17  !      identical to those in bmatrixhi_mod.ftn90 (except possibly in name) so
  18  !      that one copy is present in the code.
  19  !
  20
  21  ! Public Subroutines:
  22  !
  23  !    bchm_setupCH:  Must be called first. 
  24  !                   Acquire constituents backgound error standard 
  25  !                   deviations and spectral space correlations which are 
  26  !                   read and prepared by bcsc_setupCH. 
  27  !    bchm_finalize: Deallocate internal module arrays.
  28  !    bchm_BSqrt:    Transformations from control vector to analysis
  29  !                   increments in the minimization process.
  30  !    bchm_BSqrtAd:  Adjoint of bchm_BSqrt.
  31  !    bchm_expand*   MPI manipulations of contol vector(s)
  32  !    bchm_reduce*   MPI manipulations related to contol vector(s)
  33  !
  34  
  35  use midasMpi_mod
  36  use gridStateVector_mod
  37  use gridVariableTransforms_mod
  38  use globalSpectralTransform_mod
  39  use horizontalCoord_mod
  40  use verticalCoord_mod
  41  use varNameList_mod
  42  use utilities_mod
  43  use bCovarSetupChem_mod
  44  
  45  implicit none
  46  save
  47  private
  48
  49  ! public procedures
  50  public :: bchm_setupCH,bchm_finalize,bchm_BSqrt,bchm_BSqrtAd
  51  public :: bchm_expandToMPIglobal,bchm_expandToMPIglobal_r4,bchm_reduceToMPIlocal,bchm_reduceToMPIlocal_r4
  52  
  53  logical             :: initialized = .false.                    
  54  integer             :: nla_mpiglobal,nla_mpilocal           
  55  integer             :: cvDim_mpilocal,cvDim_mpiglobal           
  56  integer             :: gstID, gstID2          
  57
  58  integer             :: mymBeg,mymEnd,mymSkip,mymCount
  59  integer             :: mynBeg,mynEnd,mynSkip,mynCount
  60  integer             :: maxMyNla
  61  integer             :: myLatBeg,myLatEnd
  62  integer             :: myLonBeg,myLonEnd
  63  integer, pointer    :: ilaList_mpiglobal(:)
  64  integer, pointer    :: ilaList_mpilocal(:)
  65                            
  66  integer, external   :: get_max_rss
  67
  68  ! Bacgkround error covariance matrix elements.
  69  ! One could add an additional dimension to corns  
  70  ! for separate block-univariate correlation matrices.
  71  ! This would also permit merging of bmatrixhi_mod and bmatrixchem_mod
  72  ! into one module.
  73
  74  character(len=20)   :: TransformVarKindCH
  75
  76
  77  type(struct_bcsc_bgStats) :: bgStats ! Background covariances
  78                                       ! and related elements
  79
  80  !*************************************************************************
  81    
  82  contains
  83
  84  !--------------------------------------------------------------------------
  85  ! bchm_setupCH
  86  !--------------------------------------------------------------------------
  87  subroutine bchm_setupCH(hco_in,vco_in,cvDim_out)
  88    !
  89    !:Purpose: Acquire constituents backgound error standard deviations (stddev),
  90    !          spectral space correlations (corns) and related elements
  91    !          which are read and prepared by lower level module routine
  92    !          bcsc_setupCH.
  93    !
  94    implicit none
  95
  96    ! Arguments:
  97    type(struct_hco), pointer, intent(in)  :: hco_in
  98    type(struct_vco), pointer, intent(in)  :: vco_in
  99    integer,                   intent(out) :: cvDim_out
 100
 101    ! Locals:
 102    integer :: jn,jm
 103    integer :: latPerPE, latPerPEmax, lonPerPE, lonPerPEmax, nlev_T_even, ierr
 104    logical :: covarNeeded
 105
 106    ! Read and prepare covariances and related elements
 107    
 108    call bcsc_setupCH(hco_in,vco_in,covarNeeded,'Analysis')
 109    if (.not.covarNeeded) then
 110      ! Assumes CH covariances not needed.
 111      cvDim_out=0
 112      return
 113    end if
 114    
 115    ! Get covariances and related elements required for assimilation
 116        
 117    call bcsc_getCovarCH(bgStats,transformVarKind_opt=transformVarKindCH)
 118        
 119    ! Set vertical dimension
 120    ! Need an even number of levels for spectral transform
 121    
 122    if (mod(bgStats%nlev,2) /= 0) then
 123      nLev_T_even = bgStats%nlev+1
 124    else
 125      nLev_T_even = bgStats%nlev
 126    end if
 127     
 128    ! Spectral transform and MPI setup parameters.
 129
 130    nla_mpiglobal = (bgStats%ntrunc+1)*(bgStats%ntrunc+2)/2    
 131    gstID  = gst_setup(bgStats%ni,bgStats%nj,bgStats%ntrunc,bgStats%nkgdim)
 132    gstID2 = gst_setup(bgStats%ni,bgStats%nj,bgStats%ntrunc,nlev_T_even)
 133    if (mmpi_myid == 0) write(*,*) 'bchm_setupCH: returned value of gstID =',gstID
 134    if (mmpi_myid == 0) write(*,*) 'bchm_setupCH: returned value of gstID2=',gstID2
 135
 136    call mmpi_setup_latbands(bgStats%nj, latPerPE, latPerPEmax, myLatBeg, &
 137      myLatEnd)
 138    call mmpi_setup_lonbands(bgStats%ni, lonPerPE, lonPerPEmax, myLonBeg, &
 139      myLonEnd)
 140
 141    call mmpi_setup_m(bgStats%ntrunc,mymBeg,mymEnd,mymSkip,mymCount)
 142    call mmpi_setup_n(bgStats%ntrunc,mynBeg,mynEnd,mynSkip,mynCount)
 143
 144    call gst_ilaList_mpiglobal(ilaList_mpiglobal,nla_mpilocal,maxMyNla,gstID, &
 145      mymBeg,mymEnd,mymSkip,mynBeg,mynEnd,mynSkip)
 146    call gst_ilaList_mpilocal(ilaList_mpilocal,gstID,mymBeg,mymEnd,mymSkip, &
 147      mynBeg,mynEnd,mynSkip)
 148
 149    ! Compute mpilocal control vector size
 150    do jm = mymBeg, mymEnd, mymSkip
 151      do jn = mynBeg, mynEnd, mynSkip
 152        if (jm <= jn) then
 153          if (jm == 0) then
 154            ! only real component for jm=0
 155            cvDim_mpilocal = cvDim_mpilocal + 1*bgStats%nkgdim
 156          else
 157            ! both real and imaginary components for jm>0
 158            cvDim_mpilocal = cvDim_mpilocal + 2*bgStats%nkgdim
 159          end if
 160        end if
 161      end do
 162    end do
 163    cvDim_out = cvDim_mpilocal
 164
 165    ! Also compute mpiglobal control vector dimension
 166    call rpn_comm_allreduce(cvDim_mpilocal, cvDim_mpiglobal, 1, &
 167                            "mpi_integer", "mpi_sum", "GRID", ierr)
 168
 169    initialized = .true.
 170   
 171  end subroutine bchm_setupCH
 172
 173  !--------------------------------------------------------------------------
 174  ! bchm_bSqrt
 175  !--------------------------------------------------------------------------
 176  subroutine bchm_bSqrt(controlvector_in,statevector, stateVectorRef_opt)
 177    !
 178    !:Purpose: To apply B_CHM^1/2 to a control vector.
 179    !
 180    !          Based on bhi_bsqrt
 181    !
 182    implicit none
 183
 184    ! Arguments:
 185    real(8),                    intent(inout) :: controlVector_in(cvDim_mpilocal)
 186    type(struct_gsv),           intent(inout) :: statevector
 187    type(struct_gsv), optional, intent(in)    :: stateVectorRef_opt
 188    
 189    ! Locals:
 190    real(8), allocatable :: gd_out(:,:,:)
 191    real(8)   :: hiControlVector(nla_mpilocal,2,bgStats%nkgdim)
 192    character(len=30) :: transform
 193    integer :: varIndex
 194    
 195    if (.not.initialized) return
 196
 197    if (mmpi_myid == 0) write(*,*) 'bchm_bsqrt: starting'
 198    if (mmpi_myid == 0) write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
 199
 200    allocate(gd_out(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim))
 201
 202    call bchm_cain(controlVector_in,hiControlVector)
 203
 204    call bchm_spa2gd(hiControlVector,gd_out)
 205    
 206    call bchm_copyToStatevector(statevector,gd_out)
 207
 208    if ( trim(transformVarKindCH) /= '' ) then  
 209     
 210      transform = trim(transformVarKindCH)//'CH_tlm'
 211      do varIndex= 1, bgStats%numvar3d+bgStats%numvar2d
 212   
 213        if (vnl_varKindFromVarname(vnl_varNameList(varIndex)) /= 'CH') cycle
 214    
 215        if ( present(stateVectorRef_opt) ) then
 216          call gvt_transform( statevector,  &                          ! INOUT
 217                              trim(transform), &                       ! IN
 218                              stateVectorRef_opt=stateVectorRef_opt, & ! IN
 219			      varName_opt=bgStats%varNameList(varIndex) ) ! IN
 220        else
 221          call gvt_transform( statevector,  &                          ! INOUT
 222                              trim(transform), &                       ! IN
 223                              varName_opt=bgStats%varNameList(varIndex) ) ! IN
 224        end if
 225
 226      end do
 227    end if
 228    
 229    deallocate(gd_out)
 230
 231    if (mmpi_myid == 0) write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
 232    if (mmpi_myid == 0) write(*,*) 'bchm_bsqrt: done'
 233
 234  end subroutine bchm_bSqrt
 235
 236  !--------------------------------------------------------------------------
 237  ! bchm_bSqrtAd
 238  !--------------------------------------------------------------------------
 239  subroutine bchm_bSqrtAd(statevector,controlVector_out, stateVectorRef_opt)
 240    !
 241    !:Purpose: To apply adjoint of B_CHM^1/2 to a statevector.
 242    !
 243    !          Based on bhi_bSqrtAd.
 244    !
 245    implicit none
 246
 247    ! Arguments:
 248    real(8),                    intent(inout) :: controlVector_out(cvDim_mpilocal)
 249    type(struct_gsv),           intent(inout) :: statevector
 250    type(struct_gsv), optional, intent(in)    :: stateVectorRef_opt
 251    
 252    ! Locals:
 253    real(8), allocatable :: gd_in(:,:,:)
 254    real(8)   :: hiControlVector(nla_mpilocal,2,bgStats%nkgdim)
 255    character(len=30) :: transform
 256    integer :: varIndex  
 257
 258    if ( .not.initialized ) then
 259      if (mmpi_myid == 0) write(*,*) 'bMatrixChem not initialized'
 260      return
 261    end if
 262
 263    if (mmpi_myid == 0) write(*,*) 'bchm_bsqrtad: starting'
 264    if (mmpi_myid == 0) write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
 265
 266    allocate(gd_in(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim))
 267
 268    if ( trim(transformVarKindCH) /= '' ) then  
 269          
 270      transform = trim(transformVarKindCH)//'CH_ad'            
 271      do varIndex = 1, bgStats%numvar3d+bgStats%numvar2d
 272   
 273        if (vnl_varKindFromVarname(vnl_varNameList(varIndex)) /= 'CH') cycle
 274    
 275        if ( present(stateVectorRef_opt) ) then
 276          call gvt_transform( statevector,  &                          ! INOUT
 277                              trim(transform), &                       ! IN
 278                              stateVectorRef_opt=stateVectorRef_opt, &  ! IN
 279			      varName_opt=bgStats%varNameList(varIndex) ) ! IN
 280        else
 281          call gvt_transform( statevector,  &                          ! INOUT
 282                              trim(transform), &                       ! IN
 283                              varName_opt=bgStats%varNameList(varIndex) ) ! IN
 284        end if
 285
 286      end do
 287    end if
 288
 289    call bchm_copyFromStatevector(statevector,gd_in)
 290
 291    call bchm_spa2gdad(gd_in,hiControlVector)
 292
 293    call bchm_cainad(hiControlVector,controlVector_out)
 294
 295    deallocate(gd_in)
 296
 297    if (mmpi_myid == 0) write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
 298    if (mmpi_myid == 0) write(*,*) 'bchm_bsqrtad: done'
 299
 300  end subroutine bchm_bSqrtAd
 301
 302  !--------------------------------------------------------------------------
 303  ! bchm_cain
 304  !--------------------------------------------------------------------------
 305  subroutine bchm_cain(controlVector_in,hiControlVector_out)
 306    !
 307    implicit none
 308
 309    ! Arguments:
 310    real(8), intent(inout) :: controlVector_in(cvDim_mpilocal)
 311    real(8), intent(inout) :: hiControlVector_out(nla_mpilocal,2,bgStats%nkgdim)
 312
 313    ! Locals:
 314    integer :: jdim, levelIndex, jm, jn, ila_mpilocal, ila_mpiglobal
 315
 316    jdim = 0
 317    hiControlVector_out(:,:,:) = 0.0d0
 318    do levelIndex = 1, bgStats%nkgdim
 319      do jm = mymBeg, mymEnd, mymSkip
 320        do jn = mynBeg, mynEnd, mynSkip
 321          if (jm <= jn) then
 322            ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 323            ila_mpilocal  = ilaList_mpilocal(ila_mpiglobal)
 324            if (jm == 0) then
 325              ! only real component for jm=0
 326              jdim = jdim + 1
 327              hiControlVector_out(ila_mpilocal,1,levelIndex) = controlVector_in(jdim)
 328            else
 329              ! both real and imaginary components for jm>0
 330              jdim = jdim + 1
 331              hiControlVector_out(ila_mpilocal,1,levelIndex) = controlVector_in(jdim)
 332              jdim = jdim + 1
 333              hiControlVector_out(ila_mpilocal,2,levelIndex) = controlVector_in(jdim)
 334            end if
 335          end if
 336        end do
 337      end do
 338    end do
 339
 340  end subroutine bchm_cain
 341
 342  !--------------------------------------------------------------------------
 343  ! bchm_cainAd
 344  !--------------------------------------------------------------------------
 345  subroutine bchm_cainAd(hiControlVector_in,controlVector_out)
 346    implicit none
 347
 348    ! Arguments:
 349    real(8), intent(inout) :: controlVector_out(cvDim_mpilocal)
 350    real(8), intent(inout) :: hiControlVector_in(nla_mpilocal,2,bgStats%nkgdim)
 351
 352    ! Locals:
 353    integer :: jdim, levelIndex, jm, jn, ila_mpilocal, ila_mpiglobal
 354
 355    jdim = 0
 356    do levelIndex = 1, bgStats%nkgdim
 357      do jm = mymBeg, mymEnd, mymSkip
 358        do jn = mynBeg, mynEnd, mynSkip
 359          if (jm <= jn) then
 360            ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 361            ila_mpilocal  = ilaList_mpilocal(ila_mpiglobal)
 362            if (jm == 0) then
 363              ! only real component for jm=0
 364              jdim = jdim + 1
 365              controlVector_out(jdim) = controlVector_out(jdim) + hiControlVector_in(ila_mpilocal,1,levelIndex)
 366            else
 367              ! both real and imaginary components for jm>0
 368              jdim = jdim + 1
 369              controlVector_out(jdim) = controlVector_out(jdim) + hiControlVector_in(ila_mpilocal,1,levelIndex)*2.0d0
 370              jdim = jdim + 1
 371              controlVector_out(jdim) = controlVector_out(jdim) + hiControlVector_in(ila_mpilocal,2,levelIndex)*2.0d0
 372            end if
 373          end if
 374        end do
 375      end do
 376    end do
 377
 378  end subroutine bchm_cainAd
 379
 380  !--------------------------------------------------------------------------
 381  ! bchm_spa2gd
 382  !--------------------------------------------------------------------------
 383  subroutine bchm_spa2gd(hiControlVector_in,gd_out)
 384    implicit none
 385
 386    ! Arguments:
 387    real(8), intent(inout) :: hiControlVector_in(nla_mpilocal,2,bgStats%nkgdim)
 388    real(8), intent(inout) :: gd_out(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim)
 389
 390    ! Locals:
 391    real(8) :: sp(nla_mpilocal,2,bgStats%nkgdim)
 392    integer :: jn,jm,ila_mpilocal,ila_mpiglobal,icount
 393    real(8) :: sq2
 394    real(8) , allocatable :: zsp(:,:,:), zsp2(:,:,:)
 395    integer :: levelIndex, lonIndex, latIndex
 396    real(8), target  :: gd(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim)
 397
 398    ! maybe not needed:
 399    sp(:,:,:) = 0.0d0
 400    sq2 = sqrt(2.0d0)
 401
 402    allocate(zsp(bgStats%nkgdim,2,mymCount))
 403    allocate(zsp2(bgStats%nkgdim,2,mymCount))
 404
 405    !$OMP PARALLEL DO PRIVATE(jn,jm,levelIndex,ila_mpiglobal,ila_mpilocal, &
 406      zsp2,zsp,icount)
 407    do jn = mynBeg, mynEnd, mynSkip
 408
 409      icount = 0
 410      do jm = mymBeg, mymEnd, mymSkip
 411        if (jm <= jn) then
 412          icount = icount+1
 413          ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 414          ila_mpilocal = ilaList_mpilocal(ila_mpiglobal)
 415          do levelIndex = 1, bgStats%nkgdim
 416            zsp(levelIndex,1,icount) = hiControlVector_in(ila_mpilocal,1, &
 417	      levelIndex)
 418            zsp(levelIndex,2,icount) = hiControlVector_in(ila_mpilocal,2, &
 419	      levelIndex)
 420          end do
 421        end if
 422      end do
 423      if (icount > 0) then
 424
 425        CALL DGEMM('N','N',bgStats%nkgdim,2*icount,bgStats%nkgdim,1.0d0, &
 426	  bgStats%corns(1,1,jn),bgStats%nkgdim,zsp(1,1,1), &
 427	  bgStats%nkgdim,0.0d0,zsp2(1,1,1),bgStats%nkgdim)
 428
 429        icount = 0
 430        do jm = mymBeg, mymEnd, mymSkip
 431          if (jm <= jn) then
 432            icount = icount+1
 433            ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 434            ila_mpilocal = ilaList_mpilocal(ila_mpiglobal)
 435            do levelIndex = 1, bgStats%nkgdim
 436              sp(ila_mpilocal,1,levelIndex) = zsp2(levelIndex,1,icount)
 437              sp(ila_mpilocal,2,levelIndex) = zsp2(levelIndex,2,icount)
 438            end do
 439          end if
 440        end do
 441
 442      end if
 443
 444      ! make adjustments for jm=0
 445      if (mymBeg == 0) then
 446
 447        ila_mpiglobal = gst_getNind(0,gstID) + jn
 448        ila_mpilocal = ilaList_mpilocal(ila_mpiglobal)
 449
 450        do levelIndex = 1, bgStats%nkgdim
 451          sp(ila_mpilocal,1,levelIndex) = sp(ila_mpilocal,1,levelIndex)*sq2
 452          sp(ila_mpilocal,2,levelIndex) = 0.0d0
 453        end do
 454
 455      end if
 456
 457    end do
 458    !$OMP END PARALLEL DO
 459    deallocate(zsp)
 460    deallocate(zsp2)
 461
 462
 463    !$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,lonIndex)
 464    do levelIndex = 1, bgStats%nkgdim
 465      do latIndex = myLatBeg, myLatEnd
 466        do lonIndex = myLonBeg, myLonEnd
 467          gd(lonIndex,latIndex,levelIndex) = 0.0d0
 468        end do
 469      end do
 470    end do
 471    !$OMP END PARALLEL DO
 472
 473    call gst_setID(gstID)
 474    call gst_speree(sp,gd)
 475    call gst_setID(gstID2)
 476
 477    !$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,lonIndex)
 478    do levelIndex = 1, bgStats%nkgdim
 479      do latIndex = myLatBeg, myLatEnd
 480        do lonIndex = myLonBeg, myLonEnd
 481          gd(lonIndex,latIndex,levelIndex) = gd(lonIndex,latIndex,levelIndex) &
 482	    *bgStats%stddev(lonIndex,latIndex,levelIndex)
 483        end do
 484      end do
 485    end do
 486    !$OMP END PARALLEL DO
 487    
 488    !$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,lonIndex)
 489    do levelIndex = 1, bgStats%nkgdim
 490      do latIndex = myLatBeg, myLatEnd
 491        do lonIndex = myLonBeg, myLonEnd
 492          gd_out(lonIndex,latIndex,levelIndex) = gd(lonIndex,latIndex,levelIndex)
 493        end do
 494      end do
 495    end do
 496    !$OMP END PARALLEL DO
 497
 498  end subroutine bchm_spa2gd
 499
 500  !--------------------------------------------------------------------------
 501  ! bchm_spa2gdad
 502  !--------------------------------------------------------------------------
 503  subroutine bchm_spa2gdad(gd_in,hiControlVector_out)
 504    implicit none
 505
 506    ! Arguments:
 507    real(8), intent(inout) :: hiControlVector_out(nla_mpilocal,2,bgStats%nkgdim)
 508    real(8), intent(inout) :: gd_in(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim)
 509
 510    ! Locals:
 511    real(8) :: sp(nla_mpilocal,2,bgStats%nkgdim)
 512    integer :: jn, jm, ila_mpilocal, ila_mpiglobal, icount
 513    real(8) :: sq2
 514    real(8), allocatable :: zsp(:,:,:), zsp2(:,:,:)
 515    integer :: levelIndex, lonIndex, latIndex
 516    real(8), target :: gd(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim)
 517
 518    !$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,lonIndex)
 519    do levelIndex = 1, bgStats%nkgdim
 520      do latIndex = myLatBeg, myLatEnd
 521        do lonIndex = myLonBeg, myLonEnd                                                      
 522          gd(lonIndex,latIndex,levelIndex) = gd_in(lonIndex,latIndex,levelIndex)
 523        end do
 524      end do
 525    end do
 526    !$OMP END PARALLEL DO
 527
 528    !$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,lonIndex)
 529    do levelIndex = 1, bgStats%nkgdim
 530      do latIndex = myLatBeg, myLatEnd
 531        do lonIndex = myLonBeg, myLonEnd
 532          gd(lonIndex,latIndex,levelIndex) = gd(lonIndex,latIndex,levelIndex)* &
 533 	     bgStats%stddev(lonIndex,latIndex,levelIndex)
 534        end do
 535      end do
 536    end do
 537    !$OMP END PARALLEL DO
 538
 539    call gst_setID(gstID)
 540    call gst_speree_ad(sp,gd)
 541
 542    hiControlVector_out(:,:,:) = 0.0d0
 543    sq2 = sqrt(2.0d0)
 544    allocate(zsp(bgStats%nkgdim,2,mymCount))
 545    allocate(zsp2(bgStats%nkgdim,2,mymCount))
 546
 547    !$OMP PARALLEL DO PRIVATE(JN,JM,levelIndex,ILA_MPILOCAL,ILA_MPIGLOBAL,zsp, &
 548      zsp2,icount)
 549    do jn = mynBeg, mynEnd, mynSkip
 550
 551      icount = 0
 552      do jm = mymBeg, mymEnd, mymSkip
 553        if (jm <= jn) then
 554          icount = icount+1
 555          ila_mpiglobal = gst_getNind(jm,gstID) + jn - jm
 556          ila_mpilocal = ilaList_mpilocal(ila_mpiglobal)
 557          do levelIndex = 1, bgStats%nkgdim
 558            zsp2(levelIndex,1,icount) = sp(ila_mpilocal,1,levelIndex)
 559            zsp2(levelIndex,2,icount) = sp(ila_mpilocal,2,levelIndex)
 560          end do
 561        end if
 562      end do
 563
 564      if (icount > 0) then
 565
 566        CALL DGEMM('T','N',bgStats%nkgdim,2*icount,bgStats%nkgdim,1.0d0, &
 567	  bgStats%corns(1,1,jn),bgStats%nkgdim,zsp2(1,1,1), &
 568	  bgStats%nkgdim,0.0d0,zsp(1,1,1),bgStats%nkgdim)
 569
 570        icount = 0
 571        do jm = mymBeg, jn, mymSkip
 572          icount=icount+1
 573          ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 574          ila_mpilocal = ilaList_mpilocal(ila_mpiglobal)
 575          do levelIndex = 1, bgStats%nkgdim
 576            hiControlVector_out(ila_mpilocal,1,levelIndex) = zsp(levelIndex,1,icount)
 577            hiControlVector_out(ila_mpilocal,2,levelIndex) = zsp(levelIndex,2,icount)
 578          end do
 579        end do
 580
 581      end if
 582
 583      ! make adjustments for jm=0
 584      if (mymBeg == 0) then
 585
 586        ila_mpiglobal = gst_getNIND(0,gstID) + jn
 587        ila_mpilocal = ilaList_mpilocal(ila_mpiglobal)
 588
 589        do levelIndex = 1, bgStats%nkgdim
 590          hiControlVector_out(ila_mpilocal,1,levelIndex) = &
 591	    hiControlVector_out(ila_mpilocal,1,levelIndex)*sq2
 592          hiControlVector_out(ila_mpilocal,2,levelIndex) = &
 593	    hiControlVector_out(ila_mpilocal,2,levelIndex)*sq2
 594        end do
 595
 596      end if
 597
 598    end do
 599    !$OMP END PARALLEL DO
 600
 601    deallocate(zsp)
 602    deallocate(zsp2)
 603
 604  end subroutine bchm_spa2gdad
 605
 606  !--------------------------------------------------------------------------
 607  ! bchm_copyToStatevector
 608  !--------------------------------------------------------------------------
 609  subroutine bchm_copyToStatevector(statevector,gd)
 610    implicit none
 611    
 612    ! Arguments:
 613    type(struct_gsv), intent(inout) :: statevector
 614    real(8),          intent(inout) :: gd(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim)
 615    
 616    ! Locals:
 617    integer :: lonIndex, levelIndex, levelIndex2, latIndex, varIndex, ilev1, ilev2
 618    real(8), pointer :: field(:,:,:)
 619
 620    do varIndex = 1,bgStats%numvar3d+bgStats%numvar2d
 621      call gsv_getField(statevector,field,bgStats%varNameList(varIndex))
 622      ilev1 = bgStats%nsposit(varIndex)
 623      ilev2 = bgStats%nsposit(varIndex+1)-1 
 624        
 625      !!!$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,levelIndex2,lonIndex)
 626      do levelIndex = ilev1, ilev2
 627        levelIndex2 = levelIndex-ilev1+1
 628        do latIndex = myLatBeg, myLatEnd
 629          do lonIndex = myLonBeg, myLonEnd
 630            field(lonIndex,latIndex,levelIndex2) = gd(lonIndex,latIndex,levelIndex)
 631          end do
 632        end do
 633      end do
 634      !!!$OMP END PARALLEL DO
 635    end do
 636  end subroutine bchm_copyToStatevector
 637
 638  !--------------------------------------------------------------------------
 639  ! bchm_copyFromStatevector
 640  !--------------------------------------------------------------------------
 641  subroutine bchm_copyFromStatevector(statevector,gd)
 642    implicit none
 643
 644    ! Arguments:
 645    type(struct_gsv), intent(inout) :: statevector
 646    real(8),          intent(inout) :: gd(myLonBeg:myLonEnd,myLatBeg:myLatEnd,bgStats%nkgdim)
 647    
 648    ! Locals:
 649    integer :: lonIndex, levelIndex, levelIndex2, latIndex, varIndex, ilev1, ilev2
 650    real(8), pointer :: field(:,:,:)
 651
 652    do varIndex = 1,bgStats%numvar3d+bgStats%numvar2d
 653      call gsv_getField(statevector,field,bgStats%varNameList(varIndex))
 654
 655      ilev1 = bgStats%nsposit(varIndex)
 656      ilev2 = bgStats%nsposit(varIndex+1)-1 
 657
 658      !!!$OMP PARALLEL DO PRIVATE(latIndex,levelIndex,levelIndex2,lonIndex)
 659      do levelIndex = ilev1, ilev2
 660        levelIndex2 = levelIndex-ilev1+1
 661        do latIndex = myLatBeg, myLatEnd
 662          do lonIndex = myLonBeg, myLonEnd
 663            gd(lonIndex,latIndex,levelIndex) = field(lonIndex,latIndex,levelIndex2)
 664          end do
 665        end do
 666      end do
 667      !!!$OMP END PARALLEL DO
 668     end do
 669
 670  end subroutine bchm_copyFromStatevector
 671
 672  !--------------------------------------------------------------------------
 673  ! bchm_reduceToMPILocal
 674  !--------------------------------------------------------------------------
 675  subroutine bchm_reduceToMPILocal(cv_mpilocal,cv_mpiglobal)
 676    implicit none
 677
 678    ! Arguments:
 679    real(8), intent(out) :: cv_mpilocal(cvDim_mpilocal)
 680    real(8), intent(in)  :: cv_mpiglobal(:)
 681
 682    ! Locals:
 683    real(8), allocatable :: cv_allMaxMpilocal(:,:)
 684    integer, allocatable :: cvDim_allMpilocal(:), displs(:)
 685    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
 686    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
 687    integer :: jproc,cvDim_maxmpilocal,ierr
 688    integer :: levelIndex,jn,jm,ila_mpiglobal,jdim_mpilocal,jdim_mpiglobal
 689
 690    if (.not.initialized) return
 691
 692    call rpn_comm_allreduce(cvDim_mpilocal, cvDim_maxmpilocal, &
 693                            1,"MPI_INTEGER","MPI_MAX","GRID",ierr)
 694
 695    if (mmpi_myid == 0) then
 696      allocate(cvDim_allMpiLocal(mmpi_nprocs))
 697    else
 698      allocate(cvDim_allMpiLocal(1))
 699    end if
 700
 701    call rpn_comm_gather(cvDim_mpiLocal   ,1,"mpi_integer",       &
 702                         cvDim_allMpiLocal,1,"mpi_integer",0,"GRID",ierr)
 703
 704    if (mmpi_myid == 0) then
 705      allocate(allnBeg(mmpi_nprocs))
 706      allocate(allnEnd(mmpi_nprocs))
 707      allocate(allnSkip(mmpi_nprocs))
 708      allocate(allmBeg(mmpi_nprocs))
 709      allocate(allmEnd(mmpi_nprocs))
 710      allocate(allmSkip(mmpi_nprocs))
 711    else
 712      allocate(allnBeg(1))
 713      allocate(allnEnd(1))
 714      allocate(allnSkip(1))
 715      allocate(allmBeg(1))
 716      allocate(allmEnd(1))
 717      allocate(allmSkip(1))
 718    end if
 719
 720    call rpn_comm_gather(mynBeg  ,1,"mpi_integer",       &
 721                         allnBeg ,1,"mpi_integer",0,"GRID",ierr)
 722    call rpn_comm_gather(mynEnd  ,1,"mpi_integer",       &
 723                         allnEnd ,1,"mpi_integer",0,"GRID",ierr)
 724    call rpn_comm_gather(mynSkip ,1,"mpi_integer",       &
 725                         allnSkip,1,"mpi_integer",0,"GRID",ierr)
 726
 727    call rpn_comm_gather(mymBeg  ,1,"mpi_integer",       &
 728                         allmBeg ,1,"mpi_integer",0,"GRID",ierr)
 729    call rpn_comm_gather(mymEnd  ,1,"mpi_integer",       &
 730                         allmEnd ,1,"mpi_integer",0,"GRID",ierr)
 731    call rpn_comm_gather(mymSkip ,1,"mpi_integer",       &
 732                         allmSkip,1,"mpi_integer",0,"GRID",ierr)
 733
 734    ! Prepare to data to be distributed
 735    if (mmpi_myid == 0) then
 736
 737      allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
 738
 739      !$OMP PARALLEL DO PRIVATE(jproc,jdim_mpilocal,levelIndex,jm,jn,ila_mpiglobal,jdim_mpiglobal)
 740      do jproc = 0, (mmpi_nprocs-1)
 741        cv_allmaxmpilocal(:,jproc+1) = 0.d0
 742        jdim_mpilocal = 0
 743
 744        do levelIndex = 1, bgStats%nkgdim
 745          do jm = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
 746            do jn = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
 747
 748              if (jm <= jn) then
 749                      
 750                ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 751                      
 752                ! figure out index into global control vector
 753                if (jm == 0) then
 754                  ! for jm=0 only real part
 755                  jdim_mpiglobal = ila_mpiglobal
 756                else
 757                  ! for jm>0 both real and imaginary part
 758                  jdim_mpiglobal = 2*ila_mpiglobal-1 - (bgStats%ntrunc+1)
 759                end if
 760                ! add offset for level
 761                jdim_mpiglobal = jdim_mpiglobal + (levelIndex-1) * &
 762		                 (bgStats%ntrunc+1)*(bgStats%ntrunc+1)
 763                      
 764                ! index into local control vector computer as in cain
 765                if (jm == 0) then
 766                  ! only real component for jm=0
 767                  jdim_mpilocal = jdim_mpilocal + 1
 768                  cv_allmaxmpilocal(jdim_mpilocal,jproc+1) = cv_mpiglobal(jdim_mpiglobal)
 769                else
 770                  ! both real and imaginary components for jm>0
 771                  jdim_mpilocal = jdim_mpilocal + 1
 772                  cv_allmaxmpilocal(jdim_mpilocal,jproc+1) = cv_mpiglobal(jdim_mpiglobal)
 773                  jdim_mpilocal = jdim_mpilocal + 1
 774                  cv_allmaxmpilocal(jdim_mpilocal,jproc+1) = cv_mpiglobal(jdim_mpiglobal+1)
 775                end if
 776                      
 777                if (jdim_mpilocal > cvDim_allMpiLocal(jproc+1)) then
 778                  write(*,*)
 779                  write(*,*) 'ERROR: jdim_mpilocal > cvDim_allMpiLocal(jproc+1)', jdim_mpilocal, cvDim_mpilocal
 780                  write(*,*) '       proc, levelIndex, jn, jm = ',jproc, levelIndex, jn, jm
 781                  call utl_abort('bchm_reduceToMPILocal')
 782                end if
 783                if (jdim_mpiglobal > cvDim_mpiglobal) then
 784                  write(*,*)
 785                  write(*,*) 'ERROR: jdim_mpiglobal > cvDim_mpiglobal', jdim_mpiglobal, cvDim_mpiglobal
 786                  write(*,*) '       proc, levelIndex, jn, jm = ',jproc, levelIndex, jn, jm
 787                  call utl_abort('bchm_reduceToMPILocal')
 788                end if
 789
 790              end if
 791            end do
 792          end do
 793        end do
 794 
 795      end do ! jproc
 796      !$OMP END PARALLEL DO
 797
 798    else
 799      allocate(cv_allmaxmpilocal(1,1))
 800    end if
 801
 802    !- Distribute
 803    allocate(displs(mmpi_nprocs))
 804    do jproc = 0, (mmpi_nprocs-1)
 805      displs(jproc+1) = jproc*cvDim_maxMpiLocal ! displacement wrt cv_allMaxMpiLocal from which
 806                                                ! to take the outgoing data to process jproc
 807    end do
 808
 809    call rpn_comm_scatterv(cv_allMaxMpiLocal, cvDim_allMpiLocal, displs, "mpi_double_precision", &
 810                           cv_mpiLocal, cvDim_mpiLocal, "mpi_double_precision", &
 811                           0, "GRID", ierr)
 812
 813    !- End
 814    deallocate(displs)
 815    deallocate(cv_allMaxMpiLocal)
 816    deallocate(cvDim_allMpiLocal)
 817    deallocate(allnBeg)
 818    deallocate(allnEnd)
 819    deallocate(allnSkip)
 820    deallocate(allmBeg)
 821    deallocate(allmEnd)
 822    deallocate(allmSkip)
 823
 824  end subroutine bchm_reduceToMPILocal
 825
 826  !--------------------------------------------------------------------------
 827  ! bchm_reduceToMPILocal_r4
 828  !--------------------------------------------------------------------------
 829  subroutine bchm_reduceToMPILocal_r4(cv_mpilocal,cv_mpiglobal)
 830    implicit none
 831
 832    ! Arguments:
 833    real(4), intent(out) :: cv_mpilocal(cvDim_mpilocal)
 834    real(4), intent(in)  :: cv_mpiglobal(:)
 835
 836    ! Locals:
 837    real(4), allocatable :: cv_allMaxMpilocal(:,:)
 838    integer, allocatable :: cvDim_allMpilocal(:), displs(:)
 839    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
 840    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
 841    integer :: jproc,cvDim_maxmpilocal,ierr
 842    integer :: levelIndex,jn,jm,ila_mpiglobal,jdim_mpilocal,jdim_mpiglobal
 843
 844    if (.not.initialized) return
 845
 846    call rpn_comm_allreduce(cvDim_mpilocal, cvDim_maxmpilocal, &
 847                            1,"MPI_INTEGER","MPI_MAX","GRID",ierr)
 848
 849    if (mmpi_myid == 0) then
 850      allocate(cvDim_allMpiLocal(mmpi_nprocs))
 851    else
 852      allocate(cvDim_allMpiLocal(1))
 853    end if
 854
 855    call rpn_comm_gather(cvDim_mpiLocal   ,1,"mpi_integer",       &
 856                         cvDim_allMpiLocal,1,"mpi_integer",0,"GRID",ierr)
 857
 858    if (mmpi_myid == 0) then
 859      allocate(allnBeg(mmpi_nprocs))
 860      allocate(allnEnd(mmpi_nprocs))
 861      allocate(allnSkip(mmpi_nprocs))
 862      allocate(allmBeg(mmpi_nprocs))
 863      allocate(allmEnd(mmpi_nprocs))
 864      allocate(allmSkip(mmpi_nprocs))
 865    else
 866      allocate(allnBeg(1))
 867      allocate(allnEnd(1))
 868      allocate(allnSkip(1))
 869      allocate(allmBeg(1))
 870      allocate(allmEnd(1))
 871      allocate(allmSkip(1))
 872    end if
 873
 874    call rpn_comm_gather(mynBeg  ,1,"mpi_integer",       &
 875                         allnBeg ,1,"mpi_integer",0,"GRID",ierr)
 876    call rpn_comm_gather(mynEnd  ,1,"mpi_integer",       &
 877                         allnEnd ,1,"mpi_integer",0,"GRID",ierr)
 878    call rpn_comm_gather(mynSkip ,1,"mpi_integer",       &
 879                         allnSkip,1,"mpi_integer",0,"GRID",ierr)
 880
 881    call rpn_comm_gather(mymBeg  ,1,"mpi_integer",       &
 882                         allmBeg ,1,"mpi_integer",0,"GRID",ierr)
 883    call rpn_comm_gather(mymEnd  ,1,"mpi_integer",       &
 884                         allmEnd ,1,"mpi_integer",0,"GRID",ierr)
 885    call rpn_comm_gather(mymSkip ,1,"mpi_integer",       &
 886                         allmSkip,1,"mpi_integer",0,"GRID",ierr)
 887
 888    ! Prepare to data to be distributed
 889    if (mmpi_myid == 0) then
 890
 891      allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
 892
 893      !$OMP PARALLEL DO PRIVATE(jproc,jdim_mpilocal,levelIndex,jm,jn,ila_mpiglobal,jdim_mpiglobal)
 894      do jproc = 0, (mmpi_nprocs-1)
 895        cv_allmaxmpilocal(:,jproc+1) = 0.d0
 896        jdim_mpilocal = 0
 897
 898        do levelIndex = 1, bgStats%nkgdim
 899          do jm = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
 900            do jn = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
 901
 902              if (jm <= jn) then
 903                      
 904                ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
 905                      
 906                ! figure out index into global control vector
 907                if  (jm == 0) then
 908                  ! for jm=0 only real part
 909                  jdim_mpiglobal = ila_mpiglobal
 910                else
 911                  ! for jm>0 both real and imaginary part
 912                  jdim_mpiglobal = 2*ila_mpiglobal-1 - (bgStats%ntrunc+1)
 913                end if
 914                ! add offset for level
 915                jdim_mpiglobal = jdim_mpiglobal + (levelIndex-1) * &
 916		                 (bgStats%ntrunc+1)*(bgStats%ntrunc+1)
 917                      
 918                ! index into local control vector computer as in cain
 919                if (jm == 0) then
 920                  ! only real component for jm=0
 921                  jdim_mpilocal = jdim_mpilocal + 1
 922                  cv_allmaxmpilocal(jdim_mpilocal,jproc+1) = cv_mpiglobal(jdim_mpiglobal)
 923                else
 924                  ! both real and imaginary components for jm>0
 925                  jdim_mpilocal = jdim_mpilocal + 1
 926                  cv_allmaxmpilocal(jdim_mpilocal,jproc+1) = cv_mpiglobal(jdim_mpiglobal)
 927                  jdim_mpilocal = jdim_mpilocal + 1
 928                  cv_allmaxmpilocal(jdim_mpilocal,jproc+1) = cv_mpiglobal(jdim_mpiglobal+1)
 929                end if
 930                      
 931                if (jdim_mpilocal > cvDim_allMpiLocal(jproc+1)) then
 932                  write(*,*)
 933                  write(*,*) 'ERROR: jdim_mpilocal > cvDim_allMpiLocal(jproc+1)', jdim_mpilocal, cvDim_mpilocal
 934                  write(*,*) '       proc, levelIndex, jn, jm = ',jproc, levelIndex, jn, jm
 935                  call utl_abort('bchm_reduceToMPILocal')
 936                end if
 937                if (jdim_mpiglobal > cvDim_mpiglobal) then
 938                  write(*,*)
 939                  write(*,*) 'ERROR: jdim_mpiglobal > cvDim_mpiglobal', jdim_mpiglobal, cvDim_mpiglobal
 940                  write(*,*) '       proc, levelIndex, jn, jm = ',jproc, levelIndex, jn, jm
 941                  call utl_abort('bchm_reduceToMPILocal')
 942                end if
 943
 944              end if
 945            end do
 946          end do
 947        end do
 948 
 949      end do ! jproc
 950      !$OMP END PARALLEL DO
 951
 952    else
 953      allocate(cv_allmaxmpilocal(1,1))
 954    end if
 955
 956    !- Distribute
 957    allocate(displs(mmpi_nprocs))
 958    do jproc = 0, (mmpi_nprocs-1)
 959      displs(jproc+1) = jproc*cvDim_maxMpiLocal ! displacement wrt cv_allMaxMpiLocal from which
 960                                                ! to take the outgoing data to process jproc
 961    end do
 962
 963    call rpn_comm_scatterv(cv_allMaxMpiLocal, cvDim_allMpiLocal, displs, "mpi_real4", &
 964                           cv_mpiLocal, cvDim_mpiLocal, "mpi_real4", &
 965                           0, "GRID", ierr)
 966
 967    !- End
 968    deallocate(displs)
 969    deallocate(cv_allMaxMpiLocal)
 970    deallocate(cvDim_allMpiLocal)
 971    deallocate(allnBeg)
 972    deallocate(allnEnd)
 973    deallocate(allnSkip)
 974    deallocate(allmBeg)
 975    deallocate(allmEnd)
 976    deallocate(allmSkip)
 977
 978  end subroutine bchm_reduceToMPILocal_r4
 979
 980  !--------------------------------------------------------------------------
 981  ! bchm_expandToMPIGlobal
 982  !--------------------------------------------------------------------------
 983  subroutine bchm_expandToMPIGlobal(cv_mpilocal,cv_mpiglobal)
 984    implicit none
 985
 986    ! Arguments:
 987    real(8), intent(in)  :: cv_mpilocal(cvDim_mpilocal)
 988    real(8), intent(out) :: cv_mpiglobal(:)
 989
 990    ! Locals:
 991    real(8), allocatable :: cv_maxmpilocal(:)
 992    real(8), pointer :: cv_allmaxmpilocal(:,:) => null()
 993    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
 994    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
 995    integer :: levelIndex, jn, jm, jproc, ila_mpiglobal, jdim_mpilocal, jdim_mpiglobal, ierr, cvDim_maxmpilocal
 996
 997    if (.not.initialized) return
 998
 999    !
1000    !- 1.  Gather all local control vectors onto mpi task 0
1001    !
1002    call rpn_comm_allreduce(cvDim_mpilocal,cvDim_maxmpilocal,1,"mpi_integer","mpi_max","GRID",ierr)
1003
1004    allocate(cv_maxmpilocal(cvDim_maxmpilocal))
1005
1006    nullify(cv_allmaxmpilocal)
1007    if (mmpi_myid == 0) then
1008      allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1009    else
1010      allocate(cv_allmaxmpilocal(1,1))
1011    end if
1012
1013    cv_maxmpilocal(:) = 0.0d0
1014    cv_maxmpilocal(1:cvDim_mpilocal) = cv_mpilocal(1:cvDim_mpilocal)
1015
1016    call rpn_comm_gather(cv_maxmpilocal,    cvDim_maxmpilocal, "mpi_double_precision",  &
1017                         cv_allmaxmpilocal, cvDim_maxmpilocal, "mpi_double_precision", 0, "GRID", ierr )
1018
1019    deallocate(cv_maxmpilocal)
1020
1021    !
1022    !- 2.  Reorganize gathered mpilocal control vectors into the mpiglobal control vector
1023    !
1024    if (mmpi_myid == 0) then
1025      allocate(allnBeg(mmpi_nprocs))
1026      allocate(allnEnd(mmpi_nprocs))
1027      allocate(allnSkip(mmpi_nprocs))
1028      allocate(allmBeg(mmpi_nprocs))
1029      allocate(allmEnd(mmpi_nprocs))
1030      allocate(allmSkip(mmpi_nprocs))
1031    else
1032      allocate(allnBeg(1))
1033      allocate(allnEnd(1))
1034      allocate(allnSkip(1))
1035      allocate(allmBeg(1))
1036      allocate(allmEnd(1))
1037      allocate(allmSkip(1))
1038    end if
1039
1040    call rpn_comm_gather(mynBeg  ,1,"mpi_integer",       &
1041                         allnBeg ,1,"mpi_integer",0,"GRID",ierr)
1042    call rpn_comm_gather(mynEnd  ,1,"mpi_integer",       &
1043                         allnEnd ,1,"mpi_integer",0,"GRID",ierr)
1044    call rpn_comm_gather(mynSkip ,1,"mpi_integer",       &
1045                         allnSkip,1,"mpi_integer",0,"GRID",ierr)
1046
1047    call rpn_comm_gather(mymBeg  ,1,"mpi_integer",       &
1048                         allmBeg ,1,"mpi_integer",0,"GRID",ierr)
1049    call rpn_comm_gather(mymEnd  ,1,"mpi_integer",       &
1050                         allmEnd ,1,"mpi_integer",0,"GRID",ierr)
1051    call rpn_comm_gather(mymSkip ,1,"mpi_integer",       &
1052                         allmSkip,1,"mpi_integer",0,"GRID",ierr)
1053
1054    if (mmpi_myid == 0) then
1055      cv_mpiglobal(:) = 0.0d0
1056
1057      !$OMP PARALLEL DO PRIVATE(jproc,jdim_mpilocal,levelIndex,jm,jn,ila_mpiglobal,jdim_mpiglobal)
1058      do jproc = 0, (mmpi_nprocs-1)
1059        jdim_mpilocal = 0
1060
1061        do levelIndex = 1, bgStats%nkgdim
1062          do jm = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
1063            do jn = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
1064              if (jm <= jn) then
1065
1066                ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
1067
1068                ! figure out index into global control vector
1069                if (jm == 0) then
1070                  ! for jm=0 only real part
1071                  jdim_mpiglobal = ila_mpiglobal
1072                else
1073                  ! for jm>0 both real and imaginary part
1074                  jdim_mpiglobal = 2*ila_mpiglobal-1 - (bgStats%ntrunc+1)
1075                end if
1076                ! add offset for level
1077                jdim_mpiglobal = jdim_mpiglobal + (levelIndex-1) * &
1078		                 (bgStats%ntrunc+1)*(bgStats%ntrunc+1)
1079
1080                ! index into local control vector
1081                if (jm == 0) then
1082                  ! only real component for jm=0
1083                  jdim_mpilocal = jdim_mpilocal + 1
1084                  cv_mpiglobal(jdim_mpiglobal) = cv_allmaxmpilocal(jdim_mpilocal,jproc+1)
1085                else
1086                  ! both real and imaginary components for jm>0
1087                  jdim_mpilocal = jdim_mpilocal + 1
1088                  cv_mpiglobal(jdim_mpiglobal) = cv_allmaxmpilocal(jdim_mpilocal,jproc+1)
1089                  jdim_mpilocal = jdim_mpilocal + 1
1090                  cv_mpiglobal(jdim_mpiglobal+1) = cv_allmaxmpilocal(jdim_mpilocal,jproc+1)
1091                end if
1092
1093                if (jdim_mpiglobal > cvDim_mpiglobal) then
1094                  write(*,*) 'ERROR: jdim,cvDim,mpiglobal=', &
1095		             jdim_mpiglobal,cvDim_mpiglobal,levelIndex,jn,jm
1096		end if
1097
1098              end if
1099            end do
1100          end do
1101        end do
1102      end do ! jproc
1103      !$OMP END PARALLEL DO
1104
1105    end if ! myid == 0 
1106
1107    deallocate(allnBeg)
1108    deallocate(allnEnd)
1109    deallocate(allnSkip)
1110    deallocate(allmBeg)
1111    deallocate(allmEnd)
1112    deallocate(allmSkip)
1113    deallocate(cv_allmaxmpilocal)
1114
1115  end subroutine bchm_expandToMPIGlobal
1116
1117  !--------------------------------------------------------------------------
1118  ! bchm_expandToMPIGlobal_r4
1119  !--------------------------------------------------------------------------
1120  subroutine bchm_expandToMPIGlobal_r4(cv_mpilocal,cv_mpiglobal)
1121    implicit none
1122
1123    ! Arguments:
1124    real(4), intent(in)  :: cv_mpilocal(cvDim_mpilocal)
1125    real(4), intent(out) :: cv_mpiglobal(:)
1126
1127    ! Locals:
1128    real(4), allocatable :: cv_maxmpilocal(:)
1129    real(4), pointer :: cv_allmaxmpilocal(:,:) => null()
1130    integer, allocatable :: allnBeg(:),allnEnd(:),allnSkip(:)
1131    integer, allocatable :: allmBeg(:),allmEnd(:),allmSkip(:)
1132    integer :: levelIndex, jn, jm, jproc, ila_mpiglobal, jdim_mpilocal, jdim_mpiglobal, ierr, cvDim_maxmpilocal
1133
1134    if (.not.initialized) return
1135
1136    !
1137    !- 1.  Gather all local control vectors onto mpi task 0
1138    !
1139    call rpn_comm_allreduce(cvDim_mpilocal,cvDim_maxmpilocal,1,"mpi_integer","mpi_max","GRID",ierr)
1140
1141    allocate(cv_maxmpilocal(cvDim_maxmpilocal))
1142
1143    nullify(cv_allmaxmpilocal)
1144    if (mmpi_myid == 0) then
1145      allocate(cv_allmaxmpilocal(cvDim_maxmpilocal,mmpi_nprocs))
1146    else
1147      allocate(cv_allmaxmpilocal(1,1))
1148    end if
1149
1150    cv_maxmpilocal(:) = 0.0d0
1151    cv_maxmpilocal(1:cvDim_mpilocal) = cv_mpilocal(1:cvDim_mpilocal)
1152
1153    call rpn_comm_gather(cv_maxmpilocal,    cvDim_maxmpilocal, "mpi_real4",  &
1154                         cv_allmaxmpilocal, cvDim_maxmpilocal, "mpi_real4", 0, "GRID", ierr )
1155
1156    deallocate(cv_maxmpilocal)
1157
1158    !
1159    !- 2.  Reorganize gathered mpilocal control vectors into the mpiglobal control vector
1160    !
1161    if (mmpi_myid == 0) then
1162      allocate(allnBeg(mmpi_nprocs))
1163      allocate(allnEnd(mmpi_nprocs))
1164      allocate(allnSkip(mmpi_nprocs))
1165      allocate(allmBeg(mmpi_nprocs))
1166      allocate(allmEnd(mmpi_nprocs))
1167      allocate(allmSkip(mmpi_nprocs))
1168    else
1169      allocate(allnBeg(1))
1170      allocate(allnEnd(1))
1171      allocate(allnSkip(1))
1172      allocate(allmBeg(1))
1173      allocate(allmEnd(1))
1174      allocate(allmSkip(1))
1175    end if
1176
1177    call rpn_comm_gather(mynBeg  ,1,"mpi_integer",       &
1178                         allnBeg ,1,"mpi_integer",0,"GRID",ierr)
1179    call rpn_comm_gather(mynEnd  ,1,"mpi_integer",       &
1180                         allnEnd ,1,"mpi_integer",0,"GRID",ierr)
1181    call rpn_comm_gather(mynSkip ,1,"mpi_integer",       &
1182                         allnSkip,1,"mpi_integer",0,"GRID",ierr)
1183
1184    call rpn_comm_gather(mymBeg  ,1,"mpi_integer",       &
1185                         allmBeg ,1,"mpi_integer",0,"GRID",ierr)
1186    call rpn_comm_gather(mymEnd  ,1,"mpi_integer",       &
1187                         allmEnd ,1,"mpi_integer",0,"GRID",ierr)
1188    call rpn_comm_gather(mymSkip ,1,"mpi_integer",       &
1189                         allmSkip,1,"mpi_integer",0,"GRID",ierr)
1190
1191    if (mmpi_myid == 0) then
1192      cv_mpiglobal(:) = 0.0d0
1193
1194      !$OMP PARALLEL DO PRIVATE(jproc,jdim_mpilocal,levelIndex,jm,jn,ila_mpiglobal,jdim_mpiglobal)
1195      do jproc = 0, (mmpi_nprocs-1)
1196        jdim_mpilocal = 0
1197
1198        do levelIndex = 1, bgStats%nkgdim
1199          do jm = allmBeg(jproc+1), allmEnd(jproc+1), allmSkip(jproc+1)
1200            do jn = allnBeg(jproc+1), allnEnd(jproc+1), allnSkip(jproc+1)
1201              if (jm <= jn) then
1202
1203                ila_mpiglobal = gst_getNIND(jm,gstID) + jn - jm
1204
1205                ! figure out index into global control vector
1206                if (jm == 0) then
1207                  ! for jm=0 only real part
1208                  jdim_mpiglobal = ila_mpiglobal
1209                else
1210                  ! for jm>0 both real and imaginary part
1211                  jdim_mpiglobal = 2*ila_mpiglobal-1 - (bgStats%ntrunc+1)
1212                end if
1213                ! add offset for level
1214                jdim_mpiglobal = jdim_mpiglobal + (levelIndex-1) * &
1215		                 (bgStats%ntrunc+1)*(bgStats%ntrunc+1)
1216
1217                ! index into local control vector
1218                if (jm == 0) then
1219                  ! only real component for jm=0
1220                  jdim_mpilocal = jdim_mpilocal + 1
1221                  cv_mpiglobal(jdim_mpiglobal) = cv_allmaxmpilocal(jdim_mpilocal,jproc+1)
1222                else
1223                  ! both real and imaginary components for jm>0
1224                  jdim_mpilocal = jdim_mpilocal + 1
1225                  cv_mpiglobal(jdim_mpiglobal) = cv_allmaxmpilocal(jdim_mpilocal,jproc+1)
1226                  jdim_mpilocal = jdim_mpilocal + 1
1227                  cv_mpiglobal(jdim_mpiglobal+1) = cv_allmaxmpilocal(jdim_mpilocal,jproc+1)
1228                end if
1229
1230                if (jdim_mpiglobal > cvDim_mpiglobal) then
1231                  write(*,*) 'ERROR: jdim,cvDim,mpiglobal=', &
1232		             jdim_mpiglobal,cvDim_mpiglobal,levelIndex,jn,jm
1233                end if 
1234              end if
1235            end do
1236          end do
1237        end do
1238      end do ! jproc
1239      !$OMP END PARALLEL DO
1240
1241    end if ! myid == 0 
1242
1243    deallocate(allnBeg)
1244    deallocate(allnEnd)
1245    deallocate(allnSkip)
1246    deallocate(allmBeg)
1247    deallocate(allmEnd)
1248    deallocate(allmSkip)
1249    deallocate(cv_allmaxmpilocal)
1250
1251  end subroutine bchm_expandtompiglobal_r4
1252
1253  !--------------------------------------------------------------------------
1254  ! bchm_finalize
1255  !--------------------------------------------------------------------------
1256  subroutine bchm_finalize()
1257    implicit none
1258
1259    if (initialized) call bcsc_finalize()
1260
1261  end subroutine bchm_finalize
1262
1263end module bMatrixChem_mod