obsSubSpaceData_mod sourceΒΆ

   1module obsSubSpaceData_mod
   2  ! MODULE obsSubSpaceData_mod (prefix='oss' category='6. High-level data objects')
   3  !
   4  !:Purpose: Repository of obs space structures, arrays, and routines specific
   5  !          to obs data pertinent to subspaces of the overall ObsSpaceData.
   6  !          Most tools are independent of ObsSpaceData and can be used by
   7  !          themselves for the users' application(s).
   8  !
   9
  10
  11  ! Public routines:
  12  !
  13  !       - "oss_obsdata_get_*" to get arrays or elements from input 
  14  !         'struc_oss_obsdata' structure.
  15  !         Requires companion availabity of "oss_obsdata_alloc" and "oss_obsdata_dealloc"           
  16  ! 
  17  !       - "oss_obsdata_add_data1d" to add data value(s) to obsdata%data1d 
  18  !         with associated identifier code.
  19  !
  20  !       - "oss_obsdata_MPIallgather" gathers previously saved obsdata 
  21  !         from all processors.
  22  !
  23  !       - "oss_obsdata_code_len" to pass on oss_code_len value
  24  !
  25  !       - "oss_comboIdList" to provide list of fixed or accumulated stnid, (stnid,varno) 
  26  !          or (stnid,varno,multi/uni-level) combinations to be used in searches.
  27  !
  28  !       - "oss_get_comboIdList" uses the subroutine oss_comboIdlist to compile a unique list of stnid,  
  29  !          (stnid,varno) or (stnid,varno,multi/uni-level) combinations to be used in searches.
  30  !
  31
  32  use codePrecision_mod
  33  use utilities_mod    
  34  use MathPhysConstants_mod
  35  use midasMpi_mod
  36  use bufr_mod
  37  use obsSpaceData_mod  ! for use in oss_get_comboIdList 
  38   
  39  implicit none
  40  private
  41
  42! public procedures
  43! -----------------
  44
  45  public :: oss_obsdata_get_data1d,oss_obsdata_add_data1d,oss_obsdata_alloc,oss_obsdata_dealloc
  46  public :: oss_obsdata_get_element,oss_obsdata_get_array1d,oss_obsdata_get_array2d
  47  public :: oss_obsdata_get_header_code,oss_obsdata_MPIallgather
  48  public :: oss_obsdata_code_len, oss_comboIdList, oss_get_comboIdList
  49  
  50! public types
  51! ------------
  52
  53  public :: struct_oss_obsdata
  54  
  55! module constants
  56! -----------------
  57
  58  integer, parameter :: oss_code_len=40            ! Max string size for code in struct_oss_obsdata.
  59                                                   ! Minimum required size:
  60                                                   ! 22 (lat/long and time coord) + 9 (stnid) = 31
  61  integer, parameter :: oss_code_sublen=22         ! Length of lat/long and time coord
  62  integer, parameter :: oss_code_latlen=5          ! Length of lat segment 
  63
  64! interface for generating obsdata BURP header codes from (lat,long,date,hhmm,stnid)
  65  interface oss_obsdata_get_header_code
  66     module procedure obsdata_get_header_code_i
  67     module procedure obsdata_get_header_code_r
  68  end interface oss_obsdata_get_header_code
  69
  70! module structures
  71! -----------------
  72
  73  type :: struct_oss_obsdata
  74
  75     !  Structure storing information associated to observations such
  76     !  as BURP file reports (irep) either retrieved from BURP files 
  77     !  themselves or from other sources.  
  78     !     
  79     !  Variable               Description
  80     !  --------               -----------
  81     !  ndim                   number of dimensions of the data arrays
  82     !                         (i.e. ndim=1 for std and ndim=2 for averaging kernels)
  83     !  nrep                   number of reports or observations
  84     !  dim1                   first array dimension for each  report/observation
  85     !  dim2                   second array dimension for each  report/observation (only relevant for ndim=2)
  86     !  data1d                 obs data of dimension (dim1,nrep)
  87     !  data2d                 obs data of dimension (dim1,dim2,nrep)
  88     !  irep                   current report position
  89     !  code                   unique character string for identifying the report/observation
  90     !
  91     !  Follow-up to be made: modify dim1 and dim2 as pointer arrays dependent on irep
  92     !
  93     
  94     real(8), pointer :: data1d(:,:),data2d(:,:,:)
  95     character(len=oss_code_len), pointer :: code(:)
  96     integer :: ndim,nrep,dim1,dim2,irep
  97  
  98  end type struct_oss_obsdata
  99
 100contains
 101
 102
 103  subroutine oss_obsdata_alloc(obsdata,nrep,dim1,dim2_opt)
 104    !
 105    !:Purpose: Allocates memory for structure struct_oss_obsdata to hold obs data file information.
 106    !           If dim2 is specified, then the data array associated with each observation/report
 107    !           will be 2D array. will be a 1D array if dim2 is not specified.
 108    ! 
 109    !:Arguments:
 110    !      :obsdata: data structure to allocate
 111    !      :nrep: max number of associated observations/reports for data 
 112    !      :dim1: first dimension length of the array associated to each observation/report
 113    !      :dim2: second dimension length of the array associated to each observation/report (optional)
 114    !
 115    implicit none
 116
 117    ! Arguments:
 118    type(struct_oss_obsdata), intent(inout) :: obsdata
 119    integer                 , intent(in)    :: nrep
 120    integer                 , intent(in)    :: dim1 
 121    integer       , optional, intent(in)    :: dim2_opt
 122    
 123    obsdata%nrep = nrep
 124    obsdata%dim1 = dim1
 125
 126    if (present(dim2_opt)) then
 127       obsdata%dim2 = dim2_opt
 128       obsdata%ndim = 2
 129       allocate(obsdata%data2d(dim1,dim2_opt,nrep))
 130       obsdata%data2d(:,:,:) = 0.0D0
 131       obsdata%data1d => null()
 132    else
 133       obsdata%dim2 = 0
 134       obsdata%ndim = 1
 135       allocate(obsdata%data1d(dim1,nrep))
 136       obsdata%data1d(:,:) = 0.0D0
 137       obsdata%data2d => null()
 138    end if
 139
 140    ! code is a character string assigned to each observation/report to uniquely identify it
 141    allocate(obsdata%code(nrep))
 142    
 143    ! obsdata%irep is a counter used to keep tract of position in the
 144    ! data arrays when extracting values via oss_obsdata_get_array*.
 145    ! The value is initialized to one, pointing to the very first element.
 146    obsdata%irep = 1
 147
 148  end subroutine oss_obsdata_alloc
 149
 150
 151  subroutine oss_obsdata_dealloc(obsdata)
 152    !
 153    !:Purpose: Deallocates memory for structure struct_oss_obsdata
 154    !
 155    implicit none
 156
 157    ! Arguments:
 158    type(struct_oss_obsdata), intent(inout) :: obsdata
 159    
 160    if (associated(obsdata%data1d)) deallocate(obsdata%data1d)
 161    if (associated(obsdata%data2d)) deallocate(obsdata%data2d)
 162    if (associated(obsdata%code))   deallocate(obsdata%code)
 163
 164  end subroutine oss_obsdata_dealloc
 165
 166
 167  function oss_obsdata_get_element( obsdata, code, idim1, stat_opt ) result(element)
 168    ! 
 169    !:Purpose: Returns element of array from obsdata 1D data array. The returned element is the
 170    !           one with the specified identifying code.
 171    !
 172    !:Arguments: 
 173    !           :obsdata: struct_oss_obsdata instance
 174    !           :code:    unique identifying code
 175    !           :idim1:   position of element in the dim1 axis
 176    !           :element: retrieved element from obsdata%data1d
 177    !           :stat:    status code
 178    !
 179    implicit none
 180
 181    ! Arguments:
 182    type(struct_oss_obsdata), intent(inout) :: obsdata
 183    character(len=*)        , intent(in)    :: code
 184    integer                 , intent(in)    :: idim1
 185    integer       , optional, intent(out)   :: stat_opt
 186    ! Result:
 187    real(8) :: element
 188    
 189    ! find obsdata%irep for current observation
 190    call obsdata_set_index(obsdata,code,stat_opt=stat_opt)
 191    if (present(stat_opt)) then
 192       if (stat_opt.ne.0) then
 193          element = 0.
 194          return
 195       end if
 196    end if
 197
 198    ! get element from data array at current position
 199    element = obsdata%data1d(idim1,obsdata%irep)
 200    
 201    ! increment position in data array
 202    if (obsdata%irep.eq.obsdata%nrep) then
 203       obsdata%irep = 1
 204    else
 205       obsdata%irep=obsdata%irep+1
 206    end if
 207
 208  end function oss_obsdata_get_element
 209
 210
 211  function oss_obsdata_get_array1d(obsdata,code,stat_opt) result(array)
 212    ! 
 213    !:Purpose: Returns 1D data array from obsdata. The returned array is the one with the specified
 214    !           identifying code.
 215    !:Arguments:
 216    !           :obsdata: struct_oss_obsdata instance
 217    !           :code:    unique identifying code
 218    !           :array:   retrieved array from obsdata%data1d of dimension obsdata%dim1
 219    !           :stat:    search success (0 - found; 1 = no data; 2 = not found)
 220    !
 221    implicit none
 222
 223    ! Arguments:
 224    type(struct_oss_obsdata), intent(inout) :: obsdata
 225    character(len=*)        , intent(in)    :: code
 226    integer       , optional, intent(out)   :: stat_opt
 227    ! Result:
 228    real(8) :: array(obsdata%dim1)
 229    
 230    ! find obsdata%irep for current observation
 231    call obsdata_set_index(obsdata,code,stat_opt=stat_opt)
 232    if ( present( stat_opt ) ) then
 233      if ( stat_opt /= 0 ) then
 234        array(:) = 0.
 235        return
 236      end if
 237    end if
 238    
 239    ! get element from data array at current position
 240    array = obsdata%data1d(:,obsdata%irep)
 241    
 242    ! increment position in data array
 243    if (obsdata%irep.eq.obsdata%nrep) then
 244       obsdata%irep = 1
 245    else
 246       obsdata%irep=obsdata%irep+1
 247    end if
 248
 249  end function oss_obsdata_get_array1d
 250
 251
 252  function oss_obsdata_get_data1d( obsdata, lon, lat, date, time, stnid, stat_opt ) result(array)
 253    !
 254    !:Purpose: Extract 1D data array from structure according to input (lat,long,date,time,stnid)
 255    !
 256    !:Arguments:
 257    !           :obsdata: Structure from which data elements are to be found  
 258    !           :lon:     longitude real (radians)
 259    !           :lat:     latitude real (radians)
 260    !           :date:    date in YYYYMMDD
 261    !           :time:    time in HHMM
 262    !           :stnid:   station ID
 263    !           :array:   Identified 1D array
 264    !           :stat:    search success (0 - found; 1 = no data; 2 = not found)
 265    !    
 266    implicit none
 267
 268    ! Arguments:
 269    type(struct_oss_obsdata), intent(inout) :: obsdata
 270    real(8)                 , intent(in)    :: lon
 271    real(8)                 , intent(in)    :: lat
 272    integer                 , intent(in)    :: date
 273    integer                 , intent(in)    :: time
 274    character(len=*)        , intent(in)    :: stnid
 275    integer       , optional, intent(out)   :: stat_opt
 276    ! Result:
 277    real(8) :: array(obsdata%dim1)
 278
 279    ! Locals:
 280    character(len=oss_code_len) :: code
 281    
 282    ! Set desired identifier code
 283    code=oss_obsdata_get_header_code(lon,lat,date,time,stnid) 
 284  
 285    ! Get array corresponding to code.
 286    array=oss_obsdata_get_array1d(obsdata,code,stat_opt=stat_opt)
 287    
 288  end function oss_obsdata_get_data1d
 289
 290
 291  function oss_obsdata_get_array2d( obsdata, code, stat_opt ) result(array)
 292    ! 
 293    !:Purpose: Returns 2D data array from obsdata. The returned array is the one with the specified
 294    !           identifying code.
 295    !
 296    !:Arguments: 
 297    !           :obsdata: struct_oss_obsdata instance
 298    !           :code:    unique identifying code
 299    !           :array:   retrieved array from obsdata%data2d of dimension (obsdata%dim1,obsdata%dim2)
 300    !           :stat:    search success (0 - found; 1 = no data; 2 = not found)
 301    !
 302    implicit none
 303
 304    ! Arguments:
 305    type(struct_oss_obsdata), intent(inout) :: obsdata
 306    character(len=*)        , intent(in)    :: code
 307    integer       , optional, intent(out)   :: stat_opt
 308    ! Result:
 309    real(8) :: array(obsdata%dim1,obsdata%dim2)
 310    
 311    ! find obsdata%irep for current observation
 312    call obsdata_set_index(obsdata,code,stat_opt=stat_opt)
 313    if (present(stat_opt)) then
 314       if (stat_opt.ne.0) then
 315          array(:,:) = 0.
 316          return
 317       end if
 318    end if
 319    
 320    ! get elements from data array at current position
 321    array = obsdata%data2d(:,:,obsdata%irep)
 322    
 323    ! increment position in data array
 324    if (obsdata%irep.eq.obsdata%nrep) then
 325       obsdata%irep = 1
 326    else
 327       obsdata%irep=obsdata%irep+1
 328    end if
 329
 330  end function oss_obsdata_get_array2d
 331
 332
 333  subroutine obsdata_set_index( obsdata, code, stat_opt )
 334    ! 
 335    !:Purpose: Sets the position variable (irep) in struct_oss_obsdata to reference the record
 336    !           that matches the input identifying code.
 337    !
 338    !:Arguments: 
 339    !           :obsdata:      struct_oss_obsdata instance
 340    !           :index:        obs index
 341    !           :code:         code for comparison to those in obsdata
 342    !           :obsdata%irep: current index position for the observation/report
 343    !           :stat_opt:     status of call (optional)
 344    !                    :0: no errors
 345    !                    :1: no reports available
 346    !                    :2: report not found
 347    !
 348    !:Comments: If the optional argument stat_opt is provided and an error occurs, the error code will
 349    !            be returned and the abort will not be called to allow for error handling.
 350    !
 351    implicit none
 352
 353    ! Arguments:
 354    type(struct_oss_obsdata), intent(inout) :: obsdata
 355    character(len=*)        , intent(in)    :: code
 356    integer       , optional, intent(out)   :: stat_opt
 357
 358    ! Locals:
 359    integer :: i
 360    integer :: ref_lat
 361    
 362    if ( obsdata%nrep <= 0 ) then
 363      if ( present( stat_opt ) ) then
 364        stat_opt = 1
 365        return
 366      else
 367        call utl_abort("obsdata_set_index: No reports available. Check for consistency " // &
 368                       "between input BURP file and input NAMBURP_FILTER_*  namelist " // &
 369                       "(and input auxiliary file if part of CH family).")
 370      end if
 371    end if
 372
 373    i=0
 374    
 375    ! Search for matching identifier code
 376    do while (trim(obsdata%code(obsdata%irep)) /= trim(code))
 377       obsdata%irep=obsdata%irep+1
 378       if (obsdata%irep > obsdata%nrep) obsdata%irep=1
 379       if (i > obsdata%nrep) then
 380          if (len_trim(code) >= oss_code_sublen) then
 381       
 382             ! Assumes codes of the form "LAT--LON--YYYYMMDDHHMM*" when len(code)>=oss_code_sublen.
 383	  
 384             ! Upper loop search did not find a match. For valid data near the poles over
 385             ! the global analysis grid, the lat could have been moved to the nearest analysis grid
 386             ! latitude. The following is to account for this, assuming this is the only exception
 387             ! for points near the poles. 
 388             !
 389             ! This is done as a second search step so as not to slow down the normally sufficient 
 390             ! search performed above.
 391
 392             i=0
 393             read(code(1:oss_code_latlen),*) ref_lat
 394             
 395             ! Search for matching identifier code
 396             do while (.not.obsdata_extra_code_test(trim(obsdata%code(obsdata%irep)),code,ref_lat))
 397                obsdata%irep=obsdata%irep+1
 398                if (obsdata%irep > obsdata%nrep) obsdata%irep=1
 399                if (i > obsdata%nrep) exit
 400                i=i+1       
 401             end do
 402             if (i > obsdata%nrep) then
 403                if (present(stat_opt)) then
 404                   stat_opt = 2
 405                   return
 406                else
 407                   call utl_abort("obsdata_set_index: Obs index not found for nrep = " // trim(utl_str(obsdata%nrep)) // " and code = '" // code // "'")
 408                end if
 409             end if
 410          else
 411             if (present(stat_opt)) then
 412                stat_opt = 2
 413                return
 414             else
 415                call utl_abort("obsdata_set_index: Obs index not found for nrep = " // trim(utl_str(obsdata%nrep)) // " and code = '" // code // "'")
 416             end if
 417          end if
 418          exit
 419       end if 
 420       i=i+1       
 421    end do
 422         
 423    if (present(stat_opt)) stat_opt = 0
 424
 425  end subroutine obsdata_set_index
 426             
 427
 428  function obsdata_extra_code_test( test_code, ref_code, ref_lat ) result(found)
 429    ! 
 430    !:Purpose: Test matching of code values accounting for rare differences
 431    !           in stored lat (and lon) value(s) when codes are stored as strings in the form  
 432    !           LAT--LON--YYYYMMDDHHMM* (ie. with >= oss_code_sublen characters).
 433    !
 434    !:Caveat: The current version assumes the only source of difference would stem from
 435    !          a shift to the nearest latitude of the analysis grid from points near the pole.
 436    !          (this source of difference identified by M. Sitwell)
 437    !          Also currently assumes that most poleward analysis grid latitudes are within 1 degree
 438    !          away from a pole.
 439    !
 440    !:Arguments:
 441    !           :test_code: code for comparison to ref_code  
 442    !           :ref_lat:   latitude  (x100) part of reference code   
 443    !           :ref_code:  reference code
 444    !           :found:     logical indicating if a match has been found.  
 445    ! 
 446    implicit none
 447
 448    ! Arguments:
 449    character(len=*), intent(in) :: test_code
 450    character(len=*), intent(in) :: ref_code
 451    integer         , intent(in) :: ref_lat
 452    ! Result:
 453    logical :: found
 454
 455    ! Locals:
 456    integer, parameter :: lat_lim1=-8900    ! Lat*100
 457    integer, parameter :: lat_lim2=8900
 458    integer :: lat
 459          
 460    if (test_code(6:len_trim(test_code)) /= ref_code(oss_code_latlen+1:len_trim(ref_code))) then
 461      found=.false.
 462      return
 463    else 
 464      read(test_code(1:oss_code_latlen),*) lat
 465      if ((lat < lat_lim1 .and. ref_lat < lat_lim1 .and. lat < ref_lat ).or. &
 466          (lat > lat_lim2 .and. ref_lat > lat_lim2 .and. lat > ref_lat ) ) then
 467        found=.true.	     
 468        write(*,*) 'obsdata_extra_code_test: Accounted for lat. mismatch in codes near poles: ', &
 469              lat,ref_lat
 470      else
 471        found=.false.
 472      end if
 473    end if
 474
 475  end function obsdata_extra_code_test
 476    
 477
 478  function obsdata_get_header_code_i( ilon, ilat, date, time, stnid ) result(code)
 479    ! 
 480    !:Purpose: Generates a string code to identify an obervation by the header information in
 481    !           a BURP report. The BURP header information is saved as a string in the form  
 482    !           LAT--LON--YYYYMMDDHHMMSTNID----. Intention of this function is to be used for
 483    !           setting the unique identifier 'code' in struct_oss_obsdata. Can be called under
 484    !           the interface oss_obsdata_get_header_code.
 485    !
 486    !:Arguments:
 487    !           :ilon:  longitude integer
 488    !           :ilat:  latitude integer
 489    !           :date:  date in YYYYMMDD
 490    !           :time:  time in HHMM
 491    !           :stnid: station ID
 492    !           :code:  unique code
 493    ! 
 494    implicit none
 495
 496    ! Arguments:
 497    integer,          intent(in) :: ilon
 498    integer,          intent(in) :: ilat
 499    integer,          intent(in) :: date
 500    integer,          intent(in) :: time
 501    character(len=*), intent(in) :: stnid
 502    ! Result:
 503    character(len=oss_code_len) :: code
 504
 505    write(code(1:5),'(I5.5)') ilat
 506
 507    if (ilon > 0) then
 508      write(code(6:10),'(I5.5)') ilon
 509    else
 510      write(code(6:10),'(I5.5)') 36000 + ilon
 511    end if
 512    
 513    write(code(11:18),'(I8.8)') date
 514    write(code(19:22),'(I4.4)') time
 515
 516    write(code(23:oss_code_len),'(A)') stnid(1:min(len_trim(stnid),oss_code_len-oss_code_sublen))
 517
 518  end function obsdata_get_header_code_i
 519
 520
 521  function obsdata_get_header_code_r( lon, lat, date, time, stnid ) result(code)
 522    ! 
 523    !:Purpose: Generates a string code to identify an obervation by the header information in
 524    !           a BURP report. The BURP header information is saved as a string in the form  
 525    !           LAT--LON--YYYYMMDDHHMMSTNID----. Intention of this function is to be used for
 526    !           setting the unique identifier 'code' in struct_oss_obsdata. Can be called under
 527    !           the interface oss_obsdata_get_header_code.
 528    !
 529    !:Arguments:
 530    !           :lon:   longitude real (radians)
 531    !           :lat:   latitude real (radians)
 532    !           :date:  date in YYYYMMDD
 533    !           :time:  time in HHMM
 534    !           :stnid: station ID
 535    !           :code:  unique code
 536    ! 
 537    implicit none
 538
 539    ! Arguments:
 540    real(8)         , intent(in) :: lon
 541    real(8)         , intent(in) :: lat
 542    integer         , intent(in) :: date
 543    integer         , intent(in) :: time
 544    character(len=*), intent(in) :: stnid
 545    ! Result:
 546    character(len=oss_code_len)  :: code
 547
 548    ! Locals:
 549    integer :: ilon, ilat
 550    
 551    ilon = nint(100*(lon/MPC_RADIANS_PER_DEGREE_R8))
 552    ilat = nint(100*(90. + lat/MPC_RADIANS_PER_DEGREE_R8))
 553
 554    code = obsdata_get_header_code_i(ilon,ilat,date,time,stnid)
 555
 556  end function obsdata_get_header_code_r
 557
 558!----------------------------------------------------------------------------------------
 559
 560  subroutine oss_obsdata_add_data1d(obsdata,val,code,maxsize,dim1_opt)
 561    !
 562    !:Purpose: Add data value(s) to obsdata%data1d with associated identifier code.
 563    !
 564    !:Arguments:
 565    !           obsdata       struct_oss_obsdata instance
 566    !           val           data array to store in obsdata%data1d
 567    !           code          identifying code based on (lat,long,date,hhmm) if not also stnid
 568    !           maxsize       max allowed size for obsdata
 569    !           dim1          value() dimension (optional)
 570    !           obsdata       Updated obsdata 
 571    ! 
 572    !:Comments:
 573    !
 574    !          - Retrieval of values from obsdata%data1d to be done via oss_obsdata_get_element (or oss_obsdata_get_array1d).
 575    !          - If obsdata allocation is required for all processors (such as for use later with obsdata_MPIGather), 
 576    !            allocation and/or initialization of arrays needs to be done at a corresponding appropriate location 
 577    !            outside the obs operations such as in oss_setup to ensure allocation is done for all processors, 
 578    !            including those without associated data. This is to ensure that rpn_comm_allgather will work 
 579    !            in routine obsdata_MPIGather.
 580    !
 581    implicit none
 582    
 583    ! Arguments:
 584    type(struct_oss_obsdata), intent(inout) :: obsdata
 585    real(8)                 , intent(in)    :: val(:)
 586    integer                 , intent(in)    :: maxsize
 587    character(len=*)        , intent(in)    :: code
 588    integer       , optional, intent(in)    :: dim1_opt
 589
 590    if (.not.associated(obsdata%data1d)) then
 591      if (present(dim1_opt)) then 
 592         call oss_obsdata_alloc(obsdata,maxsize,dim1=dim1_opt)
 593      else
 594         call oss_obsdata_alloc(obsdata,maxsize,dim1=1)
 595      end if
 596      obsdata%nrep=0
 597    end if
 598
 599    if (obsdata%dim1 > size(val)) &
 600         call utl_abort('obsdata_add_data1d: Insufficient data values provided. ' // trim(utl_str(size(val))) )
 601     
 602    ! nrep counts the number data values/profiles in the data arrays
 603    obsdata%nrep = obsdata%nrep+1 
 604
 605    if (obsdata%nrep > maxsize) &
 606         call utl_abort('obsdata_add_data1d: Reach max size of array ' // trim(utl_str(maxsize)) )
 607  
 608    ! Save unique code
 609    obsdata%code(obsdata%nrep)=trim(code)
 610    
 611    ! Save value(s)
 612    obsdata%data1d(1:obsdata%dim1,obsdata%nrep) = val(1:obsdata%dim1)
 613    
 614  end subroutine oss_obsdata_add_data1d
 615
 616    
 617  subroutine oss_obsdata_MPIallgather(obsdata) 
 618    !
 619    !:Purpose: Gathers previously saved obsdata from all processors.
 620    !
 621    !:Arguments:
 622    !           :obsdata: Local struct_oss_obsdata to become global
 623    ! 
 624    !:Comments:
 625    !
 626    !           - Assumes obsdata%dim1 (and obsdata%dim2) the same over all processors.
 627    !
 628    implicit none
 629
 630    ! Arguments:
 631    type(struct_oss_obsdata), intent(inout) :: obsdata
 632
 633    ! Locals:
 634    integer, allocatable :: nrep(:)
 635    character(len=oss_code_len), allocatable :: code_local(:),code_global(:,:)
 636    real(8), allocatable :: data1d_local(:,:),data1d_global(:,:,:),data2d_local(:,:,:),data2d_global(:,:,:,:)
 637    integer :: i,ierr,nproc,nrep_total,nrep_max,irep,array_size
 638
 639    write(*,*) 'Begin oss_obsdata_MPIallgather'
 640      
 641    ! Identify number of processors.
 642    call rpn_comm_size("GRID",nproc,ierr)
 643      
 644    ! Get number of reports on each processor
 645
 646    allocate(nrep(nproc))
 647    nrep(:)=0
 648
 649    call rpn_comm_allgather(obsdata%nrep,1,"MPI_INTEGER",nrep,1,"MPI_INTEGER","GRID",ierr)
 650
 651    nrep_total=sum(nrep)
 652
 653    if (nrep_total == 0) then
 654       deallocate(nrep)
 655       write(*,*) 'Exit oss_obsdata_MPIallgather: no reports'
 656       return
 657    end if
 658        
 659    nrep_max = maxval(nrep)
 660
 661    ! Get values from all processors into global arrays
 662
 663    allocate(code_local(nrep_max))
 664    allocate(code_global(nrep_max,nproc))
 665
 666    code_local(:)=''
 667    if (obsdata%nrep > 0) code_local(1:obsdata%nrep) = obsdata%code(1:obsdata%nrep)
 668
 669    call mmpi_allgather_string(code_local,code_global,nrep_max,oss_code_len,nproc,"GRID",ierr)
 670
 671    if (obsdata%ndim == 1) then
 672
 673       allocate(data1d_local(obsdata%dim1,nrep_max))
 674       allocate(data1d_global(obsdata%dim1,nrep_max,nproc))
 675
 676       data1d_local(:,:)=0.0D0
 677       if (obsdata%nrep > 0) data1d_local(:,1:obsdata%nrep) = obsdata%data1d(:,1:obsdata%nrep)
 678       
 679       array_size = nrep_max*obsdata%dim1
 680
 681       call rpn_comm_allgather(data1d_local,array_size,pre_obsMpiReal,data1d_global,array_size,pre_obsMpiReal,"GRID",ierr)
 682    else 
 683       
 684       allocate(data2d_local(obsdata%dim1,obsdata%dim2,nrep_max))
 685       allocate(data2d_global(obsdata%dim1,obsdata%dim2,nrep_max,nproc))
 686       
 687       data2d_local(:,:,:)=0.0D0
 688       if (obsdata%nrep > 0) data2d_local(:,:,1:obsdata%nrep) = obsdata%data2d(:,:,1:obsdata%nrep)
 689
 690       array_size = nrep_max*obsdata%dim1*obsdata%dim2
 691
 692       call rpn_comm_allgather(data2d_local,array_size,pre_obsMpiReal,data2d_global,array_size,pre_obsMpiReal,"GRID",ierr)
 693
 694    end if
 695  
 696    deallocate(code_local)
 697    if (obsdata%ndim == 1) then 
 698        deallocate(data1d_local)
 699    else
 700        deallocate(data2d_local)
 701    end if 
 702
 703    ! Concatenate values from all processors
 704         
 705    irep = 0
 706    call oss_obsdata_dealloc(obsdata)    
 707
 708    if (obsdata%ndim == 1) then      
 709       
 710       call oss_obsdata_alloc(obsdata,nrep_total,obsdata%dim1)
 711
 712       do i=1,nproc
 713          if (nrep(i) > 0) then
 714             obsdata%code(irep+1:irep+nrep(i)) = code_global(1:nrep(i),i)
 715             obsdata%data1d(1:obsdata%dim1,irep+1:irep+nrep(i)) = data1d_global(1:obsdata%dim1,1:nrep(i),i)
 716             irep = irep+nrep(i)
 717          end if
 718       end do
 719
 720    else
 721
 722       call oss_obsdata_alloc(obsdata,nrep_total,obsdata%dim1,obsdata%dim2)
 723
 724       do i=1,nproc
 725          if (nrep(i) > 0) then
 726             obsdata%code(irep+1:irep+nrep(i)) = code_global(1:nrep(i),i)
 727             obsdata%data2d(1:obsdata%dim1,1:obsdata%dim2,irep+1:irep+nrep(i)) = &
 728                         data2d_global(1:obsdata%dim1,1:obsdata%dim2,1:nrep(i),i)  
 729             irep = irep+nrep(i)
 730          end if
 731       end do
 732 
 733    end if  
 734     
 735    deallocate(nrep,code_global)
 736    if (obsdata%ndim == 1) then 
 737        deallocate(data1d_global)
 738    else
 739        deallocate(data2d_global)
 740    end if 
 741    
 742    write(*,*) 'Exit oss_obsdata_MPIallgather'
 743 
 744  end subroutine oss_obsdata_MPIallgather
 745
 746
 747  subroutine oss_get_comboIdlist( obsSpaceData, stnid_list, varno_list, unilev_list, num_elements, nset )
 748    ! 
 749    !:Purpose: Uses the subroutine oss_comboIdlist to compile a unique list of stnid,  
 750    !           (stnid,varno) or (stnid,varno,multi/uni-level) combinations to be used in searches.
 751    !
 752    !:Arguments:
 753    !           :obsSpaceData: Observation space data
 754    !           :stnid_list:   List of unique stnids
 755    !           :varno_list:   List of unique varno
 756    !           :unilev_list:  List of unique uni/multi-level identifications
 757    !           :num_elements: Number of unique elements in *_list arrrays
 758    !           :nset:         Integer indicating grouping, with values indicating
 759    !
 760    !                    - 1: group by stnid
 761    !                    - 2: group by (stnid,bufr)
 762    !                    - 3: group by (stnid,bufr,multi/uni-level)
 763    !
 764    implicit none
 765
 766    ! Arguments:
 767    type(struct_obs), intent(inout) :: obsSpaceData
 768
 769    ! Locals:
 770    integer, parameter :: nmax=100
 771    integer, intent(out) :: num_elements
 772    integer, intent(out) :: nset
 773    integer, intent(out) :: varno_list(nmax)
 774    character(len=9), intent(out) :: stnid_list(nmax)
 775    logical, intent(out) :: unilev_list(nmax)
 776    integer :: headerIndex,bodyIndex,vco,nlev_obs,varno
 777    logical :: all_combos
 778
 779    call oss_comboIdlist(all_combos_opt=all_combos)
 780    
 781    if (all_combos) then
 782    
 783       ! Loop over obs to find all (stnid,varno) pairs to form a common sequence of search pairs
 784       ! over all processors. The prescribed starting stnid selections can have wild card 
 785       ! characters (via *, see routine oss_comboIdlist).
 786
 787       call obs_set_current_header_list(obsSpaceData,'CH')
 788       HEADER: do
 789          headerIndex = obs_getHeaderIndex(obsSpaceData)
 790          if (headerIndex < 0) exit HEADER
 791         
 792          ! Body info that we only need for first point in the profile
 793          bodyIndex = obs_headElem_i(obsSpaceData,OBS_RLN,headerIndex)     
 794          vco = obs_bodyElem_i(obsSpaceData,OBS_VCO,bodyIndex)
 795
 796          if (vco.ne.1 .and. vco.ne.2 .and. vco.ne.4 .and. vco.ne.5) then
 797             ! Vertical coordinate not handled
 798             write(*,*) 'oss_get_comboIdlist: Currently unaccounted VCO = ',vco
 799             cycle HEADER
 800          end if
 801          
 802          ! Identify varno and nlev_obs (exclude BUFR_SCALE_EXPONENT elements)
 803          nlev_obs = obs_headElem_i(obsSpaceData,OBS_NLV,headerIndex)
 804          call obs_set_current_body_list(obsSpaceData,headerIndex)
 805          do
 806             bodyIndex = obs_getBodyIndex(obsSpaceData)
 807             if (bodyIndex < 0) exit
 808             if (obs_bodyElem_i(obsSpaceData,OBS_VNM,bodyIndex).eq.BUFR_SCALE_EXPONENT) then
 809                nlev_obs = nlev_obs-1
 810             else
 811                varno=obs_bodyElem_i(obsSpaceData,OBS_VNM,bodyIndex)
 812             end if
 813          end do
 814
 815          ! Adds to running list of unique pairs if unique
 816          call oss_comboIdlist(stnid_add_opt=obs_elem_c(obsSpaceData,'STID',headerIndex), varno_add_opt=varno, unilev_add_opt=(nlev_obs.eq.1.and.vco.ge.4))
 817                       
 818       end do HEADER
 819       
 820       ! Get a common sequence of search pairs over all processors. 
 821       
 822       call oss_comboIdlist(gather_mpi_opt=.true.)
 823       
 824    end if
 825    
 826    ! Get list of unique pairs
 827    call oss_comboIdlist(stnid_list_opt=stnid_list, varno_list_opt=varno_list, unilev_list_opt=unilev_list, num_elements_opt=num_elements, nset_opt=nset)
 828
 829  end subroutine oss_get_comboIdlist
 830
 831!-----------------------------------------------------------------------------------
 832  
 833  subroutine oss_comboIdList( stnid_add_opt, varno_add_opt, unilev_add_opt, stnid_list_opt, &
 834                              varno_list_opt, unilev_list_opt, &
 835                              num_elements_opt, initialize_opt, nset_opt, gather_mpi_opt, all_combos_opt )
 836    ! 
 837    !:Purpose: Provide list of fixed or accumulated stnid, (stnid,varno) or 
 838    !           (stnid,varno,multi/uni-level) combinations to be used in searches.
 839    !
 840    !           Can be used for both single processor and  MPI mode, where 'gather_mpi' must be set
 841    !           to .true. at some point for use with MPI.
 842    !            Called from osd_chem_diagnmostics in file obspacediag_mod.ftn90.
 843    !  
 844    !:Arguments:
 845    !           :stnid_add_opt:    stnid to add to stnid_list if part of unique set
 846    !           :varno_add_opt:    varno to add to varno_list if part of unique set
 847    !           :unilev_add_opt:   unilev logical to add to unilev_list if part of unique set
 848    !           :initialize_opt:   Initialize internal arrays and counters
 849    !           :gather_mpi_opt:   Gather all local MPI process and recompile unique lists  
 850    !           :nset_opt:         Integer indicating grouping of diagnostics. Input variable
 851    !                              if initialize=.true., output variable otherwise.
 852    !                              Values indicate
 853    !
 854    !                              - 1: group by stnid
 855    !                              - 2: group by (stnid,bufr)
 856    !                              - 3: group by (stnid,bufr,multi/uni-level)
 857    !           :all combos_opt:   Indicates if all combinations specified by nset are to
 858    !                              be use, or only those specified in the namelist NAMCHEM
 859    !                              Input variable if initialize=.true., output variable otherwise.
 860    !           :stnid_list_opt:   List of unique stnids
 861    !           :varno_list_opt:   List of unique varno
 862    !           :unilev_list_opt:  List of unique uni/multi-level identifications
 863    !           :num_elements_opt: Number of unique elements in *_list arrrays
 864    !
 865    implicit none
 866    
 867    integer, parameter :: nmax=100, stnid_len=9
 868
 869    ! Arguments:
 870    logical                 , intent(in)   , optional :: initialize_opt
 871    logical                 , intent(in)   , optional :: gather_mpi_opt
 872    logical                 , intent(in)   , optional :: unilev_add_opt
 873    integer                 , intent(in)   , optional :: varno_add_opt
 874    character(len=stnid_len), intent(in)   , optional :: stnid_add_opt
 875    integer                 , intent(inout), optional :: nset_opt
 876    logical                 , intent(inout), optional :: all_combos_opt
 877    integer                 , intent(out)  , optional :: varno_list_opt(nmax)
 878    integer                 , intent(out)  , optional :: num_elements_opt
 879    character(len=stnid_len), intent(out)  , optional :: stnid_list_opt(nmax)
 880    logical                 , intent(out)  , optional :: unilev_list_opt(nmax)
 881
 882    ! Locals:
 883    integer, save :: varno_unique(nmax)
 884    character(len=stnid_len), save :: stnid_unique(nmax)
 885    logical, save :: unilev_unique(nmax)
 886    integer, allocatable :: num_unique_all(:),varno_unique_all(:,:)
 887    character(len=stnid_len),allocatable :: stnid_unique_all(:,:)
 888    logical, allocatable :: unilev_unique_all(:,:)
 889    integer, save :: num_unique ! running count of number of unique elements
 890    integer, save :: iset=2
 891    logical, save :: lall_combos=.true.
 892    logical :: same,init
 893    integer :: i,j,nproc,iproc,ierr
 894
 895    init=.false.
 896    if (present(initialize_opt)) init = initialize_opt
 897
 898    
 899    ! Initialize internal arrays and counters
 900    if (init) then
 901       stnid_unique(:) = ''
 902       varno_unique(:) = 0
 903       unilev_unique(:) = .false.
 904       num_unique = 0
 905       if (present(nset_opt)) iset = nset_opt
 906       if (present(all_combos_opt)) lall_combos = all_combos_opt
 907    end if      
 908
 909
 910    ! Add new elements to internal arrays if not there already
 911    if (present(stnid_add_opt)) then
 912      
 913       if (iset >= 2 .and. (.not. present(varno_add_opt))) call utl_abort('oss_comboIdlist: varno_add must be present to add element for nset>=2.')
 914       if (iset >= 3 .and. (.not. present(unilev_add_opt))) call utl_abort('oss_comboIdlist: unilev_add must be present to add element for nset>=3.')
 915
 916       same = .false.
 917
 918       do i=1,num_unique
 919          same = utl_stnid_equal(stnid_add_opt,stnid_unique(i))
 920          if (iset >= 2) same = same .and. varno_add_opt == varno_unique(i)
 921          if (iset >= 3) same = same .and. unilev_add_opt.eqv.unilev_unique(i)
 922          if (same) exit
 923       end do
 924
 925       if (.not.same) then
 926          num_unique=num_unique+1
 927          if (num_unique > nmax) call utl_abort("oss_comboIDlist: Max allowed dimension exceeded.")
 928          stnid_unique(num_unique) = stnid_add_opt
 929          if (iset >= 2) varno_unique(num_unique) = varno_add_opt
 930          if (iset >= 3) unilev_unique(num_unique) = unilev_add_opt
 931       end if
 932    end if        
 933
 934    ! Gather unique arrays from each local mpi process and compile global unique arrays
 935    if (present(gather_mpi_opt)) then
 936       if (gather_mpi_opt) then
 937
 938          call rpn_comm_size("GRID",nproc,ierr)
 939
 940          allocate(num_unique_all(nproc))
 941          allocate(stnid_unique_all(nmax,nproc))
 942          if (iset >= 2) allocate(varno_unique_all(nmax,nproc))
 943          if (iset >= 3) allocate(unilev_unique_all(nmax,nproc))
 944          
 945          num_unique_all(:) = 0
 946          stnid_unique_all(:,:) = ''
 947          if (iset >= 2) varno_unique_all(:,:) = 0
 948          if (iset >= 3) unilev_unique_all(:,:) = .false.
 949          
 950          if(mmpi_doBarrier) call rpn_comm_barrier("GRID",ierr)
 951
 952          call rpn_comm_allgather(num_unique,1,"MPI_INTEGER",num_unique_all,1,"MPI_INTEGER","GRID",ierr)
 953          call mmpi_allgather_string(stnid_unique,stnid_unique_all,nmax,stnid_len,nproc,"GRID",ierr)
 954          if (iset >= 2) call rpn_comm_allgather(varno_unique,nmax,"MPI_INTEGER",varno_unique_all,nmax,"MPI_INTEGER","GRID",ierr)
 955          if (iset >= 3) call rpn_comm_allgather(unilev_unique,nmax,"MPI_LOGICAL",unilev_unique_all,nmax,"MPI_LOGICAL","GRID",ierr)
 956          
 957          stnid_unique(:) = ''
 958          if (iset >= 2) varno_unique(:) = 0
 959          if (iset >= 3) unilev_unique(:) = .false.
 960          num_unique = 0
 961          
 962          ! Amalgamate unique lists
 963          do iproc=1,nproc
 964             do j=1,num_unique_all(iproc)
 965
 966                same = .false.
 967
 968                do i=1,num_unique
 969                   same = utl_stnid_equal(stnid_unique_all(j,iproc),stnid_unique(i))
 970                   if (iset >= 2) same = same .and. varno_unique_all(j,iproc) == varno_unique(i)
 971                   if (iset >= 3) same = same .and. unilev_unique_all(j,iproc).eqv.unilev_unique(i)
 972                   if (same) exit
 973                end do
 974              
 975                if (.not.same) then
 976                   num_unique=num_unique+1
 977                   if (num_unique > nmax) call utl_abort("oss_comboIDlist: Max allowed dimension exceeded.")
 978                   stnid_unique(num_unique) = stnid_unique_all(j,iproc)
 979                   if (iset >= 2) varno_unique(num_unique) = varno_unique_all(j,iproc)
 980                   if (iset >= 3) unilev_unique(num_unique) = unilev_unique_all(j,iproc)
 981                end if
 982             end do
 983          end do
 984
 985          deallocate(num_unique_all,stnid_unique_all)
 986          if (allocated(varno_unique_all)) deallocate(varno_unique_all)
 987          if (allocated(unilev_unique_all)) deallocate(unilev_unique_all)
 988
 989       end if
 990    end if
 991
 992
 993    ! Return internal arrays (and other info) if requested
 994    if (present(varno_list_opt)) varno_list_opt = varno_unique
 995    if (present(stnid_list_opt)) stnid_list_opt = stnid_unique
 996    if (present(unilev_list_opt)) unilev_list_opt = unilev_unique
 997    if (present(num_elements_opt)) num_elements_opt = num_unique
 998    if (.not. init) then
 999       if (present(nset_opt)) nset_opt = iset
1000       if (present(all_combos_opt)) all_combos_opt = lall_combos
1001    end if
1002    
1003    
1004  end subroutine oss_comboIdList
1005  
1006
1007  integer function oss_obsdata_code_len()  
1008    !
1009    !:Purpose: Pass on oss_code_len value.
1010    !
1011    implicit none
1012    
1013    oss_obsdata_code_len=oss_code_len
1014    
1015   end function oss_obsdata_code_len
1016
1017end module obsSubSpaceData_mod