gridStateVector_mod sourceΒΆ

   1module gridStateVector_mod
   2  ! MODULE gridStateVector_mod (prefix='gsv' category='6. High-level data objects')
   3  !
   4  !:Purpose: The grid-point state vector and related information.
   5  !
   6  use mpi, only : mpi_status_size ! this is the mpi library module
   7  use codePrecision_mod
   8  use midasMpi_mod
   9  use earthConstants_mod
  10  use varNameList_mod
  11  use verticalCoord_mod
  12  use horizontalCoord_mod
  13  use oceanMask_mod
  14  use mathPhysConstants_mod
  15  use timeCoord_mod
  16  use utilities_mod
  17  use message_mod
  18  use physicsFunctions_mod
  19  implicit none
  20  save
  21  private
  22
  23  ! public structure definition
  24  public :: struct_gsv
  25
  26  ! public subroutines and functions
  27  public :: gsv_setup, gsv_allocate, gsv_deallocate, gsv_zero, gsv_3dto4d, gsv_3dto4dAdj
  28  public :: gsv_getOffsetFromVarName, gsv_getLevFromK, gsv_getVarNameFromK, gsv_getMpiIdFromK, gsv_hPad
  29  public :: gsv_modifyVarName, gsv_modifyDate
  30  public :: gsv_transposeTilesToStep, gsv_transposeStepToTiles, gsv_transposeTilesToMpiGlobal
  31  public :: gsv_transposeTilesToVarsLevs, gsv_transposeTilesToVarsLevsAd
  32  public :: gsv_transposeVarsLevsToTiles
  33  public :: gsv_getField, gsv_getFieldUV
  34  public :: gsv_getHeightSfc, gsv_isAssocHeightSfc
  35  public :: gsv_getDateStamp, gsv_getNumLev, gsv_getNumLevFromVarName
  36  public :: gsv_add, gsv_power, gsv_scale, gsv_scaleVertical, gsv_copy, gsv_copy4Dto3D
  37  public :: gsv_copyHeightSfc, gsv_copyMask
  38  public :: gsv_getVco, gsv_getHco, gsv_getHco_physics, gsv_getDataKind, gsv_getNumK
  39  public :: gsv_horizSubSample
  40  public :: gsv_varKindExist, gsv_varExist, gsv_varNamesList
  41  public :: gsv_msgVarNames, gsv_msgFldExtremum
  42  public :: gsv_dotProduct, gsv_schurProduct
  43  public :: gsv_field3d_hbilin, gsv_smoothHorizontal
  44  public :: gsv_communicateTimeParams, gsv_resetTimeParams, gsv_getInfo, gsv_isInitialized
  45  public :: gsv_applyMaskLAM, gsv_containsNonZeroValues
  46  public :: gsv_isAllocated
  47  public :: gsv_transposesteptovarslevs
  48
  49  ! public module variables
  50  public :: gsv_conversionVarKindCHtoMicrograms, gsv_rhumin 
  51  public :: gsv_minValVarKindCH
  52
  53  interface gsv_getField
  54    module procedure gsv_getFieldWrapper_r4
  55    module procedure gsv_getFieldWrapper_r8
  56    module procedure gsv_getField3D_r4
  57    module procedure gsv_getField3D_r8
  58  end interface gsv_getField
  59
  60  interface gsv_getFieldUV
  61    module procedure gsv_getFieldUVWrapper_r4
  62    module procedure gsv_getFieldUVWrapper_r8
  63  end interface gsv_getFieldUV
  64
  65  type struct_gdUV
  66    real(8), pointer :: r8(:,:,:) => null()
  67    real(4), pointer :: r4(:,:,:) => null()
  68  end type struct_gdUV
  69
  70  type struct_gsv
  71    ! This is the derived type of the statevector object
  72
  73    ! These are the main data storage arrays
  74    logical, private          :: allocated=.false.
  75    real(8), pointer, private :: gd_r8(:,:,:,:) => null()
  76    real(8), pointer, private :: gd3d_r8(:,:,:) => null()
  77    real(4), pointer, private :: gd_r4(:,:,:,:) => null()
  78    real(4), pointer, private :: gd3d_r4(:,:,:) => null()
  79    type(struct_ocm)    :: oceanMask
  80    logical             :: heightSfcPresent = .false.
  81    real(8), pointer, private :: heightSfc(:,:) => null()  ! for VarsLevs, heightSfc only on proc 0
  82
  83    ! These are used when distribution is VarLevs to keep corresponding UV
  84    ! components together on each mpi task to facilitate horizontal interpolation
  85    logical             :: UVComponentPresent = .false.  ! wind component present on this mpi task
  86    logical             :: extraUVallocated = .false.    ! extra winds (gdUV) are allocated
  87    integer             :: myUVkBeg, myUVkEnd, myUVkCount
  88    type(struct_gdUV), pointer, private :: gdUV(:) => null()
  89
  90    ! All the remaining extra information
  91    integer             :: dataKind = 8 ! default value
  92    integer             :: ni, nj, nk, numStep, anltime
  93    integer             :: latPerPE, latPerPEmax, myLatBeg, myLatEnd
  94    integer             :: lonPerPE, lonPerPEmax, myLonBeg, myLonEnd
  95    integer             :: mykCount, mykBeg, mykEnd
  96    integer, pointer    :: allLatBeg(:), allLatEnd(:), allLatPerPE(:)
  97    integer, pointer    :: allLonBeg(:), allLonEnd(:), allLonPerPE(:)
  98    integer, pointer    :: allkCount(:), allkBeg(:), allkEnd(:)
  99    integer, pointer    :: allUVkCount(:), allUVkBeg(:), allUVkEnd(:)
 100    integer, pointer    :: dateStampList(:) => null()
 101    integer, pointer    :: dateStamp3d
 102    integer, pointer    :: dateOriginList(:)
 103    integer, pointer    :: npasList(:), ip2List(:)
 104    integer             :: deet
 105    character(len=12)   :: etiket
 106    type(struct_vco), pointer :: vco => null()
 107    type(struct_hco), pointer :: hco => null()
 108    type(struct_hco), pointer :: hco_physics => null()
 109    integer, pointer    :: varOffset(:), varNumLev(:)
 110    logical, pointer    :: onPhysicsGrid(:)
 111    logical             :: mpi_local=.false.
 112    character(len=8)    :: mpi_distribution='None'  ! or 'Tiles' or 'VarsLevs'
 113    integer             :: horizSubSample
 114    logical             :: varExistList(vnl_numVarMax)
 115    character(len=12)   :: hInterpolateDegree='UNSPECIFIED' ! or 'LINEAR' or 'CUBIC' or 'NEAREST'
 116    character(len=12)   :: hExtrapolateDegree='MAXIMUM' ! or 'VALUE' or 'MINIMUM' or 'NEUTRAL'
 117    logical             :: addHeightSfcOffset = .false.
 118  end type struct_gsv
 119
 120  logical :: varExistList(vnl_numVarMax)
 121
 122  ! public variables
 123  real(8) :: gsv_rhumin
 124  real(8) :: gsv_minValVarKindCH(vnl_numVarMax)
 125
 126  ! namelist variables:
 127  character(len=8) :: ANLTIME_BIN                ! can be 'MIDDLE', 'FIRST' or 'LAST'
 128  logical :: addHeightSfcOffset                  ! choose to add non-zero height offset to diagnostic (sfc) levels
 129  logical :: abortOnMpiImbalance                 ! choose to abort program when MPI imbalance is too large
 130  real(8) :: minValVarKindCH(vnl_numVarMax)      ! variable-dependent minimum value applied to chemistry variables
 131  logical :: gsv_conversionVarKindCHtoMicrograms ! activate unit conversion for CH variables
 132     
 133  ! arrays used for transpose VarsLevs <-> Tiles
 134  real(4), allocatable :: gd_send_varsLevs_r4(:,:,:,:), gd_recv_varsLevs_r4(:,:,:,:)
 135  real(8), allocatable :: gd_send_varsLevs_r8(:,:,:,:), gd_recv_varsLevs_r8(:,:,:,:)
 136
 137  ! initialized 
 138  logical :: initialized = .false.
 139
 140  contains
 141
 142  !--------------------------------------------------------------------------
 143  ! gsv_getOffsetFromVarName
 144  !--------------------------------------------------------------------------
 145  function gsv_getOffsetFromVarName(statevector,varName) result(offset)
 146    !
 147    !:Purpose: Returns the offset for the given variable provided it exists
 148    !
 149    implicit none
 150
 151    ! Arguments:
 152    type(struct_gsv), intent(in)  :: statevector
 153    character(len=*), intent(in)  :: varName
 154    ! Result:
 155    integer                       :: offset
 156
 157    ! Locals:
 158    integer :: varIndex
 159
 160    varIndex = vnl_varListIndex(varName)
 161    if (.not. statevector%varExistList(varIndex)) then
 162      call utl_abort('gsv_getOffsetFromVarName: specified varName does not exist in stateVector: ' // trim(varName))
 163    end if
 164    offset=statevector%varOffset(varIndex)
 165
 166  end function gsv_getOffsetFromVarName
 167
 168  !--------------------------------------------------------------------------
 169  ! gsv_getVarNameFromK
 170  !--------------------------------------------------------------------------
 171  function gsv_getVarNameFromK(statevector,kIndex) result(varName)
 172    !
 173    !:Purpose: Returns the variable name from a given kIndex
 174    !
 175    implicit none
 176
 177    ! Arguments
 178    type(struct_gsv), intent(in)  :: statevector
 179    integer,          intent(in)  :: kIndex
 180    ! Result:
 181    character(len=4)              :: varName
 182
 183    ! Locals:
 184    integer :: varIndex
 185
 186    do varIndex = 1, vnl_numvarmax
 187      if (statevector%varExistList(varIndex)) then
 188        if ((kIndex >= (statevector%varOffset(varIndex) + 1)) .and.  &
 189            (kIndex <= (statevector%varOffset(varIndex) + statevector%varNumLev(varIndex)))) then
 190          varName = vnl_varNameList(varIndex)
 191          return
 192        end if
 193      end if
 194    end do
 195
 196    call utl_abort('gsv_getVarNameFromK: kIndex out of range: '//str(kIndex))
 197
 198  end function gsv_getVarNameFromK
 199
 200  !--------------------------------------------------------------------------
 201  ! gsv_getLevFromK
 202  !--------------------------------------------------------------------------
 203  function gsv_getLevFromK(statevector,kIndex) result(levIndex)
 204    !
 205    !:Purpose: Returns level index from a given kIndex
 206    !
 207    implicit none
 208
 209    ! Arguments
 210    type(struct_gsv), intent(in) :: statevector
 211    integer,          intent(in) :: kIndex
 212    ! Result:
 213    integer                      :: levIndex
 214
 215    ! Locals
 216    integer             :: varIndex
 217
 218    do varIndex = 1, vnl_numvarmax
 219      if (statevector%varExistList(varIndex)) then
 220        if ((kIndex >= (statevector%varOffset(varIndex) + 1)) .and.  &
 221            (kIndex <= (statevector%varOffset(varIndex) + statevector%varNumLev(varIndex)))) then
 222          levIndex = kIndex - statevector%varOffset(varIndex)
 223          return
 224        end if
 225      end if
 226    end do
 227
 228    call utl_abort('gsv_getLevFromK: kIndex out of range: '//str(kIndex))
 229
 230  end function gsv_getLevFromK
 231
 232  !--------------------------------------------------------------------------
 233  ! gsv_getMpiIdFromK
 234  !--------------------------------------------------------------------------
 235  function gsv_getMpiIdFromK(statevector,kIndex) result(MpiId)
 236    !
 237    !:Purpose: Returns MPI id from the given kIndex
 238    !
 239    implicit none
 240
 241    ! Arguments:
 242    type(struct_gsv), intent(in) :: statevector
 243    integer,          intent(in) :: kIndex
 244    ! Result:
 245    integer                      :: MpiId
 246    
 247    ! Locals:
 248    integer             :: procIndex
 249
 250    do procIndex = 1, mmpi_nprocs
 251      if ((kIndex >= statevector%allKBeg(procIndex)) .and.  &
 252          (kIndex <= statevector%allKEnd(procIndex))) then
 253          MpiId = procIndex - 1
 254          return
 255      end if
 256    end do
 257
 258    call utl_abort('gsv_getMpiIdFromK: kIndex out of range: '//str(kIndex))
 259
 260  end function gsv_getMpiIdFromK
 261
 262  !--------------------------------------------------------------------------
 263  ! gsv_varExist
 264  !--------------------------------------------------------------------------
 265  recursive function gsv_varExist(statevector_opt,varName) result(varExist)
 266    !
 267    !:Purpose: Boolean fonction returning .true. if the queried variable
 268    !           exists in the statevector if provided or in the global variable
 269    !           list otherwise.
 270    !           For 'Z_*' and 'P_*' variables, the statevector argument is
 271    !           mandatory.
 272    !
 273    implicit none
 274
 275    ! Arguments:
 276    type(struct_gsv), optional, intent(in)  :: statevector_opt
 277    character(len=*),           intent(in)  :: varName
 278    ! Result:
 279    logical                                 :: varExist 
 280
 281    if (varName == 'Z_*') then
 282      varExist =  gsv_varExist(statevector_opt, 'Z_T') .and. &
 283                  gsv_varExist(statevector_opt, 'Z_M')
 284    else if (varName == 'P_*') then
 285      varExist =  gsv_varExist(statevector_opt, 'P_T') .and. &
 286                  gsv_varExist(statevector_opt, 'P_M')
 287    else
 288      if (present(statevector_opt)) then
 289        if (statevector_opt%varExistList(vnl_varListIndex(varName))) then
 290          varExist = .true.
 291        else
 292          varExist = .false.
 293        end if
 294      else
 295        if (varExistList(vnl_varListIndex(varName))) then
 296          varExist = .true.
 297        else
 298          varExist = .false.
 299        end if
 300      end if
 301    end if
 302
 303  end function gsv_varExist
 304
 305  !--------------------------------------------------------------------------
 306  ! gsv_varNamesList
 307  !--------------------------------------------------------------------------
 308  subroutine gsv_varNamesList(varNames,statevector_opt)
 309    !
 310    !:Purpose: Lists all variables present in the statevector 
 311    !
 312    implicit none
 313    
 314    ! Arguments:
 315    character(len=4), pointer,  intent(inout) :: varNames(:)
 316    type(struct_gsv), optional, intent(in)    :: statevector_opt
 317    
 318    ! Locals:
 319    integer :: varLevIndex, varNumberIndex, varIndex, numFound
 320    character(len=4) :: varName
 321
 322    if (associated(varNames)) then
 323      call utl_abort('gsv_varNamesList: varNames must be NULL pointer on input')
 324    end if
 325 
 326    !
 327    !- 1. How many variables do we have?
 328    !
 329    numFound = 0
 330    if (present(statevector_opt)) then
 331      do varIndex = 1, vnl_numvarmax
 332        if (gsv_varExist(statevector_opt,vnl_varNameList(varIndex))) numFound = numFound + 1
 333      end do
 334    else
 335      do varIndex = 1, vnl_numvarmax
 336        if (varExistList(varIndex)) numFound = numFound + 1
 337      end do
 338    end if
 339
 340    !
 341    !- 2. List the variables
 342    !
 343    allocate(varNames(numFound))
 344    varNames(:) = ''
 345
 346    varNumberIndex = 0
 347    if (present(statevector_opt)) then
 348      !- 2.1 List the variables based on the varLevIndex ordering
 349      do varLevIndex = 1, statevector_opt%nk
 350        varName = gsv_getVarNameFromK(statevector_opt,varLevIndex)
 351        if (.not. ANY(varNames(:) == varName)) then
 352          varNumberIndex = varNumberIndex + 1
 353          varNames(varNumberIndex) = varName
 354        end if
 355      end do
 356    else
 357      !- 2.2 List the variables based on the varnamelist_mod ordering
 358      do varIndex = 1, vnl_numvarmax
 359        if (varExistList(varIndex)) then
 360          varName = vnl_varNameList(varIndex)
 361          if (.not. ANY(varNames(:) == varName)) then
 362            varNumberIndex = varNumberIndex + 1
 363            varNames(varNumberIndex) = varName
 364          end if
 365        end if
 366      end do
 367    end if
 368
 369  end subroutine gsv_varNamesList
 370
 371  !--------------------------------------------------------------------------
 372  ! gsv_msgVarNames
 373  !--------------------------------------------------------------------------
 374  function gsv_msgVarNames(statevector) result(string)
 375    !
 376    !:Purpose: Return a string of all variables present in the statevector
 377    !
 378    implicit none
 379
 380    ! Arguments:
 381    type(struct_gsv), intent(in)  :: statevector ! stateVector for which variable names extracted
 382    ! Result:
 383    character(len=:), allocatable :: string      ! allocated string with list of variable names
 384
 385    ! Locals:
 386    character(len=4), pointer     :: varNames(:)
 387
 388    call gsv_varNamesList(varNames, statevector)
 389    string = str(varNames)
 390    nullify(varNames)
 391
 392  end function gsv_msgVarNames
 393
 394  !--------------------------------------------------------------------------
 395  ! gsv_msgFldExtremum
 396  !--------------------------------------------------------------------------
 397  function gsv_msgFldExtremum(statevector, varName) result(string)
 398    !
 399    !:Purpose: Return a string describing span of values (min/max) for the specified field
 400    !
 401    implicit none
 402
 403    ! Arguments
 404    type(struct_gsv), intent(in)  :: statevector ! stateVector for which values provided
 405    character(len=*), intent(in)  :: varName     ! specified variable name for the calculation
 406    ! Result:
 407    character(len=:), allocatable :: string      ! allocated string with min/max values
 408
 409    ! Locals
 410    real(8), pointer :: ptr_r8(:,:,:,:)
 411    real(4), pointer :: ptr_r4(:,:,:,:)
 412
 413    if (statevector%datakind == 4) then
 414      call gsv_getField(statevector, ptr_r4, varName)
 415      string = trim(varName)//' (real('//str(statevector%datakind) &
 416           //')) in ['//str(minval(ptr_r4))//', ' &
 417           //str(maxval(ptr_r4))//']'
 418    else
 419      call gsv_getField(statevector, ptr_r8, varName)
 420      string = trim(varName)//' (real('//str(statevector%datakind) &
 421           //')) in ['//str(minval(ptr_r8))//', ' &
 422           //str(maxval(ptr_r8))//']'
 423    end if
 424
 425  end function gsv_msgFldExtremum
 426
 427  !--------------------------------------------------------------------------
 428  ! gsv_getNumLev
 429  !--------------------------------------------------------------------------
 430  function gsv_getNumLev(statevector,varLevel,varName_opt) result(nlev)
 431    !
 432    !:Purpose: Returns the number of levels for a given type of variable;
 433    !           varLevel can be one of 'TH', 'MM', 'SF', 'SFMM', 'SFTH', 'DP',
 434    !           'SS' or 'OT'.
 435    !
 436    implicit none
 437
 438    ! Arguments:
 439    type(struct_gsv),           intent(in) :: statevector ! Input statevector
 440    character(len=*),           intent(in) :: varLevel    ! Variable type in 'TH', 'MM', 'SF', 'SFMM', 'SFTH', 'DP', 'SS' or 'OT'
 441    character(len=*), optional, intent(in) :: varName_opt ! Variable name when varLevel='OT'
 442    ! Result:
 443    integer                       :: nlev
 444
 445    nlev = vco_getNumLev(statevector%vco,varLevel,varName_opt)
 446
 447  end function gsv_getNumLev
 448
 449  !--------------------------------------------------------------------------
 450  ! gsv_getNumK
 451  !--------------------------------------------------------------------------
 452  function gsv_getNumK(statevector) result(numK)
 453    !
 454    !:Purpose: Returns the number of k indexes on the current MPI process
 455    !
 456    implicit none
 457
 458    ! Arguments:
 459    type(struct_gsv), intent(in)  :: statevector
 460    ! Result:
 461    integer                       :: numK
 462
 463    numK = 1 + statevector%mykEnd - statevector%mykBeg
 464
 465  end function gsv_getNumK
 466
 467  !--------------------------------------------------------------------------
 468  ! gsv_getDataKind
 469  !--------------------------------------------------------------------------
 470  function gsv_getDataKind(statevector) result(dataKind)
 471    !
 472    !:Purpose: Returns the real kind (4 or 8 bytes floating point value) of 
 473    !           the input statevector
 474    !
 475    implicit none
 476
 477    ! Arguments:
 478    type(struct_gsv), intent(in)  :: statevector
 479    ! Result:
 480    integer                       :: dataKind
 481
 482    dataKind = statevector%dataKind
 483
 484  end function gsv_getDataKind
 485
 486  !--------------------------------------------------------------------------
 487  ! gsv_getNumLevFromVarName
 488  !--------------------------------------------------------------------------
 489  function gsv_getNumLevFromVarName(statevector,varName) result(nlev)
 490    !
 491    !:Purpose: Returns the number of levels for a given variable
 492    !
 493    implicit none
 494
 495    ! Arguments:
 496    type(struct_gsv), intent(in)  :: statevector
 497    character(len=*), intent(in)  :: varName
 498    ! Result:
 499    integer                       :: nlev
 500
 501    nlev = statevector%varNumLev(vnl_varListIndex(varName))
 502
 503  end function gsv_getNumLevFromVarName
 504
 505  !--------------------------------------------------------------------------
 506  ! gsv_setup
 507  !--------------------------------------------------------------------------
 508  subroutine gsv_setup
 509    !
 510    !:Purpose: Initialises the gridstatevector module global structure.
 511    !
 512    implicit none
 513
 514    ! Locals:
 515    integer           :: varIndex, fnom, fclos, nulnam, ierr, loopIndex
 516    ! namelist variables:
 517    character(len=4)  :: anlvar(vnl_numVarMax)           ! list of state variable names
 518    logical           :: conversionVarKindCHtoMicrograms ! apply chemistry unit conversions when writing to file
 519    real(8)           :: rhumin                          ! minimum value imposed on humidity
 520
 521    NAMELIST /NAMSTATE/anlvar, rhumin, anlTime_bin, addHeightSfcOffset, &
 522                       conversionVarKindCHtoMicrograms, minValVarKindCH, &
 523                       abortOnMpiImbalance
 524
 525    if (initialized) return
 526
 527    call msg('gsv_setup', 'List of known (valid) variable names', mpiAll_opt=.false.)
 528    call msg('gsv_setup', 'varNameList3D   ='//str(vnl_varNameList3D(:)), mpiAll_opt=.false.)
 529    call msg('gsv_setup', 'varNameList3D   ='//str(vnl_varNameList3D(:)), mpiAll_opt=.false.)
 530    call msg('gsv_setup', 'varNameListOther='//str(vnl_varNameListOther(:)), mpiAll_opt=.false.)
 531
 532    ! Read namelist NAMSTATE to find which fields are needed
 533
 534    anlvar(:) = '    '
 535    rhumin = mpc_minimum_hu_r8
 536    anltime_bin = 'MIDDLE'
 537    addHeightSfcOffset = .false.
 538    conversionVarKindCHtoMicrograms = .false.
 539    minValVarKindCH(:) = mpc_missingValue_r8
 540    abortOnMpiImbalance = .true.
 541
 542    nulnam=0
 543    ierr=fnom(nulnam,'./flnml','FTN+SEQ+R/O',0)
 544    read(nulnam,nml=namstate,iostat=ierr)
 545    if (ierr.ne.0) call utl_abort('gsv_setup: Error reading namelist NAMSTATE')
 546    if (mmpi_myid.eq.0) write(*,nml=namstate)
 547    ierr=fclos(nulnam)
 548
 549    gsv_rhumin = rhumin
 550    gsv_conversionVarKindCHtoMicrograms = conversionVarKindCHtoMicrograms
 551
 552    if (varneed('Z_T') .or. varneed('Z_M')) call utl_abort('gsv_setup: height can not be specified as analysis variable in namelist!')
 553    if (varneed('P_T') .or. varneed('P_M')) call utl_abort('gsv_setup: pressure can not be specified as analysis variable in namelist!')
 554
 555    do varIndex = 1, vnl_numvarmax3D
 556      if (varneed(vnl_varNameList3D(varIndex))) then
 557        varExistList(varIndex) = .true.
 558      else
 559        varExistList(varIndex) = .false.
 560      end if
 561    end do
 562
 563    do varIndex = 1, vnl_numvarmax2D
 564      if (varneed(vnl_varNameList2D(varIndex))) then
 565        varExistList(varIndex+vnl_numVarMax3D) = .true.
 566      else
 567        varExistList(varIndex+vnl_numVarMax3D) = .false.
 568      end if
 569    end do
 570
 571    do varIndex = 1, vnl_numvarmaxOther
 572      if (varneed(vnl_varNameListOther(varIndex))) then
 573        varExistList(varIndex+vnl_numVarMax3D+vnl_numVarMax2D) = .true.
 574      else
 575        varExistList(varIndex+vnl_numVarMax3D+vnl_numVarMax2D) = .false.
 576      end if
 577    end do
 578
 579    ! Setup to assign min values to apply
 580    
 581    ! Check for input values only for variables of CH kind
 582    do varIndex = 1, vnl_numvarmax
 583      if (trim(AnlVar(varIndex)) == '') exit
 584      if (vnl_varKindFromVarname(AnlVar(varIndex)) == 'CH') then
 585        if (minValVarKindCH(varIndex) < 0.99d0 * mpc_missingValue_r8) then
 586          if (trim(AnlVar(varIndex)) == 'AF' .or. trim(AnlVar(varIndex)) == 'AC') then
 587            ! Set for particulate matter in micrograms/cm^3
 588            minValVarKindCH(varIndex) = mpc_minimum_pm_r8
 589          else
 590            ! Set for concentrations in micrograms/kg
 591            minValVarKindCH(varIndex) = mpc_minimum_ch_r8
 592          end if
 593        end if
 594      end if
 595    end do
 596
 597    ! Assign min values to apply
 598    gsv_minValVarKindCH(:) = mpc_missingValue_r8
 599    do varIndex = 1, vnl_numvarmax
 600      if (varExistList(varIndex)) then
 601        do loopIndex = 1, vnl_numvarmax
 602          if (trim(AnlVar(loopIndex)) == '') exit
 603          if (trim(vnl_varNameList(varIndex)) == trim(AnlVar(loopIndex))) &
 604             gsv_minValVarKindCH(varIndex) = minValVarKindCH(loopIndex)
 605        end do
 606      end if
 607    end do
 608
 609    call msg('gsv_setup','global varExistList ='//str(varExistList), mpiAll_opt=.false.)
 610
 611    ! Check value for ANLTIME_BIN
 612    if (ANLTIME_BIN .ne. 'MIDDLE' .and. ANLTIME_BIN .ne. 'FIRST' .and.  ANLTIME_BIN .ne. 'LAST') then
 613      call utl_abort('gsv_setup: Problem setting ANLTIME_BIN. Verify NAMSTATE namelist')
 614    end if
 615
 616    initialized = .true.
 617
 618    return
 619
 620    contains
 621
 622      logical function varneed(varName)
 623        implicit none
 624
 625        ! Arguments:
 626        character(len=*), intent(in) :: varName
 627
 628        ! Locals:
 629        integer :: varIndex
 630 
 631        varneed=.false.
 632        do varIndex=1,VNL_NUMVARMAX
 633          if (trim(varName) == trim(anlvar(varIndex))) then
 634            varneed=.true.
 635         end if
 636        end do
 637
 638      end function varneed
 639
 640  end subroutine gsv_setup
 641
 642  !--------------------------------------------------------------------------
 643  ! gsv_isInitialized
 644  !--------------------------------------------------------------------------
 645  function gsv_isInitialized() result(gsvInitialized)
 646    !
 647    !:Purpose: To verify gsv_setup has already run.
 648    !
 649    implicit none
 650
 651    ! Result:
 652    logical :: gsvInitialized
 653
 654    gsvInitialized = .false.
 655
 656    if (initialized) gsvInitialized = .true.
 657
 658  end function gsv_isInitialized
 659
 660  !--------------------------------------------------------------------------
 661  ! gsv_isAllocated
 662  !--------------------------------------------------------------------------
 663  function gsv_isAllocated(stateVector) result(isAllocated)
 664    !
 665    !:Purpose: To verify if a stateVector is allocated.
 666    !
 667    implicit none
 668
 669    ! Arguments:
 670    type(struct_gsv), intent(in)  :: stateVector
 671    ! Result:
 672    logical                       :: isAllocated
 673
 674    isAllocated = stateVector%allocated
 675
 676  end function gsv_isAllocated
 677
 678  !--------------------------------------------------------------------------
 679  ! gsv_allocate
 680  !--------------------------------------------------------------------------
 681  subroutine gsv_allocate(statevector, numStep, hco_ptr, vco_ptr, dateStamp_opt, dateStampList_opt,  &
 682                          mpi_local_opt, mpi_distribution_opt, horizSubSample_opt,                   &
 683                          varNames_opt, dataKind_opt, allocHeightSfc_opt, hInterpolateDegree_opt,    &
 684                          hExtrapolateDegree_opt, allocHeight_opt, allocPressure_opt, beSilent_opt)
 685    !
 686    !:Purpose: Allocates the struct_gsv memory, sets horizontal and vertical 
 687    !           coordinates, sets some options and MPI distribution 
 688    !           configurations
 689    !
 690    implicit none
 691
 692    ! Arguments:
 693    type(struct_gsv),           intent(inout) :: statevector            ! statevector to be allocated
 694    integer,                    intent(in)    :: numStep                ! number of time steps
 695    type(struct_hco), pointer,  intent(in)    :: hco_ptr                ! horizontal structure
 696    type(struct_vco), pointer,  intent(in)    :: vco_ptr                ! vertical structure
 697    integer,          optional, intent(in)    :: dateStamp_opt          ! reference datestamp
 698    integer,          optional, intent(in)    :: dateStampList_opt(:)   ! explicit datestamp list
 699    logical,          optional, intent(in)    :: mpi_local_opt          ! if .false. no MPI distribution will be used 
 700    character(len=*), optional, intent(in)    :: mpi_distribution_opt   ! MPI distribution strategy in {'Tiles', 'VarsLevs', 'None'} default: 'Tiles'
 701    integer,          optional, intent(in)    :: horizSubSample_opt     ! horizontal subsampling factor (to get a coarser grid)
 702    character(len=*), optional, intent(in)    :: varNames_opt(:)        ! allow specification of variables
 703    integer,          optional, intent(in)    :: dataKind_opt           ! real kind (4 or 8 bytes; defaults to 8)
 704    logical,          optional, intent(in)    :: allocHeightSfc_opt     ! toggle allocation of surface height field
 705    logical,          optional, intent(in)    :: allocHeight_opt        ! force the allocation of 'Z_T' and 'Z_M'
 706    logical,          optional, intent(in)    :: allocPressure_opt      ! force the allocation of 'P_T' and 'P_M'
 707    character(len=*), optional, intent(in)    :: hInterpolateDegree_opt ! set the horizontal interpolation degree
 708    character(len=*), optional, intent(in)    :: hExtrapolateDegree_opt ! set the horizontal extrapolation degree
 709    logical,          optional, intent(in)    :: beSilent_opt           ! limit outputs to listing
 710
 711    ! Locals:
 712    integer :: ierr,iloc,varIndex,varIndex2,stepIndex,lon1,lat1,k1,kIndex,kIndex2,levUV
 713    character(len=4) :: UVname
 714    logical :: beSilent, allocPressure, allocHeight
 715    integer :: verbLevel
 716
 717    call utl_tmg_start(168, 'low-level--gsv_allocate')
 718
 719    if (.not. initialized) then
 720      call msg('gsv_allocate','gsv_setup must be called first to be able to use this module. Call it now')
 721      call gsv_setup
 722    end if
 723
 724    if (present(beSilent_opt)) then
 725      beSilent = beSilent_opt
 726    else
 727      beSilent = .true.
 728    end if
 729
 730    ! set the horizontal and vertical coordinates
 731    statevector%hco => hco_ptr
 732    statevector%vco => vco_ptr
 733
 734    if (.not.statevector%vco%initialized) then
 735       call utl_abort('statevector_allocate: VerticalCoord has not been initialized!')
 736    end if
 737
 738    if (statevector%allocated) then
 739      call msg('gsv_allocate', 'gridStateVector already allocated! Deallocating first.', mpiAll_opt=.false.)
 740      call gsv_deallocate(statevector)
 741    end if
 742
 743    if (present(dataKind_opt)) statevector%dataKind = dataKind_opt
 744
 745    if (present(varNames_opt)) then
 746      if (present(allocHeight_opt)) call utl_abort('gsv_allocate: to allocate Z_T/Z_M, set them in varNames_opt')
 747      if (present(allocPressure_opt)) call utl_abort('gsv_allocate: to allocate P_T/P_M, set them in varNames_opt')
 748    end if
 749
 750    if (present(varNames_opt)) then      
 751      statevector%varExistList(:) = .false.
 752      do varIndex2 = 1, size(varNames_opt)
 753        varIndex = vnl_varListIndex(varNames_opt(varIndex2))
 754        statevector%varExistList(varIndex) = .true.
 755      end do
 756    else
 757      ! use the global variable list and set allocHeight/allocPressure if needed
 758      statevector%varExistList(:) = varExistList(:)
 759
 760      if (present(allocHeight_opt)) then
 761        allocHeight = allocHeight_opt
 762      else
 763        if (statevector%varExistList(vnl_varListIndex('TT ')) .and. &
 764            statevector%varExistList(vnl_varListIndex('HU ')) .and. &
 765            statevector%varExistList(vnl_varListIndex('P0 '))) then
 766          allocHeight = .true.
 767        else
 768          allocHeight = .false.
 769        end if
 770      end if
 771
 772      if (present(allocPressure_opt)) then
 773        allocPressure = allocPressure_opt
 774      else
 775        if (statevector%varExistList(vnl_varListIndex('P0 '))) then
 776          allocPressure = .true.
 777        else
 778          allocPressure = .false.
 779        end if
 780      end if
 781
 782      ! add Z_T/Z_M and P_T/P_M to the varExistList
 783      if (allocHeight) then
 784        if (gsv_getNumLev(statevector,'TH') > 0) statevector%varExistList(vnl_varListIndex('Z_T ')) = .true.
 785        if (gsv_getNumLev(statevector,'MM') > 0) statevector%varExistList(vnl_varListIndex('Z_M ')) = .true.
 786      end if
 787      if (allocPressure) then
 788        if (gsv_getNumLev(statevector,'TH') > 0) statevector%varExistList(vnl_varListIndex('P_T ')) = .true.
 789        if (gsv_getNumLev(statevector,'MM') > 0) statevector%varExistList(vnl_varListIndex('P_M ')) = .true.
 790      end if
 791
 792      ! add P0LS to the varExistList if vcode=5100
 793      if (statevector%vco%vcode == 5100) then
 794        statevector%varExistList(vnl_varListIndex('P0LS')) = .true.
 795      end if
 796    end if
 797
 798    if (statevector%vco%vcode == 5100 .and. &
 799        statevector%varExistList(vnl_varListIndex('P0')) .and. &
 800        .not.statevector%varExistList(vnl_varListIndex('P0LS'))) then
 801      call msg('gsv_allocate', 'varNames_opt = '//str(varNames_opt))
 802      call utl_abort('gsv_allocate: P0LS should be included in varNames_opt when vcode=5100')
 803    end if
 804
 805    if (present(horizSubSample_opt)) then
 806      ! user has chosen a coarser grid than specified in hco
 807      statevector%horizSubSample = horizSubSample_opt
 808    else
 809      ! default is no sub-sampling
 810      statevector%horizSubSample = 1
 811    end if
 812
 813    if (present(hInterpolateDegree_opt)) then
 814      ! set the horizontal interpolation degree (intentionally no default value)
 815      statevector%hInterpolateDegree = trim(hInterpolateDegree_opt)
 816    end if
 817
 818    if (present(hExtrapolateDegree_opt)) then
 819      ! set the horizontal extrapolation degree (intentionally no default value)
 820      statevector%hExtrapolateDegree = trim(hExtrapolateDegree_opt)
 821    end if
 822
 823    ! compute the number of global grid points for a given subSample level
 824    statevector%ni = ceiling(real(statevector%hco%ni,8) / real(statevector%horizSubSample,8))
 825    statevector%nj = ceiling(real(statevector%hco%nj,8) / real(statevector%horizSubSample,8))
 826
 827    if (statevector%ni * statevector%horizSubSample /= statevector%hco%ni) then
 828      call msg('gsv_allocate',' number of longitudes is not evenly divisible at this subSample level'&
 829               //' ni='//str(statevector%ni)// ', horizSubSample = '&
 830               //str(statevector%horizSubSample))
 831      call utl_abort('gsv_allocate')
 832    end if
 833
 834    if (statevector%nj * statevector%horizSubSample /= statevector%hco%nj) then
 835      call msg('gsv_allocate','number of latitudes is not evenly divisible at this subSample level'&
 836               //' nj='//str(statevector%nj)//', horizSubSample = '&
 837               //str(statevector%horizSubSample))
 838      call utl_abort('gsv_allocate')
 839    end if
 840
 841    statevector%numStep=numStep
 842
 843    if (present(mpi_local_opt)) then
 844      statevector%mpi_local = mpi_local_opt
 845    else
 846      statevector%mpi_local = .false.
 847    end if
 848
 849    if (present(mpi_distribution_opt)) then
 850      if (trim(mpi_distribution_opt) .ne. 'Tiles'    .and. &
 851          trim(mpi_distribution_opt) .ne. 'VarsLevs' .and. &
 852          trim(mpi_distribution_opt) .ne. 'None') then
 853        call utl_abort('gsv_allocate: Unknown value of mpi_distribution: ' // trim(mpi_distribution_opt))
 854      end if
 855      statevector%mpi_distribution = mpi_distribution_opt
 856    else
 857      if (statevector%mpi_local) then
 858        statevector%mpi_distribution = 'Tiles'
 859      else
 860        statevector%mpi_distribution = 'None'
 861      end if
 862    end if
 863
 864    ! determine lat/lon index ranges
 865    if (stateVector%mpi_distribution == 'Tiles') then
 866      call mmpi_setup_latbands(statevector%nj,  &
 867                               statevector%latPerPE, statevector%latPerPEmax, &
 868                               statevector%myLatBeg, statevector%myLatEnd)
 869      call mmpi_setup_lonbands(statevector%ni,  &
 870                               statevector%lonPerPE, statevector%lonPerPEmax, &
 871                               statevector%myLonBeg, statevector%myLonEnd)
 872    else
 873      statevector%latPerPE    = statevector%nj
 874      statevector%latPerPEmax = statevector%nj
 875      statevector%myLatBeg    = 1
 876      statevector%myLatEnd    = statevector%nj
 877      statevector%lonPerPE    = statevector%ni
 878      statevector%lonPerPEmax = statevector%ni
 879      statevector%myLonBeg    = 1
 880      statevector%myLonEnd    = statevector%ni
 881    end if
 882
 883    allocate(statevector%varOffset(vnl_numvarmax))
 884    statevector%varOffset(:)=0
 885    allocate(statevector%varNumLev(vnl_numvarmax))
 886    statevector%varNumLev(:)=0
 887    allocate(statevector%onPhysicsGrid(vnl_numvarmax))
 888    statevector%onPhysicsGrid(:) = .false.
 889
 890    iloc=0
 891    if (present(varNames_opt)) then
 892
 893      do varIndex2 = 1, size(varNames_opt)
 894        varIndex = vnl_varListIndex(varNames_opt(varIndex2))
 895        statevector%varOffset(varIndex)=iloc
 896        statevector%varNumLev(varIndex)=  &
 897             gsv_getNumLev(statevector, &
 898                           vnl_varLevelFromVarname(vnl_varNameList(varIndex)),  &
 899                           vnl_varNameList(varIndex))
 900        iloc = iloc + statevector%varNumLev(varIndex)
 901      end do
 902
 903    else
 904
 905      do varIndex = 1, vnl_numvarmax3d
 906        if (statevector%varExistList(varIndex)) then
 907          statevector%varOffset(varIndex)=iloc
 908          statevector%varNumLev(varIndex)=  &
 909               gsv_getNumLev(statevector,  &
 910                             vnl_varLevelFromVarname(vnl_varNameList(varIndex)))
 911          iloc = iloc + statevector%varNumLev(varIndex)
 912        end if
 913      end do
 914      do varIndex2 = 1, vnl_numvarmax2d
 915        varIndex=varIndex2+vnl_numvarmax3d
 916        if (statevector%varExistList(varIndex)) then
 917          statevector%varOffset(varIndex)=iloc
 918          statevector%varNumLev(varIndex)=1
 919          iloc = iloc + 1
 920        end if
 921      end do
 922      do varIndex2 = 1, vnl_numvarmaxOther
 923        varIndex=varIndex2+vnl_numvarmax3d+vnl_numvarmax2d
 924        if (statevector%varExistList(varIndex)) then
 925          statevector%varOffset(varIndex)=iloc
 926          statevector%varNumLev(varIndex)=  &
 927               gsv_getNumLev(statevector,  &
 928                              vnl_varLevelFromVarname(vnl_varNameList(varIndex)), &
 929                              vnl_varNameList(varIndex))
 930          iloc = iloc + statevector%varNumLev(varIndex)
 931        end if
 932      end do
 933
 934    end if
 935
 936    if (iloc == 0) then
 937      call utl_abort('gsv_allocate:  Nothing to allocate')
 938    end if
 939
 940    statevector%nk=iloc
 941
 942    if (beSilent) then
 943      verbLevel = msg_NEVER
 944    else
 945      verbLevel = 2
 946    end if
 947    call msg('gsv_allocate', 'statevector%nk = '//str(statevector%nk)&
 948             //new_line('')//'varOffset='//str(statevector%varOffset)&
 949             //new_line('')//'varNumLev='//str(statevector%varNumLev),&
 950             verb_opt=verbLevel, mpiAll_opt=.false.)
 951
 952    ! determine range of values for the 'k' index (vars+levels)
 953    if (statevector%mpi_distribution == 'VarsLevs') then
 954      call mmpi_setup_varslevels(statevector%nk, statevector%mykBeg, &
 955                                 statevector%mykEnd, statevector%mykCount)
 956    else
 957      statevector%mykCount = statevector%nk
 958      statevector%mykBeg = 1
 959      statevector%mykEnd = statevector%nk
 960    end if
 961
 962    ! determine if a wind component exists on this mpi task
 963    statevector%UVComponentPresent = .false.
 964    statevector%myUVkCount = 0
 965    statevector%myUVkBeg = 0
 966    statevector%myUVkEnd = -1
 967    do kIndex = statevector%mykBeg, statevector%mykEnd
 968      if (gsv_getVarNameFromK(statevector,kIndex) == 'UU' .or.  &
 969           gsv_getVarNameFromK(statevector,kIndex) == 'VV') then
 970        statevector%UVComponentPresent = .true.
 971        if (statevector%myUVkBeg == 0) statevector%myUVkBeg = kIndex
 972        statevector%myUVkEnd = kIndex
 973        statevector%myUVkCount = statevector%myUVkCount + 1
 974      end if
 975    end do
 976
 977    ! determine if a separate complementary wind component needed, which
 978    ! is the case when mpi distribution could mean that both components 
 979    ! are not available on same mpi task, or the statevector was allocated
 980    ! with only one of the components
 981    statevector%extraUVallocated = .false.
 982    if (statevector%mpi_distribution == 'VarsLevs' .or.   &
 983         (gsv_varExist(statevector,'UU') .and. .not. gsv_varExist(statevector,'VV')) .or. &
 984         (gsv_varExist(statevector,'VV') .and. .not. gsv_varExist(statevector,'UU'))) then
 985      statevector%extraUVallocated = statevector%UVComponentPresent
 986    end if
 987
 988    allocate(statevector%allLonBeg(mmpi_npex))
 989    allocate(statevector%allLonEnd(mmpi_npex))
 990    allocate(statevector%allLonPerPE(mmpi_npex))
 991    allocate(statevector%allLatBeg(mmpi_npey))
 992    allocate(statevector%allLatEnd(mmpi_npey))
 993    allocate(statevector%allLatPerPE(mmpi_npey))
 994    allocate(statevector%allkCount(mmpi_nprocs))
 995    allocate(statevector%allkBeg(mmpi_nprocs))
 996    allocate(statevector%allkEnd(mmpi_nprocs))
 997    allocate(statevector%allUVkCount(mmpi_nprocs))
 998    allocate(statevector%allUVkBeg(mmpi_nprocs))
 999    allocate(statevector%allUVkEnd(mmpi_nprocs))
1000
1001    if (statevector%mpi_local) then
1002      CALL rpn_comm_allgather(statevector%myLonBeg,1,'mpi_integer',       &
1003                              statevector%allLonBeg,1,'mpi_integer','EW',ierr)
1004      CALL rpn_comm_allgather(statevector%myLonEnd,1,'mpi_integer',       &
1005                              statevector%allLonEnd,1,'mpi_integer','EW',ierr)
1006      CALL rpn_comm_allgather(statevector%lonPerPE,1,'mpi_integer',       &
1007                              statevector%allLonPerPE,1,'mpi_integer','EW',ierr)
1008  
1009      CALL rpn_comm_allgather(statevector%myLatBeg,1,'mpi_integer',       &
1010                              statevector%allLatBeg,1,'mpi_integer','NS',ierr)
1011      CALL rpn_comm_allgather(statevector%myLatEnd,1,'mpi_integer',       &
1012                              statevector%allLatEnd,1,'mpi_integer','NS',ierr)
1013      CALL rpn_comm_allgather(statevector%LatPerPE,1,'mpi_integer',       &
1014                              statevector%allLatPerPE,1,'mpi_integer','NS',ierr)
1015
1016      call gsv_checkMpiDistribution(stateVector)
1017
1018      CALL rpn_comm_allgather(statevector%mykCount,1,'mpi_integer',       &
1019                              statevector%allkCount,1,'mpi_integer','grid',ierr)
1020      CALL rpn_comm_allgather(statevector%mykBeg,1,'mpi_integer',       &
1021                              statevector%allkBeg,1,'mpi_integer','grid',ierr)
1022      CALL rpn_comm_allgather(statevector%mykEnd,1,'mpi_integer',       &
1023                              statevector%allkEnd,1,'mpi_integer','grid',ierr)
1024
1025      CALL rpn_comm_allgather(statevector%myUVkCount,1,'mpi_integer',       &
1026                              statevector%allUVkCount,1,'mpi_integer','grid',ierr)
1027      CALL rpn_comm_allgather(statevector%myUVkBeg,1,'mpi_integer',       &
1028                              statevector%allUVkBeg,1,'mpi_integer','grid',ierr)
1029      CALL rpn_comm_allgather(statevector%myUVkEnd,1,'mpi_integer',       &
1030                              statevector%allUVkEnd,1,'mpi_integer','grid',ierr)
1031    else
1032
1033      statevector%allLonBeg(:) = statevector%myLonBeg
1034      statevector%allLonEnd(:) = statevector%myLonEnd
1035      statevector%allLonPerPE(:) = statevector%lonPerPE
1036      statevector%allLatBeg(:) = statevector%myLatBeg
1037      statevector%allLatEnd(:) = statevector%myLatEnd
1038      statevector%allLatPerPE(:) = statevector%LatPerPE
1039
1040      statevector%allkCount(:) = statevector%mykCount
1041      statevector%allkBeg(:) = statevector%mykBeg
1042      statevector%allkEnd(:) = statevector%mykEnd
1043
1044      statevector%allUVkCount(:) = statevector%myUVkCount
1045      statevector%allUVkBeg(:) = statevector%myUVkBeg
1046      statevector%allUVkEnd(:) = statevector%myUVkEnd
1047
1048    end if
1049
1050    select case (ANLTIME_BIN)
1051    case ('FIRST')
1052       statevector%anltime=1
1053    case ('MIDDLE')
1054       statevector%anltime=nint((real(numStep,8)+1.0d0)/2.0d0)
1055    case ('LAST')
1056       statevector%anltime=numStep
1057    case default
1058      call utl_abort('gsv_allocate: unsupported value for ANLTIME_BIN = '//trim(ANLTIME_BIN))
1059    end select
1060
1061    if (present(dateStamp_opt) .and. present(dateStampList_opt)) then
1062      call utl_abort('gsv_allocate: Either dateStamp or dateStampList should be presented but not both')
1063    else if (present(dateStampList_opt)) then
1064      allocate(statevector%dateStampList(numStep))
1065      do stepIndex = 1, numStep
1066        statevector%dateStampList(stepIndex)= dateStampList_opt(stepIndex)
1067      end do
1068      statevector%dateStamp3d => statevector%dateStampList(statevector%anltime)
1069    else if (present(dateStamp_opt)) then
1070      allocate(statevector%dateStampList(numStep))
1071      if (numStep == 1) then
1072        statevector%dateStampList(1) = dateStamp_opt
1073      else
1074        call tim_getstamplist(statevector%dateStampList,numStep,dateStamp_opt)
1075      end if
1076      statevector%dateStamp3d => statevector%dateStampList(statevector%anltime)
1077    else
1078      nullify(statevector%dateStamplist)
1079    end if
1080    allocate(statevector%dateOriginList(numStep))
1081    allocate(statevector%npasList(numStep))
1082    allocate(statevector%ip2List(numStep))
1083
1084    call gsv_resetTimeParams(statevector)
1085
1086    if (statevector%dataKind == 8) then
1087      allocate(statevector%gd_r8(statevector%myLonBeg:statevector%myLonEnd,  &
1088                                 statevector%myLatBeg:statevector%myLatEnd,  &
1089                                 statevector%mykBeg:statevector%mykEnd,numStep),stat=ierr)
1090      if (statevector%UVComponentPresent) then
1091        allocate(statevector%gdUV(statevector%myUVkBeg:statevector%myUVkEnd))
1092        if (statevector%extraUVallocated) then
1093          do kIndex = statevector%myUVkBeg, statevector%myUVkEnd
1094            allocate(statevector%gdUV(kIndex)%r8(statevector%myLonBeg:statevector%myLonEnd,  &
1095                                                 statevector%myLatBeg:statevector%myLatEnd,  &
1096                                                 numStep))
1097            statevector%gdUV(kIndex)%r8(:,:,:) = 0.0d0
1098          end do
1099        else
1100          ! in this case, both components available on each mpi task, so just point to it
1101          do kIndex = statevector%myUVkBeg, statevector%myUVkEnd
1102            levUV = gsv_getLevFromK(statevector, kIndex)
1103            UVname = complementaryUVname(gsv_getVarNameFromK(statevector,kIndex))
1104            kIndex2 = levUV + gsv_getOffsetFromVarName(statevector,UVname)
1105            lon1 = statevector%myLonBeg
1106            lat1 = statevector%myLatBeg
1107            statevector%gdUV(kIndex)%r8(lon1:,lat1:,1:) => statevector%gd_r8(:,:,kIndex2,:)
1108          end do
1109        end if
1110      end if
1111    else if (statevector%dataKind == 4) then
1112      allocate(statevector%gd_r4(statevector%myLonBeg:statevector%myLonEnd,  &
1113                                 statevector%myLatBeg:statevector%myLatEnd,  &
1114                                 statevector%mykBeg:statevector%mykEnd,numStep),stat=ierr)
1115      if (statevector%UVComponentPresent) then
1116        allocate(statevector%gdUV(statevector%myUVkBeg:statevector%myUVkEnd))
1117        if (statevector%extraUVallocated) then
1118          do kIndex = statevector%myUVkBeg, statevector%myUVkEnd
1119            allocate(statevector%gdUV(kIndex)%r4(statevector%myLonBeg:statevector%myLonEnd,  &
1120                                                    statevector%myLatBeg:statevector%myLatEnd,  &
1121                                                    numStep))
1122            statevector%gdUV(kIndex)%r4(:,:,:) = 0.0
1123          end do
1124        else
1125          ! in this case, both components available on each mpi task, so just point to it
1126          do kIndex = statevector%myUVkBeg, statevector%myUVkEnd
1127            levUV = gsv_getLevFromK(statevector, kIndex)
1128            UVname = complementaryUVname(gsv_getVarNameFromK(statevector,kIndex))
1129            kIndex2 = levUV + gsv_getOffsetFromVarName(statevector,UVname)
1130            lon1 = statevector%myLonBeg
1131            lat1 = statevector%myLatBeg
1132            statevector%gdUV(kIndex)%r4(lon1:,lat1:,1:) => statevector%gd_r4(:,:,kIndex2,:)
1133          end do
1134        end if
1135      end if
1136    else
1137      call utl_abort('gsv_allocate: unknown value of datakind')
1138    end if
1139    if (ierr.ne.0) then
1140      call utl_abort('gsv_allocate: Problem allocating memory! id=1 '//str(ierr))
1141    end if
1142
1143    if (present(allocHeightSfc_opt)) then
1144      if (allocHeightSfc_opt) then
1145        ! if VarsLevs, then only proc 0 allocates surface height, otherwise all procs do
1146        statevector%heightSfcPresent = .true.
1147        if ((statevector%mpi_distribution == 'VarsLevs' .and. mmpi_myid == 0) .or. &
1148             statevector%mpi_distribution /= 'VarsLevs') then
1149          allocate(statevector%HeightSfc(statevector%myLonBeg:statevector%myLonEnd,  &
1150                                     statevector%myLatBeg:statevector%myLatEnd))
1151          statevector%HeightSfc(:,:) = 0.0d0
1152        end if
1153      end if
1154    end if
1155
1156    lon1=statevector%myLonBeg
1157    lat1=statevector%myLatBeg
1158    k1=statevector%mykBeg
1159    if (statevector%dataKind == 8) then
1160      statevector%gd3d_r8(lon1:,lat1:,k1:) => statevector%gd_r8(:,:,:,statevector%anltime)
1161    else if (statevector%dataKind == 4) then
1162      statevector%gd3d_r4(lon1:,lat1:,k1:) => statevector%gd_r4(:,:,:,statevector%anltime)
1163    end if
1164
1165    statevector%addHeightSfcOffset = addHeightSfcOffset
1166
1167    statevector%allocated=.true.
1168
1169    call utl_tmg_stop(168)
1170
1171  end subroutine gsv_allocate
1172
1173  !--------------------------------------------------------------------------
1174  ! gsv_communicateTimeParams
1175  !--------------------------------------------------------------------------
1176  subroutine gsv_communicateTimeParams(statevector)
1177    !
1178    !:Purpose: Ensures all mpi tasks have certain time and other parameters
1179    !
1180    implicit none
1181
1182    ! Arguments:
1183    type(struct_gsv), intent(inout) :: statevector
1184
1185    ! Locals:
1186    integer :: deet, ierr
1187    integer :: ip2List(statevector%numStep), npasList(statevector%numStep)
1188    integer :: dateOriginList(statevector%numStep)
1189    logical :: onPhysicsGrid(vnl_numVarMax)
1190
1191    call rpn_comm_allreduce(statevector%deet, deet, 1,  &
1192                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
1193    statevector%deet = deet
1194    call rpn_comm_allreduce(statevector%ip2List, ip2List, statevector%numStep,  &
1195                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
1196    statevector%ip2List(:) = ip2List(:)
1197    call rpn_comm_allreduce(statevector%npasList, npasList, statevector%numStep,  &
1198                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
1199    statevector%npasList(:) = npasList(:)
1200    call rpn_comm_allreduce(statevector%dateOriginList, dateOriginList, statevector%numStep,  &
1201                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
1202    statevector%dateOriginList(:) = dateOriginList(:)
1203
1204    call rpn_comm_allreduce(statevector%onPhysicsGrid(:), onPhysicsGrid(:), size(onPhysicsGrid),  &
1205                            'MPI_LOGICAL', 'MPI_LOR', 'GRID', ierr)
1206    statevector%onPhysicsGrid(:) = onPhysicsGrid(:)
1207
1208    call msg('gsv_communicateTimeParams', 'deet = '//str(deet) &
1209         //new_line('')//'ip2List = '//str(ip2List(:)) &
1210         //new_line('')//'npasList = '//str(npasList(:)) &
1211         //new_line('')//'dateOriginList = '//str(dateOriginList(:)))
1212
1213  end subroutine gsv_communicateTimeParams
1214
1215  !--------------------------------------------------------------------------
1216  ! gsv_resetTimeParams
1217  !--------------------------------------------------------------------------
1218  subroutine gsv_resetTimeParams(statevector)
1219    !
1220    !:Purpose: Resets certain time parameters to "missing" values
1221    !
1222    implicit none
1223
1224    ! Arguments:
1225    type(struct_gsv), intent(inout) :: statevector
1226
1227    statevector%dateOriginList(:) =  mpc_missingValue_int
1228    statevector%npasList(:)       =  mpc_missingValue_int
1229    statevector%ip2List(:)        =  mpc_missingValue_int
1230    statevector%deet              =  mpc_missingValue_int
1231    statevector%etiket            =  "UNDEFINED"
1232
1233  end subroutine gsv_resetTimeParams
1234
1235  !--------------------------------------------------------------------------
1236  ! gsv_checkMpiDistribution
1237  !--------------------------------------------------------------------------
1238  subroutine gsv_checkMpiDistribution(stateVector)
1239    !
1240    !:Purpose: Checks the distribution of latitude and longitude gridpoints
1241    !           over the mpi tasks. If the variation in the number of grid
1242    !           points in either direction is too large, other mpi topologies
1243    !           will be suggested in the listing and the program could
1244    !           potentially abort. The printing to the listing is limited
1245    !           to only the first 5 calls.
1246    !
1247    implicit none
1248
1249    ! Arguments:
1250    type(struct_gsv), intent(in) :: statevector
1251
1252    ! Locals:
1253    integer       :: npex, npey
1254    integer       :: lonPerPEmin, lonPerPEmax, latPerPEmin, latPerPEmax
1255    integer, save :: numCalls = 0
1256
1257    ! check if distribution of gridpoints over mpi tasks is very uneven
1258    if (maxval(statevector%allLonPerPE) > 2*minval(statevector%allLonPerPE) .or. &
1259        maxval(statevector%allLatPerPE) > 2*minval(statevector%allLatPerPE)) then
1260      numCalls = numCalls + 1
1261      if (mmpi_myid == 0 .and. (numCalls <= 5)) then
1262        call msg('gsv_checkMpiDistribution', & 
1263            new_line('')//'=============================================================' &
1264          //new_line('')//'WARNING: bad choice of mpi topology!' &
1265          //new_line('')//'   mpi x, y dimensions = '//str(mmpi_npex)//', '//str(mmpi_npey) &
1266          //new_line('')//'   min(lonPerPE) = '//str(minval(statevector%allLonPerPE)) &
1267          //new_line('')//'   max(lonPerPE) = '//str(maxval(statevector%allLonPerPE)) &
1268          //new_line('')//'   min(latPerPE) = '//str(minval(statevector%allLatPerPE)) &
1269          //new_line('')//'   max(latPerPE) = '//str(maxval(statevector%allLatPerPE)) &
1270          //new_line(''), mpiAll_opt=.false.)
1271
1272        ! make suggestions for mpi x diminension
1273        if (maxval(statevector%allLonPerPE) > 2*minval(statevector%allLonPerPE)) then
1274          call msg('gsv_checkMpiDistribution', & 
1275                   'Please choose a value of mpi x dimension that gives a smaller ' &
1276                   //'difference between min and max of lonPerPE. Here are some options:', &
1277                   mpiAll_opt=.false.)
1278          do npex = 1, 2*mmpi_npex
1279            lonPerPEmin = floor(real(stateVector%ni)/real(npex))
1280            lonPerPEmax = stateVector%ni - (npex - 1) * lonPerPEmin
1281            if (lonPerPEmax < 2*lonPerPEmin) then
1282              call msg('gsv_checkMpiDistribution','mpi x dimension = '//str(npex) &
1283                   //', difference between min and max lonPerPE = ' &
1284                   //str(lonPerPEmax - lonPerPEmin), mpiAll_opt=.false.)
1285            end if
1286          end do
1287        end if
1288
1289        ! make suggestions for mpi y dimension
1290        if (maxval(statevector%allLatPerPE) > 2*minval(statevector%allLatPerPE)) then
1291          call msg('gsv_checkMpiDistribution',&
1292               'Please choose a value of mpi y dimension that gives a smaller ' &
1293               //'difference between min and max of latPerPE. Here are some options:',&
1294               mpiAll_opt=.false.)
1295          do npey = 1, 2*mmpi_npey
1296            latPerPEmin = floor(real(stateVector%nj)/real(npey))
1297            latPerPEmax = stateVector%nj - (npey - 1) * latPerPEmin
1298            if (latPerPEmax < 2*latPerPEmin) then
1299              call msg('gsv_checkMpiDistribution','mpi y dimension = '//str(npey)  &
1300                //', difference between min and max latPerPE = ' &
1301                //str(latPerPEmax - latPerPEmin), mpiAll_opt=.false.)
1302            end if
1303          end do
1304        end if
1305
1306        call msg('gsv_checkMpiDistribution', new_line('')//'=============================================================', mpiAll_opt=.false.)
1307      else
1308        ! After 5 calls, just give a short message
1309        call msg('gsv_checkMpiDistribution','WARNING: bad choice of mpi topology!', mpiAll_opt=.false.)
1310      end if
1311
1312      if (abortOnMpiImbalance) call utl_abort('gsv_checkMpiDistribution: Please choose a better mpi topology')
1313    end if
1314
1315  end subroutine gsv_checkMpiDistribution
1316    
1317  !--------------------------------------------------------------------------
1318  ! gsv_complementaryUVname
1319  !--------------------------------------------------------------------------
1320  function complementaryUVname(UV_in) result(UV_out)
1321    !
1322    !:Purpose: Returns the other wind component name
1323    !           UU -> VV, VV -> UU
1324    !
1325    implicit none
1326
1327    ! Arguments:
1328    character(len=*), intent(in)  :: UV_in
1329    ! Result:
1330    character(len=4)              :: UV_out
1331
1332    ! return UU on VV input, and vice versa
1333    if (trim(UV_in) == 'UU') then
1334      UV_out = 'VV  '
1335    else if (trim(UV_in) == 'VV') then
1336      UV_out = 'UU  '
1337    else
1338      call utl_abort('complementaryUVname: invalid input, UV_in = '//trim(UV_in))
1339    end if
1340  end function complementaryUVname
1341
1342  !--------------------------------------------------------------------------
1343  ! gsv_modifyDate
1344  !--------------------------------------------------------------------------
1345  subroutine gsv_modifyDate(statevector, dateStamp, modifyDateOrigin_opt)
1346    !
1347    !:Purpose: Modifies a statevector reference date
1348    !
1349    implicit none
1350  
1351    ! Arguments:
1352    type(struct_gsv),  intent(inout) :: statevector
1353    integer,           intent(in)    :: dateStamp
1354    logical, optional, intent(in)    :: modifyDateOrigin_opt
1355
1356    if (statevector%numStep == 1) then
1357      statevector%dateStampList(1) = dateStamp
1358      if(present(modifyDateOrigin_opt)) statevector%dateOriginList(1) = dateStamp 
1359    else
1360      call tim_getstamplist(statevector%dateStampList, statevector%numStep, dateStamp)
1361      if(present(modifyDateOrigin_opt)) call tim_getstamplist(statevector%dateOriginList, statevector%numStep, dateStamp)
1362    end if
1363    statevector%dateStamp3d => statevector%dateStampList(statevector%anltime)
1364
1365  end subroutine gsv_modifyDate
1366
1367  !--------------------------------------------------------------------------
1368  ! gsv_modifyVarName
1369  !--------------------------------------------------------------------------
1370  subroutine gsv_modifyVarName(statevector, oldVarName, newVarName) 
1371    !
1372    !:Purpose: Replaces a variable with a variable of the same vertical level type
1373    !
1374    implicit none
1375
1376    ! Arguments:
1377    type(struct_gsv), intent(inout) :: statevector
1378    character(len=*), intent(in)    :: oldVarName
1379    character(len=*), intent(in)    :: newVarName
1380    
1381    ! Locals:
1382    integer :: varIndex_oldVarName, varIndex_newVarName
1383
1384    ! Test the compatibility of the modifications
1385    if (.not. gsv_varExist(statevector,oldVarName)) then
1386      call utl_abort('gsv_modifyVarName: the varName to replace does not exist '//trim(oldVarName))
1387    end if
1388    if (gsv_varExist(statevector,newVarName)) then
1389      call utl_abort('gsv_modifyVarName: the varName to add already exist '//trim(oldVarName))
1390    end if
1391    if (vnl_varLevelFromVarname(newVarName) /= vnl_varLevelFromVarname(oldVarName)) then
1392      call utl_abort('gsv_modifyVarName: the level type are different')
1393    end if
1394
1395    ! Find  varIndex_oldVarName & varIndex_newVarName
1396    varIndex_oldVarName = vnl_varListIndex(oldVarName)
1397    varIndex_newVarName = vnl_varListIndex(newVarName)
1398
1399    ! Change the ExistList
1400    statevector%varExistList(varIndex_oldVarName) = .false.
1401    statevector%varExistList(varIndex_newVarName) = .true.
1402
1403    ! Change the offset
1404    statevector%varOffset(varIndex_newVarName) = statevector%varOffset(varIndex_oldVarName)
1405    statevector%varOffset(varIndex_oldVarName) = 0
1406
1407    ! Change the number of levels
1408    statevector%varNumLev(varIndex_newVarName) = statevector%varNumLev(varIndex_oldVarName)
1409    statevector%varNumLev(varIndex_oldVarName) = 0
1410
1411  end subroutine gsv_modifyVarName
1412
1413  !--------------------------------------------------------------------------
1414  ! gsv_zero
1415  !--------------------------------------------------------------------------
1416  subroutine gsv_zero(statevector)
1417    !
1418    !:Purpose: Zeros all struct_gsv arrays
1419    !
1420    implicit none
1421
1422    ! Arguments:
1423    type(struct_gsv), intent(inout) :: statevector
1424
1425    ! Locals:
1426    integer :: stepIndex,lonIndex,kIndex,latIndex,lat1,lat2,lon1,lon2,k1,k2,k1UV,k2UV
1427
1428    if (.not.statevector%allocated) then
1429      call utl_abort('gsv_zero: gridStateVector not yet allocated')
1430    end if
1431
1432    lon1=statevector%myLonBeg
1433    lon2=statevector%myLonEnd
1434    lat1=statevector%myLatBeg
1435    lat2=statevector%myLatEnd
1436    k1=statevector%mykBeg
1437    k2=statevector%mykEnd
1438    k1UV = statevector%myUVkBeg
1439    k2UV = statevector%myUVkEnd
1440
1441    if (associated(statevector%HeightSfc)) statevector%HeightSfc(:,:) = 0.0d0
1442
1443    if (statevector%dataKind == 8) then
1444
1445      do kIndex = k1, k2
1446        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,lonIndex)    
1447        do stepIndex = 1, statevector%numStep
1448          do latIndex = lat1, lat2
1449            do lonIndex = lon1, lon2
1450              statevector%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = 0.0d0
1451            end do
1452          end do
1453        end do
1454       !$OMP END PARALLEL DO
1455      end do
1456
1457      if (statevector%extraUVallocated) then
1458        do kIndex = k1UV, k2UV
1459          !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,lonIndex)    
1460          do stepIndex = 1, statevector%numStep
1461            do latIndex = lat1, lat2
1462              do lonIndex = lon1, lon2
1463                statevector%gdUV(kIndex)%r8(lonIndex,latIndex,stepIndex) = 0.0d0
1464              end do
1465            end do
1466          end do
1467          !$OMP END PARALLEL DO
1468        end do
1469      end if
1470
1471    else if (statevector%dataKind == 4) then
1472
1473      do kIndex = k1, k2
1474        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,lonIndex)
1475        do stepIndex = 1, statevector%numStep
1476          do latIndex = lat1, lat2
1477            do lonIndex = lon1, lon2
1478              statevector%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = 0.0
1479            end do
1480          end do
1481        end do
1482        !$OMP END PARALLEL DO
1483      end do
1484
1485      if (statevector%extraUVallocated) then
1486        do kIndex = k1UV, k2UV
1487          !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,lonIndex)    
1488          do stepIndex = 1, statevector%numStep
1489            do latIndex = lat1, lat2
1490              do lonIndex = lon1, lon2
1491                statevector%gdUV(kIndex)%r4(lonIndex,latIndex,stepIndex) = 0.0
1492              end do
1493            end do
1494          end do
1495          !$OMP END PARALLEL DO
1496        end do
1497      end if
1498
1499    else
1500      call utl_abort('gsv_zero: unknown value of datakind')
1501    end if
1502    
1503  end subroutine gsv_zero
1504
1505  !--------------------------------------------------------------------------
1506  ! gsv_add
1507  !--------------------------------------------------------------------------
1508  subroutine gsv_add(statevector_in,statevector_inout,scaleFactor_opt)
1509    !
1510    !:Purpose: Adds two statevectors
1511    !           statevector_inout = statevector_inout + scaleFactor_opt * statevector_in
1512    !
1513    implicit none
1514
1515    ! Arguments:
1516    type(struct_gsv),  intent(in)     :: statevector_in     ! first operand 
1517    type(struct_gsv),  intent(inout)  :: statevector_inout  ! second operand, will receive the result
1518    real(8), optional, intent(in)     :: scaleFactor_opt    ! optional scaling of the second operand prior to the addition
1519
1520    ! Locals:
1521    integer           :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2
1522
1523    if (.not.statevector_in%allocated) then
1524      call utl_abort('gsv_add: stateVector_in not yet allocated')
1525    end if
1526    if (.not.statevector_inout%allocated) then
1527      call utl_abort('gsv_add: stateVector_inout not yet allocated')
1528    end if
1529    if ( statevector_in%mykBeg /= statevector_inout%mykBeg .or. &
1530         statevector_in%mykEnd /= statevector_inout%mykEnd ) then
1531      call utl_abort('gsv_add: mykBeg/mykEnd of stateVector_in/inout do not match')
1532    end if
1533
1534    lon1=statevector_in%myLonBeg
1535    lon2=statevector_in%myLonEnd
1536    lat1=statevector_in%myLatBeg
1537    lat2=statevector_in%myLatEnd
1538    k1=statevector_in%mykBeg
1539    k2=statevector_in%mykEnd
1540
1541    if (statevector_inout%dataKind == 8 .and. statevector_in%dataKind == 8) then
1542
1543      if (present(scaleFactor_opt)) then
1544        do stepIndex = 1, statevector_inout%numStep
1545          !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex)    
1546          do kIndex = k1, k2
1547            do latIndex = lat1, lat2
1548              do lonIndex = lon1, lon2
1549                statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
1550                     statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) +  &
1551                     scaleFactor_opt * statevector_in%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
1552              end do
1553            end do
1554          end do
1555          !$OMP END PARALLEL DO
1556        end do
1557      else
1558        do stepIndex = 1, statevector_inout%numStep
1559          !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex)    
1560          do kIndex = k1, k2
1561            do latIndex = lat1, lat2
1562              do lonIndex = lon1, lon2
1563                statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
1564                     statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) +  &
1565                     statevector_in%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
1566              end do
1567            end do
1568          end do
1569          !$OMP END PARALLEL DO
1570        end do
1571      end if
1572
1573    else if (statevector_inout%dataKind == 4 .and. statevector_in%dataKind == 4) then
1574
1575      if (present(scaleFactor_opt)) then
1576        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
1577        do kIndex = k1, k2
1578          do stepIndex = 1, statevector_inout%numStep
1579            do latIndex = lat1, lat2
1580              do lonIndex = lon1, lon2
1581                statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) +  &
1582                                          real(scaleFactor_opt,4) * statevector_in%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
1583              end do
1584            end do
1585          end do
1586        end do
1587        !$OMP END PARALLEL DO
1588      else
1589        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
1590        do kIndex = k1, k2
1591          do stepIndex = 1, statevector_inout%numStep
1592            do latIndex = lat1, lat2
1593              do lonIndex = lon1, lon2
1594                statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) +  &
1595                                                                statevector_in%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
1596              end do
1597            end do
1598          end do
1599        end do
1600        !$OMP END PARALLEL DO
1601      end if
1602
1603    else
1604      call utl_abort('gsv_add: Data type must be the same for both statevectors')
1605    end if
1606
1607  end subroutine gsv_add
1608
1609  !--------------------------------------------------------------------------
1610  ! gsv_schurProduct
1611  !--------------------------------------------------------------------------
1612  subroutine gsv_schurProduct(statevector_in,statevector_inout)
1613    !
1614    !:Purpose: Applies the Schur product of two statevector
1615    !           statevector_inout(i,j,k,l) = statevector_inout(i,j,k,l) * statevector_in(i,j,k,l) 
1616    !
1617    implicit none
1618
1619    ! Arguments:
1620    type(struct_gsv),  intent(in)     :: statevector_in     ! first operand 
1621    type(struct_gsv),  intent(inout)  :: statevector_inout  ! second operand, will receive the result
1622
1623    ! Locals:
1624    integer :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2
1625
1626    if (.not.statevector_in%allocated) then
1627      call utl_abort('gsv_schurProduct: gridStateVector_in not yet allocated')
1628    end if
1629    if (.not.statevector_inout%allocated) then
1630      call utl_abort('gsv_schurProduct: gridStateVector_inout not yet allocated')
1631    end if
1632
1633    lon1=statevector_in%myLonBeg
1634    lon2=statevector_in%myLonEnd
1635    lat1=statevector_in%myLatBeg
1636    lat2=statevector_in%myLatEnd
1637    k1=statevector_in%mykBeg
1638    k2=statevector_in%mykEnd
1639
1640    if (statevector_inout%dataKind == 8 .and. statevector_in%dataKind == 8) then
1641
1642      do stepIndex = 1, statevector_inout%numStep
1643        !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex)    
1644        do kIndex = k1, k2
1645          do latIndex = lat1, lat2
1646            do lonIndex = lon1, lon2
1647              statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
1648                   statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) *  &
1649                   statevector_in%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
1650            end do
1651          end do
1652        end do
1653        !$OMP END PARALLEL DO
1654      end do
1655      
1656
1657    else if (statevector_inout%dataKind == 4 .and. statevector_in%dataKind == 4) then
1658
1659      do stepIndex = 1, statevector_inout%numStep
1660        !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex)    
1661        do kIndex = k1, k2
1662          do latIndex = lat1, lat2
1663            do lonIndex = lon1, lon2
1664              statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = &
1665                   statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) *  &
1666                   statevector_in%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
1667            end do
1668          end do
1669        end do
1670        !$OMP END PARALLEL DO
1671      end do
1672
1673    else
1674      call utl_abort('gsv_schurProduct: Data type must be the same for both statevectors')
1675    end if
1676
1677  end subroutine gsv_schurProduct
1678
1679  !--------------------------------------------------------------------------
1680  ! gsv_copy
1681  !--------------------------------------------------------------------------
1682  subroutine gsv_copy(statevector_in, statevector_out, stepIndexOut_opt, &
1683                      allowTimeMismatch_opt, allowVarMismatch_opt, &
1684                      allowVcoMismatch_opt, beSilent_opt)
1685    !
1686    !:Purpose: Copies a statevector
1687    !
1688    implicit none
1689
1690    ! Arguments:
1691    type(struct_gsv),  intent(in)    :: statevector_in
1692    type(struct_gsv),  intent(inout) :: statevector_out
1693    integer, optional, intent(in)    :: stepIndexOut_opt
1694    logical, optional, intent(in)    :: allowTimeMismatch_opt
1695    logical, optional, intent(in)    :: allowVarMismatch_opt
1696    logical, optional, intent(in)    :: allowVcoMismatch_opt
1697    logical, optional, intent(in)    :: beSilent_opt
1698
1699    ! Locals:
1700    logical            :: timeMismatch, allowVarMismatch, varMismatch, allowVcoMismatch
1701    logical            :: beSilent
1702    integer :: stepIndex, lonIndex, kIndex, latIndex, levIndex, varIndex, numCommonVar 
1703    integer :: lon1, lon2, lat1, lat2, k1, k2, step1, step2, stepIn, nlev_in
1704    real(4), pointer :: field_out_r4(:,:,:,:), field_in_r4(:,:,:,:)
1705    real(8), pointer :: field_out_r8(:,:,:,:), field_in_r8(:,:,:,:)
1706    character(len=4), allocatable :: varNameListCommon(:)
1707    character(len=4)              :: varName
1708    character(len=10)             :: gsvCopyType 
1709    character(len=4), pointer     :: varNamesList_in(:), varNamesList_out(:)
1710
1711    if ( present(beSilent_opt) ) then
1712      beSilent = beSilent_opt
1713    else
1714      beSilent = .false.
1715    end if
1716    
1717    if ( present(allowVarMismatch_opt) ) then
1718      allowVarMismatch = allowVarMismatch_opt
1719    else
1720      allowVarMismatch = .false.
1721    end if
1722    varMismatch = .false.
1723
1724    ! asserting grid structure consistency
1725    if ( .not. hco_equal(gsv_getHco(statevector_in),gsv_getHco(statevector_out)) ) then
1726      call utl_abort('gsv_copy: horizontal structure inconsistency')
1727    end if
1728
1729    if (present(allowVcoMismatch_opt)) then
1730      allowVcoMismatch = allowVcoMismatch_opt
1731    else
1732      allowVcoMismatch = .false.
1733    end if
1734    if ( .not. vco_equal(gsv_getVco(statevector_in),gsv_getVco(statevector_out)) ) then
1735      if ( .not. allowVcoMismatch) then
1736        call utl_abort('gsv_copy: vertical structure inconsistency')
1737      end if
1738    end if
1739
1740    timeMismatch = .false.
1741    if (present(allowTimeMismatch_opt)) then
1742      if (allowTimeMismatch_opt) then
1743        if (statevector_in%numStep < statevector_out%numStep) then
1744          call utl_abort('gsv_copy: numStep_in less than numStep_out, which is not allowed')
1745        end if
1746        if (statevector_in%numStep /= statevector_out%numStep) then
1747          timeMismatch = .true.
1748        end if
1749      else
1750        if (statevector_in%numStep /= statevector_out%numStep) then
1751          call utl_abort('gsv_copy: numStep_in not equal to numStep_out')
1752        end if
1753      end if
1754    end if
1755
1756    if (present(stepIndexOut_opt) .and. present(allowTimeMismatch_opt)) then
1757      call utl_abort('gsv_copy: Cannot specify both stepIndexOut_opt ' //  &
1758                     'and allowTimeMismatch_opt in the same call')
1759    end if
1760
1761    if (.not.statevector_in%allocated) then
1762      call utl_abort('gsv_copy: gridStateVector_in not yet allocated')
1763    end if
1764    if (.not.statevector_out%allocated) then
1765      call utl_abort('gsv_copy: gridStateVector_out not yet allocated')
1766    end if
1767
1768    if (statevector_in%mpi_distribution == 'VarsLevs') allowVarMismatch = .false.
1769
1770    nullify(varNamesList_in)
1771    nullify(varNamesList_out)
1772    call gsv_varNamesList(varNamesList_in,statevector_in)
1773    call gsv_varNamesList(varNamesList_out,statevector_out)
1774
1775    if (size(varNamesList_in(:)) /= size(varNamesList_out(:))) then
1776      varMismatch = .true.
1777    else 
1778      if (all(varNamesList_in(:) == varNamesList_out(:))) then
1779        varMismatch = .false.
1780      else
1781        varMismatch = .true.
1782      end if 
1783    end if
1784    deallocate(varNamesList_out)
1785    deallocate(varNamesList_in)
1786
1787    ! if varMismatch and allowVarMismatch -> copy by varName, else copy by kIndex
1788    if (varMismatch .and. allowVarMismatch) then 
1789      gsvCopyType = 'VarName'
1790    else if (.not. varMismatch) then
1791      gsvCopyType = 'kIndex'
1792    else 
1793      call utl_abort('gsv_copy: varMismatch and allowVarMismatch do not agree! Aborting.')
1794    end if
1795
1796    call msg('gsv_copy', 'gsvCopyType='//gsvCopyType &
1797         //', timeMismatch='//str(timeMismatch) &
1798         //', varMismatch='//str(varMismatch) &
1799         //', allowVarMismatch='//str(allowVarMismatch), verb_opt=2)
1800
1801    ! build list of common variables and see if there is a mismatch
1802    allocate(varNameListCommon(vnl_numvarmax))
1803    varNameListCommon(:) = '    '
1804    if (varMismatch) then
1805      numCommonVar = 0
1806      do varIndex = 1, vnl_numvarmax
1807        varName = vnl_varNameList(varIndex)
1808        if (gsv_varExist(statevector_in,varName) .and. gsv_varExist(statevector_out,varName)) then
1809          numCommonVar = numCommonVar + 1
1810          varNameListCommon(numCommonVar) = varName 
1811        end if 
1812      end do
1813    end if
1814
1815    lon1 = statevector_in%myLonBeg
1816    lon2 = statevector_in%myLonEnd
1817    lat1 = statevector_in%myLatBeg
1818    lat2 = statevector_in%myLatEnd
1819    k1 = statevector_in%mykBeg
1820    k2 = statevector_in%mykEnd
1821    ! If stepIndexOut_opt present then copy from step 1 to stepIndexOut_opt
1822    if (present(stepIndexOut_opt)) then
1823      step1 = stepIndexOut_opt
1824      step2 = stepIndexOut_opt
1825    else
1826      step1 = 1
1827      step2 = statevector_out%numStep
1828    end if
1829
1830    ! copy over some time related parameters
1831    statevector_out%deet   = statevector_in%deet
1832    statevector_out%etiket = statevector_in%etiket
1833    do stepIndex = step1, step2
1834      if (present(stepIndexOut_opt)) then
1835        stepIn = 1
1836      else if(timeMismatch) then
1837        stepIn_Loop0: do stepIn = 1, statevector_in%numStep
1838          if (statevector_in%dateStampList(stepIn) ==  &
1839              statevector_out%dateStampList(stepIndex)) exit stepIn_loop0
1840        end do stepIn_Loop0
1841      else
1842        stepIn = stepIndex
1843      end if
1844      statevector_out%dateOriginList(stepIndex) = statevector_in%dateOriginList(stepIn)
1845      statevector_out%npasList(stepIndex)       = statevector_in%npasList(stepIn)
1846      statevector_out%ip2List(stepIndex)        = statevector_in%ip2List(stepIn)
1847    end do
1848
1849    if (associated(statevector_in%HeightSfc) .and. associated(statevector_out%HeightSfc)) then
1850      statevector_out%HeightSfc(:,:) = statevector_in%HeightSfc(:,:)
1851    end if
1852
1853    if (statevector_out%dataKind == 8 .and. statevector_in%dataKind == 8) then
1854
1855      if (trim(gsvCopyType) == 'kIndex') then
1856        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,stepIn)
1857        do kIndex = k1, k2
1858          do stepIndex = step1, step2
1859            if (present(stepIndexOut_opt)) then
1860              stepIn = 1
1861            else if(timeMismatch) then
1862              stepIn_Loop: do stepIn = 1, statevector_in%numStep
1863                if (statevector_in%dateStampList(stepIn) ==  &
1864                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop
1865              end do stepIn_Loop
1866            else
1867              stepIn = stepIndex
1868            end if
1869            do latIndex = lat1, lat2
1870              do lonIndex = lon1, lon2
1871                statevector_out%gd_r8(lonIndex,latIndex,kIndex,stepIndex) =  &
1872                  statevector_in%gd_r8(lonIndex,latIndex,kIndex,stepIn)
1873              end do
1874            end do
1875          end do
1876        end do
1877        !$OMP END PARALLEL DO
1878
1879      else
1880        do varIndex = 1, numCommonVar
1881          varName = varNameListCommon(varIndex)
1882
1883          nlev_in = gsv_getNumLevFromVarName(statevector_in,varName)
1884
1885          call gsv_getField(statevector_in ,field_in_r8, varName)
1886          call gsv_getField(statevector_out,field_out_r8, varName)
1887
1888          !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,levIndex,lonIndex,stepIn)
1889          do stepIndex = step1, step2
1890            if (present(stepIndexOut_opt)) then
1891              stepIn = 1
1892            else if(timeMismatch) then
1893              stepIn_Loop2: do stepIn = 1, statevector_in%numStep
1894                if (statevector_in%dateStampList(stepIn) ==  &
1895                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop2
1896              end do stepIn_Loop2
1897            else
1898              stepIn = stepIndex
1899            end if
1900            do levIndex = 1, nlev_in
1901              do latIndex = lat1, lat2
1902                do lonIndex = lon1, lon2
1903                  field_out_r8(lonIndex,latIndex,levIndex,stepIndex) =  &
1904                    field_in_r8(lonIndex,latIndex,levIndex,stepIn)
1905                end do
1906              end do
1907            end do
1908          end do
1909          !$OMP END PARALLEL DO
1910
1911        end do
1912      end if
1913
1914    else if (statevector_out%dataKind == 4 .and. statevector_in%dataKind == 4) then
1915
1916      if (trim(gsvCopyType) == 'kIndex') then
1917        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,stepIn)
1918        do kIndex = k1, k2
1919          do stepIndex = step1, step2
1920            if (present(stepIndexOut_opt)) then
1921              stepIn = 1
1922            else if(timeMismatch) then
1923              stepIn_Loop3: do stepIn = 1, statevector_in%numStep
1924                if (statevector_in%dateStampList(stepIn) ==  &
1925                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop3
1926              end do stepIn_Loop3
1927            else
1928              stepIn = stepIndex
1929            end if
1930            do latIndex = lat1, lat2
1931              do lonIndex = lon1, lon2
1932                statevector_out%gd_r4(lonIndex,latIndex,kIndex,stepIndex) =  &
1933                  statevector_in%gd_r4(lonIndex,latIndex,kIndex,stepIn)
1934              end do
1935            end do
1936          end do
1937        end do
1938        !$OMP END PARALLEL DO
1939
1940      else
1941        do varIndex = 1, numCommonVar
1942          varName = varNameListCommon(varIndex)
1943
1944          nlev_in = gsv_getNumLevFromVarName(statevector_in,varName)
1945
1946          call gsv_getField(statevector_in ,field_in_r4, varName)
1947          call gsv_getField(statevector_out,field_out_r4, varName)
1948
1949          !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,levIndex,lonIndex,stepIn)
1950          do stepIndex = step1, step2
1951            if (present(stepIndexOut_opt)) then
1952              stepIn = 1
1953            else if(timeMismatch) then
1954              stepIn_Loop4: do stepIn = 1, statevector_in%numStep
1955                if (statevector_in%dateStampList(stepIn) ==  &
1956                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop4
1957              end do stepIn_Loop4
1958            else
1959              stepIn = stepIndex
1960            end if
1961            do levIndex = 1, nlev_in
1962              do latIndex = lat1, lat2
1963                do lonIndex = lon1, lon2
1964                  field_out_r4(lonIndex,latIndex,levIndex,stepIndex) =  &
1965                    field_in_r4(lonIndex,latIndex,levIndex,stepIn)
1966                end do
1967              end do
1968            end do
1969          end do
1970          !$OMP END PARALLEL DO
1971        end do
1972      end if
1973
1974    else if (statevector_out%dataKind == 4 .and. statevector_in%dataKind == 8) then
1975
1976      if (trim(gsvCopyType) == 'kIndex') then
1977        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,stepIn)
1978        do kIndex = k1, k2
1979          do stepIndex = step1, step2
1980            if (present(stepIndexOut_opt)) then
1981              stepIn = 1
1982            else if(timeMismatch) then
1983              stepIn_Loop5: do stepIn = 1, statevector_in%numStep
1984                if (statevector_in%dateStampList(stepIn) ==  &
1985                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop5
1986              end do stepIn_Loop5
1987            else
1988              stepIn = stepIndex
1989            end if
1990            do latIndex = lat1, lat2
1991              do lonIndex = lon1, lon2
1992                statevector_out%gd_r4(lonIndex,latIndex,kIndex,stepIndex) =  &
1993                  real(statevector_in%gd_r8(lonIndex,latIndex,kIndex,stepIn),4)
1994              end do
1995            end do
1996          end do
1997        end do
1998        !$OMP END PARALLEL DO
1999
2000      else
2001        do varIndex = 1, numCommonVar
2002          varName = varNameListCommon(varIndex)
2003
2004          nlev_in = gsv_getNumLevFromVarName(statevector_in,varName)
2005
2006          call gsv_getField(statevector_in ,field_in_r8, varName)
2007          call gsv_getField(statevector_out,field_out_r4, varName)
2008
2009          !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,levIndex,lonIndex,stepIn)
2010          do stepIndex = step1, step2
2011            if (present(stepIndexOut_opt)) then
2012              stepIn = 1
2013            else if(timeMismatch) then
2014              stepIn_Loop6: do stepIn = 1, statevector_in%numStep
2015                if (statevector_in%dateStampList(stepIn) ==  &
2016                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop6
2017              end do stepIn_Loop6
2018            else
2019              stepIn = stepIndex
2020            end if
2021            do levIndex = 1, nlev_in
2022              do latIndex = lat1, lat2
2023                do lonIndex = lon1, lon2
2024                  field_out_r4(lonIndex,latIndex,levIndex,stepIndex) =  &
2025                    real(field_in_r8(lonIndex,latIndex,levIndex,stepIn),4)
2026                end do
2027              end do
2028            end do
2029          end do
2030          !$OMP END PARALLEL DO
2031
2032        end do
2033      end if
2034
2035    else if (statevector_out%dataKind == 8 .and. statevector_in%dataKind == 4) then
2036
2037      if (trim(gsvCopyType) == 'kIndex') then
2038        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,stepIn)
2039        do kIndex = k1, k2
2040          do stepIndex = step1, step2
2041            if (present(stepIndexOut_opt)) then
2042              stepIn = 1
2043            else if(timeMismatch) then
2044              stepIn_Loop7: do stepIn = 1, statevector_in%numStep
2045                if (statevector_in%dateStampList(stepIn) ==  &
2046                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop7
2047              end do stepIn_Loop7
2048            else
2049              stepIn = stepIndex
2050            end if
2051            do latIndex = lat1, lat2
2052              do lonIndex = lon1, lon2
2053                statevector_out%gd_r8(lonIndex,latIndex,kIndex,stepIndex) =  &
2054                  real(statevector_in%gd_r4(lonIndex,latIndex,kIndex,stepIn),8)
2055              end do
2056            end do
2057          end do
2058        end do
2059        !$OMP END PARALLEL DO
2060
2061      else
2062        do varIndex = 1, numCommonVar
2063          varName = varNameListCommon(varIndex)
2064
2065          nlev_in = gsv_getNumLevFromVarName(statevector_in,varName)
2066
2067          call gsv_getField(statevector_in ,field_in_r4, varName)
2068          call gsv_getField(statevector_out,field_out_r8, varName)
2069
2070          !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,levIndex,lonIndex,stepIn)
2071          do stepIndex = step1, step2
2072            if (present(stepIndexOut_opt)) then
2073              stepIn = 1
2074            else if(timeMismatch) then
2075              stepIn_Loop8: do stepIn = 1, statevector_in%numStep
2076                if (statevector_in%dateStampList(stepIn) ==  &
2077                    statevector_out%dateStampList(stepIndex)) exit stepIn_loop8
2078              end do stepIn_Loop8
2079            else
2080              stepIn = stepIndex
2081            end if
2082            do levIndex = 1, nlev_in
2083              do latIndex = lat1, lat2
2084                do lonIndex = lon1, lon2
2085                  field_out_r8(lonIndex,latIndex,levIndex,stepIndex) =  &
2086                    real(field_in_r4(lonIndex,latIndex,levIndex,stepIn),8)
2087                end do
2088              end do
2089            end do
2090          end do
2091          !$OMP END PARALLEL DO
2092
2093        end do
2094      end if
2095
2096    else
2097      call utl_abort('gsv_copy: Unknown data types')
2098    end if
2099
2100    deallocate(varNameListCommon)
2101
2102    ! Copy mask if it exists
2103    call gsv_copyMask(statevector_in, statevector_out, beSilent_opt=beSilent)
2104
2105  end subroutine gsv_copy
2106
2107  !--------------------------------------------------------------------------
2108  ! gsv_copyMask
2109  !--------------------------------------------------------------------------
2110  subroutine gsv_copyMask(statevector_in,statevector_out,beSilent_opt)
2111    !
2112    !:Purpose: Copy ocean mask, if it exists.
2113    !
2114    implicit none
2115
2116    ! Arguments:
2117    type(struct_gsv),  intent(in)    :: statevector_in
2118    type(struct_gsv),  intent(inout) :: statevector_out
2119    logical, optional, intent(in)    :: beSilent_opt
2120
2121    ! Locals:
2122    logical :: beSilent
2123
2124    if ( present(beSilent_opt) ) then
2125      beSilent = beSilent_opt
2126    else
2127      beSilent = .false.
2128    end if
2129
2130    ! Copy mask if it exists
2131    call ocm_copyMask(statevector_in%oceanMask, statevector_out%oceanMask,&
2132                      beSilent_opt=beSilent)
2133
2134  end subroutine gsv_copyMask
2135
2136  !--------------------------------------------------------------------------
2137  ! gsv_copy4Dto3D
2138  !--------------------------------------------------------------------------
2139  subroutine gsv_copy4Dto3D(statevector_in,statevector_out)
2140    !
2141    !:Purpose: Copies contents of a 4D statevector into a 3D statevector
2142    !           object by extracting the middle time step.
2143    !
2144    implicit none
2145
2146    ! Arguments:
2147    type(struct_gsv), intent(in)     :: statevector_in
2148    type(struct_gsv), intent(inout)  :: statevector_out
2149
2150    ! Locals:
2151    integer :: middleStepIndex, lonIndex, latIndex, kIndex
2152    integer :: lon1, lon2, lat1, lat2, k1, k2, numStepIn, numStepOut
2153
2154    if (.not.statevector_in%allocated) then
2155      call utl_abort('gsv_copy4Dto3D: gridStateVector_in not yet allocated')
2156    end if
2157    if (.not.statevector_out%allocated) then
2158      call utl_abort('gsv_copy4Dto3D: gridStateVector_out not yet allocated')
2159    end if
2160
2161    lon1 = statevector_in%myLonBeg
2162    lon2 = statevector_in%myLonEnd
2163    lat1 = statevector_in%myLatBeg
2164    lat2 = statevector_in%myLatEnd
2165    k1 = statevector_in%mykBeg
2166    k2 = statevector_in%mykEnd
2167    numStepIn  =  statevector_in%numStep
2168    numStepOut =  statevector_out%numStep
2169
2170    if (numStepOut /= 1) call utl_abort('gsv_copy4Dto3D: output statevector must have only 1 timestep')
2171    if (numStepIn == 1) then
2172      call msg('gsv_copy4Dto3D', 'WARNING: input statevector only has 1 timestep, will simply copy.')
2173    end if
2174    middleStepIndex = (numStepIn + 1) / 2
2175
2176    if (associated(statevector_in%HeightSfc) .and. associated(statevector_out%HeightSfc)) then
2177      statevector_out%HeightSfc(:,:) = statevector_in%HeightSfc(:,:)
2178    end if
2179
2180    if (statevector_out%dataKind == 8 .and. statevector_in%dataKind == 8) then
2181
2182      !$OMP PARALLEL DO PRIVATE (kIndex,latIndex,lonIndex)
2183      do kIndex = k1, k2
2184        do latIndex = lat1, lat2
2185          do lonIndex = lon1, lon2
2186            statevector_out%gd_r8(lonIndex,latIndex,kIndex,1) =  &
2187              statevector_in%gd_r8(lonIndex,latIndex,kIndex,middleStepIndex)
2188          end do
2189        end do
2190      end do
2191      !$OMP END PARALLEL DO
2192
2193    else if (statevector_out%dataKind == 4 .and. statevector_in%dataKind == 4) then
2194
2195      !$OMP PARALLEL DO PRIVATE (kIndex,latIndex,lonIndex)
2196      do kIndex = k1, k2
2197        do latIndex = lat1, lat2
2198          do lonIndex = lon1, lon2
2199            statevector_out%gd_r4(lonIndex,latIndex,kIndex,1) =  &
2200              statevector_in%gd_r4(lonIndex,latIndex,kIndex,middleStepIndex)
2201          end do
2202        end do
2203      end do
2204      !$OMP END PARALLEL DO
2205
2206    else
2207      call utl_abort('gsv_copy4Dto3D: Data type must be the same for both statevectors')
2208    end if
2209
2210  end subroutine gsv_copy4Dto3D
2211
2212  !--------------------------------------------------------------------------
2213  ! gsv_copyHeightSfc
2214  !--------------------------------------------------------------------------
2215  subroutine gsv_copyHeightSfc(statevector_in,statevector_out)
2216    !
2217    !:Purpose: Copies HeightSfc data from one statevector to another
2218    !
2219    implicit none
2220
2221    ! Arguments:
2222    type(struct_gsv), intent(in)    :: statevector_in
2223    type(struct_gsv), intent(inout) :: statevector_out
2224
2225    if ( .not. hco_equal(gsv_getHco(statevector_in), gsv_getHco(statevector_out))) then
2226      call utl_abort('gsv_copyHeightSfc: inconsistent horizontal structure')
2227    end if
2228    if (.not.statevector_in%allocated) then
2229      call utl_abort('gsv_copyHeightSfc: gridStateVector_in not yet allocated')
2230    end if
2231    if (.not.statevector_out%allocated) then
2232      call utl_abort('gsv_copyHeightSfc: gridStateVector_out not yet allocated')
2233    end if
2234
2235    if (.not. associated(statevector_in%HeightSfc)) then
2236      call utl_abort('gsv_copyHeightSfc: HeightSfc in gridStateVector_in not allocated')
2237    end if
2238    if (.not. associated(statevector_out%HeightSfc)) then
2239      call utl_abort('gsv_copyHeightSfc: HeightSfc in gridStateVector_out not allocated')
2240    end if
2241
2242    statevector_out%HeightSfc(:,:) = statevector_in%HeightSfc(:,:)
2243
2244  end subroutine gsv_copyHeightSfc
2245
2246  !--------------------------------------------------------------------------
2247  ! gsv_hPad
2248  !--------------------------------------------------------------------------
2249  subroutine gsv_hPad(statevector_in,statevector_out)
2250    !
2251    !:Purpose: Copies a statevector to a horizontally larger one and pad with a
2252    !           predefined value of 0 (or 1000 for 'P0'). 
2253    !
2254    implicit none
2255
2256    ! Arguments:
2257    type(struct_gsv), intent(in)     :: statevector_in
2258    type(struct_gsv), intent(inout)  :: statevector_out
2259
2260    ! Locals:
2261    integer :: stepIndex,lonIndex,kIndex,latIndex
2262    integer :: lonBeg_in, lonEnd_in, latBeg_in, latEnd_in, kBeg, kEnd
2263    real(8) :: paddingValue_r8
2264    real(4) :: paddingValue_r4
2265
2266    if (.not.statevector_in%allocated) then
2267      call utl_abort('gsv_hPad: gridStateVector_in not yet allocated')
2268    end if
2269    if (.not.statevector_out%allocated) then
2270      call utl_abort('gsv_hPad: gridStateVector_out not yet allocated')
2271    end if
2272    if (statevector_in%mpi_local .or. statevector_out%mpi_local) then
2273       call utl_abort('gsv_hPad: both gridStateVectors must be NO MPI')
2274    end if
2275
2276    lonBeg_in=statevector_in%myLonBeg
2277    lonEnd_in=statevector_in%myLonEnd
2278    latBeg_in=statevector_in%myLatBeg
2279    latEnd_in=statevector_in%myLatEnd
2280    kBeg=statevector_in%mykBeg
2281    kEnd=statevector_in%mykEnd
2282
2283    if (lonBeg_in > statevector_out%myLonBeg .or. &
2284        lonEnd_in > statevector_out%myLonEnd .or. &
2285        latBeg_in > statevector_out%myLatBeg .or. &
2286        latEnd_in > statevector_out%myLatEnd) then
2287      call utl_abort('gsv_hPad: StateVector_out is SMALLER than StateVector_in')
2288    end if
2289    if (kBeg /= statevector_out%mykBeg .or. kEnd /= statevector_out%mykEnd) then
2290      call utl_abort('gsv_hPad: Vertical levels are not compatible')
2291    end if
2292
2293    ! copy over some time related parameters
2294    statevector_out%deet   = statevector_in%deet
2295    statevector_out%etiket = statevector_in%etiket
2296    do stepIndex = 1, statevector_out%numStep
2297      statevector_out%dateOriginList(stepIndex) = statevector_in%dateOriginList(stepIndex)
2298      statevector_out%npasList(stepIndex)       = statevector_in%npasList(stepIndex)
2299      statevector_out%ip2List(stepIndex)        = statevector_in%ip2List(stepIndex)
2300    end do
2301
2302    if (statevector_out%dataKind == 8 .and. statevector_in%dataKind == 8) then
2303
2304      if (associated(statevector_in%HeightSfc) .and. associated(statevector_out%HeightSfc)) then
2305        statevector_out%HeightSfc(:,:) = 0.d0
2306        statevector_out%HeightSfc(lonBeg_in:lonEnd_in,latBeg_in:latEnd_in) = &
2307             statevector_in%HeightSfc(:,:)
2308      end if
2309
2310      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,paddingValue_r8)    
2311      do kIndex = kBeg, kEnd
2312        if (trim(gsv_getVarNameFromK(statevector_out,kIndex)) == 'P0') then
2313          paddingValue_r8 = 1000.d0 ! 1000 hPa
2314        else
2315          paddingValue_r8 = 0.d0
2316        end if
2317        do stepIndex = 1, statevector_out%numStep
2318          statevector_out%gd_r8(:,:,kIndex,stepIndex) = paddingValue_r8
2319          do latIndex = latBeg_in, latEnd_in
2320            do lonIndex = lonBeg_in, lonEnd_in
2321              statevector_out%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
2322                   statevector_in%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
2323            end do
2324          end do
2325        end do
2326      end do
2327      !$OMP END PARALLEL DO
2328
2329    else if (statevector_out%dataKind == 4 .and. statevector_in%dataKind == 4) then
2330
2331      if (associated(statevector_in%HeightSfc) .and. associated(statevector_out%HeightSfc)) then
2332        statevector_out%HeightSfc(:,:) = 0.0
2333        statevector_out%HeightSfc(lonBeg_in:lonEnd_in,latBeg_in:latEnd_in) = statevector_in%HeightSfc(:,:)
2334      end if
2335
2336      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,paddingValue_r4)    
2337      do kIndex = kBeg, kEnd
2338        if (trim(gsv_getVarNameFromK(statevector_out,kIndex)) == 'P0') then
2339          paddingValue_r4 = 1000.0 ! 1000 hPa
2340        else
2341          paddingValue_r4 = 0.0
2342        end if
2343        do stepIndex = 1, statevector_out%numStep
2344          statevector_out%gd_r4(:,:,kIndex,stepIndex) = paddingValue_r4
2345          do latIndex = latBeg_in, latEnd_in
2346            do lonIndex = lonBeg_in, lonEnd_in
2347              statevector_out%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = statevector_in%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
2348            end do
2349          end do
2350        end do
2351      end do
2352      !$OMP END PARALLEL DO
2353
2354    else
2355      call utl_abort('gsv_hPad: Data type must be the same for both statevectors')
2356    end if
2357
2358  end subroutine gsv_hPad
2359
2360  !--------------------------------------------------------------------------
2361  ! gsv_power
2362  !--------------------------------------------------------------------------
2363  subroutine gsv_power(statevector_inout,power,scaleFactor_opt)
2364    !
2365    !:Purpose: Applies the power function
2366    !           statevector_inout(i,j,k,l) = scaleFactor_opt * (statevector_inout(i,j,k,l)**power)
2367    !
2368    implicit none
2369
2370    ! Arguments:
2371    type(struct_gsv),  intent(inout)  :: statevector_inout
2372    real(8),           intent(in)     :: power
2373    real(8), optional, intent(in)     :: scaleFactor_opt    ! optional scaling applied on the power of the operand
2374
2375    ! Locals:
2376    integer :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2
2377
2378    if (.not.statevector_inout%allocated) then
2379      call utl_abort('gsv_power: gridStateVector_inout not yet allocated')
2380    end if
2381
2382    lon1=statevector_inout%myLonBeg
2383    lon2=statevector_inout%myLonEnd
2384    lat1=statevector_inout%myLatBeg
2385    lat2=statevector_inout%myLatEnd
2386    k1=statevector_inout%mykBeg
2387    k2=statevector_inout%mykEnd
2388
2389    if (statevector_inout%dataKind == 8) then
2390
2391      if (present(scaleFactor_opt)) then
2392        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2393        do kIndex = k1, k2
2394          do stepIndex = 1, statevector_inout%numStep
2395            do latIndex = lat1, lat2
2396              do lonIndex = lon1, lon2
2397                statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
2398                     scaleFactor_opt * (statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex))**power
2399              end do
2400            end do
2401          end do
2402        end do
2403        !$OMP END PARALLEL DO
2404      else
2405        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2406        do kIndex = k1, k2
2407          do stepIndex = 1, statevector_inout%numStep
2408            do latIndex = lat1, lat2
2409              do lonIndex = lon1, lon2
2410                statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
2411                     (statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex))**power
2412              end do
2413            end do
2414          end do
2415        end do
2416        !$OMP END PARALLEL DO
2417      end if
2418
2419    else if (statevector_inout%dataKind == 4) then
2420
2421      if (present(scaleFactor_opt)) then
2422        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2423        do kIndex = k1, k2
2424          do stepIndex = 1, statevector_inout%numStep
2425            do latIndex = lat1, lat2
2426              do lonIndex = lon1, lon2
2427                statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = &
2428                     real(scaleFactor_opt,4) * (statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex))**power
2429              end do
2430            end do
2431          end do
2432        end do
2433        !$OMP END PARALLEL DO
2434      else
2435        !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2436        do kIndex = k1, k2
2437          do stepIndex = 1, statevector_inout%numStep
2438            do latIndex = lat1, lat2
2439              do lonIndex = lon1, lon2
2440                statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = &
2441                     (statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex))**power
2442              end do
2443            end do
2444          end do
2445        end do
2446        !$OMP END PARALLEL DO
2447      end if
2448
2449    end if
2450
2451  end subroutine gsv_power
2452
2453  !--------------------------------------------------------------------------
2454  ! gsv_scale
2455  !--------------------------------------------------------------------------
2456  subroutine gsv_scale(statevector_inout,scaleFactor)
2457    !
2458    !:Purpose: Applies scaling factor to a statevector
2459    !           statevector_inout = scaleFactor * statevector_inout
2460    !
2461    implicit none
2462
2463    ! Arguments:
2464    type(struct_gsv), intent(inout) :: statevector_inout
2465    real(8),          intent(in)    :: scaleFactor
2466
2467    ! Locals:
2468    integer :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2
2469
2470    if (.not.statevector_inout%allocated) then
2471      call utl_abort('gsv_scale: gridStateVector_inout not yet allocated')
2472    end if
2473
2474    lon1=statevector_inout%myLonBeg
2475    lon2=statevector_inout%myLonEnd
2476    lat1=statevector_inout%myLatBeg
2477    lat2=statevector_inout%myLatEnd
2478    k1=statevector_inout%mykBeg
2479    k2=statevector_inout%mykEnd
2480
2481    if (statevector_inout%dataKind == 8) then
2482
2483      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2484      do kIndex = k1, k2
2485        do stepIndex = 1, statevector_inout%numStep
2486          do latIndex = lat1, lat2
2487            do lonIndex = lon1, lon2
2488              statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
2489                   scaleFactor * statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
2490            end do
2491          end do
2492        end do
2493      end do
2494      !$OMP END PARALLEL DO
2495
2496    else
2497
2498      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2499      do kIndex = k1, k2
2500        do stepIndex = 1, statevector_inout%numStep
2501          do latIndex = lat1, lat2
2502            do lonIndex = lon1, lon2
2503              statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = &
2504                   real(scaleFactor,4) * statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
2505            end do
2506          end do
2507        end do
2508      end do
2509      !$OMP END PARALLEL DO
2510
2511    end if
2512
2513  end subroutine gsv_scale
2514
2515  !--------------------------------------------------------------------------
2516  ! gsv_scaleVertical
2517  !--------------------------------------------------------------------------
2518  subroutine gsv_scaleVertical(statevector_inout,scaleFactor)
2519    !
2520    !:Purpose: Applies a specific scaling to each level
2521    !           statevector_inout(:,:,k,:) = scaleFactor(k) * statevector_inout(:,:,k,:)
2522    !
2523    implicit none
2524
2525    ! Arguments:
2526    type(struct_gsv), intent(inout) :: statevector_inout
2527    real(8),          intent(in)    :: scaleFactor(:)
2528
2529    ! Locals:
2530    integer          :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2,levIndex
2531    character(len=4) :: varLevel
2532
2533    if (.not.statevector_inout%allocated) then
2534      call utl_abort('gsv_scaleVertical: gridStateVector_inout not yet allocated')
2535    end if
2536
2537    lon1=statevector_inout%myLonBeg
2538    lon2=statevector_inout%myLonEnd
2539    lat1=statevector_inout%myLatBeg
2540    lat2=statevector_inout%myLatEnd
2541    k1=statevector_inout%mykBeg
2542    k2=statevector_inout%mykEnd
2543
2544    if (statevector_inout%dataKind == 8) then
2545
2546      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,levIndex,varLevel)
2547      do kIndex = k1, k2
2548        varLevel = vnl_varLevelFromVarname(gsv_getVarNameFromK(statevector_inout, kIndex))
2549        if (varLevel == 'SF' .or. varLevel == 'SFMM' .or. varLevel == 'SFTH') then
2550          ! use lowest momentum level for surface variables
2551          levIndex = gsv_getNumLev(statevector_inout, 'MM')
2552        else
2553          levIndex = gsv_getLevFromK(statevector_inout, kIndex)
2554        end if
2555        do stepIndex = 1, statevector_inout%numStep
2556          do latIndex = lat1, lat2
2557            do lonIndex = lon1, lon2
2558              statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
2559                   scaleFactor(levIndex) * statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
2560            end do
2561          end do
2562        end do
2563      end do
2564      !$OMP END PARALLEL DO
2565
2566    else
2567
2568      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex,levIndex,varLevel)
2569      do kIndex = k1, k2
2570        varLevel = vnl_varLevelFromVarname(gsv_getVarNameFromK(statevector_inout, kIndex))
2571        if (varLevel == 'SF' .or. varLevel == 'SFMM' .or. varLevel == 'SFTH') then
2572          ! use lowest momentum level for surface variables
2573          levIndex = gsv_getNumLev(statevector_inout, 'MM')
2574        else
2575          levIndex = gsv_getLevFromK(statevector_inout, kIndex)
2576        end if
2577        do stepIndex = 1, statevector_inout%numStep
2578          do latIndex = lat1, lat2
2579            do lonIndex = lon1, lon2
2580              statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = &
2581                   real(scaleFactor(levIndex),4) * statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
2582            end do
2583          end do
2584        end do
2585      end do
2586      !$OMP END PARALLEL DO
2587
2588    end if
2589
2590  end subroutine gsv_scaleVertical
2591
2592  !--------------------------------------------------------------------------
2593  ! gsv_3dto4d
2594  !--------------------------------------------------------------------------
2595  subroutine gsv_3dto4d(statevector_inout)
2596    !
2597    !:Purpose: Copies the 3D data array to all time steps of the 4D array of the
2598    !           same statevector
2599    !
2600    implicit none
2601
2602    ! Arguments:
2603    type(struct_gsv), intent(inout) :: statevector_inout
2604
2605    ! Locals:
2606    integer :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2
2607
2608    if (.not.statevector_inout%allocated) then
2609      call utl_abort('gsv_3dto4d: statevector not yet allocated')
2610    end if
2611
2612    lon1=statevector_inout%myLonBeg
2613    lon2=statevector_inout%myLonEnd
2614    lat1=statevector_inout%myLatBeg
2615    lat2=statevector_inout%myLatEnd
2616    k1=statevector_inout%mykBeg
2617    k2=statevector_inout%mykEnd
2618
2619    if (statevector_inout%numStep.eq.1) return
2620
2621    if (statevector_inout%dataKind == 8) then
2622
2623      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2624      do kIndex = k1, k2
2625        do stepIndex = 1, statevector_inout%numStep
2626          if (stepIndex.ne.statevector_inout%anltime) then
2627            do latIndex = lat1, lat2
2628              do lonIndex = lon1, lon2
2629                statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex) = &
2630                     statevector_inout%gd3d_r8(lonIndex,latIndex,kIndex)
2631              end do
2632            end do
2633          end if
2634        end do
2635      end do
2636      !$OMP END PARALLEL DO
2637
2638    else
2639
2640      !$OMP PARALLEL DO PRIVATE (stepIndex,latIndex,kIndex,lonIndex)    
2641      do kIndex = k1, k2
2642        do stepIndex = 1, statevector_inout%numStep
2643          if (stepIndex.ne.statevector_inout%anltime) then
2644            do latIndex = lat1, lat2
2645              do lonIndex = lon1, lon2
2646                statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex) = &
2647                     statevector_inout%gd3d_r4(lonIndex,latIndex,kIndex)
2648              end do
2649            end do
2650          end if
2651        end do
2652      end do
2653      !$OMP END PARALLEL DO
2654
2655    end if
2656
2657  end subroutine gsv_3dto4d
2658
2659  !--------------------------------------------------------------------------
2660  ! gsv_3dto4dAdj
2661  !--------------------------------------------------------------------------
2662  subroutine gsv_3dto4dAdj(statevector_inout)
2663    !
2664    !:Purpose: Adjoint code of the 3dto4d copy to all time steps 
2665    !
2666    implicit none
2667
2668    ! Arguments:
2669    type(struct_gsv), intent(inout) :: statevector_inout
2670
2671    ! Locals:
2672    integer :: stepIndex,lonIndex,kIndex,latIndex,lon1,lon2,lat1,lat2,k1,k2
2673    real(4), allocatable :: gd2d_tmp_r4(:,:)
2674    real(8), allocatable :: gd2d_tmp(:,:)
2675
2676    if (.not.statevector_inout%allocated) then
2677      call utl_abort('gsv_3dto4dAdj: statevector not yet allocated')
2678    end if
2679
2680    lon1=statevector_inout%myLonBeg
2681    lon2=statevector_inout%myLonEnd
2682    lat1=statevector_inout%myLatBeg
2683    lat2=statevector_inout%myLatEnd
2684    k1=statevector_inout%mykBeg
2685    k2=statevector_inout%mykEnd
2686
2687    if (statevector_inout%numStep.eq.1) return
2688
2689    if (statevector_inout%dataKind == 8) then
2690
2691      allocate(gd2d_tmp(lon1:lon2,lat1:lat2))
2692      !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex,stepIndex,gd2d_tmp)
2693      do kIndex = k1, k2
2694        gd2d_tmp(:,:) = 0.0d0
2695        do stepIndex = 1, statevector_inout%numStep
2696          do latIndex = lat1, lat2
2697            do lonIndex = lon1, lon2
2698              gd2d_tmp(lonIndex,latIndex) = gd2d_tmp(lonIndex,latIndex) +   &
2699                   statevector_inout%gd_r8(lonIndex,latIndex,kIndex,stepIndex)
2700            end do
2701          end do
2702        end do
2703        do latIndex = lat1, lat2
2704          do lonIndex = lon1, lon2
2705            statevector_inout%gd3d_r8(lonIndex,latIndex,kIndex) = &
2706                 gd2d_tmp(lonIndex,latIndex)
2707          end do
2708        end do
2709      end do
2710      !$OMP END PARALLEL DO
2711      deallocate(gd2d_tmp)
2712
2713    else
2714
2715      allocate(gd2d_tmp_r4(lon1:lon2,lat1:lat2))
2716      !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex,stepIndex,gd2d_tmp_r4)
2717      do kIndex = k1, k2
2718        gd2d_tmp_r4(:,:) = 0.0
2719        do stepIndex = 1, statevector_inout%numStep
2720          do latIndex = lat1, lat2
2721            do lonIndex = lon1, lon2
2722              gd2d_tmp_r4(lonIndex,latIndex) = gd2d_tmp_r4(lonIndex,latIndex) +   &
2723                   statevector_inout%gd_r4(lonIndex,latIndex,kIndex,stepIndex)
2724            end do
2725          end do
2726        end do
2727        do latIndex = lat1, lat2
2728          do lonIndex = lon1, lon2
2729            statevector_inout%gd3d_r4(lonIndex,latIndex,kIndex) = &
2730                 gd2d_tmp_r4(lonIndex,latIndex)
2731          end do
2732        end do
2733      end do
2734      !$OMP END PARALLEL DO
2735      deallocate(gd2d_tmp_r4)
2736
2737    end if
2738
2739  end subroutine gsv_3dto4dAdj
2740
2741  !--------------------------------------------------------------------------
2742  ! gsv_deallocate
2743  !--------------------------------------------------------------------------
2744  subroutine gsv_deallocate(statevector)
2745    !
2746    !:Purpose: Deallocates the struct_gsv memory structure
2747    !
2748    implicit none
2749
2750    ! Arguments:
2751    type(struct_gsv), intent(inout) :: statevector
2752
2753    ! Locals:
2754    integer :: ierr, kIndex
2755
2756    if (.not.statevector%allocated) then
2757      call utl_abort('gsv_deallocate: gridStateVector not yet allocated')
2758    end if
2759
2760    statevector%allocated=.false.
2761
2762    deallocate(statevector%allLonBeg)
2763    deallocate(statevector%allLonEnd)
2764    deallocate(statevector%allLonPerPE)
2765    deallocate(statevector%allLatBeg)
2766    deallocate(statevector%allLatEnd)
2767    deallocate(statevector%allLatPerPE)
2768    deallocate(statevector%allkBeg)
2769    deallocate(statevector%allkEnd)
2770    deallocate(statevector%allkCount)
2771    deallocate(statevector%allUVkBeg)
2772    deallocate(statevector%allUVkEnd)
2773    deallocate(statevector%allUVkCount)
2774
2775    if (statevector%dataKind == 8) then
2776      deallocate(statevector%gd_r8,stat=ierr)
2777      nullify(statevector%gd_r8)
2778      if (statevector%UVComponentPresent) then 
2779        do kIndex = statevector%myUVkBeg, statevector%myUVkEnd
2780          if (statevector%extraUVallocated) deallocate(statevector%gdUV(kIndex)%r8)
2781          nullify(statevector%gdUV(kIndex)%r8)
2782        end do
2783        deallocate(statevector%gdUV)
2784        nullify(statevector%gdUV)
2785      end if
2786    else if (statevector%dataKind == 4) then
2787      deallocate(statevector%gd_r4,stat=ierr)
2788      nullify(statevector%gd_r4)
2789      if (statevector%UVComponentPresent) then 
2790        do kIndex = statevector%myUVkBeg, statevector%myUVkEnd
2791          if (statevector%extraUVallocated) deallocate(statevector%gdUV(kIndex)%r4)
2792          nullify(statevector%gdUV(kIndex)%r4)
2793        end do
2794        deallocate(statevector%gdUV)
2795        nullify(statevector%gdUV)
2796      end if
2797    end if
2798    if (ierr.ne.0) then
2799      write(*,*) 'gsv_deallocate: Problem detected. IERR =',ierr
2800    end if
2801
2802    statevector%heightSfcPresent = .false.
2803    if (associated(statevector%HeightSfc)) then
2804      deallocate(statevector%HeightSfc)
2805      nullify(statevector%HeightSfc)
2806    end if
2807
2808    if (associated(statevector%dateStampList)) then
2809      deallocate(statevector%dateStampList)
2810      nullify(statevector%dateStampList)
2811    end if
2812    deallocate(statevector%dateOriginList)
2813    deallocate(statevector%npasList)
2814    deallocate(statevector%ip2List)
2815    deallocate(statevector%varOffset)
2816    deallocate(statevector%varNumLev)
2817
2818    call ocm_deallocate(statevector%oceanMask)
2819
2820  end subroutine gsv_deallocate
2821
2822  !--------------------------------------------------------------------------
2823  ! gsv_getField main routine and wrappers for r4 and r8
2824  !--------------------------------------------------------------------------
2825  subroutine gsv_getFieldWrapper_r4(statevector, field_r4, varName_opt)
2826    !
2827    !:Purpose: Returns a pointer to the 4D data array. 
2828    !           Wrapper for the kind 4 real.
2829    !
2830    implicit none
2831
2832    ! Arguments:
2833    type(struct_gsv),           intent(in)    :: statevector
2834    real(4), pointer,           intent(inout) :: field_r4(:,:,:,:)
2835    character(len=*), optional, intent(in)    :: varName_opt
2836
2837    if (statevector%dataKind /= 4) call utl_abort('gsv_getFieldWrapper_r4: wrong dataKind')
2838    call gsv_getField_r48(statevector, field_r4_opt = field_r4, varName_opt = varName_opt)
2839
2840  end subroutine gsv_getFieldWrapper_r4
2841
2842  subroutine gsv_getFieldWrapper_r8(statevector, field_r8, varName_opt)
2843    !
2844    !:Purpose: Returns a pointer to the 4D data array. 
2845    !           Wrapper for the kind 8 real.
2846    !
2847    implicit none
2848
2849    ! Arguments:
2850    type(struct_gsv),           intent(in)    :: statevector
2851    real(8), pointer,           intent(inout) :: field_r8(:,:,:,:)
2852    character(len=*), optional, intent(in)    :: varName_opt
2853
2854    if (statevector%dataKind /= 8) call utl_abort('gsv_getFieldWrapper_r8: wrong dataKind')
2855    call gsv_getField_r48(statevector, field_r8_opt = field_r8, varName_opt = varName_opt)
2856
2857  end subroutine gsv_getFieldWrapper_r8
2858
2859  subroutine gsv_getField_r48(statevector,field_r4_opt,field_r8_opt,varName_opt)
2860    !
2861    !:Purpose: Returns a pointer to the 4D data array. 
2862    !           Wrapper pairing the proper real data kind
2863    !
2864    implicit none
2865
2866    ! Arguments:
2867    type(struct_gsv),           intent(in)    :: statevector
2868    character(len=*), optional, intent(in)    :: varName_opt
2869    real(4), pointer, optional, intent(inout) :: field_r4_opt(:,:,:,:)
2870    real(8), pointer, optional, intent(inout) :: field_r8_opt(:,:,:,:)
2871
2872    ! Locals:
2873    integer                                :: ilev1, ilev2, lon1, lat1, k1
2874
2875    lon1 = statevector%myLonBeg
2876    lat1 = statevector%myLatBeg
2877    k1 = statevector%mykBeg
2878
2879    if (present(varName_opt)) then
2880      if (statevector%mpi_distribution == 'VarsLevs') then
2881        call utl_abort('gsv_getField_r48: cannot specify a varName for VarsLevs mpi distribution')
2882      end if
2883      if (gsv_varExist(statevector,varName_opt)) then
2884        ilev1 = 1 + statevector%varOffset(vnl_varListIndex(varName_opt))
2885        ilev2 = ilev1 - 1 + statevector%varNumLev(vnl_varListIndex(varName_opt))
2886        if (gsv_getDataKind(statevector) == 4) then
2887          field_r4_opt(lon1:,lat1:,1:,1:) => statevector%gd_r4(:,:,ilev1:ilev2,:)
2888        else
2889          field_r8_opt(lon1:,lat1:,1:,1:) => statevector%gd_r8(:,:,ilev1:ilev2,:)
2890        end if
2891      else
2892        call utl_abort('gsv_getField_r48: Unknown variable name! ' // varName_opt)
2893      end if
2894    else
2895      if (gsv_getDataKind(statevector) == 4) then
2896        field_r4_opt(lon1:,lat1:,k1:,1:) => statevector%gd_r4(:,:,:,:)
2897      else
2898        field_r8_opt(lon1:,lat1:,k1:,1:) => statevector%gd_r8(:,:,:,:)
2899      end if
2900    end if
2901
2902  end subroutine gsv_getField_r48
2903
2904  !--------------------------------------------------------------------------
2905  ! gsv_getField3D_r8
2906  !--------------------------------------------------------------------------
2907  subroutine gsv_getField3D_r8(statevector,field3D,varName_opt,stepIndex_opt)
2908    !
2909    !:Purpose: Returns a pointer to the 3D data array. 
2910    !           Wrapper for the kind 8 real.
2911    !
2912    implicit none
2913
2914    ! Arguments:
2915    type(struct_gsv),           intent(in)    :: statevector
2916    real(8), pointer,           intent(inout) :: field3D(:,:,:)
2917    character(len=*), optional, intent(in)    :: varName_opt
2918    integer,          optional, intent(in)    :: stepIndex_opt
2919
2920    ! Locals:
2921    integer                                :: ilev1,ilev2,lon1,lat1,k1
2922
2923    lon1=statevector%myLonBeg
2924    lat1=statevector%myLatBeg
2925    k1=statevector%mykBeg
2926
2927    if (.not. associated(statevector%gd3d_r8)) then
2928      call utl_abort('gsv_getField3D_r8: data with type r8 not allocated')
2929    end if
2930
2931    if (present(varName_opt)) then
2932      if (statevector%mpi_distribution == 'VarsLevs') then
2933        call utl_abort('gsv_getField3D_r8: cannot specify a varName for VarsLevs mpi distribution')
2934      end if
2935      if (gsv_varExist(statevector,varName_opt)) then
2936        ilev1 = 1 + statevector%varOffset(vnl_varListIndex(varName_opt))
2937        ilev2 = ilev1 - 1 + statevector%varNumLev(vnl_varListIndex(varName_opt))
2938        if (present(stepIndex_opt)) then
2939          field3D(lon1:,lat1:,1:) => statevector%gd_r8(:,:,ilev1:ilev2,stepIndex_opt)
2940        else
2941          field3D(lon1:,lat1:,1:) => statevector%gd3d_r8(:,:,ilev1:ilev2)
2942        end if
2943      else
2944        call utl_abort('gsv_getField3D_r8: Unknown variable name! ' // varName_opt)
2945      end if
2946    else
2947      if (present(stepIndex_opt)) then
2948        field3D(lon1:,lat1:,k1:) => statevector%gd_r8(:,:,:,stepIndex_opt)
2949      else
2950        field3D(lon1:,lat1:,k1:) => statevector%gd3d_r8(:,:,:)
2951      end if
2952    end if
2953
2954  end subroutine gsv_getField3D_r8
2955
2956  !--------------------------------------------------------------------------
2957  ! gsv_getField3D_r4
2958  !--------------------------------------------------------------------------
2959  subroutine gsv_getField3D_r4(statevector,field3d,varName_opt,stepIndex_opt)
2960    !
2961    !:Purpose: Returns a pointer to the 3D data array. 
2962    !           Wrapper for the kind 4 real.
2963    !
2964    implicit none
2965
2966    ! Arguments:
2967    type(struct_gsv),           intent(in)    :: statevector
2968    real(4), pointer,           intent(inout) :: field3d(:,:,:)
2969    character(len=*), optional, intent(in)    :: varName_opt
2970    integer,          optional, intent(in)    :: stepIndex_opt
2971
2972    ! Locals:
2973    integer                                :: ilev1,ilev2,lon1,lat1,k1
2974
2975    lon1=statevector%myLonBeg
2976    lat1=statevector%myLatBeg
2977    k1=statevector%mykBeg
2978
2979    if (.not. associated(statevector%gd3d_r4)) then
2980      call utl_abort('gsv_getField3D_r4: data with type r4 not allocated')
2981    end if
2982
2983    if (present(varName_opt)) then
2984      if (statevector%mpi_distribution == 'VarsLevs') then
2985        call utl_abort('gsv_getField3D_r4: cannot specify a varName for VarsLevs mpi distribution')
2986      end if
2987      if (gsv_varExist(statevector,varName_opt)) then
2988        ilev1 = 1 + statevector%varOffset(vnl_varListIndex(varName_opt))
2989        ilev2 = ilev1 - 1 + statevector%varNumLev(vnl_varListIndex(varName_opt))
2990        if (present(stepIndex_opt)) then
2991          field3D(lon1:,lat1:,1:) => statevector%gd_r4(:,:,ilev1:ilev2,stepIndex_opt)
2992        else
2993          field3D(lon1:,lat1:,1:) => statevector%gd3d_r4(:,:,ilev1:ilev2)
2994        end if
2995      else
2996        call utl_abort('gsv_getField3D_r4: Unknown variable name! ' // varName_opt)
2997      end if
2998    else
2999      if (present(stepIndex_opt)) then
3000        field3D(lon1:,lat1:,k1:) => statevector%gd_r4(:,:,:,stepIndex_opt)
3001      else
3002        field3D(lon1:,lat1:,k1:) => statevector%gd3d_r4(:,:,:)
3003      end if
3004    end if
3005
3006  end subroutine gsv_getField3D_r4
3007
3008  !--------------------------------------------------------------------------
3009  ! gsv_getFieldUV main routine and wrappers for r4 and r8
3010  !--------------------------------------------------------------------------
3011  subroutine gsv_getFieldUVWrapper_r4(statevector,field_r4,kIndex)
3012    !
3013    !:Purpose: Returns a pointer to the UV data array. 
3014    !           Wrapper for the kind 4 real.
3015    !
3016    implicit none
3017
3018    ! Arguments:
3019    type(struct_gsv), intent(in)    :: statevector
3020    real(4), pointer, intent(inout) :: field_r4(:,:,:)
3021    integer,          intent(in)    :: kIndex
3022
3023    call gsv_getFieldUV_r48(statevector,field_r4_opt=field_r4,kIndex=kIndex)
3024    
3025  end subroutine gsv_getFieldUVWrapper_r4
3026  
3027  subroutine gsv_getFieldUVWrapper_r8(statevector,field_r8,kIndex)
3028    !
3029    !:Purpose: Returns a pointer to the UV data array. 
3030    !           Wrapper for the kind 8 real.
3031    !
3032    implicit none
3033
3034    ! Arguments:
3035    type(struct_gsv), intent(in)    :: statevector
3036    real(8), pointer, intent(inout) :: field_r8(:,:,:)
3037    integer,          intent(in)    :: kIndex
3038
3039    call gsv_getFieldUV_r48(statevector,field_r8_opt=field_r8,kIndex=kIndex)
3040    
3041  end subroutine gsv_getFieldUVWrapper_r8
3042  
3043  subroutine gsv_getFieldUV_r48(statevector,field_r4_opt,field_r8_opt,kIndex)
3044    !
3045    !:Purpose: Returns a pointer to the UV data array. 
3046    !           Wrapper pairing the proper real data kind
3047    !
3048    implicit none
3049
3050    ! Arguments:
3051    type(struct_gsv),           intent(in)    :: statevector
3052    integer,                    intent(in)    :: kIndex
3053    real(4), optional, pointer, intent(inout) :: field_r4_opt(:,:,:)
3054    real(8), optional, pointer, intent(inout) :: field_r8_opt(:,:,:)
3055
3056    ! Locals:
3057    integer                                :: lon1,lat1
3058
3059    lon1 = statevector%myLonBeg
3060    lat1 = statevector%myLatBeg
3061
3062    if (gsv_getDataKind(statevector) == 4) then
3063      if (.not. associated(statevector%gdUV(kIndex)%r4)) call utl_abort('gsv_getFieldUV_r48: data with type r4 not allocated')
3064      field_r4_opt(lon1:,lat1:,1:) => statevector%gdUV(kIndex)%r4(:,:,:)
3065    else
3066      if (.not. associated(statevector%gdUV(kIndex)%r8)) call utl_abort('gsv_getFieldUV_r48: data with type r8 not allocated')
3067      field_r8_opt(lon1:,lat1:,1:) => statevector%gdUV(kIndex)%r8(:,:,:)
3068    end if
3069
3070  end subroutine gsv_getFieldUV_r48
3071
3072  !--------------------------------------------------------------------------
3073  ! gsv_isAssocHeightSfc
3074  !--------------------------------------------------------------------------
3075  function gsv_isAssocHeightSfc(statevector) result(isAssociated)
3076    !
3077    !:Purpose: Returns .true. if HeightSfc is associated
3078    !
3079    implicit none
3080
3081    ! Arguments:
3082    type(struct_gsv), intent(in)        :: statevector
3083    ! Result:
3084    logical                             :: isAssociated
3085
3086    isAssociated = .false.
3087    if (associated(statevector%HeightSfc)) then
3088      isAssociated = .true.
3089    end if
3090
3091  end function gsv_isAssocHeightSfc
3092
3093  !--------------------------------------------------------------------------
3094  ! gsv_getHeightSfc
3095  !--------------------------------------------------------------------------
3096  function gsv_getHeightSfc(statevector) result(field)
3097    !
3098    !:Purpose: Returns an access pointer to HeightSfc
3099    !
3100    implicit none
3101
3102    ! Arguments:
3103    type(struct_gsv), intent(in)           :: statevector
3104    ! Result:
3105    real(8),pointer                        :: field(:,:)
3106
3107    ! Locals:
3108    integer                                :: lon1,lat1
3109
3110    lon1 = statevector%myLonBeg
3111    lat1 = statevector%myLatBeg
3112
3113    if (.not. statevector%heightSfcPresent) call utl_abort('gsv_getHeightSfc: data not allocated')
3114
3115    if (associated(statevector%HeightSfc)) then
3116      field(lon1:,lat1:) => statevector%HeightSfc(:,:)
3117    else
3118      nullify(field)
3119    end if
3120
3121  end function gsv_getHeightSfc
3122
3123  !--------------------------------------------------------------------------
3124  ! gsv_getDateStamp
3125  !--------------------------------------------------------------------------
3126  function gsv_getDateStamp(statevector,stepIndex_opt) result(dateStamp)
3127    !
3128    !:Purpose: Returns the reference datestamp (or the datestamp of a specified
3129    !           time step.
3130    !
3131    implicit none
3132
3133    ! Arguments:
3134    type(struct_gsv),  intent(in)  :: statevector
3135    integer, optional, intent(in)  :: stepIndex_opt 
3136    ! Result:
3137    integer                        :: dateStamp
3138
3139    if (associated(statevector%dateStampList)) then
3140      if (present(stepIndex_opt)) then
3141        if (stepIndex_opt.gt.0.and.stepIndex_opt.le.statevector%numStep) then
3142          dateStamp=statevector%dateStampList(stepIndex_opt)
3143        else
3144          call utl_abort('gsv_getDateStamp: requested step is out of range! Step=' &
3145               //str(stepIndex_opt)//',numStep='//str(statevector%numStep))
3146        end if    
3147      else
3148        dateStamp=statevector%dateStamp3D
3149      end if
3150    else
3151      call utl_abort('gsv_getDateStamp: dateStampList was not created during allocation!')
3152    end if
3153
3154  end function gsv_getDateStamp
3155
3156  !--------------------------------------------------------------------------
3157  ! gsv_getVco
3158  !--------------------------------------------------------------------------
3159  function gsv_getVco(statevector) result(vco_ptr)
3160    !
3161    !:Purpose: Returns an access pointer to the statevector vco 
3162    !           (vertical coordinate structure)
3163    !
3164    implicit none
3165
3166    ! Arguments:
3167    type(struct_gsv), intent(in) :: statevector
3168    ! Result:
3169    type(struct_vco), pointer    :: vco_ptr
3170
3171    vco_ptr => statevector%vco
3172
3173  end function gsv_getVco
3174
3175  !--------------------------------------------------------------------------
3176  ! gsv_getHco
3177  !--------------------------------------------------------------------------
3178  function gsv_getHco(statevector) result(hco_ptr)
3179    !
3180    !:Purpose: Returns an access pointer to the statevector hco 
3181    !           (horizontal coordinate structure)
3182    !
3183    implicit none
3184
3185    ! Arguments:
3186    type(struct_gsv), intent(in) :: statevector
3187    ! Result:
3188    type(struct_hco), pointer    :: hco_ptr
3189
3190    hco_ptr => statevector%hco
3191
3192  end function gsv_getHco
3193
3194  !--------------------------------------------------------------------------
3195  ! gsv_getHco_physics
3196  !--------------------------------------------------------------------------
3197  function gsv_getHco_physics(statevector) result(hco_ptr)
3198    !
3199    !:Purpose: Returns an access pointer to the statevector physics hco 
3200    !           (horizontal coordinate structure)
3201    !
3202    implicit none
3203
3204    ! Arguments:
3205    type(struct_gsv), intent(in) :: statevector
3206    ! Result:
3207    type(struct_hco), pointer    :: hco_ptr
3208
3209    hco_ptr => statevector%hco_physics
3210
3211  end function gsv_getHco_physics
3212
3213  !--------------------------------------------------------------------------
3214  ! gsv_transposeVarsLevsToTiles
3215  !--------------------------------------------------------------------------
3216  subroutine gsv_transposeVarsLevsToTiles(statevector_in, statevector_out)
3217    !
3218    !:Purpose: Transposes the data from mpi_distribution=VarsLevs to Tiles
3219    !
3220    implicit none
3221
3222    ! Arguments:
3223    type(struct_gsv), intent(in)    :: statevector_in
3224    type(struct_gsv), intent(inout) :: statevector_out
3225
3226    ! Locals:
3227    integer :: youridx, youridy, yourid, nsize, maxkcount, ierr
3228    integer :: sendrecvKind, inKind, outKind, stepIndex
3229    integer :: displs(mmpi_nprocs), nsizes(mmpi_nprocs)
3230    real(4), pointer     :: field_in_r4_ptr(:,:,:,:), field_out_r4_ptr(:,:,:,:)
3231    real(8), pointer     :: field_in_r8_ptr(:,:,:,:), field_out_r8_ptr(:,:,:,:)
3232    real(8), allocatable :: gd_send_height(:,:,:), gd_recv_height(:,:)
3233    real(8), pointer     :: field_height_in_ptr(:,:), field_height_out_ptr(:,:)
3234
3235    if (statevector_in%mpi_distribution /= 'VarsLevs') then
3236      call utl_abort('gsv_transposeVarsLevsToTiles: input statevector must have VarsLevs mpi distribution') 
3237    end if
3238
3239    if (statevector_out%mpi_distribution /= 'Tiles') then
3240      call utl_abort('gsv_transposeVarsLevsToTiles: out statevector must have Tiles mpi distribution')
3241    end if
3242
3243    call utl_tmg_start(164,'low-level--gsv_varsLevsToTiles')
3244
3245    inKind = statevector_in%dataKind
3246    outKind = statevector_out%dataKind
3247    if (inKind == 4 .or. outKind == 4) then
3248      sendrecvKind = 4
3249    else
3250      sendrecvKind = 8
3251    end if
3252
3253    maxkCount = maxval(statevector_in%allkCount(:))
3254    if (sendrecvKind == 4) then
3255      call utl_reAllocate(gd_send_varsLevs_r4, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3256                           maxkCount, mmpi_nprocs)
3257      call utl_reAllocate(gd_recv_varsLevs_r4, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3258                          maxkCount, mmpi_nprocs)
3259    else
3260      call utl_reAllocate(gd_send_varsLevs_r8, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3261                          maxkCount, mmpi_nprocs)
3262      call utl_reAllocate(gd_recv_varsLevs_r8, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3263                          maxkCount, mmpi_nprocs)
3264    end if
3265
3266    do stepIndex = 1, statevector_out%numStep
3267
3268      if (sendrecvKind == 4 .and. inKind == 4) then
3269        call gsv_getField(statevector_in,field_in_r4_ptr)
3270        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3271        do youridy = 0, (mmpi_npey-1)
3272          do youridx = 0, (mmpi_npex-1)
3273            yourid = youridx + youridy*mmpi_npex
3274            gd_send_varsLevs_r4(1:statevector_out%allLonPerPE(youridx+1),  &
3275                                1:statevector_out%allLatPerPE(youridy+1),  &
3276                                1:statevector_in%mykCount, yourid+1) =  &
3277              field_in_r4_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
3278                              statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
3279                              statevector_in%mykBeg:statevector_in%mykEnd, stepIndex)
3280          end do
3281        end do
3282        !$OMP END PARALLEL DO
3283      else if (sendrecvKind == 4 .and. inKind == 8) then
3284        call gsv_getField(statevector_in,field_in_r8_ptr)
3285        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3286        do youridy = 0, (mmpi_npey-1)
3287          do youridx = 0, (mmpi_npex-1)
3288            yourid = youridx + youridy*mmpi_npex
3289            gd_send_varsLevs_r4(1:statevector_out%allLonPerPE(youridx+1),  &
3290                                1:statevector_out%allLatPerPE(youridy+1),  &
3291                                1:statevector_in%mykCount, yourid+1) =  &
3292              real(field_in_r8_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
3293                                   statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
3294                                   statevector_in%mykBeg:statevector_in%mykEnd, stepIndex),4)
3295          end do
3296        end do
3297        !$OMP END PARALLEL DO
3298      else if (sendrecvKind == 8 .and. inKind == 4) then
3299        call gsv_getField(statevector_in,field_in_r4_ptr)
3300        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3301        do youridy = 0, (mmpi_npey-1)
3302          do youridx = 0, (mmpi_npex-1)
3303            yourid = youridx + youridy*mmpi_npex
3304            gd_send_varsLevs_r8(1:statevector_out%allLonPerPE(youridx+1),  &
3305                                1:statevector_out%allLatPerPE(youridy+1),  &
3306                                1:statevector_in%mykCount, yourid+1) =  &
3307              real(field_in_r4_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
3308                                   statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
3309                                   statevector_in%mykBeg:statevector_in%mykEnd, stepIndex),8)
3310          end do
3311        end do
3312        !$OMP END PARALLEL DO
3313      else if (sendrecvKind == 8 .and. inKind == 8) then
3314        call gsv_getField(statevector_in,field_in_r8_ptr)
3315        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3316        do youridy = 0, (mmpi_npey-1)
3317          do youridx = 0, (mmpi_npex-1)
3318            yourid = youridx + youridy*mmpi_npex
3319            gd_send_varsLevs_r8(1:statevector_out%allLonPerPE(youridx+1),  &
3320                                1:statevector_out%allLatPerPE(youridy+1),  &
3321                                1:statevector_in%mykCount, yourid+1) =  &
3322              field_in_r8_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
3323                              statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
3324                              statevector_in%mykBeg:statevector_in%mykEnd, stepIndex)
3325          end do
3326        end do
3327        !$OMP END PARALLEL DO
3328      end if
3329
3330      nsize = statevector_out%lonPerPEmax * statevector_out%latPerPEmax * maxkCount
3331      if (mmpi_nprocs > 1) then
3332        if (sendrecvKind == 4) then
3333          call rpn_comm_alltoall(gd_send_varsLevs_r4, nsize, 'mpi_real4',  &
3334                                 gd_recv_varsLevs_r4, nsize, 'mpi_real4', 'grid', ierr)
3335        else
3336          call rpn_comm_alltoall(gd_send_varsLevs_r8, nsize, 'mpi_real8',  &
3337                                 gd_recv_varsLevs_r8, nsize, 'mpi_real8', 'grid', ierr)
3338        end if
3339      else
3340        if (sendrecvKind == 4) then
3341          gd_recv_varsLevs_r4(:,:,:,1) = gd_send_varsLevs_r4(:,:,:,1)
3342        else
3343          gd_recv_varsLevs_r8(:,:,:,1) = gd_send_varsLevs_r8(:,:,:,1)
3344        end if
3345      end if
3346
3347      if (sendrecvKind == 4 .and. outKind == 4) then
3348        call gsv_getField(statevector_out,field_out_r4_ptr)
3349        !$OMP PARALLEL DO PRIVATE(yourid)
3350        do yourid = 0, (mmpi_nprocs-1)
3351          field_out_r4_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
3352                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
3353                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
3354            gd_recv_varsLevs_r4(1:statevector_out%lonPerPE,  &
3355                                1:statevector_out%latPerPE,  &
3356                                1:statevector_in%allkCount(yourid+1), yourid+1)
3357        end do
3358        !$OMP END PARALLEL DO
3359      else if (sendrecvKind == 4 .and. outKind == 8) then
3360        call gsv_getField(statevector_out,field_out_r8_ptr)
3361        !$OMP PARALLEL DO PRIVATE(yourid)
3362        do yourid = 0, (mmpi_nprocs-1)
3363          field_out_r8_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
3364                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
3365                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
3366            real(gd_recv_varsLevs_r4(1:statevector_out%lonPerPE,  &
3367                                     1:statevector_out%latPerPE,  &
3368                                     1:statevector_in%allkCount(yourid+1), yourid+1),8)
3369        end do
3370        !$OMP END PARALLEL DO
3371      else if (sendrecvKind == 8 .and. outKind == 4) then
3372        call gsv_getField(statevector_out,field_out_r4_ptr)
3373        !$OMP PARALLEL DO PRIVATE(yourid)
3374        do yourid = 0, (mmpi_nprocs-1)
3375          field_out_r4_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
3376                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
3377                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
3378            real(gd_recv_varsLevs_r8(1:statevector_out%lonPerPE,  &
3379                                     1:statevector_out%latPerPE,  &
3380                                     1:statevector_in%allkCount(yourid+1), yourid+1),4)
3381        end do
3382        !$OMP END PARALLEL DO
3383      else if (sendrecvKind == 8 .and. outKind == 8) then
3384        call gsv_getField(statevector_out,field_out_r8_ptr)
3385        !$OMP PARALLEL DO PRIVATE(yourid)
3386        do yourid = 0, (mmpi_nprocs-1)
3387          field_out_r8_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
3388                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
3389                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
3390            gd_recv_varsLevs_r8(1:statevector_out%lonPerPE,  &
3391                                1:statevector_out%latPerPE,  &
3392                                1:statevector_in%allkCount(yourid+1), yourid+1)
3393        end do
3394        !$OMP END PARALLEL DO
3395      end if
3396
3397    end do ! stepIndex
3398
3399    if (statevector_in%heightSfcPresent .and. statevector_out%heightSfcPresent) then
3400      allocate(gd_send_height(statevector_out%lonPerPEmax,statevector_out%latPerPEmax,mmpi_nprocs))
3401      allocate(gd_recv_height(statevector_out%lonPerPEmax,statevector_out%latPerPEmax))
3402      gd_send_height(:,:,:) = 0.0d0
3403      gd_recv_height(:,:) = 0.0d0
3404      field_height_in_ptr => gsv_getHeightSfc(statevector_in)
3405      field_height_out_ptr => gsv_getHeightSfc(statevector_out)
3406
3407      if (mmpi_myid == 0) then
3408        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3409        do youridy = 0, (mmpi_npey-1)
3410          do youridx = 0, (mmpi_npex-1)
3411            yourid = youridx + youridy*mmpi_npex
3412            gd_send_height(1:statevector_out%allLonPerPE(youridx+1),  &
3413                           1:statevector_out%allLatPerPE(youridy+1), yourid+1) =  &
3414              field_height_in_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
3415                                  statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1))
3416          end do
3417        end do
3418        !$OMP END PARALLEL DO
3419      end if
3420
3421      nsize = statevector_out%lonPerPEmax * statevector_out%latPerPEmax
3422      do yourid = 0, (mmpi_nprocs-1)
3423        displs(yourid+1) = yourid*nsize
3424        nsizes(yourid+1) = nsize
3425      end do
3426      call rpn_comm_scatterv(gd_send_height, nsizes, displs, 'mpi_double_precision', &
3427                             gd_recv_height, nsize, 'mpi_double_precision', &
3428                             0, 'grid', ierr)
3429
3430      field_height_out_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
3431                           statevector_out%myLatBeg:statevector_out%myLatEnd) =   &
3432        gd_recv_height(1:statevector_out%lonPerPE,  &
3433                       1:statevector_out%latPerPE)
3434
3435      deallocate(gd_recv_height)
3436      deallocate(gd_send_height)
3437    end if
3438
3439    ! Copy over the mask, if it exists
3440    call gsv_copyMask(statevector_in, statevector_out)
3441
3442    ! Copy metadata
3443    if (associated(statevector_in%dateStampList)) statevector_out%dateStampList(:) = statevector_in%dateStampList(:)
3444    if (associated(statevector_in%dateOriginList)) statevector_out%dateOriginList(:) = statevector_in%dateOriginList(:)
3445    if (associated(statevector_in%npasList))      statevector_out%npasList(:) = statevector_in%npasList(:)
3446    if (associated(statevector_in%ip2List))       statevector_out%ip2List(:) = statevector_in%ip2List(:)
3447    statevector_out%deet = statevector_in%deet
3448    statevector_out%etiket = statevector_in%etiket
3449
3450    call utl_tmg_stop(164)
3451
3452  end subroutine gsv_transposeVarsLevsToTiles
3453
3454  !--------------------------------------------------------------------------
3455  ! gsv_transposeTilesToVarsLevs
3456  !--------------------------------------------------------------------------
3457  subroutine gsv_transposeTilesToVarsLevs(statevector_in, statevector_out, &
3458                                          beSilent_opt)
3459    !
3460    !:Purpose: Transposes the data from mpi_distribution=Tiles to VarsLevs
3461    !
3462    implicit none
3463
3464    ! Arguments:
3465    type(struct_gsv),  intent(in)    :: statevector_in
3466    type(struct_gsv),  intent(inout) :: statevector_out
3467    logical, optional, intent(in)    :: beSilent_opt
3468
3469    ! Locals:
3470    integer :: youridx, youridy, yourid, nsize, maxkcount, ierr, mpiTagUU, mpiTagVV
3471    integer :: sendrecvKind, inKind, outKind, kIndexUU, kIndexVV, MpiIdUU, MpiIdVV
3472    integer :: levUV, stepIndex, numSend, numRecv
3473    integer :: requestIdSend(stateVector_out%nk), requestIdRecv(stateVector_out%nk)
3474    integer :: mpiStatuses(mpi_status_size,stateVector_out%nk)
3475    real(4), pointer     :: field_in_r4_ptr(:,:,:,:), field_out_r4_ptr(:,:,:,:)
3476    real(8), pointer     :: field_in_r8_ptr(:,:,:,:), field_out_r8_ptr(:,:,:,:)
3477    real(8), pointer     :: field_height_in_ptr(:,:), field_height_out_ptr(:,:)
3478    real(8), allocatable :: gd_send_height(:,:), gd_recv_height(:,:,:)
3479    logical :: beSilent
3480
3481    if ( present(beSilent_opt) ) then
3482      beSilent = beSilent_opt
3483    else
3484      beSilent = .false.
3485    end if
3486    
3487    if ( .not. beSilent ) then
3488      call msg('gsv_transposeTilesToVarsLevs','START', verb_opt=2)
3489      call msg_memUsage('gsv_transposeTilesToVarsLevs')
3490    end if
3491
3492    if (statevector_in%mpi_distribution == 'None' .and. &
3493        statevector_out%mpi_distribution == 'None') then
3494      write(*,*) 'gsv_transposeTilesToVarsLevs: Just copy non-MPI statevector'
3495      call gsv_copy(statevector_in,statevector_out)
3496      return
3497    end if
3498
3499    if (statevector_in%mpi_distribution /= 'Tiles') then
3500      call utl_abort('gsv_transposeTilesToVarsLevs: input statevector must have Tiles mpi distribution') 
3501    end if
3502
3503    if (statevector_out%mpi_distribution /= 'VarsLevs') then
3504      call utl_abort('gsv_transposeTilesToVarsLevs: out statevector must have VarsLevs mpi distribution')
3505    end if
3506
3507    inKind = statevector_in%dataKind
3508    outKind = statevector_out%dataKind
3509    if (inKind == 4 .or. outKind == 4) then
3510      sendrecvKind = 4
3511    else
3512      sendrecvKind = 8
3513    end if
3514
3515    if (sendrecvKind == 4) then
3516      call utl_tmg_start(165,'low-level--gsv_tilesToVarsLevs_r4')
3517    else
3518      call utl_tmg_start(166,'low-level--gsv_tilesToVarsLevs_r8')
3519    end if
3520
3521    maxkCount = maxval(statevector_out%allkCount(:))
3522    if (sendrecvKind == 4) then
3523      call utl_reAllocate(gd_send_varsLevs_r4, statevector_in%lonPerPEmax, statevector_in%latPerPEmax, &
3524                          maxkCount, mmpi_nprocs)
3525      call utl_reAllocate(gd_recv_varsLevs_r4, statevector_in%lonPerPEmax, statevector_in%latPerPEmax, &
3526                          maxkCount, mmpi_nprocs)
3527    else
3528      call utl_reAllocate(gd_send_varsLevs_r8, statevector_in%lonPerPEmax, statevector_in%latPerPEmax, &
3529                          maxkCount, mmpi_nprocs)
3530      call utl_reAllocate(gd_recv_varsLevs_r8, statevector_in%lonPerPEmax, statevector_in%latPerPEmax, &
3531                          maxkCount, mmpi_nprocs)
3532    end if
3533
3534    do stepIndex = 1, statevector_in%numStep
3535
3536      if (sendrecvKind == 8 .and. inKind == 8) then
3537        call gsv_getField(statevector_in,field_in_r8_ptr)
3538        !$OMP PARALLEL DO PRIVATE(yourid)
3539        do yourid = 0, (mmpi_nprocs-1)
3540          gd_send_varsLevs_r8(1:statevector_in%lonPerPE, &
3541                              1:statevector_in%latPerPE, &
3542                              1:statevector_out%allkCount(yourid+1), yourid+1) =  &
3543              field_in_r8_ptr(statevector_in%myLonBeg:statevector_in%myLonEnd, &
3544                              statevector_in%myLatBeg:statevector_in%myLatEnd, &
3545                              statevector_out%allkBeg(yourid+1):statevector_out%allkEnd(yourid+1), stepIndex)
3546        end do
3547        !$OMP END PARALLEL DO
3548      else if (sendrecvKind == 4 .and. inKind == 4) then
3549        call gsv_getField(statevector_in,field_in_r4_ptr)
3550        !$OMP PARALLEL DO PRIVATE(yourid)
3551        do yourid = 0, (mmpi_nprocs-1)
3552          gd_send_varsLevs_r4(1:statevector_in%lonPerPE, &
3553                              1:statevector_in%latPerPE, &
3554                              1:statevector_out%allkCount(yourid+1), yourid+1) =  &
3555              field_in_r4_ptr(statevector_in%myLonBeg:statevector_in%myLonEnd, &
3556                              statevector_in%myLatBeg:statevector_in%myLatEnd, &
3557                              statevector_out%allkBeg(yourid+1):statevector_out%allkEnd(yourid+1), stepIndex)
3558        end do
3559        !$OMP END PARALLEL DO
3560      else if (sendrecvKind == 4 .and. inKind == 8) then
3561        call gsv_getField(statevector_in,field_in_r8_ptr)
3562        !$OMP PARALLEL DO PRIVATE(yourid)
3563        do yourid = 0, (mmpi_nprocs-1)
3564          gd_send_varsLevs_r4(1:statevector_in%lonPerPE, &
3565                              1:statevector_in%latPerPE, &
3566                              1:statevector_out%allkCount(yourid+1), yourid+1) =  &
3567              real(field_in_r8_ptr(statevector_in%myLonBeg:statevector_in%myLonEnd, &
3568                                   statevector_in%myLatBeg:statevector_in%myLatEnd, &
3569                                   statevector_out%allkBeg(yourid+1):statevector_out%allkEnd(yourid+1), stepIndex),4)
3570        end do
3571        !$OMP END PARALLEL DO
3572      else
3573        call utl_abort('gsv_transposeTilesToVarsLevs: Incompatible mix of real 4 and 8 before alltoall mpi comm')
3574      end if
3575
3576      nsize = statevector_in%lonPerPEmax * statevector_in%latPerPEmax * maxkCount
3577      if (mmpi_nprocs > 1) then
3578        if (sendrecvKind == 4) then
3579          call rpn_comm_alltoall(gd_send_varsLevs_r4, nsize, 'mpi_real4',  &
3580                                 gd_recv_varsLevs_r4, nsize, 'mpi_real4', 'grid', ierr)
3581        else
3582          call rpn_comm_alltoall(gd_send_varsLevs_r8, nsize, 'mpi_real8',  &
3583                                 gd_recv_varsLevs_r8, nsize, 'mpi_real8', 'grid', ierr)
3584        end if
3585      else
3586        if (sendrecvKind == 4) then
3587          gd_recv_varsLevs_r4(:,:,:,1) = gd_send_varsLevs_r4(:,:,:,1)
3588        else
3589          gd_recv_varsLevs_r8(:,:,:,1) = gd_send_varsLevs_r8(:,:,:,1)
3590        end if
3591      end if
3592
3593      if (sendrecvKind == 8 .and. outKind == 8) then
3594        call gsv_getField(statevector_out,field_out_r8_ptr)
3595        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3596        do youridy = 0, (mmpi_npey-1)
3597          do youridx = 0, (mmpi_npex-1)
3598            yourid = youridx + youridy*mmpi_npex
3599            field_out_r8_ptr(statevector_in%allLonBeg(youridx+1):statevector_in%allLonEnd(youridx+1),  &
3600                             statevector_in%allLatBeg(youridy+1):statevector_in%allLatEnd(youridy+1),  &
3601                             statevector_out%mykBeg:statevector_out%mykEnd, stepIndex) = &
3602                gd_recv_varsLevs_r8(1:statevector_in%allLonPerPE(youridx+1),  &
3603                                    1:statevector_in%allLatPerPE(youridy+1),  &
3604                                    1:statevector_out%mykCount, yourid+1)
3605          end do
3606        end do
3607        !$OMP END PARALLEL DO
3608      else if (sendrecvKind == 4 .and. outKind == 4) then
3609        call gsv_getField(statevector_out,field_out_r4_ptr)
3610        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3611        do youridy = 0, (mmpi_npey-1)
3612          do youridx = 0, (mmpi_npex-1)
3613            yourid = youridx + youridy*mmpi_npex
3614            field_out_r4_ptr(statevector_in%allLonBeg(youridx+1):statevector_in%allLonEnd(youridx+1),  &
3615                             statevector_in%allLatBeg(youridy+1):statevector_in%allLatEnd(youridy+1),  &
3616                             statevector_out%mykBeg:statevector_out%mykEnd, stepIndex) = &
3617                gd_recv_varsLevs_r4(1:statevector_in%allLonPerPE(youridx+1),  &
3618                                    1:statevector_in%allLatPerPE(youridy+1),  &
3619                                    1:statevector_out%mykCount, yourid+1)
3620          end do
3621        end do
3622        !$OMP END PARALLEL DO
3623      else if (sendrecvKind == 4 .and. outKind == 8) then
3624        call gsv_getField(statevector_out,field_out_r8_ptr)
3625        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3626        do youridy = 0, (mmpi_npey-1)
3627          do youridx = 0, (mmpi_npex-1)
3628            yourid = youridx + youridy*mmpi_npex
3629            field_out_r8_ptr(statevector_in%allLonBeg(youridx+1):statevector_in%allLonEnd(youridx+1),  &
3630                             statevector_in%allLatBeg(youridy+1):statevector_in%allLatEnd(youridy+1),  &
3631                             statevector_out%mykBeg:statevector_out%mykEnd, stepIndex) = &
3632                real(gd_recv_varsLevs_r4(1:statevector_in%allLonPerPE(youridx+1),  &
3633                                         1:statevector_in%allLatPerPE(youridy+1),  &
3634                                         1:statevector_out%mykCount, yourid+1), 8)
3635          end do
3636        end do
3637        !$OMP END PARALLEL DO
3638      else
3639        call utl_abort('gsv_transposeTilesToVarsLevs: Incompatible mix of real 4 and 8 after alltoall mpi comm')
3640      end if
3641
3642      ! send copy of wind component to task that has other component
3643      if (gsv_varExist(stateVector_out, 'UU') .and.  &
3644          gsv_varExist(stateVector_out, 'VV')) then
3645
3646        numSend = 0
3647        numRecv = 0
3648        LOOP_KINDEX: do kIndexUU = 1, stateVector_out%nk
3649          if (gsv_getVarNameFromK(stateVector_out, kIndexUU) /= 'UU') cycle LOOP_KINDEX
3650
3651          ! get k index for corresponding VV component
3652          levUV = gsv_getLevFromK(stateVector_out, kIndexUU)
3653          kIndexVV = levUV + gsv_getOffsetFromVarName(statevector_out,'VV')
3654
3655          ! get Mpi task id for both U and V
3656          MpiIdUU = gsv_getMpiIdFromK(statevector_out,kIndexUU)
3657          MpiIdVV = gsv_getMpiIdFromK(statevector_out,kIndexVV)
3658
3659          if (MpiIdUU == MpiIdVV .and. mmpi_myid == MpiIdUU) then
3660            if (outKind == 8) then
3661              statevector_out%gdUV(kIndexUU)%r8(:, :, stepIndex) =  statevector_out%gd_r8(:, :, kIndexVV, stepIndex)
3662              statevector_out%gdUV(kIndexVV)%r8(:, :, stepIndex) =  statevector_out%gd_r8(:, :, kIndexUU, stepIndex)
3663            else
3664              statevector_out%gdUV(kIndexUU)%r4(:, :, stepIndex) =  statevector_out%gd_r4(:, :, kIndexVV, stepIndex)
3665              statevector_out%gdUV(kIndexVV)%r4(:, :, stepIndex) =  statevector_out%gd_r4(:, :, kIndexUU, stepIndex)
3666            end if
3667            cycle LOOP_KINDEX
3668          end if
3669
3670          ! need to do mpi communication to exchange wind components
3671          nsize = statevector_out%ni * statevector_out%nj
3672          mpiTagUU = kIndexUU
3673          mpiTagVV = kIndexVV
3674          if (mmpi_myid == MpiIdUU) then ! I have UU
3675
3676            numRecv = numRecv + 1
3677            if (outKind == 8) then
3678              call mpi_irecv(statevector_out%gdUV(kIndexUU)%r8(:, :, stepIndex),  &
3679                             nsize, mmpi_datyp_real8, MpiIdVV, mpiTagVV,  &
3680                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3681            else
3682              call mpi_irecv(statevector_out%gdUV(kIndexUU)%r4(:, :, stepIndex),  &
3683                             nsize, mmpi_datyp_real4, MpiIdVV, mpiTagVV,  &
3684                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3685            end if
3686
3687            numSend = numSend + 1
3688            if (outKind == 8) then
3689              call mpi_isend(statevector_out%gd_r8(:, :, kIndexUU, stepIndex),  &
3690                             nsize, mmpi_datyp_real8, MpiIdVV, mpiTagUU,  &
3691                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3692            else
3693              call mpi_isend(statevector_out%gd_r4(:, :, kIndexUU, stepIndex),  &
3694                             nsize, mmpi_datyp_real4, MpiIdVV, mpiTagUU,  &
3695                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3696            end if
3697
3698          else if (mmpi_myid == MpiIdVV) then  ! I have VV
3699
3700            numRecv = numRecv + 1
3701            if (outKind == 8) then
3702              call mpi_irecv(statevector_out%gdUV(kIndexVV)%r8(:, :, stepIndex),  &
3703                             nsize, mmpi_datyp_real8, MpiIdUU, mpiTagUU,  &
3704                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3705            else
3706              call mpi_irecv(statevector_out%gdUV(kIndexVV)%r4(:, :, stepIndex),  &
3707                             nsize, mmpi_datyp_real4, MpiIdUU, mpiTagUU,  &
3708                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3709            end if
3710
3711            numSend = numSend + 1
3712            if (outKind == 8) then
3713              call mpi_isend(statevector_out%gd_r8(:, :, kIndexVV, stepIndex),  &
3714                             nsize, mmpi_datyp_real8, MpiIdUU, mpiTagVV,  &
3715                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3716            else
3717              call mpi_isend(statevector_out%gd_r4(:, :, kIndexVV, stepIndex),  &
3718                             nsize, mmpi_datyp_real4, MpiIdUU, mpiTagVV,  &
3719                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3720            end if
3721
3722          end if
3723
3724        end do LOOP_KINDEX
3725
3726        if (numRecv > 0) then
3727          call mpi_waitAll(numRecv, requestIdRecv(1:numRecv), mpiStatuses(:,1:numRecv), ierr)
3728        end if
3729
3730        if (numSend > 0) then
3731          call mpi_waitAll(numSend, requestIdSend(1:numSend), mpiStatuses(:,1:numSend), ierr)
3732        end if
3733
3734      end if ! UU and VV exist
3735
3736    end do ! stepIndex
3737
3738    ! gather up surface height onto task 0
3739    if (statevector_in%heightSfcPresent .and. statevector_out%heightSfcPresent) then
3740      allocate(gd_send_height(statevector_in%lonPerPEmax,statevector_in%latPerPEmax))
3741      allocate(gd_recv_height(statevector_in%lonPerPEmax,statevector_in%latPerPEmax,mmpi_nprocs))
3742      field_height_in_ptr => gsv_getHeightSfc(statevector_in)
3743      field_height_out_ptr => gsv_getHeightSfc(statevector_out)
3744
3745      gd_send_height(:,:) = 0.0D0
3746      gd_send_height(1:statevector_in%lonPerPE,1:statevector_in%latPerPE) = field_height_in_ptr(:,:)
3747
3748      nsize = statevector_in%lonPerPEmax * statevector_in%latPerPEmax
3749      call rpn_comm_gather(gd_send_height, nsize, 'mpi_double_precision',  &
3750                           gd_recv_height, nsize, 'mpi_double_precision', 0, 'grid', ierr)
3751
3752      if (mmpi_myid == 0) then
3753        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3754        do youridy = 0, (mmpi_npey-1)
3755          do youridx = 0, (mmpi_npex-1)
3756            yourid = youridx + youridy*mmpi_npex
3757            field_height_out_ptr(statevector_in%allLonBeg(youridx+1):statevector_in%allLonEnd(youridx+1),  &
3758                             statevector_in%allLatBeg(youridy+1):statevector_in%allLatEnd(youridy+1)) = &
3759                gd_recv_height(1:statevector_in%allLonPerPE(youridx+1),  &
3760                           1:statevector_in%allLatPerPE(youridy+1), yourid+1)
3761          end do
3762        end do
3763        !$OMP END PARALLEL DO
3764      end if
3765
3766      deallocate(gd_send_height)
3767      deallocate(gd_recv_height)
3768    end if ! heightSfcPresent
3769
3770    ! Copy over the mask, if it exists
3771    call gsv_copyMask(statevector_in, statevector_out, beSilent_opt=beSilent)
3772    
3773    if ( .not. beSilent ) then
3774      call msg('gsv_transposeTilesToVarsLevs','END', verb_opt=2)
3775      call msg_memUsage('gsv_transposeTilesToVarsLevs')
3776    end if
3777
3778    if (sendrecvKind == 4) then
3779      call utl_tmg_stop(165)
3780    else
3781      call utl_tmg_stop(166)
3782    end if
3783
3784  end subroutine gsv_transposeTilesToVarsLevs
3785
3786  !--------------------------------------------------------------------------
3787  ! gsv_transposeTilesToVarsLevsAd
3788  !--------------------------------------------------------------------------
3789  subroutine gsv_transposeTilesToVarsLevsAd(statevector_in, statevector_out)
3790    !
3791    !:Purpose: Adjoint of Transpose the data from mpi_distribution=Tiles to
3792    !           VarsLevs
3793    !
3794    implicit none
3795
3796    ! Arguments:
3797    type(struct_gsv), intent(in)    :: statevector_in
3798    type(struct_gsv), intent(inout) :: statevector_out
3799
3800    ! Locals:
3801    integer :: youridx, youridy, yourid, nsize, maxkcount, ierr, mpiIdUU, mpiIdVV
3802    integer :: sendrecvKind, inKind, outKind, kIndexUU, kIndexVV, kIndex
3803    integer :: levUV, mpiTagUU, mpiTagVV, stepIndex, numSend, numRecv
3804    integer :: requestIdSend(stateVector_in%nk), requestIdRecv(stateVector_in%nk)
3805    integer :: mpiStatuses(mpi_status_size,stateVector_in%nk)
3806    integer :: displs(mmpi_nprocs), nsizes(mmpi_nprocs)
3807    real(4), pointer     :: field_in_r4_ptr(:,:,:,:), field_out_r4_ptr(:,:,:,:)
3808    real(8), pointer     :: field_in_r8_ptr(:,:,:,:), field_out_r8_ptr(:,:,:,:)
3809    real(8), allocatable :: gd_send_height(:,:,:), gd_recv_height(:,:)
3810    real(8), pointer     :: field_height_in_ptr(:,:), field_height_out_ptr(:,:)
3811    real(4), allocatable :: gdUV_r4(:,:,:), gd_r4(:,:,:)
3812    real(8), allocatable :: gdUV_r8(:,:,:), gd_r8(:,:,:)
3813
3814    call utl_tmg_start(167,'low-level--gsv_tilesToVarsLevsAD')
3815
3816    if (statevector_in%mpi_distribution /= 'VarsLevs') then
3817      call utl_abort('gsv_transposeTilesToVarsLevsAd: input statevector must have VarsLevs mpi distribution') 
3818    end if
3819
3820    if (statevector_out%mpi_distribution /= 'Tiles') then
3821      call utl_abort('gsv_transposeTilesToVarsLevsAd: out statevector must have Tiles mpi distribution')
3822    end if
3823
3824    inKind = statevector_in%dataKind
3825    outKind = statevector_out%dataKind
3826    if (inKind == 4 .and. outKind == 4) then
3827      sendrecvKind = 4
3828    else if (inKind == 8 .and. outKind == 8) then
3829      sendrecvKind = 8
3830    else
3831      call utl_abort('gsv_transposeTilesToVarsLevsAd: input and output must have same dataKind')
3832    end if
3833
3834    do stepIndex = 1, statevector_out%numStep
3835
3836      ! adjoint of: send copy of wind component to task that has the other component
3837      if (gsv_varExist(stateVector_in, 'UU') .and.  &
3838          gsv_varExist(stateVector_in, 'VV')) then
3839
3840        if (statevector_in%UVComponentPresent) then
3841          if (sendrecvKind == 4) then
3842            allocate(gdUV_r4(statevector_in%ni,statevector_in%nj, &
3843                             statevector_in%myUVkBeg:statevector_in%myUVkEnd))
3844            allocate(gd_r4(statevector_in%ni,statevector_in%nj, &
3845                           statevector_in%myUVkBeg:statevector_in%myUVkEnd))
3846          else
3847            allocate(gdUV_r8(statevector_in%ni,statevector_in%nj, &
3848                             statevector_in%myUVkBeg:statevector_in%myUVkEnd))
3849            allocate(gd_r8(statevector_in%ni,statevector_in%nj, &
3850                           statevector_in%myUVkBeg:statevector_in%myUVkEnd))
3851          end if
3852        end if
3853
3854        numSend = 0
3855        numRecv = 0
3856        LOOP_KINDEX: do kIndexUU = 1, stateVector_in%nk
3857          if (gsv_getVarNameFromK(stateVector_in, kIndexUU) /= 'UU') cycle LOOP_KINDEX
3858
3859          ! get k index for corresponding VV component
3860          levUV = gsv_getLevFromK(stateVector_in, kIndexUU)
3861          kIndexVV = levUV + gsv_getOffsetFromVarName(statevector_in,'VV')
3862
3863          ! get Mpi task id for both U and V
3864          MpiIdUU = gsv_getMpiIdFromK(statevector_in,kIndexUU)
3865          MpiIdVV = gsv_getMpiIdFromK(statevector_in,kIndexVV)
3866
3867          if (MpiIdUU == MpiIdVV .and.  mmpi_myid == MpiIdUU) then
3868            if (sendrecvKind == 4) then
3869              gd_r4(:, :, kIndexUU) = statevector_in%gdUV(kIndexVV)%r4(:, :, stepIndex)
3870              gd_r4(:, :, kIndexVV) = statevector_in%gdUV(kIndexUU)%r4(:, :, stepIndex)
3871            else
3872              gd_r8(:, :, kIndexUU) = statevector_in%gdUV(kIndexVV)%r8(:, :, stepIndex)
3873              gd_r8(:, :, kIndexVV) = statevector_in%gdUV(kIndexUU)%r8(:, :, stepIndex)
3874            end if
3875            cycle LOOP_KINDEX
3876          end if
3877
3878          ! need to do mpi communication to exchange wind components
3879          nsize = statevector_in%ni * statevector_in%nj
3880          mpiTagUU = kIndexUU
3881          mpiTagVV = kIndexVV
3882
3883          if (mmpi_myid == MpiIdUU) then ! I have UU
3884
3885            numRecv = numRecv + 1
3886            if (sendrecvKind == 4) then
3887              call mpi_irecv(gd_r4(:, :, kIndexUU),  &
3888                             nsize, mmpi_datyp_real4, MpiIdVV, mpiTagVV,  &
3889                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3890            else
3891              call mpi_irecv(gd_r8(:, :, kIndexUU),  &
3892                             nsize, mmpi_datyp_real8, MpiIdVV, mpiTagVV,  &
3893                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3894            end if
3895
3896            numSend = numSend + 1
3897            if (sendrecvKind == 4) then
3898              gdUV_r4(:, :, kIndexUU) = statevector_in%gdUV(kIndexUU)%r4(:, :, stepIndex)
3899              call mpi_isend(gdUV_r4(:, :, kIndexUU),  &
3900                             nsize, mmpi_datyp_real4, MpiIdVV, mpiTagUU,  &
3901                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3902            else
3903              gdUV_r8(:, :, kIndexUU) = statevector_in%gdUV(kIndexUU)%r8(:, :, stepIndex)
3904              call mpi_isend(gdUV_r8(:, :, kIndexUU),  &
3905                             nsize, mmpi_datyp_real8, MpiIdVV, mpiTagUU,  &
3906                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3907            end if
3908
3909          else if (mmpi_myid == MpiIDVV) then ! I have VV
3910
3911            numRecv = numRecv + 1
3912            if (sendrecvKind == 4) then
3913              call mpi_irecv(gd_r4(:, :, kIndexVV),  &
3914                             nsize, mmpi_datyp_real4, MpiIdUU, mpiTagUU,  &
3915                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3916            else
3917              call mpi_irecv(gd_r8(:, :, kIndexVV),  &
3918                             nsize, mmpi_datyp_real8, MpiIdUU, mpiTagUU,  &
3919                             mmpi_comm_grid, requestIdRecv(numRecv), ierr)
3920            end if
3921
3922            numSend = numSend + 1
3923            if (sendrecvKind == 4) then
3924              gdUV_r4(:, :, kIndexVV) = statevector_in%gdUV(kIndexVV)%r4(:, :, stepIndex)
3925              call mpi_isend(gdUV_r4(:, :, kIndexVV),  &
3926                             nsize, mmpi_datyp_real4, MpiIdUU, mpiTagVV,  &
3927                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3928            else
3929              gdUV_r8(:, :, kIndexVV) = statevector_in%gdUV(kIndexVV)%r8(:, :, stepIndex)
3930              call mpi_isend(gdUV_r8(:, :, kIndexVV),  &
3931                             nsize, mmpi_datyp_real8, MpiIdUU, mpiTagVV,  &
3932                             mmpi_comm_grid, requestIdSend(numSend), ierr)
3933            end if
3934
3935          end if
3936
3937        end do LOOP_KINDEX
3938
3939        if (numRecv > 0) then
3940          call mpi_waitAll(numRecv, requestIdRecv(1:numRecv), mpiStatuses(:,1:numRecv), ierr)
3941        end if
3942
3943        if (numSend > 0) then
3944          call mpi_waitAll(numSend, requestIdSend(1:numSend), mpiStatuses(:,1:numSend), ierr)
3945        end if
3946
3947        if (statevector_in%UVComponentPresent) then
3948          do kIndex = statevector_in%myUVkBeg, statevector_in%myUVkEnd
3949            if (sendrecvKind == 4) then
3950              statevector_in%gd_r4(:, :, kIndex, stepIndex) =   &
3951                   statevector_in%gd_r4(:, :, kIndex, stepIndex) +  &
3952                   gd_r4(:, :, kIndex)
3953            else
3954              statevector_in%gd_r8(:, :, kIndex, stepIndex) =   &
3955                   statevector_in%gd_r8(:, :, kIndex, stepIndex) +  &
3956                   gd_r8(:, :, kIndex)
3957            end if
3958          end do
3959          if (sendrecvKind == 4) then
3960            deallocate(gdUV_r4)
3961            deallocate(gd_r4)
3962          else
3963            deallocate(gdUV_r8)
3964            deallocate(gd_r8)
3965          end if
3966        end if
3967
3968      end if ! UU and VV exist
3969
3970      ! do allToAll to redistribute main variables from VarsLevs to Tiles
3971      maxkCount = maxval(statevector_in%allkCount(:))
3972      if (sendrecvKind == 4) then
3973        call utl_reAllocate(gd_send_varsLevs_r4, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3974                            maxkCount, mmpi_nprocs)
3975        call utl_reAllocate(gd_recv_varsLevs_r4, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3976                            maxkCount, mmpi_nprocs)
3977      else
3978        call utl_reAllocate(gd_send_varsLevs_r8, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3979                            maxkCount, mmpi_nprocs)
3980        call utl_reAllocate(gd_recv_varsLevs_r8, statevector_out%lonPerPEmax, statevector_out%latPerPEmax, &
3981                            maxkCount, mmpi_nprocs)
3982      end if
3983
3984      if (sendrecvKind == 4 .and. inKind == 4) then
3985        call gsv_getField(statevector_in,field_in_r4_ptr)
3986        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
3987        do youridy = 0, (mmpi_npey-1)
3988          do youridx = 0, (mmpi_npex-1)
3989            yourid = youridx + youridy*mmpi_npex
3990            gd_send_varsLevs_r4(1:statevector_out%allLonPerPE(youridx+1),  &
3991                                1:statevector_out%allLatPerPE(youridy+1),  &
3992                                1:statevector_in%mykCount, yourid+1) =  &
3993              field_in_r4_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
3994                              statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
3995                              statevector_in%mykBeg:statevector_in%mykEnd, stepIndex)
3996          end do
3997        end do
3998        !$OMP END PARALLEL DO
3999      else if (sendrecvKind == 8 .and. inKind == 8) then
4000        call gsv_getField(statevector_in,field_in_r8_ptr)
4001        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
4002        do youridy = 0, (mmpi_npey-1)
4003          do youridx = 0, (mmpi_npex-1)
4004            yourid = youridx + youridy*mmpi_npex
4005            gd_send_varsLevs_r8(1:statevector_out%allLonPerPE(youridx+1),  &
4006                                1:statevector_out%allLatPerPE(youridy+1),  &
4007                                1:statevector_in%mykCount, yourid+1) =  &
4008              field_in_r8_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
4009                              statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
4010                              statevector_in%mykBeg:statevector_in%mykEnd, stepIndex)
4011          end do
4012        end do
4013        !$OMP END PARALLEL DO
4014      else if (sendrecvKind == 4 .and. inKind == 8) then
4015        call gsv_getField(statevector_in,field_in_r8_ptr)
4016        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
4017        do youridy = 0, (mmpi_npey-1)
4018          do youridx = 0, (mmpi_npex-1)
4019            yourid = youridx + youridy*mmpi_npex
4020            gd_send_varsLevs_r4(1:statevector_out%allLonPerPE(youridx+1),  &
4021                                1:statevector_out%allLatPerPE(youridy+1),  &
4022                                1:statevector_in%mykCount, yourid+1) =  &
4023              real(field_in_r8_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
4024                                   statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1),  &
4025                                   statevector_in%mykBeg:statevector_in%mykEnd, stepIndex),4)
4026          end do
4027        end do
4028        !$OMP END PARALLEL DO
4029      else
4030        call utl_abort('gsv_transposeTilesToVarsLevsAd: Incompatible mix of real 4 and 8 before alltoall mpi comm')
4031      end if
4032
4033      nsize = statevector_out%lonPerPEmax * statevector_out%latPerPEmax * maxkCount
4034      if (mmpi_nprocs > 1) then
4035        if (sendrecvKind == 4) then
4036          call rpn_comm_alltoall(gd_send_varsLevs_r4, nsize, 'mpi_real4',  &
4037                                 gd_recv_varsLevs_r4, nsize, 'mpi_real4', 'grid', ierr)
4038        else
4039          call rpn_comm_alltoall(gd_send_varsLevs_r8, nsize, 'mpi_real8',  &
4040                                 gd_recv_varsLevs_r8, nsize, 'mpi_real8', 'grid', ierr)
4041        end if
4042      else
4043        if (sendrecvKind == 4) then
4044           gd_recv_varsLevs_r4(:,:,:,1) = gd_send_varsLevs_r4(:,:,:,1)
4045        else
4046           gd_recv_varsLevs_r8(:,:,:,1) = gd_send_varsLevs_r8(:,:,:,1)
4047        end if
4048      end if
4049
4050      if (sendrecvKind == 4 .and. outKind == 4) then
4051        call gsv_getField(statevector_out,field_out_r4_ptr)
4052        !$OMP PARALLEL DO PRIVATE(yourid)
4053        do yourid = 0, (mmpi_nprocs-1)
4054          field_out_r4_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
4055                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
4056                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
4057            gd_recv_varsLevs_r4(1:statevector_out%lonPerPE,  &
4058                                1:statevector_out%latPerPE,  &
4059                                1:statevector_in%allkCount(yourid+1), yourid+1)
4060        end do
4061        !$OMP END PARALLEL DO
4062      else if (sendrecvKind == 8 .and. outKind == 8) then
4063        call gsv_getField(statevector_out,field_out_r8_ptr)
4064        !$OMP PARALLEL DO PRIVATE(yourid)
4065        do yourid = 0, (mmpi_nprocs-1)
4066          field_out_r8_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
4067                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
4068                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
4069            gd_recv_varsLevs_r8(1:statevector_out%lonPerPE,  &
4070                                1:statevector_out%latPerPE,  &
4071                                1:statevector_in%allkCount(yourid+1), yourid+1)
4072        end do
4073        !$OMP END PARALLEL DO
4074      else if (sendrecvKind == 4 .and. outKind == 8) then
4075        call gsv_getField(statevector_out,field_out_r8_ptr)
4076        !$OMP PARALLEL DO PRIVATE(yourid)
4077        do yourid = 0, (mmpi_nprocs-1)
4078          field_out_r8_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
4079                           statevector_out%myLatBeg:statevector_out%myLatEnd, &
4080                           statevector_in%allkBeg(yourid+1):statevector_in%allkEnd(yourid+1), stepIndex) =   &
4081            real(gd_recv_varsLevs_r4(1:statevector_out%lonPerPE,  &
4082                                     1:statevector_out%latPerPE,  &
4083                                     1:statevector_in%allkCount(yourid+1), yourid+1),8)
4084        end do
4085        !$OMP END PARALLEL DO
4086      else
4087        call utl_abort('gsv_transposeTilesToVarsLevsAd: Incompatible mix of real 4 and 8 after alltoall mpi comm')
4088      end if
4089
4090    end do ! stepIndex
4091
4092    ! scatter surface height from task 0 to all others, 1 tile per task
4093    if (statevector_in%heightSfcPresent .and. statevector_out%heightSfcPresent) then
4094      allocate(gd_send_height(statevector_out%lonPerPEmax,statevector_out%latPerPEmax,mmpi_nprocs))
4095      allocate(gd_recv_height(statevector_out%lonPerPEmax,statevector_out%latPerPEmax))
4096      gd_send_height(:,:,:) = 0.0d0
4097      gd_recv_height(:,:) = 0.0d0
4098      field_height_in_ptr => gsv_getHeightSfc(statevector_in)
4099      field_height_out_ptr => gsv_getHeightSfc(statevector_out)
4100
4101      if (mmpi_myid == 0) then
4102        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
4103        do youridy = 0, (mmpi_npey-1)
4104          do youridx = 0, (mmpi_npex-1)
4105            yourid = youridx + youridy*mmpi_npex
4106            gd_send_height(1:statevector_out%allLonPerPE(youridx+1),  &
4107                       1:statevector_out%allLatPerPE(youridy+1), yourid+1) =  &
4108              field_height_in_ptr(statevector_out%allLonBeg(youridx+1):statevector_out%allLonEnd(youridx+1),  &
4109                              statevector_out%allLatBeg(youridy+1):statevector_out%allLatEnd(youridy+1))
4110          end do
4111        end do
4112        !$OMP END PARALLEL DO
4113      end if
4114
4115      nsize = statevector_out%lonPerPEmax * statevector_out%latPerPEmax
4116      do yourid = 0, (mmpi_nprocs-1)
4117        displs(yourid+1) = yourid*nsize
4118        nsizes(yourid+1) = nsize
4119      end do
4120      call rpn_comm_scatterv(gd_send_height, nsizes, displs, 'mpi_double_precision', &
4121                             gd_recv_height, nsize, 'mpi_double_precision', &
4122                             0, 'grid', ierr)
4123
4124      field_height_out_ptr(statevector_out%myLonBeg:statevector_out%myLonEnd, &
4125                       statevector_out%myLatBeg:statevector_out%myLatEnd) =   &
4126        gd_recv_height(1:statevector_out%lonPerPE,  &
4127                   1:statevector_out%latPerPE)
4128
4129      deallocate(gd_recv_height)
4130      deallocate(gd_send_height)
4131    end if
4132
4133    ! Copy over the mask, if it exists
4134    call gsv_copyMask(statevector_in, statevector_out)
4135
4136    call utl_tmg_stop(167)
4137
4138  end subroutine gsv_transposeTilesToVarsLevsAd
4139
4140  !--------------------------------------------------------------------------
4141  ! gsv_horizSubSample
4142  !--------------------------------------------------------------------------
4143  subroutine gsv_horizSubSample(statevector_in,statevector_out,horizSubSample)
4144    !
4145    !:Purpose: Subsamples the horizontal statevector grid by an integral factor
4146    !           and transform accordingly the fields 
4147    !
4148    implicit none
4149
4150    ! Arguments:
4151    type(struct_gsv), intent(in)   :: statevector_in
4152    type(struct_gsv), intent(out)  :: statevector_out
4153    integer,          intent(in)   :: horizSubSample
4154
4155    ! Locals:
4156    integer             :: relativeFactor, middleStep
4157    real(8)             :: ratio_r8
4158    integer             :: stepIndex, lonIndex, latIndex, ilon_in, ilon_out, ilat_in, ilat_out
4159    integer             :: ilon_in1, ilon_in2, lonIndex_in, ilat_in1, ilat_in2, latIndex_in
4160    character(len=4), pointer :: varNames(:)
4161
4162    if (statevector_out%allocated) then
4163      call gsv_deallocate(statevector_out)
4164    end if
4165
4166    nullify(varNames)
4167    call gsv_varNamesList(varNames, statevector_in)
4168
4169    ! allocate the output statevector
4170    if (associated(statevector_in%dateStampList)) then
4171      middleStep = nint((real(statevector_in%numStep,8)+1.0d0)/2.0d0)
4172      call gsv_allocate(statevector_out,  &
4173                        statevector_in%numStep, statevector_in%hco, statevector_in%vco, &
4174                        datestamp_opt = statevector_in%dateStampList(middleStep),  &
4175                        mpi_local_opt = statevector_in%mpi_local,  &
4176                        horizSubSample_opt = horizSubSample, varNames_opt=varNames)
4177
4178    else
4179      call gsv_allocate(statevector_out,  &
4180                        statevector_in%numStep, statevector_in%hco, statevector_in%vco, &
4181                        mpi_local_opt = statevector_in%mpi_local, &
4182                        horizSubSample_opt = horizSubSample, varNames_opt=varNames)
4183    end if
4184
4185    deallocate(varNames)
4186
4187    if (statevector_out%horizSubSample == statevector_in%horizSubSample) then
4188      call msg('gsv_horizSubSample', &
4189           'gsv_horizSubSample: already at the selected subsample level: ' &
4190           //str(statevector_out%horizSubSample), mpiAll_opt=.false.)
4191      call gsv_copy(statevector_in,statevector_out)
4192      return
4193    end if
4194
4195    if (statevector_out%horizSubSample > statevector_in%horizSubSample) then
4196
4197      ! simple averaging onto a coarser grid
4198      call msg('gsv_horizSubSample', 'increasing subsample level from ' &
4199           //str(statevector_in%horizSubSample)//' to ' &
4200           //str(statevector_out%horizSubSample), mpiAll_opt=.false.)
4201
4202      ratio_r8 = real(statevector_out%horizSubSample,8)/real(statevector_in%horizSubSample,8)
4203      if (abs(ratio_r8 - real(nint(ratio_r8),8)) > 1.0d-5) then
4204        call msg('gsv_horizSubSample', &
4205                             'original subsample level='//str(statevector_in%horizSubSample) &
4206             //new_line('')//'new      subsample level='//str(statevector_out%horizSubSample))
4207        call utl_abort('gsv_horizSubSample: relative change of subsample level not an integer')
4208      end if
4209      relativeFactor = nint(ratio_r8)
4210
4211      call gsv_zero(statevector_out)
4212      !$OMP PARALLEL DO PRIVATE(stepIndex, latIndex, ilat_out, ilat_in1, ilat_in2, &
4213      !$OMP latIndex_in, lonIndex, ilon_out, ilon_in1, ilon_in2, lonIndex_in)
4214      do stepIndex = 1, statevector_out%numStep
4215
4216        do latIndex = 1, statevector_out%latPerPE
4217          ilat_out = latIndex + statevector_out%myLatBeg - 1
4218          ilat_in1 = relativeFactor*(latIndex-1) + statevector_in%myLatBeg
4219          ilat_in2 = ilat_in1 + relativeFactor - 1
4220          do latIndex_in = ilat_in1, ilat_in2
4221
4222            do lonIndex = 1, statevector_out%lonPerPE
4223              ilon_out = lonIndex + statevector_out%myLonBeg - 1
4224              ilon_in1 = relativeFactor*(lonIndex-1) + statevector_in%myLonBeg
4225              ilon_in2 = ilon_in1 + relativeFactor - 1
4226              do lonIndex_in = ilon_in1, ilon_in2
4227
4228                statevector_out%gd_r8(ilon_out, ilat_out, :, stepIndex) =  &
4229                  statevector_out%gd_r8(ilon_out, ilat_out, :, stepIndex) +  &
4230                  statevector_in%gd_r8(lonIndex_in, ilat_in, :, stepIndex)
4231
4232              end do ! lonIndex_in
4233            end do ! lonIndex
4234
4235          end do ! latIndex_in
4236        end do ! latIndex
4237
4238      end do ! stepIndex
4239      !$OMP END PARALLEL DO 
4240
4241    else
4242
4243      ! interpolate to a finer grid
4244      call msg('gsv_horizSubSample', 'decreasing subsample level from ' &
4245           //str(statevector_in%horizSubSample)//' to '  &
4246           //str(statevector_out%horizSubSample), mpiAll_opt=.false.)
4247
4248      ratio_r8 = real(statevector_in%horizSubSample)/real(statevector_out%horizSubSample)
4249      if (abs(ratio_r8 - real(nint(ratio_r8),8)) > 1.0d-5) then
4250        call msg('gsv_horizSubSample', &
4251                             'original subsample level='//str(statevector_in%horizSubSample) &
4252             //new_line('')//'new      subsample level='//str(statevector_out%horizSubSample))
4253        call utl_abort('gsv_horizSubSample: relative change of subsample level not an integer')
4254      end if
4255      relativeFactor = nint(ratio_r8)
4256
4257      ! copy each input grid point to multiple output grid points
4258      !$OMP PARALLEL DO PRIVATE(stepIndex, latIndex, ilat_out, ilat_in, lonIndex, ilon_out, ilon_in)
4259      do stepIndex = 1, statevector_out%numStep
4260
4261        do latIndex = 1, statevector_out%latPerPE
4262          ilat_out = latIndex + statevector_out%myLatBeg - 1
4263          ilat_in  = floor(real(latIndex-1,8)/real(relativeFactor,8)) + statevector_in%myLatBeg
4264          do lonIndex = 1, statevector_out%lonPerPE
4265            ilon_out = lonIndex + statevector_in%myLonBeg - 1
4266            ilon_in  = floor(real(lonIndex-1,8)/real(relativeFactor,8)) + statevector_in%myLonBeg
4267            statevector_out%gd_r8(ilon_out, ilat_out, :, stepIndex) =  &
4268              statevector_in%gd_r8(ilon_in, ilat_in, :, stepIndex)
4269          end do ! lonIndex
4270        end do ! latIndex
4271
4272      end do ! stepIndex
4273      !$OMP END PARALLEL DO 
4274
4275    end if
4276
4277  end subroutine gsv_horizSubSample
4278
4279  !--------------------------------------------------------------------------
4280  ! gsv_transposeStepToVarsLevs
4281  !--------------------------------------------------------------------------
4282  subroutine gsv_transposeStepToVarsLevs(stateVector_1step_r4, &
4283                                         stateVector_VarsLevs, stepIndexBeg)
4284    !
4285    !:Purpose: Transposes the data from a timestep MPI distribution (1 timestep
4286    !           per MPI task) to the `mpi_distribution='VarsLevs'` distribution.
4287    !
4288    !:Comment: Step-wise distribution is mostly only used for file I/O.
4289    !           When in such implicit time distribution, it is necessery that
4290    !           `mpi_local=.false.` and `numStep=1`.
4291    !
4292    implicit none
4293
4294    ! Arguments:
4295    type(struct_gsv), intent(in)    :: stateVector_1step_r4
4296    type(struct_gsv), intent(inout) :: stateVector_VarsLevs
4297    integer,          intent(in)    :: stepIndexBeg
4298
4299    ! Locals:
4300    integer :: ierr, maxkCount, numStepInput, numkToSend, stepIndexInput
4301    integer :: nsize
4302    integer :: sendsizes(mmpi_nprocs), recvsizes(mmpi_nprocs), senddispls(mmpi_nprocs), recvdispls(mmpi_nprocs)
4303    integer :: kIndex, kIndex2, levUV, procIndex, stepIndex
4304    logical :: thisProcIsAsender(mmpi_nprocs)
4305    real(4), allocatable :: gd_send_r4(:,:,:), gd_recv_r4(:,:,:)
4306    real(4), pointer     :: field_in_r4(:,:,:,:), field_out_r4(:,:,:,:)
4307
4308    call utl_tmg_start(162,'low-level--gsv_stepToVarsLevs')
4309
4310    if (statevector_VarsLevs%mpi_distribution /= 'VarsLevs') then
4311      call utl_abort('gsv_transposeStepToVarsLevs: output statevector must have VarsLevs mpi distribution') 
4312    end if
4313
4314    ! do mpi transpose to get 4D stateVector into VarsLevs form
4315    call rpn_comm_barrier('GRID',ierr)
4316    call msg('gsv_transposeStepToVarsLevs', 'START', verb_opt=2)
4317    call msg_memUsage('gsv_transposeStepToVarsLevs')
4318
4319    ! first do surface height, just need to copy over on task 0, assuming task 0 read a file
4320    if (mmpi_myid == 0 .and. stateVector_VarsLevs%heightSfcPresent) then
4321      if (.not.stateVector_1step_r4%allocated) then
4322        call utl_abort('gsv_transposeStepToVarsLevs: Problem with HeightSfc')
4323      end if
4324      stateVector_VarsLevs%HeightSfc(:,:) = stateVector_1step_r4%HeightSfc(:,:)
4325    end if
4326
4327    ! determine which tasks have something to send and let everyone know
4328    do procIndex = 1, mmpi_nprocs
4329      thisProcIsAsender(procIndex) = .false.
4330      if (mmpi_myid == (procIndex-1) .and. stateVector_1step_r4%allocated) then
4331        thisProcIsAsender(procIndex) = .true.
4332      end if
4333      call rpn_comm_bcast(thisProcIsAsender(procIndex), 1,  &
4334                          'MPI_LOGICAL', procIndex-1, 'GRID', ierr)
4335    end do
4336
4337    numStepInput = 0
4338    do procIndex = 1, mmpi_nprocs
4339      if (thisProcIsAsender(procIndex)) numStepInput = numStepInput + 1
4340    end do
4341    call msg('gsv_transposeStepToVarsLevs', 'numStepInput = '//str(numStepInput))
4342
4343    maxkCount = maxval(stateVector_VarsLevs%allkCount(:))
4344    numkToSend = min(mmpi_nprocs,stateVector_VarsLevs%nk)
4345    allocate(gd_recv_r4(stateVector_VarsLevs%ni,stateVector_VarsLevs%nj,numStepInput))
4346    gd_recv_r4(:,:,:) = 0.0
4347    if (stateVector_1step_r4%allocated) then
4348      allocate(gd_send_r4(stateVector_VarsLevs%ni,stateVector_VarsLevs%nj,numkToSend))
4349    else
4350      allocate(gd_send_r4(1,1,1))
4351    end if
4352    gd_send_r4(:,:,:) = 0.0
4353
4354    call gsv_getField(stateVector_VarsLevs,field_out_r4)
4355
4356    ! prepare for alltoallv
4357    nsize = stateVector_VarsLevs%ni * stateVector_VarsLevs%nj
4358
4359    ! only send the data from tasks with data, same amount to all
4360    sendsizes(:) = 0
4361    if (stateVector_1step_r4%allocated) then
4362      do procIndex = 1, numkToSend
4363        sendsizes(procIndex) = nsize
4364      end do
4365    end if
4366    senddispls(1) = 0
4367    do procIndex = 2, mmpi_nprocs
4368      senddispls(procIndex) = senddispls(procIndex-1) + sendsizes(procIndex-1)
4369    end do
4370
4371    ! all tasks recv only from those with data
4372    recvsizes(:) = 0
4373    if ((1+mmpi_myid) <= numkToSend) then
4374      do procIndex = 1, mmpi_nprocs
4375        if (thisProcIsAsender(procIndex)) then
4376          recvsizes(procIndex) = nsize
4377        end if
4378      end do
4379    end if
4380    recvdispls(1) = 0
4381    do procIndex = 2, mmpi_nprocs
4382      recvdispls(procIndex) = recvdispls(procIndex-1) + recvsizes(procIndex-1)
4383    end do
4384
4385    ! loop to send (at most) 1 level to (at most) all other mpi tasks
4386    do kIndex = 1, maxkCount
4387
4388      ! prepare the complete 1 timestep for sending on all tasks that read something
4389      if (stateVector_1step_r4%allocated) then
4390
4391        call gsv_getField(stateVector_1step_r4,field_in_r4)
4392        !$OMP PARALLEL DO PRIVATE(procIndex,kIndex2)
4393        do procIndex = 1, mmpi_nprocs
4394          ! compute kIndex value being sent
4395          kIndex2 = kIndex + stateVector_VarsLevs%allkBeg(procIndex) - 1
4396          if (kIndex2 <= stateVector_VarsLevs%allkEnd(procIndex)) then
4397            if(procIndex > numkToSend) then
4398              call utl_abort('gsv_transposeStepToVarsLevs: ERROR with numkToSend? '&
4399                   //'procIndex='//str(procIndex) &
4400                   //', numkToSend = '//str(numkToSend))
4401            end if
4402            gd_send_r4(:,:,procIndex) = field_in_r4(:,:,kIndex2,1)
4403          end if
4404        end do
4405        !$OMP END PARALLEL DO
4406
4407      end if
4408
4409      call mpi_alltoallv(gd_send_r4, sendsizes, senddispls, mmpi_datyp_real4,  &
4410                         gd_recv_r4, recvsizes, recvdispls, mmpi_datyp_real4, mmpi_comm_grid, ierr)
4411
4412      stepIndex = stepIndexBeg - 1
4413      stepIndexInput = 0
4414      do procIndex = 1, mmpi_nprocs
4415        ! skip if this task has nothing to send
4416        if (.not. thisProcIsAsender(procIndex)) cycle
4417
4418        stepIndex = stepIndex + 1
4419        if (stepIndex > stateVector_VarsLevs%numStep) then
4420          call utl_abort('gsv_transposeStepToVarsLevs: stepIndex > numStep')
4421        end if
4422
4423        ! all tasks copy the received step data into correct slot
4424        kIndex2 = kIndex + stateVector_VarsLevs%mykBeg - 1
4425        if (kIndex2 <= stateVector_VarsLevs%mykEnd) then
4426          stepIndexInput = stepIndexInput + 1
4427          field_out_r4(:,:,kIndex2,stepIndex) = gd_recv_r4(:,:,stepIndexInput)
4428        end if
4429      end do
4430
4431    end do ! kIndex
4432
4433    ! also do extra transpose for complementary wind components when wind component present
4434    ! this should really be changed to only send the data needed for UV, not total k
4435    if (gsv_varExist(stateVector_VarsLevs, 'UU') .or. gsv_varExist(stateVector_VarsLevs, 'VV')) then
4436      do kIndex = 1, maxkCount
4437
4438        ! prepare the complete 1 timestep for sending on all tasks that read something
4439        if (stateVector_1step_r4%allocated) then
4440
4441          !$OMP PARALLEL DO PRIVATE(procIndex,kIndex2,levUV)
4442          ! loop over all tasks we are sending to
4443          do procIndex = 1, mmpi_nprocs
4444            kIndex2 = kIndex + stateVector_VarsLevs%allkBeg(procIndex) - 1
4445            if (kIndex2 <= stateVector_VarsLevs%allkEnd(procIndex) .and. &
4446                kIndex2 >= stateVector_1step_r4%myUVkBeg .and.           &
4447                kIndex2 <= stateVector_1step_r4%myUVkEnd) then
4448              gd_send_r4(:,:,procIndex) = stateVector_1step_r4%gdUV(kIndex2)%r4(:,:,1)
4449            end if
4450          end do
4451          !$OMP END PARALLEL DO
4452
4453        end if
4454
4455        call mpi_alltoallv(gd_send_r4, sendsizes, senddispls, mmpi_datyp_real4,  &
4456                           gd_recv_r4, recvsizes, recvdispls, mmpi_datyp_real4, mmpi_comm_grid, ierr)
4457
4458        stepIndex = stepIndexBeg - 1
4459        stepIndexInput = 0
4460        ! loop over all tasks from which we receive something
4461        do procIndex = 1, mmpi_nprocs
4462          ! skip if this task did not send anything
4463          if (.not. thisProcIsAsender(procIndex)) cycle
4464
4465          stepIndex = stepIndex + 1
4466          if (stepIndex > stateVector_VarsLevs%numStep) then
4467            call utl_abort('gsv_transposeStepToVarsLevs: stepIndex > numStep')
4468          end if
4469
4470          ! all tasks copy the received step data into correct slot
4471          kIndex2 = kIndex + stateVector_VarsLevs%mykBeg - 1
4472          if (kIndex2 <= stateVector_VarsLevs%mykEnd) then
4473            stepIndexInput = stepIndexInput + 1
4474            if (kIndex2 >= stateVector_VarsLevs%myUVkBeg .and.  &
4475                kIndex2 <= stateVector_VarsLevs%myUVkEnd) then
4476              statevector_varsLevs%gdUV(kIndex2)%r4(:,:,stepIndex) = gd_recv_r4(:,:,stepIndexInput)
4477            end if
4478          end if
4479        end do
4480
4481      end do ! kIndex
4482
4483    end if ! do complementary wind component transpose
4484
4485    deallocate(gd_send_r4)
4486    deallocate(gd_recv_r4)
4487
4488    ! Copy the mask if it is present
4489    if (stateVector_1step_r4%allocated) then
4490      call gsv_copyMask(stateVector_1step_r4, stateVector_varsLevs)
4491    end if
4492    call ocm_communicateMask(stateVector_varsLevs%oceanMask)
4493
4494    call msg_memUsage('gsv_transposeStepToVarsLevs')
4495    call msg('gsv_transposeStepToVarsLevs','END', verb_opt=2)
4496
4497    call utl_tmg_stop(162)
4498
4499  end subroutine gsv_transposeStepToVarsLevs
4500
4501  !--------------------------------------------------------------------------
4502  ! gsv_transposeStepToTiles
4503  !--------------------------------------------------------------------------
4504  subroutine gsv_transposeStepToTiles(stateVector_1step, stateVector_tiles, stepIndexBeg)
4505    !
4506    !:Purpose: Transposes the data from a timestep MPI distribution (1 timestep
4507    !           per MPI task) to the `mpi_distribution='Tiles'` distribution 
4508    !           (4D lat-lon tiles).
4509    !
4510    !:Comment: Step-wise distribution is mostly only used for file I/O.
4511    !           When in such implicit time distribution, it is necessery that
4512    !           `mpi_local=.false.` and `numStep=1`.
4513    !
4514    implicit none
4515
4516    ! Arguments:
4517    type(struct_gsv), intent(in)    :: stateVector_1step
4518    type(struct_gsv), intent(inout) :: stateVector_tiles
4519    integer,          intent(in)    :: stepIndexBeg
4520
4521    ! Locals:
4522    integer :: ierr, yourid, youridx, youridy, nsize, numStepInput, stepCount
4523    integer :: displs(mmpi_nprocs), nsizes(mmpi_nprocs)
4524    integer :: senddispls(mmpi_nprocs), sendsizes(mmpi_nprocs)
4525    integer :: recvdispls(mmpi_nprocs), recvsizes(mmpi_nprocs)
4526    integer :: kIndex, procIndex, stepIndex, indexBeg, indexEnd
4527    integer :: inKind, outKind, sendrecvKind, inKindLocal
4528    logical :: thisProcIsAsender(mmpi_nprocs), allZero, allZero_mpiglobal
4529    real(8), allocatable :: gd_send_height(:,:,:), gd_recv_height(:,:)
4530    real(4), allocatable :: gd_send_1d_r4(:), gd_recv_3d_r4(:,:,:)
4531    real(8), allocatable :: gd_send_1d_r8(:), gd_recv_3d_r8(:,:,:)
4532    real(4), pointer     :: field_in_r4_ptr(:,:,:,:), field_out_r4_ptr(:,:,:,:)
4533    real(8), pointer     :: field_in_r8_ptr(:,:,:,:), field_out_r8_ptr(:,:,:,:)
4534
4535    call rpn_comm_barrier('GRID',ierr)
4536
4537    call utl_tmg_start(163,'low-level--gsv_stepToTiles')
4538
4539    if (statevector_tiles%mpi_distribution /= 'Tiles') then
4540      call utl_abort('gsv_transposeStepToTiles: output statevector must have Tiles mpi distribution')
4541    end if
4542
4543    call msg('gsv_transposeStepToTiles','START', verb_opt=2)
4544
4545    ! determine which tasks have something to send and let everyone know
4546    do procIndex = 1, mmpi_nprocs
4547      thisProcIsAsender(procIndex) = .false.
4548      if (mmpi_myid == (procIndex-1) .and. stateVector_1step%allocated) then
4549        thisProcIsAsender(procIndex) = .true.
4550      end if
4551      call rpn_comm_bcast(thisProcIsAsender(procIndex), 1,  &
4552                          'MPI_LOGICAL', procIndex-1, 'GRID', ierr)
4553    end do
4554
4555    ! only send the data from tasks with data, same amount to all
4556    sendsizes(:) = 0
4557    if (stateVector_1step%allocated) then
4558      do youridy = 0, (mmpi_npey-1)
4559        do youridx = 0, (mmpi_npex-1)
4560          yourid = youridx + youridy*mmpi_npex
4561          nsize = stateVector_tiles%allLonPerPE(youridx+1) * stateVector_tiles%allLatPerPE(youridy+1)
4562          sendsizes(yourid+1) = nsize
4563        end do
4564      end do
4565    end if
4566    senddispls(:) = 0
4567    do yourid = 1, (mmpi_nprocs-1)
4568      senddispls(yourid+1) = senddispls(yourid) + sendsizes(yourid)
4569    end do
4570
4571    ! all tasks recv, but only from those with data
4572    recvsizes(:) = 0
4573    nsize = stateVector_tiles%lonPerPE * stateVector_tiles%latPerPE
4574    do yourid = 0, (mmpi_nprocs-1) ! recv from this task
4575      if (thisProcIsAsender(yourid+1)) then
4576        recvsizes(yourid+1) = nsize
4577      end if
4578    end do
4579    recvdispls(:) = 0
4580    do yourid = 1, (mmpi_nprocs-1)
4581      recvdispls(yourid+1) = recvdispls(yourid) + recvsizes(yourid)
4582    end do
4583
4584    numStepInput = 0
4585    do yourid = 0, (mmpi_nprocs-1)
4586      if (thisProcIsAsender(yourid+1)) numStepInput = numStepInput + 1
4587    end do
4588
4589    if (stateVector_1step%allocated) then
4590      inKindLocal = stateVector_1step%dataKind
4591    else
4592      inKindLocal = -1
4593    end if
4594    call rpn_comm_allreduce(inKindLocal, inKind, 1,  &
4595                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
4596    outKind = stateVector_tiles%dataKind
4597    if (inKind == 4 .or. outKind == 4) then
4598      sendrecvKind = 4
4599    else
4600      sendrecvKind = 8
4601    end if
4602
4603    ! allocate arrays used for mpi communication of 1 level/variable at a time
4604    if (sendrecvKind == 4) then
4605      allocate(gd_recv_3d_r4(stateVector_tiles%lonPerPE,stateVector_tiles%latPerPE,numStepInput))
4606      gd_recv_3d_r4(:,:,:) = 0.0
4607      if (stateVector_1step%allocated) then
4608        allocate(gd_send_1d_r4(stateVector_tiles%ni*stateVector_tiles%nj))
4609      else
4610        allocate(gd_send_1d_r4(1))
4611      end if
4612      gd_send_1d_r4(:) = 0.0
4613    else
4614      allocate(gd_recv_3d_r8(stateVector_tiles%lonPerPE,stateVector_tiles%latPerPE,numStepInput))
4615      gd_recv_3d_r8(:,:,:) = 0.0d0
4616      if (stateVector_1step%allocated) then
4617        allocate(gd_send_1d_r8(stateVector_tiles%ni*stateVector_tiles%nj))
4618      else
4619        allocate(gd_send_1d_r8(1))
4620      end if
4621      gd_send_1d_r8(:) = 0.0d0
4622    end if
4623
4624    if (stateVector_tiles%dataKind == 4) then
4625      call gsv_getField(stateVector_tiles,field_out_r4_ptr)
4626    else if (stateVector_tiles%dataKind == 8) then
4627      call gsv_getField(stateVector_tiles,field_out_r8_ptr)
4628    else
4629      call utl_abort('gsv_transposeStepToTiles: stateVector_tiles%dataKind not real 4 or 8')
4630    end if
4631
4632    kIndex_Loop: do kIndex = 1, stateVector_tiles%nk
4633
4634      ! determine if there is data to send for this kIndex
4635      if (stateVector_1step%allocated) then
4636        if (stateVector_1step%dataKind == 4) then
4637          call gsv_getField(stateVector_1step,field_in_r4_ptr)
4638          allZero = (maxval(abs(field_in_r4_ptr(:, :, kIndex, 1))) == 0.0)
4639        else if (stateVector_1step%dataKind == 8) then
4640          call gsv_getField(stateVector_1step,field_in_r8_ptr)
4641          allZero = (maxval(abs(field_in_r8_ptr(:, :, kIndex, 1))) == 0.0D0)
4642        end if
4643      else
4644        allZero = .true.
4645      end if
4646      call rpn_comm_allReduce(allZero,allZero_mpiglobal,1,'mpi_logical','mpi_land','GRID',ierr)
4647      if (allZero_mpiglobal) then
4648        ! Field equal to zero, skipping this kIndex to save time
4649        cycle kIndex_Loop
4650      end if
4651
4652      ! prepare the complete 1 timestep for sending on all tasks that have read something
4653      if (stateVector_1step%allocated) then
4654        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid,nsize,indexBeg,indexEnd)
4655        do youridy = 0, (mmpi_npey-1)
4656          do youridx = 0, (mmpi_npex-1)
4657            yourid = youridx + youridy*mmpi_npex
4658            nsize = stateVector_tiles%allLonPerPE(youridx+1) *  &
4659                    stateVector_tiles%allLatPerPE(youridy+1)
4660            indexBeg = senddispls(yourid+1) + 1
4661            indexEnd = senddispls(yourid+1) + nsize
4662            if (sendrecvKind == 4 .and. stateVector_1step%dataKind == 4) then
4663              gd_send_1d_r4(indexBeg:indexEnd) =  &
4664                        reshape(field_in_r4_ptr(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
4665                                                 stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
4666                                                 kIndex, 1), (/ nsize /))
4667            else if (sendrecvKind == 4 .and. stateVector_1step%dataKind == 8) then
4668              gd_send_1d_r4(indexBeg:indexEnd) =  &
4669                   real(reshape(field_in_r8_ptr(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
4670                                                  stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
4671                                                  kIndex, 1), (/ nsize /)), 4)
4672            else if (sendrecvKind == 8 .and. stateVector_1step%dataKind == 8) then
4673              gd_send_1d_r8(indexBeg:indexEnd) =  &
4674                        reshape(field_in_r8_ptr(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
4675                                                 stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
4676                                                 kIndex, 1), (/ nsize /))
4677            else
4678              call utl_abort('gsv_stepToTiles: unexpected combination of real kinds')
4679            end if
4680          end do
4681        end do
4682        !$OMP END PARALLEL DO
4683      end if
4684
4685      if (sendrecvKind == 4) then
4686        call mpi_alltoallv(gd_send_1d_r4, sendsizes, senddispls, mmpi_datyp_real4, &
4687                           gd_recv_3d_r4, recvsizes, recvdispls, mmpi_datyp_real4, &
4688                           mmpi_comm_grid, ierr)
4689      else if (sendrecvKind == 8) then
4690        call mpi_alltoallv(gd_send_1d_r8, sendsizes, senddispls, mmpi_datyp_real8, &
4691                           gd_recv_3d_r8, recvsizes, recvdispls, mmpi_datyp_real8, &
4692                           mmpi_comm_grid, ierr)
4693      end if
4694
4695      stepIndex = stepIndexBeg - 1
4696      stepCount = 0
4697      do procIndex = 1, mmpi_nprocs
4698
4699        ! skip if this task had nothing to send
4700        if (.not. thisProcIsAsender(procIndex)) cycle
4701
4702        stepCount = stepCount + 1
4703        stepIndex = stepIndex + 1
4704        if (stepIndex > stateVector_tiles%numStep) then
4705          call utl_abort('gsv_transposeStepToTiles: stepIndex > numStep')
4706        end if
4707        if (sendrecvKind == 4 .and. stateVector_tiles%dataKind == 4) then
4708          field_out_r4_ptr(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
4709                           stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
4710                           kIndex, stepIndex) =   &
4711              gd_recv_3d_r4(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE,stepCount)
4712        else if (sendrecvKind == 4 .and. stateVector_tiles%dataKind == 8) then
4713          field_out_r8_ptr(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
4714                           stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
4715                           kIndex, stepIndex) =   &
4716              real(gd_recv_3d_r4(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE,stepCount),8)
4717        else if (sendrecvKind == 8 .and. stateVector_tiles%dataKind == 8) then
4718          field_out_r8_ptr(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
4719                           stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
4720                           kIndex, stepIndex) =   &
4721              gd_recv_3d_r8(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE,stepCount)
4722        end if
4723
4724      end do ! procIndex
4725    end do kIndex_Loop ! kIndex
4726
4727    if (sendrecvKind == 4) then
4728      deallocate(gd_recv_3d_r4)
4729      deallocate(gd_send_1d_r4)
4730    else if (sendrecvKind == 8) then
4731      deallocate(gd_recv_3d_r8)
4732      deallocate(gd_send_1d_r8)
4733    end if
4734
4735    ! now send HeightSfc from task 0 to all others
4736    if (stateVector_tiles%heightSfcPresent) then
4737
4738      allocate(gd_recv_height(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax))
4739      gd_recv_height(:,:) = 0.0d0
4740
4741      ! prepare data to send from task 0
4742      if (mmpi_myid == 0) then
4743        if (.not. stateVector_1step%allocated) then
4744          call utl_abort('gsv_transposeStepToVarsLevs: Problem with HeightSfc')
4745        end if
4746
4747        allocate(gd_send_height(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,mmpi_nprocs))
4748        gd_send_height(:,:,:) = 0.0d0
4749
4750        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
4751        do youridy = 0, (mmpi_npey-1)
4752          do youridx = 0, (mmpi_npex-1)
4753            yourid = youridx + youridy*mmpi_npex
4754            gd_send_height(1:stateVector_tiles%allLonPerPE(youridx+1),  &
4755                    1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1) =  &
4756                real(stateVector_1step%HeightSfc(&
4757                     stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
4758                     stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1)), 8)
4759          end do
4760        end do
4761        !$OMP END PARALLEL DO
4762
4763      else
4764        allocate(gd_send_height(1,1,1))
4765      end if
4766
4767      ! distribute from task 0 to all tasks
4768      nsize = stateVector_tiles%lonPerPEmax * stateVector_tiles%latPerPEmax
4769      do procIndex = 1, mmpi_nprocs
4770        displs(procIndex) = (procIndex-1)*nsize
4771        nsizes(procIndex) = nsize
4772      end do
4773      call rpn_comm_scatterv(gd_send_height, nsizes, displs, 'mpi_real8', &
4774                             gd_recv_height, nsize, 'mpi_real8', &
4775                             0, 'grid', ierr)
4776
4777      stateVector_tiles%HeightSfc(&
4778                stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,    &
4779                stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd) =  &
4780          gd_recv_height(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE)
4781
4782      deallocate(gd_recv_height)
4783      deallocate(gd_send_height)
4784
4785    end if ! heightSfcPresent
4786
4787    ! Copy the mask if it is present
4788    if (stateVector_1step%allocated) then
4789      call gsv_copyMask(stateVector_1step, stateVector_tiles)
4790    end if
4791    call ocm_communicateMask(stateVector_tiles%oceanMask)
4792
4793    call msg('gsv_transposeStepToTiles','END', verb_opt=2)
4794
4795    call utl_tmg_stop(163)
4796
4797  end subroutine gsv_transposeStepToTiles
4798
4799  !--------------------------------------------------------------------------
4800  ! gsv_transposeTilesToStep
4801  !--------------------------------------------------------------------------
4802  subroutine gsv_transposeTilesToStep(stateVector_1step, stateVector_tiles, stepIndexBeg)
4803    !
4804    !:Purpose: Transposes the data from a `mpi_distribution='Tiles'` distribution 
4805    !           (4D lat-lon tiles) to a timestep MPI distribution (1 timestep per
4806    !           MPI task)
4807    !
4808    !:Comment: Step-wise distribution is mostly only used for file I/O.
4809    !           When in such implicit time distribution, it is necessery that
4810    !           `mpi_local=.false.` and `numStep=1`.
4811    !
4812    implicit none
4813
4814    ! Arguments:
4815    type(struct_gsv), intent(inout)  :: stateVector_1step
4816    type(struct_gsv), intent(in)     :: stateVector_tiles
4817    integer,          intent(in)     :: stepIndexBeg
4818
4819    ! Locals:
4820    integer :: ierr, yourid, youridx, youridy, nsize
4821    integer :: kIndex, procIndex, stepIndex, numStepOutput, stepCount
4822    integer :: inKind, outKind, sendrecvKind, outKindLocal
4823    logical :: thisProcIsAreceiver(mmpi_nprocs)
4824    integer :: sendsizes(mmpi_nprocs), recvsizes(mmpi_nprocs), senddispls(mmpi_nprocs), recvdispls(mmpi_nprocs)
4825    real(4), allocatable :: gd_send_r4(:,:,:), gd_recv_r4(:,:,:)
4826    real(8), allocatable :: gd_send_r8(:,:,:), gd_recv_r8(:,:,:)
4827    real(8), allocatable :: gd_send_height(:,:), gd_recv_height(:,:,:)
4828    real(4), pointer     :: field_in_r4_ptr(:,:,:,:), field_out_r4_ptr(:,:,:,:)
4829    real(8), pointer     :: field_in_r8_ptr(:,:,:,:), field_out_r8_ptr(:,:,:,:)
4830
4831    if (statevector_tiles%mpi_distribution /= 'Tiles') then
4832      call utl_abort('gsv_transposeTilesToStep: input statevector must have Tiles mpi distribution')
4833    end if
4834
4835    call rpn_comm_barrier('GRID',ierr)
4836    call msg('gsv_transposeTilesToStep', 'START', verb_opt=2)
4837
4838    ! determine which tasks have something to receive and let everyone know
4839    do procIndex = 1, mmpi_nprocs
4840      thisProcIsAreceiver(procIndex) = .false.
4841      if (mmpi_myid == (procIndex-1) .and. stateVector_1step%allocated) then
4842        thisProcIsAreceiver(procIndex) = .true.
4843      end if
4844      call rpn_comm_bcast(thisProcIsAreceiver(procIndex), 1,  &
4845                          'MPI_LOGICAL', procIndex-1, 'GRID', ierr)
4846    end do
4847
4848    numStepOutput = 0
4849    do procIndex = 1, mmpi_nprocs
4850      if (thisProcIsAreceiver(procIndex)) numStepOutput = numStepOutput + 1
4851    end do
4852    call msg('gsv_transposeTilesToStep','numStepOutput = '//str(numStepOutput))
4853
4854    ! size of each message
4855    nsize = stateVector_tiles%lonPerPEmax * stateVector_tiles%latPerPEmax
4856
4857    ! only recv the data on tasks that need data
4858    recvsizes(:) = 0
4859    if (stateVector_1step%allocated) then
4860      do procIndex = 1, mmpi_nprocs
4861        recvsizes(procIndex) = nsize
4862      end do
4863    end if
4864    recvdispls(1) = 0
4865    do procIndex = 2, mmpi_nprocs
4866      recvdispls(procIndex) = recvdispls(procIndex-1) + recvsizes(procIndex-1)
4867    end do
4868
4869    ! all tasks send only to those that need data
4870    sendsizes(:) = 0
4871    do procIndex = 1, mmpi_nprocs
4872      if (thisProcIsAreceiver(procIndex)) then
4873        sendsizes(procIndex) = nsize
4874      end if
4875    end do
4876    senddispls(1) = 0
4877    do procIndex = 2, mmpi_nprocs
4878      senddispls(procIndex) = senddispls(procIndex-1) + sendsizes(procIndex-1)
4879    end do
4880
4881    if (stateVector_1step%allocated) then
4882      outKindLocal = stateVector_1step%dataKind
4883    else
4884      outKindLocal = -1
4885    end if
4886    call rpn_comm_allreduce(outKindLocal, outKind, 1,  &
4887                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
4888    inKind = stateVector_tiles%dataKind
4889    if (inKind == 4 .or. outKind == 4) then
4890      sendrecvKind = 4
4891    else
4892      sendrecvKind = 8
4893    end if
4894
4895    ! allocate arrays used for mpi communication of 1 level/variable at a time
4896    if (sendrecvKind == 4) then
4897      allocate(gd_send_r4(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,numStepOutput))
4898      gd_send_r4(:,:,:) = 0.0
4899      if (stateVector_1step%allocated) then
4900        allocate(gd_recv_r4(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,mmpi_nprocs))
4901      else
4902        allocate(gd_recv_r4(1,1,1))
4903      end if
4904      gd_recv_r4(:,:,:) = 0.0
4905    else
4906      allocate(gd_send_r8(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,numStepOutput))
4907      gd_send_r8(:,:,:) = 0.0d0
4908      if (stateVector_1step%allocated) then
4909        allocate(gd_recv_r8(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,mmpi_nprocs))
4910      else
4911        allocate(gd_recv_r8(1,1,1))
4912      end if
4913      gd_recv_r8(:,:,:) = 0.0d0
4914    end if
4915
4916    if (stateVector_tiles%dataKind == 4) then
4917      call gsv_getField(stateVector_tiles,field_in_r4_ptr)
4918    else
4919      call gsv_getField(stateVector_tiles,field_in_r8_ptr)
4920    end if
4921
4922    do kIndex = 1, stateVector_tiles%nk
4923
4924      ! prepare data for distribution from all tasks to only those that need it
4925      stepIndex = stepIndexBeg - 1
4926      stepCount = 0
4927
4928      do procIndex = 1, mmpi_nprocs
4929
4930        ! skip if this task has nothing to receive
4931        if (.not. thisProcIsAreceiver(procIndex)) cycle
4932
4933        stepCount = stepCount + 1
4934        stepIndex = stepIndex + 1
4935        if (stepIndex > stateVector_tiles%numStep) then
4936          call utl_abort('gsv_transposeTilesToStep: stepIndex > numStep')
4937        end if
4938
4939        if (sendrecvKind == 4 .and. stateVector_tiles%dataKind == 4) then
4940          gd_send_r4(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE,stepCount) = &
4941               field_in_r4_ptr(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
4942                               stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
4943                               kIndex, stepIndex)
4944        else if (sendrecvKind == 4 .and. stateVector_tiles%dataKind == 8) then
4945          gd_send_r4(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE,stepCount) = &
4946               real(field_in_r8_ptr(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
4947                                    stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
4948                                    kIndex, stepIndex),4)
4949        else if (sendrecvKind == 8 .and. stateVector_tiles%dataKind == 8) then
4950          gd_send_r8(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE,stepCount) = &
4951               field_in_r8_ptr(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
4952                               stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
4953                               kIndex, stepIndex)
4954        else
4955          call utl_abort('gsv_transposeTilesToStep: unexpected combination of real kinds')
4956        end if
4957
4958      end do ! procIndex
4959
4960      if (sendrecvKind == 4) then
4961        call mpi_alltoallv(gd_send_r4, sendsizes, senddispls, mmpi_datyp_real4, &
4962                           gd_recv_r4, recvsizes, recvdispls, mmpi_datyp_real4, &
4963                           mmpi_comm_grid, ierr)
4964      else if (sendrecvKind == 8) then
4965        call mpi_alltoallv(gd_send_r8, sendsizes, senddispls, mmpi_datyp_real8, &
4966                           gd_recv_r8, recvsizes, recvdispls, mmpi_datyp_real8, &
4967                           mmpi_comm_grid, ierr)
4968      end if
4969
4970      ! copy over the complete 1 timestep received
4971      if (stateVector_1step%allocated) then
4972
4973        if (sendrecvKind == 4 .and. stateVector_1step%dataKind == 4) then
4974
4975          call gsv_getField(stateVector_1step,field_out_r4_ptr)
4976          !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
4977          do youridy = 0, (mmpi_npey-1)
4978            do youridx = 0, (mmpi_npex-1)
4979              yourid = youridx + youridy*mmpi_npex
4980              field_out_r4_ptr(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
4981                               stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
4982                               kIndex, 1) = &
4983                    gd_recv_r4(1:stateVector_tiles%allLonPerPE(youridx+1),  &
4984                               1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1)
4985            end do
4986          end do
4987          !$OMP END PARALLEL DO
4988
4989        else if (sendrecvKind == 4 .and. stateVector_1step%dataKind == 8) then
4990
4991          call gsv_getField(stateVector_1step,field_out_r8_ptr)
4992          !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
4993          do youridy = 0, (mmpi_npey-1)
4994            do youridx = 0, (mmpi_npex-1)
4995              yourid = youridx + youridy*mmpi_npex
4996              field_out_r8_ptr(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
4997                               stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
4998                               kIndex, 1) = &
4999               real(gd_recv_r4(1:stateVector_tiles%allLonPerPE(youridx+1),  &
5000                               1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1), 8)
5001            end do
5002          end do
5003          !$OMP END PARALLEL DO
5004
5005        else if (sendrecvKind == 8 .and. stateVector_1step%dataKind == 8) then
5006
5007          call gsv_getField(stateVector_1step,field_out_r8_ptr)
5008          !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
5009          do youridy = 0, (mmpi_npey-1)
5010            do youridx = 0, (mmpi_npex-1)
5011              yourid = youridx + youridy*mmpi_npex
5012              field_out_r8_ptr(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
5013                               stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
5014                               kIndex, 1) = &
5015                    gd_recv_r8(1:stateVector_tiles%allLonPerPE(youridx+1),  &
5016                               1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1)
5017            end do
5018          end do
5019          !$OMP END PARALLEL DO
5020
5021        else
5022          call utl_abort('gsv_transposeTilesToStep: unexpected combination of real kinds')
5023        end if
5024
5025      end if
5026
5027    end do ! kIndex
5028
5029    if (sendrecvKind == 4) then
5030      deallocate(gd_recv_r4)
5031      deallocate(gd_send_r4)
5032    else if (sendrecvKind == 8) then
5033      deallocate(gd_recv_r8)
5034      deallocate(gd_send_r8)
5035    end if
5036
5037    ! now gather the same HeightSfc onto each task that is a receiver
5038    if (stateVector_tiles%heightSfcPresent) then
5039
5040      allocate(gd_send_height(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax))
5041      gd_send_height(:,:) = 0.0d0
5042      if (stateVector_1step%allocated) then
5043        allocate(gd_recv_height(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,mmpi_nprocs))
5044      else
5045        allocate(gd_recv_height(1,1,1))
5046      end if
5047      gd_recv_height(:,:,:) = 0.0d0
5048
5049      ! prepare tile to send on each task
5050      gd_send_height(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE) = &
5051          stateVector_tiles%HeightSfc(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,    &
5052                                  stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd)
5053
5054      ! gather from all tasks onto each task with a receiving statevector
5055      do procIndex = 1, mmpi_nprocs
5056
5057        ! skip if this task has nothing to receive
5058        if (.not. thisProcIsAreceiver(procIndex)) cycle
5059
5060        nsize = stateVector_tiles%lonPerPEmax * stateVector_tiles%latPerPEmax
5061        call rpn_comm_gather(gd_send_height, nsize, 'mpi_real8', &
5062                             gd_recv_height, nsize, 'mpi_real8', &
5063                             procIndex-1, 'grid', ierr)
5064
5065        ! copy over the complete 1 timestep received
5066        if (mmpi_myid == procIndex-1) then
5067          !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
5068          do youridy = 0, (mmpi_npey-1)
5069            do youridx = 0, (mmpi_npex-1)
5070              yourid = youridx + youridy*mmpi_npex
5071              stateVector_1step%HeightSfc(&
5072                   stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
5073                   stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1)) = &
5074                   gd_recv_height(1:stateVector_tiles%allLonPerPE(youridx+1),  &
5075                              1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1)
5076            end do
5077          end do
5078          !$OMP END PARALLEL DO
5079        end if
5080
5081      end do ! procIndex
5082
5083      deallocate(gd_recv_height)
5084      deallocate(gd_send_height)
5085
5086    end if ! heightSfcPresent
5087
5088    ! Copy mask if it exists on mpi task with step data allocated
5089    if (stateVector_1step%allocated) then
5090      call gsv_copyMask(stateVector_tiles, stateVector_1step)
5091    end if
5092
5093    call msg('gsv_transposeTilesToStep', 'END', verb_opt=2)
5094
5095  end subroutine gsv_transposeTilesToStep
5096
5097  !--------------------------------------------------------------------------
5098  ! gsv_transposeTilesToMpiGlobal
5099  !--------------------------------------------------------------------------
5100  subroutine gsv_transposeTilesToMpiGlobal(stateVector_mpiGlobal, stateVector_tiles)
5101    !
5102    !:Purpose: Does MPI transpose (allGather) from `mpi_distribution='Tiles'` 
5103    !          (4D lat-lon tiles) to global 4D stateVector on each MPI task 
5104    !          where it is allocated.
5105    !
5106    implicit none
5107
5108    ! Arguments:
5109    type(struct_gsv), intent(inout)  :: stateVector_mpiGlobal
5110    type(struct_gsv), intent(in)     :: stateVector_tiles
5111
5112    ! Locals:
5113    integer :: ierr, yourid, youridx, youridy, nsize
5114    integer :: kIndex, stepIndex, numStep
5115    real(4), allocatable :: gd_send_r4(:,:), gd_recv_r4(:,:,:)
5116    real(8), allocatable :: gd_send_r8(:,:), gd_recv_r8(:,:,:)
5117    real(4), pointer     :: field_out_r4(:,:,:,:), field_in_r4(:,:,:,:)
5118    real(8), pointer     :: field_out_r8(:,:,:,:), field_in_r8(:,:,:,:)
5119
5120    if (stateVector_tiles%mpi_distribution /= 'Tiles') then
5121      call utl_abort('gsv_transposeTilesToMpiGlobal: input statevector must have Tiles mpi distribution')
5122    end if
5123
5124    if (stateVector_mpiGlobal%allocated) then
5125      if (stateVector_mpiGlobal%numStep /= stateVector_tiles%numStep) then
5126        call utl_abort('gsv_transposeTilesToMpiGlobal: input and output ' // &
5127                       'stateVectors must have same numStep')
5128      end if
5129    end if
5130
5131    numStep = stateVector_tiles%numStep
5132
5133    call rpn_comm_barrier('GRID',ierr)
5134    call msg('gsv_transposeTilesToMpiGlobal', 'START', verb_opt=2)
5135
5136    ! size of each message
5137    nsize = stateVector_tiles%lonPerPEmax * stateVector_tiles%latPerPEmax
5138
5139    ! allocate arrays used for mpi communication of 1 level/variable at a time
5140    allocate(gd_send_r4(stateVector_tiles%lonPerPEmax,  &
5141                        stateVector_tiles%latPerPEmax))
5142    gd_send_r4(:,:) = 0.0
5143    allocate(gd_recv_r4(stateVector_tiles%lonPerPEmax,  &
5144                        stateVector_tiles%latPerPEmax, mmpi_nprocs))
5145    gd_recv_r4(:,:,:) = 0.0
5146
5147    if (stateVector_tiles%dataKind == 4) then
5148      call gsv_getField(stateVector_tiles,field_in_r4)
5149    else
5150      call gsv_getField(stateVector_tiles,field_in_r8)
5151    end if
5152    if (stateVector_mpiGlobal%allocated) then
5153      if (stateVector_mpiGlobal%dataKind == 4) then
5154        call gsv_getField(stateVector_mpiGlobal,field_out_r4)
5155      else
5156        call gsv_getField(stateVector_mpiGlobal,field_out_r8)
5157      end if
5158    end if
5159
5160    ! do allGather for 1 2D field/stepIndex at a time
5161    do stepIndex = 1, numStep
5162      do kIndex = 1, stateVector_tiles%nk
5163        if (stateVector_tiles%dataKind == 4) then
5164          gd_send_r4(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE) =  &
5165               field_in_r4(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,  &
5166                           stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd,  &
5167                           kIndex, stepIndex)
5168        else
5169          gd_send_r4(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE) =      &
5170               real(field_in_r8(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd, &
5171                                stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd, &
5172                                kIndex, stepIndex), 4)
5173        end if
5174
5175        call rpn_comm_allgather(gd_send_r4, nsize, 'mpi_real4',  &
5176                                gd_recv_r4, nsize, 'mpi_real4', 'grid', ierr)
5177
5178        ! copy over the complete 2D field for 1 stepIndex received
5179        if (stateVector_mpiGlobal%allocated) then
5180
5181          !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
5182          do youridy = 0, (mmpi_npey-1)
5183            do youridx = 0, (mmpi_npex-1)
5184              yourid = youridx + youridy*mmpi_npex
5185              if (stateVector_mpiGlobal%dataKind == 4) then
5186                field_out_r4(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
5187                             stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
5188                             kIndex, stepIndex) = &
5189                     gd_recv_r4(1:stateVector_tiles%allLonPerPE(youridx+1),  &
5190                                1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1)
5191              else
5192                field_out_r8(stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
5193                             stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1), &
5194                             kIndex, stepIndex) = &
5195                     real(gd_recv_r4(1:stateVector_tiles%allLonPerPE(youridx+1),  &
5196                                      1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1), 4)
5197              end if
5198            end do
5199          end do
5200          !$OMP END PARALLEL DO
5201
5202        end if
5203
5204      end do ! kIndex
5205    end do ! stepIndex
5206
5207    deallocate(gd_recv_r4)
5208    deallocate(gd_send_r4)
5209
5210    ! now gather the same HeightSfc onto each task that is a receiver
5211    if (stateVector_tiles%heightSfcPresent) then
5212
5213      allocate(gd_send_r8(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax))
5214      gd_send_r8(:,:) = 0.0d0
5215      allocate(gd_recv_r8(stateVector_tiles%lonPerPEmax,stateVector_tiles%latPerPEmax,mmpi_nprocs))
5216      gd_recv_r8(:,:,:) = 0.0d0
5217
5218      ! prepare tile to send on each task
5219      gd_send_r8(1:stateVector_tiles%lonPerPE,1:stateVector_tiles%latPerPE) = &
5220          stateVector_tiles%HeightSfc(stateVector_tiles%myLonBeg:stateVector_tiles%myLonEnd,    &
5221                                      stateVector_tiles%myLatBeg:stateVector_tiles%myLatEnd)
5222
5223      call rpn_comm_allGather(gd_send_r8, nsize, 'mpi_real8',  &
5224                              gd_recv_r8, nsize, 'mpi_real8', 'grid', ierr)
5225
5226      if (stateVector_mpiGlobal%allocated) then
5227
5228        !$OMP PARALLEL DO PRIVATE(youridy,youridx,yourid)
5229        do youridy = 0, (mmpi_npey-1)
5230          do youridx = 0, (mmpi_npex-1)
5231            yourid = youridx + youridy*mmpi_npex
5232            stateVector_mpiGlobal%HeightSfc(&
5233                   stateVector_tiles%allLonBeg(youridx+1):stateVector_tiles%allLonEnd(youridx+1), &
5234                   stateVector_tiles%allLatBeg(youridy+1):stateVector_tiles%allLatEnd(youridy+1)) = &
5235                   gd_recv_r8(1:stateVector_tiles%allLonPerPE(youridx+1),  &
5236                              1:stateVector_tiles%allLatPerPE(youridy+1), yourid+1)
5237          end do
5238        end do
5239        !$OMP END PARALLEL DO
5240
5241      end if
5242
5243      deallocate(gd_recv_r8)
5244      deallocate(gd_send_r8)
5245
5246    end if ! heightSfcPresent
5247
5248    ! Copy over the mask, if it exists
5249    if (stateVector_mpiGlobal%allocated) then
5250      call gsv_copyMask(stateVector_tiles, stateVector_mpiGlobal)
5251    end if
5252
5253    call msg('gsv_transposeTilesToMpiGlobal', 'END', verb_opt=2)
5254
5255  end subroutine gsv_transposeTilesToMpiGlobal
5256
5257  !--------------------------------------------------------------------------
5258  ! gsv_varKindExist
5259  !--------------------------------------------------------------------------
5260  function gsv_varKindExist(varKind) result(KindFound)
5261    !
5262    !:Purpose: To check whether any of the variables to be assimilated
5263    !           (i.e. specified in the namelist NAMSTATE) are part of the
5264    !           specified variable kind
5265    !
5266    implicit none
5267
5268    ! Arguments:
5269    character(len=*), intent(in) :: varKind    ! Variable kind (e.g. MT or CH)
5270    ! Result:
5271    logical                      :: KindFound  ! Logical indicating whether var kind found 
5272
5273    ! Locals:
5274    integer :: varIndex
5275
5276    KindFound = .false.
5277
5278    VARLIST: do varIndex = 1, vnl_numvarmax
5279       if (gsv_varExist(varName=vnl_varNameList(varIndex))) then
5280          if (vnl_varKindFromVarname(vnl_varNameList(varIndex)).eq.varKind) then
5281             KindFound = .true.
5282             exit VARLIST 
5283          end if
5284       end if
5285    end do VARLIST
5286
5287  end function gsv_varKindExist
5288
5289  !--------------------------------------------------------------------------
5290  ! gsv_dotProduct
5291  !--------------------------------------------------------------------------
5292  subroutine gsv_dotProduct(stateVector_a,stateVector_b,dotsum)
5293    !
5294    !:Purpose: Computes the dot product of two statevectors
5295    !
5296    implicit none
5297
5298    ! Arguments:
5299    type(struct_gsv), intent(in)  :: stateVector_a
5300    type(struct_gsv), intent(in)  :: stateVector_b
5301    real(8),          intent(out) :: dotsum
5302
5303    ! Locals:
5304    integer          :: jstep,jlon,jlev,jlat,lon1,lon2,lat1,lat2
5305    integer          :: k1,k2
5306
5307    if (.not.stateVector_a%allocated) then
5308      call utl_abort('gsv_dotProduct: gridStateVector_in not yet allocated')
5309    end if
5310    if (.not.statevector_b%allocated) then
5311      call utl_abort('gsv_dotProduct: gridStateVector_inout not yet allocated')
5312    end if
5313
5314    lon1 = stateVector_a%myLonBeg
5315    lon2 = stateVector_a%myLonEnd
5316    lat1 = stateVector_a%myLatBeg
5317    lat2 = stateVector_a%myLatEnd
5318    k1 = stateVector_a%mykBeg
5319    k2 = stateVector_a%mykEnd
5320
5321    dotsum = 0.0D0
5322    do jstep = 1, stateVector_a%numStep
5323      do jlev = k1,k2
5324        do jlat = lat1, lat2
5325          do jlon = lon1, lon2
5326            dotsum = dotsum + stateVector_a%gd_r8(jlon,jlat,jlev,jstep) * &
5327                              stateVector_b%gd_r8(jlon,jlat,jlev,jstep)
5328          end do 
5329        end do
5330      end do
5331    end do
5332
5333    call mmpi_allreduce_sumreal8scalar(dotsum,'grid')
5334
5335  end subroutine gsv_dotProduct
5336
5337  !--------------------------------------------------------------------------
5338  ! gsv_field3d_hbilin
5339  !--------------------------------------------------------------------------
5340  subroutine gsv_field3d_hbilin(field,nlong,nlat,nlev,xlong,xlat,vlev, &
5341                                fieldout,nlongout,nlatout,nlevout,xlongout, &
5342                                xlatout,vlevout)
5343    !
5344    !:Purpose: Horizontal bilinear interpolation from a 3D regular gridded field
5345    !           to another 3D regular gridded field.
5346    !
5347    !           This version can be used with fields that are not part of the
5348    !           background state, such as climatologies.
5349    !
5350    !           This version does not depend on gridstatevector data
5351    !           types/structures.
5352    !
5353    implicit none
5354
5355    ! Arguments:
5356    real(8), intent(in)  :: field(nlong,nlat,nlev) ! 3D field
5357    integer, intent(in)  :: nlong ! number of latitudes
5358    integer, intent(in)  :: nlat  ! number of longitudes
5359    integer, intent(in)  :: nlev  ! number of vertical levels
5360    real(8), intent(in)  :: xlong(nlong) ! longitudes (radians)
5361    real(8), intent(in)  :: xlat(nlat)   ! latitudes (radians)
5362    real(8), intent(in)  :: vlev(nlev)   ! vertical levels of input field (in pressure)
5363    real(8), intent(out) :: fieldout(nlongout,nlatout,nlevout) ! 3D field
5364    integer, intent(in)  :: nlongout ! number or latitudes
5365    integer, intent(in)  :: nlatout  ! number of target longitudes
5366    integer, intent(in)  :: nlevout  ! Number of target vertical levels
5367    real(8), intent(in)  :: xlongout(nlongout) ! target longitudes (radians) 
5368    real(8), intent(in)  :: xlatout(nlatout)   ! target of target latitudes (radians)
5369    real(8), intent(in)  :: vlevout(nlevout)   ! Target vertical levels (in pressure)
5370
5371    ! Locals:
5372    real(8) :: lnvlev(nlev),lnvlevout(nlevout),plong2
5373    integer :: ilev,ilon,ilat,ilatp1,i,j,ilongout,ilatout
5374    logical :: same_vlev
5375    real(8) :: DLDX, DLDY, DLDP, DLW1, DLW2, DLW3, DLW4
5376
5377    ! Check if vertical interpolation needed
5378    if (nlev /= nlevout) then
5379      same_vlev = .false.
5380    else
5381      if (any(abs(vlev-vlevout) > 0.01*vlev)) then
5382        same_vlev = .false.
5383      else
5384        same_vlev = .true.
5385      end if
5386    end if 
5387    
5388    ! Find near lat/long grid points
5389    
5390    do ilongout = 1, nlongout
5391
5392      if (nlongout > 1) then    
5393        plong2 = xlongout(ilongout)
5394        if (plong2 < 0.0) plong2 = 2.D0*mpc_pi_r8 + plong2
5395        do ilon = 2,nlong
5396          if  (xlong(ilon-1) < xlong(ilon)) then
5397            if (plong2 >= xlong(ilon-1) .and. plong2 <= xlong(ilon)) exit
5398          else 
5399            ! Assumes this is a transition between 360 to 0 (if it exists). Skip over.
5400          end if
5401        end do
5402        ilon = ilon-1
5403      else
5404        ilon = 1
5405      end if
5406      
5407      do ilatout = 1, nlatout
5408         
5409        do ilat = 2, nlat
5410          if (xlatout(ilatout) <= xlat(ilat)) exit
5411        end do
5412        ilat = min(ilat-1,nlat)
5413        ilatp1 = min(ilat+1,nlat)
5414    
5415        ! Set lat/long interpolation weights
5416    
5417        if (nlongout > 1) then
5418          DLDX = (xlongout(ilongout) - xlong(ilon))/(xlong(ilon+1)-xlong(ilon))
5419          if (ilat < nlat) then
5420            DLDY = (xlatout(ilatout) - xlat(ilat))/(xlat(ilat+1)-xlat(ilat))
5421          else
5422            DLDY = (xlatout(ilatout) - xlat(ilat))/(xlat(ilat)-xlat(ilat-1))
5423          end if
5424
5425          DLW1 = (1.d0-DLDX) * (1.d0-DLDY)
5426          DLW2 =       DLDX  * (1.d0-DLDY)
5427          DLW3 = (1.d0-DLDX) *       DLDY
5428          DLW4 =       DLDX  *       DLDY
5429        else
5430          if (ilat < nlat) then
5431            DLDY = (xlatout(ilatout) - xlat(ilat))/(xlat(ilat+1)-xlat(ilat))
5432          else
5433            DLDY = (xlatout(ilatout) - xlat(ilat))/(xlat(ilat)-xlat(ilat-1))
5434          end if
5435
5436          DLW1 = (1.d0-DLDY)
5437          DLW3 = DLDY        
5438        end if
5439        
5440        ! Set vertical interpolation weights (assumes pressure vertical coordinate)
5441    
5442        if (.not.same_vlev) then
5443          lnvlevout(:) = log(vlevout(:))    
5444          lnvlev(:) = log(vlev(:))    
5445          
5446          ilev = 1
5447          do i = 1, nlevout
5448            do j = ilev, nlev          
5449               if (lnvlevout(i) < lnvlev(j)) exit    ! assumes both lnvlevout and lnvlev increase with increasing index value
5450            end do
5451            ilev = j-1
5452            if (ilev < 1) then
5453               ilev = 1
5454            else if (ilev >= nlev) then
5455               ilev = nlev-1
5456            end if
5457       
5458            DLDP = (lnvlev(ilev+1)-lnvlevout(i))/(lnvlev(ilev+1)-lnvlev(ilev))
5459            
5460            fieldout(ilongout,ilatout,i) = DLDP* (DLW1 * field(ilon,ilat,ilev) &
5461                           + DLW2 * field(ilon+1,ilat,ilev) &
5462                           + DLW3 * field(ilon,ilatp1,ilev) &
5463                           + DLW4 * field(ilon+1,ilatp1,ilev)) &
5464             + (1.d0-DLDP)* (DLW1 * field(ilon,ilat,ilev+1) &
5465                           + DLW2 * field(ilon+1,ilat,ilev+1) &
5466                           + DLW3 * field(ilon,ilatp1,ilev+1) &
5467                           + DLW4 * field(ilon+1,ilatp1,ilev+1))                               
5468          end do
5469        else if (nlongout > 1) then
5470          do ilev = 1, nlevout           
5471            fieldout(ilongout,ilatout,ilev) = DLW1 * field(ilon,ilat,ilev) &
5472                           + DLW2 * field(ilon+1,ilat,ilev) &
5473                           + DLW3 * field(ilon,ilatp1,ilev) &
5474                           + DLW4 * field(ilon+1,ilatp1,ilev)
5475          end do
5476        else 
5477          do ilev = 1, nlevout           
5478            fieldout(ilongout,ilatout,ilev) = DLW1 * field(ilon,ilat,ilev) &
5479                           + DLW3 * field(ilon,ilatp1,ilev) 
5480          end do
5481        end if 
5482      end do
5483    end do
5484        
5485  end subroutine gsv_field3d_hbilin   
5486
5487  !--------------------------------------------------------------------------
5488  ! gsv_smoothHorizontal
5489  !--------------------------------------------------------------------------
5490  subroutine gsv_smoothHorizontal(stateVector_inout, horizontalScale, maskNegatives_opt, &
5491       varName_opt, binInteger_opt, binReal_opt, binRealThreshold_opt)
5492    !
5493    !:Purpose: To apply a horizontal smoothing to all of the fields according
5494    !           to the specified horizontal length scale
5495    !
5496    implicit none
5497
5498    ! Arguments:
5499    type(struct_gsv), target,   intent(inout) :: stateVector_inout
5500    real(8),                    intent(in)    :: horizontalScale
5501    logical,          optional, intent(in)    :: maskNegatives_opt
5502    character(len=*), optional, intent(in)    :: varName_opt
5503    real(4), pointer, optional, intent(in)    :: binInteger_opt(:,:,:)
5504    real(8), pointer, optional, intent(in)    :: binReal_opt(:,:)
5505    real(8),          optional, intent(in)    :: binRealThreshold_opt
5506
5507    ! Locals:
5508    type(struct_gsv), pointer :: stateVector
5509    type(struct_gsv), target  :: stateVector_varsLevs
5510    integer :: latIndex, lonIndex, stepIndex, kIndex
5511    integer :: latIndex2, lonIndex2, maxDeltaIndex, count
5512    integer :: latBeg, latEnd, lonBeg, lonEnd
5513    integer :: myBinInteger
5514    real(8), allocatable :: smoothedField(:,:)
5515    real(8) :: lat1_r8, lon1_r8, lat2_r8, lon2_r8, distance
5516    real(8) :: binRealThreshold, myBinReal
5517    real(4), pointer :: binInteger(:,:,:)
5518    real(8), pointer :: binReal(:,:)
5519    logical :: maskNegatives, binIntegerTest, binRealTest
5520    
5521    call utl_tmg_start(169, 'low-level--gsv_smoothHorizontal')
5522
5523    if (horizontalScale <= 0.0d0) then
5524      call msg('gsv_smoothHorizontal', 'specified scale <= 0, returning')
5525      return
5526    end if
5527
5528    if (present(binInteger_opt)) then
5529      binIntegerTest= .true.
5530      binInteger => binInteger_opt
5531    else
5532      binIntegerTest = .false.
5533    end if
5534
5535    if (present(binReal_opt)) then
5536      binRealTest= .true.
5537      binReal => binReal_opt
5538      if (present(binRealThreshold_opt)) then
5539        binRealThreshold = binRealThreshold_opt
5540      else
5541        call utl_abort('gsv_smoothHorizontal: a binRealThreshold_opt value must be provided with binReal_opt')
5542      end if
5543    else
5544      binRealTest = .false.
5545    end if
5546
5547    if (stateVector_inout%mpi_distribution /= 'VarsLevs' .and. stateVector_inout%mpi_local) then
5548    
5549      call gsv_allocate(statevector_varsLevs, statevector_inout%numStep, statevector_inout%hco, &
5550                        statevector_inout%vco, dataKind_opt=statevector_inout%dataKind,         &
5551                        mpi_local_opt=.true., mpi_distribution_opt='VarsLevs', &
5552                        allocHeight_opt=.false., allocPressure_opt=.false.)
5553      call gsv_transposeTilesToVarsLevs(statevector_inout, statevector_varsLevs)
5554      stateVector => stateVector_varsLevs
5555      
5556    else
5557    
5558      stateVector => stateVector_inout
5559      
5560    end if
5561
5562    if (present(maskNegatives_opt)) then
5563      maskNegatives = maskNegatives_opt
5564    else
5565      maskNegatives = .false.
5566    end if
5567
5568    allocate(smoothedField(stateVector%ni,stateVector%nj))
5569
5570    ! figure out the maximum possible number of grid points to search
5571    if (stateVector%hco%dlat > 0.0d0) then
5572      maxDeltaIndex = ceiling(1.5d0 * horizontalScale / (ec_ra * max(stateVector%hco%dlat,stateVector%hco%dlon)))
5573      write(*,*) 'gsv_smoothHorizontal: maxDistance, maxDeltaIndex = ', horizontalScale / 1000.0d0, 'km', &
5574           maxDeltaIndex, max(stateVector%hco%dlat,stateVector%hco%dlon)
5575    else if(stateVector%hco%dlat == 0.0d0) then
5576      maxDeltaIndex = ceiling(1.5d0 * horizontalScale / statevector%hco%minGridSpacing)
5577      write(*,*) 'gsv_smoothHorizontal: maxDistance: ', horizontalScale / 1000.0d0, ' km,', &
5578           ' maxDeltaIndex: ', maxDeltaIndex, ', minGridSpacing: ', statevector%hco%minGridSpacing
5579    else
5580      call utl_abort('gsv_smoothHorizontal: cannot compute a value for maxDeltaIndex')
5581    end if
5582
5583    ! apply a simple footprint operator type of averaging within specified radius
5584    do stepIndex = 1, stateVector%numStep
5585      do kIndex = stateVector%mykBeg, stateVector%mykEnd
5586      
5587        if (present(varName_opt)) then
5588          if (gsv_getVarNameFromK(stateVector,kIndex) /= trim(varName_opt)) cycle
5589        end if
5590	
5591        smoothedField(:,:) = 0.0d0
5592        do latIndex = 1, stateVector%nj
5593          do lonIndex = 1, stateVector%ni
5594	  
5595            lat1_r8 = stateVector%hco%lat2d_4(lonIndex,latIndex)
5596            lon1_r8 = stateVector%hco%lon2d_4(lonIndex,latIndex)
5597            count = 0
5598            latBeg = max(1, latIndex - maxDeltaIndex)
5599            latEnd = min(stateVector%nj, latIndex + maxDeltaIndex)
5600            lonBeg = max(1, lonIndex - maxDeltaIndex)
5601            lonEnd = min(stateVector%ni, lonIndex + maxDeltaIndex)
5602            if (binIntegerTest) then
5603              myBinInteger = int(binInteger(lonIndex,latIndex,1)) 
5604            end if
5605            
5606	    if (binRealTest) then
5607              myBinReal = binReal(lonIndex,latIndex)
5608            end if
5609	    
5610            do latIndex2 = latBeg, latEnd
5611              do lonIndex2 = lonBeg, lonEnd
5612
5613                ! skip negative value if it should be masked
5614                if (maskNegatives .and. stateVector%dataKind == 8) then
5615                  if (stateVector%gd_r8(lonIndex2,latIndex2,kIndex,stepIndex) < 0.0d0) cycle
5616                else if (maskNegatives .and. statevector%dataKind == 4) then
5617                  if (stateVector%gd_r4(lonIndex2,latIndex2,kIndex,stepIndex) < 0.0) cycle
5618                end if
5619
5620                ! skip value if it is beyond the specified distance
5621                lat2_r8 = stateVector%hco%lat2d_4(lonIndex2,latIndex2)
5622                lon2_r8 = stateVector%hco%lon2d_4(lonIndex2,latIndex2)
5623                distance = phf_calcDistanceFast(lat2_r8, lon2_r8, lat1_r8, lon1_r8)
5624                if (distance > horizontalScale) cycle
5625
5626                ! skip value if it lie in a different bin
5627                if (binIntegerTest) then
5628                  if (int(binInteger(lonIndex2,latIndex2,1)) /=  myBinInteger .or. & 
5629                      int(binInteger(lonIndex2,latIndex2,1)) == -1) cycle
5630                end if
5631		
5632                if (binRealTest) then
5633                  if (abs(binReal(lonIndex2,latIndex2) - myBinReal) > binRealThreshold) cycle
5634                end if
5635
5636                count = count + 1
5637                if (stateVector%dataKind == 8) then
5638                  smoothedField(lonIndex,latIndex) = smoothedField(lonIndex,latIndex) + &
5639                                                     stateVector%gd_r8(lonIndex2,latIndex2,kIndex,stepIndex)
5640                else
5641                  smoothedField(lonIndex,latIndex) = smoothedField(lonIndex,latIndex) + &
5642                                                     real(stateVector%gd_r4(lonIndex2,latIndex2,kIndex,stepIndex),8)
5643                end if
5644            
5645	      end do
5646            end do
5647            
5648	    if (count > 0) then
5649              smoothedField(lonIndex,latIndex) = smoothedField(lonIndex,latIndex) / real(count,8)
5650            else
5651              if (maskNegatives) smoothedField(lonIndex,latIndex) = mpc_missingValue_r8
5652            end if
5653	    
5654          end do
5655        end do
5656	
5657        if (stateVector%dataKind == 8) then
5658          stateVector%gd_r8(:,:,kIndex,stepIndex) = smoothedField(:,:)
5659        else
5660          stateVector%gd_r4(:,:,kIndex,stepIndex) = real(smoothedField(:,:),4)
5661        end if
5662      end do
5663    end do
5664
5665    deallocate(smoothedField)
5666
5667    if (stateVector_inout%mpi_distribution /= 'VarsLevs' .and. stateVector_inout%mpi_local) then
5668      call gsv_transposeVarsLevsToTiles(statevector_varsLevs, statevector_inout)
5669      call gsv_deallocate(statevector_varsLevs)
5670    end if
5671    
5672    call utl_tmg_stop(169)
5673
5674  end subroutine gsv_smoothHorizontal
5675
5676  !--------------------------------------------------------------------------
5677  ! gsv_getInfo
5678  !--------------------------------------------------------------------------
5679  ! DBGmad : TODO use a `gsv_str()` string representation?
5680  subroutine gsv_getInfo(stateVector, message)
5681    !
5682    !:Purpose: Writes out grid state vector parameters
5683    !
5684    implicit none
5685
5686    ! Arguments:
5687    type(struct_gsv), intent(in) :: stateVector
5688    character(len=*), intent(in) :: message
5689
5690    write(*,*)
5691    write(*,*) message
5692    write(*,*) '------------------- START -------------------'
5693    write(*,*) 'heightSfcPresent = ',stateVector%heightSfcPresent
5694    write(*,*) 'UVComponentPresent = ',stateVector%UVComponentPresent
5695    write(*,*) 'extraUVallocated = ',stateVector%extraUVallocated
5696    write(*,*) 'myUVkBeg = ',stateVector%myUVkBeg
5697    write(*,*) 'myUVkEnd = ',stateVector%myUVkEnd
5698    write(*,*) 'myUVkCount = ',stateVector%myUVkCount
5699    write(*,*) 'dataKind = ',stateVector%dataKind
5700    write(*,*) 'ni = ',stateVector%ni
5701    write(*,*) 'nj = ',stateVector%nj
5702    write(*,*) 'nk = ',stateVector%nk
5703    write(*,*) 'numStep = ',stateVector%numStep
5704    write(*,*) 'anltime = ',stateVector%anltime
5705    write(*,*) 'latPerPE = ',stateVector%latPerPE
5706    write(*,*) 'latPerPEmax = ',stateVector%latPerPEmax
5707    write(*,*) 'myLatBeg = ',stateVector%myLatBeg
5708    write(*,*) 'myLatEnd = ',stateVector%myLatEnd
5709    write(*,*) 'lonPerPE = ',stateVector%lonPerPE
5710    write(*,*) 'lonPerPEmax = ',stateVector%lonPerPEmax
5711    write(*,*) 'myLonBeg = ',stateVector%myLonBeg
5712    write(*,*) 'myLonEnd = ',stateVector%myLonEnd
5713    write(*,*) 'mykCount = ',stateVector%mykCount
5714    write(*,*) 'mykBeg = ',stateVector%mykBeg
5715    write(*,*) 'mykEnd = ',stateVector%mykEnd
5716    if (associated(stateVector%allLatBeg)) write(*,*) 'allLatBeg = ',stateVector%allLatBeg
5717    if (associated(stateVector%allLatEnd)) write(*,*) 'allLatEnd = ',stateVector%allLatEnd
5718    if (associated(stateVector%allLatPerPE)) write(*,*) 'allLatPerPE = ',stateVector%allLatPerPE
5719    if (associated(stateVector%allLonBeg)) write(*,*) 'allLonBeg = ',stateVector%allLonBeg
5720    if (associated(stateVector%allLonEnd)) write(*,*) 'allLonEnd = ',stateVector%allLonEnd
5721    if (associated(stateVector%allLonPerPE)) write(*,*) 'allLonPerPE = ',stateVector%allLonPerPE
5722    if (associated(stateVector%allkCount)) write(*,*) 'allkCount = ',stateVector%allkCount
5723    if (associated(stateVector%allkBeg)) write(*,*) 'allkBeg = ',stateVector%allkBeg
5724    if (associated(stateVector%allkEnd)) write(*,*) 'allkEnd = ',stateVector%mykEnd
5725    if (associated(stateVector%allUVkCount)) write(*,*) 'allUVkCount = ',stateVector%allUVkCount
5726    if (associated(stateVector%allUVkBeg)) write(*,*) 'allUVkBeg = ',stateVector%allUVkBeg
5727    if (associated(stateVector%allUVkEnd)) write(*,*) 'allUVkEnd = ',stateVector%myUVkEnd
5728    if (associated(stateVector%dateStampList)) write(*,*) 'dateStampList = ',stateVector%dateStampList
5729    if (associated(stateVector%dateStamp3d)) write(*,*) 'dateStamp3d = ',stateVector%dateStamp3d
5730    if (associated(stateVector%dateOriginList)) write(*,*) 'dateOriginList = ',stateVector%dateOriginList
5731    if (associated(stateVector%npasList)) write(*,*) 'npasList = ',stateVector%npasList
5732    if (associated(stateVector%ip2List)) write(*,*) 'ip2List = ',stateVector%ip2List
5733    write(*,*) 'deet = ',stateVector%deet
5734    write(*,*) 'etiket = ',stateVector%etiket
5735    write(*,*) 'allocated = ',stateVector%allocated
5736    if (associated(stateVector%varOffset)) write(*,*) 'varOffset = ',stateVector%varOffset
5737    if (associated(stateVector%varNumLev)) write(*,*) 'varNumLev = ',stateVector%varNumLev
5738    write(*,*) 'mpi_local = ',stateVector%mpi_local
5739    write(*,*) 'mpi_distribution = ',stateVector%mpi_distribution
5740    write(*,*) 'horizSubSample = ',stateVector%horizSubSample
5741    write(*,*) 'varExistList = ',stateVector%varExistList
5742    write(*,*) 'hInterpolateDegree = ',stateVector%hInterpolateDegree
5743    write(*,*) 'hExtrapolateDegree = ',stateVector%hExtrapolateDegree
5744    write(*,*) 'addHeightSfcOffset = ',stateVector%addHeightSfcOffset
5745    write(*,*) '-------------------- END --------------------'
5746
5747  end subroutine gsv_getInfo
5748
5749  !--------------------------------------------------------------------------
5750  ! gsv_applyMaskLAM
5751  !--------------------------------------------------------------------------
5752  subroutine gsv_applyMaskLAM(statevector_inout, maskLAM)
5753    !
5754    !:Purpose: To apply a mask to a state vector for LAM grid
5755    !
5756    implicit none
5757
5758    ! Arguments:
5759    type(struct_gsv), intent(inout)  :: statevector_inout
5760    type(struct_gsv), intent(in)     :: maskLAM
5761
5762    ! Locals:
5763    real(4), pointer :: increment_r4(:,:,:,:)
5764    real(8), pointer :: increment_r8(:,:,:,:)
5765    real(pre_incrReal), pointer :: analIncMask(:,:,:)
5766    integer :: latIndex, kIndex, lonIndex, stepIndex
5767
5768    call msg('gsv_applyMaskLAM','START', verb_opt=2)
5769    
5770    call gsv_getField(maskLAM,analIncMask)
5771
5772    if (statevector_inout%dataKind == 4) then
5773      ! apply mask using increment_r4
5774      call gsv_getField(statevector_inout,increment_r4)
5775      do stepIndex = 1, statevector_inout%numStep
5776        !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex)
5777        do kIndex = 1, statevector_inout%nk
5778          do latIndex =  statevector_inout%myLatBeg,  statevector_inout%myLatEnd
5779            do lonIndex =  statevector_inout%myLonBeg,  statevector_inout%myLonEnd
5780              increment_r4(lonIndex,latIndex,kIndex,stepIndex) =      &
5781                   increment_r4(lonIndex,latIndex,kIndex,stepIndex) * &
5782                   analIncMask(lonIndex,latIndex,1)
5783            end do
5784          end do
5785        end do
5786        !$OMP END PARALLEL DO
5787      end do
5788    else
5789      ! apply mask using increment_r8
5790      call gsv_getField(statevector_inout,increment_r8)
5791      do stepIndex = 1, statevector_inout%numStep
5792        !$OMP PARALLEL DO PRIVATE (latIndex,kIndex,lonIndex)
5793        do kIndex = 1, statevector_inout%nk
5794          do latIndex =  statevector_inout%myLatBeg,  statevector_inout%myLatEnd
5795            do lonIndex =  statevector_inout%myLonBeg,  statevector_inout%myLonEnd
5796              increment_r8(lonIndex,latIndex,kIndex,stepIndex) =      &
5797                   increment_r8(lonIndex,latIndex,kIndex,stepIndex) * &
5798                   analIncMask(lonIndex,latIndex,1)
5799            end do
5800          end do
5801        end do
5802        !$OMP END PARALLEL DO
5803      end do
5804    end if
5805
5806    call msg('gsv_applyMaskLAM','END', verb_opt=2)
5807
5808  end subroutine gsv_applyMaskLAM
5809
5810  !--------------------------------------------------------------------------
5811  ! gsv_containsNonZeroValues
5812  !--------------------------------------------------------------------------
5813  function gsv_containsNonZeroValues(stateVector) result(stateVectorHasNonZeroValue)
5814    !
5815    !:Purpose: To check if stateVector has any non-zero value.
5816    !
5817    implicit none
5818
5819    ! Arguments:
5820    type(struct_gsv), intent(in) :: stateVector
5821    ! Result:
5822    logical                      :: stateVectorHasNonZeroValue
5823
5824    ! Locals:
5825    real(4), pointer             :: field_r4_ptr(:,:,:,:)
5826    real(8), pointer             :: field_r8_ptr(:,:,:,:)
5827    logical                      :: allZero, allZero_mpiglobal
5828    integer                      :: ierr
5829
5830    if (.not. stateVector%allocated) then
5831      stateVectorHasNonZeroValue = .false.
5832      return
5833    end if
5834
5835    if (stateVector%dataKind == 4) then
5836      call gsv_getField(stateVector,field_r4_ptr)
5837      allZero = (maxval(abs(field_r4_ptr(:,:,:,:))) == 0.0)
5838    else if (stateVector%dataKind == 8) then
5839      call gsv_getField(stateVector,field_r8_ptr)
5840      allZero = (maxval(abs(field_r8_ptr(:,:,:,:))) == 0.0D0)
5841    end if
5842
5843    call rpn_comm_allReduce(allZero,allZero_mpiglobal,1,'mpi_logical','mpi_land','GRID',ierr)
5844    stateVectorHasNonZeroValue = .not. allZero_mpiglobal
5845
5846  end function gsv_containsNonZeroValues
5847
5848end module gridStateVector_mod