obsdbFiles_mod sourceΒΆ

   1module obsdbFiles_mod
   2  ! MODULE obsdbFiles_mod (prefix='odbf' category='3. Observation input/output')
   3  !
   4  !:Purpose: To read and update sqlite files that are in the new 'obsDB' format.
   5  !
   6  use midasMpi_mod
   7  use codePrecision_mod
   8  use mathPhysConstants_mod
   9  use bufr_mod
  10  use codtyp_mod
  11  use obsSpaceData_mod
  12  use fSQLite
  13  use utilities_mod
  14  use clibInterfaces_mod
  15  use obsUtil_mod
  16  use obsVariableTransforms_mod
  17  use sqliteUtilities_mod
  18  use timeCoord_mod
  19
  20  implicit none
  21  save
  22  private
  23
  24  ! Public subroutines and functions:
  25  public :: odbf_getDateStamp, odbf_readFile, odbf_updateFile, obdf_clean
  26
  27  ! Arrays used to match obsDB column names with obsSpaceData column names
  28
  29  integer, parameter :: lenSqlName    = 60
  30  integer, parameter :: sqlColIndex   = 1
  31  integer, parameter :: obsColIndex   = 2
  32  integer, parameter :: varNoColIndex = 2
  33  integer, parameter :: maxElementnumber = 100
  34  
  35  character(len=lenSqlName) :: headTableName = ' '
  36  character(len=lenSqlName) :: bodyTableName = ' '
  37  character(len=lenSqlName) :: midasBodyTableName = ' '
  38  character(len=lenSqlName) :: midasHeadTableName = ' '
  39
  40  ! ...for the header table
  41
  42  integer :: numHeadMatch 
  43  character(len=lenSqlName) :: obsHeadKeySqlName = ' '
  44  character(len=lenSqlName) :: headDateSqlName = ' ' 
  45  character(len=lenSqlName), allocatable :: headMatchList(:,:) 
  46  character(len=6), allocatable :: headBufrList(:)
  47
  48  ! ...for the body table
  49  
  50  integer :: numBodyMatch
  51  character(len=lenSqlName) :: obsBodyKeySqlName = ' '
  52  character(len=lenSqlName), allocatable :: bodyMatchList(:,:)
  53  character(len=6), allocatable :: bodyBufrList(:)
  54
  55  ! Dictionary of 'varno' value for each obsDB observation value column
  56  integer :: numVarNo
  57  character(len=lenSqlName), allocatable :: varNoList(:,:)
  58
  59  ! Column names for the MIDAS Header table and corresponding obsSpace names
  60  character(len=lenSqlName) :: midasHeadKeySqlName
  61  integer :: numMidasHeadMatch
  62  character(len=lenSqlName), allocatable :: midasHeadNamesList(:,:)
  63
  64  ! Column names for the MIDAS Body table and corresponding obsSpace names
  65  character(len=lenSqlName) :: midasBodyKeySqlName
  66  integer :: numMidasBodyMatch
  67  character(len=lenSqlName), allocatable :: midasBodyNamesList(:,:)  
  68  integer :: numBodyMidasTableRequired
  69   
  70  ! Other constants
  71  logical, parameter :: setObsFlagZero = .true.
  72  character(len=20)  :: combinedTableName = 'combinedTable'
  73
  74  ! NAMELIST variables
  75  integer :: numElemIdList                ! MUST NOT BE INCLUDED IN NAMELIST!
  76  integer :: elemIdList(maxElementNumber) ! list of bufr element IDs to read from file
  77
  78contains
  79
  80  !--------------------------------------------------------------------------
  81  ! readNml
  82  !--------------------------------------------------------------------------
  83  subroutine readNml()
  84    !
  85    !:Purpose: Read the namelist for obsDB files
  86    !
  87    implicit none
  88
  89    ! Locals:
  90    integer            :: nulfile, ierr
  91    integer, external  :: fnom, fclos
  92    logical, save      :: alreadyRead = .false.
  93    integer            :: elementIndex
  94
  95    namelist /namobsdb/ numElemIdList, elemIdList
  96
  97    if ( alreadyRead ) return
  98
  99    alreadyRead = .true.
 100
 101    ! default values
 102    numElemIdList = MPC_missingValue_INT
 103    elemIdList(:) = 0
 104
 105    if ( .not. utl_isNamelistPresent('NAMOBSDB','./flnml') ) then
 106      if ( mmpi_myid == 0 ) then
 107        write(*,*) 'readNml (odbf): namObsDB is missing in the namelist.'
 108        write(*,*) '                The default values will be taken.'
 109      end if
 110    else
 111      ! reading namelist variables
 112      nulfile = 0
 113      ierr = fnom(nulfile,'./flnml','FTN+SEQ+R/O',0)
 114      read(nulfile, nml=namobsdb, iostat=ierr)
 115      if ( ierr /= 0 ) call utl_abort('readNml (odbf): Error reading namelist')
 116      ierr = fclos(nulfile)
 117      if (numElemIdList /= MPC_missingValue_INT) then
 118        call utl_abort('readNml (odbf): check namobsdb namelist section: numElemIdList should be removed')
 119      end if
 120      numElemIdList = 0
 121      do elementIndex = 1, maxElementNumber
 122        if (elemIdList(elementIndex) == 0) exit
 123        numElemIdList = numElemIdList + 1
 124      end do
 125    end if
 126    if ( mmpi_myid == 0 ) write(*, nml=namObsDb)
 127
 128    if (numElemIdList==0) then
 129      call utl_abort('odbf_setup: element list is empty')
 130    end if
 131
 132  end subroutine readNml
 133
 134  !--------------------------------------------------------------------------
 135  ! odbf_setup
 136  !--------------------------------------------------------------------------
 137  subroutine odbf_setup()
 138    !
 139    !:Purpose: Read the namelist for obsDB files and read the obsDB column table
 140    !
 141    implicit none
 142
 143    ! Locals:
 144    integer            :: nulfile, ierr
 145    integer, external  :: fnom, fclos
 146    logical, save      :: alreadyRead = .false.
 147    character(len=512), parameter :: obsDbColumnFile = 'obsdbColumnTable.dat' 
 148    character(len=512) :: readLine
 149    character(len=lenSqlName) :: readDBColumn, readObsSpaceColumn, readBufrColumn  ! Strings to temporary assign 
 150                                                                                ! read header and body column names
 151    integer            :: headerTableRow, bodyTableRow, midasBodyTableRow, midasHeadTableRow ! Counters to determine the size of 
 152                                                                         ! header and body table
 153    integer            :: countRow, countMatchRow, countVarRow                        
 154    integer            :: stringFoundIndex
 155    logical            :: obsColumnIsValid
 156                                                                       
 157    if ( alreadyRead ) return
 158
 159    alreadyRead = .true.
 160
 161    call readNml()
 162
 163    ! initialize obsDb columns names to be consistent with MIDAS obsSpaceData Report column names
 164
 165    nulfile = 0 
 166    ierr = fnom(nulfile,trim(obsDbColumnFile),'FTN+SEQ+R/O',0)
 167    if ( ierr /= 0 ) call utl_abort('odbf_setup: Error reading ObsDBColumnTable file') 
 168  
 169    ! Get number of rows in ObsDBColumnTable file
 170    headerTableRow = 0
 171    bodyTableRow = 0
 172    midasBodyTableRow = 0
 173    midasHeadTableRow = 0
 174    numHeadMatch = 0
 175    numBodyMatch = 0 
 176    numMidasBodyMatch = 0
 177    numMidasHeadMatch = 0
 178    numVarNo = 0
 179    do while(ierr == 0)
 180      read(nulfile, '(A)' ,iostat = ierr) readline
 181
 182      if (index(trim(readline),'OBS_HEADER TABLE INFO BEGINS') > 0) then
 183        ! Found the start of header table in the file
 184        do while (ierr == 0)
 185          read(nulfile,'(A)' ,iostat = ierr) readline
 186          ! Stop search at the end of header table
 187          if (index(trim(readline),'OBS_HEADER TABLE INFO ENDS') > 0) exit
 188          headerTableRow = headerTableRow +1
 189          if (index(trim(readline),'[') == 0 .and. index(trim(readline),']') == 0) then 
 190            ! Count obsSpaceData header columns  
 191            numHeadMatch = numHeadMatch + 1
 192          end if
 193        end do
 194
 195      else if (index(trim(readline),'OBS_BODY TABLE INFO BEGINS') > 0) then  
 196        ! Found the start of the body table
 197        do while (ierr == 0)
 198          read(nulfile,'(A)' ,iostat = ierr) readline
 199          ! Stop search at the end of body table
 200          if (index(trim(readline),'OBS_BODY TABLE INFO ENDS') > 0) exit
 201          bodyTableRow = bodyTableRow + 1
 202          if (index(trim(readline),'[') == 0 .and. index(trim(readline),']') == 0) then 
 203            ! Count obsSpaceData body columns
 204            numBodyMatch = numBodyMatch + 1 
 205            if (index(trim(readline),'VAR') /= 0) then
 206              ! Count VarNo columns
 207              numVarNo = numVarNo + 1
 208            end if
 209          end if
 210        end do
 211
 212      else if (index(trim(readline),'MIDAS_HEADER TABLE INFO BEGINS') > 0) then
 213        ! Found the start of the MIDAS_HEADER table
 214        do while (ierr == 0)
 215          read(nulfile,'(A)' ,iostat = ierr) readline
 216          ! Stop search at the end of MIDAS_HEADER table
 217          if (index(trim(readline),'MIDAS_HEADER TABLE INFO ENDS') > 0) exit
 218          midasHeadTableRow = midasHeadTableRow + 1
 219          ! Count obsSpaceData  MIDAS_HEADER columns
 220          if (index(trim(readline),'[') == 0 .and. index(trim(readline),']') == 0) then 
 221            numMidasHeadMatch = numMidasHeadMatch + 1
 222          end if
 223        end do
 224
 225      else if (index(trim(readline),'MIDAS_BODY TABLE INFO BEGINS') > 0) then
 226        ! Found the start of the MIDAS_BODY table
 227        do while (ierr == 0)
 228          read(nulfile,'(A)' ,iostat = ierr) readline
 229          ! Stop search at the end of MIDAS_BODY table
 230          if (index(trim(readline),'MIDAS_BODY TABLE INFO ENDS') > 0) exit
 231          midasBodyTableRow = midasBodyTableRow + 1
 232          ! Count obsSpaceData MIDAS_BODY columns
 233          if (index(trim(readline),'[') == 0 .and. index(trim(readline),']') == 0) then 
 234            numMidasBodyMatch = numMidasBodyMatch + 1
 235          end if
 236        end do
 237      end if ! if (index(trim(readline)
 238    end do ! do while(ierr == 0)
 239
 240    write(*,*) 'odbf_setup: Number of obs header columns in ObsDBColumnTable', headerTableRow
 241    write(*,*) 'odbf_setup: Number of obs body columns in ObsDBColumnTable', bodyTableRow
 242    write(*,*) 'odbf_setup: Number of midas header columns in ObsDBColumnTable', midasHeadTableRow
 243    write(*,*) 'odbf_setup: Number of midas body columns in ObsDBColumnTable', midasBodyTableRow
 244    
 245    write(*,*) 'odbf_setup: numHeadMatch', numHeadMatch
 246    write(*,*) 'odbf_setup: numBodyMatch', numBodyMatch
 247    write(*,*) 'odbf_setup: numMidasHeadMatch',numMidasHeadMatch
 248    write(*,*) 'odbf_setup: numMidasBodyMatch', numMidasBodyMatch
 249
 250    ! Read Report obsdb and MIDAS obsSpaceData columns name
 251    allocate(headMatchList(2,numHeadMatch))
 252    allocate(headBufrList(numHeadMatch))
 253    allocate(bodyMatchList(2,numBodyMatch))
 254    allocate(bodyBufrList(numBodyMatch))
 255    allocate(midasBodyNamesList(2,numMidasBodyMatch))
 256    allocate(midasHeadNamesList(2,numMidasHeadMatch))
 257    allocate(varNoList(2,numVarNo))
 258
 259    rewind(nulfile)
 260
 261    ierr = 0
 262    do while (ierr == 0)
 263      read(nulfile, '(A)' ,iostat = ierr) readline
 264
 265      if (index(trim(readline),'OBS_HEADER TABLE INFO BEGINS') > 0) then
 266        ! Found the start of Obs Header table in the file  
 267        countMatchRow = 0
 268        do countRow = 1, headerTableRow
 269          ! Read all header table rows
 270          read(nulfile,*,iostat = ierr) readDBColumn, readObsSpaceColumn, readBufrColumn      
 271          select case (trim(readObsSpaceColumn))
 272            ! Assign read values to appropriate variable
 273            case ('[headTableName]')
 274              headTableName = trim(readDBColumn)
 275              ! Remove the brackets from column name string 
 276              headTableName = headTableName(1:len(headTableName)-1)
 277            case ('[headPrimaryKey]')
 278              obsHeadKeySqlName = trim(readDBColumn)
 279              ! Remove the brackets from column name string 
 280              obsHeadKeySqlName = obsHeadKeySqlName(1:len(obsHeadKeySqlName)-1)
 281            case ('[headDateSqlName]')
 282              headDateSqlName = trim(readDBColumn)
 283              ! Remove the brackets from column name string  
 284              headDateSqlName = headDateSqlName(1:len(headDateSqlName)-1)
 285            case default
 286              countMatchRow = countMatchRow + 1
 287              headMatchList(1,countMatchRow) = trim(readDBColumn)
 288              headMatchList(2,countMatchRow) = trim(readObsSpaceColumn)
 289              headBufrList(countMatchRow) = trim(readBufrColumn)
 290          end select
 291        end do ! do countRow
 292
 293      else if (index(trim(readline),'OBS_BODY TABLE INFO BEGINS') > 0) then
 294        ! Found the start of Obs BODY table in the file  
 295        countMatchRow = 0
 296        countVarRow = 0 
 297        do countRow = 1, bodyTableRow
 298          ! Read all body table rows 
 299          read(nulfile,*,iostat = ierr) readDBColumn, readObsSpaceColumn, readBufrColumn      
 300          select case (trim(readObsSpaceColumn))
 301            ! Assign read values to appropriate variable
 302            case ('[bodyTableName]')
 303              bodyTableName = trim(readDBColumn)
 304              ! Remove the brackets from column name string
 305              bodyTableName = bodyTableName(1:len(bodyTableName)-1)
 306            case ('[bodyPrimaryKey]')
 307              obsBodyKeySqlName = trim(readDBColumn)
 308              ! Remove the brackets from column name string
 309              obsBodyKeySqlName = obsBodyKeySqlName(1:len(obsBodyKeySqlName)-1)
 310            case default
 311              countMatchRow = countMatchRow + 1
 312              bodyMatchList(1,countMatchRow) = trim(readDBColumn)
 313              bodyMatchList(2,countMatchRow) = trim(readObsSpaceColumn)
 314              bodyBufrList(countMatchRow) = trim(readBufrColumn)
 315
 316              ! Assign varNoList if appropriate
 317              if (bodyMatchList(2,countMatchRow) == 'VAR') then
 318                countVarRow = countVarRow + 1 
 319                varNoList(1, countVarRow) = bodyMatchList(1,countMatchRow) 
 320                varNoList(2, countVarRow) = bodyBufrList(countMatchRow)
 321              end if
 322          end select
 323        end do ! do countRow
 324
 325      else if (index(trim(readline),'MIDAS_HEADER TABLE INFO BEGINS') > 0) then
 326        ! Found the start of MIDAS Header table in the file
 327        countMatchRow = 0
 328        do countRow = 1, midasHeadTableRow
 329          !Read all header table rows
 330          read(nulfile,*,iostat = ierr) readDBColumn, readObsSpaceColumn, readBufrColumn
 331          select case (trim(readObsSpaceColumn))
 332            case ('[midasHeadTableName]')
 333              midasHeadTableName = trim(readDBColumn)
 334              midasHeadTableName = midasHeadTableName(1:len(midasHeadTableName)-1)
 335            case ('[midasHeadPrimaryKey]')
 336              midasHeadKeySqlName = trim(readDBColumn)
 337              ! Remove the brackets from column name string
 338              midasHeadKeySqlName = midasHeadKeySqlName(1:len(midasHeadKeySqlName)-1)
 339            case default
 340              countMatchRow = countMatchRow + 1
 341              midasHeadNamesList(1,countMatchRow) = trim(readDBColumn)              
 342              midasHeadNamesList(2,countMatchRow) = trim(readObsSpaceColumn)
 343          end select
 344        end do ! do countRow
 345
 346      else if (index(trim(readline),'MIDAS_BODY TABLE INFO BEGINS') > 0) then
 347        ! Found the start of MIDAS Body table in the file
 348        numBodyMidasTableRequired = 0
 349        countMatchRow = 0
 350        do countRow = 1, midasBodyTableRow
 351          ! Read all body table rows
 352          read(nulfile,*,iostat = ierr) readDBColumn, readObsSpaceColumn, readBufrColumn      
 353          select case (trim(readObsSpaceColumn))
 354            ! Assign read values to appropriate variable
 355            case ('[midasBodyTableName]')
 356              midasBodyTableName = trim(readDBColumn)
 357              ! Remove the brackets from column name string
 358              midasBodyTableName = midasBodyTableName(1:len(midasBodyTableName)-1)
 359            case ('[midasBodyPrimaryKey]')
 360              midasBodyKeySqlName = trim(readDBColumn)
 361              ! Remove the brackets from column name string
 362              midasBodyKeySqlName = midasBodyKeySqlName(1:len(midasBodyKeySqlName)-1)
 363            case default
 364              countMatchRow = countMatchRow + 1
 365              midasBodyNamesList(1,countMatchRow) = trim(readDBColumn)              
 366              midasBodyNamesList(2,countMatchRow) = trim(readObsSpaceColumn)
 367             
 368              ! Check if the ObsSpaceData column name is mandatory 
 369              stringFoundIndex = index(midasBodyNamesList(2,countMatchRow),'*')
 370              if (stringFoundIndex > 0) then
 371                midasBodyNamesList(2,countMatchRow) = trim(midasBodyNamesList(2,countMatchRow)(1:stringFoundIndex-1))
 372                numBodyMidasTableRequired = numBodyMidasTableRequired + 1
 373              end if
 374          end select
 375        end do ! do countRow
 376      end if ! if (index(trim(readline)
 377    end do ! do while (ierr == 0)
 378
 379    ierr = fclos(nulfile)
 380 
 381    ! Check if the header, body and MIDAS table column names are read correctly 
 382    if (len(trim(headTableName)) == 0) call utl_abort('odbf_setup: headTableName is incorrectly defined or missing')
 383    if (len(trim(obsHeadKeySqlName)) == 0) call utl_abort('odbf_setup: obsHeadKeySqlName is incorrectly defined or missing')
 384    if (len(trim(headDateSqlName)) == 0) call utl_abort('odbf_setup: headDateSqlName is incorrectly defined or missing')
 385    if (len(trim(bodyTableName)) == 0) call utl_abort('odbf_setup: bodyTableName is incorrectly defined or missing')
 386    if (len(trim(obsBodyKeySqlName)) == 0) call utl_abort('odbf_setup: obsBodyKeySqlName is incorrectly defined or missing')
 387    if (len(trim(midasBodyTableName)) == 0) call utl_abort('odbf_setup: midasBodyTableName is incorrectly defined or missing')
 388    if (len(trim(midasBodyKeySqlName)) == 0) call utl_abort('odbf_setup: midasBodyKeySqlName is incorrectly defined or missing')
 389    if (len(trim(midasHeadKeySqlName)) == 0) call utl_abort('odbf_setup: midasHeadKeySqlName is incorrectly defined or missing')
 390
 391    do countRow = 1, numHeadMatch
 392      obsColumnIsValid = obs_isColumnNameValid(trim(headMatchList(2,countRow)))
 393      if (.not. obsColumnIsValid) then
 394        call utl_abort('odbf_setup: Column in Header Table does not exist in ObsSpaceData: ' // &
 395                       trim(headMatchList(2,countRow)))
 396      end if
 397    end do
 398      
 399    do countRow = 1, numBodyMatch 
 400      obsColumnIsValid = obs_isColumnNameValid(trim(bodyMatchList(2,countRow)))
 401      if (.not. obsColumnIsValid) then
 402        call utl_abort('odbf_setup: Column in Body Table does not exist in ObsSpaceData: ' // &
 403                       trim(bodyMatchList(2,countRow)))
 404      end if
 405    end do
 406
 407    do countRow = 1, numMidasBodyMatch 
 408      obsColumnIsValid = obs_isColumnNameValid(trim(midasBodyNamesList(2,countRow)))
 409      if (.not. obsColumnIsValid) then
 410        call utl_abort('odbf_setup: Column in MIDAS Body Table does not exist in ObsSpaceData: ' // &
 411                       trim(midasBodyNamesList(2,countRow)))
 412      end if
 413    end do
 414
 415    do countRow = 1, numMidasHeadMatch 
 416      obsColumnIsValid = obs_isColumnNameValid(trim(midasHeadNamesList(2,countRow)))
 417      if (.not. obsColumnIsValid) then
 418        call utl_abort('odbf_setup: Column in MIDAS Header Table does not exist in ObsSpaceData: ' // &
 419                       trim(midasHeadNamesList(2,countRow)))
 420      end if
 421    end do
 422
 423  end subroutine odbf_setup
 424
 425  !--------------------------------------------------------------------------
 426  ! odbf_getDateStamp
 427  !--------------------------------------------------------------------------
 428  subroutine odbf_getDateStamp(dateStamp, fileName)
 429    !
 430    ! Purpose: get dateStamp from an obsDB file
 431    !
 432    implicit none
 433    
 434    ! Arguments:
 435    integer         , intent(out) :: dateStamp
 436    character(len=*), intent(in)  :: fileName
 437    
 438    ! Locals:
 439    integer,    allocatable :: headDateValues(:), headTimeValues(:)
 440    integer                 :: ier, imode, validTime, validDate, validDateRecv, validTimeRecv
 441    integer                 :: newdate
 442
 443    call odbf_setup()
 444
 445    call sqlu_getColumnValuesDateStr(headDateValues, headTimeValues, fileName=trim(fileName), &
 446                                     tableName=headTableName, sqlColumnName=headDateSqlName)
 447
 448    validDate = MPC_missingValue_INT 
 449    validTime = MPC_missingValue_INT 
 450
 451    call tim_getValidDateTimeFromList(headDateValues, headTimeValues, validDate, validTime)
 452
 453    ! Make sure all mpi tasks have a valid date (important for split sqlite files)
 454    call rpn_comm_allreduce(validDate, validDateRecv, 1, "MPI_INTEGER", "MPI_MAX", "GRID", ier)
 455    call rpn_comm_allreduce(validTime, validTimeRecv, 1, "MPI_INTEGER", "MPI_MAX", "GRID", ier)
 456    
 457    if (validDateRecv == MPC_missingValue_INT .or. validTimeRecv == MPC_missingValue_INT) then
 458      call utl_abort('odbf_getDateStamp: Error in getting valid date and time!')
 459    end if
 460    
 461    ! printable to stamp, validTime must be multiplied with 1e6 to make newdate work
 462    imode = 3
 463    ier = newdate(dateStamp, validDateRecv, validTimeRecv * 1000000, imode)
 464    write(*,*)'odbf_getDateStamp: obsDB files valid date (YYYYMMDD): ', validDateRecv
 465    write(*,*)'odbf_getDateStamp: obsDB files valid time       (HH): ', validTimeRecv
 466    write(*,*)'odbf_getDateStamp: obsDB files dateStamp            : ', datestamp
 467
 468    deallocate(headDateValues)
 469    deallocate(headTimeValues)
 470
 471  end subroutine odbf_getDateStamp
 472
 473  !--------------------------------------------------------------------------
 474  ! odbf_readFile
 475  !--------------------------------------------------------------------------
 476  subroutine odbf_readFile(obsdat, fileName, familyType, fileIndex)
 477    !
 478    !:Purpose: Read the contents of an obsDB file and put in obsSpaceData
 479    !
 480    implicit none
 481
 482    ! Arguments:
 483    type (struct_obs), intent(inout) :: obsdat
 484    character(len=*),  intent(in)    :: fileName
 485    character(len=*),  intent(in)    :: familyType
 486    integer         ,  intent(in)    :: fileIndex
 487
 488    ! Locals:
 489    integer :: bodyIndexBegin, headIndexBegin
 490    integer :: headIndexEnd, headIndex, obsRln
 491    integer :: numBody, numHead, columnIndex
 492    integer :: headTableIndex, numRowsHeadTable, bodyTableIndex, numRowsBodyTable
 493    character(len=lenSqlName), allocatable :: headCharSqlNames(:)
 494    character(len=lenSqlName), allocatable :: headSqlNames(:),    bodySqlNames(:)
 495    character(len=50),         allocatable :: headCharValues(:,:)
 496    integer,                   allocatable :: headDateValues(:), headTimeValues(:)
 497    real(8),                   allocatable :: headValues(:,:), bodyValues(:,:)
 498    integer(8),                allocatable :: headPrimaryKey(:)
 499    integer(8),                allocatable :: bodyPrimaryKey(:), bodyHeadKey(:)
 500
 501    write(*,*)
 502    write(*,*) 'odbf_readFile: Starting'
 503    write(*,*)
 504    write(*,*) 'odbf_readFile: FileName   : ', trim(FileName)
 505    write(*,*) 'odbf_readFile: FamilyType : ', FamilyType
 506
 507    !- 0.0 Some initialization
 508    call odbf_setup()
 509    call ovt_setup(elemIdList(1:numElemIdList))
 510
 511    !- 1.0 Determine names of columns present in obsDB file
 512
 513    call sqlu_getSqlColumnNames(headCharSqlNames, fileName=trim(fileName), &
 514                                tableName=headTableName, dataType='varchar')
 515    call sqlu_getSqlColumnNames(headSqlNames, fileName=trim(fileName), &
 516                                tableName=headTableName, dataType='numeric' )
 517    call sqlu_getSqlColumnNames(bodySqlNames, fileName=trim(fileName), &
 518                                tableName=bodyTableName, dataType='numeric')
 519
 520    ! Print all of the column names to the listing
 521    do columnIndex = 1, size(headCharSqlNames)
 522      write(*,*) 'odbf_readFile: headCharSqlNames =', columnIndex, &
 523                 trim(headCharSqlNames(columnIndex))
 524    end do
 525    do columnIndex = 1, size(headSqlNames)
 526      write(*,*) 'odbf_readFile: headSqlNames =', columnIndex, &
 527                 trim(headSqlNames(columnIndex))
 528    end do
 529    do columnIndex = 1, size(bodySqlNames)
 530      write(*,*) 'odbf_readFile: bodySqlNames   =', columnIndex, &
 531                 trim(bodySqlNames(columnIndex))
 532    end do
 533
 534    !- 1.1 Read most of the contents of the file into local tables
 535
 536    call odbf_getPrimaryKeys(headPrimaryKey, bodyPrimaryKey, bodyHeadKey, &
 537                             fileName=trim(fileName))
 538
 539    call sqlu_getColumnValuesDateStr(headDateValues, headTimeValues, fileName=trim(fileName), &
 540                                     tableName=headTableName, sqlColumnName=headDateSqlName)
 541    call sqlu_getColumnValuesChar(headCharValues, fileName=trim(fileName), &
 542                                  tableName=headTableName, sqlColumnNames=headCharSqlNames)
 543    call sqlu_getColumnValuesNum (headValues, fileName=trim(fileName), &
 544                                  tableName=headTableName, sqlColumnNames=headSqlNames)
 545    call sqlu_getColumnValuesNum (bodyValues, fileName=trim(fileName), &
 546                                  tableName=bodyTableName, sqlColumnNames=bodySqlNames)
 547    numRowsBodyTable = size(bodyValues,1)
 548    numRowsHeadTable = size(headValues,1)
 549    
 550    ! For debugging, print first 10 rows of each local table to the listing
 551    do headTableIndex = 1, min(10, numRowsHeadTable)
 552      write(*,*) 'odbf_readFile: headKeyValues  = ', headPrimaryKey(headTableIndex)
 553    end do
 554    do headTableIndex = 1, min(10, numRowsHeadTable)
 555      write(*,*) 'odbf_readFile: headCharValues = ', headCharValues(headTableIndex,:)
 556    end do
 557    do headTableIndex = 1, min(10, numRowsHeadTable)
 558      write(*,*) 'odbf_readFile: headValues = ', headValues(headTableIndex,:)
 559    end do
 560    do bodyTableIndex = 1, min(10, numRowsBodyTable)
 561      write(*,*) 'odbf_readFile: bodyKeyValues  = ', bodyPrimaryKey(bodyTableIndex)
 562    end do
 563    do bodyTableIndex = 1, min(10, numRowsBodyTable)
 564      write(*,*) 'odbf_readFile: bodyHeadKeyValues  = ', bodyHeadKey(bodyTableIndex)
 565    end do
 566    do bodyTableIndex = 1, min(10, numRowsBodyTable)
 567      write(*,*) 'odbf_readFile: bodyValues = ', bodyValues(bodyTableIndex,:)
 568    end do
 569
 570    
 571
 572    ! Starting point for adding rows to obsSpaceData
 573    bodyIndexBegin = obs_numBody(obsdat) + 1
 574    headIndexBegin = obs_numHeader(obsdat) + 1
 575
 576    !- 1.2 Initialize header and body columns as 'missing'
 577
 578    ! Set a bunch of obsSpaceData header columns to 'missing'
 579    do headTableIndex = 1, numRowsHeadTable
 580      headIndex = headTableIndex + headIndexBegin - 1
 581      call obs_headSet_i(obsdat, OBS_SEN, headIndex, nint(MPC_missingValue_R8))
 582      call obs_headSet_i(obsdat, OBS_INS, headIndex, nint(MPC_missingValue_R8))
 583    end do
 584
 585    !- 1.3 Copy values from local tables into obsSpaceData
 586
 587    ! Set the columns related to surface type
 588    call odbf_setSurfaceType(obsdat, headIndexBegin, numRowsHeadTable, &
 589                             fileName=trim(fileName), tableName=headTableName)
 590
 591    ! Header date/time values
 592    call odbf_copyToObsSpaceHeadDate(obsdat, headDateValues, headTimeValues, &
 593                                     headIndexBegin)
 594
 595    ! Header-Character values
 596    call odbf_copyToObsSpaceHeadChar(obsdat, headCharSqlNames, headCharValues, &
 597                                     headIndexBegin)
 598
 599    ! Header-Numeric values
 600    call odbf_copyToObsSpaceHead(obsdat, headSqlNames, headPrimaryKey, &
 601                                 headValues, headIndexBegin)
 602
 603    ! Body-Numeric values
 604    call odbf_copyToObsSpaceBody(obsdat, bodySqlNames, bodyPrimaryKey, bodyHeadKey, &
 605                                 bodyValues, bodyIndexBegin, headIndexBegin)
 606
 607    ! Get indexes of last rows added to obsSpaceData
 608    headIndexEnd = obs_numHeader(obsdat)
 609
 610    !- 1.4 Set some other quantities in obsSpaceData Header table
 611
 612    do headIndex = headIndexBegin, headIndexEnd
 613      call obs_headSet_i(obsdat, OBS_ONM, headIndex, headIndex)
 614      call obs_headSet_i(obsdat, OBS_IDF, headIndex, fileIndex)
 615      call obs_setFamily(obsdat, trim(familyType), headIndex)
 616      if ( headIndex == 1 ) then
 617        call obs_headSet_i(obsdat, OBS_RLN, headIndex, 1 )
 618      else
 619        obsRln = obs_headElem_i(obsdat, OBS_RLN, headIndex-1) + &
 620                 obs_headElem_i(obsdat, OBS_NLV, headIndex-1)
 621        call obs_headSet_i(obsdat, OBS_RLN, headIndex, obsRln)
 622      end if
 623    end do
 624
 625    !- 1.5 Read values written during previous MIDAS program executions
 626
 627    call odbf_readMidasBodyTable(obsdat, trim(fileName), familyType, fileIndex)
 628
 629    !- 2.0 Additional changes to values after they are in obsSpaceData
 630
 631    call odbf_adjustValues(obsdat, headIndexBegin, headIndexEnd)
 632
 633    if ( trim(familyType) /= 'TO' ) then
 634      call ovt_transformObsValues      (obsdat, headIndexBegin, headIndexEnd )
 635      call ovt_adjustHumGZ             (obsdat, headIndexBegin, headIndexEnd )
 636      call obsu_computeVertCoordSurfObs(obsdat, headIndexBegin, headIndexEnd )
 637    end if
 638
 639    ! For GP family, initialize OBS_OER to element 15032 (ZTD formal error) 
 640    ! for all ZTD observations (element 15031)
 641    if ( trim(familyType) == 'GP' ) then
 642      write(*,*) 'odbf_readFile: Initializing OBS_OER for GB-GPS ZTD to formal error (ele 15032)'
 643      call obsu_setGbgpsError( obsdat, headIndexBegin, headIndexEnd )
 644    end if
 645
 646    numHead = obs_numHeader(obsdat)
 647    numBody = obs_numBody(obsdat)
 648    write(*,*) 'odbf_readFile: after reading file, obs_numHeader = ', numHead
 649    write(*,*) 'odbf_readFile: after reading file, obs_numBody   = ', numBody
 650    write(*,*)
 651    write(*,*) 'odbf_readFile: finished'
 652    write(*,*)
 653
 654  end subroutine odbf_readFile
 655
 656  !--------------------------------------------------------------------------
 657  ! odbf_readMidasBodyTable
 658  !--------------------------------------------------------------------------
 659  subroutine odbf_readMidasBodyTable(obsdat, fileName, familyType, fileIndex)
 660    !
 661    !:Purpose: Read values from any column found the MIDAS table, if it
 662    !           already exists in the file. This will replace any existing
 663    !           values read from the original obs-DB tables (e.g. the obs
 664    !           value).
 665    !
 666    implicit none
 667
 668    ! Arguments:
 669    type(struct_obs), intent(inout) :: obsdat
 670    character(len=*), intent(in)    :: fileName   
 671    character(len=*), intent(in)    :: familyType
 672    integer,          intent(in)    :: fileIndex
 673
 674    ! Locals:
 675    type(fSQL_STATUS)    :: stat ! sqlite error status
 676    type(fSQL_DATABASE)  :: db   ! sqlite file handle
 677    type(fSQL_STATEMENT) :: stmt ! precompiled sqlite statements
 678    integer(8)           :: obsIdd
 679    integer              :: obsIdf, obsVarNo, sqlNameIndex, matdata_int(1,1)
 680    integer              :: headIndex, bodyIndex, bodyIndexBegin, bodyIndexEnd
 681    integer              :: obsSpaceColIndex, ierr, numRows, numColumns
 682    real(8)              :: obsPPP, matdata_r8(1,1)
 683    character(len=4)     :: obsSpaceColumnName
 684    character(len=lenSqlName) :: sqlColumnName, vnmSqlName, pppSqlName, varSqlName
 685    character(len=3000)  :: query
 686    logical              :: midasBodyTableExists
 687    logical, allocatable :: midasColumnExists(:)
 688
 689    write(*,*)
 690    write(*,*) 'odbf_readMidasBodyTable: Starting'
 691    write(*,*)
 692    write(*,*) 'odbf_readMidasBodyTable: FileName   : ', trim(FileName)
 693    write(*,*) 'odbf_readMidasBodyTable: FamilyType : ', FamilyType
 694
 695    ! check if midasTable already exists in the file
 696    midasBodyTableExists = sqlu_sqlTableExists(fileName, midasBodyTableName)
 697
 698    if (.not. midasBodyTableExists) then
 699      write(*,*) 'odbf_readMidasBodyTable: MIDAS table not present in file'
 700      return
 701    else
 702      write(*,*) 'odbf_readMidasBodyTable: MIDAS table present in file, will read contents'
 703    end if
 704
 705    ! some sql column names
 706    vnmSqlName = odbf_midasTabColFromObsSpaceName('VNM', midasBodyNamesList)
 707    pppSqlName = odbf_midasTabColFromObsSpaceName('PPP', midasBodyNamesList)
 708    varSqlName = odbf_midasTabColFromObsSpaceName('VAR', midasBodyNamesList)
 709
 710    ! check which columns exist in the MIDAS output table
 711    allocate(midasColumnExists(numMidasBodyMatch))
 712    do sqlNameIndex = 1, numMidasBodyMatch
 713      sqlColumnName = midasBodyNamesList(1,sqlNameIndex)
 714      midasColumnExists(sqlNameIndex) = sqlu_sqlColumnExists(fileName, midasBodyTableName, sqlColumnName)
 715    end do
 716
 717    ! open the obsDB file
 718    call fSQL_open(db, trim(fileName), stat)
 719    if ( fSQL_error(stat) /= FSQL_OK ) then
 720      write(*,*) 'odbf_readMidasBodyTable: fSQL_open: ', fSQL_errmsg(stat)
 721      call utl_abort('odbf_readMidasBodyTable: fSQL_open')
 722    end if
 723    
 724    ! read the contents of the MIDAS table, one column at a time
 725    SQLNAME: do sqlNameIndex = 1, numMidasBodyMatch
 726
 727      ! skip this sql column name if it is not present in the file
 728      if (.not. midasColumnExists(sqlNameIndex)) cycle SQLNAME
 729
 730      ! get obsSpaceData column name and index corresponding to sql column
 731      sqlColumnName      = midasBodyNamesList(1,sqlNameIndex)
 732      obsSpaceColumnName = midasBodyNamesList(2,sqlNameIndex)
 733      ierr = clib_toUpper(obsSpaceColumnName)
 734      obsSpaceColIndex = obs_columnIndexFromName(trim(obsSpaceColumnName))
 735
 736      ! skip this sql column name if the related obsSpaceData column is not active
 737      if (obs_columnDataType(obsSpaceColIndex) == 'real') then
 738        if (.not. obs_columnActive_RB(obsdat, obsSpaceColIndex)) cycle SQLNAME
 739      else
 740        if (.not. obs_columnActive_IB(obsdat, obsSpaceColIndex)) cycle SQLNAME
 741      end if
 742
 743      write(*,*) 'odbf_readMidasBodyTable: reading midasTable column: ', &
 744                 trim(sqlColumnName)
 745      write(*,*) 'odbf_readMidasBodyTable: to update obsSpaceData column: ', &
 746                 trim(obsSpaceColumnName)
 747
 748      ! prepare sql update query
 749      query = 'select ' // trim(sqlColumnName) // &
 750              ' from ' // trim(midasBodyTableName) // &
 751              ' where ' // &
 752              trim(obsBodyKeySqlName) // ' = ? and '   // &
 753              trim(vnmSqlName)     // ' = ? and '   // &
 754              trim(pppSqlName)     // ' = ? ;'
 755      write(*,*) 'odbf_readMidasBodyTable: query ---> ', trim(query)
 756
 757      call fSQL_prepare(db, query , stmt, stat)
 758      if ( fSQL_error(stat) /= FSQL_OK ) then
 759        write(*,*) 'odbf_readMidasBodyTable: fSQL_prepare: ', fSQL_errmsg(stat)
 760        call utl_abort('odbf_readMidasBodyTable: fSQL_prepare')
 761      end if
 762
 763      call fSQL_begin(db)
 764      HEADER2: do headIndex = 1, obs_numHeader(obsdat)
 765
 766        obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
 767        if ( obsIdf /= fileIndex ) cycle HEADER2
 768
 769        bodyIndexBegin = obs_headElem_i( obsdat, OBS_RLN, headIndex )
 770        bodyIndexEnd = bodyIndexBegin + &
 771                       obs_headElem_i( obsdat, OBS_NLV, headIndex ) - 1
 772
 773        BODY2: do bodyIndex = bodyIndexBegin, bodyIndexEnd
 774
 775          ! execute the sql query to select the desired value
 776          obsIdd  = obs_bodyPrimaryKey(obsdat, bodyIndex)
 777          call fSQL_bind_param(stmt, PARAM_INDEX=1, INT8_VAR=obsIdd)
 778          obsVarNo  = obs_bodyElem_i(obsdat, obs_vnm, bodyIndex)
 779          call fSQL_bind_param(stmt, PARAM_INDEX=2, INT_VAR=obsVarNo)
 780          obsPPP  = obs_bodyElem_r(obsdat, obs_ppp, bodyIndex)
 781          call fSQL_bind_param(stmt, PARAM_INDEX=3, REAL8_VAR=obsPPP)
 782          call fSQL_exec_stmt(stmt)
 783
 784          ! read the real or integer value
 785          if (obs_columnDataType(obsSpaceColIndex) == 'real') then
 786            call fSQL_get_many(stmt, nrows=numRows, ncols=numColumns, &
 787                               mode=FSQL_REAL8, status=stat)
 788            if ( fSQL_error(stat) /= FSQL_OK ) then
 789              write(*,*) 'odbf_readMidasBodyTable: fSQL_get_many: ', fSQL_errmsg(stat)
 790              call utl_abort('odbf_readMidasBodyTable: problem with fSQL_get_many')
 791            end if
 792            if (numRows /= 1 .or. numColumns /= 1) then
 793              write(*,*) 'odbf_readMidasBodyTable: numRows, numColumns =', numRows, numColumns
 794              call utl_abort('odbf_readMidasBodyTable: sql query did not return 1 value')
 795            end if
 796            call fSQL_fill_matrix(stmt, matdata_r8)
 797            call obs_bodySet_r(obsdat,obsSpaceColIndex,bodyIndex,matdata_r8(1,1))
 798          else
 799            call fSQL_get_many(stmt, nrows=numRows, ncols=numColumns, &
 800                               mode=FSQL_INT, status=stat)
 801            if ( fSQL_error(stat) /= FSQL_OK ) then
 802              write(*,*) 'odbf_readMidasBodyTable: fSQL_get_many: ', fSQL_errmsg(stat)
 803              call utl_abort('odbf_readMidasBodyTable: problem with fSQL_get_many')
 804            end if
 805            if (numRows /= 1 .or. numColumns /= 1) then
 806              write(*,*) 'odbf_readMidasBodyTable: numRows, numColumns =', numRows, numColumns
 807              call utl_abort('odbf_readMidasBodyTable: sql query did not return 1 value')
 808            end if
 809            call fSQL_fill_matrix(stmt, matdata_int)
 810            call obs_bodySet_i(obsdat,obsSpaceColIndex,bodyIndex,matdata_int(1,1))
 811          end if ! if (obs_columnDataType(obsSpaceColIndex) == 'real')
 812
 813          call fSQL_free_mem(stmt)
 814
 815        end do BODY2
 816
 817      end do HEADER2
 818
 819      call fSQL_finalize(stmt)
 820      call fSQL_commit(db)
 821
 822    end do SQLNAME
 823
 824    ! close the obsDB file
 825    call fSQL_close(db, stat) 
 826
 827    deallocate(midasColumnExists)
 828
 829    write(*,*)
 830    write(*,*) 'odbf_readMidasBodyTable: finished'
 831    write(*,*)
 832
 833  end subroutine odbf_readMidasBodyTable
 834
 835  !--------------------------------------------------------------------------
 836  ! odbf_getPrimaryKeys
 837  !--------------------------------------------------------------------------
 838  subroutine odbf_getPrimaryKeys(headPrimaryKey, bodyPrimaryKey, bodyHeadKey, &
 839                                 fileName)
 840    !
 841    !:Purpose: Read the values from obsDB file for the head and body table
 842    !           primary keys.
 843    !
 844    implicit none
 845
 846    ! Arguments:
 847    integer(8), allocatable, intent(out) :: headPrimaryKey(:)
 848    integer(8), allocatable, intent(out) :: bodyPrimaryKey(:)
 849    integer(8), allocatable, intent(out) :: bodyHeadKey(:)
 850    character(len=*),        intent(in)  :: fileName
 851
 852    ! Locals:
 853    integer :: numRows, numColumns
 854    integer(8), allocatable   :: tempHeadKey(:,:), tempBodyKey(:,:)
 855    character(len=3000)       :: query
 856    type(fSQL_STATUS)         :: stat ! sqlite error status
 857    type(fSQL_DATABASE)       :: db   ! sqlite file handle
 858    type(fSQL_STATEMENT)      :: stmt ! precompiled sqlite statements
 859
 860    ! open the obsDB file
 861    call fSQL_open(db, trim(fileName), status=stat)
 862    if ( fSQL_error(stat) /= FSQL_OK ) then
 863      write(*,*) 'odbf_getPrimaryKeys: fSQL_open: ', fSQL_errmsg(stat)
 864      call utl_abort('odbf_getPrimaryKeys: fSQL_open')
 865    end if
 866
 867    ! build the sqlite query for the HEADER primary key
 868    query = 'select ' // trim(obsHeadKeySqlName) // ' from ' // &
 869            trim(headTableName) // ';'
 870    write(*,*) 'odbf_getPrimaryKeys: query ---> ', trim(query)
 871
 872    ! read the values from the file
 873    call fSQL_prepare(db, trim(query), stmt, status=stat)
 874    ! note: "status" not set when getting integers
 875    call fSQL_get_many(stmt, nrows=numRows, ncols=numColumns, &
 876                       mode=FSQL_INT8)
 877    write(*,*) 'odbf_getPrimaryKeys: numRows = ', numRows, ', numColumns = ', numColumns
 878    allocate( headPrimaryKey(numRows) )
 879    allocate( tempHeadKey(numRows,1) )
 880    call fSQL_fill_matrix(stmt, tempHeadKey)
 881    headPrimaryKey(:) = tempHeadKey(:,1)
 882    deallocate(tempHeadKey)
 883
 884    ! build the sqlite query for the BODY primary key
 885    query = 'select ' // trim(obsBodyKeySqlName) // ' from ' // &
 886            trim(bodyTableName) // ';'
 887    write(*,*) 'odbf_getPrimaryKeys: query ---> ', trim(query)
 888
 889    ! read the values from the file
 890    call fSQL_prepare(db, trim(query) , stmt, status=stat)
 891    ! note: "status" not set when getting integers
 892    call fSQL_get_many(stmt, nrows=numRows, ncols=numColumns, &
 893                       mode=FSQL_INT8)
 894    write(*,*) 'odbf_getPrimaryKeys: numRows = ', numRows, ', numColumns = ', numColumns
 895    allocate( bodyPrimaryKey(numRows) )
 896    allocate( tempBodyKey(numRows,1) )
 897    call fSQL_fill_matrix(stmt, tempBodyKey)
 898    bodyPrimaryKey(:) = tempBodyKey(:,1)
 899    deallocate(tempBodyKey)
 900
 901    ! build the sqlite query for the BODY-HEAD key
 902    query = 'select ' // trim(obsHeadKeySqlName) // ' from ' // &
 903            trim(bodyTableName) // ';'
 904    write(*,*) 'odbf_getPrimaryKeys: query ---> ', trim(query)
 905
 906    ! read the values from the file
 907    call fSQL_prepare(db, trim(query) , stmt, status=stat)
 908    ! note: "status" not set when getting integers
 909    call fSQL_get_many(stmt, nrows=numRows, ncols=numColumns, &
 910                       mode=FSQL_INT8)
 911    write(*,*) 'odbf_getPrimaryKeys: numRows = ', numRows, ', numColumns = ', numColumns
 912    allocate( bodyHeadKey(numRows) )
 913    allocate( tempBodyKey(numRows,1) )
 914    call fSQL_fill_matrix(stmt, tempBodyKey)
 915    bodyHeadKey(:) = tempBodyKey(:,1)
 916    deallocate(tempBodyKey)
 917
 918    ! close the obsDB file
 919    call fSQL_free_mem(stmt)
 920    call fSQL_finalize(stmt)
 921    call fSQL_close(db, stat) 
 922
 923  end subroutine odbf_getPrimaryKeys
 924
 925  !--------------------------------------------------------------------------
 926  ! odbf_setSurfaceType
 927  !--------------------------------------------------------------------------
 928  subroutine odbf_setSurfaceType(obsdat, headIndexBegin, numRowsHeadTable, &
 929                                 fileName, tableName)
 930    !
 931    !:Purpose: Set the surface type based on lat-lon and some external mask files.
 932    !
 933    implicit none
 934
 935    ! Arguments:
 936    type(struct_obs), intent(inout) :: obsdat
 937    integer,          intent(in)    :: headIndexBegin
 938    integer,          intent(in)    :: numRowsHeadTable
 939    character(len=*), intent(in)    :: fileName
 940    character(len=*), intent(in)    :: tableName
 941
 942    ! Locals:
 943    integer              :: numRows, numColumns, headTableIndex, headIndex
 944    integer, allocatable :: columnValues(:,:)
 945    character(len=3000)  :: query
 946    type(fSQL_STATUS)    :: stat ! sqlite error status
 947    type(fSQL_DATABASE)  :: db   ! sqlite file handle
 948    type(fSQL_STATEMENT) :: stmt ! precompiled sqlite statements
 949
 950    ! open the obsDB file
 951    call fSQL_open(db, trim(fileName), status=stat)
 952    if ( fSQL_error(stat) /= FSQL_OK ) then
 953      write(*,*) 'odbf_setSurfaceType: fSQL_open: ', fSQL_errmsg(stat)
 954      call utl_abort('odbf_setSurfaceType: fSQL_open')
 955    end if
 956
 957    ! build the sqlite query
 958    query = 'select mask_mer(lat,lon), mask_glace_clim(lat,lon) from '&
 959             // trim(tableName) // ';'
 960    write(*,*) 'odbf_setSurfaceType: query ---> ', trim(query)
 961
 962    ! read the values from the query result
 963    call fSQL_prepare(db, trim(query), stmt, status=stat)
 964    call fSQL_get_many(stmt, nrows=numRows, ncols=numColumns, &
 965                       mode=FSQL_INT, status=stat)
 966    write(*,*) 'odbf_setSurfaceType: numRows = ', numRows, ', numColumns = ', numColumns
 967    if (numRows /= numRowsHeadTable) then
 968      write(*,*) 'odbf_setSurfaceType: numRows = ', numRows, &
 969                 ', numRowsHeadTable = ', numRowsHeadTable      
 970      call utl_abort('odbf_setSurfaceType: Number of rows found in mask query is &
 971                       not equal to total number of rows in head table')
 972    end if      
 973    allocate( columnValues(numRows, numColumns) )
 974    call fSQL_fill_matrix(stmt, columnValues)
 975
 976    ! set the values of STYP and TTYP
 977    do headTableIndex = 1, numRows
 978      headIndex = headTableIndex + headIndexBegin - 1
 979      call obs_headSet_i(obsdat, OBS_STYP, headIndex, columnValues(headTableIndex,1))
 980      
 981      if (columnValues(headTableIndex,1) == 1 .and. columnValues(headTableIndex,2) == 1) then
 982        ! set terrain type to 0 over water with sea ice
 983        call obs_headSet_i(obsdat, OBS_TTYP, headIndex, 0)
 984      else
 985        ! otherwise set terrain type to -1
 986        call obs_headSet_i(obsdat, OBS_TTYP, headIndex, -1)
 987      end if
 988
 989    end do
 990
 991    ! close the obsDB file
 992    call fSQL_free_mem(stmt)
 993    call fSQL_finalize(stmt)
 994    call fSQL_close(db, stat) 
 995
 996  end subroutine odbf_setSurfaceType
 997
 998  !--------------------------------------------------------------------------
 999  ! odbf_copyToObsSpaceHeadChar
1000  !--------------------------------------------------------------------------
1001  subroutine odbf_copyToObsSpaceHeadChar(obsdat, headCharSqlNames, &
1002                                         headCharValues, headIndexBegin)
1003    !
1004    !:Purpose: Copy character string values from a local table into
1005    !           obsSpaceData header rows. Currently, only the STATION ID
1006    !           and OBS_ITY (i.e. codeType from character sql column
1007    !           containing obs type name).
1008    !
1009    implicit none
1010
1011    ! Arguments:
1012    type(struct_obs), intent(inout) :: obsdat
1013    character(len=*), intent(in)    :: headCharSqlNames(:)
1014    character(len=*), intent(in)    :: headCharValues(:,:)
1015    integer,          intent(in)    :: headIndexBegin
1016
1017    ! Locals:
1018    character(len=lenSqlName), allocatable :: stIdSqlName(:), codeTypeSqlName(:)
1019    integer :: columnIndex, headTableIndex, headIndex
1020    integer :: numRowsHeadTable, codeType
1021
1022    numRowsHeadTable = size(headCharValues,1)
1023
1024    ! Set the STATION ID
1025    stIdSqlName = odbf_sqlNameFromObsSpaceName('STID')
1026    columnIndex = utl_findloc(headCharSqlNames(:), trim(stIdSqlName(1)))
1027    if (columnIndex == 0) then
1028      call utl_abort('odbf_copyToObsSpaceHeadChar: Station ID column not found in sql table')
1029    end if
1030    do headTableIndex = 1, numRowsHeadTable
1031      headIndex = headTableIndex + headIndexBegin - 1
1032      if (headTableIndex == 1) then
1033        write(*,*) 'odbf_copyToObsSpaceHeadChar: set header char column   : ', &
1034                   trim(headCharSqlNames(columnIndex))
1035      end if
1036      call obs_set_c(obsdat, 'STID', &
1037                     headIndex, headCharValues(headTableIndex,columnIndex))
1038    end do
1039
1040    ! Set the codeType (obs_ity) from a character column containing the obs type
1041    codeTypeSqlName = odbf_sqlNameFromObsSpaceName('ITY')
1042    columnIndex = utl_findloc(headCharSqlNames(:), trim(codeTypeSqlName(1)))
1043    if (columnIndex == 0) then
1044      call utl_abort('odbf_copyToObsSpaceHeadChar: Obs type column not found in sql table')
1045    end if
1046    do headTableIndex = 1, numRowsHeadTable
1047      headIndex = headTableIndex + headIndexBegin - 1
1048      if (headTableIndex == 1) then
1049        write(*,*) 'odbf_copyToObsSpaceHeadChar: set header char column   : ', &
1050                   trim(headCharSqlNames(columnIndex))
1051      end if
1052      codeType = codtyp_get_codtyp(trim(headCharValues(headTableIndex,columnIndex)))
1053      if (codeType == -1) then
1054        write(*,*) 'odbf_copyToObsSpaceHeadChar: obs type =', &
1055                   trim(headCharValues(headTableIndex,columnIndex))
1056        call utl_abort('odbf_copyToObsSpaceHeadChar: codtyp for this obs type not found') 
1057      end if
1058      call obs_headSet_i(obsdat, OBS_ITY, headIndex, codeType)
1059    end do ! do headTableIndex
1060
1061  end subroutine odbf_copyToObsSpaceHeadChar
1062
1063  !--------------------------------------------------------------------------
1064  ! odbf_copyToObsSpaceHeadDate
1065  !--------------------------------------------------------------------------
1066  subroutine odbf_copyToObsSpaceHeadDate(obsdat, headDateValues, headTimeValues, headIndexBegin)
1067    !
1068    !:Purpose: Copy date values from a local table into
1069    !           obsSpaceData header rows.
1070    !
1071    implicit none
1072
1073    ! Arguments:
1074    type(struct_obs), intent(inout) :: obsdat
1075    integer,          intent(in)    :: headDateValues(:)
1076    integer,          intent(in)    :: headTimeValues(:)
1077    integer,          intent(in)    :: headIndexBegin
1078
1079    ! Locals:
1080    integer :: headTableIndex, headIndex
1081    integer :: numRowsHeadTable
1082
1083    numRowsHeadTable = size(headDateValues,1)
1084
1085    do headTableIndex = 1, numRowsHeadTable
1086      headIndex = headTableIndex + headIndexBegin - 1
1087      if (headTableIndex == 1) then
1088        write(*,*) 'odbf_copyToObsSpaceHeadDate: set header date/time column'
1089      end if
1090      call obs_headSet_i(obsdat, obs_dat, headIndex, headDateValues(headTableIndex))
1091      call obs_headSet_i(obsdat, obs_etm, headIndex, headTimeValues(headTableIndex))
1092    end do
1093
1094  end subroutine odbf_copyToObsSpaceHeadDate
1095
1096  !--------------------------------------------------------------------------
1097  ! odbf_copyToObsSpaceHead
1098  !--------------------------------------------------------------------------
1099  subroutine odbf_copyToObsSpaceHead(obsdat, headSqlNames, headPrimaryKey, &
1100                                     headValues, headIndexBegin)
1101    !
1102    !:Purpose: Copy real and integer values from a local table into
1103    !           obsSpaceData header rows.
1104    !
1105    implicit none
1106
1107    ! Arguments:
1108    type(struct_obs), intent(inout) :: obsdat
1109    character(len=*), intent(in)    :: headSqlNames(:)
1110    integer(8),       intent(in)    :: headPrimaryKey(:)
1111    real(8),          intent(in)    :: headValues(:,:)
1112    integer,          intent(in)    :: headIndexBegin
1113
1114    ! Locals:
1115    integer :: columnIndex, matchIndex, headTableIndex, headIndex
1116    integer :: numRowsHeadTable, obsColumnIndex
1117
1118    numRowsHeadTable = size(headValues,1)
1119
1120    write(*,*) 'odbf_copyToObsSpaceHead: set header primary key'
1121    do headTableIndex = 1, numRowsHeadTable
1122      headIndex = headTableIndex + headIndexBegin - 1
1123      call obs_setHeadPrimaryKey(obsdat, headIndex, headPrimaryKey(headTableIndex))
1124    end do
1125
1126    column_loop: do columnIndex = 1, size(headSqlNames)
1127      matchIndex = utl_findloc(headMatchList(sqlColIndex,:), headSqlNames(columnIndex))
1128
1129      if (matchIndex == 0) then
1130
1131        write(*,*) 'odbf_copyToObsSpaceHead: unknown column name      : ', &
1132                   trim(headSqlNames(columnIndex))
1133
1134      else
1135
1136        obsColumnIndex = obs_columnIndexFromName(trim(headMatchList(obsColIndex,matchIndex)))
1137
1138        headTable_loop: do headTableIndex = 1, numRowsHeadTable
1139          headIndex = headTableIndex + headIndexBegin - 1
1140
1141          if (obs_columnDataType(obsColumnIndex) == 'real') then
1142            if ( obs_columnActive_RH(obsdat, obsColumnIndex) ) then
1143              if (headTableIndex == 1) then
1144                write(*,*) 'odbf_copyToObsSpaceHead: set header real column   : ', &
1145                           trim(headSqlNames(columnIndex))
1146              end if
1147              call obs_headSet_r(obsdat, obsColumnIndex, headIndex, &
1148                                 real(headValues(headTableIndex,columnIndex),&
1149                                 pre_obsReal))
1150            end if
1151          else if (obs_columnDataType(obsColumnIndex) == 'integer') then
1152            if ( obs_columnActive_IH(obsdat, obsColumnIndex) ) then
1153              if (headTableIndex == 1) then
1154                write(*,*) 'odbf_copyToObsSpaceHead: set header integer column: ', &
1155                           trim(headSqlNames(columnIndex))
1156              end if
1157              call obs_headSet_i(obsdat, obsColumnIndex, &
1158                                 headIndex, nint(headValues(headTableIndex,columnIndex)))
1159            end if
1160          else
1161            call utl_abort('odbf_copyToObsSpaceHead: unknown data type for obs header column')
1162          end if ! if (obs_columnDataType(obsColumnIndex) == 'real')
1163
1164        end do headTable_loop
1165
1166      end if ! if (matchIndex == 0)
1167
1168    end do column_loop
1169
1170  end subroutine odbf_copyToObsSpaceHead
1171
1172  !--------------------------------------------------------------------------
1173  ! odbf_copyToObsSpaceBody
1174  !--------------------------------------------------------------------------
1175  subroutine odbf_copyToObsSpaceBody(obsdat, bodySqlNames, &
1176                                     bodyPrimaryKey, bodyHeadKey, bodyValues, &
1177                                     bodyIndexBegin, headIndexBegin)
1178    !
1179    !:Purpose: Copy real and integer values from a local table into
1180    !           obsSpaceData body rows. Note: this version currently
1181    !           assumes that only 1 observed quantity is present for
1182    !           each row of the sqlite table. This is likely only valid
1183    !           for radiance observation types and therefore modifications
1184    !           will be required for other observation types.
1185    !
1186    implicit none
1187
1188    ! Arguments:
1189    type(struct_obs), intent(inout) :: obsdat
1190    character(len=*), intent(in)    :: bodySqlNames(:)
1191    integer(8),       intent(in)    :: bodyPrimaryKey(:)
1192    integer(8),       intent(in)    :: bodyHeadKey(:)
1193    real(8),          intent(in)    :: bodyValues(:,:)
1194    integer,          intent(in)    :: bodyIndexBegin
1195    integer,          intent(in)    :: headIndexBegin
1196
1197    ! Locals:
1198    character(len=lenSqlName), allocatable :: obsValueSqlNames(:)
1199    integer :: columnIndex, matchIndex, bodyTableIndex, bodyIndex, headIndex
1200    integer :: numRowsBodyTable, obsNlv, obsValueIndex, numObsValues
1201    integer(8) :: lastHeadKey
1202    integer, allocatable :: bodyColumnIndexObsValueList(:)
1203    integer, allocatable :: obsVarNoList(:)
1204    integer, allocatable :: matchIndexVec(:)
1205    integer, allocatable :: obsColumnIndex(:)
1206    logical              :: firstHead
1207
1208    numRowsBodyTable = size(bodyValues,1)
1209
1210    ! initialize some arrays to save lots of time in the main loops
1211    allocate(matchIndexVec(size(bodySqlNames)))
1212    do columnIndex = 1, size(bodySqlNames)
1213      matchIndexVec(columnIndex) = utl_findloc(bodyMatchList(sqlColIndex,:), &
1214                                               bodySqlNames(columnIndex))
1215    end do
1216    allocate(obsColumnIndex(numBodyMatch))
1217    do matchIndex = 1, numBodyMatch
1218      obsColumnIndex(matchIndex) = obs_columnIndexFromName(bodyMatchList(obsColIndex,matchIndex))
1219    end do
1220
1221    ! figure out column indexes for observation values (OBS_VAR)
1222    obsValueSqlNames = odbf_sqlNameFromObsSpaceName('VAR')
1223    numObsValues = size(obsValueSqlNames)
1224    allocate(obsVarNoList(numObsValues))
1225    allocate(bodyColumnIndexObsValueList(numObsValues))
1226    do obsValueIndex = 1, numObsValues
1227      bodyColumnIndexObsValueList(obsValueIndex) = &
1228           utl_findloc(bodySqlNames(:), obsValueSqlNames(obsValueIndex))
1229      if (bodyColumnIndexObsValueList(obsValueIndex) == 0) then
1230        write(*,*) 'odbf_copyToObsSpaceBody: obsValueSqlName = ', &
1231                   trim(obsValueSqlNames(obsValueIndex))
1232        call utl_abort('odbf_copyToObsSpaceBody: column with obs value not present')
1233      end if
1234      ! determine varNo for the observation value
1235      obsVarNoList(obsValueIndex) = odbf_varNoFromSqlName(obsValueSqlNames(obsValueIndex))
1236      write(*,*) 'odbf_copyToObsSpaceBody: obsVarNo = ', obsVarNoList(obsValueIndex)
1237    end do
1238
1239    lastHeadKey = 0
1240    firstHead = .true.
1241    headIndex = headIndexBegin - 1
1242    bodyIndex = bodyIndexBegin - 1
1243
1244    bodyIndex_loop: do bodyTableIndex = 1, numRowsBodyTable
1245
1246      obsValueIndex_loop: do obsValueIndex = 1, numObsValues
1247
1248        ! initialize count of number of body rows for each header row (OBS_NLV)
1249        if ( firstHead .or. (bodyHeadKey(bodyTableIndex) /= lastHeadKey) ) then
1250          headIndex = headIndex + 1
1251          call obs_headSet_i(obsdat, OBS_NLV, headIndex, 0)
1252          lastHeadKey = bodyHeadKey(bodyTableIndex)
1253          firstHead = .false.
1254        end if
1255
1256        ! check that the primary key for header table matches the value in the body table
1257        if ( obs_headPrimaryKey( obsdat, headIndex ) /= &
1258             bodyHeadKey(bodyTableIndex) ) then
1259          write(*,*) 'odbf_copyToObsSpaceBody: primary key in HEADER table = ', &
1260                     obs_headPrimaryKey( obsdat, headIndex )
1261          write(*,*) 'odbf_copyToObsSpaceBody: same key in BODY table      = ', &
1262                     bodyHeadKey(bodyTableIndex)
1263          call utl_abort('odbf_copyToObsSpaceBody: Primary key of HEADER table ' // &
1264                         'not equal to value in BODY table')
1265        end if
1266
1267        ! check if obs value is null/missing
1268        if ( bodyValues(bodyTableIndex,bodyColumnIndexObsValueList(obsValueIndex)) == &
1269             MPC_missingValue_R8 ) then
1270          cycle obsValueIndex_loop
1271        end if
1272
1273        ! check if element id is in list
1274        if ( utl_findloc(elemIdList(1:numElemIdList),obsVarNoList(obsValueIndex)) == 0 ) then
1275          cycle obsValueIndex_loop
1276        end if
1277
1278        ! add to count of number of body rows for each header row (OBS_NLV)
1279        obsNLV = obs_headElem_i(obsdat, OBS_NLV, headIndex)
1280        call obs_headSet_i(obsdat, OBS_NLV, headIndex, obsNLV + 1)
1281
1282        bodyIndex = bodyIndex + 1
1283
1284        ! copy body primary key to obsSpaceData
1285        if (bodyTableIndex == 1) then
1286          write(*,*) 'odbf_copyToObsSpaceBody: set body primary key'
1287        end if
1288        call obs_setBodyPrimaryKey(obsdat, bodyIndex, bodyPrimaryKey(bodyTableIndex))
1289
1290        ! Set a bunch of obsSpaceData body columns to 'missing'
1291        if (obs_columnActive_RB(obsdat, OBS_OMA))  call obs_bodySet_r(obsdat, OBS_OMA , bodyIndex, obs_missingValue_R)
1292        if (obs_columnActive_RB(obsdat, OBS_OMA0)) call obs_bodySet_r(obsdat, OBS_OMA0, bodyIndex, obs_missingValue_R)
1293        if (obs_columnActive_RB(obsdat, OBS_OMP))  call obs_bodySet_r(obsdat, OBS_OMP , bodyIndex, obs_missingValue_R)
1294        if (obs_columnActive_RB(obsdat, OBS_OMP6)) call obs_bodySet_r(obsdat, OBS_OMP6, bodyIndex, obs_missingValue_R)
1295        if (obs_columnActive_RB(obsdat, OBS_OER))  call obs_bodySet_r(obsdat, OBS_OER , bodyIndex, obs_missingValue_R)
1296        if (obs_columnActive_RB(obsdat, OBS_HPHT)) call obs_bodySet_r(obsdat, OBS_HPHT, bodyIndex, obs_missingValue_R)
1297        if (obs_columnActive_RB(obsdat, OBS_HAHT)) call obs_bodySet_r(obsdat, OBS_HAHT, bodyIndex, obs_missingValue_R)
1298        if (obs_columnActive_RB(obsdat, OBS_WORK)) call obs_bodySet_r(obsdat, OBS_WORK, bodyIndex, obs_missingValue_R)
1299        if (obs_columnActive_RB(obsdat, OBS_SIGI)) call obs_bodySet_r(obsdat, OBS_SIGI, bodyIndex, obs_missingValue_R)
1300        if (obs_columnActive_RB(obsdat, OBS_SIGO)) call obs_bodySet_r(obsdat, OBS_SIGO, bodyIndex, obs_missingValue_R)
1301        if (obs_columnActive_RB(obsdat, OBS_ZHA )) call obs_bodySet_r(obsdat, OBS_ZHA , bodyIndex, obs_missingValue_R)
1302        if (obs_columnActive_RB(obsdat, OBS_SEM )) call obs_bodySet_r(obsdat, OBS_SEM , bodyIndex, obs_missingValue_R)
1303        if (obs_columnActive_RB(obsdat, OBS_BCOR)) call obs_bodySet_r(obsdat, OBS_BCOR, bodyIndex, obs_missingValue_R)
1304
1305        ! set the varNo for this obsValue
1306        call obs_bodySet_i(obsdat, OBS_VNM, bodyIndex, obsVarNoList(obsValueIndex))
1307
1308        ! copy real and integer values into obsSpaceData
1309        columnIndex_loop: do columnIndex = 1, size(bodySqlNames)
1310          matchIndex = matchIndexVec(columnIndex)
1311
1312          if (matchIndex == 0) then
1313
1314            if (bodyTableIndex == 1) then
1315              write(*,*) 'odbf_copyToObsSpaceBody: unknown column name    : ', &
1316                         trim(bodySqlNames(columnIndex))
1317            end if
1318
1319          else
1320
1321            ! if this column corresponds to the obs value, then check if it is the one we want
1322            if ( obsColumnIndex(matchIndex) == OBS_VAR ) then
1323              if ( columnIndex /= bodyColumnIndexObsValueList(obsValueIndex) ) then
1324                ! skip this column
1325                cycle columnIndex_loop
1326                if (bodyTableIndex == 1) then
1327                  write(*,*) 'odbf_copyToObsSpaceBody: skip obs body column   : ', &
1328                             trim(bodySqlNames(columnIndex)), &
1329                             ' for obsValueIndex = ', obsValueIndex
1330                end if
1331              end if
1332            end if
1333
1334            if (obs_columnDataType(obsColumnIndex(matchIndex)) == 'real') then
1335              ! real values
1336              if ( obs_columnActive_RB(obsdat, obsColumnIndex(matchIndex)) ) then
1337                if (bodyTableIndex == 1) then
1338                  write(*,*) 'odbf_copyToObsSpaceBody: set body real column   : ', &
1339                             trim(bodySqlNames(columnIndex))
1340                end if
1341                call obs_bodySet_r(obsdat, obsColumnIndex(matchIndex), bodyIndex, &
1342                                   bodyValues(bodyTableIndex,columnIndex))
1343              end if
1344            else if (obs_columnDataType(obsColumnIndex(matchIndex)) == 'integer') then
1345              ! integer values
1346              if ( obs_columnActive_IB(obsdat, obsColumnIndex(matchIndex)) ) then
1347                if (bodyTableIndex == 1) then
1348                  write(*,*) 'odbf_copyToObsSpaceBody: set body integer column: ', &
1349                             trim(bodySqlNames(columnIndex))
1350                end if
1351                call obs_bodySet_i(obsdat, obsColumnIndex(matchIndex), &
1352                                   bodyIndex, nint(bodyValues(bodyTableIndex,columnIndex)))
1353              end if
1354            else
1355              call utl_abort('odbf_copyToObsSpaceBody: unknown data type for obs body column')
1356            end if ! if (obs_columnDataType(obsColumnIndex(matchIndex))
1357
1358          end if ! if (matchIndex == 0)
1359
1360        end do columnIndex_loop
1361
1362      end do obsValueIndex_loop
1363
1364    end do bodyIndex_loop
1365
1366    deallocate(matchIndexVec)
1367    deallocate(obsColumnIndex)
1368    deallocate(obsVarNoList)
1369    deallocate(bodyColumnIndexObsValueList)
1370
1371  end subroutine odbf_copyToObsSpaceBody
1372
1373  !--------------------------------------------------------------------------
1374  ! odbf_adjustValues
1375  !--------------------------------------------------------------------------
1376  subroutine odbf_adjustValues(obsdat, headIndexBegin, headIndexEnd)
1377    !
1378    !:Purpose: Adjust units and other minor modifications of some
1379    !           obsSpaceData columns after transfer from sqlite files
1380    !
1381    implicit none
1382
1383    ! Arguments:
1384    type(struct_obs), intent(inout) :: obsdat
1385    integer,          intent(in)    :: headIndexBegin
1386    integer,          intent(in)    :: headIndexEnd
1387
1388    ! Locals:
1389    integer :: headIndex, bodyIndexStart, bodyIndexEnd, bodyIndex
1390    integer :: instrument, obsSat, codeType, sensor
1391    real(8) :: obsLon, obsLat, surfEmiss
1392    character(len=2) :: obsFamily
1393
1394    do headIndex = headIndexBegin, headIndexEnd
1395
1396      obsFamily = obs_getfamily( obsdat, headerIndex_opt=headIndex )
1397      bodyIndexStart = obs_headElem_i(obsdat, OBS_RLN, headIndex)
1398      bodyIndexEnd   = obs_headElem_i(obsdat, OBS_NLV, headIndex) + &
1399                       bodyIndexStart - 1
1400
1401      ! Convert lon-lat from degrees to radians
1402
1403      obsLon = obs_headElem_r( obsdat, OBS_LON, headIndex )
1404      obsLat = obs_headElem_r( obsdat, OBS_LAT, headIndex )
1405
1406      if ( obsLon < 0.0D0 ) obsLon = obsLon + 360.0D0
1407      obsLon = obsLon * MPC_RADIANS_PER_DEGREE_R8
1408      obsLat = obsLat * MPC_RADIANS_PER_DEGREE_R8
1409
1410      call obs_headSet_r(obsdat, OBS_LON, headIndex, real(obsLon,pre_obsReal))
1411      call obs_headSet_r(obsdat, OBS_LAT, headIndex, real(obsLat,pre_obsReal))
1412
1413      ! Set global and observation flags to zero if specified
1414
1415      if (setObsFlagZero) then
1416        call obs_headSet_i(obsdat, OBS_ST1, headIndex, 0)
1417        do bodyIndex = bodyIndexStart, bodyIndexEnd
1418          call obs_bodySet_i(obsdat, OBS_FLG, bodyIndex, 0)
1419        end do
1420      end if
1421
1422      ! Various adjustment for radiance observations
1423
1424      if ( obsFamily == 'TO' ) then
1425
1426        instrument = obs_headElem_i( obsdat, OBS_INS, headIndex )
1427        obsSat     = obs_headElem_i( obsdat, OBS_SAT, headIndex )
1428        codeType   = obs_headElem_i( obsdat, OBS_ITY, headIndex )
1429        sensor     = obs_headElem_i( obsdat, OBS_SEN, headIndex )
1430
1431        ! set sensor to missing if not amsua/b, mhs or atms
1432
1433        if ( codeType /= codtyp_get_codtyp('amsua') .and. &
1434             codeType /= codtyp_get_codtyp('amsub') .and. &
1435             codeType /= codtyp_get_codtyp('mhs') .and. &
1436             codeType /= codtyp_get_codtyp('atms') ) then
1437          sensor = nint(MPC_missingValue_R8)
1438        end if
1439
1440        ! modify OBS_SAT, OBS_INS and OBS_SEN
1441
1442        if ( instrument == 420 ) obsSat  = 784
1443        if ( codeType == 202 .and. instrument == 620 ) instrument = 2046
1444        if ( sensor == nint(MPC_missingValue_R8) ) then
1445          sensor = 0
1446          if (instrument == nint(MPC_missingValue_R8) ) instrument = 0
1447        else
1448          instrument = obsu_cvt_obs_instrum(sensor)
1449        end if
1450        call obs_headSet_i(obsdat, OBS_INS, headIndex, instrument)
1451        call obs_headSet_i(obsdat, OBS_SAT, headIndex, obsSat)
1452        call obs_headSet_i(obsdat, OBS_SEN, headIndex, sensor)
1453
1454        ! change units for surface emissivity
1455
1456        if (obs_columnActive_RB(obsdat, OBS_SEM)) then
1457          do bodyIndex = bodyIndexStart, bodyIndexEnd
1458            surfEmiss = obs_bodyElem_r(obsdat, OBS_SEM, bodyIndex)
1459            surfEmiss = surfEmiss * 0.01D0
1460            call obs_bodySet_r(obsdat, OBS_SEM, bodyIndex, surfEmiss)
1461          end do
1462        end if
1463
1464      end if ! obsFamily = 'TO'
1465
1466    end do ! do headIndex
1467
1468  end subroutine odbf_adjustValues
1469
1470  !--------------------------------------------------------------------------
1471  ! odbf_sqlNameFromObsSpaceName
1472  !--------------------------------------------------------------------------
1473  function odbf_sqlNameFromObsSpaceName(obsSpaceName) result(sqlName)
1474    !
1475    !:Purpose: Return the corresponding sql file column name for a
1476    !           given obsSpaceData column name from the matching
1477    !           tables.
1478    !
1479    implicit none
1480
1481    ! Arguments:
1482    character(len=*), intent(in)           :: obsSpaceName
1483    ! Result:
1484    character(len=lenSqlName), allocatable :: sqlName(:)
1485
1486    ! Locals:
1487    integer                   :: numMatchFound, matchFoundIndex
1488    integer, allocatable      :: matchIndexList(:)
1489
1490    if (allocated(sqlName)) deallocate(sqlName)
1491
1492    ! first try the body matching list
1493    matchIndexList = utl_findlocs(bodyMatchList(obsColIndex,:), trim(obsSpaceName))
1494    if (matchIndexList(1) > 0) then
1495      numMatchFound = size(matchIndexList)
1496      allocate(sqlName(numMatchFound))
1497      do matchFoundIndex = 1, numMatchFound
1498        sqlName(matchFoundIndex) = bodyMatchList(sqlColIndex,matchIndexList(matchFoundIndex))
1499      end do
1500      return
1501    end if
1502
1503    ! now try the header matching list
1504    matchIndexList = utl_findlocs(headMatchList(obsColIndex,:), trim(obsSpaceName))
1505    if (matchIndexList(1) > 0) then
1506      numMatchFound = size(matchIndexList)
1507      allocate(sqlName(numMatchFound))
1508      do matchFoundIndex = 1, numMatchFound
1509        sqlName(matchFoundIndex) = headMatchList(sqlColIndex,matchIndexList(matchFoundIndex))
1510      end do
1511      return
1512    end if
1513
1514    ! not found in either list, abort
1515    write(*,*) 'odbf_sqlNameFromObsSpaceName: requested obsSpace name = ', trim(obsSpaceName)
1516    call utl_abort('odbf_sqlNameFromObsSpaceName: obsSpace name not found in matching list')
1517    
1518  end function odbf_sqlNameFromObsSpaceName
1519
1520  !--------------------------------------------------------------------------
1521  ! odbf_midasTabColFromObsSpaceName
1522  !--------------------------------------------------------------------------
1523  function odbf_midasTabColFromObsSpaceName(obsSpaceName, midasSQLColumnList) result(sqlColName)
1524    !
1525    !:Purpose: Return the corresponding sql file column name for a
1526    !           given obsSpaceData column name from the midas table
1527    !           matching list.
1528    !
1529    implicit none
1530
1531    ! Arguments:
1532    character(len=*),          intent(in) :: obsSpaceName
1533    character(len=lenSqlName), intent(in) :: midasSQLColumnList(:,:)
1534    ! Result:
1535    character(len=lenSqlName)    :: sqlColName
1536
1537    ! Locals:
1538    integer :: matchIndex
1539
1540    ! look in the midas table matching list
1541    matchIndex = utl_findloc(midasSQLColumnList(obsColIndex,:), trim(obsSpaceName))
1542    if (matchIndex > 0) then
1543      sqlColName = midasSQLColumnList(sqlColIndex,matchIndex)
1544      return
1545    end if
1546
1547    ! not found, abort
1548    write(*,*) 'odbf_midasTabColFromObsSpaceName: requested obsSpace name = ', trim(obsSpaceName)
1549    call utl_abort('odbf_midasTabColFromObsSpaceName: obsSpace name not found in midasSQLNamesList')
1550    
1551  end function odbf_midasTabColFromObsSpaceName
1552
1553  !--------------------------------------------------------------------------
1554  ! odbf_varNoFromSqlName
1555  !--------------------------------------------------------------------------
1556  function odbf_varNoFromSqlName(sqlName) result(varNo)
1557    !
1558    !:Purpose: Return the bufr element id number from the corresponding
1559    !           sql file column name of an observed value.
1560    !
1561    implicit none
1562
1563    ! Arguments:
1564    character(len=*), intent(in) :: sqlName
1565    ! Result:
1566    integer                      :: varNo
1567
1568    ! Locals:
1569    integer           :: matchIndex
1570    character(len=10) :: varNoStr
1571  
1572    matchIndex = utl_findloc(varNoList(sqlColIndex,:), trim(sqlName))
1573    if (matchIndex > 0) then
1574      varNoStr = varNoList(varNoColIndex,matchIndex)
1575      read(varNoStr,*) varNo
1576      return
1577    end if
1578
1579    write(*,*) 'odbf_varNoFromSqlName: requested sqlName = ', trim(sqlName)
1580    call utl_abort('odbf_varNoFromSqlName: not found in varNo list')
1581    
1582  end function odbf_varNoFromSqlName
1583
1584  !--------------------------------------------------------------------------
1585  ! odbf_updateFile
1586  !--------------------------------------------------------------------------
1587  subroutine odbf_updateFile(obsdat, fileName, familyType, fileIndex)
1588    !
1589    !:Purpose: Call subroutines to update MIDAS Header and Body tables
1590    !
1591    implicit none
1592    
1593    ! Arguments:
1594    type (struct_obs), intent(inout) :: obsdat
1595    character(len=*),  intent(in)    :: fileName
1596    character(len=*),  intent(in)    :: familyType
1597    integer,           intent(in)    :: fileIndex
1598
1599    call utl_tmg_start(14,'----UpdateObsDBfile')
1600    
1601    call odbf_setup()
1602
1603    ! Check if the Midas Header Table needs to be updated, specified from namelist
1604    if ( .not. utl_isNamelistPresent('namObsDbMIDASHeaderUpdate','./flnml') ) then
1605      if ( mmpi_myid == 0 ) then
1606        write(*,*) 'odbf_updateFile: namObsDbMIDASHeaderUpdate is missing in the namelist.'
1607        write(*,*) '                 The MIDAS Header Output Table will not be updated.'
1608      end if
1609    else
1610      ! Update the MIDAS Header Output Table
1611      call odbf_insertInMidasHeaderTable(obsdat, fileIndex, fileName, familyType)
1612    end if
1613
1614    ! Check if the Midas Body Table needs to be updated, specified from namelist
1615    if ( .not. utl_isNamelistPresent('namObsDbMIDASBodyUpdate','./flnml') ) then
1616      if ( mmpi_myid == 0 ) then
1617        write(*,*) 'odbf_updateFile: namObsDbMIDASBodyUpdate is missing in the namelist.'
1618        write(*,*) '                 The MIDAS Body Output Table will not be updated.'
1619      end if
1620    else
1621      ! Update the MIDAS Body Output Table
1622      call odbf_insertInMidasBodyTable(obsdat, fileIndex, fileName, familyType)
1623    end if
1624
1625    call utl_tmg_stop(14)
1626
1627  end subroutine odbf_updateFile
1628
1629  !--------------------------------------------------------------------------
1630  ! odbf_insertInMidasHeaderTable
1631  !--------------------------------------------------------------------------
1632  subroutine odbf_insertInMidasHeaderTable(obsdat, fileIndex, fileName, familyType)
1633    !
1634    !:Purpose: Insert selected columns in the MIDAS Header Output table using
1635    !           values from obsSpaceData. If the MIDAS Header table does not already
1636    !           exist, it is created by copying the observation table.
1637    !           A single table is created that contains all quantities being
1638    !           updated. Unlike the observation table, each observed variable
1639    !           is stored in a separate row and all quantities are in columns
1640    !           (e.g. ETOP, VTOP, ECF,...).
1641    !
1642    implicit none
1643    
1644    ! Arguments:
1645    type(struct_obs), intent(inout) :: obsdat
1646    character(len=*), intent(in)    :: fileName
1647    character(len=*), intent(in)    :: familyType 
1648    integer,          intent(in)    :: fileIndex
1649    
1650    ! Locals:
1651    type(fSQL_STATUS)    :: stat ! sqlite error status
1652    type(fSQL_DATABASE)  :: db   ! sqlite file handle
1653    type(fSQL_STATEMENT) :: stmt ! precompiled sqlite statements
1654    integer(8)           :: obsIdo
1655    integer              :: columnParamIndex
1656    integer              :: midasKey, obsIdf, updateItemIndex, updateValue_i
1657    integer              :: headIndex, maxNumHeader
1658    integer              :: obsSpaceColIndexSource, fnom, fclos, nulnam, ierr
1659    integer              :: maxNumHeaderAllMpi(mmpi_nprocs), obsSpaceColIndexSourceArr(15)
1660    real(8)              :: updateValue_r
1661    character(len=3000)  :: query, queryCreateTable, queryInsertInTable
1662    character(len=5000)  :: tableInsertColumnList
1663    logical              :: midasTableExists
1664    logical, save        :: nmlAlreadyRead = .false.
1665    character(len=6), parameter     :: midasTableType = 'header' !Define the type of MIDAS table: header/body
1666    integer, parameter   :: maxItemNumber = 15
1667    ! Namelist variables:
1668    integer,          save :: numberUpdateItems             ! MUST NOT BE INCLUDED IN NAMELIST!
1669    character(len=4), save :: updateItemList(maxItemNumber) ! obsSpace column names used to update the file
1670
1671    namelist/namObsDbMIDASHeaderUpdate/ numberUpdateItems, updateItemList
1672
1673    write(*,*)
1674    write(*,*) 'odbf_insertInMidasHeaderTable: Starting'
1675    write(*,*)
1676    write(*,*) 'odbf_insertInMidasHeaderTable: FileName   : ', trim(FileName)
1677    write(*,*) 'odbf_insertInMidasHeaderTable: FamilyType : ', FamilyType
1678    write(*,*) 'odbf_insertInMidasHeaderTable: fileIndex   : ', fileIndex
1679
1680    if (.not. nmlAlreadyRead) then
1681      nmlAlreadyRead = .true.
1682
1683      ! set default values of namelist variables
1684      updateItemList(:) = ''
1685      numberUpdateItems = MPC_missingValue_INT
1686
1687      ! Read the namelist for directives
1688      nulnam = 0
1689      ierr = fnom(nulnam,'./flnml','FTN+SEQ+R/O',0)
1690      read(nulnam, nml=namObsDbMIDASHeaderUpdate, iostat=ierr)
1691      if ( ierr /= 0 ) call utl_abort('odbf_insertInMidasHeaderTable: Error reading namelist')
1692      ierr = fclos(nulnam)
1693      if ( numberUpdateItems /= MPC_missingValue_INT) then
1694        call utl_abort('odbf_insertInMidasHeaderTable: check namObsDbMIDASHeaderUpdate namelist section: numberUpdateItems should be removed')
1695      end if
1696      numberUpdateItems = 0
1697      do updateItemIndex = 1, maxItemNumber
1698        if (trim(updateItemList(updateItemIndex)) == '') exit
1699        numberUpdateItems = numberUpdateItems + 1
1700      end do
1701      if ( mmpi_myid == 0 ) then
1702        write(*, nml=namObsDbMIDASHeaderUpdate)
1703      end if
1704    end if ! not nmlAlreadyRead
1705
1706    if (numberUpdateItems == 0) then
1707      write(*,*) 'odbf_insertInMidasHeaderTable: numberUpdateItems=0. ' // &
1708                 'MIDAS Header Output Table will not be updated.'
1709      return
1710    end if
1711
1712    ! check if midasTable already exists in the file
1713    midasTableExists = sqlu_sqlTableExists(fileName, midasHeadTableName)
1714
1715    if (.not. midasTableExists) then
1716      ! create midasTable by copying rearranging contents of observation table
1717      call odbf_createMidasHeaderTable(fileName)
1718    
1719      ! open the obsDB file
1720      call fSQL_open(db, trim(fileName), stat)
1721      if ( fSQL_error(stat) /= FSQL_OK ) then
1722        write(*,*) 'odbf_insertInMidasHeaderTable: fSQL_open: ', fSQL_errmsg(stat)
1723        call utl_abort('odbf_insertInMidasHeaderTable: fSQL_open')
1724      end if
1725
1726      ! Obtain the max number of header rows per mpi task
1727      maxNumHeader = 0
1728      HEADER1: do headIndex = 1, obs_numHeader(obsdat)
1729        obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
1730        if ( obsIdf /= fileIndex ) cycle HEADER1
1731        maxNumHeader = maxNumHeader + 1
1732      end do HEADER1
1733
1734      call rpn_comm_allGather(maxNumHeader,       1, 'mpi_integer',  &
1735                              maxNumHeaderAllMpi, 1, 'mpi_integer', &
1736                              'GRID', ierr)
1737
1738      ! Set the midasKey to start counting based on the latest value from the previous 
1739      ! mpi task
1740      if( mmpi_myid == 0 ) then
1741        midasKey = 0
1742      else
1743        midasKey = sum(maxNumHeaderAllMpi(1:mmpi_myid))
1744      end if
1745
1746      ! set the primary key, keys to main obsDB tables and other basic info
1747      query = 'insert into ' // trim(midasHeadTableName) // '(' // &
1748              trim(midasHeadKeySqlName) // ',' //trim(obsHeadKeySqlName) // &
1749              ') values(?, ?);'
1750
1751      write(*,*) 'odbf_insertInMidasHeaderTable: query = ', trim(query)
1752      call fSQL_prepare(db, query, stmt, stat)
1753      call fSQL_begin(db)
1754
1755      HEADER: do headIndex = 1, obs_numHeader(obsdat)
1756
1757        obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
1758        if ( obsIdf /= fileIndex ) cycle HEADER
1759
1760        midasKey = midasKey +1
1761        call fSQL_bind_param(stmt, PARAM_INDEX=1, INT_VAR=midasKey)
1762        obsIdo = obs_headPrimaryKey(obsdat, headIndex)
1763        call fSQL_bind_param(stmt, PARAM_INDEX = 2, INT8_VAR  = obsIdo)
1764
1765        call fSQL_exec_stmt (stmt)
1766      
1767      end do HEADER
1768
1769      call fSQL_finalize(stmt)
1770      call fSQL_commit(db)
1771
1772      ! close the obsDB file
1773      call fSQL_close(db, stat) 
1774    else
1775      write(*,*) 'odbf_insertInMidasHeaderTable: the midas header output table already exists, ' // &
1776                 'proceed to join with temporary table that has new columns.'
1777    end if ! .not.midasTableExists
1778
1779    ! open the obsDB file
1780    call fSQL_open(db, trim(fileName), stat)
1781    if ( fSQL_error(stat) /= FSQL_OK ) then
1782      write(*,*) 'odbf_insertInMidasHeaderTable: fSQL_open: ', fSQL_errmsg(stat)
1783      call utl_abort('odbf_insertInMidasHeaderTable: fSQL_open')
1784    end if
1785    
1786    tableInsertColumnList = ''
1787    obsSpaceColIndexSourceArr(:) = mpc_missingValue_int
1788    call getCreateTableInsertQueries(numberUpdateItems, updateItemList, midasTableType, &
1789                                     queryCreateTable, queryInsertInTable, &
1790                                     tableInsertColumnList, obsSpaceColIndexSourceArr(:))
1791
1792    ! Create a temporary table with new columns and values from obsSpaceData
1793    write(*,*) 'odbf_insertInMidasHeaderTable: queryCreateTable   -->', trim(queryCreateTable)
1794    call fSQL_do_many(db, queryCreateTable, stat)
1795    if ( fSQL_error(stat) /= FSQL_OK ) then
1796      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
1797      call utl_abort('odbf_insertInMidasHeaderTable: Problem with fSQL_do_many')
1798    end if 
1799
1800    ! Prepare to insert into the table
1801    write(*,*) 'odbf_insertInMidasHeaderTable: queryInsertInTable -->', trim(queryInsertInTable)
1802    call fSQL_prepare(db, queryInsertInTable, stmt, stat)
1803    if ( fSQL_error(stat) /= FSQL_OK ) then
1804      write(*,*) 'odbf_insertInMidasHeaderTable: fSQL_prepare: ', fSQL_errmsg(stat)
1805      call utl_abort('odbf_insertInMidasHeaderTable: fSQL_prepare')
1806    end if
1807
1808    call fSQL_begin(db)
1809    HEADER2: do headIndex = 1, obs_numHeader(obsdat)
1810      obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
1811      if ( obsIdf /= fileIndex ) cycle HEADER2
1812
1813      obsIdo  = obs_headPrimaryKey( obsdat, headIndex )
1814      call fSQL_bind_param(stmt, PARAM_INDEX=1, INT8_VAR=obsIdo)
1815
1816      do updateItemIndex = 1, numberUpdateItems
1817
1818        obsSpaceColIndexSource = obsSpaceColIndexSourceArr(updateItemIndex)
1819        columnParamIndex = updateItemIndex + 1
1820
1821        ! update the value, but set to null if it is missing
1822        if (obs_columnDataType(obsSpaceColIndexSource) == 'real') then
1823          updateValue_r = obs_headElem_r(obsdat, obsSpaceColIndexSource, headIndex)
1824
1825          if ( updateValue_r == obs_missingValue_R ) then
1826            call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex)  ! sql null values
1827          else
1828            call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex, REAL8_VAR=updateValue_r)
1829          end if
1830        else
1831          updateValue_i = obs_headElem_i(obsdat, obsSpaceColIndexSource, headIndex)
1832          if ( updateValue_i == mpc_missingValue_int ) then
1833            call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex)  ! sql null values
1834          else
1835            call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex, INT_VAR=updateValue_i)
1836          end if
1837        end if ! if (obs_columnDataType(obsSpaceColIndexSource) == 'real')
1838      end do ! do updateItemIndex
1839
1840      call fSQL_exec_stmt(stmt)
1841
1842    end do HEADER2
1843
1844    call fSQL_finalize(stmt)
1845    call fSQL_commit(db)
1846
1847    ! close the obsDB file
1848    call fSQL_close(db, stat)
1849
1850    call mergeTableInMidasTables(fileName, midasHeadTableName, obsHeadKeySqlName, &
1851                                 tableInsertColumnList)
1852
1853    write(*,*)
1854    write(*,*) 'odbf_insertInMidasHeaderTable: finished'
1855    write(*,*)
1856
1857  end subroutine odbf_insertInMidasHeaderTable
1858
1859  !--------------------------------------------------------------------------
1860  ! odbf_insertInMidasBodyTable
1861  !--------------------------------------------------------------------------
1862  subroutine odbf_insertInMidasBodyTable(obsdat, fileIndex, fileName, familyType)
1863    !
1864    !:Purpose: Insert selected columns in the MIDAS body table using
1865    !           values from obsSpaceData. If the MIDAS Body table does not already 
1866    !           exist, it is created by copying the observation table.
1867    !           A single table is created that contains all quantities being
1868    !           updated. Unlike the observation table, each observed variable
1869    !           is stored in a separate row and all quantities are in columns
1870    !           (e.g. obsValue, OMP, OMA,...).
1871    !
1872    implicit none
1873
1874    ! Arguments:
1875    type(struct_obs), intent(inout) :: obsdat
1876    character(len=*), intent(in)    :: fileName   
1877    character(len=*), intent(in)    :: familyType
1878    integer,          intent(in)    :: fileIndex
1879
1880    ! Locals:
1881    type(fSQL_STATUS)    :: stat ! sqlite error status
1882    type(fSQL_DATABASE)  :: db   ! sqlite file handle
1883    type(fSQL_STATEMENT) :: stmt ! precompiled sqlite statements
1884    integer(8)           :: obsIdo, obsIdd
1885    integer              :: columnParamIndex, columnIndex
1886    integer              :: obsIdf, midasKey, updateItemIndex, updateValue_i
1887    integer              :: headIndex, bodyIndex, bodyIndexBegin, bodyIndexEnd, maxNumBody
1888    integer              :: obsSpaceColIndexSource, fnom, fclos, nulnam, ierr
1889    integer              :: maxNumBodyAllMpi(mmpi_nprocs), obsSpaceColIndexSourceArr(15)
1890    real(8)              :: updateValue_r, obsValue
1891    character(len=3000)  :: query, queryCreateTable, queryInsertInTable
1892    character(len=5000)  :: tableInsertColumnList
1893    logical              :: midasTableExists
1894    logical, save        :: nmlAlreadyRead = .false.
1895    character(len=6), parameter  :: midasTableType='body' ! Define the type of MIDAS table: header/body
1896    integer, parameter   :: maxItemNumber = 15
1897    ! Namelist variables:
1898    integer,          save :: numberUpdateItems             ! MUST NOT BE INCLUDED IN NAMELIST!
1899    character(len=4), save :: updateItemList(maxItemNumber) ! obsSpace column names used to update the file
1900
1901    namelist/namObsDbMIDASBodyUpdate/ numberUpdateItems, updateItemList
1902
1903    write(*,*)
1904    write(*,*) 'odbf_insertInMidasBodyTable: Starting'
1905    write(*,*)
1906    write(*,*) 'odbf_insertInMidasBodyTable: FileName   : ', trim(FileName)
1907    write(*,*) 'odbf_insertInMidasBodyTable: FamilyType : ', FamilyType
1908    write(*,*) 'odbf_insertInMidasBodyTable: fileIndex : ', fileIndex
1909
1910    if (.not. nmlAlreadyRead) then
1911      nmlAlreadyRead = .true.
1912
1913      ! set default values of namelist variables
1914      updateItemList(:) = ''
1915      numberUpdateItems = MPC_missingValue_INT
1916
1917      ! Read the namelist for directives
1918      nulnam = 0
1919      ierr = fnom(nulnam,'./flnml','FTN+SEQ+R/O',0)
1920      read(nulnam, nml=namObsDbMIDASBodyUpdate, iostat=ierr)
1921      if ( ierr /= 0 ) call utl_abort('odbf_insertInMidasBodyTable: Error reading namelist')
1922      ierr = fclos(nulnam)
1923      if (numberUpdateItems /=  MPC_missingValue_INT) then
1924        call utl_abort('odbf_insertInMidasBodyTable: check namObsDbMIDASBodyUpdate namelist section: numberUpdateItems should be removed')
1925      end if
1926      numberUpdateItems = 0
1927      do updateItemIndex = 1, maxItemNumber
1928        if (trim(updateItemList(updateItemIndex)) == '') exit
1929        numberUpdateItems = numberUpdateItems + 1
1930      end do
1931      ! Add VNM/PPP/VAR/FLG to the updateItemList to ensure they are included in the sqlite table with values from obsSpaceData
1932      do columnIndex = 1, numBodyMidasTableRequired
1933        numberUpdateItems = numberUpdateItems + 1
1934        updateItemList(numberUpdateitems) = trim(midasBodyNamesList(2,columnIndex))
1935      end do
1936         
1937      numberUpdateItems = numberUpdateItems + 1
1938      updateItemList(numberUpdateItems) = 'FLG'      
1939
1940      if ( mmpi_myid == 0 ) then
1941        write(*,*) 'odbf_insertInMidasBodyTable: NOTE: the FLG/VNM/PPP/VAR columns are always added to update list'
1942        write(*, nml=namObsDbMIDASBodyUpdate)
1943      end if
1944    end if ! not nmlAlreadyRead
1945
1946    if (numberUpdateItems == 0) then
1947      write(*,*) 'odbf_insertInMidasBodyTable: numberUpdateItems=0. ' // &
1948                 'MIDAS Body Output Table will not be updated.'
1949      return
1950    end if
1951
1952    ! check if midasTable already exists in the file
1953    midasTableExists = sqlu_sqlTableExists(fileName, midasBodyTableName)
1954
1955    if (.not. midasTableExists) then
1956      ! create midasTable by copying rearranging contents of observation table
1957      call odbf_createMidasBodyTable(fileName)
1958
1959      ! open the obsDB file
1960      call fSQL_open(db, trim(fileName), stat)
1961      if ( fSQL_error(stat) /= FSQL_OK ) then
1962        write(*,*) 'odbf_insertInMidasBodyTable: fSQL_open: ', fSQL_errmsg(stat)
1963        call utl_abort('odbf_insertInMidasBodyTable: fSQL_open')
1964      end if
1965
1966      ! Obtain the max number of body rows per mpi task
1967      maxNumBody = 0
1968      HEADER1: do headIndex = 1, obs_numHeader(obsdat)
1969        obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
1970        if ( obsIdf /= fileIndex ) cycle HEADER1
1971
1972        bodyIndexBegin = obs_headElem_i( obsdat, OBS_RLN, headIndex )
1973        bodyIndexEnd = bodyIndexBegin + &
1974                       obs_headElem_i( obsdat, OBS_NLV, headIndex ) - 1
1975        
1976        BODY1: do bodyIndex = bodyIndexBegin, bodyIndexEnd
1977          obsValue = obs_bodyElem_r(obsdat, OBS_VAR, bodyIndex)
1978          if ( obsValue == obs_missingValue_R ) cycle BODY1
1979          maxNumBody = maxNumBody + 1
1980        end do BODY1
1981      end do HEADER1
1982      
1983      call rpn_comm_allGather(maxNumBody,       1, 'mpi_integer',  &
1984                              maxNumBodyAllMpi, 1, 'mpi_integer', &
1985                            'GRID', ierr )
1986
1987      ! Set the midasKey to start counting based on the latest value from the previous 
1988      ! mpi task
1989      if( mmpi_myid == 0 ) then
1990        midasKey = 0
1991      else
1992        midasKey = sum(maxNumBodyAllMpi(1:mmpi_myid))
1993      end if
1994                      
1995      ! set the primary key, keys to main obsDB tables and other basic info
1996      query = 'insert into ' // trim(midasBodyTableName) // '(' // &
1997              trim(midasBodyKeySqlName) // ',' // trim(obsHeadKeySqlName) // ',' // &
1998              trim(obsBodyKeySqlName)  // ') values(?,?,?);'
1999      write(*,*) 'odbf_insertInMidasBodyTable: query = ', trim(query)
2000      call fSQL_prepare(db, query, stmt, stat)
2001      call fSQL_begin(db)
2002      HEADER: do headIndex = 1, obs_numHeader(obsdat)
2003
2004        obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
2005        if ( obsIdf /= fileIndex ) cycle HEADER
2006
2007        bodyIndexBegin = obs_headElem_i( obsdat, OBS_RLN, headIndex )
2008        bodyIndexEnd = bodyIndexBegin + &
2009                       obs_headElem_i( obsdat, OBS_NLV, headIndex ) - 1
2010
2011        BODY: do bodyIndex = bodyIndexBegin, bodyIndexEnd
2012
2013          ! do not try to update if the observed value is missing
2014          obsValue = obs_bodyElem_r(obsdat, OBS_VAR, bodyIndex)
2015          if ( obsValue == obs_missingValue_R ) cycle BODY
2016
2017          midasKey = midasKey + 1
2018          call fSQL_bind_param(stmt, PARAM_INDEX=1, INT_VAR=midasKey)
2019          obsIdo  = obs_headPrimaryKey( obsdat, headIndex )
2020          call fSQL_bind_param(stmt, PARAM_INDEX=2, INT8_VAR=obsIdo)
2021          obsIdd  = obs_bodyPrimaryKey(obsdat, bodyIndex)
2022          call fSQL_bind_param(stmt, PARAM_INDEX=3, INT8_VAR=obsIdd)
2023
2024          call fSQL_exec_stmt(stmt)
2025
2026        end do BODY
2027
2028      end do HEADER
2029
2030      call fSQL_finalize(stmt)
2031      call fSQL_commit(db)
2032
2033      ! close the obsDB file
2034      call fSQL_close(db, stat) 
2035
2036    else
2037      write(*,*) 'odbf_insertInMidasBodyTable: the midas body output table already exists, ' // &
2038                 'proceed to join with temporary table that has new columns.'
2039    end if ! .not.midasTableExists
2040
2041    ! open the obsDB file
2042    call fSQL_open(db, trim(fileName), status=stat)
2043    if ( fSQL_error(stat) /= FSQL_OK ) then
2044      write(*,*) 'odbf_insertInMidasBodyTable: fSQL_open: ', fSQL_errmsg(stat)
2045      call utl_abort('odbf_insertInMidasBodyTable: fSQL_open '//fSQL_errmsg(stat) )
2046    end if
2047
2048    tableInsertColumnList = ''
2049    obsSpaceColIndexSourceArr(:) = mpc_missingValue_int
2050    call getCreateTableInsertQueries(numberUpdateItems, updateItemList, midasTableType, &
2051                                     queryCreateTable, queryInsertInTable, &
2052                                     tableInsertColumnList, obsSpaceColIndexSourceArr(:))
2053
2054    ! Create a temporary table with new columns and values from obsSpaceData
2055    write(*,*) 'odbf_insertInMidasBodyTable: queryCreateTable   -->', trim(queryCreateTable)
2056    call fSQL_do_many(db, queryCreateTable, stat)
2057    if ( fSQL_error(stat) /= FSQL_OK ) then
2058      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2059      call utl_abort('odbf_insertInMidasBodyTable: Problem with fSQL_do_many')
2060    end if 
2061
2062    ! Prepare to insert into the table
2063    write(*,*) 'odbf_insertInMidasBodyTable: queryInsertInTable -->', trim(queryInsertInTable)
2064    call fSQL_prepare(db, queryInsertInTable, stmt, stat)
2065    if ( fSQL_error(stat) /= FSQL_OK ) then
2066      write(*,*) 'odbf_insertInMidasBodyTable: fSQL_prepare: ', fSQL_errmsg(stat)
2067      call utl_abort('odbf_insertInMidasBodyTable: fSQL_prepare')
2068    end if
2069
2070    ! updateItemList(:) columns are updated within the BODY2 loop
2071    call fSQL_begin(db)
2072    HEADER2: do headIndex = 1, obs_numHeader(obsdat)
2073
2074      obsIdf = obs_headElem_i( obsdat, OBS_IDF, headIndex )
2075      if ( obsIdf /= fileIndex ) cycle HEADER2
2076
2077      bodyIndexBegin = obs_headElem_i( obsdat, OBS_RLN, headIndex )
2078      bodyIndexEnd = bodyIndexBegin + &
2079                     obs_headElem_i( obsdat, OBS_NLV, headIndex ) - 1
2080
2081      BODY2: do bodyIndex = bodyIndexBegin, bodyIndexEnd
2082      
2083        ! do not try to update if the observed value is missing
2084        obsValue = obs_bodyElem_r(obsdat, OBS_VAR, bodyIndex)
2085        if ( obsValue == obs_missingValue_R ) cycle BODY2
2086
2087        obsIdd = obs_bodyPrimaryKey(obsdat, bodyIndex)
2088        call fSQL_bind_param(stmt, PARAM_INDEX=1, INT8_VAR=obsIdd)
2089
2090        do updateItemIndex = 1, numberUpdateItems
2091
2092          obsSpaceColIndexSource = obsSpaceColIndexSourceArr(updateItemIndex)
2093          columnParamIndex = updateItemIndex + 1
2094
2095          ! update the value, but set to null if it is missing
2096          if (obs_columnDataType(obsSpaceColIndexSource) == 'real') then
2097            updateValue_r = obs_bodyElem_r(obsdat, obsSpaceColIndexSource, bodyIndex)
2098
2099            ! change units for surface emissivity
2100            if (obsSpaceColIndexSource == OBS_SEM) then
2101              updateValue_r =updateValue_r * 100.0D0
2102            end if
2103
2104            if ( updateValue_r == obs_missingValue_R ) then
2105              call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex)  ! sql null values
2106            else
2107              call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex, REAL8_VAR=updateValue_r)
2108            end if
2109          else
2110            updateValue_i = obs_bodyElem_i(obsdat, obsSpaceColIndexSource, bodyIndex)
2111            if ( updateValue_i == mpc_missingValue_int ) then
2112              call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex)  ! sql null values
2113            else
2114              call fSQL_bind_param(stmt, PARAM_INDEX=columnParamIndex, INT_VAR=updateValue_i)
2115            end if
2116          end if ! if (obs_columnDataType(obsSpaceColIndexSource) == 'real')
2117        end do ! do updateItemIndex
2118
2119        call fSQL_exec_stmt(stmt)
2120
2121      end do BODY2
2122
2123    end do HEADER2
2124
2125    call fSQL_finalize(stmt)
2126    call fSQL_commit(db)
2127
2128    ! close the obsDB file
2129    call fSQL_close(db, stat)
2130
2131    call mergeTableInMidasTables(fileName, midasBodyTableName, obsBodyKeySqlName, &
2132                                 tableInsertColumnList)
2133
2134    write(*,*)
2135    write(*,*) 'odbf_insertInMidasBodyTable: finished'
2136    write(*,*)
2137
2138  end subroutine odbf_insertInMidasBodyTable
2139
2140  !--------------------------------------------------------------------------
2141  ! odbf_createMidasHeaderTable
2142  !--------------------------------------------------------------------------
2143  subroutine odbf_createMidasHeaderTable(fileName)
2144    !
2145    !:Purpose: Create the midasOutput Header table that stores all quantities computed
2146    !           in MIDAS at the level of the obsSpaceData Header table (e.g. ETOP, VTOP, ECF).
2147    !
2148    implicit none
2149
2150    ! Arguments:
2151    character(len=*),              intent(in)  :: fileName
2152
2153    ! Locals:
2154    character(len=3000)         :: query
2155    type(fSQL_STATUS)           :: stat ! sqlite error status
2156    type(fSQL_DATABASE)         :: db   ! sqlite file handle
2157    character(len=*), parameter :: myName = 'odbf_createMidasHeaderTable'
2158
2159    ! open the obsDB file
2160    call fSQL_open(db, trim(fileName), status=stat)
2161    if ( fSQL_error(stat) /= FSQL_OK ) then
2162      call utl_abort(myName//': fSQL_open '//fSQL_errmsg(stat))
2163    end if
2164
2165    ! create the new MIDAS table
2166    query = 'create table ' // trim(midasHeadTableName) // ' (' // new_line('A') // &
2167            '  ' // trim(midasHeadKeySqlName) // ' integer primary key,' // new_line('A') // &
2168            '  ' // trim(obsHeadKeySqlName) // ' integer' // new_line('A')
2169
2170    query = trim(query) // ');'
2171    write(*,*) myName//': query = ', trim(query)
2172    call fSQL_do_many(db, query, stat)
2173    if ( fSQL_error(stat) /= FSQL_OK ) then
2174      call utl_abort(myName//': Problem with fSQL_do_many '//fSQL_errmsg(stat))
2175    end if
2176
2177    ! close the obsDB file
2178    call fSQL_close(db, stat) 
2179
2180  end subroutine odbf_createMidasHeaderTable
2181
2182  !--------------------------------------------------------------------------
2183  ! odbf_createMidasBodyTable
2184  !--------------------------------------------------------------------------
2185  subroutine odbf_createMidasBodyTable(fileName)
2186    !
2187    !:Purpose: Create the midasOutput table that stores all quantities computed
2188    !           in MIDAS at the level of the obsSpaceData Body table (e.g. OMP, OMA, FLG).
2189    !
2190    implicit none
2191
2192    ! Arguments:
2193    character(len=*), intent(in)  :: fileName
2194
2195    ! Locals:
2196    character(len=3000)      :: query
2197    type(fSQL_STATUS)        :: stat ! sqlite error status
2198    type(fSQL_DATABASE)      :: db   ! sqlite file handle
2199    character(len=*), parameter :: myName = 'odbf_createMidasBodyTable'
2200
2201    ! open the obsDB file
2202    call fSQL_open(db, trim(fileName), status=stat)
2203    if ( fSQL_error(stat) /= FSQL_OK ) then
2204      call utl_abort(myName//': fSQL_open '//fSQL_errmsg(stat))
2205    end if
2206
2207    ! create the new MIDAS table
2208    query = 'create table ' // trim(midasBodyTableName) // ' (' // new_line('A') // &
2209            '  ' // trim(midasBodyKeySqlName) // ' integer primary key,' // new_line('A') // &
2210            '  ' // trim(obsHeadKeySqlName) // ' integer,' // new_line('A') // &
2211            '  ' // trim(obsBodyKeySqlName) // ' integer );'
2212
2213    write(*,*) myName//': query = ', trim(query)
2214    call fSQL_do_many(db, query, stat)
2215    if ( fSQL_error(stat) /= FSQL_OK ) then
2216      call utl_abort(myName//': Problem with fSQL_do_many '//fSQL_errmsg(stat))
2217    end if
2218
2219    ! close the obsDB file
2220    call fSQL_close(db, stat) 
2221
2222  end subroutine odbf_createMidasBodyTable
2223
2224  !--------------------------------------------------------------------------
2225  ! obdf_clean
2226  !--------------------------------------------------------------------------
2227  subroutine obdf_clean(fileName, familyType)
2228
2229    !:Purpose: After the observational thinning procedure, this subroutine removes
2230    !           rows that are flagged as thinned in MIDAS_BODY_OUTPUT Table 
2231    !           the rows in the Report, Observation and MIDAS_HEADER_OUTPUT with corresponding 
2232    !           ID_Report and ID_Observation are also removed. 
2233
2234    implicit none
2235
2236    ! Arguments:
2237    character(len=*),  intent(in) :: fileName
2238    character(len=*),  intent(in) :: familyType
2239
2240    ! Locals:
2241    character(len = 512)        :: query
2242    type(fSQL_STATUS)           :: stat ! sqlite error status
2243    type(fSQL_DATABASE)         :: db   ! sqlite file handle
2244    type(fSQL_STATEMENT)        :: stmt ! precompiled sqlite statements
2245    integer                     :: nulnam , ierr, fnom, fclos
2246    character(len = lenSqlName) :: flgSqlName
2247    
2248    ! Namelist variables:
2249    logical, save               :: useVacuum ! choose to 'vacuum' the file after cleaning to reduce file size
2250
2251    namelist/namObsDbClean/ useVacuum
2252
2253    ! default value
2254    useVacuum = .false.
2255
2256    if ( .not. utl_isNamelistPresent('namObsDbClean','./flnml') ) then
2257      if ( mmpi_myid == 0 ) then
2258        write(*,*) 'odbf_clean: namObsDbClean is missing in the namelist.'
2259        write(*,*) '            The default values will be taken.'
2260      end if
2261    else
2262      ! reading namelist variables
2263      nulnam  = 0
2264      ierr = fnom(nulnam ,'./flnml','FTN+SEQ+R/O',0)
2265      read(nulnam , nml=namObsDbClean, iostat=ierr)
2266      if ( ierr /= 0 ) call utl_abort('obdf_clean: Error reading namelist')
2267      ierr = fclos(nulnam )
2268    end if
2269    if ( mmpi_myid == 0 ) write(*, nml=namObsDbClean)
2270
2271    write(*,*)
2272    write(*,*) 'obdf_clean: Starting'
2273    write(*,*)
2274    write(*,*) 'obdf_clean: FileName   : ', trim(FileName)
2275    write(*,*) 'obdf_clean: FamilyType : ', FamilyType
2276
2277    ! open the obsDB file
2278    call fSQL_open(db, trim(fileName), stat)
2279    if ( fSQL_error(stat) /= FSQL_OK ) then
2280      write(*,*) 'obdf_clean: fSQL_open: ', fSQL_errmsg(stat)
2281      call utl_abort('obdf_clean: fSQL_open')
2282    end if
2283    
2284    flgSqlName = odbf_midasTabColFromObsSpaceName('FLG', midasBodyNamesList)
2285
2286    ! Mark for deletion all records with bit 11 (2048) set
2287    query = ' delete from '// trim(midasBodyTableName) //' where '// trim(flgSqlName) //' & 2048 =2048;'
2288
2289    call fSQL_prepare(db, query, stmt, stat)
2290    if ( fSQL_error(stat) /= FSQL_OK ) then
2291      write(*,*) 'obdf_clean: fSQL_prepare: ', fSQL_errmsg(stat)
2292      call utl_abort('obdf_clean: fSQL_prepare')
2293    end if
2294
2295    call fSQL_exec_stmt(stmt)
2296  
2297    query = 'create temporary table good_headers as select distinct '// trim(obsHeadKeySqlName) //' from '// trim(midasBodyTableName) //';'
2298    write(*,*) 'obdf_clean: query = ', trim(query)
2299    call fSQL_do_many(db, query, stat)
2300
2301    if ( fSQL_error(stat) /= FSQL_OK ) then
2302      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2303      call utl_abort('obdf_clean: Problem with fSQL_do_many')
2304    end if
2305
2306    query = 'delete from '// trim(midasHeadTableName) //' where '// trim(obsHeadKeySqlName) // &
2307            ' not in ( select '// trim(obsHeadKeySqlName) //' from good_headers );'
2308    write(*,*) 'obdf_clean: query = ', trim(query)
2309    call fSQL_prepare(db, query, stmt, stat)
2310    
2311    if ( fSQL_error(stat) /= FSQL_OK ) then
2312      write(*,*) 'obdf_clean: fSQL_prepare: ', fSQL_errmsg(stat)
2313      call utl_abort('obdf_clean: fSQL_prepare')
2314    end if
2315
2316    call fSQL_exec_stmt(stmt)
2317
2318    query = 'delete from '// trim(bodyTableName) //' where '// trim(obsHeadKeySqlName) // &
2319            ' not in ( select '// trim(obsHeadKeySqlName) //'  from good_headers );'
2320    write(*,*) 'obdf_clean: query = ', trim(query)
2321    call fSQL_prepare(db, query, stmt, stat)
2322
2323    if ( fSQL_error(stat) /= FSQL_OK ) then
2324      write(*,*) 'obdf_clean: fSQL_prepare: ', fSQL_errmsg(stat)
2325      call utl_abort('obdf_clean: fSQL_prepare')
2326    end if
2327
2328    call fSQL_exec_stmt(stmt)
2329
2330    query = 'delete from '// trim(headTableName) //' where '// trim(obsHeadKeySqlName) // &
2331            ' not in ( select '// trim(obsHeadKeySqlName) //'  from good_headers );'
2332    write(*,*) 'obdf_clean: query = ', trim(query)
2333    call fSQL_prepare(db, query, stmt, stat)
2334
2335    if ( fSQL_error(stat) /= FSQL_OK ) then
2336      write(*,*) 'obdf_clean: fSQL_prepare: ', fSQL_errmsg(stat)
2337      call utl_abort('obdf_clean: fSQL_prepare')
2338    end if
2339
2340    call fSQL_exec_stmt(stmt)
2341    call fSQL_finalize(stmt)
2342
2343    ! Reduces the size of SQL file 
2344    if ( useVacuum ) then
2345      query = 'vacuum;'
2346      write(*,*) 'obdf_clean: query = ', trim(query)
2347      call fSQL_do_many(db, query, stat)
2348
2349      if ( fSQL_error(stat) /= FSQL_OK ) then
2350        write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2351        call utl_abort('obdf_clean: Problem with fSQL_do_many')
2352      end if
2353    end if
2354
2355    call fSQL_close(db, stat)
2356
2357  end subroutine obdf_clean
2358
2359  !--------------------------------------------------------------------------
2360  ! getCreateTableInsertQueries
2361  !--------------------------------------------------------------------------
2362  subroutine getCreateTableInsertQueries(numberUpdateItems, updateItemList, midasTableType, &
2363                                         queryCreateTable, queryInsertInTable, &
2364                                         tableInsertColumnList, obsSpaceColIndexSourceArr)
2365    !
2366    !:Purpose: Generate the queries for creating the table and insert columns into it.
2367    !
2368    implicit none
2369
2370    ! Arguments:
2371    integer,          intent(in)    :: numberUpdateItems            ! number of items in update list
2372    character(len=4), intent(in)    :: updateItemList(:)            ! update list
2373    character(len=*), intent(in)    :: midasTableType               ! table type: 'header' or 'body'
2374    character(len=*), intent(inout) :: queryCreateTable             ! query to create table
2375    character(len=*), intent(inout) :: queryInsertInTable           ! query to insert new columns in the table
2376    character(len=*), intent(inout) :: tableInsertColumnList        ! char of "combinedTableName.column1, combinedTableName.column2, .."
2377    integer         , intent(inout) :: obsSpaceColIndexSourceArr(:) ! list of obsSpaceData columnIndex for items in update list
2378
2379    ! Locals:
2380    integer :: updateItemIndex, ierr
2381    integer :: obsSpaceColIndexSource
2382    character(len=4)          :: obsSpaceColumnName
2383    character(len=20)         :: sqlDataType
2384    character(len=lenSqlName) :: sqlColumnName
2385    character(len=3000)       :: queryForValues
2386
2387    do updateItemIndex = 1, numberUpdateItems
2388
2389      ! get obsSpaceData column index for source of updated sql column
2390      obsSpaceColumnName = updateItemList(updateItemIndex)
2391      ierr = clib_toUpper(obsSpaceColumnName)
2392      obsSpaceColIndexSource = obs_columnIndexFromName(trim(obsSpaceColumnName))      
2393
2394      if (midasTableType == 'header') then
2395        sqlColumnName = odbf_midasTabColFromObsSpaceName(updateItemList(updateItemIndex), midasHeadNamesList)
2396      else if (midasTableType == 'body') then
2397        sqlColumnName = odbf_midasTabColFromObsSpaceName(updateItemList(updateItemIndex), midasBodyNamesList)
2398      end if
2399      write(*,*) 'getCreateTableInsertQueries: updating midasTable column: ', trim(sqlColumnName)
2400      write(*,*) 'getCreateTableInsertQueries: with contents of obsSpaceData column: ', &
2401                 trim(obsSpaceColumnName)
2402
2403      if (updateItemIndex == 1) then
2404        if (midasTableType == 'header') then
2405          queryCreateTable = 'create table '// trim(combinedTableName) // '(' // new_line('A') // &
2406                             '  ' // trim(obsHeadKeySqlName) // ' integer ' // new_line('A')
2407
2408          queryInsertInTable = 'insert into '// trim(combinedTableName) // '(' // new_line('A') // &
2409                               '  ' // trim(obsHeadKeySqlName) // new_line('A')
2410
2411        else if (midasTableType == 'body') then
2412          queryCreateTable = 'create table '// trim(combinedTableName) // '(' // new_line('A') // &
2413                             '  ' // trim(obsBodyKeySqlName) // ' integer ' // new_line('A')
2414
2415          queryInsertInTable = 'insert into '// trim(combinedTableName) // '(' // new_line('A') // &
2416                               '  ' // trim(obsBodyKeySqlName) // new_line('A')                              
2417        end if ! if (midasTableType == 'header')
2418
2419        queryForValues = 'values(?'
2420      end if ! if (updateItemIndex == 1)
2421
2422      if (obs_columnDataType(obsSpaceColIndexSource) == 'real') then
2423        sqlDataType = 'double'
2424      else
2425        sqlDataType = 'integer'
2426      end if
2427      queryCreateTable = trim(queryCreateTable) // ', ' // trim(sqlColumnName) // ' ' // trim(sqlDataType) // new_line('A')
2428      queryInsertInTable = trim(queryInsertInTable) // ', ' // trim(sqlColumnName) // new_line('A')
2429      queryForValues = trim(queryForValues) // ',?'
2430
2431      if (updateItemIndex == numberUpdateItems) then
2432        queryCreateTable = trim(queryCreateTable) // ');'
2433        queryForValues = trim(queryForValues) // ')'        
2434        queryInsertInTable = trim(queryInsertInTable) // ') ' // trim(queryForValues) // ';'
2435      end if
2436
2437      tableInsertColumnList = trim(tableInsertColumnList) // ', '// &
2438                              trim(combinedTableName) // '.' // trim(sqlColumnName) // new_line('A')
2439      obsSpaceColIndexSourceArr(updateItemIndex) = obsSpaceColIndexSource
2440    end do ! do updateItemIndex
2441
2442  end subroutine getCreateTableInsertQueries
2443
2444  !--------------------------------------------------------------------------
2445  ! mergeTableInMidasTables
2446  !--------------------------------------------------------------------------
2447  subroutine mergeTableInMidasTables(fileName, midasTableName, jointColumnName, &
2448                                     tableInsertColumnList)
2449    !
2450    !:Purpose: In a series of join/drop/alter merge input table and midasTable
2451    !           to create a new midasTable which contains the original columns plus 
2452    !           columns from the input table.   
2453    !
2454    implicit none
2455
2456    ! Arguments:
2457    character(len=*), intent(in) :: fileName              ! obsDB filename
2458    character(len=*), intent(in) :: midasTableName        ! name of original midas table to add column to
2459    character(len=*), intent(in) :: jointColumnName       ! name of column used to match original midas table and temporary table
2460    character(len=*), intent(in) :: tableInsertColumnList ! char of "combinedTableName.column1, combinedTableName.column2, .." to add to original midas table
2461
2462    ! Locals:
2463    type(fSQL_STATUS)   :: stat ! sqlite error status
2464    type(fSQL_DATABASE) :: db   ! sqlite file handle
2465    character(len=3000) :: query
2466
2467    write(*,*)
2468    write(*,*) 'mergeTableInMidasTables: START'
2469    write(*,*)
2470
2471    ! open the obsDB file
2472    call fSQL_open(db, trim(fileName), stat)
2473    if ( fSQL_error(stat) /= FSQL_OK ) then
2474      write(*,*) 'mergeTableInMidasTables: fSQL_open: ', fSQL_errmsg(stat)
2475      call utl_abort('mergeTableInMidasTables: fSQL_open')
2476    end if
2477
2478    ! Combine midasTable + inputTable -> table_tmp
2479    query = 'create table table_tmp as select ' // trim(midasTableName) // '.*' // new_line('A')
2480
2481    query = trim(query) // trim(tableInsertColumnList) // '  from ' // new_line('A') // &
2482            '  ' // trim(combinedTableName) // ' inner join ' // trim(midasTableName) // ' on ' // new_line('A') // &
2483            '  ' // trim(combinedTableName) // '.' // trim(jointColumnName) // '=' // &
2484            trim(midasTableName) // '.' // trim(jointColumnName) // ';'
2485   
2486    write(*,*) 'mergeTableInMidasTables: query ---> ', trim(query)
2487    call fSQL_do_many(db, query, stat)
2488    if ( fSQL_error(stat) /= FSQL_OK ) then
2489      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2490      call utl_abort('mergeTableInMidasTables: Problem with fSQL_do_many')
2491    end if
2492
2493    ! Drop the midasTable
2494    query = 'drop table ' // trim(midasTableName) // ';'
2495      
2496    write(*,*) 'mergeTableInMidasTables: query ---> ', trim(query)
2497    call fSQL_do_many(db, query, stat)
2498    if ( fSQL_error(stat) /= FSQL_OK ) then
2499      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2500      call utl_abort('mergeTableInMidasTables: Problem with fSQL_do_many')
2501    end if
2502
2503    ! Drop the inputTable
2504    query = 'drop table ' // trim(combinedTableName) // ';'
2505    write(*,*) 'mergeTableInMidasTables: query ---> ', trim(query)
2506    call fSQL_do_many(db, query, stat)
2507    if ( fSQL_error(stat) /= FSQL_OK ) then
2508      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2509      call utl_abort('mergeTableInMidasTables: Problem with fSQL_do_many')
2510    end if
2511
2512    ! Rename table_tmp -> midasTable
2513    query = 'alter table table_tmp rename to ' // trim(midasTableName) // ';'
2514
2515    write(*,*) 'mergeTableInMidasTables: query ---> ', trim(query)
2516    call fSQL_do_many(db, query, stat)
2517    if ( fSQL_error(stat) /= FSQL_OK ) then
2518      write(*,*) 'fSQL_do_many: ', fSQL_errmsg(stat)
2519      call utl_abort('mergeTableInMidasTables: Problem with fSQL_do_many')
2520    end if
2521
2522    ! close the obsDB file
2523    call fSQL_close(db, stat) 
2524
2525    write(*,*)
2526    write(*,*) 'mergeTableInMidasTables: END'
2527    write(*,*)
2528
2529  end subroutine mergeTableInMidasTables
2530
2531end module obsdbFiles_mod