ObsDataColumn_mod sourceΒΆ

   1! obsSpaceData_mod:  the module, obsSpaceData_mod, follows IndexListDepot_mod
   2
   3
   4module IndexListDepot_mod
   5  ! MODULE indexListDepot_mod (prefix='ild' category='7. Low-level data objects and utilities')
   6  !
   7  !:Purpose: The raison d'etre of this module is to support ObsSpaceData_mod in
   8  !           facilitating the traversal of a selection of the rows in its table.
   9  !           The selection of rows could be from either the header table or the
  10  !           body table. ObsSpaceData_mod currently populates the list with one
  11  !           of:
  12  !                * all header members of a given family
  13  !                * all   body members of a given family
  14  !                * all   body members of a given header row index
  15  !
  16  !:Usage:
  17  !    An ObsSpaceData_mod client must first call either
  18  !    obs_set_current_body_list or obs_set_current_header_list, specifying
  19  !    either the family of interest or the header row index of interest. 
  20  !    This does not return the list directly to the caller, but rather writes
  21  !    the list, as a struct_index_list, to the private contents of the obs
  22  !    oject that is returned to the caller but which cannot be examined by the
  23  !    caller.  Two lists can be active simultaneously:  one header list and
  24  !    one body list.
  25  !
  26  !    In order to access the indices that are in the list, the
  27  !    ObsSpaceData_mod client must call either obs_getHeaderIndex or
  28  !    obs_getBodyIndex, giving the ObsSpaceData object as the only argument. 
  29  !    On each call, one index is returned.  On calls after the last index in
  30  !    the list, a value of -1 is returned.
  31  !
  32  !    This is not a fully fledged module.  It is better described as a
  33  !    structure definition with a couple of helpful methods.  It is intended
  34  !    that the client, ObsSpaceData_mod, read/write directly from/to instances
  35  !    of these structures.
  36
  37
  38
  39  ! STRUCT_INDEX_LIST:
  40  !    A struct_index_list contains the identity of the family or header that
  41  !    was used to create the list, the actual list of indices, and the number
  42  !    of the list element that was last returned to the user.
  43  !
  44  ! STRUCT_INDEX_LIST_DEPOT:
  45  !    Because it is typical for a client to traverse a small group of lists
  46  !    several times each, performance is improved by retaining recent lists,
  47  !    thus avoiding having to regenerate them on each request.  Recent lists
  48  !    are stored in a struct_index_list_depot.  ObsSpaceData_mod contains one
  49  !    struct_index_list_depot for header lists and another for body lists.
  50  !    The struct_index_list_depot structure contains current_list, a pointer
  51  !    to the list in the depot that was last requested by the ObsSpaceData_mod
  52  !    client.
  53  !
  54  ! OMP:
  55  !    ObsSpaceData_mod has been designed so that it may be called from within
  56  !    an OMP section of code.  If there are n OMP threads, it is possible that
  57  !    there be as many as n lists in use simultaneously.  The parameter, 
  58  !    NUMBER_OF_LISTS, has been set to n to accommodate that many threads.  In
  59  !    this case, because the current_list of the depot is not OMP-private, it
  60  !    cannot be asked to remember the current list for each of the OMP
  61  !    threads.  Therefore, obs_set_current_header/body_list returns to the
  62  !    client an additional pointer to the list itself.  This pointer must then
  63  !    be passed as an optional parameter to obs_getHeader/BodyIndex.  When a
  64  !    new list is requested by an OMP thread, the same physical memory is
  65  !    re-used for the new list.
  66  !
  67  !    In order to ensure that the same physical memory is not initially
  68  !    distributed to more than one OMP thread, the two small sections of
  69  !    IndexListDepot_mod that accomplish this are marked omp-critical.
  70  !
  71
  72   implicit none
  73   save
  74   public
  75
  76   ! methods
  77   public :: ild_initialize             ! initialize a list depot
  78   public :: ild_finalize               ! finalize a list depot
  79   public :: ild_get_empty_index_list   ! return a pointer to an initialized list
  80   public :: ild_get_next_index         ! return the next element in the list
  81
  82   interface ild_get_next_index
  83      module procedure ild_get_next_index_depot
  84      module procedure ild_get_next_index_private
  85   end interface ild_get_next_index
  86
  87                                        ! This dimension must accommodate the
  88                                        ! maximum number of OMP processors 
  89   integer, parameter :: NUMBER_OF_LISTS = 32
  90
  91   type struct_index_list
  92      ! a list of integers, not to say indices into a struct_obs
  93      character(len=2) :: family        ! current_element's belong to this family
  94                                        ! Used only for a body list:
  95      integer :: header                 ! current_element's belong to this header
  96      integer :: current_element        ! the element that has just been returned
  97
  98                                        ! the actual list of integers
  99                                        ! N.B.:  These are the indices that are
 100                                        !        being held for the client.
 101                                        !        In the context of this module,
 102                                        !        these are elements; i.e. row
 103                                        !        indices.  Current_element is the
 104                                        !        last index that was returned
 105                                        !        to the client.
 106      integer, dimension(:), allocatable :: indices
 107   end type struct_index_list
 108
 109   type struct_index_list_depot
 110      ! A collection of lists, either empty or populated
 111                                        ! the collection of lists
 112      type(struct_index_list), dimension(NUMBER_OF_LISTS) :: index_lists
 113      integer :: list_last_attributed   ! list that was populated most recently
 114                                        ! list that was   used    most recently
 115      type(struct_index_list), pointer :: current_list
 116   end type struct_index_list_depot
 117
 118
 119contains
 120
 121   subroutine ild_finalize( depot )
 122      !
 123      !:Purpose: Finalize the indicated list depot
 124      !
 125      implicit none
 126
 127      ! Arguments:
 128      type(struct_index_list_depot), intent(inout) :: depot ! the depot to be finalized
 129
 130      ! Locals:
 131      integer :: list                   ! an index
 132
 133      ! Deallocate each list
 134      do list = 1,NUMBER_OF_LISTS
 135         depot%index_lists(list)%family = 'xx'
 136         depot%index_lists(list)%header = 0
 137         depot%index_lists(list)%current_element = 0
 138         deallocate(depot%index_lists(list)%indices)
 139      end do
 140      depot%list_last_attributed = 0
 141   end subroutine ild_finalize
 142
 143
 144   function ild_get_empty_index_list( depot, private_list_opt ) &
 145                                                         result(empty_index_list)
 146      !
 147      !:Purpose: From the given depot, return an index-list structure that contains
 148      !           no data, as a pointer.
 149      !           In other words, clear data from the (cyclicly) next (i.e. oldest)
 150      !           list and return a pointer to it.
 151      !
 152      !           If the list is being used within an OMP block, the ObsSpaceData
 153      !           client is responsible for holding a pointer to its own list.  This
 154      !           is supplied as the parameter, private_list_opt.
 155      !
 156      implicit none
 157
 158      ! Arguments:
 159      type(struct_index_list_depot), target,      intent(inout) :: depot            ! the depot containing the list
 160      type(struct_index_list), pointer, optional, intent(inout) :: private_list_opt ! used instead of depot (for OMP blocks)
 161      ! Result:
 162      type(struct_index_list), pointer                     :: empty_index_list      ! the returned list
 163
 164      nullify(empty_index_list)
 165
 166      if(present(private_list_opt)) then
 167         ! This is an OMP thread
 168         if(associated(private_list_opt)) then
 169            ! Memory has already been assigned for that thread.  Re-use it.
 170            empty_index_list => private_list_opt ! Set the return pointer
 171         end if
 172      end if
 173
 174      if(.not. associated(empty_index_list)) then
 175        !$omp critical
 176         ! Increment (cyclicly) the index to the next list
 177         depot%list_last_attributed = depot%list_last_attributed + 1
 178         if (depot%list_last_attributed > NUMBER_OF_LISTS) &
 179            depot%list_last_attributed = 1
 180
 181         ! Set the return pointer
 182         empty_index_list => depot%index_lists(depot%list_last_attributed)
 183         !$omp end critical
 184      end if
 185
 186      ! Initialize some values in the list
 187      ! empty_index_list%indices(:) = -1 --> No, the array is too big.
 188      empty_index_list%family = '  '
 189      empty_index_list%header = -1
 190      empty_index_list%current_element = 0
 191
 192      return
 193   end function ild_get_empty_index_list
 194
 195
 196   function ild_get_next_index_depot(depot, no_advance_opt) result(next_index)
 197      !
 198      !:Purpose: From the given depot, increment the index to the current element,
 199      !           and return the element itself, the new current element.
 200      !
 201      implicit none
 202
 203      ! Arguments:
 204      type(struct_index_list_depot), target, intent(inout) :: depot          ! the depot containing the list
 205      logical, optional,                     intent(in)    :: no_advance_opt ! current_element, just return next one
 206      ! Result:
 207      integer :: next_index                                                  ! the returned index
 208
 209      ! Locals:
 210      type(struct_index_list), pointer                     :: current_list ! current list of the depot
 211      integer                                              :: next_element ! next element of the current list
 212
 213      current_list => depot%current_list
 214      !$omp critical
 215                                        ! Obtain the next element from the list
 216      next_element = current_list%current_element + 1
 217      next_index = current_list%indices(next_element)
 218      if(.not. present(no_advance_opt) .and. next_index /= -1) then
 219                                        ! Increment the current element
 220         current_list%current_element = next_element
 221      end if
 222      !$omp end critical
 223   end function ild_get_next_index_depot
 224
 225
 226   function ild_get_next_index_private(private_list, no_advance_opt) &
 227                                                               result(next_index)
 228      !
 229      !:Purpose:  From the given list, increment the index to the current element, and
 230      !            return the element itself, the new current element.
 231      !
 232      implicit none
 233
 234      ! Arguments:
 235      type(struct_index_list), pointer, intent(inout) :: private_list   ! the list of interest
 236      logical, optional,                intent(in)    :: no_advance_opt ! current_element, just return next one
 237      ! Result:
 238      integer :: next_index                                           ! the returned index
 239
 240      ! Locals:
 241      integer                                         :: next_element ! next element of the list
 242
 243                                        ! Obtain the next element from the list
 244      next_element = private_list%current_element + 1
 245      next_index = private_list%indices(next_element)
 246
 247      if(.not. present(no_advance_opt) .and. next_index /= -1) then
 248                                        ! Increment the current element
 249         private_list%current_element = next_element
 250      end if
 251   end function ild_get_next_index_private
 252
 253
 254   subroutine ild_initialize(depot, numHeaderBody_max)
 255      !
 256      !:Purpose: Initialize the indicated list depot
 257      !   
 258      !:Note: indices is allocated with 2 extra elements to make room for
 259      !        the end-of-list flag that is set in
 260      !        obs_set_current_header/body_list
 261      !
 262      implicit none
 263
 264      ! Arguments:
 265      type(struct_index_list_depot), intent(inout) :: depot ! the depot to be initialized
 266                                                            ! max size of header or body of
 267                                                            ! struct_obs & hence of depot
 268      integer, intent(in) :: numHeaderBody_max
 269
 270      ! Locals:
 271      integer :: list                   ! an index
 272
 273      ! Allocate each list
 274      do list = 1,NUMBER_OF_LISTS
 275         depot%index_lists(list)%family = 'xx'
 276         depot%index_lists(list)%header = 0
 277         depot%index_lists(list)%current_element = 0
 278         allocate(depot%index_lists(list)%indices(numHeaderBody_max+2))
 279         depot%index_lists(list)%indices(:)=0
 280      end do
 281      depot%list_last_attributed = 0
 282      nullify(depot%current_list)
 283   end subroutine ild_initialize
 284
 285end module IndexListDepot_mod
 286
 287
 288
 289module ObsColumnNames_mod
 290  ! MODULE obsColumnNames_mod (prefix='ocn' category='7. Low-level data objects and utilities')
 291  !
 292  !:Purpose:  This module simply groups together many fortran paramters that
 293  !           serve as column names for ObsSpaceData_mod.
 294  !
 295  !:NOTE:  This module is logistically a part of the ObsSpaceData_mod module.
 296  !        In fact, if fortran allowed it, ObsColumnNames_mod would be
 297  !        'contain'ed inside the ObsSpaceData_mod module. For this reason, and
 298  !        more importantly because these parameters constitute a part of the
 299  !        visible (from outside ObsSpaceData_mod) interface to
 300  !        ObsSpaceData_mod, the parameters defined in this module carry the
 301  !        prefix, OBS, and not CN.
 302  !
 303
 304   public
 305
 306
 307   !
 308   ! INTEGER-HEADER COLUMN NUMBERS
 309   !  
 310   ! the first column index for integer header variables defined below
 311   ! (chosen such that every column number is unique, so that a mismatch between
 312   !  a column number and a column type (real, int, head, body) can be detected)
 313   integer, parameter :: NHDR_INT_BEG = 101
 314   integer, parameter, public :: OBS_RLN = NHDR_INT_BEG ! report location
 315                                             ! unique(within obsdat), possibly
 316   integer, parameter, public :: OBS_ONM = OBS_RLN+1  ! ordered, station id number
 317   integer, parameter, public :: OBS_INS = OBS_ONM+1  ! instrument ID  
 318   integer, parameter, public :: OBS_ITY = OBS_INS+1  ! code: instrument & retrieval type
 319   integer, parameter, public :: OBS_SAT = OBS_ITY+1  ! satellite code 
 320   integer, parameter, public :: OBS_TEC = OBS_SAT+1  ! satellite processing technique
 321   integer, parameter, public :: OBS_SEN = OBS_TEC+1  ! satellite sensor code
 322   integer, parameter, public :: OBS_DAT = OBS_SEN+1  ! observation date YYYYMMD
 323   integer, parameter, public :: OBS_ETM = OBS_DAT+1  ! observation time HHMM
 324   integer, parameter, public :: OBS_NLV = OBS_ETM+1  ! number of data at this location
 325   integer, parameter, public :: OBS_PAS = OBS_NLV+1  ! batch no. in sequential analysis
 326   integer, parameter, public :: OBS_REG = OBS_PAS+1  ! region number in the batch
 327   integer, parameter, public :: OBS_IP  = OBS_REG+1  ! number of mpi processors
 328   integer, parameter, public :: OBS_IPF = OBS_IP+1   ! mpi task id for file
 329   integer, parameter, public :: OBS_IPC = OBS_IPF+1  ! mpi task id for column/obsspacedata
 330   integer, parameter, public :: OBS_IPT = OBS_IPC+1  ! mpi task id for latlontile
 331   integer, parameter, public :: OBS_ST1 = OBS_IPT+1  ! header level status/rejection flag
 332   integer, parameter, public :: OBS_IDF = OBS_ST1+1  ! id. no. of observation-source file
 333
 334   integer, parameter, public :: OBS_GQF = OBS_IDF+1  ! iasi GQISFLAGQUAL: general quality flag which indicates,
 335                                                      ! when it is set to TRUE (=1), that some anomaly has been
 336                                                      ! detected at some step in the Level 0 or Level 1 IASI Processing.
 337                                                      ! The products should not be used if this occurs.
 338
 339   integer, parameter, public :: OBS_GQL = OBS_GQF+1  ! iasi GQISQUALINDEXLOC: It is defined as the uncertainty of the
 340                                                      ! coregistration between IIS (IASI internal imager) and AVHRR.
 341
 342   integer, parameter, public :: OBS_NCO2= OBS_GQL+1  ! NCO2: number of valid CO2 slicing estimates (AIRS,IASI,CrIS)
 343   integer, parameter, public :: OBS_STYP= OBS_NCO2+1 ! surface type in obs file (0,1,2)
 344   integer, parameter, public :: OBS_ROQF= OBS_STYP+1 ! QUALITY FLAGS FOR RADIO OCCULTATION DATA
 345   integer, parameter, public :: OBS_SWQ1= OBS_ROQF+1 ! QUALITY VALUES FOR SW (AMV) DATA
 346   integer, parameter, public :: OBS_SWQ2= OBS_SWQ1+1 ! QUALITY VALUES FOR SW (AMV) DATA
 347   integer, parameter, public :: OBS_SWMT= OBS_SWQ2+1 ! QUALITY VALUES FOR SW (AMV) DATA
 348   integer, parameter, public :: OBS_SWLS= OBS_SWMT+1 ! QUALITY VALUES FOR SW (AMV) DATA
 349   integer, parameter, public :: OBS_SWGA= OBS_SWLS+1 ! QUALITY VALUES FOR SW (AMV) DATA
 350   integer, parameter, public :: OBS_SWHA= OBS_SWGA+1 ! QUALITY VALUES FOR SW (AMV) DATA
 351   integer, parameter, public :: OBS_CHM = OBS_SWHA+1 ! BUFR code (table 08046) of constituent type (for the CH family)
 352   integer, parameter, public :: OBS_FOV = OBS_CHM+1  ! field of view 
 353   integer, parameter, public :: OBS_PRFL= OBS_FOV+1  ! profile id. number
 354   integer, parameter, public :: OBS_PHAS= OBS_PRFL+1 ! phase of flight
 355   integer, parameter, public :: OBS_ORI = OBS_PHAS+1 ! originating centre code
 356   integer, parameter, public :: OBS_LCH = OBS_ORI+1  ! launch time (hhmm)
 357   integer, parameter, public :: OBS_RTP = OBS_LCH+1  ! radiosonde type code
 358   integer, parameter, public :: OBS_HDD = OBS_RTP+1  ! date in burp header
 359   integer, parameter, public :: OBS_HDT = OBS_HDD+1  ! time in burp header
 360   integer, parameter, public :: OBS_TFLG= OBS_HDT+1  ! flag for hi-res time element
 361   integer, parameter, public :: OBS_LFLG= OBS_TFLG+1 ! flag for hi-res lat element
 362   integer, parameter, public :: OBS_ORBI= OBS_LFLG+1! satellite orbit index
 363   integer, parameter, public :: OBS_AQF1= OBS_ORBI+1! ATMS Geolocalisation Quality Control Flag 
 364   integer, parameter, public :: OBS_AQF2= OBS_AQF1+1! ATMS Granule Level Quality Control Flag 
 365   integer, parameter, public :: OBS_AQF3= OBS_AQF2+1! ATMS Scan Level Quality Control Flag 
 366   integer, parameter, public :: OBS_TTYP= OBS_AQF3+1! TERRAIN TYP INDICE for TOVS QC
 367   integer, parameter, public :: OBS_INFG= OBS_TTYP+1! SATPLOT INFO FLAG for TOVS
 368   integer, parameter, public :: OBS_RAIN= OBS_INFG+1! UKMO rain flag for ssmis obs
 369   integer, parameter, public :: OBS_CHID= OBS_RAIN+1! id. no. of ice chart
 370
 371
 372   ! the last column index for integer header variables defined just above
 373   integer, parameter :: NHDR_INT_END = OBS_CHID
 374
 375   integer, parameter :: NHDR_INT_SIZE = NHDR_INT_END - NHDR_INT_BEG + 1
 376
 377   !
 378   ! INTEGER-HEADER COLUMN NAMES
 379   !
 380   character(len=4), target :: ocn_ColumnNameList_IH(NHDR_INT_BEG:NHDR_INT_END) = &
 381      (/ 'RLN ','ONM ','INS ','ITY ','SAT ','TEC ','SEN ','DAT ','ETM ', &  
 382         'NLV ','PAS ','REG ','IP  ','IPF ','IPC ','IPT ', &  
 383         'ST1 ','IDF ','GQF ','GQL ','NCO2','STYP','ROQF', &
 384         'SWQ1','SWQ2','SWMT','SWLS','SWGA','SWHA','CHM ','FOV ', &
 385         'PRFL','PHAS','ORI ','LCH ','RTP ','HDD ','HDT ','TFLG',&
 386         'LFLG','ORBI','AQF1','AQF2','AQF3','TTYP','INFG','RAIN','CHID'/)
 387
 388   !
 389   ! REAL-HEADER COLUMN NUMBERS
 390   !
 391   ! the first column index for real header variables defined below
 392   ! (chosen such that every column number is unique, so that a mismatch between
 393   !  a column number and a column type (real, int, head, body) can be detected)
 394   integer, parameter :: NHDR_REAL_BEG = 201
 395   integer, parameter, public :: OBS_LAT = NHDR_REAL_BEG  ! latitude  in radians (N positive)
 396   integer, parameter, public :: OBS_LON = OBS_LAT+1 ! longitude in radians (E positive)
 397   integer, parameter, public :: OBS_ALT = OBS_LON+1 ! station altitude
 398   integer, parameter, public :: OBS_BX  = OBS_ALT+1 ! x-coordinate of block in R3
 399   integer, parameter, public :: OBS_BY  = OBS_BX +1 ! y-coordinate of block in R3
 400   integer, parameter, public :: OBS_BZ  = OBS_BY +1 ! z-coordinate of block in R3
 401
 402   integer, parameter, public :: OBS_M1C1  = OBS_BZ +1   ! mean for class 1 AVHRR channel 1
 403   integer, parameter, public :: OBS_M1C2  = OBS_M1C1 +1 ! mean for class 1 AVHRR channel 2
 404   integer, parameter, public :: OBS_M1C3  = OBS_M1C2 +1 ! mean for class 1 AVHRR channel 3
 405   integer, parameter, public :: OBS_M1C4  = OBS_M1C3 +1 ! mean for class 1 AVHRR channel 4
 406   integer, parameter, public :: OBS_M1C5  = OBS_M1C4 +1 ! mean for class 1 AVHRR channel 5
 407   integer, parameter, public :: OBS_M1C6  = OBS_M1C5 +1 ! mean for class 1 AVHRR channel 6
 408
 409   integer, parameter, public :: OBS_M2C1  = OBS_M1C6 +1 ! mean for class 2 AVHRR channel 1
 410   integer, parameter, public :: OBS_M2C2  = OBS_M2C1 +1 ! mean for class 2 AVHRR channel 2
 411   integer, parameter, public :: OBS_M2C3  = OBS_M2C2 +1 ! mean for class 2 AVHRR channel 3
 412   integer, parameter, public :: OBS_M2C4  = OBS_M2C3 +1 ! mean for class 2 AVHRR channel 4
 413   integer, parameter, public :: OBS_M2C5  = OBS_M2C4 +1 ! mean for class 2 AVHRR channel 5
 414   integer, parameter, public :: OBS_M2C6  = OBS_M2C5 +1 ! mean for class 2 AVHRR channel 6
 415
 416   integer, parameter, public :: OBS_M3C1  = OBS_M2C6 +1 ! mean for class 3 AVHRR channel 1
 417   integer, parameter, public :: OBS_M3C2  = OBS_M3C1 +1 ! mean for class 3 AVHRR channel 2
 418   integer, parameter, public :: OBS_M3C3  = OBS_M3C2 +1 ! mean for class 3 AVHRR channel 3
 419   integer, parameter, public :: OBS_M3C4  = OBS_M3C3 +1 ! mean for class 3 AVHRR channel 4
 420   integer, parameter, public :: OBS_M3C5  = OBS_M3C4 +1 ! mean for class 3 AVHRR channel 5
 421   integer, parameter, public :: OBS_M3C6  = OBS_M3C5 +1 ! mean for class 3 AVHRR channel 6
 422
 423   integer, parameter, public :: OBS_M4C1  = OBS_M3C6 +1 ! mean for class 4 AVHRR channel 1
 424   integer, parameter, public :: OBS_M4C2  = OBS_M4C1 +1 ! mean for class 4 AVHRR channel 2
 425   integer, parameter, public :: OBS_M4C3  = OBS_M4C2 +1 ! mean for class 4 AVHRR channel 3
 426   integer, parameter, public :: OBS_M4C4  = OBS_M4C3 +1 ! mean for class 4 AVHRR channel 4
 427   integer, parameter, public :: OBS_M4C5  = OBS_M4C4 +1 ! mean for class 4 AVHRR channel 5
 428   integer, parameter, public :: OBS_M4C6  = OBS_M4C5 +1 ! mean for class 4 AVHRR channel 6
 429
 430   integer, parameter, public :: OBS_M5C1  = OBS_M4C6 +1 ! mean for class 5 AVHRR channel 1
 431   integer, parameter, public :: OBS_M5C2  = OBS_M5C1 +1 ! mean for class 5 AVHRR channel 2
 432   integer, parameter, public :: OBS_M5C3  = OBS_M5C2 +1 ! mean for class 5 AVHRR channel 3
 433   integer, parameter, public :: OBS_M5C4  = OBS_M5C3 +1 ! mean for class 5 AVHRR channel 4
 434   integer, parameter, public :: OBS_M5C5  = OBS_M5C4 +1 ! mean for class 5 AVHRR channel 5
 435   integer, parameter, public :: OBS_M5C6  = OBS_M5C5 +1 ! mean for class 5 AVHRR channel 6
 436
 437   integer, parameter, public :: OBS_M6C1  = OBS_M5C6 +1 ! mean for class 6 AVHRR channel 1
 438   integer, parameter, public :: OBS_M6C2  = OBS_M6C1 +1 ! mean for class 6 AVHRR channel 2
 439   integer, parameter, public :: OBS_M6C3  = OBS_M6C2 +1 ! mean for class 6 AVHRR channel 3
 440   integer, parameter, public :: OBS_M6C4  = OBS_M6C3 +1 ! mean for class 6 AVHRR channel 4
 441   integer, parameter, public :: OBS_M6C5  = OBS_M6C4 +1 ! mean for class 6 AVHRR channel 5
 442   integer, parameter, public :: OBS_M6C6  = OBS_M6C5 +1 ! mean for class 6 AVHRR channel 6
 443
 444   integer, parameter, public :: OBS_M7C1  = OBS_M6C6 +1 ! mean for class 7 AVHRR channel 1
 445   integer, parameter, public :: OBS_M7C2  = OBS_M7C1 +1 ! mean for class 7 AVHRR channel 2
 446   integer, parameter, public :: OBS_M7C3  = OBS_M7C2 +1 ! mean for class 7 AVHRR channel 3
 447   integer, parameter, public :: OBS_M7C4  = OBS_M7C3 +1 ! mean for class 7 AVHRR channel 4
 448   integer, parameter, public :: OBS_M7C5  = OBS_M7C4 +1 ! mean for class 7 AVHRR channel 5
 449   integer, parameter, public :: OBS_M7C6  = OBS_M7C5 +1 ! mean for class 7 AVHRR channel 6
 450
 451
 452   integer, parameter, public :: OBS_S1C1  = OBS_M7C6 +1 ! stdev for class 1 AVHRR channel 1
 453   integer, parameter, public :: OBS_S1C2  = OBS_S1C1 +1 ! stdev for class 1 AVHRR channel 2
 454   integer, parameter, public :: OBS_S1C3  = OBS_S1C2 +1 ! stdev for class 1 AVHRR channel 3
 455   integer, parameter, public :: OBS_S1C4  = OBS_S1C3 +1 ! stdev for class 1 AVHRR channel 4
 456   integer, parameter, public :: OBS_S1C5  = OBS_S1C4 +1 ! stdev for class 1 AVHRR channel 5
 457   integer, parameter, public :: OBS_S1C6  = OBS_S1C5 +1 ! stdev for class 1 AVHRR channel 6
 458
 459   integer, parameter, public :: OBS_S2C1  = OBS_S1C6 +1 ! stdev for class 2 AVHRR channel 1
 460   integer, parameter, public :: OBS_S2C2  = OBS_S2C1 +1 ! stdev for class 2 AVHRR channel 2
 461   integer, parameter, public :: OBS_S2C3  = OBS_S2C2 +1 ! stdev for class 2 AVHRR channel 3
 462   integer, parameter, public :: OBS_S2C4  = OBS_S2C3 +1 ! stdev for class 2 AVHRR channel 4
 463   integer, parameter, public :: OBS_S2C5  = OBS_S2C4 +1 ! stdev for class 2 AVHRR channel 5
 464   integer, parameter, public :: OBS_S2C6  = OBS_S2C5 +1 ! stdev for class 2 AVHRR channel 6
 465
 466   integer, parameter, public :: OBS_S3C1  = OBS_S2C6 +1 ! stdev for class 3 AVHRR channel 1
 467   integer, parameter, public :: OBS_S3C2  = OBS_S3C1 +1 ! stdev for class 3 AVHRR channel 2
 468   integer, parameter, public :: OBS_S3C3  = OBS_S3C2 +1 ! stdev for class 3 AVHRR channel 3
 469   integer, parameter, public :: OBS_S3C4  = OBS_S3C3 +1 ! stdev for class 3 AVHRR channel 4
 470   integer, parameter, public :: OBS_S3C5  = OBS_S3C4 +1 ! stdev for class 3 AVHRR channel 5
 471   integer, parameter, public :: OBS_S3C6  = OBS_S3C5 +1 ! stdev for class 3 AVHRR channel 6
 472
 473   integer, parameter, public :: OBS_S4C1  = OBS_S3C6 +1 ! stdev for class 4 AVHRR channel 1
 474   integer, parameter, public :: OBS_S4C2  = OBS_S4C1 +1 ! stdev for class 4 AVHRR channel 2
 475   integer, parameter, public :: OBS_S4C3  = OBS_S4C2 +1 ! stdev for class 4 AVHRR channel 3
 476   integer, parameter, public :: OBS_S4C4  = OBS_S4C3 +1 ! stdev for class 4 AVHRR channel 4
 477   integer, parameter, public :: OBS_S4C5  = OBS_S4C4 +1 ! stdev for class 4 AVHRR channel 5
 478   integer, parameter, public :: OBS_S4C6  = OBS_S4C5 +1 ! stdev for class 4 AVHRR channel 6
 479
 480   integer, parameter, public :: OBS_S5C1  = OBS_S4C6 +1 ! stdev for class 5 AVHRR channel 1
 481   integer, parameter, public :: OBS_S5C2  = OBS_S5C1 +1 ! stdev for class 5 AVHRR channel 2
 482   integer, parameter, public :: OBS_S5C3  = OBS_S5C2 +1 ! stdev for class 5 AVHRR channel 3
 483   integer, parameter, public :: OBS_S5C4  = OBS_S5C3 +1 ! stdev for class 5 AVHRR channel 4
 484   integer, parameter, public :: OBS_S5C5  = OBS_S5C4 +1 ! stdev for class 5 AVHRR channel 5
 485   integer, parameter, public :: OBS_S5C6  = OBS_S5C5 +1 ! stdev for class 5 AVHRR channel 6
 486
 487   integer, parameter, public :: OBS_S6C1  = OBS_S5C6 +1 ! stdev for class 6 AVHRR channel 1
 488   integer, parameter, public :: OBS_S6C2  = OBS_S6C1 +1 ! stdev for class 6 AVHRR channel 2
 489   integer, parameter, public :: OBS_S6C3  = OBS_S6C2 +1 ! stdev for class 6 AVHRR channel 3
 490   integer, parameter, public :: OBS_S6C4  = OBS_S6C3 +1 ! stdev for class 6 AVHRR channel 4
 491   integer, parameter, public :: OBS_S6C5  = OBS_S6C4 +1 ! stdev for class 6 AVHRR channel 5
 492   integer, parameter, public :: OBS_S6C6  = OBS_S6C5 +1 ! stdev for class 6 AVHRR channel 6
 493
 494   integer, parameter, public :: OBS_S7C1  = OBS_S6C6 +1 ! stdev for class 7 AVHRR channel 1
 495   integer, parameter, public :: OBS_S7C2  = OBS_S7C1 +1 ! stdev for class 7 AVHRR channel 2
 496   integer, parameter, public :: OBS_S7C3  = OBS_S7C2 +1 ! stdev for class 7 AVHRR channel 3
 497   integer, parameter, public :: OBS_S7C4  = OBS_S7C3 +1 ! stdev for class 7 AVHRR channel 4
 498   integer, parameter, public :: OBS_S7C5  = OBS_S7C4 +1 ! stdev for class 7 AVHRR channel 5
 499   integer, parameter, public :: OBS_S7C6  = OBS_S7C5 +1 ! stdev for class 7 AVHRR channel 6
 500   integer, parameter, public :: OBS_CF1   = OBS_S7C6 +1 ! AVHRR fraction of class 1
 501   integer, parameter, public :: OBS_CF2   = OBS_CF1  +1 ! AVHRR fraction of class 2
 502   integer, parameter, public :: OBS_CF3   = OBS_CF2  +1 ! AVHRR fraction of class 3
 503   integer, parameter, public :: OBS_CF4   = OBS_CF3  +1 ! AVHRR fraction of class 4
 504   integer, parameter, public :: OBS_CF5   = OBS_CF4  +1 ! AVHRR fraction of class 5
 505   integer, parameter, public :: OBS_CF6   = OBS_CF5  +1 ! AVHRR fraction of class 6
 506   integer, parameter, public :: OBS_CF7   = OBS_CF6  +1 ! AVHRR fraction of class 7
 507   integer, parameter, public :: OBS_ETOP  = OBS_CF7  +1 ! CO2 slicing consensus (median) cloud top pressure
 508   integer, parameter, public :: OBS_VTOP  = OBS_ETOP +1 ! estimated error on CO2 slicing cloud top pressure
 509   integer, parameter, public :: OBS_ECF   = OBS_VTOP +1 ! CO2 slicing effective cloud fraction 
 510   integer, parameter, public :: OBS_VCF   = OBS_ECF  +1 ! estimated error on CO2 CO2 slicing cloud  fraction 
 511   integer, parameter, public :: OBS_HE    = OBS_VCF  +1 ! cloud effective height (one channel)
 512   integer, parameter, public :: OBS_ZTSR  = OBS_HE   +1 ! retrieved skin temperature from window channel in K
 513   integer, parameter, public :: OBS_ZTM   = OBS_ZTSR +1 ! model temperature, eta=1, in K (should not be there)
 514   integer, parameter, public :: OBS_ZTGM  = OBS_ZTM  +1 ! surface model temperature (skin) in K
 515   integer, parameter, public :: OBS_ZLQM  = OBS_ZTGM +1 ! specific humidity at surface (2m) in kg/kg
 516   integer, parameter, public :: OBS_ZPS   = OBS_ZLQM +1 ! surface model pressure in Pa
 517   integer, parameter, public :: OBS_TRAD  = OBS_ZPS  +1 ! Local EARTH Radius Metres
 518   integer, parameter, public :: OBS_GEOI  = OBS_TRAD +1 ! Geoid Undulation  Metres
 519   integer, parameter, public :: OBS_CLF   = OBS_GEOI +1 ! cloud fraction
 520   integer, parameter, public :: OBS_SUN   = OBS_CLF  +1 ! sun zenith angle
 521   integer, parameter, public :: OBS_SZA   = OBS_SUN  +1 ! satellite zenith angle
 522   integer, parameter, public :: OBS_AZA   = OBS_SZA  +1 ! satellite azimuthal angle
 523   integer, parameter, public :: OBS_SAZ   = OBS_AZA  +1 ! sun azimuth angle
 524   integer, parameter, public :: OBS_CLWO  = OBS_SAZ  +1 ! cloud liquid water retrieved from observation
 525   integer, parameter, public :: OBS_CLWB  = OBS_CLWO +1 ! cloud liquid water retrieved from background
 526   integer, parameter, public :: OBS_MWS   = OBS_CLWB +1 ! model wind speed (in ASCAT data)
 527   integer, parameter, public :: OBS_SIO   = OBS_MWS  +1 ! scatering index retrieved from observation
 528   integer, parameter, public :: OBS_SIB   = OBS_SIO  +1 ! scatering index retrieved from background
 529   integer, parameter, public :: OBS_IWV   = OBS_SIB  +1 ! atmospheric integrated water vapor for ssmis
 530   integer, parameter, public :: OBS_RZAM  = OBS_IWV  +1 ! Azimuth of the Radar beam   (radians)
 531   integer, parameter, public :: OBS_RELE  = OBS_RZAM +1 ! Elevation of the Radar beam (radians)
 532   integer, parameter, public :: OBS_RANS  = OBS_RELE +1 ! Initial range of the Radar beam 
 533   integer, parameter, public :: OBS_RANE  = OBS_RANS +1 ! Final range of the Radar beam 
 534
 535   ! the last column index for real header variables defined just above
 536   integer, parameter :: NHDR_REAL_END = OBS_RANE 
 537   integer, parameter :: NHDR_REAL_SIZE = NHDR_REAL_END - NHDR_REAL_BEG + 1
 538
 539   !
 540   ! REAL-HEADER COLUMN NAMES
 541   !
 542   character(len=4), target :: ocn_ColumnNameList_RH(NHDR_REAL_BEG:NHDR_REAL_END) =  &
 543      (/'LAT ','LON ','ALT ','BX  ','BY  ','BZ  ', &
 544        'M1C1','M1C2','M1C3','M1C4','M1C5','M1C6', &
 545        'M2C1','M2C2','M2C3','M2C4','M2C5','M2C6', &
 546        'M3C1','M3C2','M3C3','M3C4','M3C5','M3C6', &
 547        'M4C1','M4C2','M4C3','M4C4','M4C5','M4C6', &
 548        'M5C1','M5C2','M5C3','M5C4','M5C5','M5C6', &
 549        'M6C1','M6C2','M6C3','M6C4','M6C5','M6C6', &
 550        'M7C1','M7C2','M7C3','M7C4','M7C5','M7C6', &
 551        'S1C1','S1C2','S1C3','S1C4','S1C5','S1C6', &
 552        'S2C1','S2C2','S2C3','S2C4','S2C5','S2C6', &
 553        'S3C1','S3C2','S3C3','S3C4','S3C5','S3C6', &
 554        'S4C1','S4C2','S4C3','S4C4','S4C5','S4C6', &
 555        'S5C1','S5C2','S5C3','S5C4','S5C5','S5C6', &
 556        'S6C1','S6C2','S6C3','S6C4','S6C5','S6C6', &
 557        'S7C1','S7C2','S7C3','S7C4','S7C5','S7C6', &
 558        'CF1 ','CF2 ','CF3 ','CF4 ','CF5 ','CF6 ', &
 559        'CF7 ','ETOP','VTOP','ECF ','VCF ','HE  ', &
 560        'ZTSR','ZTM ','ZTGM','ZLQM','ZPS ','TRAD', &
 561        'GEOI','CLF ','SUN ','SZA ','AZA ','SAZ ', &
 562        'CLW1','CLW2','MWS ','SIO ','SIB ','IWV ', &
 563        'RZAM','RELE','RANS','RANE'/)
 564   !
 565   ! INTEGER-BODY COLUMN NUMBERS
 566   !
 567   ! the first column index for integer body variables defined below
 568   ! (chosen such that every column number is unique, so that a mismatch between
 569   !  a column number and a column type (real, int, head, body) can be detected)
 570   integer, parameter :: NBDY_INT_BEG = 401
 571   integer, parameter, public :: OBS_VNM = NBDY_INT_BEG ! variable number
 572   integer, parameter, public :: OBS_FLG = OBS_VNM+1  ! flags
 573   integer, parameter, public :: OBS_KFA = OBS_FLG+1  ! marker for forward interp problems
 574   integer, parameter, public :: OBS_ASS = OBS_KFA+1  ! flag to indicate if assimilated
 575   integer, parameter, public :: OBS_HIND= OBS_ASS+1  ! corresponding header row index
 576   integer, parameter, public :: OBS_VCO = OBS_HIND+1 ! type of vertical coordinate
 577   integer, parameter, public :: OBS_LYR = OBS_VCO+1  ! Index of anal level above observ'n
 578                                             ! Flag: extrapolation necessary of
 579   integer, parameter, public :: OBS_XTR = OBS_LYR+1  ! anal variables to obs'n location
 580   integer, parameter, public :: OBS_QCF2= OBS_XTR+1  ! TOVs Data Level Qc Flag
 581                                                      ! flag values for btyp=9248 block ele 033081
 582   integer, parameter, public :: OBS_CLA= OBS_QCF2+1  ! csr cloud amount btyp 9248 ele 020081
 583
 584   ! the last column index for integer body variables defined just above
 585   integer, parameter :: NBDY_INT_END = OBS_CLA
 586   integer, parameter :: NBDY_INT_SIZE = NBDY_INT_END - NBDY_INT_BEG + 1
 587
 588   !
 589   ! INTEGER-BODY COLUMN NAMES
 590   !
 591   character(len=4), target :: ocn_ColumnNameList_IB(NBDY_INT_BEG:NBDY_INT_END) = &
 592      (/ 'VNM ','FLG ','KFA ','ASS ','HIND','VCO ','LYR ','XTR ', 'QCFL ', &
 593          'CLA '/)  
 594
 595   !
 596   ! REAL-BODY COLUMN NUMBERS
 597   !
 598   ! the first column index for real body variables defined below
 599   ! (chosen such that every column number is unique, so that a mismatch between
 600   !  a column number and a column type (real, int, head, body) can be detected)
 601   integer, parameter :: NBDY_REAL_BEG   = 501
 602   integer, parameter, public :: OBS_PPP = NBDY_REAL_BEG ! vertical coordinate
 603   integer, parameter, public :: OBS_SEM = OBS_PPP +1 ! surface emissivity
 604   integer, parameter, public :: OBS_VAR = OBS_SEM +1 ! value of the observation
 605   integer, parameter, public :: OBS_OMP = OBS_VAR +1 ! obs - H (trial field)
 606   integer, parameter, public :: OBS_OMA = OBS_OMP +1 ! obs - H (analysis mean)
 607   integer, parameter, public :: OBS_OMAM= OBS_OMA +1 ! obs - H (analysis member)
 608   integer, parameter, public :: OBS_OER = OBS_OMAM +1 ! sigma(obs)
 609   integer, parameter, public :: OBS_HPHT= OBS_OER +1 ! root of (hpht with hx scalar)
 610   integer, parameter, public :: OBS_HAHT= OBS_HPHT +1 ! root of (hp_{a}ht with hx scalar)
 611   integer, parameter, public :: OBS_ZHA = OBS_HAHT+1 ! vert coordinate for Schur product
 612   integer, parameter, public :: OBS_OMP6= OBS_ZHA +1 ! obs - H (6-h trial field)
 613   integer, parameter, public :: OBS_OMA0= OBS_OMP6+1 ! obs - H (analysis at central time)
 614   integer, parameter, public :: OBS_SIGI= OBS_OMA0+1 ! ensemble-based estimate of the innov std dev
 615   integer, parameter, public :: OBS_SIGO= OBS_SIGI+1 ! ensemble-based estimate of obs std dev
 616   integer, parameter, public :: OBS_POB = OBS_SIGO+1 ! initial value of "gamma" for variational QC 
 617   integer, parameter, public :: OBS_WORK= OBS_POB +1 ! temporary values
 618   integer, parameter, public :: OBS_PRM = OBS_WORK+1 ! (adjusted) observed value for tovs in variational assimilation
 619   integer, parameter, public :: OBS_JOBS= OBS_PRM +1 ! contribution to obs cost function
 620   integer, parameter, public :: OBS_QCV = OBS_JOBS+1 ! weight-reduction factor for var QC
 621   integer, parameter, public :: OBS_FSO = OBS_QCV+1  ! forecast sensitivity to observations
 622   integer, parameter, public :: OBS_CRPS= OBS_FSO+1  ! Continuous Ranked Probability Score
 623   integer, parameter, public :: OBS_BCOR= OBS_CRPS+1 ! observation bias correction
 624   integer, parameter, public :: OBS_OMPE= OBS_BCOR+1 ! error standard deviation of [obs - H (trial field)]
 625   integer, parameter, public :: OBS_LATD= OBS_OMPE+1 ! obs LATitude  in Data table (radians)
 626   integer, parameter, public :: OBS_LOND= OBS_LATD+1 ! obs LONgitude in Data table (radians)
 627   integer, parameter, public :: OBS_BTCL= OBS_LOND+1 ! clear-sky simulated observation
 628   integer, parameter, public :: OBS_LOCI= OBS_BTCL+1 ! LOCation Information for observation (e.g. range along radar beam)
 629   
 630   ! the number of real body variables defined just above
 631   integer, parameter :: NBDY_REAL_END = OBS_LOCI
 632   integer, parameter :: NBDY_REAL_SIZE = NBDY_REAL_END - NBDY_REAL_BEG + 1
 633
 634   !
 635   ! REAL-BODY COLUMN NAMES
 636   !
 637   character(len=4), target :: ocn_ColumnNameList_RB(NBDY_REAL_BEG:NBDY_REAL_END) = &
 638      (/ 'PPP ','SEM ','VAR ','OMP ','OMA ','OMAM','OER ','HPHT','HAHT','ZHA ','OMP6',     &
 639         'OMA0','SIGI','SIGO','POB ','WORK','PRM ','JOBS','QCV ','FSO ','CRPS','BCOR',     &
 640         'OMPE','ROLA','ROLO','VAR2','LOCI' /)
 641end module ObsColumnNames_mod
 642
 643
 644
 645module ObsDataColumn_mod
 646  !
 647  ! MODULE obsDataColumn_mod (prefix='odc' category='7. Low-level data objects and utilities')
 648  !
 649  !:Purpose:
 650  !    This module is used exclusively by the obsSpaceData module which follows
 651  !    in this file. The derived type is used to represent a "column" of
 652  !    observation data in an instance of the struct_obs defined in obsSpaceData.
 653  !    It contains a pointer for each possible type of data stored in a column,
 654  !    but only one should be allocated at any time.
 655  !
 656   use codePrecision_mod
 657   use ObsColumnNames_mod
 658   implicit none
 659   save
 660   private
 661
 662   ! CLASS-CONSTANT:
 663   ! CLASS-CONSTANT:
 664   ! CLASS-CONSTANT:
 665
 666   ! This type gathers together into one structure the various CLASS-CONSTANT
 667   ! characteristics of a data column.  Four instances (flavours) of this derived
 668   ! type are defined below.
 669   type, public :: struct_odc_flavour
 670                                        ! These 2 values are informational only.
 671                                        ! They are used in error messages.
 672      character(len=4) :: dataType      ! REAL or INT
 673      character(len=4) :: headOrBody    ! HEAD or BODY
 674
 675      integer :: ncol_beg, ncol_end
 676      logical         , dimension(:), pointer :: columnActive   ! indexed from 1
 677      character(len=4), dimension(:), pointer :: columnNameList
 678
 679      integer,dimension(:), pointer ::activeIndexFromColumnIndex
 680      logical :: activeIndexFromColumnIndex_defined
 681
 682      integer,dimension(:), pointer ::columnIndexFromActiveIndex
 683      logical :: columnIndexFromActiveIndex_defined
 684   end type struct_odc_flavour
 685
 686   ! The four CLASS-CONSTANT flavours of data columns:
 687   !    Integer / Real  +  Body / Header
 688   type(struct_odc_flavour), public, target :: odc_flavour_IB, &
 689                                               odc_flavour_IH, &
 690                                               odc_flavour_RB, &
 691                                               odc_flavour_RH
 692   ! end of CLASS-CONSTANT objects
 693   ! end of CLASS-CONSTANT objects
 694   ! end of CLASS-CONSTANT objects
 695
 696
 697   ! methods
 698   public :: odc_allocate, odc_deallocate, odc_class_initialize
 699   public :: odc_initAllColumnFlavours
 700   public :: odc_activateColumn, odc_numActiveColumn
 701   public :: odc_columnElem, odc_columnSet
 702   public :: odc_columnIndexFromActiveIndex, odc_activeIndexFromColumnIndex
 703
 704   ! This type allows a single derived type to contain either a real or an int
 705   type, public :: struct_obsDataColumn
 706      logical          :: allocated = .false.
 707                                        ! For these arrays:
 708                                        !   1st dim'n:  row index (element index)
 709      integer,          pointer :: value_i(:)     => NULL()
 710      real(pre_obsReal),pointer :: value_r(:)     => NULL()
 711      character(len=4) :: dataType
 712   end type struct_obsDataColumn
 713
 714
 715   ! This type contains one array of data columns.  Four of these are necessary
 716   ! to constitute a complete set of observation data (struct_obs).
 717   type, public :: struct_obsDataColumn_Array
 718                                        ! CLASS-CONSTANT values (1 of 4 flavours)
 719      type(struct_odc_flavour), pointer :: odc_flavour => NULL()
 720                                        ! object-specific values
 721                                        !   1st dim'n: column index (column name)
 722      type(struct_obsDataColumn), dimension(:), pointer :: columns => NULL()
 723   end type struct_obsDataColumn_Array
 724  
 725
 726   ! These arrays store the status of the columns.  An active column is 
 727   ! allocated (and can therefore be used) in any object that is instantiated.
 728   logical, target :: columnActive_IH(NHDR_INT_BEG:NHDR_INT_END ) = .false.
 729   logical, target :: columnActive_RH(NHDR_REAL_BEG:NHDR_REAL_END) = .false.
 730   logical, target :: columnActive_IB(NBDY_INT_BEG:NBDY_INT_END ) = .false.
 731   logical, target :: columnActive_RB(NBDY_REAL_BEG:NBDY_REAL_END) = .false.
 732
 733   integer, target :: activeIndexFromColumnIndex_IB(NBDY_INT_BEG:NBDY_INT_END)
 734   integer, target :: activeIndexFromColumnIndex_IH(NHDR_INT_BEG:NHDR_INT_END)
 735   integer, target :: activeIndexFromColumnIndex_RB(NBDY_REAL_BEG:NBDY_REAL_END)
 736   integer, target :: activeIndexFromColumnIndex_RH(NHDR_REAL_BEG:NHDR_REAL_END)
 737
 738   integer, target :: columnIndexFromActiveIndex_IB(NBDY_INT_SIZE)
 739   integer, target :: columnIndexFromActiveIndex_IH(NHDR_INT_SIZE)
 740   integer, target :: columnIndexFromActiveIndex_RB(NBDY_REAL_SIZE)
 741   integer, target :: columnIndexFromActiveIndex_RH(NHDR_REAL_SIZE)
 742
 743   integer, public, parameter :: odc_ENKF_bdy_int_column_list(8) = &
 744      (/OBS_VNM, OBS_FLG, OBS_ASS, OBS_HIND, OBS_VCO, OBS_LYR, &
 745        OBS_QCF2, OBS_CLA /)
 746   integer, public, parameter :: odc_ENKF_bdy_real_column_list(15) = &
 747      (/OBS_PPP, OBS_SEM, OBS_VAR,  OBS_OMP,  OBS_OMA,  OBS_OER,  OBS_HPHT,&
 748        OBS_HAHT,OBS_ZHA, OBS_OMP6, OBS_OMA0, OBS_SIGI, OBS_SIGO, OBS_LATD,&
 749        OBS_LOND /)
 750
 751contains
 752
 753   subroutine odc_abort(cdmessage)
 754     !
 755     !:Purpose: Abort a job on error (same as OBS_ABORT)
 756     !
 757     !:Arguments:
 758     !           :cdmessage: message to be printed
 759     !
 760     !:Note: For debugging (i.e. UNIT_TESTING is defined), obs_abort should
 761     !        generally be followed by a 'return' in the calling routine.
 762
 763!#if defined(UNIT_TESTING)
 764!      use pFUnit
 765!#endif
 766      implicit none
 767
 768      ! Arguments:
 769      character(len=*), intent(in) :: cdmessage
 770
 771      write(*,'(//,4X,"ABORTING IN ObsDataColumn_mod:-------",/,8X,A)')cdmessage
 772      call flush(6)
 773
 774!#if defined(UNIT_TESTING)
 775!      call throw(Exception('exiting in odc_abort:' // cdmessage))
 776!#else
 777      call qqexit(1)
 778      stop
 779!#endif
 780   end subroutine odc_abort
 781
 782
 783   function odc_activeIndexFromColumnIndex(odc_flavour,column_index_in, &
 784                                           recompute_opt) result(active_index_out)
 785      !
 786      !:Purpose: The list of active columns is only a subset of all possible
 787      !           columns.  Return the index into the list of active columns, given
 788      !           the index into the list of all columns.
 789      !
 790      implicit none
 791
 792      ! Arguments:
 793      type(struct_odc_flavour), intent(inout) :: odc_flavour
 794      integer                 , intent(in)    :: column_index_in
 795      logical, optional       , intent(in)    :: recompute_opt
 796      ! Result:
 797      integer                                 :: active_index_out
 798
 799      ! Locals:
 800      integer :: active_index, &
 801                 column_index
 802      character(len=100) :: message
 803
 804      if(present(recompute_opt)) then
 805         if(recompute_opt) odc_flavour%activeIndexFromColumnIndex_defined=.false.
 806      endif
 807
 808      if(.not. odc_flavour%activeIndexFromColumnIndex_defined) then
 809         odc_flavour%activeIndexFromColumnIndex_defined=.true.
 810         active_index=0
 811         odc_flavour%activeIndexFromColumnIndex(:)=-1
 812
 813         do column_index = odc_flavour%ncol_beg, odc_flavour%ncol_end
 814            if(odc_flavour%columnActive(column_index)) then
 815               active_index=active_index+1
 816               odc_flavour%activeIndexFromColumnIndex(column_index) =active_index
 817            endif
 818         enddo
 819      endif
 820
 821      active_index_out=odc_flavour%activeIndexFromColumnIndex(column_index_in)
 822
 823      if(active_index_out == -1) then
 824         write(message,*)'ODC_activeIndexFromColumnIndex: requested column is ',&
 825                         'not active!  Column name is ', &
 826                         odc_flavour%columnNameList(column_index_in)
 827         call odc_abort(message)
 828      end if
 829   end function odc_activeIndexFromColumnIndex
 830
 831
 832   subroutine odc_allocate( odc, numRows, name, dataType, scratchReal, scratchInt )
 833     !
 834     !:Purpose: Allocate a single column of obs data according to 
 835     !           specifications in input arguments
 836     !
 837     !:Arguments:
 838     !           :odc:        instance of the obsDataColumn type
 839     !           :numRows:    number of column rows to allocate
 840     !           :name:       character string name of column
 841     !           :dataType:   character string type of column data: REAL or INT
 842     !           :headOrBody: character string indicating HEAD or BODY
 843     !
 844     !
 845      implicit none
 846
 847      ! Arguments:
 848      type(struct_obsDataColumn), intent(inout) :: odc
 849      integer,                    intent(in)    :: numRows
 850      character(len=*),           intent(in)    :: name,dataType
 851      real(pre_obsReal), pointer, intent(in)    :: scratchReal(:)
 852      integer          , pointer, intent(in)    :: scratchInt(:)
 853
 854      if(odc%allocated) then
 855         call odc_abort('ODC_ALLOCATE: column is already allocated. name=' &
 856                        // name)
 857         return
 858      endif
 859
 860      odc%allocated=.true.
 861      odc%dataType = dataType
 862
 863      select case (trim(dataType))
 864      case ('INT')
 865         allocate(odc%value_i(numRows))
 866         odc%value_i(:)=0
 867         odc%value_r   => scratchReal
 868
 869      case ('REAL')
 870         allocate(odc%value_r(numRows))
 871         odc%value_r(:)=real(0.0D0, pre_obsReal)
 872         odc%value_i   => scratchInt
 873
 874      case default
 875         call odc_abort('ODC_ALLOCATE: unknown data type. type=' // dataType)
 876      end select
 877
 878   end subroutine odc_allocate
 879
 880
 881   subroutine odc_activateColumn(odc_flavour, column_index)
 882      !
 883      !:Purpose: Set the 'active' flag for the indicated column.  This enables memory
 884      !           allocation for this column without actually allocating the memory.
 885      !
 886      implicit none
 887
 888      ! Arguments:
 889      type(struct_odc_flavour), intent(inout) :: odc_flavour
 890      integer,                  intent(in)    :: column_index
 891
 892      ! Locals:
 893      integer :: active_index, dummy_index
 894
 895      if(.not.odc_flavour%columnActive(column_index)) then
 896         odc_flavour%columnActive(column_index) = .true.
 897      endif
 898
 899      ! force the recalculation of indices to go between activeColumnIndex and
 900      ! columnIndex
 901      active_index=odc_activeIndexFromColumnIndex(odc_flavour,column_index, &
 902                                                  recompute_opt=.true.)
 903      dummy_index =odc_columnIndexFromActiveIndex(odc_flavour,active_index, &
 904                                                  recompute_opt=.true.)
 905   end subroutine odc_activateColumn
 906
 907
 908   subroutine odc_initColumnFlavour(odc_flavour, dataType_in, headOrBody_in)
 909      !
 910      !:Purpose: Set pointers according to the four column flavours (header /
 911      !           body, integer / real).
 912      !      
 913      implicit none
 914
 915      ! Arguments:
 916      type(struct_odc_flavour), intent(inout) :: odc_flavour
 917      character(len=*)        , intent(in)    :: dataType_in    ! REAL or INT
 918      character(len=*)        , intent(in)    :: headOrBody_in  ! HEAD or BODY
 919
 920      odc_flavour%dataType   = trim(dataType_in)
 921      odc_flavour%headOrBody = trim(headOrBody_in)
 922
 923      select case (trim(dataType_in))
 924      case ('REAL')
 925         select case (trim(headOrBody_in))
 926         case ('HEAD')
 927            odc_flavour%ncol_beg = NHDR_REAL_BEG
 928            odc_flavour%ncol_end = NHDR_REAL_END
 929            odc_flavour%columnActive   => columnActive_RH
 930            odc_flavour%columnNameList => ocn_ColumnNameList_RH
 931            odc_flavour%activeIndexFromColumnIndex=>activeIndexFromColumnIndex_RH
 932            odc_flavour%activeIndexFromColumnIndex_defined = .false.
 933            odc_flavour%columnIndexFromActiveIndex=>columnIndexFromActiveIndex_RH
 934            odc_flavour%columnIndexFromActiveIndex_defined = .false.
 935         case ('BODY')
 936            odc_flavour%ncol_beg = NBDY_REAL_BEG
 937            odc_flavour%ncol_end = NBDY_REAL_END
 938            odc_flavour%columnActive   => columnActive_RB
 939            odc_flavour%columnNameList => ocn_ColumnNameList_RB
 940            odc_flavour%activeIndexFromColumnIndex=>activeIndexFromColumnIndex_RB
 941            odc_flavour%activeIndexFromColumnIndex_defined = .false.
 942            odc_flavour%columnIndexFromActiveIndex=>columnIndexFromActiveIndex_RB
 943            odc_flavour%columnIndexFromActiveIndex_defined = .false.
 944         end select
 945
 946      case ('INT')
 947         select case (trim(headOrBody_in))
 948         case ('HEAD')
 949            odc_flavour%ncol_beg = NHDR_INT_BEG
 950            odc_flavour%ncol_end = NHDR_INT_END
 951            odc_flavour%columnActive   => columnActive_IH
 952            odc_flavour%columnNameList => ocn_ColumnNameList_IH
 953            odc_flavour%activeIndexFromColumnIndex=>activeIndexFromColumnIndex_IH
 954            odc_flavour%activeIndexFromColumnIndex_defined = .false.
 955            odc_flavour%columnIndexFromActiveIndex=>columnIndexFromActiveIndex_IH
 956            odc_flavour%columnIndexFromActiveIndex_defined = .false. 
 957         case ('BODY') 
 958            odc_flavour%ncol_beg = NBDY_INT_BEG
 959            odc_flavour%ncol_end = NBDY_INT_END
 960            odc_flavour%columnActive   => columnActive_IB
 961            odc_flavour%columnNameList => ocn_ColumnNameList_IB
 962            odc_flavour%activeIndexFromColumnIndex=>activeIndexFromColumnIndex_IB
 963            odc_flavour%activeIndexFromColumnIndex_defined = .false.
 964            odc_flavour%columnIndexFromActiveIndex=>columnIndexFromActiveIndex_IB
 965            odc_flavour%columnIndexFromActiveIndex_defined = .false.
 966         end select
 967      end select
 968
 969   end subroutine odc_initColumnFlavour
 970
 971
 972   subroutine odc_initAllColumnFlavours()
 973      !
 974      !:Purpose: Initialize the 4 flavours of odc
 975      !
 976      implicit none
 977
 978      ! Locals:
 979      logical, save :: firstCall = .true.
 980
 981      if (firstCall) then
 982        firstCall = .false.
 983        ! Initialize the four column flavours:
 984        call odc_initColumnFlavour(odc_flavour_IB, 'INT',  'BODY')
 985        call odc_initColumnFlavour(odc_flavour_IH, 'INT',  'HEAD')
 986        call odc_initColumnFlavour(odc_flavour_RB, 'REAL', 'BODY')
 987        call odc_initColumnFlavour(odc_flavour_RH, 'REAL', 'HEAD')
 988      end if
 989
 990   end subroutine odc_initAllColumnFlavours
 991
 992
 993   subroutine odc_class_initialize(obsColumnMode)
 994      !
 995      !:Purpose: Set variables that use the same values for all instances of the class.
 996      !
 997      implicit none
 998      ! mode controlling the subset of columns that are activated in all objects
 999
1000      ! Arguments:
1001      character(len=*), intent(in) :: obsColumnMode
1002
1003      ! Locals:
1004      integer :: column_index, list_index, ii
1005      integer, parameter :: COLUMN_LIST_SIZE = 100
1006      integer, dimension(COLUMN_LIST_SIZE) :: hdr_int_column_list, &
1007                  hdr_real_column_list, bdy_int_column_list, bdy_real_column_list
1008
1009      ! Initialize the four column flavours:
1010      call odc_initAllColumnFlavours()
1011
1012      COLUMN_MODE:if(trim(obsColumnMode) == 'ALL') then
1013         do column_index=NHDR_INT_BEG,NHDR_INT_END
1014            call odc_activateColumn(odc_flavour_IH, column_index)
1015         enddo
1016         do column_index=NHDR_REAL_BEG,NHDR_REAL_END
1017            call odc_activateColumn(odc_flavour_RH, column_index)
1018         enddo
1019         do column_index=NBDY_INT_BEG,NBDY_INT_END
1020            call odc_activateColumn(odc_flavour_IB, column_index)
1021         enddo
1022         do column_index=NBDY_REAL_BEG,NBDY_REAL_END
1023            call odc_activateColumn(odc_flavour_RB, column_index)
1024         enddo
1025
1026      elseif(trim(obsColumnMode) == 'ENKF') then COLUMN_MODE
1027
1028         hdr_int_column_list= &
1029            (/OBS_RLN , OBS_ONM , OBS_INS , OBS_ITY , OBS_SAT , OBS_TEC , OBS_DAT , &
1030              OBS_ETM , OBS_NLV , OBS_STYP, OBS_PAS , OBS_REG , OBS_IP  , OBS_ST1 , &
1031              OBS_IDF , OBS_SEN , OBS_GQF , OBS_GQL , OBS_ROQF,(0,ii=20,100) /)
1032
1033         hdr_real_column_list= &
1034            (/OBS_LAT , OBS_LON , OBS_ALT , OBS_BX  , OBS_BY  , OBS_BZ  , OBS_TRAD, &
1035              OBS_GEOI, OBS_CLF , OBS_SUN , OBS_SZA , OBS_AZA , OBS_SAZ , OBS_RZAM, &
1036              OBS_RELE, OBS_RANS, OBS_RANE, (0,ii=18,100)/)
1037
1038         bdy_int_column_list(:)    = 0
1039         bdy_int_column_list(1:size(odc_ENKF_bdy_int_column_list)) = &
1040            odc_ENKF_bdy_int_column_list(:)
1041
1042         bdy_real_column_list(:)   = 0
1043         bdy_real_column_list(1:size(odc_ENKF_bdy_real_column_list)) = &
1044            odc_ENKF_bdy_real_column_list(:)
1045
1046         do list_index=1,COLUMN_LIST_SIZE
1047            column_index = hdr_int_column_list(list_index)
1048            if(column_index == 0) exit
1049            call odc_activateColumn(odc_flavour_IH, column_index)
1050         end do
1051
1052         do list_index=1,COLUMN_LIST_SIZE
1053            column_index = hdr_real_column_list(list_index)
1054            if(column_index == 0) exit
1055            call odc_activateColumn(odc_flavour_RH, column_index)
1056         end do
1057
1058         do list_index=1,COLUMN_LIST_SIZE
1059            column_index = bdy_int_column_list(list_index)
1060            if(column_index == 0) exit
1061            call odc_activateColumn(odc_flavour_IB, column_index)
1062         end do
1063
1064         do list_index=1,COLUMN_LIST_SIZE
1065            column_index = bdy_real_column_list(list_index)
1066            if(column_index == 0) exit
1067            call odc_activateColumn(odc_flavour_RB, column_index)
1068         end do
1069
1070      elseif(trim(obsColumnMode) == 'ENKFMIDAS') then COLUMN_MODE
1071
1072         hdr_int_column_list= &
1073            (/OBS_RLN , OBS_ONM , OBS_INS , OBS_ITY , OBS_SAT , OBS_TEC , OBS_DAT , &
1074              OBS_ETM , OBS_NLV , OBS_STYP, OBS_PAS , OBS_REG , OBS_IP  , OBS_ST1 , &
1075              OBS_IDF , OBS_SEN , OBS_SWQ1, OBS_SWQ2, OBS_SWMT, OBS_SWLS, OBS_SWGA, &
1076              OBS_SWHA, OBS_GQF , OBS_GQL , OBS_ROQF, OBS_TTYP, (0,ii=27,100) /)
1077
1078         hdr_real_column_list= &
1079            (/OBS_LAT , OBS_LON , OBS_ALT , OBS_BX  , OBS_BY  , OBS_BZ  , OBS_TRAD, &
1080              OBS_GEOI, OBS_CLF , OBS_SUN , OBS_SZA , OBS_AZA , OBS_SAZ , OBS_RZAM, &
1081              OBS_RELE, OBS_RANS, OBS_RANE, (0,ii=18,100)/)
1082
1083         bdy_int_column_list= &
1084            (/OBS_VNM , OBS_FLG , OBS_ASS , OBS_HIND, OBS_VCO , OBS_LYR , OBS_XTR , &
1085              OBS_QCF2, OBS_CLA , (0,ii=10,100) /)
1086
1087         bdy_real_column_list= &
1088            (/OBS_PPP , OBS_SEM , OBS_VAR , OBS_OMP , OBS_OMA , OBS_OMAM, OBS_OER , &
1089              OBS_HPHT, OBS_HAHT, OBS_ZHA , OBS_OMP6, OBS_OMA0, OBS_SIGI, OBS_SIGO, &
1090              OBS_WORK, OBS_PRM , OBS_JOBS, OBS_BCOR, OBS_LOND, OBS_LATD, (0,ii=21,100) /)
1091
1092         do list_index=1,COLUMN_LIST_SIZE
1093            column_index = hdr_int_column_list(list_index)
1094            if(column_index == 0) exit
1095            call odc_activateColumn(odc_flavour_IH, column_index)
1096         end do
1097
1098         do list_index=1,COLUMN_LIST_SIZE
1099            column_index = hdr_real_column_list(list_index)
1100            if(column_index == 0) exit
1101            call odc_activateColumn(odc_flavour_RH, column_index)
1102         end do
1103
1104         do list_index=1,COLUMN_LIST_SIZE
1105            column_index = bdy_int_column_list(list_index)
1106            if(column_index == 0) exit
1107            call odc_activateColumn(odc_flavour_IB, column_index)
1108         end do
1109
1110         do list_index=1,COLUMN_LIST_SIZE
1111            column_index = bdy_real_column_list(list_index)
1112            if(column_index == 0) exit
1113            call odc_activateColumn(odc_flavour_RB, column_index)
1114         end do
1115
1116      elseif(trim(obsColumnMode) == 'VAR') then COLUMN_MODE
1117
1118         do column_index=NHDR_INT_BEG,NHDR_INT_END
1119            if( column_index < OBS_GQF .or. column_index > OBS_NCO2  &
1120              ) call odc_activateColumn(odc_flavour_IH, column_index)
1121         enddo
1122         do column_index=NHDR_REAL_BEG,NHDR_REAL_END
1123            if(     column_index /= OBS_BX &
1124               .and.column_index /= OBS_BY &
1125               .and.column_index /= OBS_BZ &
1126               .and. (column_index < OBS_M1C1 .or. &
1127                      column_index > OBS_ZPS) &
1128              ) call odc_activateColumn(odc_flavour_RH, column_index)
1129         enddo
1130
1131         do column_index=NBDY_INT_BEG,NBDY_INT_END
1132            if( column_index /= OBS_KFA ) call odc_activateColumn(odc_flavour_IB, column_index)
1133         enddo
1134         do column_index = NBDY_REAL_BEG, NBDY_REAL_END
1135           if(       column_index /= OBS_OMAM &
1136               .and. column_index /= OBS_OMP6 & 
1137               .and. column_index /= OBS_OMA0 &
1138               .and. column_index /= OBS_HAHT &
1139               .and. column_index /= OBS_SIGI &
1140               .and. column_index /= OBS_SIGO &
1141              )call odc_activateColumn(odc_flavour_RB, column_index)
1142         enddo
1143
1144      endif COLUMN_MODE
1145   end subroutine odc_class_initialize
1146
1147
1148   subroutine odc_columnElem(odc_array, column_index, row_index, value_i,value_r)
1149     !
1150     !:Purpose: Returns the value of the row_index'th element in the column array
1151     !           with the indicated column_index.
1152     !
1153     !           The column array can be of any one of the four possible column-array
1154     !           flavours.  The flavour is selected by one of four wrappers to this
1155     !           method.
1156     !
1157     implicit none
1158
1159     ! Arguments:
1160     type(struct_obsDataColumn_Array), intent(in)  :: odc_array
1161     integer                         , intent(in)  :: column_index
1162     integer                         , intent(in)  :: row_index
1163     integer                         , intent(out) :: value_i
1164     real(pre_obsReal)               , intent(out) :: value_r
1165
1166     ! Locals:
1167     character(len=100) :: message
1168
1169      if(      column_index >= odc_array%odc_flavour%ncol_beg &
1170         .and. column_index <= odc_array%odc_flavour%ncol_end) then
1171         if(odc_array%odc_flavour%columnActive(column_index)) then
1172            ! Return the value (return int AND real, just to make it simple)
1173            value_i = odc_array%columns(column_index)%value_i(row_index)
1174            value_r = odc_array%columns(column_index)%value_r(row_index)
1175         else
1176            write(message,*)'abort in odc_columnElem [' &
1177                          // odc_array%odc_flavour%dataType //',' &
1178                          // odc_array%odc_flavour%headOrBody // &
1179                          ']: column not active: ', &
1180                          odc_array%odc_flavour%columnNameList(column_index)
1181            call odc_abort(message)
1182         endif
1183      else
1184         write(message,*) 'abort in odc_columnElem [' &
1185                          // odc_array%odc_flavour%dataType //',' &
1186                          // odc_array%odc_flavour%headOrBody // &
1187                          ']: column index out of range: ', column_index
1188         call odc_abort(message); return
1189      endif
1190   end subroutine odc_columnElem
1191
1192
1193   function odc_columnIndexFromActiveIndex(odc_flavour,active_index_in, &
1194                                           recompute_opt) result(column_index_out)
1195      !
1196      !:Purpose: The list of active columns is only a subset of all possible
1197      !           columns.  Return the index into the list of all columns, given
1198      !           the index into the list of active columns, and given the column
1199      !           flavour.
1200      !
1201      implicit none
1202
1203      ! Arguments:
1204      type(struct_odc_flavour), intent(inout) :: odc_flavour
1205      integer                 , intent(in)    :: active_index_in
1206      logical, optional       , intent(in)    :: recompute_opt
1207      ! Result:
1208      integer                                 :: column_index_out
1209
1210      ! Locals:
1211      integer :: active_index, &
1212                 column_index
1213
1214      if(present(recompute_opt)) then
1215         if(recompute_opt) odc_flavour%columnIndexFromActiveIndex_defined=.false.
1216      endif
1217
1218      if(.not. odc_flavour%columnIndexFromActiveIndex_defined) then
1219         odc_flavour%columnIndexFromActiveIndex_defined=.true.
1220         active_index=0
1221
1222         do column_index = odc_flavour%ncol_beg, odc_flavour%ncol_end
1223            if(odc_flavour%columnActive(column_index)) then
1224               active_index=active_index+1
1225               odc_flavour%columnIndexFromActiveIndex(active_index) =column_index
1226            endif
1227         enddo
1228      endif
1229
1230      column_index_out=odc_flavour%columnIndexFromActiveIndex(active_index_in)
1231   end function odc_columnIndexFromActiveIndex
1232
1233
1234   subroutine odc_columnSet(odc_array, column_index, row_index, &
1235                            value_i, value_r, numElements, numElements_max)
1236      !
1237      !:Purpose: Sets the value of the row_index'th element in the column array with
1238      !           the indicated column_index.
1239      !
1240      !           The column array can be of any one of the four possible column-array
1241      !           flavours.  The flavour is selected by one of four wrappers to this
1242      !           method.
1243      !
1244      implicit none
1245
1246      ! Arguments:
1247      type(struct_obsDataColumn_Array), intent(inout) :: odc_array
1248      integer                         , intent(in)    :: column_index
1249      integer                         , intent(in)    :: row_index
1250      integer                         , intent(in)    :: value_i
1251      real(pre_obsReal)               , intent(in)    :: value_r
1252      integer                         , intent(inout) :: numElements
1253      integer                         , intent(in)    :: numElements_max
1254
1255      ! Locals:
1256      character(len=100) :: message
1257
1258      ! Validate the requested row_index, and
1259      ! Increment the number of elements, if necessary
1260      if(row_index > numElements_max) then
1261         write(message,*)'The requested ', &
1262                         trim(odc_array%odc_flavour%headOrBody), ' row_index, ',&
1263                         row_index,', is greater than the maximum, ', &
1264                         numElements_max
1265         call odc_abort(message)
1266
1267      else if(row_index > numElements+1) then
1268         write(message,*)'The requested ', &
1269                         trim(odc_array%odc_flavour%headOrBody), &
1270                         ' row_index, ', row_index, &
1271                         ', is beyond the next available index, ', &
1272                         numElements+1
1273         call odc_abort(message)
1274
1275      else if(row_index == numElements+1) then
1276         numElements = numElements+1
1277      end if
1278
1279
1280      ! Validate the requested column index, and
1281      ! Record the value
1282      if(      column_index >= odc_array%odc_flavour%ncol_beg &
1283         .and. column_index <= odc_array%odc_flavour%ncol_end) then
1284         if(odc_array%odc_flavour%columnActive(column_index)) then
1285            ! Record the value (record int AND real, just to make it simple)
1286            odc_array%columns(column_index)%value_i(row_index) = value_i
1287            odc_array%columns(column_index)%value_r(row_index) = value_r
1288         else
1289            write(message,*) 'abort in odc_columnSet [' &
1290                          // odc_array%odc_flavour%dataType //',' &
1291                          // odc_array%odc_flavour%headOrBody // &
1292                          ']: column not active: ', &
1293                          odc_array%odc_flavour%columnNameList(column_index)
1294            call odc_abort(message)
1295         endif
1296      else
1297         write(message,*) 'abort in odc_columnSet [' &
1298                          // odc_array%odc_flavour%dataType //',' &
1299                          // odc_array%odc_flavour%headOrBody // &
1300                          ']: column index out of range: ', column_index
1301         call odc_abort(message); return
1302      endif
1303
1304   end subroutine odc_columnSet
1305
1306
1307   subroutine odc_deallocate(odc)
1308      !
1309      !:Purpose: Deallocate a single column of obs data
1310      !
1311      !:Arguments:
1312      !           :odc: instance of the obsDataColumn type
1313      !
1314      implicit none
1315
1316      ! Arguments:
1317      type(struct_obsDataColumn), intent(inout) :: odc
1318
1319      if(.not.odc%allocated) then
1320         call odc_abort('ODC_DEALLOCATE: column is not already allocated.')
1321      endif
1322
1323      odc%allocated=.false.
1324      if(trim(odc%dataType) == 'INT' .and. associated(odc%value_i)) then
1325         deallocate(odc%value_i)
1326         nullify(odc%value_i)
1327         nullify(odc%value_r)    ! Dont deallocate:  this is not the only pointer
1328      end if
1329
1330      if(trim(odc%dataType) == 'REAL' .and. associated(odc%value_r)) then
1331         deallocate(odc%value_r)
1332         nullify(odc%value_r)
1333         nullify(odc%value_i)    ! Dont deallocate:  this is not the only pointer
1334      endif
1335
1336   end subroutine odc_deallocate
1337
1338
1339   function odc_numActiveColumn(odc_array) result(numActiveColumn)
1340      !
1341      !:Purpose: Return the number of active columns that are contained in the given
1342      !           column array.
1343      !
1344      !           The column array can be of any one of the four possible column-array
1345      !           flavours.
1346      !
1347      implicit none
1348
1349      ! Arguments:
1350      type(struct_obsDataColumn_Array), intent(in) :: odc_array
1351      ! Result:
1352      integer :: numActiveColumn
1353
1354      ! Locals:
1355      integer :: column_index
1356
1357      numActiveColumn=0
1358      do column_index = odc_array%odc_flavour%ncol_beg, &
1359                        odc_array%odc_flavour%ncol_end
1360         if(odc_array%odc_flavour%columnActive(column_index))  &
1361            numActiveColumn=numActiveColumn+1
1362      enddo
1363   end function odc_numActiveColumn
1364
1365end module ObsDataColumn_mod
1366
1367
1368module ObsSpaceData_mod
1369  ! MODULE obsSpaceData_mod (prefix='obs' category='6. High-level data objects')
1370  !
1371  !:Purpose:  This module contains the definition of the structure named "struct_obs" and
1372  !           methods for accessing its data.  An instance of struct_obs contains
1373  !           all information pertaining to a set of observation-space data.
1374  !
1375  !           This has evolved from the CMA structure, originated in work by
1376  !           D. Vasiljevic at ECMWF.
1377  !
1378  !           |
1379  !
1380  !           Very generally, obsSpaceData can be thought as two tables linked to one another. 
1381  !           
1382  !           A "Header" table:
1383  ! 
1384  !           +-----------------+------+------+------+
1385  !           | headerIndex     | Lat  | Lon  | ...  |
1386  !           +=================+======+======+======+
1387  !           |    1            |      |      |      |
1388  !           +-----------------+------+------+------+
1389  !           |    2            |      |      |      |
1390  !           +-----------------+------+------+------+
1391  !           |    ...          |      |      |      |
1392  !           +-----------------+------+------+------+
1393  !           | n_header        |      |      |      |
1394  !           +-----------------+------+------+------+
1395  !
1396  !           and a "data" table:
1397  ! 
1398  !           +-----------------+-------------------------+-------------+------+
1399  !           | dataIndex       | Associated headerIndex  | Obs Value   | ...  |
1400  !           +=================+=========================+=============+======+
1401  !           |    1            |            1            |             |      |
1402  !           +-----------------+-------------------------+-------------+------+
1403  !           |    2            |            1            |             |      |
1404  !           +-----------------+-------------------------+-------------+------+
1405  !           |    3            |            1            |             |      |
1406  !           +-----------------+-------------------------+-------------+------+
1407  !           |    4            |            1            |             |      |
1408  !           +-----------------+-------------------------+-------------+------+
1409  !           |    5            |            2            |             |      |
1410  !           +-----------------+-------------------------+-------------+------+
1411  !           |    6            |            2            |             |      |
1412  !           +-----------------+-------------------------+-------------+------+
1413  !           |    7            |            3            |             |      |
1414  !           +-----------------+-------------------------+-------------+------+
1415  !           |    8            |            3            |             |      |
1416  !           +-----------------+-------------------------+-------------+------+
1417  !           |    9            |            3            |             |      |
1418  !           +-----------------+-------------------------+-------------+------+
1419  !           | n_data          |                         |             |      |
1420  !           +-----------------+-------------------------+-------------+------+
1421  !
1422  !           |
1423  !
1424  !           * For satellite observations
1425  !               * One header contains information on lat/lon, azimuth, zenith angle and time. 
1426  !               * Data entries contain: channels, measurement value, etc. 
1427  !
1428  !             .. image:: /_static/satellite.png
1429  !                 :align: center
1430  !
1431  !           |
1432  !
1433  !           * For radar observations
1434  !               * One header for each radar "ray". Contains information on lat/lon of radar, elevation, azimuth, etc. 
1435  !               * Data entries contain: range, altitude, Doppler velocity, etc. 
1436  !
1437  !             .. image:: /_static/radar.png
1438  !                 :align: center
1439  !
1440  !           |
1441  !
1442  !           * For gps radio occultation (GPS-RO)
1443  !               * One header for each profile. Contains information on lat/lon time, etc. 
1444  !               * Data entries contain: bending angle, refractivity, etc.
1445  !
1446  !             .. image:: /_static/gps_ro.png
1447  !                 :align: center
1448  !
1449  !           |
1450  !
1451  !           * For other observations types such as radiosondes and aircrafts, there is one data entry per header rentry. 
1452  !               * headers contain information on lat/lon, time, etc. 
1453  !               * Data entries contain measurement values. 
1454  !
1455  !             .. image:: /_static/others.png
1456  !                 :align: center
1457  !
1458  !
1459  !:What:   The module  "ObsSpaceData_mod" relies on three other modules:
1460  !
1461  !          * ``IndexListDepot_mod``
1462  !          * ``ObsColumnNames_mod``
1463  !          * ``ObsDataColumn_mod``
1464  ! 
1465  !:Note:   Throughout this file:
1466  !
1467  !          * Column_index  is not (in general) indexed from one.  
1468  !          * Each column index has an equivalent name, ``OBS_*`` as defined in ``ObsColumnNames_mod``.
1469  !          * Active_index is indexed from one by definition (a column index).
1470  !          * row_index is indexed from one.  It has no equivalent name.
1471  !          * ``bodyIndex``, etc. is necessarily a row index
1472  !          * ``HeaderIndex``, etc. is necessarily a row index
1473  !
1474   use codePrecision_mod
1475   use ObsColumnNames_mod
1476   use ObsDataColumn_mod
1477   use IndexListDepot_mod
1478   use mathPhysConstants_mod
1479   implicit none
1480   save
1481   private
1482
1483
1484
1485   ! CLASS-CONSTANT:
1486   ! CLASS-CONSTANT:
1487   ! CLASS-CONSTANT:
1488
1489   logical, save :: obs_class_initialized = .false.
1490
1491   ! end of CLASS-CONSTANT variables.
1492   ! end of CLASS-CONSTANT variables
1493   ! end of CLASS-CONSTANT variables.
1494
1495
1496   ! PUBLIC METHODS:
1497   public obs_bodyElem_i ! obtain an integer body element from observation object
1498   public obs_bodyElem_r ! obtain a real body element from the observation object
1499   public obs_bodyPrimaryKey ! obtain the body primary key value
1500   public obs_bodyIndex_mpiglobal ! obtain mpiglobal body row index
1501   public obs_bodySet_i  ! set an integer body value in the observation object
1502   public obs_bodySet_r  ! set a real body value in the observation object
1503   public obs_class_initialize ! initialize class variables: column mode
1504   public obs_clean      ! remove from obs data those that not to be assimilated
1505   public obs_clean2     ! modified version of obs_clean used by MIDAS
1506   public obs_columnActive_IB ! return the active status for a column (T/F)
1507   public obs_columnActive_IH !                 "
1508   public obs_columnActive_RB !                 "
1509   public obs_columnActive_RH !                 "
1510   public obs_isColumnNameValid      ! Check if column name is a valid obsSpaceData name
1511   public obs_columnIndexFromName    ! get the index from the name (any flavour)
1512   public obs_columnIndexFromName_IB ! get the index from the name
1513   public obs_columnIndexFromName_IH !         "
1514   public obs_columnIndexFromName_RB !         "
1515   public obs_columnIndexFromName_RH !         "
1516   public obs_columnDataType ! tell user if column index has real or integer data
1517   public obs_copy       ! copy an obsdat object
1518   public obs_elem_c     ! obtain character element from the observation object
1519   public obs_enkf_prntbdy! print all data records associated with an observation
1520   public obs_enkf_prnthdr! print the header of an observation record
1521   public obs_expandToMpiGlobal ! restore data for the mpi-global context
1522   public obs_extractObsRealBodyColumn    ! return entire selected column (real/body)
1523   public obs_extractObsRealBodyColumn_r4 ! return entire selected column (real/body), real4 version
1524   public obs_extractObsIntBodyColumn     ! return entire selected column (int/body)
1525   public obs_extractObsRealHeaderColumn  ! return entire selected column (real/header)
1526   public obs_extractObsIntHeaderColumn   ! return entire selected column (int/header)
1527   public obs_finalize   ! object clean-up
1528   public obs_getBodyIndex ! obtain an element from the current body list
1529   public obs_getFamily  ! return the family of a datum
1530   public obs_getHeaderIndex ! obtain an element from the current header list
1531   public obs_getNchanAvhrr  ! to get the number of AVHRR channels
1532   public obs_getNclassAvhrr ! to get the number of AVHRR radiance classes
1533   public obs_headElem_i ! obtain an integer header element from the obs'n object
1534   public obs_headElem_r ! obtain real header element from the observation object
1535   public obs_headPrimaryKey ! obtain the header primary key value
1536   public obs_headerIndex_mpiglobal ! obtain mpiglobal header row index
1537   public obs_headSet_i  ! set an integer header value in the observation object
1538   public obs_headSet_r  ! set a real header value in the observation object
1539   public obs_initialize ! variable initialization
1540   public obs_mpiLocal   ! obtain the current mpi state of the observation object
1541   public obs_MpiRedistribute ! do a general redistribution of obs over mpi tasks
1542   public obs_numBody    ! returns the number of bodies recorded
1543   public obs_numBody_max! returns the dimensioned number of bodies
1544   public obs_numBody_mpiglobal ! returns mpi-global number of bodies recorded
1545   public obs_numHeader  ! returns the number of headers recorded
1546   public obs_numHeader_max ! returns the dimensioned number of headers
1547   public obs_numHeader_mpiglobal ! returns mpi-global number of headers recorded
1548   public obs_print      ! obs_enkf_prnthdr & obs_enkf_prntbdy for each station
1549   public obs_prntbdy    ! print the body data for one header
1550   public obs_prnthdr    ! print the data contained in one header
1551   public obs_reduceToMpiLocal ! retain only data pertinent to the mpi-local PE
1552   public obs_squeeze    ! reallocate objects arrays to reduce memory use
1553   public obs_setBodyPrimaryKey ! set the value of the body primary key
1554   public obs_setHeadPrimaryKey ! set the value of the body primary key
1555   public obs_set_c      ! set a character value in the observation object
1556   public obs_set_current_body_list   ! set a body list for a family as current
1557   public obs_set_current_header_list ! set a header list for a family as current
1558   public obs_setFamily  ! set the family of a datum
1559   public obs_sethind    ! set the header index in body table
1560   public obs_famExist   ! checks if a family is present in the observation set
1561   public obs_write      ! write the observation data to binary files
1562                         ! (calls obs_write_hdr, obs_write_bdy, obs_write_hx
1563                         !  for each station)
1564   public obs_write_hx   ! write to binary files a station's interpolated values
1565
1566   interface obs_getBodyIndex
1567      module procedure obs_getBodyIndex_depot
1568      module procedure obs_getBodyIndex_private
1569   end interface obs_getBodyIndex
1570
1571   interface obs_set_current_body_list
1572      module procedure obs_set_current_body_list_from_family
1573      module procedure obs_set_current_body_list_from_header
1574      module procedure obs_set_current_body_list_all
1575   end interface obs_set_current_body_list
1576
1577   interface obs_set_current_header_list
1578      module procedure obs_set_current_header_list_from_family
1579      module procedure obs_set_current_header_list_all
1580   end interface
1581
1582   interface obs_bodySet_r
1583      module procedure obs_bodySet_r4
1584      module procedure obs_bodySet_r8
1585   end interface obs_bodySet_r
1586
1587   interface obs_headSet_r
1588      module procedure obs_headSet_r4
1589      module procedure obs_headSet_r8
1590   end interface obs_headSet_r
1591
1592   ! PRIVATE METHODS:
1593   private obs_abort     ! abort a job on error
1594   private obs_allocate  ! array allocation
1595   private obs_columnIndexFromNameForFlavour ! get the index from the name
1596   private obs_deallocate! array de-allocation
1597   private obs_mpiDistributeIndices ! distribute header & body indices for mpi parallelization
1598   private obs_write_bdy ! write the observation data to binary files
1599   private obs_write_hdr ! write the observation header to binary files
1600
1601
1602   ! PARAMETERS INHERITED FROM ObsColumnNames_mod (make them public)
1603   !    integer-header column numbers
1604   public :: OBS_RLN, OBS_ONM, OBS_INS, OBS_IDF, OBS_ITY, OBS_SAT, OBS_TEC
1605   public :: OBS_DAT, OBS_ETM, OBS_NLV, OBS_PAS, OBS_REG, OBS_IP , OBS_SEN
1606   public :: OBS_IPF, OBS_IPC, OBS_IPT
1607   public :: OBS_ST1 
1608   public :: OBS_GQF, OBS_GQL
1609   public :: OBS_NCO2,OBS_STYP,OBS_ROQF
1610   public :: OBS_SWQ1,OBS_SWQ2,OBS_SWMT,OBS_SWLS,OBS_SWGA,OBS_SWHA
1611   public :: OBS_CHM, OBS_FOV, OBS_PRFL, OBS_PHAS, OBS_ORI
1612   public :: OBS_LCH, OBS_RTP, OBS_HDD, OBS_HDT, OBS_TFLG, OBS_LFLG
1613   public :: OBS_ORBI,OBS_AQF1, OBS_AQF2, OBS_AQF3, OBS_TTYP, OBS_INFG
1614   public :: OBS_RAIN, OBS_CHID
1615
1616   !    real-header column numbers
1617   public :: OBS_LAT, OBS_LON, OBS_ALT, OBS_BX,  OBS_BY,  OBS_BZ
1618   public :: OBS_M1C1, OBS_M1C2, OBS_M1C3, OBS_M1C4, OBS_M1C5, OBS_M1C6
1619   public :: OBS_M2C1, OBS_M2C2, OBS_M2C3, OBS_M2C4, OBS_M2C5, OBS_M2C6
1620   public :: OBS_M3C1, OBS_M3C2, OBS_M3C3, OBS_M3C4, OBS_M3C5, OBS_M3C6
1621   public :: OBS_M4C1, OBS_M4C2, OBS_M4C3, OBS_M4C4, OBS_M4C5, OBS_M4C6
1622   public :: OBS_M5C1, OBS_M5C2, OBS_M5C3, OBS_M5C4, OBS_M5C5, OBS_M5C6
1623   public :: OBS_M6C1, OBS_M6C2, OBS_M6C3, OBS_M6C4, OBS_M6C5, OBS_M6C6
1624   public :: OBS_M7C1, OBS_M7C2, OBS_M7C3, OBS_M7C4, OBS_M7C5, OBS_M7C6
1625   public :: OBS_S1C1, OBS_S1C2, OBS_S1C3, OBS_S1C4, OBS_S1C5, OBS_S1C6
1626   public :: OBS_S2C1, OBS_S2C2, OBS_S2C3, OBS_S2C4, OBS_S2C5, OBS_S2C6
1627   public :: OBS_S3C1, OBS_S3C2, OBS_S3C3, OBS_S3C4, OBS_S3C5, OBS_S3C6
1628   public :: OBS_S4C1, OBS_S4C2, OBS_S4C3, OBS_S4C4, OBS_S4C5, OBS_S4C6
1629   public :: OBS_S5C1, OBS_S5C2, OBS_S5C3, OBS_S5C4, OBS_S5C5, OBS_S5C6
1630   public :: OBS_S6C1, OBS_S6C2, OBS_S6C3, OBS_S6C4, OBS_S6C5, OBS_S6C6
1631   public :: OBS_S7C1, OBS_S7C2, OBS_S7C3, OBS_S7C4, OBS_S7C5, OBS_S7C6
1632   public :: OBS_CF1,  OBS_CF2,  OBS_CF3,  OBS_CF4,  OBS_CF5,  OBS_CF6, OBS_CF7
1633   public :: OBS_ETOP, OBS_VTOP, OBS_ECF,  OBS_VCF , OBS_HE  , OBS_ZTSR
1634   public :: OBS_ZTM , OBS_ZTGM, OBS_ZLQM, OBS_ZPS , OBS_TRAD, OBS_GEOI
1635   public :: OBS_CLF , OBS_SUN,  OBS_SZA,  OBS_AZA , OBS_SAZ , OBS_CLWO, OBS_CLWB, OBS_MWS
1636   public :: OBS_SIO , OBS_SIB,  OBS_IWV,  OBS_RZAM, OBS_RELE, OBS_RANS, OBS_RANE
1637   !    integer-body column numbers
1638   public :: OBS_VNM, OBS_FLG, OBS_KFA, OBS_ASS, OBS_HIND,OBS_VCO, OBS_LYR
1639   public :: OBS_XTR, OBS_QCF2, OBS_CLA
1640
1641   !    real-body column numbers
1642   public :: OBS_PPP, OBS_SEM, OBS_VAR, OBS_OMP, OBS_OMA, OBS_OMAM, OBS_OER
1643   public :: OBS_HPHT,OBS_HAHT,OBS_ZHA, OBS_OMP6,OBS_OMA0,OBS_SIGI, OBS_SIGO
1644   public :: OBS_WORK,OBS_PRM, OBS_JOBS,OBS_QCV, OBS_FSO, OBS_CRPS, OBS_BCOR
1645   public :: OBS_POB, OBS_OMPE,OBS_LATD,OBS_LOND,OBS_BTCL,OBS_LOCI
1646
1647   ! OBSERVATION-SPACE FUNDAMENTAL PARAMETERS
1648   integer, public, parameter :: obs_assimilated    = 1 ! OBS_ASS value for assimilated obs
1649   integer, public, parameter :: obs_notAssimilated = 0 ! OBS_ASS value for non assimilated obs
1650
1651   real(pre_obsReal), public, parameter :: obs_missingValue_R = real(MPC_missingValue_R8, pre_obsReal) ! Missing value
1652
1653   ! DERIVED TYPE AND MODULE VARIABLE DECLARATIONS
1654
1655                                        ! It is intended that these null values
1656                                        ! be used with scratchRealHeader, etc.
1657   real(pre_obsReal), parameter :: NULL_COLUMN_VALUE_R = real(-9.99D9, pre_obsReal)
1658   integer          , parameter :: NULL_COLUMN_VALUE_I = -9.99
1659
1660   ! This type is the goal of the ObsSpaceData and supporting modules.  An
1661   ! instance of this derived type contains all information pertaining to a set
1662   ! of observation-space data.
1663   type, public :: struct_obs
1664      private
1665      logical          :: allocated = .false.
1666                                        ! Two internal structures for
1667                                        ! facilitating multiple traversals of
1668                                        ! subsets of the observation-space data.
1669      type(struct_index_list_depot) :: header_index_list_depot
1670      type(struct_index_list_depot) :: body_index_list_depot
1671                                        ! For the cstnid and cfamily arrays:
1672                                        !   1st dim'n:  row index
1673      character(len=12),  pointer :: cstnid(:)
1674      character(len=2),   pointer :: cfamily(:)
1675                                        ! The primary keys for both tables
1676      integer(8), pointer :: headerPrimaryKey(:), bodyPrimaryKey(:)
1677                                        ! The four arrays of data columns
1678      type(struct_obsDataColumn_Array) :: &
1679                             realHeaders, & ! real header columns
1680                             intHeaders,  & ! integer header columns
1681                             realBodies,  & ! real body columns
1682                             intBodies      ! integer body columns
1683
1684      ! These scratch arrays are the basis for allowing the manipulation of both
1685      ! (real and integer) values of any obsDataColumn, without having to decide
1686      ! which one (real or integer) contains the sought value.  This ability
1687      ! simplifies the code.
1688      real(pre_obsReal), pointer :: scratchRealHeader(:), &
1689                                    scratchRealBody  (:)
1690      integer          , pointer :: scratchIntHeader (:), &
1691                                    scratchIntBody   (:)
1692
1693      integer :: numHeader              ! Actual number of headers on record
1694      integer :: numHeader_max          ! maximum number of headers(i.e.stations)
1695      integer :: numBody                ! Actual total number of bodies on record
1696      integer :: numBody_max            ! maximum number of bodies (i.e. data)
1697
1698                                        ! row indices of mpiglobal data, useful
1699                                        ! for transforming from mpiLocal back to
1700                                        ! mpiGlobal
1701                                        !    1st dim'n: row index, only in
1702                                        !               mpiLocal context
1703      integer, pointer, dimension(:) :: headerIndex_mpiglobal  => NULL()
1704      integer, pointer, dimension(:) :: bodyIndex_mpiglobal    => NULL()
1705
1706      logical :: mpi_local       ! T: keep only data needed by this PE (mpilocal)
1707   end type struct_obs
1708
1709
1710contains
1711
1712
1713   subroutine obs_abort(cdmessage)
1714      !
1715      !:Purpose: To stop a job when an error occurred
1716      !
1717      !:Arguments:
1718      !           :cdmessage: message to be printed
1719      !
1720
1721!#if defined(UNIT_TESTING)
1722!      use pFUnit
1723!#endif
1724      implicit none
1725
1726      ! Arguments:
1727      character(len=*), intent(in) :: cdmessage
1728
1729      write(*,'(//,4X,"ABORTING IN ObsSpaceData_mod:-------",/,8X,A)')cdmessage
1730      call flush(6)
1731
1732!#if defined(UNIT_TESTING)
1733!      call throw(Exception('exiting in obs_abort'))
1734!#else
1735      call qqexit(13)
1736      stop
1737!#endif
1738   end subroutine obs_abort
1739
1740
1741   subroutine obs_allocate(obsdat, numHeader_max, numBody_max, silent_opt)
1742      !
1743      !:Purpose: Allocate arrays according to the parameters, numHeader_max and
1744      !           numBody_max.  This is a private method.
1745      !
1746      implicit none
1747
1748      ! Arguments:
1749      type(struct_obs),  intent(inout) :: obsdat
1750      integer,           intent(in)    :: numHeader_max
1751      integer,           intent(in)    :: numBody_max
1752      logical, optional, intent(in)    :: silent_opt
1753
1754      ! Locals:
1755      logical :: silent
1756      integer :: column_index
1757
1758      if(obsdat%allocated) then
1759         call obs_abort('OBS_ALLOCATE:  a second allocation of ObsSpaceData has been attempted.')
1760         return
1761      endif
1762      obsdat%allocated=.true.
1763
1764      if(present(silent_opt)) then
1765         silent  = silent_opt
1766      else
1767         silent  = .false.
1768      end if
1769
1770      if(.not. silent) then
1771         write(*,*) ' DIMENSIONS OF OBSERVATION ARRAYS:'
1772         write(*,*) ' numHeader_max = ',numHeader_max,'  numBody_max = ', &
1773                    numBody_max
1774      end if
1775      obsdat%numHeader_max=numHeader_max
1776      obsdat%numBody_max=numBody_max
1777
1778      !
1779      ! ALLOCATE THE ARRAYS and initialize the contents
1780      !
1781      allocate(obsdat%realHeaders%columns(NHDR_REAL_BEG:NHDR_REAL_END))
1782      obsdat%realHeaders%odc_flavour => odc_flavour_RH
1783
1784      allocate(obsdat%intHeaders%columns(NHDR_INT_BEG:NHDR_INT_END))
1785      obsdat%intHeaders%odc_flavour => odc_flavour_IH
1786
1787      HEADER:if(numHeader_max > 0) then
1788         allocate(obsdat%headerPrimaryKey(numHeader_max))
1789         obsdat%headerPrimaryKey(:) = 0
1790
1791         allocate(obsdat%cfamily(numHeader_max))
1792         obsdat%cfamily(:)='XX'
1793
1794         allocate(obsdat%cstnid(numHeader_max))
1795         obsdat%cstnid(:)='XXXXXXXXXXXX'
1796
1797         allocate(obsdat%scratchRealHeader(numHeader_max))
1798         allocate(obsdat%scratchIntHeader (numHeader_max))
1799         obsdat%scratchRealHeader(:) = NULL_COLUMN_VALUE_R
1800         obsdat%scratchIntHeader (:) = NULL_COLUMN_VALUE_I
1801
1802         do column_index=NHDR_REAL_BEG,NHDR_REAL_END
1803            if(obsdat%realHeaders%odc_flavour%columnActive(column_index))  &
1804               call odc_allocate(obsdat%realHeaders%columns(column_index), &
1805                                 numHeader_max, &
1806                                 ocn_ColumnNameList_RH(column_index),'REAL',&
1807                                 obsdat%scratchRealHeader, &
1808                                 obsdat%scratchIntHeader)
1809         enddo
1810
1811         do column_index=NHDR_INT_BEG,NHDR_INT_END
1812            if(obsdat%intHeaders%odc_flavour%columnActive(column_index))  &
1813               call odc_allocate(obsdat%intHeaders%columns(column_index), &
1814                                 numHeader_max, &
1815                                 ocn_ColumnNameList_IH(column_index),'INT ', &
1816                                 obsdat%scratchRealHeader, &
1817                                 obsdat%scratchIntHeader)
1818         enddo
1819      endif HEADER
1820
1821      allocate(obsdat%realBodies%columns(NBDY_REAL_BEG:NBDY_REAL_END))
1822      obsdat%realBodies%odc_flavour => odc_flavour_RB
1823
1824      allocate(obsdat%intBodies%columns(NBDY_INT_BEG:NBDY_INT_END))
1825      obsdat%intBodies%odc_flavour => odc_flavour_IB
1826
1827      BODY:if(numBody_max > 0) then
1828         allocate(obsdat%bodyPrimaryKey(numBody_max))
1829         obsdat%bodyPrimaryKey(:)   = 0
1830
1831         allocate(obsdat%scratchRealBody(numBody_max))
1832         allocate(obsdat%scratchIntBody (numBody_max))
1833         obsdat%scratchRealBody(:) = NULL_COLUMN_VALUE_R
1834         obsdat%scratchIntBody (:) = NULL_COLUMN_VALUE_I
1835
1836         do column_index=NBDY_REAL_BEG,NBDY_REAL_END
1837            if(obsdat%realBodies%odc_flavour%columnActive(column_index))  &
1838               call odc_allocate(obsdat%realBodies%columns(column_index), &
1839                                 numBody_max, &
1840                                 ocn_ColumnNameList_RB(column_index),'REAL', &
1841                                 obsdat%scratchRealBody, obsdat%scratchIntBody)
1842         enddo
1843
1844         do column_index=NBDY_INT_BEG,NBDY_INT_END
1845            if(obsdat%intBodies%odc_flavour%columnActive(column_index))  &
1846               call odc_allocate(obsdat%intBodies%columns(column_index), &
1847                                 numBody_max, &
1848                                 ocn_ColumnNameList_IB(column_index),'INT ', &
1849                                 obsdat%scratchRealBody, obsdat%scratchIntBody)
1850         enddo
1851      endif BODY
1852
1853      call ild_initialize(obsdat%header_index_list_depot, obsdat%numHeader_max)
1854      call ild_initialize(obsdat%body_index_list_depot,   obsdat%numBody_max)
1855   end subroutine obs_allocate
1856
1857
1858   function obs_bodyElem_i( obsdat, column_index, row_index) result(value_i)
1859      !
1860      !:Purpose: To control access to the observation object.  Returns the (integer)
1861      !           value of the row_index'th ObsData element with the indicated column
1862      !           index from the "body".
1863      !
1864      implicit none
1865
1866      ! Arguments:
1867      type(struct_obs), intent(in)  :: obsdat
1868      integer         , intent(in)  :: column_index
1869      integer         , intent(in)  :: row_index
1870      ! Result:
1871      integer                       :: value_i
1872
1873      ! Locals:
1874      real(pre_obsReal) :: value_r         ! not used
1875
1876      call odc_columnElem(obsdat%intBodies, column_index, row_index, &
1877                          value_i, value_r)
1878   end function obs_bodyElem_i
1879
1880
1881   function obs_bodyElem_r(obsdat,column_index,row_index) result(value_r)
1882      !
1883      !:Purpose: Get a real-valued body observation-data element
1884      !      To control access to the observation object.  Returns the (real)
1885      !      value of the row_index'th ObsData element with the indicated column
1886      !      index from the "body".
1887      !
1888      implicit none
1889
1890      ! Arguments:
1891      type(struct_obs), intent(in)  :: obsdat
1892      integer         , intent(in)  :: column_index
1893      integer         , intent(in)  :: row_index
1894      ! Result:
1895      real(pre_obsReal)             :: value_r
1896
1897      ! Locals:
1898      integer :: value_i                ! not used
1899
1900      call odc_columnElem(obsdat%realBodies, column_index, row_index, &
1901                          value_i, value_r)
1902   end function obs_bodyElem_r
1903
1904
1905   function obs_bodyPrimaryKey(obsdat,row_index) result(primaryKey)
1906      !
1907      !:Purpose: Get the body primary key value.
1908      !
1909      implicit none
1910
1911      ! Arguments:
1912      type(struct_obs), intent(in)  :: obsdat
1913      integer         , intent(in)  :: row_index
1914      ! Result:
1915      integer(8)                    :: primaryKey
1916
1917      primaryKey = obsdat%bodyPrimaryKey(row_index)
1918
1919   end function obs_bodyPrimaryKey
1920
1921
1922   function obs_bodyIndex_mpiglobal(obsdat,row_index) result(value)
1923      !
1924      !:Purpose: Get the mpiglobal body row_index.
1925      !      To control access to the mpiglobal row_index into the "body".
1926      !
1927      implicit none
1928
1929      ! Arguments:
1930      type(struct_obs), intent(in)  :: obsdat
1931      integer         , intent(in)  :: row_index
1932      ! Result:
1933      integer value
1934
1935      value=obsdat%bodyIndex_mpiglobal(row_index)
1936   end function obs_bodyIndex_mpiglobal
1937
1938
1939   subroutine obs_bodySet_i(obsdat, column_index, row_index, value_i)
1940      !
1941      !:Purpose: Set an integer-valued body observation-data element.
1942      !      To control access to the observation object.  Sets the (integer)
1943      !      value of the row_index'th ObsData element with the indicated column
1944      !      index from the "body".
1945      !
1946      implicit none
1947
1948      ! Arguments:
1949      type(struct_obs), intent(inout)  :: obsdat
1950      integer         , intent(in)     :: column_index
1951      integer         , intent(in)     :: row_index
1952      integer         , intent(in)     :: value_i
1953
1954      call odc_columnSet(obsdat%intBodies, column_index, row_index, &
1955                         value_i, NULL_COLUMN_VALUE_R, &
1956                         obsdat%numBody, obsdat%numBody_max)
1957   end subroutine obs_bodySet_i
1958
1959
1960   subroutine obs_bodySet_r4(obsdat, column_index, row_index, value_r4)
1961      !
1962      !:Purpose: Set a real-valued body observation-data element.
1963      !      To control access to the observation object.  Sets the (real)
1964      !      value of the row_index'th ObsData element with the indicated column
1965      !      index from the "body".
1966      !
1967      implicit none
1968
1969      ! Arguments:
1970      type(struct_obs), intent(inout)  :: obsdat
1971      integer         , intent(in)     :: column_index
1972      integer         , intent(in)     :: row_index
1973      real(4)         , intent(in)     :: value_r4
1974
1975      ! Locals:
1976      real(pre_obsReal)                :: value_r
1977
1978      value_r = value_r4
1979
1980      call odc_columnSet(obsdat%realBodies, column_index, row_index, &
1981                         NULL_COLUMN_VALUE_I, value_r, &
1982                         obsdat%numBody, obsdat%numBody_max)
1983   end subroutine obs_bodySet_r4
1984
1985
1986   subroutine obs_bodySet_r8(obsdat, column_index, row_index, value_r8)
1987      !
1988      !:Purpose: Set a real-valued body observation-data element.
1989      !      To control access to the observation object.  Sets the (real)
1990      !      value of the row_index'th ObsData element with the indicated column
1991      !      index from the "body".
1992      !
1993      implicit none
1994
1995      ! Arguments:
1996      type(struct_obs), intent(inout)  :: obsdat
1997      integer         , intent(in)     :: column_index
1998      integer         , intent(in)     :: row_index
1999      real(8)         , intent(in)     :: value_r8
2000
2001      ! Locals:
2002      real(pre_obsReal)                :: value_r
2003
2004      value_r = value_r8
2005
2006      call odc_columnSet(obsdat%realBodies, column_index, row_index, &
2007                         NULL_COLUMN_VALUE_I, value_r, &
2008                         obsdat%numBody, obsdat%numBody_max)
2009   end subroutine obs_bodySet_r8
2010
2011
2012   subroutine obs_class_initialize(obsColumnMode_in)
2013      !
2014      !:Purpose: Set observation-data class variables.
2015      !      Set variables that take the same value for all instances of the
2016      !      class.
2017      !
2018      implicit none
2019
2020      ! Arguments:
2021      character(len=*), intent(in)  :: obsColumnMode_in ! mode controlling subset of columns activated in all objects
2022
2023      ! Locals:
2024      character(len=12), save :: obsColumnMode_class = '            '
2025
2026      INITIALIZED: if(.not. obs_class_initialized) then
2027         obs_class_initialized = .true.
2028
2029         ! Determine which columns will be initially active
2030         obsColumnMode_class=trim(obsColumnMode_in)
2031
2032         write(*,*)'OBS_CLASS_INITIALIZE: obsColumnMode=', &
2033                   trim(obsColumnMode_class)
2034
2035         ! DELEGATE THE REAL WORK TO THE ODC CLASS
2036         ! DELEGATE THE REAL WORK TO THE ODC CLASS
2037         call odc_class_initialize(obsColumnMode_class)
2038
2039      else INITIALIZED
2040         write(*,*) 'obs_class_initialize: !!! WARNING WARNING WARNING!!!'
2041         write(*,*) 'obs_class_initialize: already called before, not ' &
2042                    // 're-activating columns'
2043         write(*,*) 'obs_class_initialize: !!! WARNING WARNING WARNING!!!'
2044         if(trim(obsColumnMode_in) /= trim(obsColumnMode_class)) then
2045            call obs_abort('obs_class_initialize: called with different '&
2046                         //'value of obsColumnMode than during first call: '&
2047                         // trim(obsColumnMode_class) // ' /= ' &
2048                         // trim(obsColumnMode_in))
2049            return
2050         endif
2051      endif INITIALIZED
2052   end subroutine obs_class_initialize
2053
2054
2055   subroutine obs_clean(obsdat,hx,nens,nobsout,qcvar,checkZha_opt)
2056      !
2057      !:Purpose: remove all observations from the obsdat  
2058      !           that will not be assimilated. 
2059      !
2060      !:Arguments:
2061      !           :nobsout: unit number for the ASCII output
2062      !           :qcvar:   input logical indicating if the input obsdat 
2063      !                     data have benefited from a qc-var procedure
2064      !:The logic applied:
2065      !     A body (and its associated header)
2066      !     will be retained if these three conditions are all met:
2067      !
2068      !         - 1) either of:
2069      !
2070      !             - 1a) btest(obsdat%intBodies%columns(OBS_FLG,jdata),12)
2071      !             - 1b) .not. qcvar (the 5th parameter of obs_clean)
2072      !         - 2) obsdat% intBodies%columns(OBS_ASS,jdata) == 1
2073      !         - 3) obsdat%realBodies%columns(OBS_ZHA,jdata) >= 0.0
2074      !
2075      implicit none
2076
2077      ! Arguments:
2078      type (struct_obs), intent(inout) :: obsdat
2079      real(8),           intent(inout) :: hx(:,:)
2080      integer,           intent(in)    :: nens
2081      integer,           intent(in)    :: nobsout
2082      logical,           intent(in)    :: qcvar
2083      logical, optional, intent(in)    :: checkZha_opt
2084
2085      ! Locals:
2086      integer :: iaccept,idata,ipnt,iwrite
2087      integer :: jdata,kobs,var3d,kobsout
2088      integer :: column_index
2089      integer :: active_index
2090      logical :: checkZha
2091
2092      if (nobsout > 0) write(nobsout,'(1x,A,I7)') 'stations prior to cleanup: ', obsdat%numHeader
2093      write(*,*) 'enter obs_clean'
2094
2095      ! User can choose if check on negativity of OBS_ZHA is done (is done by default)
2096      if (present(checkZha_opt)) then
2097        checkZha = checkZha_opt
2098      else
2099        checkZha = .true.
2100      end if
2101
2102      kobsout=0 
2103      iwrite=0
2104      stations: do kobs=1,obsdat%numHeader
2105         ipnt  = obs_headElem_i(obsdat, OBS_RLN, kobs)
2106         idata = obs_headElem_i(obsdat, OBS_NLV, kobs)
2107         iaccept=0
2108         observations: do jdata = ipnt, ipnt + idata - 1
2109            if (     btest(obs_bodyElem_i(obsdat, OBS_FLG, jdata),12) &
2110                .or. .not. qcvar) then 
2111               ! data will be accepted if they went through the variational
2112               ! system  including the qcvar. They will also be accepted if the
2113               ! qcvar procedure was not applied (i.e. when backalt files are
2114               ! used as input).
2115               var3d=1
2116            else
2117               var3d=0
2118            endif
2119
2120            ! To remove observations for which the height in the atmosphere has
2121            ! not been assigned (for instance because they are above the model
2122            ! top for the EnKF system)
2123            if (checkZha .and. obs_bodyElem_r(obsdat, OBS_ZHA, jdata) < 0.) then
2124               call obs_bodySet_i(obsdat, OBS_ASS, jdata, -1)
2125            endif
2126
2127            if (      (obs_bodyElem_i(obsdat, OBS_ASS, jdata) == obs_assimilated) &
2128                 .and.(var3d == 1)) then 
2129               ! the observation will be used in the analysis
2130               iaccept=iaccept+1
2131               iwrite=iwrite+1
2132               do active_index=1,odc_numActiveColumn(obsdat%intBodies)
2133                  column_index=odc_columnIndexFromActiveIndex( &
2134                                       obsdat%intBodies%odc_flavour,active_index)
2135                     call obs_bodySet_i (obsdat, column_index, iwrite, &
2136                          obs_bodyElem_i(obsdat, column_index, jdata))
2137               enddo
2138               do active_index=1,odc_numActiveColumn(obsdat%realBodies)
2139                  column_index=odc_columnIndexFromActiveIndex( &
2140                                      obsdat%realBodies%odc_flavour,active_index)
2141                     call obs_bodySet_r (obsdat, column_index, iwrite, &
2142                          obs_bodyElem_r(obsdat, column_index, jdata))
2143               enddo
2144               hx(1:nens,iwrite)=hx(1:nens,jdata)
2145               ! Revise the header row cross-index to match the revised headers
2146               call obs_bodySet_i (obsdat, OBS_HIND, iwrite, kobsout+1)
2147            endif
2148         enddo observations
2149
2150         ! adjust obsdat%realHeaders%columns 
2151         if (iaccept > 0) then
2152            kobsout=kobsout+1
2153            do active_index=1,odc_numActiveColumn(obsdat%intHeaders)
2154               column_index = odc_columnIndexFromActiveIndex( &
2155                                      obsdat%intHeaders%odc_flavour,active_index)
2156               call obs_headSet_i (obsdat, column_index, kobsout, &
2157                    obs_headElem_i(obsdat, column_index, kobs))
2158            enddo
2159            do active_index=1,odc_numActiveColumn(obsdat%realHeaders)
2160               column_index = odc_columnIndexFromActiveIndex( &
2161                                     obsdat%realHeaders%odc_flavour,active_index)
2162               call obs_headSet_r (obsdat, column_index, kobsout, &
2163                    obs_headElem_r(obsdat, column_index, kobs))
2164            enddo
2165            obsdat%headerPrimaryKey(kobsout) = obsdat%headerPrimaryKey(kobs)
2166            obsdat%cstnid(kobsout)=obsdat%cstnid(kobs)
2167            obsdat%cfamily(kobsout)=obsdat%cfamily(kobs)
2168            ! Revise the body cross-indices to match the revised bodies
2169            call obs_headSet_i(obsdat, OBS_NLV, kobsout, iaccept)
2170            call obs_headSet_i(obsdat, OBS_RLN, kobsout, iwrite-iaccept+1)
2171         endif
2172      enddo stations
2173      obsdat%numHeader=kobsout
2174      obsdat%numBody = iwrite
2175
2176      if (nobsout > 0) then
2177        write(nobsout,'(1x,A)') 'after cleanup of the cma: '
2178        write(nobsout,'(1x,A,I7)') &
2179             'number of stations containing valid data   ',obsdat%numHeader
2180        write(nobsout,'(1x,A,I7)') & 
2181             'number of observations now in the cma file ',obsdat%numBody
2182      end if
2183
2184   end subroutine obs_clean
2185
2186
2187   subroutine obs_clean2(obsdat)
2188     !
2189     !:Purpose: remove all observations from the obsdat  
2190     !           that will not be assimilated. 
2191     !           modified version of obs_clean, used by MIDAS
2192     !
2193     implicit none
2194
2195     ! Arguments:
2196     type (struct_obs), intent(inout) :: obsdat
2197
2198     ! Locals:
2199     integer :: numBodyAccept,bodyIndexEnd,bodyIndexBeg,numBodyCleaned
2200     integer :: bodyIndex,headerIndex,numHeaderCleaned
2201     integer :: column_index
2202     integer :: active_index
2203
2204     write(*,*) 'before cleanup of obsSpaceData:'
2205     write(*,*) 'number of headers ', obsdat%numHeader
2206     write(*,*) 'number of bodies  ', obsdat%numBody
2207
2208     numHeaderCleaned = 0 
2209     numBodyCleaned = 0
2210     stations: do headerIndex = 1, obsdat%numHeader
2211       bodyIndexBeg = obs_headElem_i(obsdat, OBS_RLN, headerIndex)
2212       bodyIndexEnd = obs_headElem_i(obsdat, OBS_NLV, headerIndex) + bodyIndexBeg - 1
2213       numBodyAccept = 0
2214       observations: do bodyIndex = bodyIndexBeg, bodyIndexEnd
2215         if ( obs_bodyElem_i(obsdat,OBS_ASS,bodyIndex) == obs_assimilated ) then
2216           ! the observation will be used in the analysis
2217           numBodyAccept = numBodyAccept + 1
2218           numBodyCleaned = numBodyCleaned + 1
2219           do active_index = 1, odc_numActiveColumn(obsdat%intBodies)
2220             column_index = odc_columnIndexFromActiveIndex( &
2221                              obsdat%intBodies%odc_flavour,active_index)
2222             call obs_bodySet_i (obsdat, column_index, numBodyCleaned, &
2223                  obs_bodyElem_i(obsdat, column_index, bodyIndex))
2224           enddo
2225           do active_index = 1, odc_numActiveColumn(obsdat%realBodies)
2226             column_index = odc_columnIndexFromActiveIndex( &
2227                                 obsdat%realBodies%odc_flavour,active_index)
2228             call obs_bodySet_r (obsdat, column_index, numBodyCleaned, &
2229                                 obs_bodyElem_r(obsdat, column_index, bodyIndex))
2230           enddo
2231           ! Revise the header row cross-index to match the revised headers
2232           call obs_bodySet_i (obsdat, OBS_HIND, numBodyCleaned, numHeaderCleaned+1)
2233         endif
2234       enddo observations
2235
2236       ! adjust obsdat%realHeaders%columns 
2237       if (numBodyAccept > 0) then
2238         numHeaderCleaned = numHeaderCleaned + 1
2239         do active_index = 1, odc_numActiveColumn(obsdat%intHeaders)
2240           column_index = odc_columnIndexFromActiveIndex( &
2241                                  obsdat%intHeaders%odc_flavour,active_index)
2242           call obs_headSet_i (obsdat, column_index, numHeaderCleaned, &
2243                obs_headElem_i(obsdat, column_index, headerIndex))
2244         enddo
2245         do active_index = 1, odc_numActiveColumn(obsdat%realHeaders)
2246           column_index = odc_columnIndexFromActiveIndex( &
2247                                 obsdat%realHeaders%odc_flavour,active_index)
2248           call obs_headSet_r (obsdat, column_index, numHeaderCleaned, &
2249                obs_headElem_r(obsdat, column_index, headerIndex))
2250         enddo
2251         obsdat%headerPrimaryKey(numHeaderCleaned) = obsdat%headerPrimaryKey(headerIndex)
2252         obsdat%cstnid(numHeaderCleaned) = obsdat%cstnid(headerIndex)
2253         obsdat%cfamily(numHeaderCleaned) = obsdat%cfamily(headerIndex)
2254         ! Revise the body cross-indices to match the revised bodies
2255         call obs_headSet_i(obsdat, OBS_NLV, numHeaderCleaned, numBodyAccept)
2256         call obs_headSet_i(obsdat, OBS_RLN, numHeaderCleaned, numBodyCleaned-numBodyAccept+1)
2257       endif
2258     enddo stations
2259     obsdat%numHeader = numHeaderCleaned
2260     obsdat%numBody = numBodyCleaned
2261
2262     ! reallocate obsdat to free up memory
2263     call obs_squeeze(obsdat)
2264
2265     write(*,*) 'after cleanup of obsSpaceData: '
2266     write(*,*) 'number of headers with valid data   ',obsdat%numHeader
2267     write(*,*) 'number of bodies kept               ',obsdat%numBody
2268
2269   end subroutine obs_clean2
2270
2271   function obs_columnActive_IB(obsdat,column_index) result(columnActive)
2272      !
2273      !:Purpose:
2274      !      Return the active status for a column
2275      !
2276      implicit none
2277
2278      ! Arguments:
2279      type (struct_obs), intent(inout) :: obsdat
2280      integer,           intent(in)    :: column_index
2281      ! Result:
2282      logical :: columnActive
2283
2284      columnActive = obsdat%intBodies%odc_flavour%columnActive(column_index)
2285
2286   end function obs_columnActive_IB
2287
2288
2289   function obs_columnActive_IH(obsdat,column_index) result(columnActive)
2290      !
2291      !:Purpose:
2292      !      Return the active status for a column
2293      !
2294      implicit none
2295
2296      ! Arguments:
2297      type (struct_obs), intent(inout) :: obsdat
2298      integer,           intent(in)    :: column_index
2299      ! Result:
2300      logical :: columnActive
2301
2302      columnActive = obsdat%intHeaders%odc_flavour%columnActive(column_index)
2303
2304   end function obs_columnActive_IH
2305
2306
2307   function obs_columnActive_RB(obsdat,column_index) result(columnActive)
2308      !
2309      !:Purpose:
2310      !      Return the active status for a column
2311      !
2312      implicit none
2313
2314      ! Arguments:
2315      type (struct_obs), intent(inout) :: obsdat
2316      integer,           intent(in)    :: column_index
2317      ! Result:
2318      logical :: columnActive
2319
2320      columnActive = obsdat%realBodies%odc_flavour%columnActive(column_index)
2321
2322   end function obs_columnActive_RB
2323
2324
2325   function obs_columnActive_RH(obsdat,column_index) result(columnActive)
2326      !
2327      !:Purpose:
2328      !      Return the active status for a column
2329      !
2330      implicit none
2331
2332      ! Arguments:
2333      type (struct_obs), intent(inout) :: obsdat
2334      integer,           intent(in)    :: column_index
2335      ! Result:
2336      logical :: columnActive
2337
2338      columnActive = obsdat%realHeaders%odc_flavour%columnActive(column_index)
2339
2340   end function obs_columnActive_RH
2341
2342
2343   function obs_columnIndexFromName(column_name) result(column_index_out)
2344      !
2345      !:Purpose:
2346      !      Situations do occur where the client knows only the name of a
2347      !      column, but needs to know its index. This method supplies the index.
2348      !
2349      implicit none
2350
2351      ! Arguments:
2352      character(len=*)        , intent(in) :: column_name
2353      ! Result:
2354      integer                              :: column_index_out
2355
2356      ! Locals:
2357      integer            :: column_index
2358      logical            :: lfound
2359      character(len=100) :: message
2360
2361      lfound=.false.
2362
2363      ! check integer-header
2364      do column_index=odc_flavour_IH%ncol_beg, odc_flavour_IH%ncol_end
2365         if(trim(column_name) == &
2366            trim(odc_flavour_IH%columnNameList(column_index)))then
2367            lfound=.true.
2368            column_index_out = column_index
2369            exit
2370         endif
2371      enddo
2372      if (lfound) return
2373
2374      ! check real-header
2375      do column_index=odc_flavour_RH%ncol_beg, odc_flavour_RH%ncol_end
2376         if(trim(column_name) == &
2377            trim(odc_flavour_RH%columnNameList(column_index)))then
2378            lfound=.true.
2379            column_index_out = column_index
2380            exit
2381         endif
2382      enddo
2383      if (lfound) return
2384
2385      ! check integer-body
2386      do column_index=odc_flavour_IB%ncol_beg, odc_flavour_IB%ncol_end
2387         if(trim(column_name) == &
2388            trim(odc_flavour_IB%columnNameList(column_index)))then
2389            lfound=.true.
2390            column_index_out = column_index
2391            exit
2392         endif
2393      enddo
2394      if (lfound) return
2395
2396      ! check real-body
2397      do column_index=odc_flavour_RB%ncol_beg, odc_flavour_RB%ncol_end
2398         if(trim(column_name) == &
2399            trim(odc_flavour_RB%columnNameList(column_index)))then
2400            lfound=.true.
2401            column_index_out = column_index
2402            exit
2403         endif
2404      enddo
2405      if (lfound) return
2406
2407      write(message,*)'abort in obs_columnIndexFromName: ' // &
2408                      'name not found='// trim(column_name)
2409      call obs_abort(message); return
2410
2411   end function obs_columnIndexFromName
2412
2413
2414  function obs_isColumnNameValid(column_name) result(isValid)  
2415    !
2416    !:Purpose: Check if the obsSpaceData column name is valid.
2417    !      
2418    implicit none
2419
2420    ! Arguments:
2421    character(len=*), intent(in)  :: column_name
2422    ! Result:
2423    logical                       :: isValid
2424
2425    ! Locals: 
2426    integer                       :: column_index
2427
2428    call odc_initAllColumnFlavours()
2429
2430    isValid = .false.
2431
2432    ! check "STID" 
2433    if (trim(column_name) == "STID") isValid = .true.
2434 
2435    if (isValid) return
2436
2437    ! check integer-header
2438    do column_index = odc_flavour_IH%ncol_beg, odc_flavour_IH%ncol_end
2439      if (trim(column_name) == &
2440           trim(odc_flavour_IH%columnNameList(column_index))) then
2441        isValid = .true.
2442        exit
2443      end if
2444    end do
2445    if (isValid) return
2446
2447    ! check real-header
2448    do column_index = odc_flavour_RH%ncol_beg, odc_flavour_RH%ncol_end
2449      if (trim(column_name) == &
2450           trim(odc_flavour_RH%columnNameList(column_index))) then
2451        isValid = .true.
2452        exit
2453      end if
2454    end do
2455    if (isValid) return
2456  
2457    ! check integer-body
2458    do column_index = odc_flavour_IB%ncol_beg, odc_flavour_IB%ncol_end
2459      if (trim(column_name) == &
2460           trim(odc_flavour_IB%columnNameList(column_index))) then
2461        isValid = .true.
2462        exit
2463      end if
2464    end do
2465    if (isValid) return
2466
2467    ! check real-body
2468    do column_index = odc_flavour_RB%ncol_beg, odc_flavour_RB%ncol_end
2469      if (trim(column_name) == &
2470           trim(odc_flavour_RB%columnNameList(column_index))) then
2471        isValid = .true.
2472        exit
2473      end if
2474    end do
2475    if (isValid) return
2476   
2477  end function obs_isColumnNameValid
2478
2479
2480   function obs_columnIndexFromNameForFlavour(odc_flavour, column_name) &
2481                                                         result(column_index_out)
2482      !
2483      !:Purpose:
2484      !      Situations do occur where the client knows only the name of a
2485      !      column, but needs to know its index. This method supplies the index.
2486      !
2487      implicit none
2488
2489      ! Arguments:
2490      type(struct_odc_flavour), intent(in) :: odc_flavour
2491      character(len=*)        , intent(in) :: column_name
2492      ! Result:
2493      integer                              :: column_index_out
2494
2495      ! Locals:
2496      integer            :: column_index
2497      logical            :: lfound
2498      character(len=100) :: message
2499
2500      lfound=.false.
2501      do column_index=odc_flavour%ncol_beg, odc_flavour%ncol_end
2502         if(trim(column_name) == &
2503            trim(odc_flavour%columnNameList(column_index)))then
2504            lfound=.true.
2505            column_index_out = column_index
2506            exit
2507         endif
2508      enddo
2509
2510      if(.not.lfound) then
2511         write(message,*)'abort in obs_columnIndexFromNameForFlavour [' &
2512                       // odc_flavour%dataType //','// odc_flavour%headOrBody //&
2513                       ']: name not found='// column_name
2514         call obs_abort(message); return
2515      end if
2516   end function obs_columnIndexFromNameForFlavour
2517
2518
2519   function obs_columnIndexFromName_IB(column_name) result(column_index)
2520      !
2521      !:Purpose:
2522      !      This wrapper around obs_columnIndexFromName selects the data-column
2523      !      flavour.
2524      !
2525      implicit none
2526
2527      ! Arguments:
2528      character(len=*), intent(in) :: column_name
2529      ! Result:
2530      integer                      :: column_index
2531
2532      column_index = obs_columnIndexFromNameForFlavour(odc_flavour_IB, column_name)
2533   end function obs_columnIndexFromName_IB
2534
2535
2536   function obs_columnIndexFromName_IH(column_name) result(column_index)
2537      !
2538      !:Purpose:
2539      !      This wrapper around obs_columnIndexFromName selects the data-column
2540      !      flavour.
2541      !
2542      implicit none
2543
2544      ! Arguments:
2545      character(len=*), intent(in) :: column_name
2546      ! Result:
2547      integer                      :: column_index
2548
2549      column_index = obs_columnIndexFromNameForFlavour(odc_flavour_IH, column_name)
2550   end function obs_columnIndexFromName_IH
2551
2552
2553   function obs_columnIndexFromName_RB(column_name) result(column_index)
2554      !
2555      !:Purpose:
2556      !      This wrapper around obs_columnIndexFromName selects the data-column
2557      !      flavour.
2558      !
2559      implicit none
2560
2561      ! Arguments:
2562      character(len=*), intent(in) :: column_name
2563      ! Result:
2564      integer                      :: column_index
2565
2566      column_index = obs_columnIndexFromNameForFlavour(odc_flavour_RB, column_name)
2567   end function obs_columnIndexFromName_RB
2568
2569
2570   function obs_columnIndexFromName_RH(column_name) result(column_index)
2571      !
2572      !:Purpose:
2573      !      This wrapper around obs_columnIndexFromName selects the data-column
2574      !      flavour.
2575      !
2576      implicit none
2577
2578      ! Arguments:
2579      character(len=*), intent(in) :: column_name
2580      ! Result:
2581      integer                      :: column_index
2582
2583      column_index = obs_columnIndexFromNameForFlavour(odc_flavour_RH, column_name)
2584   end function obs_columnIndexFromName_RH
2585
2586
2587   function obs_columnDataType(columnIndex) result(dataType)
2588     !
2589     !:Purpose: return the data type of column, either 'real' or 'integer'
2590     !
2591     implicit none
2592
2593     ! Arguments:
2594     integer, intent(in) :: columnIndex
2595     ! Result:
2596     character(len=7)    :: dataType
2597
2598     if (columnIndex >= NHDR_INT_BEG .and. columnIndex <= NHDR_INT_END) then
2599       dataType = 'integer'
2600     else if (columnIndex >= NHDR_REAL_BEG .and. columnIndex <= NHDR_REAL_END) then
2601       dataType = 'real'
2602     else if (columnIndex >= NBDY_INT_BEG .and. columnIndex <= NBDY_INT_END) then
2603       dataType = 'integer'
2604     else if (columnIndex >= NBDY_REAL_BEG .and. columnIndex <= NBDY_REAL_END) then
2605       dataType = 'real'
2606     else
2607       call obs_abort('obs_columnDataType: invalid value of column index')
2608     end if
2609
2610   end function obs_columnDataType
2611
2612
2613   subroutine obs_copy( obs_a, obs_b )
2614      !
2615      !:Purpose: copy an obsdat object
2616      !
2617      !:Arguments:
2618      !           :obs_a: input object
2619      !           :obs_b: a copy of obs_a
2620      !
2621      !:Note: this method assumes that obs_b has already been initialized
2622      !
2623      implicit none
2624
2625      ! Arguments:
2626      type(struct_obs), intent(in)    :: obs_a
2627      type(struct_obs), intent(inout) :: obs_b
2628
2629      ! Locals:
2630      integer :: column_index
2631
2632      ! check if object to be copied is empty and react appropriately
2633      if (obs_a%numHeader_max == 0 .or. obs_a%numBody_max == 0) then
2634         obs_b%numHeader     = obs_a%numHeader
2635         obs_b%numHeader_max = obs_a%numHeader_max
2636         obs_b%numBody       = obs_a%numBody
2637         obs_b%numBody_max   = obs_a%numBody_max
2638         return
2639      endif
2640
2641      !** Commented out by M. Buehner to allow use in EnVar (also added copy of 
2642      !** headerIndex_mpiglobal and bodyIndex_mpiglobal, if they exist)
2643      !if(obs_a%mpi_local)then
2644      !   call obs_abort( &
2645      !         'obs_copy() is not equipped to handle the case, mpi_local=.true.')
2646      !   return
2647      !end if
2648
2649      do column_index=NHDR_REAL_BEG,NHDR_REAL_END
2650         if(obs_a%realHeaders%odc_flavour%columnActive(column_index))  &
2651              obs_b%realHeaders%columns(column_index)%value_r(:) &
2652            = obs_a%realHeaders%columns(column_index)%value_r(:)
2653      enddo
2654      do column_index=NHDR_INT_BEG,NHDR_INT_END
2655         if(obs_a%intHeaders%odc_flavour%columnActive(column_index))  &
2656              obs_b%intHeaders%columns(column_index)%value_i(:) &
2657            = obs_a%intHeaders%columns(column_index)%value_i(:)
2658      enddo
2659      do column_index=NBDY_REAL_BEG,NBDY_REAL_END
2660         if(obs_a%realBodies%odc_flavour%columnActive(column_index))  &
2661              obs_b%realBodies%columns(column_index)%value_r(:) &
2662            = obs_a%realBodies%columns(column_index)%value_r(:)
2663      enddo
2664      do column_index=NBDY_INT_BEG,NBDY_INT_END
2665         if(obs_a%intBodies%odc_flavour%columnActive(column_index))  &
2666              obs_b%intBodies%columns(column_index)%value_i(:) &
2667            = obs_a%intBodies%columns(column_index)%value_i(:)
2668      enddo
2669      obs_b%headerPrimaryKey(:) = obs_a%headerPrimaryKey(:)
2670      obs_b%cstnid(:)     = obs_a%cstnid(:)
2671      obs_b%cfamily(:)    = obs_a%cfamily(:)
2672
2673      obs_b%numHeader     = obs_a%numHeader
2674      obs_b%numHeader_max = obs_a%numHeader_max
2675      obs_b%numBody       = obs_a%numBody
2676      obs_b%numBody_max   = obs_a%numBody_max
2677
2678      if(associated(obs_a%headerIndex_mpiglobal)) then
2679        write(*,*) 'obs_copy: copying headerIndex_mpiglobal'
2680        if(associated(obs_b%headerIndex_mpiglobal)) then
2681          deallocate(obs_b%headerIndex_mpiglobal)
2682        endif
2683        allocate(obs_b%headerIndex_mpiglobal(size(obs_a%headerIndex_mpiglobal)))
2684        obs_b%headerIndex_mpiglobal(:) = obs_a%headerIndex_mpiglobal(:)
2685      endif
2686
2687      if(associated(obs_a%bodyIndex_mpiglobal)) then
2688        write(*,*) 'obs_copy: copying bodyIndex_mpiglobal'
2689        if(associated(obs_b%bodyIndex_mpiglobal)) then
2690          deallocate(obs_b%bodyIndex_mpiglobal)
2691        endif
2692        allocate(obs_b%bodyIndex_mpiglobal(size(obs_a%bodyIndex_mpiglobal)))
2693        obs_b%bodyIndex_mpiglobal(:) = obs_a%bodyIndex_mpiglobal(:)
2694      endif
2695
2696      obs_b%mpi_local     = obs_a%mpi_local
2697   end subroutine obs_copy
2698
2699
2700   subroutine obs_deallocate(obsdat)
2701      !
2702      !:Purpose:
2703      !      De-allocate arrays.  This is a private method.
2704      !
2705      implicit none
2706
2707      ! Arguments:
2708      type(struct_obs), intent(inout) :: obsdat
2709
2710      ! Locals:
2711      integer :: ierr
2712      integer :: column_index
2713
2714      if(.not.obsdat%allocated) then
2715         ! The object content has already been de-allocated.  Don't repeat it.
2716         return
2717      endif
2718
2719      obsdat%allocated=.false.
2720      HEADER:if(obsdat%numHeader_max > 0) then
2721         if (associated(obsdat%headerPrimaryKey)) then
2722            deallocate(obsdat%headerPrimaryKey,STAT=ierr)
2723            nullify(obsdat%headerPrimaryKey)
2724            if(ierr /= 0)write(*,*) 'Problem detected with headerPrimaryKey. IERR =',ierr
2725         end if
2726          
2727         if (associated(obsdat%cfamily)) then
2728            deallocate(obsdat%cfamily,STAT=ierr)
2729            nullify(obsdat%cfamily)
2730            if(ierr /= 0)write(*,*) 'Problem detected with CFAMILY. IERR =',ierr
2731         end if
2732
2733         if (associated(obsdat%cstnid))then
2734            deallocate(obsdat%cstnid,STAT=ierr)
2735            nullify(obsdat%cstnid)
2736            if(ierr /= 0)write(*,*) 'Problem detected with CSTNID. IERR =',ierr
2737         end if
2738
2739         if (associated(obsdat%realHeaders%columns))then
2740            do column_index=NHDR_REAL_BEG,NHDR_REAL_END
2741               if(obsdat%realHeaders%odc_flavour%columnActive(column_index)) &
2742                  call odc_deallocate(obsdat%realHeaders%columns(column_index))
2743            enddo
2744         end if
2745
2746         if (associated(obsdat%intHeaders%columns))then
2747            do column_index=NHDR_INT_BEG,NHDR_INT_END
2748               if(obsdat%intHeaders%odc_flavour%columnActive(column_index)) &
2749                  call odc_deallocate(obsdat%intHeaders%columns(column_index))
2750            enddo
2751         end if
2752
2753         deallocate(obsdat%scratchRealHeader)
2754         nullify   (obsdat%scratchRealHeader)
2755
2756         deallocate(obsdat%scratchIntHeader)
2757         nullify   (obsdat%scratchIntHeader)
2758      endif HEADER
2759
2760      BODY:if(obsdat%numBody_max > 0) then
2761         if (associated(obsdat%bodyPrimaryKey)) then
2762            deallocate(obsdat%bodyPrimaryKey,STAT=ierr)
2763            nullify(obsdat%bodyPrimaryKey)
2764            if(ierr /= 0)write(*,*) 'Problem detected with bodyPrimaryKey. IERR =',ierr
2765         end if
2766
2767         if (associated(obsdat%realBodies%columns))then
2768            do column_index=NBDY_REAL_BEG,NBDY_REAL_END
2769               if(obsdat%realBodies%odc_flavour%columnActive(column_index)) &
2770                  call odc_deallocate(obsdat%realBodies%columns(column_index))
2771            enddo
2772         end if
2773
2774         if (associated(obsdat%intBodies%columns))then
2775            do column_index=NBDY_INT_BEG,NBDY_INT_END
2776               if(obsdat%intBodies%odc_flavour%columnActive(column_index)) &
2777                  call odc_deallocate(obsdat%intBodies%columns(column_index))
2778            enddo
2779         end if
2780
2781         deallocate(obsdat%scratchRealBody)
2782         nullify   (obsdat%scratchRealBody)
2783
2784         deallocate(obsdat%scratchIntBody)
2785         nullify   (obsdat%scratchIntBody)
2786      endif BODY
2787
2788      obsdat%numHeader_max=0
2789      obsdat%numBody_max=0
2790
2791      call ild_finalize(obsdat%header_index_list_depot)
2792      call ild_finalize(obsdat%body_index_list_depot)
2793
2794   end subroutine obs_deallocate
2795
2796
2797   function obs_elem_c(obsdat,name,row_index) result(value)
2798      !
2799      !:Purpose: Get a character-string-valued observation-data element.
2800      !      To control access to the observation object.  Returns the
2801      !      (character) value of the ObsData element with the indicated name
2802      !      and row_index.
2803      !
2804      implicit none
2805
2806      ! Arguments:
2807      type(struct_obs), intent(in)  :: obsdat
2808      character(len=*), intent(in)  :: name
2809      integer         , intent(in)  :: row_index
2810      ! Result:
2811      character(len=12) :: value
2812
2813      select case (trim(name))
2814      case ('STID'); value=obsdat%cstnid(row_index)
2815
2816      case default
2817         call obs_abort('ERROR:  ' // name(1:4) // &
2818                        ' is not a character(len=*) observation.');return
2819      end select
2820   end function obs_elem_c
2821
2822
2823   subroutine obs_enkf_prntbdy(obsdat,kstn,kulout)
2824      !
2825      !:Purpose:  print all data records associated with an observation
2826      !
2827      !:Arguments:
2828      !     :kstn: no. of station 
2829      !     :kulout: unit used for printing
2830      !
2831      implicit none
2832
2833      ! Arguments:
2834      type(struct_obs), intent(in) :: obsdat
2835      integer,          intent(in) :: kstn
2836      integer,          intent(in) :: kulout
2837
2838      ! Locals:
2839      integer :: ipnt, idata, idata2, jdata, var3d
2840
2841      ! general information
2842
2843      ipnt  = obs_headElem_i(obsdat, OBS_RLN, kstn)
2844      idata = obs_headElem_i(obsdat, OBS_NLV, kstn)
2845
2846      if(idata == 1) then
2847         write(kulout,fmt=9101)idata,kstn
2848      else
2849         write(kulout,fmt=9100)idata,kstn
2850      end if
28519100  format(2x,'there are ',i6,1x,'data in record no.',1x,i6)
28529101  format(2x,'there is ', i6,1x,'data in record no.',1x,i6)
2853
2854      ! print all data records
2855
2856      write(kulout,'(a,a,a,a)') '   no.   var.  press. ass observ. ', &
2857         '  o minus p  o minus p6  o minus a   o min a0     obserr. root(hpht) ', &
2858         'root(haht) innov std dev obs std dev   acc ', &
2859         'zhad  vco'
2860      do jdata = ipnt, ipnt + idata - 1
2861         idata2 = jdata -ipnt + 1
2862         if (btest(obsdat%intBodies%columns(OBS_FLG)%value_i(jdata),12)) then 
2863            var3d=1
2864         else
2865            var3d=0
2866         endif
2867
2868         write(kulout,fmt=9201) idata2,obsdat%intBodies%columns(OBS_VNM)%value_i(jdata), &
2869            obsdat%realBodies%columns(OBS_PPP )%value_r(jdata), &
2870            obsdat%intBodies%columns(OBS_ASS )%value_i(jdata), &
2871            obsdat%realBodies%columns(OBS_VAR )%value_r(jdata), &
2872            obsdat%realBodies%columns(OBS_OMP )%value_r(jdata), &
2873            obsdat%realBodies%columns(OBS_OMP6)%value_r(jdata), &
2874            obsdat%realBodies%columns(OBS_OMA )%value_r(jdata), &
2875            obsdat%realBodies%columns(OBS_OMA0)%value_r(jdata), &
2876            obsdat%realBodies%columns(OBS_OER )%value_r(jdata), &
2877            obsdat%realBodies%columns(OBS_HPHT)%value_r(jdata), &
2878            obsdat%realBodies%columns(OBS_HAHT)%value_r(jdata), &
2879            obsdat%realBodies%columns(OBS_SIGI)%value_r(jdata), &
2880            obsdat%realBodies%columns(OBS_SIGO)%value_r(jdata), &
2881            var3d, &
2882            obsdat%realBodies%columns(OBS_ZHA )%value_r(jdata), &
2883            obsdat%intBodies%columns(OBS_VCO )%value_i(jdata)
2884      enddo
2885
28869201  format(1x,i6,1x,i6,1x,f7.0,1x,i3,10(1x,f10.3),1x,i2, &
2887           1x,f10.3,1x,i2)
2888
2889      return
2890
2891   end subroutine obs_enkf_prntbdy
2892
2893
2894   subroutine obs_enkf_prnthdr(obsdata,kobs,kulout)
2895      !
2896      !:Purpose: printing of the header of an observation record
2897      !
2898      !:Arguments:
2899      !           :kobs: no. of observation
2900      !           :kulout: unit used for optional printing
2901      !
2902      implicit none
2903
2904      ! Arguments:
2905      type(struct_obs), intent(in) :: obsdata
2906      integer,          intent(in) :: kobs
2907      integer,          intent(in) :: kulout
2908
2909      ! Locals:
2910      integer :: obsRLN, obsONM, obsDAT, obsETM, obsINS, obsIDF, obsITY
2911      integer :: obsNLV, obsPAS, obsREG, obsIP
2912      real(pre_obsReal) ::obsLAT, obsLON, obsALT, obsBX, obsBY, obsBZ, obsAZA
2913
2914      ! general information
2915
2916      write(kulout,fmt=9100)kobs,obsdata%cstnid(KOBS)
29179100  format(//,2x,'-- observation record no.' &
2918         ,1x,i6,3x,' station id:',A12)
2919
2920      ! print header's content
2921
29229202  format(2x,'position within realBodies:',i6)
2923      obsRLN = obs_headElem_i(obsdata, OBS_RLN, kobs)
2924      obsONM = obs_headElem_i(obsdata, OBS_ONM, kobs)
2925      obsDAT = obs_headElem_i(obsdata, OBS_DAT, kobs)
2926      obsETM = obs_headElem_i(obsdata, OBS_ETM, kobs)
2927      obsINS = obs_headElem_i(obsdata, OBS_INS, kobs)
2928      obsIDF = obs_headElem_i(obsdata, OBS_IDF, kobs)
2929      obsITY = obs_headElem_i(obsdata, OBS_ITY, kobs)
2930      obsLAT = obs_headElem_r(obsdata, OBS_LAT, kobs)
2931      obsLON = obs_headElem_r(obsdata, OBS_LON, kobs)
2932      obsALT = obs_headElem_r(obsdata, OBS_ALT, kobs)
2933      obsBX  = obs_headElem_r(obsdata, OBS_BX , kobs)
2934      obsBY  = obs_headElem_r(obsdata, OBS_BY , kobs)
2935      obsBZ  = obs_headElem_r(obsdata, OBS_BZ , kobs)
2936      obsAZA = obs_headElem_r(obsdata, OBS_AZA, kobs)
2937      write(kulout,fmt=9200) obsRLN, obsONM, obsDAT, obsETM, &
2938           obsINS, obsIDF, obsITY, obsLAT, obsLON, obsALT, &
2939           obsBX , obsBY , obsBZ , obsAZA
2940
2941      obsNLV = obs_headElem_i(obsdata, OBS_NLV, kobs)
2942      obsPAS = obs_headElem_i(obsdata, OBS_PAS, kobs)
2943      obsREG = obs_headElem_i(obsdata, OBS_REG, kobs)
2944      obsIP  = obs_headElem_i(obsdata, OBS_IP , kobs)
2945      write(kulout,fmt=9201) obsNLV, obsPAS, obsREG, obsIP
2946         
2947
29489200  format(2x,'position within realBodies:',i8,1x,'stn. number:',i6,1x,/, &
2949         '  date: ',i10,1x,' time: ',i8,/, &
2950         '  model box:',i12,1x,'instrument: ',i6,1x, &
2951         'obs. type:',i8,1x,/, &
2952         '  (lat,lon):',f12.6,1x,f12.6,1x, &
2953         'stations altitude:',f12.6,1x,/,2x, &
2954         'block location: ',3(f12.6,1x),/,2x, &
2955         'azimuth angle:',f12.6)
29569201  format('  number of data:',i6,1x, &
2957         ' pass: ',i6,' region: ',i6,/,2x,'processor: ',i6)
2958
2959      return
2960   end subroutine obs_enkf_prnthdr
2961
2962
2963   subroutine obs_expandToMpiGlobal(obsdat)
2964      !
2965      !:Purpose: restore Global array realBodies and intBodies.
2966      !      To reconstitute the mpi-global observation object by gathering the
2967      !      necessary data from all processors (to all processors).
2968      !
2969      !:Note: for the character data cstnid(:), this is converted to integers
2970      !       with IACHAR and back to characters with ACHAR, to facilitate this
2971      !       gather through rpn_comm_allreduce
2972      !
2973      implicit none
2974
2975      ! Arguments:
2976      type(struct_obs), intent(inout) :: obsdat
2977
2978      ! Locals:
2979      integer, allocatable :: headerIndex_mpiglobal(:),all_headerIndex_mpiglobal(:,:)
2980      integer, allocatable :: bodyIndex_mpiglobal(:),all_bodyIndex_mpiglobal(:,:)
2981      integer(8), allocatable :: headerPrimaryKey_mpilocal(:), all_headerPrimaryKey_mpilocal(:,:)
2982      integer(8), allocatable :: bodyPrimaryKey_mpilocal(:), all_bodyPrimaryKey_mpilocal(:,:)
2983      integer, allocatable :: intHeaders_mpilocal(:,:),all_intHeaders_mpilocal(:,:,:)
2984      real(pre_obsReal), allocatable :: realHeaders_mpilocal(:,:),all_realHeaders_mpilocal(:,:,:)
2985      integer, allocatable :: intStnid_mpilocal(:,:),all_intStnid_mpilocal(:,:,:)
2986      integer, allocatable :: intFamily_mpilocal(:,:),all_intFamily_mpilocal(:,:,:)
2987      integer, allocatable :: intBodies_mpilocal(:,:),all_intBodies_mpilocal(:,:,:)
2988      integer, allocatable :: all_numHeader_mpilocal(:), all_numBody_mpilocal(:)
2989      real(pre_obsReal), allocatable :: realBodies_mpilocal(:,:),all_realBodies_mpilocal(:,:,:)
2990      integer :: ierr
2991      integer :: get_max_rss
2992      integer :: numHeader_mpilocalmax,numBody_mpilocalmax
2993      integer :: numHeader_mpiGlobal,numBody_mpiGlobal
2994      integer :: numHeader_mpilocal, numBody_mpilocal
2995      integer :: bodyIndex_mpilocal,bodyIndex
2996      integer :: headerIndex_mpilocal,headerIndex
2997      integer :: headerIndexOffset,bodyIndexOffset
2998      integer :: nsize,sourcePE,nprocs_mpi,myid_mpi,procIndex
2999      integer :: charIndex,activeIndex,columnIndex
3000
3001      write(*,*) 'Entering obs_expandToMpiGlobal'
3002      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3003
3004      if(.not. obsdat%mpi_local)then
3005         call obs_abort('OBS_EXPANDTOMPIGLOBAL has been called, but the ' &
3006                        // 'obsSpaceData object is already in mpi-global state')
3007         return
3008      endif
3009
3010      ! determine rank and number of mpi tasks
3011      call rpn_comm_size("GRID",nprocs_mpi,ierr)
3012      call rpn_comm_rank("GRID",myid_mpi,ierr)
3013
3014      ! determine number of rows in mpiglobal arrays
3015      numHeader_mpiGlobal = obs_numHeader_mpiglobal(obsdat)
3016      numBody_mpiGlobal   = obs_numBody_mpiglobal  (obsdat)
3017
3018      ! determine number of rows in mpilocal arrays
3019      numHeader_mpilocal = obs_numHeader(obsdat)
3020      numBody_mpilocal   = obs_numBody(obsdat)
3021
3022      ! construct the header/body mpiglobal indices if they do not exist
3023      if ( .not. associated(obsdat%headerIndex_mpiglobal) ) then
3024         write(*,*) 'obs_expandToMpiGlobal: This object was never mpiglobal. ' // &
3025                    'Just gather the body and header rows in a simple way.'
3026
3027         if(numHeader_mpilocal > 0) then
3028            ! Allocate the list of global header indices
3029            allocate(obsdat%headerIndex_mpiglobal(numHeader_mpilocal))
3030            obsdat%headerIndex_mpiglobal(:)=0
3031         else
3032            nullify(obsdat%headerIndex_mpiglobal)
3033            write(*,*) 'obs_expandToMpiGlobal: This mpi processor has zero headers.'
3034         end if
3035
3036         if(numBody_mpilocal > 0) then
3037            ! Allocate the list of global body indices
3038            allocate(obsdat%bodyIndex_mpiglobal(numBody_mpilocal))
3039            obsdat%bodyIndex_mpiglobal(:)=0
3040         else
3041            nullify(obsdat%bodyIndex_mpiglobal)
3042            write(*,*) 'obs_expandToMpiGlobal: This mpi processor has zero bodies.'
3043         endif
3044
3045         allocate( all_numHeader_mpilocal(nprocs_mpi) )
3046         call rpn_comm_allgather(numHeader_mpilocal,     1, "mpi_integer", &
3047                                 all_numHeader_mpilocal, 1, "mpi_integer", &
3048                                 "GRID",ierr)
3049         allocate( all_numBody_mpilocal(nprocs_mpi) )
3050         call rpn_comm_allgather(numBody_mpilocal,     1, "mpi_integer", &
3051                                 all_numBody_mpilocal, 1, "mpi_integer", &
3052                                 "GRID",ierr)
3053
3054         headerIndexOffset = 0
3055         bodyIndexOffset = 0
3056         do procIndex = 1, myid_mpi
3057            headerIndexOffset = headerIndexOffset + all_numHeader_mpilocal(procIndex)
3058            bodyIndexOffset   = bodyIndexOffset   + all_numBody_mpilocal(procIndex)
3059         end do
3060
3061         do headerIndex = 1, numHeader_mpilocal
3062            obsdat%headerIndex_mpiglobal(headerIndex) = headerIndex + headerIndexOffset
3063         end do
3064
3065         do bodyIndex = 1, numBody_mpilocal
3066            obsdat%bodyIndex_mpiglobal(bodyIndex) = bodyIndex + bodyIndexOffset
3067         end do
3068
3069      end if ! construct mpiglobal indices
3070
3071      ! first set the mpiglobal header index value stored in the body table
3072      do bodyIndex_mpilocal=1,obsdat%numBody
3073         headerIndex_mpilocal=obs_bodyElem_i(obsdat,OBS_HIND,bodyIndex_mpilocal)
3074         call obs_bodySet_i(obsdat,OBS_HIND,bodyIndex_mpilocal, &
3075            obsdat%headerIndex_mpiglobal(headerIndex_mpilocal))
3076      enddo
3077
3078      ! gather the lists of mpiglobal header indices on proc 0 to know where everything goes
3079      call rpn_comm_allreduce(obsdat%numHeader,numHeader_mpilocalmax,1,"mpi_integer","mpi_max","GRID",ierr)
3080      allocate(headerIndex_mpiglobal(numHeader_mpilocalmax))
3081      headerIndex_mpiglobal(:)=0
3082      do headerIndex_mpilocal=1,obsdat%numHeader
3083         headerIndex_mpiglobal(headerIndex_mpilocal)=obsdat%headerIndex_mpiglobal(headerIndex_mpilocal)
3084      enddo
3085
3086      if(myid_mpi == 0) then
3087         allocate(all_headerIndex_mpiglobal(numHeader_mpilocalmax,0:nprocs_mpi-1))
3088      else
3089         allocate(all_headerIndex_mpiglobal(1,1))
3090      end if
3091
3092      call rpn_comm_gather(headerIndex_mpiglobal    ,numHeader_mpilocalmax,"mpi_integer", &
3093                           all_headerIndex_mpiglobal,numHeader_mpilocalmax,"mpi_integer", &
3094                           0,"GRID",ierr)
3095      deallocate(headerIndex_mpiglobal)
3096      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3097
3098      ! make the header primary key mpiglobal
3099      allocate(headerPrimaryKey_mpilocal(numHeader_mpilocalmax))
3100      headerPrimaryKey_mpilocal(:) = 0
3101      do headerIndex_mpilocal=1,obsdat%numHeader
3102         headerPrimaryKey_mpilocal(headerIndex_mpilocal)=  &
3103            obsdat%headerPrimaryKey(headerIndex_mpilocal)
3104      enddo
3105      if(myid_mpi == 0) then
3106         allocate(all_headerPrimaryKey_mpilocal(numHeader_mpilocalmax,0:nprocs_mpi-1))
3107      else
3108         allocate(all_headerPrimaryKey_mpilocal(1,1))
3109      end if
3110      nsize=size(headerPrimaryKey_mpilocal)
3111      call rpn_comm_gather(headerPrimaryKey_mpilocal    ,nsize,"mpi_integer8", &
3112                           all_headerPrimaryKey_mpilocal,nsize,"mpi_integer8", &
3113                           0,"GRID",ierr)
3114      deallocate(headerPrimaryKey_mpilocal)
3115      
3116      ! make header-level integer data mpiglobal
3117      allocate(intHeaders_mpilocal(odc_numActiveColumn(obsdat%intHeaders),numHeader_mpilocalmax))
3118      intHeaders_mpilocal(:,:)=0
3119      do headerIndex_mpilocal=1,obsdat%numHeader
3120         do activeIndex=1,odc_numActiveColumn(obsdat%intHeaders)
3121            columnIndex=odc_columnIndexFromActiveIndex( &
3122                                     obsdat%intHeaders%odc_flavour, activeIndex)
3123            intHeaders_mpilocal(activeIndex,headerIndex_mpilocal)=  &
3124               obs_headElem_i(obsdat, columnIndex, headerIndex_mpilocal)
3125         enddo
3126      enddo
3127
3128      if(myid_mpi == 0) then
3129         allocate(all_intHeaders_mpilocal(odc_numActiveColumn(obsdat%intHeaders),numHeader_mpilocalmax,0:nprocs_mpi-1))
3130      else
3131         allocate(all_intHeaders_mpilocal(1,1,1))
3132      end if
3133
3134      nsize=size(intHeaders_mpilocal)
3135      call rpn_comm_gather(intHeaders_mpilocal    ,nsize,"mpi_integer", &
3136                           all_intHeaders_mpilocal,nsize,"mpi_integer", &
3137                           0,"GRID",ierr)
3138      deallocate(intHeaders_mpilocal)
3139      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3140
3141      ! make header-level real data mpiglobal
3142      allocate(realHeaders_mpilocal(odc_numActiveColumn(obsdat%realHeaders),numHeader_mpilocalmax))
3143      realHeaders_mpilocal(:,:)=real(0.0d0,pre_obsReal)
3144      do headerIndex_mpilocal=1,obsdat%numHeader
3145         do activeIndex=1,odc_numActiveColumn(obsdat%realHeaders)
3146            columnIndex=odc_columnIndexFromActiveIndex( &
3147                                     obsdat%realHeaders%odc_flavour, activeIndex)
3148            realHeaders_mpilocal(activeIndex,headerIndex_mpilocal)=  &
3149               obs_headElem_r(obsdat, columnIndex, headerIndex_mpilocal)
3150         enddo
3151      enddo
3152      
3153      if(myid_mpi == 0) then
3154         allocate(all_realHeaders_mpilocal(odc_numActiveColumn(obsdat%realHeaders),numHeader_mpilocalmax,0:nprocs_mpi-1))
3155      else
3156         allocate(all_realHeaders_mpilocal(1,1,1))
3157      end if
3158
3159      nsize=size(realHeaders_mpilocal)
3160      call rpn_comm_gather(realHeaders_mpilocal    ,nsize,pre_obsMpiReal, &
3161                           all_realHeaders_mpilocal,nsize,pre_obsMpiReal, &
3162                           0,"GRID",ierr)
3163      deallocate(realHeaders_mpilocal)
3164      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3165
3166      ! make station-id data mpiglobal
3167      allocate(intStnid_mpilocal(len(obsdat%cstnid(1)),numHeader_mpilocalmax))
3168      intStnid_mpilocal(:,:)=0
3169      do headerIndex_mpilocal=1,obsdat%numHeader
3170         do charIndex=1,len(obsdat%cstnid(1))
3171            intStnid_mpilocal(charIndex,headerIndex_mpilocal)=  &
3172               iachar(obsdat%cstnid(headerIndex_mpilocal)(charIndex:charIndex))
3173         enddo
3174      enddo
3175
3176      if(myid_mpi == 0) then
3177         allocate(all_intStnid_mpilocal(len(obsdat%cstnid(1)),numHeader_mpilocalmax,0:nprocs_mpi-1))
3178      else
3179         allocate(all_intStnid_mpilocal(1,1,1))
3180      end if
3181
3182      nsize=size(intStnid_mpilocal)
3183      call rpn_comm_gather(intStnid_mpilocal    ,nsize,"mpi_integer", &
3184                           all_intStnid_mpilocal,nsize,"mpi_integer", &
3185                           0,"GRID",ierr)
3186      deallocate(intStnid_mpilocal)
3187      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3188
3189      ! make obs family data mpiglobal
3190      allocate(intFamily_mpilocal(len(obsdat%cfamily(1)),numHeader_mpilocalmax))
3191      intFamily_mpilocal(:,:)=0
3192      do headerIndex_mpilocal=1,obsdat%numHeader
3193         do charIndex=1,len(obsdat%cfamily(1))
3194            intFamily_mpilocal(charIndex,headerIndex_mpilocal)=  &
3195               iachar(obsdat%cfamily(headerIndex_mpilocal)(charIndex:charIndex))
3196         enddo
3197      enddo
3198
3199      if(myid_mpi == 0) then
3200         allocate(all_intFamily_mpilocal(len(obsdat%cfamily(1)),numHeader_mpilocalmax,0:nprocs_mpi-1))
3201      else
3202         allocate(all_intFamily_mpilocal(1,1,1))
3203      end if
3204
3205      nsize=size(intFamily_mpilocal)
3206      call rpn_comm_gather(intFamily_mpilocal    ,nsize,"mpi_integer", &
3207                           all_intFamily_mpilocal,nsize,"mpi_integer", &
3208                           0,"GRID",ierr)
3209      deallocate(intFamily_mpilocal)
3210      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3211
3212      ! gather the lists of mpiglobal body indices on proc 0 to know where everything goes
3213      call rpn_comm_allreduce(obsdat%numBody,numBody_mpilocalmax,1,"mpi_integer","mpi_max","GRID",ierr)
3214      allocate(bodyIndex_mpiglobal(numBody_mpilocalmax))
3215      bodyIndex_mpiglobal(:)=0
3216      do bodyIndex_mpilocal=1,obsdat%numBody
3217         bodyIndex_mpiglobal(bodyIndex_mpilocal)=obsdat%bodyIndex_mpiglobal(bodyIndex_mpilocal)
3218      enddo
3219
3220      if(myid_mpi == 0) then
3221         allocate(all_bodyIndex_mpiglobal(numBody_mpilocalmax,0:nprocs_mpi-1))
3222      else
3223         allocate(all_bodyIndex_mpiglobal(1,1))
3224      end if
3225
3226      call rpn_comm_gather(bodyIndex_mpiglobal    ,numBody_mpilocalmax,"mpi_integer", &
3227                           all_BodyIndex_mpiglobal,numBody_mpilocalmax,"mpi_integer", &
3228                           0,"GRID",ierr)
3229      deallocate(bodyIndex_mpiglobal)
3230      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3231
3232      ! make body primary key mpiglobal
3233      allocate(bodyPrimaryKey_mpilocal(numBody_mpilocalmax))
3234      bodyPrimaryKey_mpilocal(:)=0
3235      do bodyIndex_mpilocal=1,obsdat%numBody
3236         bodyPrimaryKey_mpilocal(bodyIndex_mpilocal)=  &
3237            obsdat%bodyPrimaryKey(bodyIndex_mpilocal)
3238      enddo
3239      if(myid_mpi == 0) then
3240         allocate(all_bodyPrimaryKey_mpilocal(numBody_mpilocalmax,0:nprocs_mpi-1))
3241      else
3242         allocate(all_bodyPrimaryKey_mpilocal(1,1))
3243      end if
3244      nsize=size(bodyPrimaryKey_mpilocal)
3245      call rpn_comm_gather(bodyPrimaryKey_mpilocal    ,nsize,"mpi_integer8", &
3246                           all_bodyPrimaryKey_mpilocal,nsize,"mpi_integer8", &
3247                           0,"GRID",ierr)
3248      deallocate(bodyPrimaryKey_mpilocal)
3249
3250      ! make body-level integer data mpiglobal
3251      allocate(intBodies_mpilocal(odc_numActiveColumn(obsdat%intBodies),numBody_mpilocalmax))
3252      intBodies_mpilocal(:,:)=0
3253      do bodyIndex_mpilocal=1,obsdat%numBody
3254         do activeIndex=1,odc_numActiveColumn(obsdat%intBodies)
3255            columnIndex=odc_columnIndexFromActiveIndex( &
3256                                     obsdat%intBodies%odc_flavour, activeIndex)
3257            intBodies_mpilocal(activeIndex,bodyIndex_mpilocal)=  &
3258               obs_bodyElem_i(obsdat, columnIndex, bodyIndex_mpilocal)
3259         enddo
3260      enddo
3261
3262      if(myid_mpi == 0) then
3263         allocate(all_intBodies_mpilocal(odc_numActiveColumn(obsdat%intBodies),numBody_mpilocalmax,0:nprocs_mpi-1))
3264      else
3265         allocate(all_intBodies_mpilocal(1,1,1))
3266      end if
3267
3268      nsize=size(intBodies_mpilocal)
3269      call rpn_comm_gather(intBodies_mpilocal    ,nsize,"mpi_integer", &
3270                           all_intBodies_mpilocal,nsize,"mpi_integer", &
3271                           0,"GRID",ierr)
3272      deallocate(intBodies_mpilocal)
3273      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3274
3275      ! make body-level real data mpiglobal
3276      allocate(realBodies_mpilocal(odc_numActiveColumn(obsdat%realBodies),numBody_mpilocalmax))
3277      realBodies_mpilocal(:,:)=real(0.0d0,pre_obsReal)
3278      do bodyIndex_mpilocal=1,obsdat%numBody
3279         do activeIndex=1,odc_numActiveColumn(obsdat%realBodies)
3280            columnIndex=odc_columnIndexFromActiveIndex( &
3281                                     obsdat%realBodies%odc_flavour, activeIndex)
3282            realBodies_mpilocal(activeIndex,bodyIndex_mpilocal)=  &
3283               obs_bodyElem_r(obsdat, columnIndex, bodyIndex_mpilocal)
3284         enddo
3285      enddo
3286
3287      if(myid_mpi == 0) then
3288         allocate(all_realBodies_mpilocal(odc_numActiveColumn(obsdat%realBodies),numBody_mpilocalmax,0:nprocs_mpi-1))
3289      else
3290         allocate(all_realBodies_mpilocal(1,1,1))
3291      end if
3292
3293      nsize=size(realBodies_mpilocal)
3294      call rpn_comm_gather(realBodies_mpilocal    ,nsize,pre_obsMpiReal, &
3295                           all_realBodies_mpilocal,nsize,pre_obsMpiReal, &
3296                           0,"GRID",ierr)
3297      deallocate(realBodies_mpilocal)
3298      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3299
3300      ! destroy object's mpilocal data and allocate mpiglobal data
3301      call obs_deallocate(obsdat)
3302
3303      ! Only processor 0 does any work hereafter
3304      if(myid_mpi == 0) then
3305          call obs_allocate(obsdat,numHeader_mpiGlobal,numBody_mpiGlobal)
3306      else
3307          call obs_allocate(obsdat,0,0)
3308      endif
3309
3310      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3311
3312      if(myid_mpi == 0) then
3313
3314         do sourcePE=0,nprocs_mpi-1
3315            do headerIndex_mpilocal=1,numHeader_mpilocalmax
3316               ! grab the mpiglobal header index
3317               headerIndex=all_headerIndex_mpiglobal(headerIndex_mpilocal,sourcePE)
3318               if(headerIndex > 0) then
3319
3320                  obsdat%headerPrimaryKey(headerIndex)= &
3321                         all_headerPrimaryKey_mpilocal(headerIndex_mpilocal,sourcePE)
3322
3323                  do activeIndex=1,odc_numActiveColumn(obsdat%realHeaders)
3324                     columnIndex=odc_columnIndexFromActiveIndex(obsdat%realHeaders%odc_flavour,activeIndex)
3325                     obsdat%realHeaders%columns(columnIndex)%value_r(headerIndex)= &
3326                            all_realHeaders_mpilocal(activeIndex,headerIndex_mpilocal,sourcePE)
3327                  enddo
3328
3329                  do activeIndex=1,odc_numActiveColumn(obsdat%intHeaders)
3330                     columnIndex=odc_columnIndexFromActiveIndex(obsdat%intHeaders%odc_flavour,activeIndex)
3331                     obsdat%intHeaders%columns(columnIndex)%value_i(headerIndex)= &
3332                            all_intHeaders_mpilocal(activeIndex,headerIndex_mpilocal,sourcePE)
3333                  enddo
3334
3335                  do charIndex=1,len(obsdat%cstnid(1))
3336                     obsdat%cstnid(headerIndex)(charIndex:charIndex) = &
3337                       achar(all_intStnid_mpilocal(charIndex,headerIndex_mpilocal,sourcePE))
3338                  enddo
3339
3340                  do charIndex=1,len(obsdat%cfamily(1))
3341                     obsdat%cfamily(headerIndex)(charIndex:charIndex) = &
3342                        achar(all_intFamily_mpilocal(charIndex,headerIndex_mpilocal,sourcePE))
3343                  enddo
3344
3345               endif
3346            enddo
3347         enddo
3348
3349         do sourcePE=0,nprocs_mpi-1
3350            do bodyIndex_mpilocal=1,numBody_mpilocalmax
3351               bodyIndex=all_bodyIndex_mpiglobal(bodyIndex_mpilocal,sourcePE)
3352               if(bodyIndex > 0) then
3353
3354                  obsdat%bodyPrimaryKey(bodyIndex)=  &
3355                     all_bodyPrimaryKey_mpilocal(bodyIndex_mpilocal,sourcePE)
3356
3357                  do activeIndex=1,odc_numActiveColumn(obsdat%realBodies)
3358                     columnIndex=odc_columnIndexFromActiveIndex(obsdat%realBodies%odc_flavour,activeIndex)
3359                     obsdat%realBodies%columns(columnIndex)%value_r(bodyIndex)= &
3360                        all_realBodies_mpilocal(activeIndex,bodyIndex_mpilocal,sourcePE)
3361                  enddo
3362
3363                  do activeIndex=1,odc_numActiveColumn(obsdat%intBodies)
3364                     columnIndex=odc_columnIndexFromActiveIndex(obsdat%intBodies%odc_flavour,activeIndex)
3365                     obsdat%intBodies%columns(columnIndex)%value_i(bodyIndex)=  &
3366                        all_intBodies_mpilocal(activeIndex,bodyIndex_mpilocal,sourcePE)
3367                  enddo
3368
3369               endif
3370            enddo
3371         enddo
3372 
3373
3374         ! Make RLN point to global data
3375         do headerIndex=1,numHeader_mpiGlobal
3376            if(headerIndex == 1) then
3377               obsdat%intHeaders%columns(OBS_RLN)%value_i(headerIndex) = 1
3378            else
3379               obsdat%intHeaders%columns(OBS_RLN)%value_i(headerIndex) = &
3380                   obsdat%intHeaders%columns(OBS_RLN)%value_i(headerIndex-1) + &
3381                   obsdat%intHeaders%columns(OBS_NLV)%value_i(headerIndex-1)
3382            endif
3383         enddo
3384
3385         obsdat%numBody     =  numBody_mpiGlobal
3386         obsdat%numHeader   =numHeader_mpiGlobal
3387
3388      else
3389
3390         obsdat%numBody     = 0
3391         obsdat%numHeader   = 0
3392
3393      endif ! myid_mpi == 0
3394
3395      ! deallocate the complete temporary arrays
3396      deallocate(all_headerIndex_mpiglobal)
3397      deallocate(all_bodyIndex_mpiglobal)
3398      deallocate(all_headerPrimaryKey_mpilocal)
3399      deallocate(all_bodyPrimaryKey_mpilocal)
3400      deallocate(all_intStnid_mpilocal)
3401      deallocate(all_intFamily_mpilocal)
3402      deallocate(all_intHeaders_mpilocal)
3403      deallocate(all_realHeaders_mpilocal)
3404      deallocate(all_intBodies_mpilocal)
3405      deallocate(all_realBodies_mpilocal)
3406
3407      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
3408      obsdat%mpi_local = .false.
3409      write(*,*) 'Leaving obs_expandToMpiGlobal'
3410      return
3411   end subroutine obs_expandToMpiGlobal
3412
3413
3414   subroutine obs_extractObsRealBodyColumn(realBodyColumn, obsSpaceData, obsColumnIndex)
3415     !:Purpose: Extract contents of a real body column into a vector.
3416     !
3417     implicit none
3418
3419     ! Arguments:
3420     real(pre_obsReal), intent(out) :: realBodyColumn(:)
3421     type(struct_obs),  intent(in)  :: obsSpaceData
3422     integer,           intent(in)  :: obsColumnIndex
3423
3424     ! Locals:
3425     integer :: bodyIndex
3426
3427     do bodyIndex = 1, obs_numBody(obsSpaceData)
3428       realBodyColumn(bodyIndex) = obs_bodyElem_r(obsSpaceData,obsColumnIndex,bodyIndex)
3429     end do
3430
3431   end subroutine obs_extractObsRealBodyColumn
3432
3433
3434   subroutine obs_extractObsRealBodyColumn_r4(realBodyColumn, obsSpaceData, obsColumnIndex)
3435     !:Purpose: Extract contents of a real body column into a vector.
3436     !
3437     implicit none
3438
3439     ! Arguments:
3440     real(4),          intent(out) :: realBodyColumn(:)
3441     type(struct_obs), intent(in)  :: obsSpaceData
3442     integer,          intent(in)  :: obsColumnIndex
3443
3444     ! Locals:
3445     integer :: bodyIndex
3446
3447     do bodyIndex = 1, obs_numBody(obsSpaceData)
3448       realBodyColumn(bodyIndex) = obs_bodyElem_r(obsSpaceData,obsColumnIndex,bodyIndex)
3449     end do
3450
3451   end subroutine obs_extractObsRealBodyColumn_r4
3452
3453
3454   subroutine obs_extractObsIntBodyColumn(intBodyColumn, obsSpaceData, obsColumnIndex)
3455     !:Purpose: Extract contents of an integer body column into a vector.
3456     !
3457     implicit none
3458
3459     ! Arguments:
3460     integer,          intent(out) :: intBodyColumn(:)
3461     type(struct_obs), intent(in)  :: obsSpaceData
3462     integer,          intent(in)  :: obsColumnIndex
3463
3464     ! Locals:
3465     integer :: bodyIndex
3466
3467     do bodyIndex = 1, obs_numBody(obsSpaceData)
3468       intBodyColumn(bodyIndex) = obs_bodyElem_i(obsSpaceData,obsColumnIndex,bodyIndex)
3469     end do
3470
3471   end subroutine obs_extractObsIntBodyColumn
3472
3473
3474   subroutine obs_extractObsRealHeaderColumn(realHeaderColumn, obsSpaceData, obsColumnIndex)
3475     !:Purpose: Extract contents of a real header column into a vector. Note
3476     !           that the output can be either in the form of a vector with
3477     !           length equal to the number of rows in the body OR header table.
3478     !
3479     implicit none
3480
3481     ! Arguments:
3482     real(pre_obsReal), intent(out) :: realHeaderColumn(:)
3483     type(struct_obs),  intent(in)  :: obsSpaceData
3484     integer,           intent(in)  :: obsColumnIndex
3485
3486     ! Locals:
3487     integer :: bodyIndex, headerIndex
3488
3489     if (size(realHeaderColumn) == obs_numBody(obsSpaceData)) then
3490       do bodyIndex = 1, obs_numBody(obsSpaceData)
3491         headerIndex = obs_bodyElem_i(obsSpaceData,OBS_HIND,bodyIndex)
3492         realHeaderColumn(bodyIndex) = obs_headElem_r(obsSpaceData,obsColumnIndex,headerIndex)
3493       end do
3494     else if(size(realHeaderColumn) == obs_numHeader(obsSpaceData)) then
3495       do headerIndex = 1, obs_numHeader(obsSpaceData)
3496         realHeaderColumn(headerIndex) = obs_headElem_r(obsSpaceData,obsColumnIndex,headerIndex)
3497       end do
3498     else
3499       call obs_abort('extractObsRealHeaderColumn: size of output vector invalid')
3500     end if
3501
3502   end subroutine obs_extractObsRealHeaderColumn
3503
3504
3505   subroutine obs_extractObsIntHeaderColumn(intHeaderColumn, obsSpaceData, obsColumnIndex)
3506     !:Purpose: Extract contents of an integer header column into a vector. Note
3507     !           that the output can be either in the form of a vector with
3508     !           length equal to the number of rows in the body OR header table.
3509     !
3510     implicit none
3511
3512     ! Arguments:
3513     integer,          intent(out) :: intHeaderColumn(:)
3514     type(struct_obs), intent(in)  :: obsSpaceData
3515     integer,          intent(in)  :: obsColumnIndex
3516
3517     ! Locals:
3518     integer :: bodyIndex, headerIndex
3519
3520     if (size(intHeaderColumn) == obs_numBody(obsSpaceData)) then
3521       do bodyIndex = 1, obs_numBody(obsSpaceData)
3522         headerIndex = obs_bodyElem_i(obsSpaceData,OBS_HIND,bodyIndex)
3523         intHeaderColumn(bodyIndex) = obs_headElem_i(obsSpaceData,obsColumnIndex,headerIndex)
3524       end do
3525     else if(size(intHeaderColumn) == obs_numHeader(obsSpaceData)) then
3526       do headerIndex = 1, obs_numHeader(obsSpaceData)
3527         intHeaderColumn(headerIndex) = obs_headElem_i(obsSpaceData,obsColumnIndex,headerIndex)
3528       end do
3529     else
3530       call obs_abort('extractObsIntHeaderColumn: size of output vector invalid')
3531     end if
3532
3533   end subroutine obs_extractObsIntHeaderColumn
3534
3535
3536   subroutine obs_finalize(obsdat)
3537      !
3538      !:Purpose: De-allocate memory and clean up the object.
3539      !      De-allocate object arrays, and perform any other clean-up that is
3540      !      necessary before object deletion.
3541      !
3542      implicit none
3543
3544      ! Arguments:
3545      type(struct_obs), intent(inout) :: obsdat
3546
3547      call obs_deallocate(obsdat)
3548   end subroutine obs_finalize
3549
3550
3551   function obs_getBodyIndex_depot(obsdat) result(row_index)
3552      !
3553      !:Purpose:
3554      !      Return the next element from the current body list
3555      !
3556      implicit none
3557
3558      ! Arguments:
3559      type(struct_obs), intent(inout) :: obsdat
3560      ! Result:
3561      integer :: row_index
3562
3563      row_index = ild_get_next_index(obsdat%body_index_list_depot)
3564   end function obs_getBodyIndex_depot
3565
3566
3567   function obs_getBodyIndex_private(private_list) result(row_index)
3568      !
3569      !:Purpose:
3570      !      Return the next element from the supplied private body list
3571      !
3572      implicit none
3573
3574      ! Arguments:
3575      type(struct_index_list), pointer, intent(inout) :: private_list
3576      ! Result:
3577      integer :: row_index
3578
3579      row_index = ild_get_next_index(private_list)
3580   end function obs_getBodyIndex_private
3581
3582
3583   function obs_getFamily(obsdat,headerIndex_opt,bodyIndex_opt)
3584      !
3585      !:Purpose:
3586      !      Return the family for the indicated header, or else for the
3587      !      indicated body.
3588      !
3589      implicit none
3590
3591      ! Arguments:
3592      type(struct_obs),  intent(in) :: obsdat
3593      integer, optional, intent(in) :: headerIndex_opt
3594      integer, optional, intent(in) :: bodyIndex_opt
3595      ! Result:
3596      character(len=2)              :: obs_getFamily
3597
3598      ! Locals:
3599      integer          :: headerIndex
3600
3601      if(present(headerIndex_opt)) then
3602         headerIndex=headerIndex_opt
3603      elseif(present(bodyIndex_opt)) then
3604         headerIndex=obs_bodyElem_i(obsdat,OBS_HIND,bodyIndex_opt)
3605      else
3606         call obs_abort('OBS_GETFAMILY: Header or Body index must be specified!')
3607         return
3608      endif
3609
3610      obs_getFamily=obsdat%cfamily(headerIndex)
3611
3612   end function obs_getFamily
3613
3614
3615   function obs_getNclassAvhrr() 
3616     !
3617     !:Purpose:
3618     !      to get the number of  AVHRR radiance classes
3619     !
3620     implicit none
3621
3622     ! Result:
3623     integer :: obs_getNclassAvhrr
3624
3625     obs_getNclassAvhrr = ( OBS_CF7 - OBS_CF1 + 1 )
3626
3627   end function obs_getNclassAvhrr
3628
3629   function obs_getNchanAvhrr() 
3630     !
3631     !:Purpose:
3632     !      to get the number of AVHRR channels
3633     !
3634     implicit none
3635
3636     ! Result:
3637     integer :: obs_getNchanAvhrr
3638
3639     obs_getNchanAvhrr = ( OBS_M1C6 - OBS_M1C1 + 1 )
3640
3641   end function obs_getNchanAvhrr
3642
3643
3644   function obs_getHeaderIndex(obsdat) result(row_index)
3645      !
3646      !:Purpose:
3647      !      Return the next element from the current header list.
3648      !
3649      implicit none
3650
3651      ! Arguments:
3652      type(struct_obs), intent(inout) :: obsdat
3653      ! Result:
3654      integer :: row_index
3655
3656      row_index = ild_get_next_index(obsdat%header_index_list_depot)
3657   end function obs_getHeaderIndex
3658
3659
3660   function obs_headElem_i(obsdat,column_index,row_index) result(value_i)
3661      !
3662      !:Purpose: Get an integer-valued header observation-data element.
3663      !      To control access to the observation object.  Returns the (integer)
3664      !      value of the row_index'th ObsData element with the indicated column
3665      !      index from the "header".
3666      !
3667      implicit none
3668
3669      ! Arguments:
3670      type(struct_obs), intent(in)  :: obsdat
3671      integer         , intent(in)  :: column_index
3672      integer         , intent(in)  :: row_index
3673      ! Result:
3674      integer                       :: value_i
3675
3676      ! Locals:
3677      real(pre_obsReal) :: value_r         ! not used
3678
3679      call odc_columnElem(obsdat%intHeaders, column_index, row_index, &
3680                          value_i, value_r)
3681   end function obs_headElem_i
3682
3683
3684   function obs_headElem_r(obsdat,column_index,row_index) result(value_r)
3685      !
3686      !:Purpose: Get a real-valued header observation-data element.
3687      !      To control access to the observation object.  Returns the (real)
3688      !      value of the row_index'th ObsData element with the indicated column
3689      !      index from the "header".
3690      !
3691      implicit none
3692
3693      ! Arguments:
3694      type(struct_obs), intent(in)  :: obsdat
3695      integer         , intent(in)  :: column_index
3696      integer         , intent(in)  :: row_index
3697      ! Result:
3698      real(pre_obsReal)             :: value_r
3699
3700      ! Locals:
3701      integer :: value_i                ! unused
3702
3703      call odc_columnElem(obsdat%realHeaders, column_index, row_index, &
3704                          value_i, value_r)
3705   end function obs_headElem_r
3706
3707
3708   function obs_headPrimaryKey(obsdat,row_index) result(primaryKey)
3709      !
3710      !:Purpose: Get the header primary key value.
3711      !
3712      implicit none
3713
3714      ! Arguments:
3715      type(struct_obs), intent(in)  :: obsdat
3716      integer         , intent(in)  :: row_index
3717      ! Result:
3718      integer(8)                    :: primaryKey
3719
3720      primaryKey = obsdat%headerPrimaryKey(row_index)
3721
3722   end function obs_headPrimaryKey
3723
3724
3725   function obs_headerIndex_mpiglobal(obsdat,row_index) result(value)
3726      !
3727      !:Purpose: Get the mpiglobal header row index.
3728      !      To control access to the mpiglobal row_index into the "header".
3729      !
3730      implicit none
3731
3732      ! Arguments:
3733      type(struct_obs), intent(in)  :: obsdat
3734      integer         , intent(in)  :: row_index
3735      ! Result:
3736      integer value
3737
3738      value=obsdat%headerIndex_mpiglobal(row_index)
3739   end function obs_headerIndex_mpiglobal
3740
3741
3742   subroutine obs_headSet_i(obsdat, column_index, row_index, value_i)
3743      !
3744      !:Purpose: set an integer-valued header observation-data element.
3745      !      To control access to the observation object.  Sets the (integer)
3746      !      value of the row_index'th ObsData element with the indicated column
3747      !      index from the "header".
3748      !
3749      implicit none
3750
3751      ! Arguments:
3752      type(struct_obs), intent(inout)  :: obsdat
3753      integer         , intent(in)     :: column_index
3754      integer         , intent(in)     :: row_index
3755      integer         , intent(in)     :: value_i
3756
3757      call odc_columnSet(obsdat%intHeaders, column_index, row_index, &
3758                         value_i, NULL_COLUMN_VALUE_R, &
3759                         obsdat%numHeader, obsdat%numHeader_max)
3760   end subroutine obs_headSet_i
3761
3762
3763   subroutine obs_headSet_r4(obsdat, column_index, row_index, value_r4)
3764      !
3765      !:Purpose: set a real header value in the observation object.
3766      !      To control access to the observation object.  Sets the (real)
3767      !      value of the row_index'th ObsData element with the indicated column
3768      !      index from the "header".
3769      !
3770      implicit none
3771
3772      ! Arguments:
3773      type(struct_obs), intent(inout)  :: obsdat
3774      integer         , intent(in)     :: column_index
3775      integer         , intent(in)     :: row_index
3776      real(4)         , intent(in)     :: value_r4
3777
3778      ! Locals:
3779      real(pre_obsReal)                :: value_r
3780
3781      value_r = value_r4
3782
3783      call odc_columnSet(obsdat%realHeaders, column_index, row_index, &
3784                         NULL_COLUMN_VALUE_I, value_r, &
3785                         obsdat%numHeader, obsdat%numHeader_max)
3786   end subroutine obs_headSet_r4
3787
3788
3789   subroutine obs_headSet_r8(obsdat, column_index, row_index, value_r8)
3790      !
3791      !:Purpose: set a real header value in the observation object.
3792      !      To control access to the observation object.  Sets the (real)
3793      !      value of the row_index'th ObsData element with the indicated column
3794      !      index from the "header".
3795      !
3796      implicit none
3797
3798      ! Arguments:
3799      type(struct_obs), intent(inout)  :: obsdat
3800      integer         , intent(in)     :: column_index
3801      integer         , intent(in)     :: row_index
3802      real(8)         , intent(in)     :: value_r8
3803
3804      ! Locals:
3805      real(pre_obsReal)                :: value_r
3806
3807      value_r = value_r8
3808
3809      call odc_columnSet(obsdat%realHeaders, column_index, row_index, &
3810                         NULL_COLUMN_VALUE_I, value_r, &
3811                         obsdat%numHeader, obsdat%numHeader_max)
3812   end subroutine obs_headSet_r8
3813
3814
3815   subroutine obs_initialize(obsdat, numHeader_max_opt, numBody_max_opt, mpi_local_opt, &
3816                             silent_opt)
3817      !
3818      !:Purpose: Set an observation-data module to a known state.
3819      !      Initialize object variables, and allocate arrays according to the
3820      !      parameters, header_max and body_max.
3821      !
3822      implicit none
3823
3824      ! Arguments:
3825      type(struct_obs),  intent(inout) :: obsdat            ! inout allows detection of 2nd call
3826      integer, optional, intent(in)    :: numHeader_max_opt ! number of header elements allocated
3827      integer, optional, intent(in)    :: numBody_max_opt   ! total no. of body elements allocated
3828      logical, optional, intent(in)    :: mpi_local_opt
3829      logical, optional, intent(in)    :: silent_opt
3830
3831      ! Locals:
3832      logical :: silent
3833      integer :: nulnam,fnom,fclos,ierr
3834      character(len=120) :: message
3835
3836      ! Namelist variables:
3837      integer :: nmxobs  ! maximum number of rows in 'header' table (used for initial allocation)
3838      integer :: ndatamx ! maximum number of rows in 'body' table (used for initial allocation)
3839      namelist /namdimo/nmxobs,ndatamx
3840
3841      if(.not. obs_class_initialized) then
3842         call obs_abort('obs_class_initialize must be called before ' // &
3843                        'obs_initialize')
3844         return
3845      end if
3846
3847      !
3848      ! INITIALIZE ALL OBJECT VARIABLES
3849      !
3850      nullify(obsdat%headerPrimaryKey)
3851      nullify(obsdat%bodyPrimaryKey)
3852      nullify(obsdat%cstnid)
3853      nullify(obsdat%cfamily)
3854
3855      obsdat%numHeader     = 0
3856      obsdat%numHeader_max = 0
3857      obsdat%numBody       = 0
3858      obsdat%numBody_max   = 0
3859
3860      if(present(mpi_local_opt)) then
3861         obsdat%mpi_local  = mpi_local_opt
3862      else
3863         obsdat%mpi_local  = .false.
3864      end if
3865
3866      if(present(silent_opt)) then
3867         silent  = silent_opt
3868      else
3869         silent  = .false.
3870      end if
3871
3872      nullify(obsdat%headerIndex_mpiglobal)
3873      nullify(obsdat%bodyIndex_mpiglobal)
3874
3875      !
3876      ! DETERMINE THE ARRAY DIMENSIONS
3877      !
3878      if(present(numHeader_max_opt)) then
3879         ! numBody_max is necessarily also present
3880         nmxobs = numHeader_max_opt
3881         ndatamx = numBody_max_opt
3882
3883      else
3884         ! Initialize with bad values
3885         nmxobs=0
3886         ndatamx=0
3887
3888         ! Open the file, flnml
3889         nulnam=0
3890         ierr=fnom(nulnam,'./flnml','FTN+SEQ+R/O',0)
3891         if(ierr < 0) then
3892            write(message,*)'Failed to open flnml to obtain nmxobs and ndatamx:'&
3893                            // '  ierr=', ierr
3894            call obs_abort(message); return
3895         end if
3896
3897         ! Read the dimensions from a namelist
3898         read(nulnam,nml=namdimo,iostat=ierr)
3899         if(ierr /= 0) call obs_abort('obs_initialize: Error reading namelist')
3900         write(*,nml=namdimo)
3901         ierr=fclos(nulnam)
3902         ! Verify that the namelist contained values
3903         if(nmxobs <= 0 .or. ndatamx <= 0) then
3904            write(message,*)'From file, flnml, positive values were not ' &
3905                            // 'obtained for nmxobs or ndatamx:  ',nmxobs,ndatamx
3906            call obs_abort(message); return
3907         end if
3908      end if ! present(numHeader_max)
3909
3910      !
3911      ! ALLOCATE
3912      !
3913      call obs_allocate(obsdat, nmxobs, ndatamx, silent)
3914
3915   end subroutine obs_initialize
3916
3917
3918   subroutine obs_mpiDistributeIndices(obsdat)
3919      !
3920      !:Purpose:
3921      !  Compute headerIndex_mpiglobal and bodyIndex_mpiglobal: 
3922      !  this determines how obs are distributed over MPI processes
3923      !  and is needed for converting from mpiglobal to mpilocal and vice versa.
3924      !  The header indices are distributed following the chosen strategy,
3925      !  currently either "round robin" or by latitude bands.
3926      !
3927      !:Note: this subroutine is called before converting from mpiglobal to 
3928      !        mpilocal
3929      !
3930      !:Comments:  In principle this method could have obtained
3931      !           my_mpi_id by use'ing the module, mpi.  However, it queries
3932      !           rpn_comm for itself because the mpi module belongs to the
3933      !           3dvar code, whereas the present module is shared code.
3934      !
3935      implicit none
3936
3937      ! Arguments:
3938      type(struct_obs), intent(inout) :: obsdat
3939
3940      ! Locals:
3941      integer :: headerIndex_mpiglobal,headerIndex_mpilocal
3942      integer ::   bodyIndex_mpiglobal,  bodyIndex_mpilocal
3943      integer :: numHeader_mpiLocal,numBody_mpiLocal,idata,idataend
3944      integer :: my_mpi_id, my_mpi_idx_dummy, my_mpi_idy_dummy
3945
3946      write(*,*) '-------- Start obs_mpiDistributeIndices ---------'
3947
3948      if(obsdat%mpi_local) then
3949         call obs_abort( &
3950                      'obs_mpiDistributeIndices: data already mpi-local, Abort!')
3951         return
3952      end if
3953
3954      call rpn_comm_mype(my_mpi_id, my_mpi_idx_dummy, my_mpi_idy_dummy)
3955
3956      ! Count number of headers and bodies for each processor
3957      numHeader_mpiLocal=0
3958      numBody_mpiLocal=0
3959      do headerIndex_mpiglobal=1,obsdat%numHeader
3960         if ( my_mpi_id == obs_headElem_i( obsdat, OBS_IP, headerIndex_mpiglobal )) then
3961            numHeader_mpiLocal = numHeader_mpiLocal + 1
3962            numBody_mpilocal = numBody_mpilocal &
3963                            +obs_headElem_i( obsdat, OBS_NLV, headerIndex_mpiglobal )
3964         end if
3965      enddo
3966      write(*,*) 'obs_mpidistributeindices: numHeader_mpiLocal,global=',  &
3967                 numHeader_mpiLocal,obsdat%numHeader
3968      write(*,*) 'obs_mpidistributeindices: numBody_mpiLocal,global=',  &
3969                 numBody_mpiLocal,obsdat%numBody
3970
3971      if ( numHeader_mpilocal > 0 ) then
3972         ! Allocate the list of global header indices
3973         allocate(obsdat%headerIndex_mpiglobal(numHeader_mpilocal))
3974         obsdat%headerIndex_mpiglobal(:)=0
3975      else
3976         nullify(obsdat%headerIndex_mpiglobal)
3977         write(*,*) 'This mpi processor has zero headers.'
3978      end if
3979
3980      if ( numBody_mpilocal > 0 ) then
3981         ! Allocate the list of global body indices
3982         allocate(obsdat%bodyIndex_mpiglobal(numBody_mpilocal))
3983         obsdat%bodyIndex_mpiglobal(:)=0
3984      else
3985         nullify(obsdat%bodyIndex_mpiglobal)
3986         write(*,*) 'This mpi processor has zero bodies to treat'
3987      endif
3988
3989      ! determine the list of header indices
3990      headerIndex_mpilocal = 0
3991      do headerIndex_mpiglobal=1,obsdat%numHeader
3992         if ( my_mpi_id == obs_headElem_i( obsdat, OBS_IP, headerIndex_mpiglobal )) then
3993            headerIndex_mpilocal=headerIndex_mpilocal+1
3994            obsdat%headerIndex_mpiglobal(headerIndex_mpilocal) &
3995                                                           =headerIndex_mpiglobal
3996         endif
3997      enddo
3998
3999      ! determine the corresponding list of body indices
4000      bodyIndex_mpilocal=0
4001      do headerIndex_mpilocal=1,numHeader_mpilocal
4002         headerIndex_mpiglobal=obsdat%headerIndex_mpiglobal(headerIndex_mpilocal)
4003         idata= obs_headElem_i(obsdat, OBS_RLN, headerIndex_mpiglobal)
4004         idataend = obs_headElem_i(obsdat, OBS_NLV, headerIndex_mpiglobal) &
4005                  + idata -1 
4006         do bodyIndex_mpiglobal=idata,idataend 
4007            bodyIndex_mpilocal=bodyIndex_mpilocal+1 
4008            obsdat%bodyIndex_mpiglobal(bodyIndex_mpilocal) = bodyIndex_mpiglobal
4009         enddo
4010      enddo
4011
4012      write(*,*) '-------- END OF obs_mpiDistributeIndices ---------'
4013      write(*,*) ' '
4014   end subroutine obs_mpiDistributeIndices
4015
4016
4017   logical function obs_mpiLocal(obsdat)
4018      !
4019      !:Purpose: returns true if the object contains only data that are
4020      !          needed by the current mpi PE; false if it contains all
4021      !          data.
4022      !      To provide the state of the internal variable, mpiLocal.  This
4023      !      method exists primarily to facilitate unit tests on this module.
4024      !
4025      implicit none
4026
4027      ! Arguments:
4028      type(struct_obs) , intent(in)  :: obsdat
4029
4030      obs_mpiLocal=obsdat%mpi_local
4031   end function obs_mpiLocal
4032
4033
4034   integer function obs_numBody(obsdat)
4035      !
4036      !:Purpose: returns the number of mpi-local bodies recorded.
4037      !      To provide the number of bodies that are currently recorded in the
4038      !      mpi-local observation-data object.
4039      !
4040      implicit none
4041
4042      ! Arguments:
4043      type(struct_obs) , intent(in)  :: obsdat
4044
4045      obs_numBody=obsdat%numBody
4046
4047   end function obs_numBody
4048
4049
4050   integer function obs_numBody_max(obsdat)
4051      !
4052      !:Purpose: returns the dimensioned mpi-local number of bodies.
4053      !      To provide the dimension for the number of bodies in the mpi-local
4054      !      observation-data object.
4055      !
4056      implicit none
4057
4058      ! Arguments:
4059      type(struct_obs) , intent(in)  :: obsdat
4060
4061      obs_numBody_max=obsdat%numBody_max
4062
4063   end function obs_numBody_max
4064
4065
4066   integer function obs_numBody_mpiglobal(obsdat)
4067      !
4068      !:Purpose: returns the number of bodies recorded in the
4069      !           entire mpi-global obs object.
4070      !      To provide the number of bodies that are currently recorded in the
4071      !      entire mpi-global observation-data object.
4072      !
4073      implicit none
4074
4075      ! Arguments:
4076      type(struct_obs), intent(in)  :: obsdat
4077
4078      ! Locals:
4079      integer :: numBody_mpiGlobal, sizedata, ierr
4080
4081      if(obsdat%mpi_local)then
4082         sizedata=1
4083         call rpn_comm_allreduce(obsdat%numBody,numBody_mpiGlobal,sizedata, &
4084                                 "mpi_integer","mpi_sum","GRID",ierr)
4085         obs_numBody_mpiglobal = numBody_mpiGlobal
4086      else
4087         obs_numBody_mpiglobal = obsdat%numBody
4088      end if
4089
4090   end function obs_numBody_mpiglobal
4091
4092
4093   integer function obs_numHeader(obsdat)
4094      !
4095      !:Purpose: returns the number of mpi-local headers recorded.
4096      !      To provide the number of headers that are currently recorded in the
4097      !      observation-data object.
4098      !
4099      implicit none
4100
4101      ! Arguments:
4102      type(struct_obs) , intent(in)  :: obsdat
4103
4104      obs_numHeader=obsdat%numHeader
4105   end function obs_numHeader
4106
4107
4108   integer function obs_numHeader_max(obsdat)
4109      !
4110      !:Purpose: returns the dimensioned mpi-local number of headers.
4111      !      To provide the dimension for the number of headers in the mpi-local
4112      !      observation-data object.
4113      !
4114      implicit none
4115
4116      ! Arguments:
4117      type(struct_obs) , intent(in)  :: obsdat
4118
4119      obs_numHeader_max=obsdat%numHeader_max
4120   end function obs_numHeader_max
4121
4122
4123   integer function obs_numHeader_mpiglobal(obsdat)
4124      !
4125      !:Purpose: returns the number of headers recorded in
4126      !           the entire mpi-global obs object.
4127      !      To provide the number of headers that are currently recorded in the
4128      !      entire mpi-global observation-data object.
4129      !
4130      implicit none
4131
4132      ! Arguments:
4133      type(struct_obs) , intent(in)  :: obsdat
4134
4135      ! Locals:
4136      integer :: numHeader_mpiGlobal, sizedata, ierr
4137
4138      if(obsdat%mpi_local)then
4139         sizedata=1
4140         call rpn_comm_allreduce(obsdat%numHeader,numHeader_mpiGlobal,sizedata, &
4141                                 "mpi_integer","mpi_sum","GRID",ierr)
4142         obs_numHeader_mpiglobal = numHeader_mpiGlobal
4143      else
4144         obs_numHeader_mpiglobal = obsdat%numHeader
4145      end if
4146   end function obs_numHeader_mpiglobal
4147
4148
4149   subroutine obs_print( obsdat, nobsout )
4150      !
4151      !:Purpose: print the contents of the obsdat to an ASCII file
4152      !
4153      !:Arguments:
4154      !           :obsdat:  obsSpaceData object
4155      !           :nobsout: unit used for printing
4156      !
4157      implicit none
4158
4159      ! Arguments:
4160      type(struct_obs), intent(in) :: obsdat
4161      integer,          intent(in) :: nobsout
4162
4163      ! Locals:
4164      integer :: jo
4165
4166      do jo=1,obsdat%numHeader
4167         call obs_enkf_prnthdr(obsdat,jo,nobsout)
4168         call obs_enkf_prntbdy(obsdat,jo,nobsout)
4169      enddo
4170
4171      return
4172   end subroutine obs_print
4173
4174
4175   subroutine obs_prntbdy( obsdat , index_header, unitout_opt )
4176      !
4177      !:Purpose: Print all data records associated with an observation
4178      !
4179      !:Arguments:
4180      !           :obsdat:  obsSpaceData object
4181      !           :index_header: index of the group of observations to be printed
4182      !           :unitout: unit number on which to print
4183      !
4184      implicit none
4185
4186      ! Arguments:
4187      type(struct_obs),  intent(in) :: obsdat
4188      integer         ,  intent(in) :: index_header
4189      integer, optional, intent(in) :: unitout_opt ! variable output unit facilitates unit testing
4190
4191      ! Locals:
4192      integer :: unitout_
4193      integer :: ipnt, idata, idata2, jdata, ivco
4194      character(len=13) :: ccordtyp(4)
4195      integer :: obsVNM, obsFLG, obsASS
4196      real(pre_obsReal) :: obsPPP, obsVAR, obsOMP, obsOMA, obsOER, obsHPHT, obsOMPE
4197
4198      if ( present( unitout_opt ) ) then
4199        unitout_ = unitout_opt
4200      else
4201        unitout_ = 6
4202      end if
4203
4204      ccordtyp(1) = 'HEIGHT      :'
4205      ccordtyp(2) = 'PRESSURE    :'
4206      ccordtyp(3) = 'CHANNEL NUM :'
4207      ccordtyp(4) = 'VCO UNDEFINED'
4208      !
4209      ! 1. General information
4210      !
4211      ipnt  = obs_headElem_i(obsdat,OBS_RLN,index_header)
4212      idata = obs_headElem_i(obsdat,OBS_NLV,index_header)
4213
4214      if ( idata == 1) then
4215        write ( unitout_, fmt=9101 ) idata, index_header, NBDY_INT_SIZE+NBDY_REAL_SIZE
4216      else
4217        write ( unitout_, fmt=9100 ) idata, index_header, NBDY_INT_SIZE+NBDY_REAL_SIZE
4218      end if
4219
42209100  format(4x,'THERE ARE ', &
4221         i3,1x,'DATA IN OBSERVATION RECORD NO.' &
4222         ,1x,i6,4x,'DATA RECORD''S LENGTH:',i6)
42239101  format(4x,'THERE IS ', &
4224         i3,1x,'DATUM IN OBSERVATION RECORD NO.' &
4225         ,1x,i6,4x,'DATA RECORD''S LENGTH:',i6)
4226      !
4227      ! 2. Print all data records
4228      !
4229      do jdata = ipnt, ipnt + idata - 1
4230        idata2 = jdata -ipnt + 1
4231        if ( obs_bodyElem_i( obsdat, OBS_ASS, jdata ) >= 0) then
4232          ivco = obs_bodyElem_i( obsdat, OBS_VCO, jdata )
4233          if ( ivco < 1 .or. ivco > 3 ) ivco = 4
4234          obsVNM = obs_bodyElem_i( obsdat, OBS_VNM , jdata )
4235          obsPPP = obs_bodyElem_r( obsdat, OBS_PPP , jdata )
4236          obsVAR = obs_bodyElem_r( obsdat, OBS_VAR , jdata )
4237          obsOMP = obs_bodyElem_r( obsdat, OBS_OMP , jdata )
4238          obsOMA = obs_bodyElem_r( obsdat, OBS_OMA , jdata )
4239          obsOER = obs_bodyElem_r( obsdat, OBS_OER , jdata )
4240          obsHPHT= obs_bodyElem_r( obsdat, OBS_HPHT, jdata )
4241          obsOMPE= obs_bodyElem_r( obsdat, OBS_OMPE, jdata )
4242          obsFLG = obs_bodyElem_i( obsdat, OBS_FLG , jdata )
4243          obsASS = obs_bodyElem_i( obsdat, OBS_ASS , jdata )
4244          write ( unitout_, fmt=9201 ) idata2, &
4245               obsVNM, ccordtyp(ivco), &
4246               obsPPP,obsVAR,obsOMP,obsOMA, &
4247               obsOER,obsHPHT,obsOMPE,obsFLG,obsASS
4248        end if
4249      end do
4250
42519201  format(4x,'DATA NO.',i6,/,10x &
4252         ,'VARIABLE NO.:',i6,4x,a13,g13.6,4x &
4253         ,/,10x &
4254         ,'OBSERVED VALUE:',g23.16,5x,'OBSERVED - BACKGROUND VALUES:' &
4255         ,g23.16,4x &
4256         ,/,10x &
4257         ,'OBSERVED - ANALYZED VALUES:',g13.6,4x &
4258         ,/,10x &
4259         ,'ERROR STANDARD DEVIATIONS FOR' &
4260         ,/,20x &
4261         ,'OBSERVATION:',g13.6,4x &
4262         ,/,20x &
4263         ,'FIRST-GUESS:',g13.6,4x &
4264         ,/,20x &
4265         ,'OBS minus F-G:',g13.6,4x &
4266         ,/,10x &
4267         ,'BURP FLAGS:',i6,4x,'OBS. ASSIMILATED (1-->YES;0-->NO):',i3)
4268
4269      return
4270
4271   end subroutine obs_prntbdy
4272
4273
4274   subroutine obs_prnthdr( obsdata, index_hd, unitout_opt )
4275      !
4276      !:Purpose: Printing of the header of an observation record
4277      !:Arguments:
4278      !           :obsdata:  obsSpaceData object
4279      !           :index_hd: index of the header to be printed
4280      !           :unitout_opt: unit number on which to print
4281      !
4282      implicit none
4283
4284      ! Arguments:
4285      type(struct_obs),  intent(in) :: obsdata
4286      integer         ,  intent(in) :: index_hd
4287      integer, optional, intent(in) :: unitout_opt ! variable output unit facilitates unit testing
4288
4289      ! Locals:
4290      integer :: unitout_
4291      integer :: obsRLN, obsONM, obsINS, obsIDF, obsITY
4292      integer :: obsDAT, obsETM, obsNLV, obsST1, obsSTYP, obsTTYP
4293      real(pre_obsReal) :: obsLON, obsLAT, obsALT
4294      character(len=12) :: stnid
4295
4296      if ( present( unitout_opt ) ) then
4297         unitout_ = unitout_opt
4298      else
4299         unitout_ = 6
4300      end if
4301
4302      !
4303      ! 1. General information
4304      !
4305      write(unitout_,fmt=9100)index_hd, NHDR_INT_SIZE + NHDR_REAL_SIZE
43069100  format(//,10x,'-- OBSERVATION RECORD NO.' &
4307         ,1x,i6,3x,'HEADER''S LENGTH:',i6)
4308      !
4309      ! 2. PRINT HEADER'S CONTENT
4310      !
4311      obsRLN = obs_headElem_i(obsdata,OBS_RLN,index_hd)
4312      obsONM = obs_headElem_i(obsdata,OBS_ONM,index_hd)
4313      obsINS = obs_headElem_i(obsdata,OBS_INS,index_hd)
4314      obsIDF = obs_headElem_i(obsdata,OBS_IDF,index_hd)
4315      obsITY = obs_headElem_i(obsdata,OBS_ITY,index_hd)
4316      obsLAT = obs_headElem_r(obsdata,OBS_LAT,index_hd)
4317      obsLON = obs_headElem_r(obsdata,OBS_LON,index_hd)
4318      obsDAT = obs_headElem_i(obsdata,OBS_DAT,index_hd)
4319      obsETM = obs_headElem_i(obsdata,OBS_ETM,index_hd)
4320      obsALT = obs_headElem_r(obsdata,OBS_ALT,index_hd)
4321      obsNLV = obs_headElem_i(obsdata,OBS_NLV,index_hd)
4322      obsSTYP= obs_headElem_i(obsdata,OBS_STYP,index_hd)
4323      obsTTYP= obs_headElem_i(obsdata,OBS_TTYP,index_hd)
4324      obsST1 = obs_headElem_i(obsdata,OBS_ST1,index_hd)
4325      stnid  = obs_elem_c(obsdata,'STID',index_hd)
4326      write(unitout_,fmt=9200) obsRLN, obsONM, obsINS, obsIDF, obsITY, &
4327           obsLAT, obsLON, obsDAT, obsETM, stnid, obsALT, obsNLV, &
4328           obsSTYP, obsTTYP, obsST1
4329
43309200  format(6x,'Position within realBodies:',i6,1x,'OBS. NUMBER:',i6,1x &
4331         ,'INSTR. ID:',i6,1x,'OBS. TYPE:',i6,1x &
4332         ,'INSTR./RETR. TYPE:',i6,1x &
4333         ,/,6x &
4334         ,'OBSERVATION LOCATION. (LAT,LON):',2(f10.4,1x) &
4335         ,'DATE:',i12,1x,'EXACT TIME: ',i6,1x &
4336         ,/,6x &
4337         ,'STATION ID:',a9,1x &
4338         ,'STATION''S ALTITUDE:',g13.6,1x &
4339         ,'NUMBER OF DATA:',i6,1x &
4340         ,/,6x &
4341         ,'SURFACE TYPE:',i4,1x &
4342         ,'TERRAIN TYPE:',i4,1x &
4343         ,'REPORT STATUS 2:',i10,1x &
4344         ,/,6x &
4345         )
4346
4347      return
4348   end subroutine obs_prnthdr
4349
4350
4351   subroutine obs_reduceToMpiLocal(obsdat)
4352      !
4353      !:Purpose: re-construct observation data object by
4354      !          giving local Obs TAG. 
4355      !      To retain in the observation object only those data that are
4356      !      pertinent to the present mpi processor, i.e. convert from mpiglobal
4357      !      to mpilocal.
4358      !
4359      implicit none
4360
4361      ! Arguments:
4362      type(struct_obs), intent(inout) :: obsdat
4363
4364      ! Locals:
4365      integer(8),        allocatable,dimension(:)   :: headerPrimaryKey_tmp
4366      integer(8),        allocatable,dimension(:)   :: bodyPrimaryKey_tmp
4367      character(len=12), allocatable,dimension(:)   :: cstnid_tmp
4368      character(len=2),  allocatable,dimension(:)   :: cfamily_tmp
4369      real(pre_obsReal), allocatable,dimension(:,:) :: realHeaders_tmp
4370      real(pre_obsReal), allocatable,dimension(:,:) :: realBodies_tmp
4371      integer,allocatable,dimension(:,:) :: intHeaders_tmp,intBodies_tmp
4372      integer :: numHeader_mpilocal,numHeader_mpiglobal
4373      integer ::   numBody_mpilocal,  numBody_mpiglobal
4374      integer :: bodyIndex_mpilocal,bodyIndex_mpiglobal
4375      integer :: headerIndex_mpilocal,headerIndex_mpiglobal
4376      integer :: idataend,idata,active_index
4377      integer :: column_index
4378
4379      !!---------------------------------------------------------------
4380      WRITE(*,*) '============= Enter obs_reduceToMpiLocal =============='
4381
4382      if(obsdat%mpi_local)then
4383         call obs_abort('OBS_REDUCETOMPILOCAL() has been called, but the ' &
4384                        // 'obsSpaceData object is already in mpi-local state')
4385         return
4386      end if
4387
4388      if(obsdat%numHeader_max == 0)then
4389         call obs_abort('OBS_REDUCETOMPILOCAL() has been called when there are '&
4390                        // 'no data.  Obs_reduceToMpiLocal cannot be called ' &
4391                        // 'after a call to obs_expandToMpiGlobal.')
4392         return
4393      end if
4394
4395      ! compute the mpilocal lists of indices into the mpiglobal data
4396      call obs_mpiDistributeIndices(obsdat)
4397
4398      ! calculate the size of the local obs data  
4399      if(associated(obsdat%headerIndex_mpiglobal)) then
4400         numHeader_mpilocal=size(obsdat%headerIndex_mpiglobal) 
4401      else
4402         numHeader_mpilocal=0
4403      endif
4404      numBody_mpiLocal=0
4405      do headerIndex_mpilocal=1,numHeader_mpilocal
4406         headerIndex_mpiglobal=obsdat%headerIndex_mpiglobal(headerIndex_mpilocal)
4407         idata=obs_headElem_i(obsdat, OBS_NLV, headerIndex_mpiglobal)
4408         numBody_mpiLocal = numBody_mpiLocal + idata
4409      enddo
4410
4411      numHeader_mpiGlobal = obs_numHeader(obsdat)
4412      numBody_mpiGlobal   = obs_numBody(obsdat)
4413
4414      ! allocate temporary arrays to hold mpilocal data
4415      if(numHeader_mpiLocal > 0) then
4416         allocate(headerPrimaryKey_tmp(numHeader_mpiLocal)) 
4417         allocate(cfamily_tmp(    numHeader_mpiLocal)) 
4418         allocate( cstnid_tmp(    numHeader_mpiLocal)) 
4419         allocate(realHeaders_tmp(odc_numActiveColumn(obsdat%realHeaders), &
4420                                  numHeader_mpilocal))
4421         allocate( intHeaders_tmp(odc_numActiveColumn(obsdat%intHeaders), &
4422                                  numHeader_mpilocal))
4423      endif
4424
4425      if(numBody_mpiLocal > 0) then
4426         allocate(bodyPrimaryKey_tmp(numBody_mpiLocal)) 
4427         allocate( realBodies_tmp(odc_numActiveColumn(obsdat%realBodies), &
4428                                  numBody_mpilocal))
4429         allocate(  intBodies_tmp(odc_numActiveColumn(obsdat%intBodies), &
4430                                  numBody_mpilocal))
4431      endif
4432
4433      ! copy the mpilocal data to temporary arrays: header-level data
4434      do headerIndex_mpilocal=1,numHeader_mpilocal 
4435         headerIndex_mpiglobal=obsdat%headerIndex_mpiglobal(headerIndex_mpilocal)
4436
4437         do active_index=1,odc_numActiveColumn(obsdat%realHeaders)
4438            column_index=odc_columnIndexFromActiveIndex( &
4439                                    obsdat%realHeaders%odc_flavour, active_index)
4440            realHeaders_tmp(active_index,headerIndex_mpilocal)= &
4441                      obs_headElem_r(obsdat, column_index, headerIndex_mpiglobal)
4442         enddo
4443
4444         do active_index=1,odc_numActiveColumn(obsdat%intHeaders)
4445            column_index=odc_columnIndexFromActiveIndex( &
4446                                     obsdat%intHeaders%odc_flavour, active_index)
4447            intHeaders_tmp(active_index,headerIndex_mpilocal)= &
4448                      obs_headElem_i(obsdat, column_index, headerIndex_mpiglobal)
4449         enddo
4450
4451         headerPrimaryKey_tmp(headerIndex_mpilocal) = &
4452                obsdat%headerPrimaryKey(headerIndex_mpiglobal)
4453
4454         cstnid_tmp (headerIndex_mpilocal) =obsdat%cstnid (headerIndex_mpiglobal)
4455         cfamily_tmp(headerIndex_mpilocal) =obsdat%cfamily(headerIndex_mpiglobal)
4456
4457                                        ! Make RLN point to local data
4458         if(headerIndex_mpilocal == 1) then
4459            intHeaders_tmp &
4460                (odc_activeIndexFromColumnIndex( &
4461                                        obsdat%intHeaders%odc_flavour,OBS_RLN), &
4462                 1 &
4463                ) = 1
4464         else
4465            intHeaders_tmp &
4466                (odc_activeIndexFromColumnIndex( &
4467                                        obsdat%intHeaders%odc_flavour,OBS_RLN), &
4468                 headerIndex_mpilocal &
4469                ) = intHeaders_tmp(odc_activeIndexFromColumnIndex( &
4470                                        obsdat%intHeaders%odc_flavour,OBS_RLN), &
4471                                   headerIndex_mpilocal-1 &
4472                                  ) &
4473                  + intHeaders_tmp(odc_activeIndexFromColumnIndex( &
4474                                        obsdat%intHeaders%odc_flavour,OBS_NLV), &
4475                                   headerIndex_mpilocal-1 &
4476                                  ) 
4477         endif
4478      enddo
4479
4480      ! copy the mpilocal data to temporary arrays: body-level data
4481      bodyIndex_mpilocal=0 
4482      do headerIndex_mpilocal=1,numHeader_mpilocal
4483         headerIndex_mpiglobal=obsdat%headerIndex_mpiglobal(headerIndex_mpilocal)
4484 
4485                                        ! Make HIND point to local header
4486         idata    = obs_headElem_i(obsdat, OBS_RLN,headerIndex_mpiglobal)
4487         idataend = obs_headElem_i(obsdat, OBS_NLV,headerIndex_mpiglobal)+idata-1
4488         do bodyIndex_mpiglobal=idata,idataend 
4489            bodyIndex_mpilocal=bodyIndex_mpilocal+1 
4490
4491            bodyPrimaryKey_tmp(bodyIndex_mpilocal)= &
4492                        obsdat%bodyPrimaryKey(bodyIndex_mpiglobal)
4493
4494            do active_index=1,odc_numActiveColumn(obsdat%realBodies)
4495               column_index=odc_columnIndexFromActiveIndex( &
4496                                      obsdat%realBodies%odc_flavour,active_index)
4497               realBodies_tmp(active_index,bodyIndex_mpilocal)= &
4498                        obs_bodyElem_r(obsdat, column_index, bodyIndex_mpiglobal)
4499            enddo
4500
4501            do active_index=1,odc_numActiveColumn(obsdat%intBodies)
4502               column_index=odc_columnIndexFromActiveIndex( &
4503                                       obsdat%intBodies%odc_flavour,active_index)
4504               intBodies_tmp(active_index,bodyIndex_mpilocal)= &
4505                        obs_bodyElem_i(obsdat, column_index, bodyIndex_mpiglobal)
4506            enddo
4507
4508            intBodies_tmp(odc_activeIndexFromColumnIndex( &
4509                                       obsdat%intBodies%odc_flavour, OBS_HIND), &
4510                          bodyIndex_mpilocal &
4511                         ) = headerIndex_mpilocal
4512         enddo
4513      enddo
4514
4515      ! destroy object's mpiglobal data and allocate mpilocal data
4516      obsdat%numHeader=numHeader_mpiLocal
4517      obsdat%numBody=numBody_mpiLocal
4518      call obs_deallocate(obsdat)
4519      call obs_allocate(obsdat,obsdat%numHeader,obsdat%numBody)
4520
4521      ! copy all data from temporary arrays to object's arrays
4522      HEADER:if(numHeader_mpiLocal > 0) then
4523         obsdat%headerPrimaryKey(:)=headerPrimaryKey_tmp(:  ) 
4524         obsdat%cfamily(:  )=cfamily_tmp(:  ) 
4525         obsdat%cstnid (:  )= cstnid_tmp(:  )
4526         do active_index=1,odc_numActiveColumn(obsdat%realHeaders)
4527            column_index=odc_columnIndexFromActiveIndex( &
4528                                    obsdat%realHeaders%odc_flavour, active_index)
4529            obsdat%realHeaders%columns(column_index)%value_r(:) &
4530                                                 =realHeaders_tmp(active_index,:)
4531         enddo
4532         do active_index=1,odc_numActiveColumn(obsdat%intHeaders)
4533            column_index=odc_columnIndexFromActiveIndex( &
4534                                     obsdat%intHeaders%odc_flavour, active_index)
4535            obsdat%intHeaders%columns(column_index)%value_i(:) &
4536                                                  =intHeaders_tmp(active_index,:)
4537         enddo
4538
4539         ! deallocate temporary arrays
4540         deallocate(headerPrimaryKey_tmp)
4541         deallocate(cfamily_tmp)
4542         deallocate(cstnid_tmp)
4543         deallocate(realHeaders_tmp)
4544         deallocate(intHeaders_tmp)
4545      endif HEADER
4546
4547      BODY:if(numBody_mpiLocal > 0) then
4548         obsdat%bodyPrimaryKey(:) = bodyPrimaryKey_tmp(:)
4549         do active_index=1,odc_numActiveColumn(obsdat%realBodies)
4550            column_index=odc_columnIndexFromActiveIndex( &
4551                                     obsdat%realBodies%odc_flavour, active_index)
4552            obsdat%realBodies%columns(column_index)%value_r(:) &
4553                                                  =realBodies_tmp(active_index,:)
4554         enddo
4555         do active_index=1,odc_numActiveColumn(obsdat%intBodies)
4556            column_index=odc_columnIndexFromActiveIndex( &
4557                                      obsdat%intBodies%odc_flavour, active_index)
4558            obsdat%intBodies%columns(column_index)%value_i(:) &
4559                                                  =intBodies_tmp(active_index,:) 
4560         enddo
4561
4562         ! deallocate temporary arrays
4563         deallocate(bodyPrimaryKey_tmp)
4564         deallocate(realBodies_tmp)
4565         deallocate(intBodies_tmp)
4566      endif BODY
4567
4568
4569      obsdat%mpi_local = .true.
4570
4571      write(*,*) '============= Leave obs_reduceToMpiLocal =============='
4572
4573      return
4574   end subroutine obs_reduceToMpiLocal
4575
4576
4577   subroutine obs_squeeze( obsdat )
4578      !
4579      !:Purpose: re-construct observation data object to save memory
4580      !
4581      implicit none
4582
4583      ! Arguments:
4584      type(struct_obs), intent(inout) :: obsdat
4585
4586      ! Locals:
4587      integer(8),        allocatable :: headerPrimaryKey_tmp(:)
4588      integer(8),        allocatable :: bodyPrimaryKey_tmp(:)
4589      character(len=12), allocatable :: cstnid_tmp(:)
4590      character(len=2),  allocatable :: cfamily_tmp(:)
4591      real(pre_obsReal), allocatable :: realHeaders_tmp(:,:), realBodies_tmp(:,:)
4592      integer,           allocatable :: intHeaders_tmp(:,:),intBodies_tmp(:,:)
4593      integer :: bodyIndex, headerIndex, active_index, column_index
4594      integer :: idataend, idata
4595
4596      write(*,*) '============= Enter obs_squeeze =============='
4597
4598      if(obsdat%numHeader_max == obsdat%numHeader .and. &
4599         obsdat%numBody_max == obsdat%numBody)then
4600         write(*,*) 'obs_squeeze: obsdata instance is already sized correctly, do nothing'
4601         return
4602      end if
4603
4604      ! allocate temporary arrays to hold data
4605      if(obsdat%numHeader > 0) then
4606         allocate(headerPrimaryKey_tmp(obsdat%numHeader))
4607         allocate(cfamily_tmp(    obsdat%numHeader)) 
4608         allocate( cstnid_tmp(    obsdat%numHeader)) 
4609         allocate(realHeaders_tmp(odc_numActiveColumn(obsdat%realHeaders), &
4610                                  obsdat%numHeader))
4611         allocate( intHeaders_tmp(odc_numActiveColumn(obsdat%intHeaders), &
4612                                  obsdat%numHeader))
4613      endif
4614
4615      if(obsdat%numBody > 0) then
4616         allocate(bodyPrimaryKey_tmp(obsdat%numBody))
4617         allocate( realBodies_tmp(odc_numActiveColumn(obsdat%realBodies), &
4618                                  obsdat%numBody))
4619         allocate(  intBodies_tmp(odc_numActiveColumn(obsdat%intBodies), &
4620                                  obsdat%numBody))
4621      endif
4622
4623      ! copy the data to temporary arrays: header-level data
4624      do headerIndex=1,obsdat%numHeader 
4625         headerPrimaryKey_tmp(headerIndex)= &
4626                      obsdat%headerPrimaryKey(headerIndex)
4627        
4628         do active_index=1,odc_numActiveColumn(obsdat%realHeaders)
4629            column_index=odc_columnIndexFromActiveIndex( &
4630                                    obsdat%realHeaders%odc_flavour, active_index)
4631            realHeaders_tmp(active_index,headerIndex)= &
4632                      obs_headElem_r(obsdat, column_index, headerIndex)
4633         enddo
4634
4635         do active_index=1,odc_numActiveColumn(obsdat%intHeaders)
4636            column_index=odc_columnIndexFromActiveIndex( &
4637                                     obsdat%intHeaders%odc_flavour, active_index)
4638            intHeaders_tmp(active_index,headerIndex)= &
4639                      obs_headElem_i(obsdat, column_index, headerIndex)
4640         enddo
4641
4642         cstnid_tmp (headerIndex) =obsdat%cstnid (headerIndex)
4643         cfamily_tmp(headerIndex) =obsdat%cfamily(headerIndex)
4644
4645      enddo
4646
4647      ! copy the data to temporary arrays: body-level data
4648      do headerIndex=1,obsdat%numHeader
4649                                        ! Make HIND point to local header
4650         idata    = obs_headElem_i(obsdat, OBS_RLN,headerIndex)
4651         idataend = obs_headElem_i(obsdat, OBS_NLV,headerIndex)+idata-1
4652         do bodyIndex=idata,idataend 
4653            bodyPrimaryKey_tmp(bodyIndex)= &
4654                        obsdat%bodyPrimaryKey(bodyIndex)
4655           
4656            do active_index=1,odc_numActiveColumn(obsdat%realBodies)
4657               column_index=odc_columnIndexFromActiveIndex( &
4658                                      obsdat%realBodies%odc_flavour,active_index)
4659               realBodies_tmp(active_index,bodyIndex)= &
4660                        obs_bodyElem_r(obsdat, column_index, bodyIndex)
4661            enddo
4662
4663            do active_index=1,odc_numActiveColumn(obsdat%intBodies)
4664               column_index=odc_columnIndexFromActiveIndex( &
4665                                       obsdat%intBodies%odc_flavour,active_index)
4666               intBodies_tmp(active_index,bodyIndex)= &
4667                        obs_bodyElem_i(obsdat, column_index, bodyIndex)
4668            enddo
4669
4670            intBodies_tmp(odc_activeIndexFromColumnIndex(obsdat%intBodies%odc_flavour, OBS_HIND), &
4671                          bodyIndex) = headerIndex
4672         enddo
4673      enddo
4674
4675      ! destroy object's data and allocate at correct size data
4676      call obs_deallocate(obsdat)
4677      call obs_allocate(obsdat,obsdat%numHeader,obsdat%numBody)
4678
4679      ! copy all data from temporary arrays to object's arrays
4680      HEADER:if(obsdat%numHeader > 0) then
4681         obsdat%headerPrimaryKey(:)=headerPrimaryKey_tmp(:)
4682         obsdat%cfamily(:)=cfamily_tmp(:) 
4683         obsdat%cstnid (:)= cstnid_tmp(:)
4684         do active_index=1,odc_numActiveColumn(obsdat%realHeaders)
4685            column_index=odc_columnIndexFromActiveIndex( &
4686                                    obsdat%realHeaders%odc_flavour, active_index)
4687            obsdat%realHeaders%columns(column_index)%value_r(:) &
4688                                                 =realHeaders_tmp(active_index,:)
4689         enddo
4690         do active_index=1,odc_numActiveColumn(obsdat%intHeaders)
4691            column_index=odc_columnIndexFromActiveIndex( &
4692                                     obsdat%intHeaders%odc_flavour, active_index)
4693            obsdat%intHeaders%columns(column_index)%value_i(:) &
4694                                                  =intHeaders_tmp(active_index,:)
4695         enddo
4696
4697         ! deallocate temporary arrays
4698         deallocate(headerPrimaryKey_tmp)
4699         deallocate(cfamily_tmp)
4700         deallocate(cstnid_tmp)
4701         deallocate(realHeaders_tmp)
4702         deallocate(intHeaders_tmp)
4703      endif HEADER
4704
4705      BODY:if(obsdat%numBody > 0) then
4706         obsdat%bodyPrimaryKey(:) = bodyPrimaryKey_tmp(:)
4707         do active_index=1,odc_numActiveColumn(obsdat%realBodies)
4708            column_index=odc_columnIndexFromActiveIndex( &
4709                                     obsdat%realBodies%odc_flavour, active_index)
4710            obsdat%realBodies%columns(column_index)%value_r(:) &
4711                                                  =realBodies_tmp(active_index,:)
4712         enddo
4713         do active_index=1,odc_numActiveColumn(obsdat%intBodies)
4714            column_index=odc_columnIndexFromActiveIndex( &
4715                                      obsdat%intBodies%odc_flavour, active_index)
4716            obsdat%intBodies%columns(column_index)%value_i(:) &
4717                                                  =intBodies_tmp(active_index,:) 
4718         enddo
4719
4720         ! deallocate temporary arrays
4721         deallocate(bodyPrimaryKey_tmp)
4722         deallocate(realBodies_tmp)
4723         deallocate(intBodies_tmp)
4724      endif BODY
4725
4726      write(*,*) '============= Leave obs_squeeze =============='
4727
4728   end subroutine obs_squeeze
4729
4730
4731   subroutine obs_MpiRedistribute( obsdat_inout, target_ip_index )
4732      !
4733      !:Purpose: Redistribute obs over mpi tasks according to mpi task id stored 
4734      !                            in the integer header column "target_ip_index"
4735      !
4736      implicit none
4737
4738      ! Arguments:
4739      type(struct_obs), intent(inout) :: obsdat_inout
4740      integer,          intent(in)    :: target_ip_index
4741
4742      ! Locals:
4743      type(struct_obs) :: obsdat_tmp
4744      integer, allocatable :: numHeaderPE_mpilocal(:), numHeaderPE_mpiglobal(:)
4745      integer, allocatable :: numBodyPE_mpilocal(:), numBodyPE_mpiglobal(:)
4746      integer,           allocatable :: intcstnid_send(:,:,:), intcstnid_recv(:,:,:)
4747      integer,           allocatable :: intcfamily_send(:,:,:), intcfamily_recv(:,:,:)
4748      integer(8),        allocatable :: primaryKey_send(:,:), primaryKey_recv(:,:)
4749      real(pre_obsReal), allocatable :: real_send(:,:,:), real_recv(:,:,:)
4750      integer,           allocatable :: int_send(:,:,:), int_recv(:,:,:), message_onm(:,:)
4751      real(pre_obsReal), allocatable :: real_send_2d(:,:), real_recv_2d(:,:)
4752      integer,           allocatable :: int_send_2d(:,:), int_recv_2d(:,:)
4753      integer :: numHeader_in, numBody_in
4754      integer :: numHeader_out, numBody_out
4755      integer :: numHeader_mpimessage, numBody_mpimessage
4756      integer :: bodyIndex, headerIndex, headerIndex_out, bodyIndex_out, columnIndex, activeIndex, procIndex, charIndex
4757      integer :: bodyIndexBeg, bodyIndexEnd
4758      integer :: nprocs_mpi, myid_mpi, ierr, nsize, target_ip
4759      logical :: needToRedistribute, needToRedistribute_mpiglobal
4760      integer :: get_max_rss
4761
4762      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
4763      write(*,*) '============= Enter obs_MpiRedistribute =============='
4764      write(*,*) 'redistribute data according to mpi task ID stored in column :', &
4765                 ocn_ColumnNameList_IH(target_ip_index)
4766
4767      ! determine rank and number of mpi tasks
4768      call rpn_comm_size("GRID",nprocs_mpi,ierr)
4769      call rpn_comm_rank("GRID",myid_mpi,ierr)
4770
4771
4772      ! Number of headers and bodies per task before redistribution
4773      numHeader_in = obs_numHeader(obsdat_inout)
4774      numBody_in   = obs_numBody(obsdat_inout)
4775
4776      ! check if redistribution is really needed
4777      needToRedistribute = .false.
4778      do headerIndex = 1, numHeader_in
4779         target_ip = obs_headElem_i(obsdat_inout,target_ip_index,headerIndex)
4780         if (target_ip /= myid_mpi) needToRedistribute = .true.
4781      enddo
4782      call rpn_comm_allreduce(needToRedistribute,needToRedistribute_mpiglobal,1,  &
4783                              "MPI_LOGICAL","MPI_LOR","world",ierr)
4784      if(.not.needToRedistribute_mpiglobal) then
4785         write(*,*) 'obs_MpiRedistribute: do not need to redistribute, returning'
4786         return
4787      endif
4788
4789      ! allocate arrays used for counting on each mpi task
4790      allocate(numHeaderPE_mpilocal(nprocs_mpi))
4791      allocate(numHeaderPE_mpiglobal(nprocs_mpi))
4792      allocate(numBodyPE_mpilocal(nprocs_mpi))
4793      allocate(numBodyPE_mpiglobal(nprocs_mpi))
4794
4795      ! Compute number of headers and bodies per task after redistribution
4796      numHeaderPE_mpilocal(:) = 0
4797      numBodyPE_mpilocal(:) = 0
4798      do headerIndex = 1, numHeader_in
4799         target_ip = obs_headElem_i(obsdat_inout,target_ip_index,headerIndex)
4800         numHeaderPE_mpilocal(1+target_ip) = numHeaderPE_mpilocal(1+target_ip) + 1
4801         numBodyPE_mpilocal(1+target_ip)   = numBodyPE_mpilocal(1+target_ip)   + obs_headElem_i(obsdat_inout,OBS_NLV,headerIndex)
4802      enddo
4803      call rpn_comm_allreduce(numHeaderPE_mpilocal,numHeaderPE_mpiglobal,nprocs_mpi,  &
4804                              "MPI_INTEGER","MPI_SUM","world",ierr)
4805      call rpn_comm_allreduce(numBodyPE_mpilocal,numBodyPE_mpiglobal,nprocs_mpi,  &
4806                              "MPI_INTEGER","MPI_SUM","world",ierr)
4807      numHeader_out = numHeaderPE_mpiglobal(myid_mpi+1)
4808      numBody_out   = numBodyPE_mpiglobal(myid_mpi+1)
4809      write(*,*) 'obs_MpiRedistribute: num mpi header and body before redistribution =', numHeader_in, numBody_in
4810      write(*,*) 'obs_MpiRedistribute: num mpi header and body after redistribution  =', numHeader_out, numBody_out
4811
4812      ! Compute the max number of headers and bodies in each mpi message sent/received in the transpose
4813      call rpn_comm_allreduce(numHeaderPE_mpilocal,numHeaderPE_mpiglobal,nprocs_mpi,  &
4814                              "MPI_INTEGER","MPI_MAX","GRID",ierr)
4815      call rpn_comm_allreduce(numBodyPE_mpilocal,numBodyPE_mpiglobal,nprocs_mpi,  &
4816                              "MPI_INTEGER","MPI_MAX","GRID",ierr)
4817      if(myid_mpi == 0) write(*,*) 'obs_MpiRedistribute: num mpi header messages =', numHeaderPE_mpilocal
4818      if(myid_mpi == 0) write(*,*) 'obs_MpiRedistribute: num mpi body messages =', numBodyPE_mpilocal
4819      if(myid_mpi == 0) write(*,*) 'obs_MpiRedistribute: num mpi header messages (max) =', numHeaderPE_mpiglobal
4820      if(myid_mpi == 0) write(*,*) 'obs_MpiRedistribute: num mpi body messages (max) =', numBodyPE_mpiglobal
4821      numHeader_mpimessage = maxval(numHeaderPE_mpiglobal(:))
4822      numBody_mpimessage   = maxval(numBodyPE_mpiglobal(:))
4823
4824      call obs_initialize(obsdat_tmp,numHeader_max_opt=numHeader_out,  &
4825                          numBody_max_opt=numBody_out,mpi_local_opt=.true.)
4826      obsdat_tmp%numHeader = numHeader_out
4827      obsdat_tmp%numBody   = numBody_out
4828
4829      ! allocate temporary arrays to hold header-level data for mpi communication
4830      allocate(intcfamily_send(len(obsdat_inout%cfamily(1)),numHeader_mpimessage,nprocs_mpi)) 
4831      allocate(intcfamily_recv(len(obsdat_inout%cfamily(1)),numHeader_mpimessage,nprocs_mpi)) 
4832
4833      allocate(intcstnid_send(len(obsdat_inout%cstnid(1)),numHeader_mpimessage,nprocs_mpi)) 
4834      allocate(intcstnid_recv(len(obsdat_inout%cstnid(1)),numHeader_mpimessage,nprocs_mpi)) 
4835
4836      allocate(real_send(odc_numActiveColumn(obsdat_inout%realHeaders), &
4837                         numHeader_mpimessage,nprocs_mpi))
4838      allocate(real_recv(odc_numActiveColumn(obsdat_inout%realHeaders), &
4839                         numHeader_mpimessage,nprocs_mpi))
4840      allocate(int_send(odc_numActiveColumn(obsdat_inout%intHeaders), &
4841                        numHeader_mpimessage,nprocs_mpi))
4842      allocate(int_recv(odc_numActiveColumn(obsdat_inout%intHeaders), &
4843                        numHeader_mpimessage,nprocs_mpi))
4844
4845      allocate(primaryKey_send(numHeader_mpimessage,nprocs_mpi))
4846      allocate(primaryKey_recv(numHeader_mpimessage,nprocs_mpi))
4847
4848      ! copy the data to temporary arrays: header-level data
4849      numHeaderPE_mpilocal(:) = 0
4850      int_send(:,:,:) = -99999
4851      do headerIndex=1,numHeader_in
4852         target_ip = obs_headElem_i(obsdat_inout,target_ip_index,headerIndex)
4853         numHeaderPE_mpilocal(1+target_ip) = numHeaderPE_mpilocal(1+target_ip) + 1
4854
4855         primaryKey_send(numHeaderPE_mpilocal(1+target_ip),1+target_ip)= &
4856               obsdat_inout%headerPrimaryKey(headerIndex)
4857
4858         do activeIndex=1,odc_numActiveColumn(obsdat_inout%realHeaders)
4859            columnIndex=odc_columnIndexFromActiveIndex(obsdat_inout%realHeaders%odc_flavour, &
4860                                                       activeIndex)
4861            real_send(activeIndex,numHeaderPE_mpilocal(1+target_ip),1+target_ip)= &
4862               obs_headElem_r(obsdat_inout, columnIndex, headerIndex)
4863         enddo
4864
4865         do activeIndex=1,odc_numActiveColumn(obsdat_inout%intHeaders)
4866            columnIndex=odc_columnIndexFromActiveIndex(obsdat_inout%intHeaders%odc_flavour, &
4867                                                       activeIndex)
4868            int_send(activeIndex,numHeaderPE_mpilocal(1+target_ip),1+target_ip)= &
4869               obs_headElem_i(obsdat_inout, columnIndex, headerIndex)
4870         enddo
4871
4872         do charIndex=1, len(obsdat_inout%cstnid(1))
4873            intcstnid_send(charIndex,numHeaderPE_mpilocal(1+target_ip),1+target_ip)= &
4874               iachar(obsdat_inout%cstnid(headerIndex)(charIndex:charIndex))
4875         enddo
4876
4877         do charIndex=1,len(obsdat_inout%cfamily(1))
4878            intcfamily_send(charIndex,numHeaderPE_mpilocal(1+target_ip),1+target_ip)=  &
4879               iachar(obsdat_inout%cfamily(headerIndex)(charIndex:charIndex))
4880         enddo
4881
4882      enddo
4883
4884      ! do mpi communication: header-level data
4885      if(nprocs_mpi > 1) then
4886        nsize = numHeader_mpimessage
4887        call rpn_comm_alltoall(primaryKey_send,nsize,"mpi_integer8",  &
4888                               primaryKey_recv,nsize,"mpi_integer8","GRID",ierr)
4889
4890        nsize = numHeader_mpimessage*odc_numActiveColumn(obsdat_inout%realHeaders)
4891        call rpn_comm_alltoall(real_send,nsize,"mpi_double_precision",  &
4892                               real_recv,nsize,"mpi_double_precision","GRID",ierr)
4893
4894        nsize = numHeader_mpimessage*odc_numActiveColumn(obsdat_inout%intHeaders)
4895        call rpn_comm_alltoall(int_send,nsize,"mpi_integer",  &
4896                               int_recv,nsize,"mpi_integer","GRID",ierr)
4897
4898        nsize = numHeader_mpimessage*len(obsdat_inout%cstnid(1))
4899        call rpn_comm_alltoall(intcstnid_send,nsize,"mpi_integer",  &
4900                               intcstnid_recv,nsize,"mpi_integer","GRID",ierr)
4901
4902        nsize = numHeader_mpimessage*len(obsdat_inout%cfamily(1))
4903        call rpn_comm_alltoall(intcfamily_send,nsize,"mpi_integer",  &
4904                               intcfamily_recv,nsize,"mpi_integer","GRID",ierr)
4905      else
4906        primaryKey_recv(:,1)   = primaryKey_send(:,1)
4907        real_recv(:,:,1)       = real_send(:,:,1)
4908        int_recv(:,:,1)        = int_send(:,:,1)
4909        intcstnid_recv(:,:,1)  = intcstnid_send(:,:,1)
4910        intcfamily_recv(:,:,1) = intcfamily_send(:,:,1)
4911      endif
4912
4913      allocate(message_onm(numHeader_mpimessage,nprocs_mpi))
4914      activeIndex = odc_activeIndexFromColumnIndex(obsdat_inout%intHeaders%odc_flavour,OBS_ONM)
4915      message_onm(:,:) = int_recv(activeIndex,:,:)
4916
4917      ! copy the data from temporary arrays: header-level data
4918      headerIndex_out = 0
4919      do procIndex = 1, nprocs_mpi
4920         do headerIndex=1,numHeader_mpimessage
4921            if(int_recv(1,headerIndex,procIndex) /= -99999) then
4922               if(target_ip_index == OBS_IPF) then
4923                 ! put headers back in original order as in the files
4924                 headerIndex_out = message_onm(headerIndex,procIndex)
4925               else
4926                 headerIndex_out = headerIndex_out + 1
4927               endif
4928
4929               obsdat_tmp%headerPrimaryKey(headerIndex_out)= &
4930                     primaryKey_recv(headerIndex,procIndex)
4931
4932               do activeIndex=1,odc_numActiveColumn(obsdat_inout%realHeaders)
4933                  columnIndex=odc_columnIndexFromActiveIndex(obsdat_inout%realHeaders%odc_flavour, &
4934                                                             activeIndex)
4935                  obsdat_tmp%realHeaders%columns(columnIndex)%value_r(headerIndex_out)= &
4936                     real_recv(activeIndex,headerIndex,procIndex)
4937               enddo
4938
4939               do activeIndex=1,odc_numActiveColumn(obsdat_inout%intHeaders)
4940                  columnIndex=odc_columnIndexFromActiveIndex(obsdat_inout%intHeaders%odc_flavour, &
4941                                                             activeIndex)
4942                  obsdat_tmp%intHeaders%columns(columnIndex)%value_i(headerIndex_out)= &
4943                     int_recv(activeIndex,headerIndex,procIndex)
4944               enddo
4945
4946               do charIndex=1, len(obsdat_inout%cstnid(1))
4947                  obsdat_tmp%cstnid(headerIndex_out)(charIndex:charIndex) = &
4948                     achar(intcstnid_recv(charIndex,headerIndex,procIndex))
4949               enddo
4950
4951               do charIndex=1, len(obsdat_inout%cfamily(1))
4952                  obsdat_tmp%cfamily(headerIndex_out)(charIndex:charIndex) = &
4953                     achar(intcfamily_recv(charIndex,headerIndex,procIndex))
4954               enddo
4955
4956            endif
4957         enddo
4958      enddo
4959
4960      ! recompute RLN to point to correct body data
4961      do headerIndex=1,numHeader_out
4962         if(headerIndex == 1) then
4963            obsdat_tmp%intHeaders%columns(OBS_RLN)%value_i(headerIndex) = 1
4964         else
4965            obsdat_tmp%intHeaders%columns(OBS_RLN)%value_i(headerIndex) = &
4966                obsdat_tmp%intHeaders%columns(OBS_RLN)%value_i(headerIndex-1) + &
4967                obsdat_tmp%intHeaders%columns(OBS_NLV)%value_i(headerIndex-1)
4968         endif
4969      enddo
4970
4971      deallocate(primaryKey_send)
4972      deallocate(primaryKey_recv)
4973      deallocate(intcfamily_send)
4974      deallocate(intcfamily_recv)
4975      deallocate(intcstnid_send)
4976      deallocate(intcstnid_recv)
4977      deallocate(real_send)
4978      deallocate(real_recv)
4979      deallocate(int_send)
4980      deallocate(int_recv)
4981
4982      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
4983
4984      ! Do communication for bodyPrimaryKey
4985
4986      allocate(primaryKey_send(numBody_mpimessage,nprocs_mpi))
4987      allocate(primaryKey_recv(numBody_mpimessage,nprocs_mpi))
4988
4989      numBodyPE_mpilocal(:) = 0
4990      primaryKey_send(:,:) = -99999
4991      do bodyIndex=1,numBody_in
4992         headerIndex = obs_bodyElem_i(obsdat_inout,OBS_HIND,bodyIndex)
4993         target_ip = obs_headElem_i(obsdat_inout,target_ip_index,headerIndex)
4994         numBodyPE_mpilocal(1+target_ip) = numBodyPE_mpilocal(1+target_ip) + 1
4995         primaryKey_send(numBodyPE_mpilocal(1+target_ip),1+target_ip)= &
4996              obsdat_inout%bodyPrimaryKey(bodyIndex)
4997      enddo
4998      if(nprocs_mpi > 1) then
4999         nsize = numBody_mpimessage
5000         call rpn_comm_alltoall(primaryKey_send,nsize,"mpi_integer8",  &
5001                                primaryKey_recv,nsize,"mpi_integer8","GRID",ierr)
5002      else
5003         primaryKey_recv(:,1) = primaryKey_send(:,1)
5004      endif
5005      if(target_ip_index == OBS_IPF) then
5006         ! copy the data in the same order as in the original files
5007         do procIndex = 1, nprocs_mpi
5008            bodyIndex = 0
5009            do headerIndex=1,numHeader_mpimessage
5010               headerIndex_out = message_onm(headerIndex,procIndex)
5011               if(headerIndex_out /= -99999) then
5012                  bodyIndexBeg = obs_headElem_i(obsdat_tmp,OBS_RLN,headerIndex_out)
5013                  bodyIndexEnd = obs_headElem_i(obsdat_tmp,OBS_NLV,headerIndex_out) + bodyIndexBeg - 1
5014                  do bodyIndex_out = bodyIndexBeg, bodyIndexEnd
5015                     bodyIndex = bodyIndex + 1
5016                     obsdat_tmp%bodyPrimaryKey(bodyIndex_out)= &
5017                          primaryKey_recv(bodyIndex,procIndex)
5018                  enddo
5019               endif
5020            enddo
5021         enddo
5022         ! copy the data in sequential order
5023         bodyIndex_out = 0
5024         do procIndex = 1, nprocs_mpi
5025            do bodyIndex=1,numBody_mpimessage
5026               if(primaryKey_recv(bodyIndex,procIndex) /= -99999) then
5027                  bodyIndex_out = bodyIndex_out + 1
5028                  obsdat_tmp%bodyPrimaryKey(bodyIndex_out)= &
5029                       primaryKey_recv(bodyIndex,procIndex)
5030               endif
5031            enddo
5032         enddo
5033      endif
5034
5035      deallocate(primaryKey_send)
5036      deallocate(primaryKey_recv)
5037
5038      ! First do REAL body columns
5039
5040      allocate(real_send_2d(numBody_mpimessage,nprocs_mpi))
5041      allocate(real_recv_2d(numBody_mpimessage,nprocs_mpi))
5042
5043      do activeIndex=1,odc_numActiveColumn(obsdat_inout%realBodies)
5044         columnIndex=odc_columnIndexFromActiveIndex(obsdat_inout%realBodies%odc_flavour, &
5045                                                    activeIndex)
5046
5047         ! copy the data to temporary arrays: body-level data
5048         numBodyPE_mpilocal(:) = 0
5049         real_send_2d(:,:) = -99999.0d0
5050         do bodyIndex=1,numBody_in
5051            headerIndex = obs_bodyElem_i(obsdat_inout,OBS_HIND,bodyIndex)
5052            target_ip = obs_headElem_i(obsdat_inout,target_ip_index,headerIndex)
5053            numBodyPE_mpilocal(1+target_ip) = numBodyPE_mpilocal(1+target_ip) + 1
5054            real_send_2d(numBodyPE_mpilocal(1+target_ip),1+target_ip)= &
5055               obs_bodyElem_r(obsdat_inout, columnIndex, bodyIndex)
5056         enddo
5057
5058         ! do mpi communication: body-level data
5059         if(nprocs_mpi > 1) then
5060           nsize = numBody_mpimessage
5061           call rpn_comm_alltoall(real_send_2d,nsize,"mpi_double_precision",  &
5062                                  real_recv_2d,nsize,"mpi_double_precision","GRID",ierr)
5063         else
5064           real_recv_2d(:,1)       = real_send_2d(:,1)
5065         endif
5066
5067         ! copy the data from temporary arrays: body-level data
5068         if(target_ip_index == OBS_IPF) then
5069
5070            ! copy the data in the same order as in the original files
5071            do procIndex = 1, nprocs_mpi
5072               bodyIndex = 0
5073               do headerIndex=1,numHeader_mpimessage
5074                  headerIndex_out = message_onm(headerIndex,procIndex)
5075                  if(headerIndex_out /= -99999) then
5076                     bodyIndexBeg = obs_headElem_i(obsdat_tmp,OBS_RLN,headerIndex_out)
5077                     bodyIndexEnd = obs_headElem_i(obsdat_tmp,OBS_NLV,headerIndex_out) + bodyIndexBeg - 1
5078                     do bodyIndex_out = bodyIndexBeg, bodyIndexEnd
5079                        bodyIndex = bodyIndex + 1
5080                        obsdat_tmp%realBodies%columns(columnIndex)%value_r(bodyIndex_out)= &
5081                           real_recv_2d(bodyIndex,procIndex)
5082                     enddo
5083                  endif
5084               enddo
5085            enddo
5086
5087         else
5088
5089            ! copy the data in sequential order
5090            bodyIndex_out = 0
5091            do procIndex = 1, nprocs_mpi
5092               do bodyIndex=1,numBody_mpimessage
5093                  if(real_recv_2d(bodyIndex,procIndex) /= -99999.0d0) then
5094                     bodyIndex_out = bodyIndex_out + 1
5095                     obsdat_tmp%realBodies%columns(columnIndex)%value_r(bodyIndex_out)= &
5096                        real_recv_2d(bodyIndex,procIndex)
5097                  endif
5098               enddo
5099            enddo
5100
5101         endif
5102
5103      enddo ! activeIndex
5104
5105      deallocate(real_send_2d)
5106      deallocate(real_recv_2d)
5107
5108      ! Now do INTEGER body columns
5109
5110      allocate(int_send_2d(numBody_mpimessage,nprocs_mpi))
5111      allocate(int_recv_2d(numBody_mpimessage,nprocs_mpi))
5112
5113      do activeIndex=1,odc_numActiveColumn(obsdat_inout%intBodies)
5114         columnIndex=odc_columnIndexFromActiveIndex(obsdat_inout%intBodies%odc_flavour, &
5115                                                    activeIndex)
5116
5117         ! copy the data to temporary arrays: body-level data
5118         numBodyPE_mpilocal(:) = 0
5119         int_send_2d(:,:) = -99999
5120         do bodyIndex=1,numBody_in
5121            headerIndex = obs_bodyElem_i(obsdat_inout,OBS_HIND,bodyIndex)
5122            target_ip = obs_headElem_i(obsdat_inout,target_ip_index,headerIndex)
5123            numBodyPE_mpilocal(1+target_ip) = numBodyPE_mpilocal(1+target_ip) + 1
5124            int_send_2d(numBodyPE_mpilocal(1+target_ip),1+target_ip)= &
5125               obs_bodyElem_i(obsdat_inout, columnIndex, bodyIndex)
5126         enddo
5127
5128         ! do mpi communication: body-level data
5129         if(nprocs_mpi > 1) then
5130           nsize = numBody_mpimessage
5131           call rpn_comm_alltoall(int_send_2d,nsize,"mpi_integer",  &
5132                                  int_recv_2d,nsize,"mpi_integer","GRID",ierr)
5133         else
5134           int_recv_2d(:,1)        = int_send_2d(:,1)
5135         endif
5136
5137         ! copy the data from temporary arrays: body-level data
5138         if(target_ip_index == OBS_IPF) then
5139
5140            ! copy the data in the same order as in the original files
5141            do procIndex = 1, nprocs_mpi
5142               bodyIndex = 0
5143               do headerIndex=1,numHeader_mpimessage
5144                  headerIndex_out = message_onm(headerIndex,procIndex)
5145                  if(headerIndex_out /= -99999) then
5146                     bodyIndexBeg = obs_headElem_i(obsdat_tmp,OBS_RLN,headerIndex_out)
5147                     bodyIndexEnd = obs_headElem_i(obsdat_tmp,OBS_NLV,headerIndex_out) + bodyIndexBeg - 1
5148                     do bodyIndex_out = bodyIndexBeg, bodyIndexEnd
5149                        bodyIndex = bodyIndex + 1
5150                        obsdat_tmp%intBodies%columns(columnIndex)%value_i(bodyIndex_out)= &
5151                           int_recv_2d(bodyIndex,procIndex)
5152                     enddo
5153                  endif
5154               enddo
5155            enddo
5156
5157         else
5158
5159            ! copy the data in sequential order
5160            bodyIndex_out = 0
5161            do procIndex = 1, nprocs_mpi
5162               do bodyIndex=1,numBody_mpimessage
5163                  if(int_recv_2d(bodyIndex,procIndex) /= -99999) then
5164                     bodyIndex_out = bodyIndex_out + 1
5165                     obsdat_tmp%intBodies%columns(columnIndex)%value_i(bodyIndex_out)= &
5166                        int_recv_2d(bodyIndex,procIndex)
5167                  endif
5168               enddo
5169            enddo
5170
5171         endif
5172
5173      enddo ! activeIndex
5174
5175      deallocate(int_send_2d)
5176      deallocate(int_recv_2d)
5177
5178      ! reset the headerIndex within the body
5179      do headerIndex = 1, obs_numheader(obsdat_tmp)
5180        bodyIndexBeg = obs_headElem_i(obsdat_tmp,OBS_RLN,headerIndex)
5181        bodyIndexEnd = obs_headElem_i(obsdat_tmp,OBS_NLV,headerIndex) + bodyIndexBeg - 1
5182        do bodyIndex = bodyIndexBeg, bodyIndexEnd
5183          call obs_bodySet_i(obsdat_tmp,OBS_HIND,bodyIndex, headerIndex)
5184        enddo
5185      enddo
5186
5187      deallocate(message_onm)
5188
5189      deallocate(numHeaderPE_mpilocal)
5190      deallocate(numHeaderPE_mpiglobal)
5191      deallocate(numBodyPE_mpilocal)
5192      deallocate(numBodyPE_mpiglobal)
5193
5194      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
5195
5196      ! reallocate obsdat_inout to the new size and copy over redistributed data
5197      call obs_deallocate(obsdat_inout)
5198      call obs_allocate(obsdat_inout,obsdat_tmp%numHeader,obsdat_tmp%numBody)
5199      call obs_copy(obsdat_tmp,obsdat_inout)
5200      call obs_finalize(obsdat_tmp)
5201
5202      write(*,*) '============= Leave obs_MpiRedistribute =============='
5203      write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
5204
5205      return
5206   end subroutine obs_MpiRedistribute
5207
5208
5209   subroutine obs_set_c(obsdat, name, row_index, value)
5210      !
5211      !:Purpose: set a character(len=9) in the observation object
5212      !      To control access to the observation object.
5213      !
5214      implicit none
5215
5216      ! Arguments:
5217      type(struct_obs), intent(inout)  :: obsdat
5218      character(len=*), intent(in)     :: name
5219      integer         , intent(in)     :: row_index
5220      character(len=*), intent(in)     :: value
5221
5222      select case (trim(name))
5223      case ('STID'); obsdat%cstnid (row_index) = value
5224         if(row_index == (obsdat%numHeader+1)) obsdat%numHeader = obsdat%numHeader+1
5225
5226      case default
5227         call obs_abort('ERROR writing:  ' // trim(name) // &
5228                        ' is not a character(len=9) observation.')
5229         return
5230      end select
5231   end subroutine obs_set_c
5232
5233
5234   subroutine obs_set_current_body_list_from_family(obsdat, family, &
5235      list_is_empty_opt, current_list_opt)
5236      !
5237      !:Purpose:
5238      !      Create a row_index list from the indicated family and place it in
5239      !      the body depot.
5240      !
5241      implicit none
5242
5243      ! Arguments:
5244      type(struct_obs), target,                   intent(inout) :: obsdat
5245      character(len=*),                           intent(in)    :: family
5246      logical,                          optional, intent(out)   :: list_is_empty_opt
5247      type(struct_index_list), pointer, optional, intent(out)   :: current_list_opt
5248
5249      ! Locals:
5250      type(struct_index_list_depot), pointer :: depot
5251      type(struct_index_list), pointer :: index_list
5252      integer :: index_header, list, list_index, row_index
5253      integer :: first, last
5254
5255      nullify(index_list)
5256      depot => obsdat%body_index_list_depot
5257
5258      ! Search for an existing list
5259      if(present(current_list_opt)) then
5260         if(associated(current_list_opt)) then
5261            if (current_list_opt%family == family) then
5262               index_list => current_list_opt
5263            end if ! family matches
5264         end if ! associated
5265
5266      else ! not present(current_list)
5267         do list = 1, NUMBER_OF_LISTS
5268            if (depot%index_lists(list)%family == family) then
5269               index_list => depot%index_lists(list)
5270               exit                     ! Don't look any further
5271            end if
5272         end do
5273      end if
5274
5275      ! If the list does not already exist
5276      if (.not. associated(index_list)) then
5277
5278         ! Acquire memory for the list
5279         if(present(current_list_opt)) then
5280            ! This is an OMP thread. Re-use the same physical memory for the list
5281            index_list => ild_get_empty_index_list(depot, current_list_opt)
5282         else
5283            index_list => ild_get_empty_index_list(depot)
5284         end if
5285
5286         ! Initialize the new list
5287         index_list%family = family
5288         index_list%header = -99        ! not used
5289
5290         !
5291         ! Populate the list
5292         !
5293
5294         ! Loop over all header indices of the family
5295         list_index = 0
5296         call obs_set_current_header_list(obsdat, family)
5297         HEADER: do
5298            index_header = obs_getHeaderIndex(obsdat)
5299            if (index_header < 0) exit HEADER
5300            first= obs_headElem_i(obsdat,OBS_RLN,index_header)
5301            last = obs_headElem_i(obsdat,OBS_NLV,index_header) + first - 1
5302            do row_index=first,last     ! For each item indicated in the header
5303               ! Add the row_index to the list
5304               list_index = list_index + 1
5305               index_list%indices(list_index) = row_index
5306            end do
5307         end do HEADER
5308         index_list%indices(list_index+1)= -1 ! Flag the end of the list ...
5309         index_list%indices(list_index+2)= -1 ! ... clearly
5310      end if ! list does not already exist
5311
5312      index_list%current_element = 0    ! Set pointer to the start of the list
5313      depot%current_list => index_list  ! Note the current list
5314
5315      if(present(list_is_empty_opt)) then
5316         ! Return whether the list is empty
5317         list_is_empty_opt = (ild_get_next_index(depot, no_advance_opt=.true.) < 0)
5318      end if
5319
5320      if(present(current_list_opt)) then
5321         ! Return a pointer to the current list
5322         current_list_opt => index_list
5323      end if
5324   end subroutine obs_set_current_body_list_from_family
5325
5326
5327   subroutine obs_set_current_body_list_from_header(obsdat, header, &
5328      list_is_empty_opt, current_list_opt)
5329      !
5330      !:Purpose:
5331      !      Create a row_index list from the indicated header and place it in
5332      !      the body depot.
5333      !
5334      implicit none
5335
5336      ! Arguments:
5337      type(struct_obs), target,                   intent(inout) :: obsdat
5338      integer,                                    intent(in)    :: header
5339      logical,                          optional, intent(out)   :: list_is_empty_opt
5340      type(struct_index_list), pointer, optional, intent(out)   :: current_list_opt
5341
5342      ! Locals:
5343      type(struct_index_list_depot), pointer :: depot
5344      type(struct_index_list), pointer :: index_list
5345      integer :: list, list_index, row_index
5346      integer :: first, last
5347
5348      nullify(index_list)
5349      depot => obsdat%body_index_list_depot
5350
5351      ! Search for an existing list
5352      if(present(current_list_opt)) then
5353         if(associated(current_list_opt)) then
5354            if (current_list_opt%header == header) then
5355               index_list => current_list_opt
5356            end if ! header matches
5357         end if ! associated
5358
5359      else ! not present(current_list)
5360         do list = 1, NUMBER_OF_LISTS
5361            if (depot%index_lists(list)%header == header) then
5362               index_list => depot%index_lists(list)
5363               exit                     ! Don't look any further
5364            end if
5365         end do
5366      end if
5367
5368      ! If the list does not already exist
5369      if (.not. associated(index_list)) then
5370
5371         ! Acquire memory for the list
5372         if(present(current_list_opt)) then
5373            ! This is an OMP thread. Re-use the same physical memory for the list
5374            index_list => ild_get_empty_index_list(depot, current_list_opt)
5375         else
5376            index_list => ild_get_empty_index_list(depot)
5377         end if
5378
5379         ! Initialize the new list
5380         index_list%family = 'xx'       ! not used
5381         index_list%header = header
5382
5383         ! Populate the list
5384         first= obs_headElem_i(obsdat,OBS_RLN,header)
5385         last = obs_headElem_i(obsdat,OBS_NLV,header) + first - 1
5386         list_index = 0
5387         do row_index=first,last        ! For each item indicated in the header
5388            ! Add the row_index to the list
5389            list_index = list_index + 1
5390            index_list%indices(list_index) = row_index
5391         end do
5392         index_list%indices(list_index+1)= -1 ! Flag the end of the list ...
5393         index_list%indices(list_index+2)= -1 ! ... clearly
5394      end if ! list does not already exist
5395
5396      index_list%current_element = 0    ! Set pointer to the start of the list
5397      depot%current_list => index_list  ! Note the current list
5398
5399      if(present(list_is_empty_opt)) then
5400         ! Return whether the list is empty
5401         list_is_empty_opt = (ild_get_next_index(depot, no_advance_opt=.true.) < 0)
5402      end if
5403
5404      if(present(current_list_opt)) then
5405         ! Return a pointer to the current list
5406         current_list_opt => index_list
5407      end if
5408   end subroutine obs_set_current_body_list_from_header
5409
5410
5411   subroutine obs_set_current_body_list_all(obsdat, list_is_empty_opt, current_list_opt)
5412      !
5413      !:Purpose:
5414      !      Create a row_index list containing all bodies and place it in the
5415      !      body depot.
5416      !
5417      implicit none
5418
5419      ! Arguments:
5420      type(struct_obs), target,                   intent(inout) :: obsdat
5421      logical,                          optional, intent(out)   :: list_is_empty_opt
5422      type(struct_index_list), pointer, optional, intent(out)   :: current_list_opt
5423
5424      ! Locals:
5425      type(struct_index_list_depot), pointer :: depot
5426      type(struct_index_list), pointer :: index_list
5427      integer :: list, list_index, row_index, index_header
5428      integer :: first, last
5429
5430      nullify(index_list)
5431      depot => obsdat%body_index_list_depot
5432
5433      ! Search for an existing list
5434      if(present(current_list_opt)) then
5435         if(associated(current_list_opt)) then
5436            if (      current_list_opt%header == -1 &
5437                .and. current_list_opt%family == '  ') then
5438               index_list => current_list_opt
5439            end if ! null header and family
5440         end if ! associated
5441
5442      else ! not present(current_list)
5443         do list = 1, NUMBER_OF_LISTS
5444            if (      depot%index_lists(list)%header == -1 &
5445                .and. depot%index_lists(list)%family == '  ') then
5446               index_list => depot%index_lists(list)
5447               exit                     ! Don't look any further
5448            end if
5449         end do
5450      end if
5451
5452      ! If the list does not already exist
5453      if (.not. associated(index_list)) then
5454
5455         ! Acquire memory for the list
5456         if(present(current_list_opt)) then
5457            ! This is an OMP thread. Re-use the same physical memory for the list
5458            index_list => ild_get_empty_index_list(depot, current_list_opt)
5459         else
5460            index_list => ild_get_empty_index_list(depot)
5461         end if
5462
5463         ! Initialize the new list
5464         index_list%family = '  '       ! null
5465         index_list%header = -1         ! null
5466
5467         !
5468         ! Populate the list
5469         !
5470
5471         ! Loop over all header indices
5472         list_index = 0
5473         call obs_set_current_header_list(obsdat)
5474         HEADER: do
5475            index_header = obs_getHeaderIndex(obsdat)
5476            if (index_header < 0) exit HEADER
5477            first= obs_headElem_i(obsdat,OBS_RLN,index_header)
5478            last = obs_headElem_i(obsdat,OBS_NLV,index_header) + first - 1
5479            do row_index=first,last     ! For each item indicated in the header
5480               ! Add the row_index to the list
5481               list_index = list_index + 1
5482               index_list%indices(list_index) = row_index
5483            end do
5484         end do HEADER
5485         index_list%indices(list_index+1)= -1 ! Flag the end of the list ...
5486         index_list%indices(list_index+2)= -1 ! ... clearly
5487      end if ! list does not already exist
5488
5489      index_list%current_element = 0    ! Set pointer to the start of the list
5490      depot%current_list => index_list  ! Note the current list
5491
5492      if(present(list_is_empty_opt)) then
5493         ! Return whether the list is empty
5494         list_is_empty_opt = (ild_get_next_index(depot, no_advance_opt=.true.) < 0)
5495      end if
5496
5497      if(present(current_list_opt)) then
5498         ! Return a pointer to the current list
5499         current_list_opt => index_list
5500      end if
5501   end subroutine obs_set_current_body_list_all
5502
5503
5504   subroutine obs_set_current_header_list_from_family(obsdat, family)
5505      !
5506      !:Purpose:
5507      !      Find or create a row_index list for the indicated family and place
5508      !      it in the header depot.
5509      !
5510      implicit none
5511
5512      ! Arguments:
5513      type(struct_obs), target, intent(inout) :: obsdat
5514      character(len=*),         intent(in)    :: family
5515
5516      ! Locals:
5517      type(struct_index_list_depot), pointer :: depot
5518      type(struct_index_list), pointer :: index_list
5519      integer :: list, list_index, row_index
5520
5521      nullify(index_list)
5522      depot => obsdat%header_index_list_depot
5523
5524      ! Search for an existing list
5525      do list = 1, NUMBER_OF_LISTS
5526         if (depot%index_lists(list)%family == family) then
5527            index_list => depot%index_lists(list)
5528            index_list%current_element=0! Start at the beginning of the list
5529            exit                        ! Don't look any further
5530         end if
5531      end do
5532
5533      ! If the list does not already exist
5534      if (.not. associated(index_list)) then
5535         ! Create a new list
5536         index_list => ild_get_empty_index_list(depot)
5537         index_list%family = family
5538         index_list%header = -1
5539
5540         ! Populate the list
5541         list_index = 0
5542         do row_index = 1, obsdat%numHeader
5543            ! If the station is of the right family
5544            if(obsdat%cfamily(row_index) == family) then
5545               ! Add the row_index to the list
5546               list_index = list_index + 1
5547               index_list%indices(list_index) = row_index
5548            end if
5549         end do
5550         index_list%indices(list_index+1)= -1 ! Flag the end of the list ...
5551         index_list%indices(list_index+2)= -1 ! ... clearly
5552      end if ! list does not already exist
5553
5554      index_list%current_element = 0    ! Set pointer to the start of the list
5555      depot%current_list => index_list  ! Note the current list
5556   end subroutine obs_set_current_header_list_from_family
5557
5558
5559   subroutine obs_set_current_header_list_all(obsdat)
5560      !
5561      !:Purpose:
5562      !      Find or create a row_index list for all headers and place it in the
5563      !      header depot.
5564      !
5565      implicit none
5566
5567      ! Arguments:
5568      type(struct_obs), target, intent(inout) :: obsdat
5569
5570      ! Locals:
5571      type(struct_index_list_depot), pointer :: depot
5572      type(struct_index_list), pointer :: index_list
5573      integer :: list, list_index, row_index
5574
5575      nullify(index_list)
5576      depot => obsdat%header_index_list_depot
5577
5578      ! Search for an existing list
5579      do list = 1, NUMBER_OF_LISTS
5580         if (depot%index_lists(list)%family == '  ') then
5581            index_list => depot%index_lists(list)
5582            index_list%current_element=0! Start at the beginning of the list
5583            exit                        ! Don't look any further
5584         end if
5585      end do
5586
5587      ! If the list does not already exist
5588      if (.not. associated(index_list)) then
5589         ! Create a new list
5590         index_list => ild_get_empty_index_list(depot)
5591         index_list%family = '  '
5592         index_list%header = -1
5593
5594         ! Populate the list
5595         list_index = 0
5596         do row_index = 1, obsdat%numHeader
5597            ! Add the row_index to the list
5598            list_index = list_index + 1
5599            index_list%indices(list_index) = row_index
5600         end do
5601         index_list%indices(list_index+1)= -1 ! Flag the end of the list ...
5602         index_list%indices(list_index+2)= -1 ! ... clearly
5603      end if ! list does not already exist
5604
5605      index_list%current_element = 0    ! Set pointer to the start of the list
5606      depot%current_list => index_list  ! Note the current list
5607   end subroutine obs_set_current_header_list_all
5608
5609
5610   subroutine obs_setBodyPrimaryKey(obsdat,bodyIndex,primaryKey)
5611      !
5612      !:Purpose:
5613      !      Set to the indicated value the body primary key for the indicated body.
5614      !
5615      implicit none
5616
5617      ! Arguments:
5618      type(struct_obs), intent(inout) :: obsdat
5619      integer(8),       intent(in)    :: primaryKey
5620      integer,          intent(in)    :: bodyIndex
5621
5622      obsdat%bodyPrimaryKey(bodyIndex) = primaryKey
5623
5624      if(bodyIndex == (obsdat%numBody+1)) then
5625         obsdat%numBody=obsdat%numBody+1
5626      endif
5627
5628   end subroutine obs_setBodyPrimaryKey
5629
5630
5631   subroutine obs_setHeadPrimaryKey(obsdat,headerIndex,primaryKey)
5632      !
5633      !:Purpose:
5634      !      Set to the indicated value the header primary key for the indicated header.
5635      !
5636      implicit none
5637
5638      ! Arguments:
5639      type(struct_obs), intent(inout) :: obsdat
5640      integer(8),       intent(in)    :: primaryKey
5641      integer,          intent(in)    :: headerIndex
5642
5643      obsdat%headerPrimaryKey(headerIndex) = primaryKey
5644      if(headerIndex == (obsdat%numHeader+1)) then
5645         obsdat%numHeader=obsdat%numHeader+1
5646      endif
5647
5648   end subroutine obs_setHeadPrimaryKey
5649
5650
5651   subroutine obs_setFamily(obsdat,Family_in,headerIndex_opt,bodyIndex_opt)
5652      !
5653      !:Purpose:
5654      !      Set to the indicated value the family for the indicated header, or
5655      !      else for the indicated body.
5656      !
5657      implicit none
5658
5659      ! Arguments:
5660      type(struct_obs),  intent(inout) :: obsdat
5661      character(len=*),  intent(in)    :: Family_in
5662      integer, optional, intent(in)    :: headerIndex_opt
5663      integer, optional, intent(in)    :: bodyIndex_opt
5664
5665      ! Locals:
5666      integer          :: headerIndex
5667
5668      if(present(headerIndex_opt)) then
5669         headerIndex = headerIndex_opt
5670      elseif(present(bodyIndex_opt)) then
5671         headerIndex = obs_bodyElem_i(obsdat,OBS_HIND,bodyIndex_opt)
5672      else
5673         call obs_abort('OBS_SETFAMILY: Header or Body index must be specified!')
5674         return
5675      endif
5676
5677      obsdat%cfamily(headerIndex)=Family_in
5678      if(headerIndex == (obsdat%numHeader+1)) then
5679         obsdat%numHeader=obsdat%numHeader+1
5680      endif
5681
5682   end subroutine obs_setFamily
5683
5684
5685   subroutine obs_sethind(obsSpaceData)
5686     !
5687     !:Purpose: Set the header index in the body table
5688     !
5689     implicit none
5690
5691     ! Arguments:
5692     type(struct_obs), intent(inout) :: obsSpaceData
5693
5694     ! Locals:
5695     integer :: idata,idatend,bodyIndex,headerIndex
5696
5697     ! Set the header index in the body of obsSpaceData
5698     do headerIndex = 1, obs_numheader(obsSpaceData)
5699        idata   = obs_headElem_i(obsSpaceData,OBS_RLN,headerIndex)
5700        idatend = obs_headElem_i(obsSpaceData,OBS_NLV,headerIndex) + idata - 1
5701        do bodyIndex= idata, idatend
5702           call obs_bodySet_i(obsSpaceData, OBS_HIND, bodyIndex, headerIndex)
5703        end do
5704     end do
5705   end subroutine obs_sethind
5706
5707
5708   subroutine obs_write(obsdat,hx, &
5709      nens,nobshdrout,nobsbdyout,nobshxout,nobsdimout)
5710      ! 
5711      !:Purpose: 
5712      !      Write the obsdat info to unformatted files.
5713      !
5714      !:Note: the body information is written in the order that it will
5715      !      be used by sekfeta.f
5716      !
5717      implicit none
5718
5719      ! Arguments:
5720      type(struct_obs), intent(in) :: obsdat
5721      real(8),          intent(in) :: hx(:,:)
5722      integer,          intent(in) :: nens
5723      integer,          intent(in) :: nobshdrout
5724      integer,          intent(in) :: nobsbdyout
5725      integer,          intent(in) :: nobshxout
5726      integer,          intent(in) :: nobsdimout
5727
5728      ! Locals:
5729      integer :: irealBodies,jo,nrealBodies
5730
5731      irealBodies=1
5732      do jo=1,obsdat%numHeader
5733         call obs_write_hdr(obsdat,jo,nobshdrout,irealBodies,nrealBodies)
5734         call obs_write_bdy(obsdat,jo,nobsbdyout)
5735         if (nens > 0) then
5736            call obs_write_hx(obsdat,hx,jo,nobshxout)
5737         endif
5738         irealBodies=irealBodies+nrealBodies
5739      enddo
5740      write(nobsdimout,*) obsdat%numHeader
5741      write(nobsdimout,*) irealBodies-1
5742      write(nobsdimout,*) nens
5743
5744      return
5745
5746   end subroutine obs_write
5747
5748
5749   subroutine obs_write_bdy(obsdat,kobs,kulout)
5750      !
5751      !:Purpose: write the data records associated with a
5752      !                 station in unformatted form.
5753      !
5754      !:Arguments:
5755      !           :obsdat: obsSpaceData object
5756      !           :kobs: no. of observation
5757      !           :kulout: unit used for writing 
5758      !
5759      implicit none 
5760
5761      ! Arguments:
5762      type(struct_obs), intent(in) :: obsdat
5763      integer,          intent(in) :: kobs
5764      integer,          intent(in) :: kulout
5765
5766      ! Locals:
5767      integer :: ipnt,idata,j,jdata,k
5768
5769      ipnt  = obs_headElem_i(obsdat, OBS_RLN, kobs) 
5770      idata = obs_headElem_i(obsdat, OBS_NLV, kobs)
5771
5772      ! write the data records
5773      do jdata=ipnt,ipnt+idata-1
5774         write(kulout) &
5775           (obsdat%intBodies%columns(odc_ENKF_bdy_int_column_list(k) &
5776                                    )%value_i(jdata), &
5777                 k=1,size(odc_ENKF_bdy_int_column_list(:))),&
5778           (obsdat%realBodies%columns(odc_ENKF_bdy_real_column_list(j) &
5779                                     )%value_r(jdata), &
5780                 j=1,size(odc_ENKF_bdy_real_column_list(:)))
5781      enddo
5782
5783      return
5784
5785   end subroutine obs_write_bdy
5786
5787
5788   subroutine obs_write_hdr( obsdat, kobs, kulout, irealBodies, nrealBodies )
5789      !
5790      !:Purpose: writing of the header of a station record
5791      !
5792      !:Arguments:
5793      !           :obsdat: obsSpaceData object
5794      !           :kobs: no. of observation
5795      !           :kulout: unit used for output 
5796      !           :irealBodies: location in the sorted realBodies
5797      !           :nrealBodies: number of observations for this header
5798      !
5799      implicit none
5800
5801      ! Arguments:
5802      type(struct_obs), intent(in)  :: obsdat
5803      integer         , intent(in)  :: kobs
5804      integer         , intent(in)  :: kulout
5805      integer         , intent(in)  :: irealBodies
5806      integer         , intent(out) :: nrealBodies
5807
5808      ! Locals:
5809      integer :: i,j,nprocs_mpi,ierr
5810
5811      ! (note that as a part of the writing, the body is being sorted
5812      !  so that the order of the observations in the body array 
5813      !  corresponds with the order of the headers in the header array).
5814      call rpn_comm_size("GRID",nprocs_mpi,ierr)
5815
5816      if(obsdat%mpi_local .and. nprocs_mpi>1) then
5817         call obs_abort('obs_write_hdr() is not equipped to handle the ' // &
5818                        'case, mpi_local=.true.')
5819         return
5820      end if
5821
5822      nrealBodies=obs_headElem_i(obsdat, OBS_NLV, kobs)
5823      ! write the header's content 
5824      write(kulout) irealBodies, &
5825            (obs_headElem_i &
5826                  (obsdat, &
5827                   odc_columnIndexFromActiveIndex(obsdat%intHeaders%odc_flavour,&
5828                                                  i), &
5829                   kobs &
5830                  ), &
5831             i=2,odc_numActiveColumn(obsdat%intHeaders) &
5832            ), &
5833
5834            (obs_headElem_r &
5835                 (obsdat, &
5836                  odc_columnIndexFromActiveIndex(obsdat%realHeaders%odc_flavour,&
5837                                                 j), &
5838                  kobs &
5839                 ), &
5840             j=1,odc_numActiveColumn(obsdat%realHeaders) &
5841            ), &
5842
5843            obsdat%cstnid(kobs), &
5844            obsdat%cfamily(kobs)
5845
5846      return
5847
5848   end subroutine obs_write_hdr
5849
5850
5851   subroutine obs_write_hx(obsdat,hx,kobs,kulout)
5852      !
5853      !:Purpose: write the interpolated values associated with a
5854      !                 station in unformatted form.
5855      !
5856      !:Arguments:
5857      !           :obsdat: obsSpaceData object
5858      !           :hx: interpolated values    
5859      !           :kobs: no. of station 
5860      !           :kulout: unit used for writing 
5861      !
5862      implicit none 
5863
5864      ! Arguments:
5865      type(struct_obs), intent(in) :: obsdat
5866      real(8),          intent(in) :: hx(:,:)
5867      integer,          intent(in) :: kobs
5868      integer,          intent(in) :: kulout
5869
5870      ! Locals:
5871      integer :: ipnt,idata,iens,jdata,nens
5872
5873      nens = size(hx,1)
5874
5875      ipnt  = obs_headElem_i(obsdat, OBS_RLN, kobs) 
5876      idata = obs_headElem_i(obsdat, OBS_NLV, kobs)
5877
5878      ! write the data records
5879      do jdata=ipnt,ipnt+idata-1
5880         write(kulout) (hx(iens,jdata),iens=1,nens)
5881      enddo
5882
5883      return
5884
5885   end subroutine obs_write_hx
5886
5887
5888   function obs_famExist( obsdat, family, localMPI_opt )
5889     !
5890     !:Purpose: Check if any observations of a particular family are present
5891     !          in obsdat. Returned result will be the MPI local value if
5892     !          the optional argument local_mpi is set to .true., will be
5893     !          the MPI global value otherwise.
5894     !
5895     !:Arguments:
5896     !           :obsdat: ObsSpaceData structure
5897     !           :family: Obs family
5898     !           :localMPI_opt: return MPI local result; optional, default is .false.
5899     !
5900     implicit none 
5901
5902     ! Arguments:
5903     type(struct_obs),  intent(in) :: obsdat
5904     character(len=2),  intent(in) :: family
5905     logical, optional, intent(in) :: localMPI_opt
5906     ! Result:
5907     logical                                :: obs_famExist  ! Logical indicating if 'family' is part of the list
5908
5909     ! Locals:
5910     integer :: index_header,ierr
5911     logical :: famExist,local
5912
5913     if (present(localMPI_opt)) then
5914        local = localMPI_opt
5915     else
5916        local = .false.
5917     end if
5918     
5919     famExist = .false.
5920
5921     ! check if family exists in local MPI process
5922     do index_header = 1,obs_numheader(obsdat)
5923        if (obs_getFamily(obsdat,headerIndex_opt=index_header) == family) then
5924           famExist = .true.
5925           exit
5926        end if
5927     end do
5928
5929     if (local) then
5930        ! return MPI local value
5931        obs_famExist = famExist
5932     else
5933        ! return MPI global value
5934        call rpn_comm_allreduce(famExist,obs_famExist,1,"MPI_LOGICAL","MPI_LOR","GRID",ierr)
5935     end if
5936
5937   end function obs_famExist
5938
5939end module ObsSpaceData_mod