obsFiles_mod sourceΒΆ

   1module obsFiles_mod
   2  ! MODULE obsFiles_mod (prefix='obsf' category='3. Observation input/output')
   3  !
   4  !:Purpose:  High-level module to handle reading/writing of observations that
   5  !           can be stored in one of several different formats. Currently, the
   6  !           only supported formats are:
   7  !
   8  !              1. BURP
   9  !              2. SQLITE (burp2rdb format)
  10  !              3. SQLITE (obsDB format)
  11  !
  12  use midasMpi_mod
  13  use ramDisk_mod
  14  use utilities_mod
  15  use obsSpaceData_mod
  16  use burpFiles_mod
  17  use sqliteFiles_mod
  18  use sqliteUtilities_mod
  19  use obsdbFiles_mod
  20  use obsDiagFiles_mod
  21  use bufr_mod
  22  use obsSubSpaceData_mod
  23  use obsUtil_mod
  24  use obsVariableTransforms_mod
  25  use burpRead_mod
  26  use biasCorrectionConv_mod
  27  use clibInterfaces_mod
  28  use tovsNL_mod 
  29  use ensembleObservations_mod
  30
  31  implicit none
  32  save
  33  private
  34
  35  ! public variables
  36  public :: obsf_nfiles, obsf_fileName
  37
  38  ! public procedures
  39  public :: obsf_setup, obsf_filesSplit, obsf_determineFileType, obsf_determineSplitFileType
  40  public :: obsf_readFiles, obsf_writeFiles, obsf_obsSub_read, obsf_obsSub_update
  41  public :: obsf_addCloudParametersAndEmissivity, obsf_getFileName, obsf_copyObsDirectory
  42  public :: obsf_updateMissingObsFlags, obsf_cleanObsFiles
  43  logical           :: obsFilesSplit
  44  logical           :: initialized = .false.
  45
  46  integer, parameter :: maxNumObsfiles = 150
  47  integer, parameter :: maxLengthFilename = 1060
  48  integer, parameter :: fileTypeLen = 10
  49  integer, parameter :: familyTypeLen = 2
  50  integer :: obsf_nfiles, obsf_numMpiUniqueList
  51  character(len=maxLengthFilename) :: obsf_fileName(maxNumObsfiles)
  52  character(len=familyTypeLen)     :: obsf_familyType(maxNumObsfiles)
  53  character(len=48)  :: obsFileMode
  54  character(len=maxLengthFilename) :: obsf_baseFileNameMpiUniqueList(maxNumObsfiles)
  55  character(len=familyTypeLen)     :: obsf_familyTypeMpiUniqueList(maxNumObsfiles)
  56  character(len=fileTypeLen)       :: obsf_fileTypeMpiUniqueList(maxNumObsfiles)
  57  character(len=9) :: obsf_myIdExt
  58
  59contains
  60
  61  !--------------------------------------------------------------------------
  62  ! obsf_setup
  63  !--------------------------------------------------------------------------
  64  subroutine obsf_setup(dateStamp_out, obsFileMode_in)
  65
  66    implicit none
  67
  68    ! Arguments:
  69    integer         , intent(out)           :: dateStamp_out
  70    character(len=*), intent(in)            :: obsFileMode_in
  71
  72    ! Locals:
  73    character(len=fileTypeLen)              :: obsFileType
  74
  75    obsFileMode = trim(obsFileMode_in)
  76
  77    !
  78    ! Initialize file names and the file type
  79    !
  80    call obsf_setupFileNames()
  81    call obsf_determineFileType(obsFileType)
  82
  83    !
  84    ! Determine if obsFiles are split
  85    !
  86    if ( obsFileType == 'OBSDB' .or. obsFileType == 'BURP' .or.  &
  87         obsFileType == 'SQLITE') then
  88      obsFilesSplit = .true.
  89    else
  90      call utl_abort('obsf_setup: invalid observation file type: ' // trim(obsFileType))
  91    end if
  92
  93    !
  94    ! Do some setup of observation files
  95    !
  96    if ( obsFileType == 'BURP' ) then
  97      call brpf_getDateStamp( dateStamp_out, obsf_fileName(1) )
  98    else if ( obsFileType == 'OBSDB' ) then
  99      call odbf_getDateStamp( dateStamp_out, obsf_fileName(1) )
 100    else if ( obsFileType == 'SQLITE' ) then
 101      call sqlf_getDateStamp( dateStamp_out, obsf_fileName(1) )
 102    else
 103      dateStamp_out = -1
 104    end if
 105
 106    initialized = .true.
 107
 108  end subroutine obsf_setup
 109
 110  !--------------------------------------------------------------------------
 111  ! obsf_filesSplit
 112  !--------------------------------------------------------------------------
 113  function obsf_filesSplit() result(obsFilesSplit_out)
 114    implicit none
 115
 116    ! Result:
 117    logical :: obsFilesSplit_out
 118
 119    if ( .not.initialized ) call utl_abort('obsf_filesSplit: obsFiles_mod not initialized!')
 120
 121    obsFilesSplit_out = obsFilesSplit
 122        
 123  end function obsf_filesSplit
 124
 125  !--------------------------------------------------------------------------
 126  ! obsf_readFiles
 127  !--------------------------------------------------------------------------
 128  subroutine obsf_readFiles( obsSpaceData )
 129    implicit none
 130
 131    ! Arguments:
 132    type(struct_obs), intent(inout) :: obsSpaceData
 133
 134    ! Locals:
 135    integer :: fileIndex, fileIndexMpiLocal, numHeaders, numBodies
 136    integer :: numHeaderBefore, numBodyBefore, numHeaderRead, numBodyRead
 137    character(len=fileTypeLen)       :: obsFileType
 138    character(len=maxLengthFilename) :: fileName
 139    character(len=256)               :: fileNamefull
 140    character(len=familyTypeLen)     :: obsFamilyType
 141    logical :: fileExists
 142
 143    if ( .not.initialized ) call utl_abort('obsf_readFiles: obsFiles_mod not initialized!')
 144
 145    ! for every splitted file, the file type is defined separately 
 146    do fileIndex = 1, obsf_numMpiUniqueList
 147
 148      fileName = trim(obsf_baseFileNameMpiUniqueList(fileIndex)) // '_' // trim(obsf_myIdExt)
 149      obsFamilyType = obsf_familyTypeMpiUniqueList(fileIndex)
 150      obsFileType = obsf_fileTypeMpiUniqueList(fileIndex)
 151      fileNameFull = ram_fullWorkingPath(fileName,noAbort_opt=.true.)
 152      inquire(file=trim(fileNameFull),exist=fileExists)
 153
 154      if (fileExists) then
 155        ! get fileIndex on mpi local
 156        do fileIndexMpiLocal = 1, obsf_nfiles
 157          if (trim(obsf_fileName(fileIndexMpiLocal)) == fileNamefull .and. &
 158              trim(obsf_familyType(fileIndexMpiLocal)) == obsFamilyType) then
 159            exit
 160          end if
 161        end do
 162
 163        if ( obsFileType == 'BURP' )   then
 164          ! Add extra bias correction elements to conventional and TO files
 165          ! Bias correction elements for AI are added at the derivate file stage
 166          if ( obsFamilyType == 'TO' ) then
 167            call brpr_addElementsToBurp(fileNameFull, obsFamilyType, beSilent_opt=.false.)
 168          else 
 169            if ( bcc_biasActive(obsFamilyType) ) then
 170              call brpr_addElementsToBurp(fileNameFull, obsFamilyType, beSilent_opt=.false.)
 171            end if
 172          end if
 173
 174          numHeaderBefore = obs_numHeader(obsSpaceData)
 175          numBodyBefore = obs_numBody(obsSpaceData)
 176          call brpf_readFile( obsSpaceData, fileNameFull, obsFamilyType, fileIndexMpiLocal )
 177          numHeaders = obs_numHeader(obsSpaceData)
 178          numBodies = obs_numBody(obsSpaceData)
 179          numHeaderRead = numHeaders - numHeaderBefore
 180          numBodyRead = numBodies - numBodyBefore
 181        end if
 182        if ( obsFileType == 'SQLITE' ) then
 183          call sqlf_readFile( obsSpaceData, fileNameFull, obsFamilyType, fileIndexMpiLocal )
 184        end if
 185        if ( obsFileType == 'OBSDB' ) then
 186          call odbf_readFile( obsSpaceData, fileNameFull, obsFamilyType, fileIndexMpiLocal )
 187        end if
 188      else
 189        numHeaderRead = 0
 190        numBodyRead = 0
 191      end if ! if (fileExists)
 192
 193      if (obsFileType == 'BURP') then
 194        call setHeadBodyPrimaryKeyColumns(obsSpaceData, numHeaderRead, numBodyRead)
 195      end if
 196      
 197    end do
 198
 199    ! abort if NAMTOV does not exist but there are radiance observation files
 200    if ( .not. utl_isNamelistPresent('NAMTOV','./flnml') .and. &
 201        any(obsf_familyType(:) == 'TO') ) then
 202      call utl_abort('obsf_readFiles: Namelist block NAMTOV is missing but there are radiance observation files')
 203    end if
 204
 205    ! initialize OBS_HIND for each observation
 206    call obs_sethind(obsSpaceData)
 207  
 208  end subroutine obsf_readFiles
 209
 210  !--------------------------------------------------------------------------
 211  ! obsf_writeFiles
 212  !--------------------------------------------------------------------------
 213  subroutine obsf_writeFiles( obsSpaceData, HXens_mpiglobal_opt, asciDumpObs_opt, writeDiagFiles_opt, ensObs_opt)
 214    implicit none
 215
 216    ! Arguments:
 217    type(struct_obs),           intent(inout) :: obsSpaceData
 218    real(8),          optional, intent(in)    :: HXens_mpiglobal_opt(:,:)
 219    logical,          optional, intent(in)    :: asciDumpObs_opt
 220    logical,          optional, intent(in)    :: writeDiagFiles_opt
 221    type(struct_eob), optional, intent(in)    :: ensObs_opt          
 222  
 223    ! Locals:
 224    integer           :: fileIndex, fnom, fclos, nulnam, ierr
 225    integer           :: status, baseNameIndexBeg
 226    character(len=maxLengthFilename) :: baseNameNoPrefix, baseName, fullName, fullNameWithPath, fileNameDir
 227    character(len=256):: obsDirectory
 228    character(len=10) :: obsFileType, sfFileName
 229    character(len=*), parameter :: myName = 'obsf_writeFiles'
 230    character(len=*), parameter :: myWarning = myName //' WARNING: '
 231
 232    ! Namelist variables:
 233    logical :: lwritediagsql ! choose to write 'diag' sqlite observation files
 234    logical :: onlyAssimObs  ! choose to not include unassimilated obs in 'diag' sqlite files
 235    logical :: addFSOdiag    ! choose to include FSO column in body table
 236    logical :: writeObsDb    ! write obDB file from scratch
 237
 238    namelist /namwritediag/ lwritediagsql, onlyAssimObs, addFSOdiag, writeObsDb
 239
 240    call utl_tmg_start(10,'--Observations')
 241
 242    if ( .not.initialized ) call utl_abort('obsf_writeFiles: obsFiles_mod not initialized!')
 243 
 244    call obsf_determineFileType(obsFileType)
 245
 246    nulnam=0
 247    lwritediagsql = .false.
 248    onlyAssimObs = .false.
 249    addFSOdiag = .false.
 250    writeObsDb = .false.
 251    ierr=fnom(nulnam,'./flnml','FTN+SEQ+R/O',0)
 252    read(nulnam,nml=namwritediag,iostat=ierr)
 253    if (ierr /= 0) write(*,*) myWarning//' namwritediag is missing in the namelist. The default value will be taken.'
 254    if (mmpi_myid == 0) write(*,nml = namwritediag)
 255    if (present(writeDiagFiles_opt)) then
 256      lwritediagsql = lwritediagsql .and. writeDiagFiles_opt
 257    end if
 258    ierr=fclos(nulnam)
 259
 260    if ( obsFileType == 'BURP' .or. obsFileType == 'OBSDB' .or. &
 261         obsFileType == 'SQLITE' ) then
 262
 263      if (trim(obsFileMode) == 'analysis') call ovt_transformResiduals(obsSpaceData, obs_oma)
 264      if (trim(obsFileMode) /= 'prepcma' .and. trim(obsFileMode) /= 'thinning') then
 265        call ovt_transformResiduals(obsSpaceData, obs_omp)
 266      end if
 267      if (trim(obsFileMode) == 'analysis' .or. trim(obsFileMode) == 'FSO') call obsu_setassflg(obsSpaceData)
 268      if (trim(obsFileMode) /= 'prepcma' .and. trim(obsFileMode) /= 'thinning') then
 269        call obsu_updateSourceVariablesFlag(obsSpaceData)
 270      end if
 271      if (trim(obsFileMode) /= 'prepcma') call ovt_transformResiduals(obsSpaceData, obs_omp)
 272      if (trim(obsFileMode) /= 'prepcma') call obsu_updateSourceVariablesFlag(obsSpaceData)
 273
 274      do fileIndex = 1, obsf_nfiles
 275        call obsf_determineSplitFileType( obsFileType, obsf_fileName(fileIndex) )
 276
 277        if ( obsFileType == 'BURP'   ) then
 278          call brpf_updateFile( obsSpaceData, obsf_fileName(fileIndex), obsf_familyType(fileIndex), &
 279                                fileIndex )
 280        else if ( obsFileType == 'OBSDB' ) then
 281          call odbf_updateFile( obsSpaceData, obsf_fileName(fileIndex), &
 282                                obsf_familyType(fileIndex), fileIndex )
 283        else if ( obsFileType == 'SQLITE' ) then
 284          call sqlf_updateFile( obsSpaceData, obsf_fileName(fileIndex), &
 285                                obsf_familyType(fileIndex), fileIndex )
 286        end if
 287      end do
 288
 289      if ( present(HXens_mpiglobal_opt) .and. mmpi_myid == 0 ) then
 290        call obsf_writeHX(obsSpaceData, HXens_mpiglobal_opt)
 291      end if
 292
 293      ! write obsDB files in the same directory with odb prefix
 294      if (writeObsDb) then
 295        fileNameDir = trim(ram_getRamDiskDir())
 296        if (fileNameDir == ' ') then
 297          write(*,*) 'obsf_writeFiles: WARNING! Writing obsDB to current working path ' // &
 298                     'instead of ramdisk'
 299        end if
 300
 301        if (obsf_filesSplit()) call rpn_comm_barrier('GRID',status)
 302
 303        ! update obsDB files
 304        do fileIndex = 1, obsf_nfiles
 305          fullName = trim(obsf_fileName(fileIndex))
 306          baseNameIndexBeg = index(fullName,'/',back=.true.)
 307
 308          obsDirectory = 'obs'
 309
 310          ! change the prefix to odb
 311          baseNameNoPrefix = fullName(baseNameIndexBeg+1+3:)
 312          baseName = 'odb' // trim(baseNameNoPrefix)
 313
 314          fullNameWithPath = trim(fileNameDir) // trim(obsDirectory) // '/'//trim(baseName)
 315          
 316          call odbf_updateFile(obsSpaceData, fullNameWithPath, &
 317                               obsf_familyType(fileIndex), fileIndex)
 318        end do        
 319      end if
 320
 321    end if
 322
 323    if ( index(obsf_getFileName('SF'), 'sfc') > 0 ) then
 324      sfFileName = 'sfc'
 325    else
 326      sfFileName = 'sf'
 327    end if
 328
 329    if (lwritediagsql) then 
 330      call diaf_writeAllSqlDiagFiles(obsSpaceData, sfFileName, onlyAssimObs, &
 331                                     addFSOdiag, ensObs_opt=ensObs_opt)
 332    end if
 333
 334    if ( present(asciDumpObs_opt) ) then
 335      if ( asciDumpObs_opt ) then
 336        if ( obsFileType == 'BURP' .or. obsFileType == 'OBSDB' .or.  &
 337             obsFileType == 'SQLITE' .or. mmpi_myid == 0   ) then
 338          ! all processors write to files only for BURP, OBSDB and SQLITE    
 339          call obsf_writeAsciDump(obsSpaceData)
 340        end if
 341      end if
 342    end if
 343
 344    call utl_tmg_stop(10)
 345
 346  end subroutine obsf_writeFiles
 347
 348  !--------------------------------------------------------------------------
 349  ! obsf_cleanObsFiles
 350  !--------------------------------------------------------------------------
 351  subroutine obsf_cleanObsFiles()
 352    implicit none
 353
 354    ! Locals:
 355    integer                    :: fileIndex
 356    character(len=fileTypeLen) :: obsFileType
 357
 358    if ( .not.initialized ) call utl_abort('obsf_cleanObsFiles: obsFiles_mod not initialized!')
 359   
 360    call obsf_determineFileType(obsFileType)
 361
 362    if ( obsFileType /= 'BURP' .and. obsFileType /= 'OBSDB' .and.  &
 363         obsFileType /= 'SQLITE' ) then
 364      write(*,*) 'obsf_cleanObsFiles: obsFileType=', obsFileType, &
 365                ' is not BURP, OBSDB nor SQLITE. Return.' 
 366      return
 367    end if
 368    
 369    call utl_tmg_start(23, '----ObsFileClean')
 370
 371    do fileIndex = 1, obsf_nfiles
 372      call obsf_determineSplitFileType( obsFileType, obsf_fileName(fileIndex) )
 373
 374      if ( obsFileType == 'BURP' ) then
 375        call brpr_burpClean( obsf_fileName(fileIndex), obsf_familyType(fileIndex) )
 376      else if ( obsFileType == 'OBSDB' ) then
 377        call obdf_clean( obsf_fileName(fileIndex), obsf_familyType(fileIndex) )
 378      else if ( obsFileType == 'SQLITE' ) then
 379        call sqlf_cleanFile( obsf_fileName(fileIndex), obsf_familyType(fileIndex) )
 380      end if
 381    end do
 382
 383    call utl_tmg_stop(23)
 384
 385  end subroutine obsf_cleanObsFiles 
 386
 387  !--------------------------------------------------------------------------
 388  ! obsf_writeHX
 389  !--------------------------------------------------------------------------
 390  subroutine obsf_writeHX(obsSpaceData, HXens_mpiglobal)
 391    implicit none
 392
 393    ! Arguments:
 394    type(struct_obs), intent(in) :: obsSpaceData
 395    real(8),          intent(in) :: HXens_mpiglobal(:,:)
 396
 397    ! Locals:
 398    integer :: unitHX, ierr, headerIndex, fnom, fclos
 399    character(len=10) :: fileNameHX
 400
 401    write(*,*) 'obsf_writeHX: Starting'
 402
 403    fileNameHX     = 'cmahxout'
 404    unitHX = 0
 405    ierr = fnom(unitHX, fileNameHX, 'FTN+SEQ+UNF+R/W', 0)
 406
 407    do headerIndex = 1, obs_numHeader(obsSpaceData)
 408
 409      call obs_write_hx(obsSpaceData, HXens_mpiglobal, headerIndex, unitHX)
 410
 411    enddo
 412 
 413    ierr = fclos(unitHX)
 414
 415  end subroutine obsf_writeHX
 416
 417  !--------------------------------------------------------------------------
 418  ! obsf_writeAsciDump
 419  !--------------------------------------------------------------------------
 420  subroutine obsf_writeAsciDump(obsSpaceData)
 421    implicit none
 422
 423    ! Arguments:
 424    type(struct_obs), intent(in) :: obsSpaceData
 425
 426    ! Locals:
 427    character(len=25) :: fileNameAsciDump
 428    integer :: unitAsciDump, ierr, fnom, fclos
 429    character(len=4)    :: myIdxStr, myIdyStr
 430    character(len=9)    :: myIdStr
 431
 432    write(*,*) 'obsf_writeAsciDump: Starting'
 433
 434    ! determine the file name depending on if obs data is mpi local or global
 435    if ( obs_mpiLocal(obsSpaceData) ) then
 436      ! separate file per mpi task
 437      write(myIdyStr,'(I4.4)') (mmpi_myidy + 1)
 438      write(myIdxStr,'(I4.4)') (mmpi_myidx + 1)
 439      myIdStr  = trim(myIdxStr) // '_' // trim(myIdyStr)
 440      fileNameAsciDump = 'obsout_asci_' // trim(myIdStr)
 441    else
 442      ! only task 0 write the global data
 443      if ( mmpi_myid > 0 ) return
 444      fileNameAsciDump = 'obsout_asci'
 445    end if
 446
 447    write(*,*) 'obsf_writeAsciDump: writing to file : ', fileNameAsciDump
 448    unitAsciDump = 0
 449    ierr = fnom(unitAsciDump, fileNameAsciDump, 'FTN+SEQ+FMT+R/W', 0)
 450    call obs_print(obsSpaceData,unitAsciDump)
 451    ierr = fclos(unitAsciDump)
 452
 453  end subroutine obsf_writeAsciDump
 454
 455  !--------------------------------------------------------------------------
 456  ! obsf_setupFileNames
 457  !--------------------------------------------------------------------------
 458  subroutine obsf_setupFileNames()
 459    implicit none
 460
 461    ! Locals:
 462    character(len=20) :: namePrefix(maxNumObsfiles)
 463    character(len=2) :: familyName(maxNumObsfiles)
 464    character(len=4) :: myIdxStr, myIdyStr
 465    character(len=256):: obsDirectory
 466    character(len=maxLengthFilename) :: fileName, baseFileName  ! the length should be more than 
 467                                                                ! len(obsDirectory)+1+len(namePrefix)+1+len(obsf_myIdExt)
 468    character(len=maxLengthFilename) :: baseFileNameList(maxNumObsfiles)
 469    character(len=fileTypeLen)       :: fileTypeList(maxNumObsfiles)
 470    character(len=fileTypeLen)       :: obsFileType
 471    character(len=256)               :: fileNamefull
 472    logical :: fileExists
 473    integer :: fileIndex
 474
 475    write(myIdyStr,'(I4.4)') (mmpi_myidy + 1)
 476    write(myIdxStr,'(I4.4)') (mmpi_myidx + 1)
 477    obsf_myIdExt  = trim(myIdxStr) // '_' // trim(myIdyStr)
 478
 479    namePrefix(:)=''
 480    ! file names only for burp
 481    namePrefix( 1) = 'brpuan'
 482    namePrefix( 2) = 'brpuas'
 483    namePrefix( 3) = 'brpai'
 484    namePrefix( 4) = 'brpain'
 485    namePrefix( 5) = 'brpais'
 486    namePrefix( 6) = 'brpaie'
 487    namePrefix( 7) = 'brpaiw'
 488    namePrefix( 8) = 'brpsfc'
 489    namePrefix( 9) = 'brpsf'
 490    namePrefix(10) = 'brptov'
 491    namePrefix(11) = 'brpssmis'
 492    namePrefix(12) = 'brpairs'
 493    namePrefix(13) = 'brpto_amsua'
 494    namePrefix(14) = 'brpto_amsua_allsky'
 495    namePrefix(15) = 'brpto_amsub'
 496    namePrefix(16) = 'brpto_amsub_allsky'
 497    namePrefix(17) = 'brpcsr'
 498    namePrefix(18) = 'brpiasi'
 499    namePrefix(19) = 'brpatms'
 500    namePrefix(20) = 'brpatms_allsky'
 501    namePrefix(21) = 'brpcris'
 502    namePrefix(22) = 'brpcrisfsr'
 503    namePrefix(23) = 'brpcrisfsr1'
 504    namePrefix(24) = 'brpcrisfsr2'
 505    namePrefix(25) = 'brpcrisfsr3'
 506    namePrefix(26) = 'brpcrisfsr4'
 507    namePrefix(27) = 'brpsw'
 508    namePrefix(28) = 'brpswgoes9'
 509    namePrefix(29) = 'brpswgoese'
 510    namePrefix(30) = 'brpswgoesw'
 511    namePrefix(31) = 'brpswmodis'
 512    namePrefix(32) = 'brpswmtsate'
 513    namePrefix(33) = 'brpswmtsatw'
 514    namePrefix(34) = 'brpgo'
 515    namePrefix(35) = 'brpsc'
 516    namePrefix(36) = 'brppr'
 517    namePrefix(37) = 'brpro'
 518    namePrefix(38) = 'brphum'
 519    namePrefix(39) = 'brpsat'
 520    namePrefix(40) = 'brpssm'
 521    namePrefix(41) = 'brpgp'
 522    namePrefix(42) = 'brpch'
 523    namePrefix(43) = 'brpua'
 524    ! new general file names for burp and sqlite
 525    namePrefix(44) = 'obsua'
 526    namePrefix(45) = 'obsuan'
 527    namePrefix(46) = 'obsuas'
 528    namePrefix(47) = 'obsai'
 529    namePrefix(48) = 'obsain'
 530    namePrefix(49) = 'obsais'
 531    namePrefix(50) = 'obsaie'
 532    namePrefix(51) = 'obsaiw'
 533    namePrefix(52) = 'obssfc'
 534    namePrefix(53) = 'obssf'
 535    namePrefix(54) = 'obstov'
 536    namePrefix(55) = 'obsssmis'
 537    namePrefix(56) = 'obsairs'
 538    namePrefix(57) = 'obsto_amsua'
 539    namePrefix(58) = 'obsto_amsua_allsky'
 540    namePrefix(59) = 'obsto_amsub'
 541    namePrefix(60) = 'obsto_amsub_allsky'
 542    namePrefix(61) = 'obscsr'
 543    namePrefix(62) = 'obsiasi'
 544    namePrefix(63) = 'obsatms'
 545    namePrefix(64) = 'obsatms_allsky'
 546    namePrefix(65) = 'obscris'
 547    namePrefix(66) = 'obscrisfsr'
 548    namePrefix(67) = 'obscrisfsr1'
 549    namePrefix(68) = 'obscrisfsr2'
 550    namePrefix(69) = 'obscrisfsr3'
 551    namePrefix(70) = 'obscrisfsr4'
 552    namePrefix(71) = 'obssw'
 553    namePrefix(72) = 'obsswgoes9'
 554    namePrefix(73) = 'obsswgoese'
 555    namePrefix(74) = 'obsswgoesw'
 556    namePrefix(75) = 'obsswmodis'
 557    namePrefix(76) = 'obsswmtsate'
 558    namePrefix(77) = 'obsswmtsatw'
 559    namePrefix(78) = 'obsgo'
 560    namePrefix(79) = 'obssc'
 561    namePrefix(80) = 'obspr'
 562    namePrefix(81) = 'obsro'
 563    namePrefix(82) = 'obshum'
 564    namePrefix(83) = 'obssat'
 565    namePrefix(84) = 'obsssm'
 566    namePrefix(85) = 'obsgp'
 567    namePrefix(86) = 'obsch'
 568    namePrefix(87) = 'obsgl_ssmi'
 569    namePrefix(88) = 'obsgl_ssmis'
 570    namePrefix(89) = 'obsgl_amsr2'
 571    namePrefix(90) = 'obsgl_ascat'
 572    namePrefix(91) = 'obsgl_avhrr'
 573    namePrefix(92) = 'obsgl_cisA'
 574    namePrefix(93) = 'obsgl_cisI'
 575    namePrefix(94) = 'obsgl_cisL'
 576    namePrefix(95) = 'obsgl_cisR'
 577    namePrefix(96) = 'cmaheader' ! file name for CMA format used by EnKF
 578    namePrefix(97) = 'brpsst'
 579    namePrefix(98) = 'obsal'
 580    namePrefix(99) = 'obsradar'
 581    namePrefix(100)= 'obssst_insitu'
 582    namePrefix(101)= 'obshydro'
 583    namePrefix(102)= 'obsmwhs2'
 584    namePrefix(103)= 'brpmwhs2'
 585    namePrefix(104)= 'obssarwinds'
 586    namePrefix(105)= 'obssst_avhrr'
 587    namePrefix(106)= 'obssst_amsr2'
 588    namePrefix(107)= 'obssst_viirs'
 589    namePrefix(108)= 'obssst_pseudo'
 590    namePrefix(109)= 'obssst'
 591    namePrefix(110)= 'obsgl_rcm'
 592
 593    familyName(:)   = ''
 594    familyName( 1)  = 'UA'
 595    familyName( 2)  = 'UA'
 596    familyName( 3)  = 'AI'
 597    familyName( 4)  = 'AI'
 598    familyName( 5)  = 'AI'
 599    familyName( 6)  = 'AI'
 600    familyName( 7)  = 'AI'
 601    familyName( 8)  = 'SF'
 602    familyName( 9)  = 'SF'
 603    familyName(10)  = 'TO'
 604    familyName(11)  = 'TO'
 605    familyName(12)  = 'TO'
 606    familyName(13)  = 'TO'
 607    familyName(14)  = 'TO'
 608    familyName(15)  = 'TO'
 609    familyName(16)  = 'TO'
 610    familyName(17)  = 'TO'
 611    familyName(18)  = 'TO'
 612    familyName(19)  = 'TO'
 613    familyName(20)  = 'TO'
 614    familyName(21)  = 'TO'
 615    familyName(22)  = 'TO'
 616    familyName(23)  = 'TO'
 617    familyName(24)  = 'TO'
 618    familyName(25)  = 'TO'
 619    familyName(26)  = 'TO'
 620    familyName(27)  = 'SW'
 621    familyName(28)  = 'SW'
 622    familyName(29)  = 'SW'
 623    familyName(30)  = 'SW'
 624    familyName(31)  = 'SW'
 625    familyName(32)  = 'SW'
 626    familyName(33)  = 'SW'
 627    familyName(34)  = 'GO'
 628    familyName(35)  = 'SC'
 629    familyName(36)  = 'PR'
 630    familyName(37)  = 'RO'
 631    familyName(38)  = 'HU'
 632    familyName(39)  = 'ST'
 633    familyName(40)  = 'MI'
 634    familyName(41)  = 'GP'
 635    familyName(42)  = 'CH'
 636    familyName(43)  = 'UA'
 637    familyName(44)  = 'UA'
 638    familyName(45)  = 'UA'
 639    familyName(46)  = 'UA'
 640    familyName(47)  = 'AI'
 641    familyName(48)  = 'AI'
 642    familyName(49)  = 'AI'
 643    familyName(50)  = 'AI'
 644    familyName(51)  = 'AI'
 645    familyName(52)  = 'SF'
 646    familyName(53)  = 'SF'
 647    familyName(54)  = 'TO'
 648    familyName(55)  = 'TO'
 649    familyName(56)  = 'TO'
 650    familyName(57)  = 'TO'
 651    familyName(58)  = 'TO'
 652    familyName(59)  = 'TO'
 653    familyName(60)  = 'TO'
 654    familyName(61)  = 'TO'
 655    familyName(62)  = 'TO'
 656    familyName(63)  = 'TO'
 657    familyName(64)  = 'TO'
 658    familyName(65)  = 'TO'
 659    familyName(66)  = 'TO'
 660    familyName(67)  = 'TO'
 661    familyName(68)  = 'TO'
 662    familyName(69)  = 'TO'
 663    familyName(70)  = 'TO'
 664    familyName(71)  = 'SW'
 665    familyName(72)  = 'SW'
 666    familyName(73)  = 'SW'
 667    familyName(74)  = 'SW'
 668    familyName(75)  = 'SW'
 669    familyName(76)  = 'SW'
 670    familyName(77)  = 'SW'
 671    familyName(78)  = 'GO'
 672    familyName(79)  = 'SC'
 673    familyName(80)  = 'PR'
 674    familyName(81)  = 'RO'
 675    familyName(82)  = 'HU'
 676    familyName(83)  = 'ST'
 677    familyName(84)  = 'MI'
 678    familyName(85)  = 'GP'
 679    familyName(86)  = 'CH'
 680    familyName(87)  = 'GL'
 681    familyName(88)  = 'GL'
 682    familyName(89)  = 'GL'
 683    familyName(90)  = 'GL'
 684    familyName(91)  = 'GL'
 685    familyName(92)  = 'GL'
 686    familyName(93)  = 'GL'
 687    familyName(94)  = 'GL'
 688    familyName(95)  = 'GL'
 689    familyName(96)  = 'XX' ! dummy family type for CMA, since it contains all families
 690    familyName(97)  = 'TM'
 691    familyName(98)  = 'AL'
 692    familyName(99)  = 'RA'
 693    familyName(100) = 'TM'
 694    familyName(101) = 'HY'
 695    familyName(102) = 'TO'
 696    familyName(103) = 'TO'
 697    familyName(104) = 'SF'
 698    familyName(105) = 'TM'
 699    familyName(106) = 'TM'
 700    familyName(107) = 'TM'
 701    familyName(108) = 'TM'
 702    familyName(109) = 'TM'
 703    familyName(110) = 'GL'
 704
 705    obsDirectory = 'obs'
 706
 707    obsf_nfiles = 0
 708    obsf_fileName(1) = 'DUMMY_FILE_NAME'
 709    baseFileNameList(:) = ''
 710    fileTypeList(:) = ''
 711
 712    do fileIndex = 1, maxNumObsfiles 
 713
 714      if(namePrefix(fileIndex) == '') exit
 715
 716      baseFileName = trim(obsDirectory) // '/' // trim(namePrefix(fileIndex))
 717      fileName = trim(baseFileName) // '_' // trim(obsf_myIdExt)
 718      fileNameFull = ram_fullWorkingPath(fileName,noAbort_opt=.true.)
 719      inquire(file=trim(fileNameFull),exist=fileExists)
 720
 721      if (.not. fileExists ) then
 722        fileName=trim(baseFileName)
 723        fileNameFull = ram_fullWorkingPath(fileName, noAbort_opt=.true.)
 724        inquire(file=trim(fileNameFull), exist=fileExists)
 725      end if
 726
 727      if ( fileExists ) then
 728        obsf_nfiles=obsf_nfiles + 1
 729        baseFileNameList(obsf_nfiles) = trim(baseFileName)
 730        obsf_fileName(obsf_nfiles) = fileNameFull
 731        obsf_familyType(obsf_nfiles) = familyName(fileIndex)
 732        
 733        call obsf_determineSplitFileType(obsFileType,fileNameFull)
 734        fileTypeList(obsf_nfiles) = obsFileType
 735      end if
 736
 737    end do
 738
 739    call setObsFilesMpiUniqueList(baseFileNameList,fileTypeList)
 740    
 741    write(*,*) ' '
 742    write(*,*)'obsf_setupFileNames: Number of observation files is :', obsf_nfiles
 743    write(*,*)'Type  Name '
 744    write(*,*)'----  ---- '
 745    do fileIndex = 1, obsf_nfiles
 746      write(*,'(1X,A2,1X,A60)' ) obsf_familyType(fileIndex), trim(obsf_fileName(fileIndex))
 747    end do
 748
 749  end subroutine obsf_setupFileNames
 750
 751  !--------------------------------------------------------------------------
 752  ! setObsFilesMpiUniqueList
 753  !--------------------------------------------------------------------------
 754  subroutine setObsFilesMpiUniqueList(baseFileNameList,fileTypeList)
 755    !
 756    !:Purpose: Create a unique list of obs filenames/familyTypes across all mpi tasks.
 757    !
 758    implicit none
 759
 760    ! Arguments:
 761    character(len=*), intent(in) :: baseFileNameList(:)
 762    character(len=*), intent(in) :: fileTypeList(:)
 763
 764    ! Locals:
 765    integer :: fileIndex, fileIndex2, procIndex, ierr
 766    character(len=maxLengthFilename), allocatable :: baseFileNameListAllMpi(:,:)
 767    character(len=familyTypeLen), allocatable :: familyTypeListAllMpi(:,:)
 768    character(len=fileTypeLen), allocatable :: fileTypeListAllMpi(:,:)
 769
 770    ! Communicate filenames across all mpi tasks
 771    allocate(baseFileNameListAllMpi(maxNumObsfiles,mmpi_nprocs))
 772    baseFileNameListAllMpi(:,:) = ''
 773    call mmpi_allgather_string(baseFileNameList, baseFileNameListAllMpi, &
 774                               maxNumObsfiles, maxLengthFilename, mmpi_nprocs, &
 775                               "GRID", ierr)
 776
 777    ! Communicate familyTypes across all mpi tasks
 778    allocate(familyTypeListAllMpi(maxNumObsfiles,mmpi_nprocs))
 779    familyTypeListAllMpi(:,:) = ''
 780    call mmpi_allgather_string(obsf_familyType, familyTypeListAllMpi, &
 781                               maxNumObsfiles, familyTypeLen, mmpi_nprocs, &
 782                               "GRID", ierr)
 783
 784    ! Communicate fileTypes across all mpi tasks
 785    allocate(fileTypeListAllMpi(maxNumObsfiles,mmpi_nprocs))
 786    fileTypeListAllMpi(:,:) = ''
 787    call mmpi_allgather_string(fileTypeList, fileTypeListAllMpi, &
 788                               maxNumObsfiles, fileTypeLen, mmpi_nprocs, &
 789                               "GRID", ierr)
 790
 791    ! Create a unique list of obs filenames/familytype across all mpi tasks without duplicates
 792    obsf_baseFileNameMpiUniqueList(:) = ''
 793    obsf_familyTypeMpiUniqueList(:) = ''
 794    obsf_fileTypeMpiUniqueList(:) = ''
 795    obsf_numMpiUniqueList = 1
 796    obsf_baseFileNameMpiUniqueList(obsf_numMpiUniqueList) = baseFileNameListAllMpi(1,1)
 797    obsf_familyTypeMpiUniqueList(obsf_numMpiUniqueList) = familyTypeListAllMpi(1,1)
 798    obsf_fileTypeMpiUniqueList(obsf_numMpiUniqueList) = fileTypeListAllMpi(1,1)
 799    do procIndex = 1, mmpi_nprocs
 800      loopFilename: do fileIndex = 1, maxNumObsfiles 
 801        if (trim((baseFileNameListAllMpi(fileIndex,procIndex))) == '') cycle loopFilename
 802
 803        ! cycle if filename already exists in the unique list
 804        do fileIndex2 = 1, obsf_numMpiUniqueList
 805          if (trim(obsf_baseFileNameMpiUniqueList(fileIndex2)) == &
 806              trim(baseFileNameListAllMpi(fileIndex,procIndex))) then
 807            cycle loopFilename
 808          end if
 809        end do
 810        
 811        ! add the filename to the unique list
 812        obsf_numMpiUniqueList = obsf_numMpiUniqueList + 1
 813        obsf_baseFileNameMpiUniqueList(obsf_numMpiUniqueList) = baseFileNameListAllMpi(fileIndex,procIndex)
 814        obsf_familyTypeMpiUniqueList(obsf_numMpiUniqueList) = familyTypeListAllMpi(fileIndex,procIndex)
 815        obsf_fileTypeMpiUniqueList(obsf_numMpiUniqueList) = fileTypeListAllMpi(fileIndex,procIndex)
 816
 817      end do loopFilename
 818    end do
 819
 820    write(*,*) 'setObsFilesMpiUniqueList: obsf_numMpiUniqueList=', obsf_numMpiUniqueList
 821    write(*,*) 'setObsFilesMpiUniqueList: familyType/fileType/filename in unique list:'
 822    write(*,'(1X,A10,1X,A10,1X,A60)') 'familyType', 'fileType', 'fileName'
 823    write(*,'(1X,A10,1X,A10,1X,A60)') '----------', '--------', '--------'
 824    do fileIndex = 1, obsf_numMpiUniqueList
 825      write(*,'(1X,A10,1X,A10,1X,A60)' ) trim(obsf_familyTypeMpiUniqueList(fileIndex)), &
 826                                        trim(obsf_fileTypeMpiUniqueList(fileIndex)), &
 827                                        trim(obsf_baseFileNameMpiUniqueList(fileIndex))
 828    end do
 829
 830  end subroutine setObsFilesMpiUniqueList
 831    
 832  !--------------------------------------------------------------------------
 833  ! obsf_determineFileType
 834  !--------------------------------------------------------------------------
 835  subroutine obsf_determineFileType( obsFileType )
 836    implicit none
 837
 838    ! Arguments:
 839    character(len=*), intent(out) :: obsFileType
 840
 841    ! Locals:
 842    integer :: ierr, procID, all_nfiles(0:(mmpi_nprocs-1))
 843    logical :: fileExists
 844
 845    call rpn_comm_allgather( obsf_nfiles, 1, 'MPI_INTEGER', &
 846                             all_nfiles,  1, 'MPI_INTEGER', 'GRID', ierr )
 847    fileExists = .false.
 848    procid_loop: do procID = 0, (mmpi_nprocs-1)
 849      if ( all_nfiles(procID) > 0 ) then
 850        fileExists = .true.
 851        exit procid_loop
 852      end if
 853    end do procid_loop
 854
 855    if ( .not.fileExists ) then
 856      call utl_abort('obsf_determineFileType: No observation files found')
 857    end if
 858
 859    write(*,*) 'obsf_determineFileType: read obs file that exists on mpi task id: ', procID
 860
 861    if ( mmpi_myid == procID ) call obsf_determineSplitFileType( obsFileType, obsf_fileName(1) )
 862
 863    call rpn_comm_bcastc(obsFileType , len(obsFileType), 'MPI_CHARACTER', procID, 'GRID', ierr)
 864    write(*,*) 'obsf_determineFileType: obsFileType = ', obsFileType
 865
 866  end subroutine obsf_determineFileType
 867
 868  !--------------------------------------------------------------------------
 869  ! obsf_determineSplitFileType
 870  !--------------------------------------------------------------------------
 871  subroutine obsf_determineSplitFileType( obsFileType, fileName )
 872    implicit none
 873
 874    ! Arguments:
 875    character(len=*), intent(out) :: obsFileType
 876    character(len=*), intent(in)  :: fileName
 877
 878    ! Locals:
 879    integer           :: ierr, unitFile
 880    integer           :: fnom, fclos, wkoffit
 881    character(len=20) :: fileStart
 882    character(len=*), parameter :: obsDbTableName = 'Report'
 883
 884    write(*,*) 'obsf_determineSplitFileType: read obs file: ', trim(fileName)
 885   
 886    ierr = wkoffit(trim(fileName))
 887
 888    if (ierr.eq.6) then
 889      obsFiletype = 'BURP'
 890    else
 891
 892      unitFile = 0
 893      ierr = fnom(unitFile, fileName, 'FTN+SEQ+FMT+R/O', 0)
 894      read(unitFile,'(A)') fileStart
 895      ierr = fclos(unitFile)
 896
 897      if ( index( fileStart, 'SQLite format 3' ) > 0 ) then
 898        if (sqlu_sqlTableExists(fileName, obsDbTableName)) then
 899          obsFileType = 'OBSDB'
 900        else
 901          obsFileType = 'SQLITE'
 902        end if
 903      else
 904        call utl_abort('obsf_determineSplitFileType: unknown obs file type')
 905      end if
 906
 907    end if
 908
 909    write(*,*) 'obsf_determineSplitFileType: obsFileType = ', obsFileType
 910
 911  end subroutine obsf_determineSplitFileType
 912
 913  !--------------------------------------------------------------------------
 914  ! obsf_getFileName
 915  !--------------------------------------------------------------------------
 916  function obsf_getFileName(obsfam,fileFound_opt) result(filename)
 917    !
 918    !:Purpose: Returns the observations file name assigned to the calling processor.
 919    !           If the input family has more than one file, the first file found will
 920    !           be returned.
 921    !
 922    !:Arguments:
 923    !           :obsfam: observation family name
 924    !           :found_opt: logical indicating if a file could be found for the family (optional)
 925    !
 926    implicit none
 927
 928    ! Arguments:
 929    character(len=2),  intent(in)  :: obsfam
 930    logical, optional, intent(out) :: fileFound_opt
 931    ! Result:
 932    character(len=maxLengthFilename) :: filename ! file name of associated observations file
 933
 934    ! Locals:
 935    integer :: numFound, ifile
 936
 937    filename = ""
 938    numFound = 0
 939   
 940    do ifile=1,obsf_nfiles
 941       if (obsfam == obsf_familyType(ifile)) then
 942          filename = obsf_fileName(ifile)
 943          numFound = numFound + 1
 944          exit
 945       end if
 946    end do
 947
 948    if (numFound == 0) then
 949      write(*,*) "obsf_getFileName: WARNING: File not found for obs family '" // trim(obsfam) // "'"
 950    end if
 951    if (numFound > 1) then
 952      write(*,*) "obsf_getFileName: WARNING: Multiple files found for obs family '" // trim(obsfam) // "'"
 953    end if
 954
 955    if (present(fileFound_opt)) fileFound_opt = (numFound > 0)
 956
 957  end function obsf_getFileName
 958
 959
 960  function obsf_obsSub_read( obsfam, stnid, varno, nlev, ndim, numColumns_opt, &
 961                             bkstp_opt, block_opt, match_nlev_opt, codtyp_opt ) result(obsdata)
 962    !
 963    !:Purpose: Retrieves information for observations from observation files and returns the data
 964    !           in a struct_oss_obsdata object. Data will be retrieved for all nodes that have valid
 965    !           filenames for the specied observational family and combined into one struct_oss_obsdata
 966    !           if the observational files are split.
 967    !
 968    !:Arguments:
 969    !           :obsfam:          observation family name
 970    !           :stnid:           station ID of observation
 971    !           :varno:           BUFR code (if <=0, to search through all codes to obtain first
 972    !                             between 10000 and 16000)
 973    !           :nlev:            number of levels in the observation
 974    !           :ndim:            number of dimensions for the retrieved data in
 975    !                             each report (e.g. ndim=1 for std, ndim=2 for
 976    !                             averaging kernel matrices) 
 977    !           :numColumns_opt:  Number of columns (if different from nlev and for ndim=2)
 978    !           :bkstp_opt:       bkstp number of requested block if BURP file type (optional)
 979    !           :block_opt:       block type of requested block if BURP file type (optional)
 980    !                             Valid values are 'DATA', 'INFO', '3-D', and 'MRQR', indicated
 981    !                             by the two rightmost bits of bknat.
 982    !           :match_nlev_opt:  determines if the report matching criteria includes checking
 983    !                             if the report number of levels is the same as the input
 984    !                             argument nlev (optional)
 985    !           :codtyp_opt:      optional CODTYP list for search (optional)
 986    !
 987    implicit none
 988
 989    ! Arguments:
 990    character(len=*)          , intent(in) :: obsfam
 991    character(len=*)          , intent(in) :: stnid
 992    integer                   , intent(in) :: varno
 993    integer                   , intent(in) :: nlev
 994    integer                   , intent(in) :: ndim
 995    integer,          optional, intent(in) :: numColumns_opt ! Number of columns (if different from nlev and for ndim=2)
 996    integer,          optional, intent(in) :: bkstp_opt
 997    integer,          optional, intent(in) :: codtyp_opt(:)
 998    logical,          optional, intent(in) :: match_nlev_opt
 999    character(len=4), optional, intent(in) :: block_opt
1000    ! Result:
1001    type(struct_oss_obsdata)               :: obsdata ! struct_oss_obsdata object
1002
1003    ! Locals:
1004    character(len=maxLengthFilename) :: filename
1005    logical                          :: fileFound
1006    character(len=fileTypeLen)       :: obsFileType
1007
1008    filename = obsf_getFileName(obsfam,fileFound)
1009
1010    if (fileFound) then
1011       call obsf_determineSplitFileType( obsFileType, filename )
1012       if (obsFileType=='BURP') then
1013          if (.not.present(block_opt)) &
1014               call utl_abort("obsf_obsSub_read: optional variable 'block_opt' is required for BURP observational files.")
1015          if (.not.present(numColumns_opt)) then
1016            obsdata = brpf_obsSub_read(filename,stnid,varno,nlev,ndim,block_opt,bkstp_opt=bkstp_opt, &
1017                                   match_nlev_opt=match_nlev_opt,codtyp_opt=codtyp_opt)
1018          else
1019            obsdata = brpf_obsSub_read(filename,stnid,varno,nlev,ndim,block_opt,bkstp_opt=bkstp_opt, &
1020                                       match_nlev_opt=match_nlev_opt,codtyp_opt=codtyp_opt,numColumns_opt=numColumns_opt)
1021          end if 
1022       else
1023          call utl_abort("obsf_obsSub_read: Only BURP observational files currently supported.")
1024       end if
1025
1026    else
1027
1028       write(*,*) "obsf_obsSub_read: No observational files found for family '" // trim(obsfam) // "' for this node."
1029
1030       if (obsf_filesSplit()) then
1031          ! Must allocate obsdata so that it is available from ALL processors when
1032          ! requiring of rpn_comm_allgather via oss_obsdata_MPIallgather.
1033          if (ndim == 1) then
1034             call oss_obsdata_alloc(obsdata,1,dim1=nlev)
1035          else
1036             call oss_obsdata_alloc(obsdata,1,dim1=nlev,dim2_opt=nlev)
1037          end if
1038          obsdata%nrep=0
1039          write(*,*) "obsf_obsSub_read: Setting empty struct_oss_obsdata object for this node."
1040       else
1041          call utl_abort("obsf_obsSub_read: Abort since files are not split.")
1042       end if
1043    end if
1044
1045    if (obsf_filesSplit()) call oss_obsdata_MPIallgather(obsdata)
1046
1047  end function obsf_obsSub_read
1048
1049
1050  function obsf_obsSub_update( obsdata, obsfam, varno, bkstp_opt, block_opt, multi_opt ) &
1051                           result(nrep_modified)
1052    !:Purpose: Add or modify data in observational files from data stored
1053    !           in a struct_oss_obsdata object.
1054    !
1055    !:Arguments:
1056    !           :obsdata: Input struct_oss_obsdata object for varno.
1057    !           :obsfam:  observation family name
1058    !           :varno:   BUFR descriptors. Number of elements must be 
1059    !                     max(1,obsdata%dim2)
1060    !           :bkstp_opt: bkstp number of requested block if BURP file type (optional)
1061    !           :block_opt: block type of requested block if BURP file type (optional)
1062    !                       Valid values are 'DATA', 'INFO', '3-D', and 'MRQR', indicated
1063    !                       by the two rightmost bits of bknat.
1064    !           :multi_opt: Indicates if intended report are for 'UNI' or 'MULTI' level data (optional)
1065    !
1066    implicit none
1067
1068    ! Arguments:
1069    type(struct_oss_obsdata),   intent(inout) :: obsdata
1070    character(len=*),           intent(in)    :: obsfam
1071    integer,                    intent(in)    :: varno(:)
1072    integer         , optional, intent(in)    :: bkstp_opt
1073    character(len=4), optional, intent(in)    :: block_opt
1074    character(len=*), optional, intent(in)    :: multi_opt
1075    ! Result:
1076    integer :: nrep_modified    ! Number of modified reports
1077
1078    ! Locals:
1079    integer :: ierr,nrep_modified_global
1080    character(len=maxLengthFilename) :: filename
1081    logical                          :: fileFound
1082    character(len=fileTypeLen)       :: obsFileType
1083
1084    filename = obsf_getFileName(obsfam,fileFound)
1085
1086    if (fileFound) then
1087       if (obsf_filesSplit() .or. mmpi_myid == 0) then
1088          call obsf_determineSplitFileType( obsFileType, filename )
1089          if (obsFileType=='BURP') then
1090             if (.not.present(block_opt)) &
1091                  call utl_abort("obsf_obsSub_update: optional varaible 'block_opt' is required for BURP observational files.")
1092             nrep_modified = brpf_obsSub_update(obsdata,filename,varno,block_opt,bkstp_opt=bkstp_opt,multi_opt=multi_opt)
1093          else
1094             call utl_abort("obsf_obsSub_update: Only BURP observational files currently supported.")
1095          end if
1096       end if
1097    else
1098       write(*,*) "obsf_obsSub_update: No observational files found for family '" // trim(obsfam) // "' for this node."
1099    end if
1100
1101    if (obsf_filesSplit()) then
1102       call rpn_comm_allreduce(nrep_modified,nrep_modified_global,1,"MPI_INTEGER","MPI_SUM","GRID",ierr)
1103       nrep_modified = nrep_modified_global
1104    end if
1105
1106  end function obsf_obsSub_update
1107
1108  !--------------------------------------------------------------------------
1109  ! obsf_addCloudParametersAndEmissivity
1110  !--------------------------------------------------------------------------
1111  subroutine obsf_addCloudParametersAndEmissivity(obsSpaceData)
1112    !
1113    !:Purpose: Loop on observation files to add cloud parameters and emissivity
1114    !
1115    implicit none
1116
1117    ! Arguments:
1118    type(struct_obs),  intent(inout)  :: obsSpaceData
1119
1120    ! Locals:
1121    integer                    :: fileIndex
1122    character(len=fileTypeLen) :: obsFileType
1123
1124    ! If obs files not split and I am not task 0, then return
1125    if ( .not.obsf_filesSplit() .and. mmpi_myid /= 0 ) return
1126
1127    do fileIndex = 1, obsf_nfiles
1128
1129      call obsf_determineSplitFileType( obsFileType, obsf_fileName(fileIndex) )
1130      if ( trim(obsFileType) == 'SQLITE' )  then
1131        call sqlf_addCloudParametersandEmissivity(obsSpaceData, fileIndex, obsf_fileName(fileIndex))
1132      else if ( trim(obsFileType) == 'BURP' ) then 
1133        call brpr_addCloudParametersandEmissivity(obsSpaceData, fileIndex, trim( obsf_fileName(fileIndex) ) )
1134      else if ( trim(obsFileType) == 'OBSDB' ) then
1135        ! The variables updated here for other file types are added/updated in ObsDb files using a
1136        ! seperate subroutine called odbf_updateMidasHeaderTable which is called when writing everything else.
1137        write(*,*) 'obsf_addCloudParametersAndEmissivity: obsDB files handled separately'
1138      else  
1139        write(*,*) ' UNKNOWN FileType=',obsFileType
1140        call utl_abort("obsf_addCloudParametersAndEmissivity: Only BURP, OBSDB and SQLITE observational files supported.")
1141      end if
1142    end do
1143
1144  end subroutine obsf_addCloudParametersAndEmissivity
1145
1146  !--------------------------------------------------------------------------
1147  ! obsf_updateMissingObsFlags
1148  !--------------------------------------------------------------------------
1149  subroutine obsf_updateMissingObsFlags(obsSpaceData)
1150    !
1151    !:Purpose: Loop on observation files to set missing observation flags to 2048
1152    !           For now, this is done for only ATMS and AMSUA
1153    !
1154    implicit none
1155
1156    ! Arguments:
1157    type(struct_obs), intent(inout) :: obsSpaceData
1158
1159    ! Locals:
1160    integer                    :: fileIndex
1161    character(len=fileTypeLen) :: obsFileType
1162    logical                    :: toDataPresent
1163    integer                    :: headerIndex
1164    integer                    :: codtyp 
1165
1166    toDataPresent = .false.
1167    call obs_set_current_header_list(obsSpaceData,'TO')
1168    HEADER: do
1169      headerIndex = obs_getHeaderIndex(obsSpaceData)
1170    if (headerIndex < 0) exit HEADER
1171      codtyp = obs_headElem_i(obsSpaceData, OBS_ITY, headerIndex)
1172      if ( (tvs_isIdBurpInst(codtyp,'atms' )) .or. &
1173           (tvs_isIdBurpInst(codtyp,'amsua')) .or. &
1174           (tvs_isIdBurpInst(codtyp,'amsub')) .or. &
1175           (tvs_isIdBurpInst(codtyp,'mhs'  )) .or. & 
1176           (tvs_isIdBurpInst(codtyp,'radianceclear'  )) ) then
1177        toDataPresent = .true.
1178      end if
1179    end do HEADER
1180
1181    if ( .not. toDataPresent ) then
1182      write(*,*) 'WARNING: WILL NOT RUN obsf_updateMissingObsFlags since no ATMS or AMSUA, CSR, AMSUB, MHS'
1183      return
1184    end if
1185
1186    ! If obs files not split and I am not task 0, then return
1187    if ( .not.obsf_filesSplit() .and. mmpi_myid /= 0 ) return
1188
1189    FILELOOP: do fileIndex = 1, obsf_nfiles
1190      if ( obsf_familyType(fileIndex) /= 'TO' ) cycle FILELOOP
1191      write(*,*) 'INPUT FILE TO  obsf_updateMissingObsFlags = ', trim( obsf_fileName(fileIndex) )
1192      call obsf_determineSplitFileType( obsFileType, obsf_fileName(fileIndex) )
1193      if ( trim(obsFileType) /= 'BURP' ) then
1194        write(*,*) 'obsFileType = ',obsFileType
1195        write(*,*) 'obsf_updateMissingObsFlags: WARNING this s/r is currently only compatible with BURP files'
1196      else
1197        call brpr_updateMissingObsFlags( trim( obsf_fileName(fileIndex) ) )
1198      end if
1199    end do FILELOOP
1200
1201  end subroutine obsf_updateMissingObsFlags
1202
1203  !--------------------------------------------------------------------------
1204  ! obsf_copyObsDirectory
1205  !--------------------------------------------------------------------------
1206  subroutine obsf_copyObsDirectory(directoryInOut, direction)
1207    !
1208    !:Purpose: Loop on observation files and copy each to and from
1209    !           the specified directory
1210    !
1211    implicit none
1212
1213    ! Arguments:
1214    character(len=*), intent(in) :: directoryInOut
1215    character(len=*), intent(in) :: direction
1216
1217    ! Locals:
1218    integer            :: status, fileIndex, baseNameIndexBeg
1219    character(len=200) :: baseName, fullName
1220
1221    if (trim(direction) == 'TO') then
1222      ! Copy files TO the specified directory
1223
1224      ! Create destination directory
1225      if (mmpi_myid == 0) status = clib_mkdir_r(trim(directoryInOut))
1226      if (obsf_filesSplit()) call rpn_comm_barrier('GRID',status)
1227
1228      ! If obs files not split and I am not task 0, then return
1229      if ( .not.obsf_filesSplit() .and. mmpi_myid /= 0 ) return
1230
1231      do fileIndex = 1, obsf_nfiles
1232        fullName = trim( obsf_fileName(fileIndex) )
1233        baseNameIndexBeg = index(fullName,'/',back=.true.)
1234        baseName = fullName(baseNameIndexBeg+1:)
1235        write(*,*) 'obsf_copyObsDirectory: Copying file ', trim(baseName)
1236        status = utl_copyFile(fullName,trim(directoryInOut)//'/'//trim(baseName))
1237      end do
1238
1239    else if (trim(direction) == 'FROM') then
1240      ! Copy files FROM the specified directory
1241
1242      ! If obs files not split and I am not task 0, then return
1243      if ( .not.obsf_filesSplit() .and. mmpi_myid /= 0 ) return
1244
1245      do fileIndex = 1, obsf_nfiles
1246        fullName = trim( obsf_fileName(fileIndex) )
1247        baseNameIndexBeg = index(fullName,'/',back=.true.)
1248        baseName = fullName(baseNameIndexBeg+1:)
1249        write(*,*) 'obsf_copyObsDirectory: Copying file ', trim(baseName)
1250        status = utl_copyFile(trim(directoryInOut)//'/'//trim(baseName),fullName)
1251        status = clib_remove(trim(directoryInOut)//'/'//trim(baseName))
1252      end do
1253
1254      ! Remove the directory
1255      if (obsf_filesSplit()) call rpn_comm_barrier('GRID',status)
1256      if (mmpi_myid == 0) status = clib_remove(trim(directoryInOut))
1257      
1258    else
1259
1260      call utl_abort('obsf_copyObsDirectory: invalid value for direction')
1261
1262    end if
1263
1264  end subroutine obsf_copyObsDirectory
1265
1266  !--------------------------------------------------------------------------
1267  ! setHeadBodyPrimaryKeyColumns
1268  !--------------------------------------------------------------------------
1269  subroutine setHeadBodyPrimaryKeyColumns(obsDat, numHeaderRead, numBodyRead)
1270    !
1271    !:Purpose: Set header/body primary keys in obsSpaceData that 
1272    !           will ensure unique values over all mpi tasks.
1273    !
1274    implicit none
1275
1276    ! Arguments:
1277    type(struct_obs), intent(inout) :: obsDat
1278    integer,          intent(in)    :: numHeaderRead
1279    integer,          intent(in)    :: numBodyRead
1280
1281    ! Locals:
1282    integer    :: initialHeaderindex, initialBodyindex
1283    integer    :: numHeaders, numBodies
1284    integer    :: headerIndex, bodyIndex
1285    integer    :: headerIndexBegin, headerIndexEnd, bodyIndexBegin, bodyIndexEnd
1286    integer    :: ierr
1287    integer(8) :: headerPrimaryKey, bodyPrimaryKey
1288    integer, allocatable :: allNumHeaderRead(:), allNumBodyRead(:)
1289
1290    write(*,*) 'setHeadBodyPrimaryKeyColumns: start'
1291    numHeaders = obs_numHeader(obsDat)
1292    numBodies = obs_numBody(obsDat)
1293     
1294    write(*,*) 'setHeadBodyPrimaryKeyColumns: numHeaders=', numHeaders, ', numBodies=', numBodies
1295    write(*,*) 'setHeadBodyPrimaryKeyColumns: numHeaderRead=', numHeaderRead, &
1296                ', numBodyRead=', numBodyRead
1297
1298    allocate(allNumHeaderRead(mmpi_nprocs))
1299    allocate(allNumBodyRead(mmpi_nprocs))
1300    call rpn_comm_allgather(numHeaderRead,1,'mpi_integer',       &
1301                            allNumHeaderRead,1,'mpi_integer','GRID',ierr)
1302    call rpn_comm_allgather(numBodyRead,1,'mpi_integer',       &
1303                            allNumBodyRead,1,'mpi_integer','GRID',ierr)
1304    if (mmpi_myid > 0) then
1305      initialHeaderindex = sum(allNumHeaderRead(1:mmpi_myid))
1306      initialBodyindex = sum(allNumBodyRead(1:mmpi_myid))
1307    else
1308      initialHeaderindex = 0
1309      initialBodyindex = 0
1310    end if
1311    deallocate(allNumHeaderRead)
1312    deallocate(allNumBodyRead)
1313
1314    write(*,*) 'setHeadBodyPrimaryKeyColumns: initialHeaderIndex=', initialHeaderindex , &
1315                ', initialBodyindex=', initialBodyindex
1316
1317    headerIndexBegin = numHeaders - numHeaderRead + 1
1318    headerIndexEnd = numHeaders
1319    headerPrimaryKey = initialHeaderindex
1320    do headerIndex = headerIndexBegin, headerIndexEnd
1321      headerPrimaryKey = headerPrimaryKey + 1
1322      call obs_setHeadPrimaryKey(obsdat, headerIndex, headerPrimaryKey)
1323    end do
1324        
1325    bodyIndexBegin = numBodies - numBodyRead + 1
1326    bodyIndexEnd = numBodies
1327    bodyPrimaryKey = initialBodyindex
1328    do bodyIndex = bodyIndexBegin, bodyIndexEnd
1329      bodyPrimaryKey = bodyPrimaryKey + 1
1330      call obs_setBodyPrimaryKey(obsdat, bodyIndex, bodyPrimaryKey)
1331    end do
1332
1333    write(*,*) 'setHeadBodyPrimaryKeyColumns: end'
1334
1335  end subroutine setHeadBodyPrimaryKeyColumns
1336
1337end module obsFiles_mod