sqliteRead_mod sourceΒΆ

   1module sqliteRead_mod
   2  ! MODULE sqliteRead_mod (prefix='sqlr' category='3. Observation input/output')
   3  !
   4  !:Purpose:  To read and update SQLITE observation files. Data is stored in 
   5  !           obsSpaceData object.
   6  !
   7  use codePrecision_mod
   8  use obsSpaceData_mod
   9  use midasMpi_mod
  10  use fSQLite
  11  use mathPhysConstants_mod
  12  use obsUtil_mod
  13  use utilities_mod
  14  use bufr_mod
  15  use ramDisk_mod
  16  use codtyp_mod
  17  use obsVariableTransforms_mod
  18  use obsFilter_mod
  19  use sqliteUtilities_mod
  20  use radialVelocity_mod
  21
  22  implicit none
  23
  24  save
  25
  26  private
  27
  28  public :: sqlr_insertSqlite, sqlr_updateSqlite, sqlr_readSqlite
  29  public :: sqlr_cleanSqlite, sqlr_readSqlite_avhrr, sqlr_addCloudParametersandEmissivity
  30  public :: sqlr_writePseudoSSTobs, sqlr_writeEmptyPseudoSSTobsFile
  31  public :: sqlr_getColumnValuesDate
  32
  33  contains
  34  
  35  !--------------------------------------------------------------------------
  36  ! sqlr_readSqlite_avhrr
  37  !--------------------------------------------------------------------------
  38  subroutine sqlr_readSqlite_avhrr(obsdat, fileName, headerIndexBegin, headerIndexEnd)
  39    !
  40    !:Purpose: To read SQLite avhrr_cloud parameters.
  41    ! 
  42    implicit none
  43
  44    ! Arguments:
  45    type (struct_obs), intent(inout) :: obsdat     ! ObsSpaceData Structure
  46    character(len=*) , intent(in)    :: fileName   ! SQLite filename
  47    integer          , intent(in)    :: headerIndexBegin
  48    integer          , intent(in)    :: headerIndexEnd
  49
  50    ! Locals:
  51    type(fSQL_DATABASE)      :: db   ! type for SQLIte  file handle
  52    type(fSQL_STATEMENT)     :: stmt ! type for precompiled SQLite statements
  53    type(fSQL_STATUS)        :: stat ! type for error status
  54    integer                  :: obsIdo
  55    character(len=128)       :: querySqlite
  56    integer                  :: rowIndex, headerIndex, columnIndex
  57    integer                  :: numberAvhrrRows ,  numberAvhrrColumns
  58    real, allocatable        :: matdata(:,:)
  59    REAL(pre_obsReal) :: CFRAC,MOYRAD,STDRAD
  60
  61    write(*,*) 'sqlr_readSqlite_avhrr: fileName : ', trim(fileName)
  62
  63    if (.not. sqlu_sqlTableExists(trim(fileName), 'avhrr')) then
  64      write(*,*) 'sqlr_readSqlite_avhrr: Table avhrr does not exist :  ... return  '
  65      return
  66    end if
  67    write(*,*) 'sqlr_readSqlite_avhrr: Table avhrr exists: insert contents into obsdat '
  68
  69    call fSQL_open(db, trim(fileName) ,stat)
  70    if (fSQL_error(stat) /= FSQL_OK) then
  71      call utl_abort('sqlr_readSqlite_avhrr: fSQL_open '//fSQL_errmsg(stat))
  72    end if
  73
  74    querySqlite = ' select mean_radiance,stddev_radiance,fractionClearPixels from avhrr where id_obs = ? '
  75    call fSQL_prepare(db, querySqlite , stmt, stat)
  76    write(*,*) 'sqlr_readSqlite_avhrr: obs_getNchanAvhr=',obs_getNchanAvhrr()
  77    do headerIndex = headerIndexBegin, headerIndexEnd
  78      obsIdo = obs_headPrimaryKey(obsdat, headerIndex)
  79      call fSQL_bind_param(stmt, param_index = 1, int_var = obsIdo)
  80      call fSQL_exec_stmt(stmt)
  81      call fSQL_get_many(stmt, nrows = numberAvhrrRows , ncols = numberAvhrrColumns , mode = FSQL_REAL)
  82      allocate(matdata(numberAvhrrRows, numberAvhrrColumns))
  83      matdata(:,:) = MPC_missingValue_R4
  84      call fSQL_fill_matrix(stmt, matdata)
  85
  86      rowIndex = 1
  87      do columnIndex = OBS_CF1, OBS_CF7
  88        if(obs_columnActive_RH(obsdat,columnIndex)) then
  89          CFRAC = matdata(rowIndex,3)
  90          call obs_headSet_r(obsdat,columnIndex,headerIndex, CFRAC)
  91          rowIndex = rowIndex + 6
  92        end if
  93      end do
  94
  95      rowIndex = 1
  96      do columnIndex = OBS_M1C1, OBS_M7C6
  97        if(obs_columnActive_RH(obsdat,columnIndex)) then
  98          MOYRAD = matdata(rowIndex,1) * 100000.d0
  99          call obs_headSet_r(obsdat,columnIndex,headerIndex, MOYRAD)
 100          rowIndex = rowIndex + 1
 101        endif
 102      end do
 103
 104      rowIndex = 1
 105      do columnIndex = OBS_S1C1, OBS_S7C6
 106        if(obs_columnActive_RH(obsdat,columnIndex)) then
 107          STDRAD = matdata(rowIndex,2) * 100000.d0
 108          call obs_headSet_r(obsdat,columnIndex,headerIndex,  STDRAD)
 109          rowIndex = rowIndex + 1
 110        end if
 111      end do
 112
 113      deallocate(matdata)
 114      call fSQL_free_mem(stmt)
 115
 116    end do
 117
 118    call fSQL_finalize(stmt)
 119    call fSQL_close(db, stat)
 120
 121  end subroutine sqlr_readSqlite_avhrr
 122
 123  !--------------------------------------------------------------------------
 124  ! sqlr_readSqlite
 125  !--------------------------------------------------------------------------
 126  subroutine sqlr_readSqlite(obsdat, familyType, fileName)
 127    !
 128    !:Purpose: To read SQLite namelist and files.
 129    ! 
 130    implicit none
 131
 132    ! Arguments:
 133    type (struct_obs), intent(inout) :: obsdat     ! ObsSpaceData Structure
 134    character(len=*) , intent(in)    :: familyType ! Family Type 
 135    character(len=*) , intent(in)    :: fileName   ! SQLite filename
 136
 137    ! Locals:
 138    character(len=12)        :: idStation
 139    integer                  :: codeType, obsDate, obsTime, obsStatus, obsFlag, obsVarno
 140    integer(8)               :: headPrimaryKey
 141    integer(8), allocatable  :: bodyHeadKeys(:), bodyPrimaryKeys(:)
 142    integer(8), allocatable  :: tempBodyKeys(:,:)
 143    logical                  :: finishedWithHeader
 144    real(pre_obsReal)        :: elevReal, xlat, xlon, vertCoord
 145    real                     :: elev    , obsLat, obsLon, elevFact
 146    real                     :: beamAzimuth, beamRangeStart, beamRangeEnd, beamElevation          
 147    real(pre_obsReal)        :: beamAzimuthReal, beamElevationReal
 148    real(pre_obsReal)        :: beamLat, beamLon, beamHeight, beamDistance, beamRange
 149    integer                  :: vertCoordType, vertCoordFact, fnom, fclos, nulnam, ierr, idProf
 150    real                     :: zenithReal, solarZenithReal, CloudCoverReal, solarAzimuthReal
 151    integer                  :: roQcFlag
 152    real(pre_obsReal)        :: geoidUndulation, earthLocRadCurv, obsValue, obsError
 153    real(8)                  :: geoidUndulation_R8, earthLocRadCurv_R8, azimuthReal_R8
 154    integer                  :: trackCellNum, iceChartID
 155    real(pre_obsReal)        :: modelWindSpeed
 156    real(8)                  :: modelWindSpeed_R8
 157    integer                  :: iasiImagerCollocationFlag, iasiGeneralQualityFlag
 158    integer                  :: obsSat, landSea, terrainType, instrument, sensor
 159    integer                  :: rowIndex, obsNlv, headerIndex, bodyIndex
 160    integer                  :: numBody, numHeader
 161    real(pre_obsReal), parameter :: zemFact = 0.01
 162    character(len=1024)      :: queryHeader, queryIDs
 163    character(len=256)       :: csqlcrit, selectIDs
 164    character(len=1024)      :: columnsHeader
 165    logical                  :: finished
 166    type(fSQL_DATABASE)      :: db         ! type for SQLIte  file handle
 167    type(fSQL_STATEMENT)     :: stmt,stmt2 ! type for precompiled SQLite statements
 168    type(fSQL_STATUS)        :: stat       !type for error status
 169    character(len=256)       :: sqlDataOrder, extraQueryData
 170    character(len=256), allocatable :: listElemArray(:)
 171    integer, allocatable            :: listElemArrayInteger(:)
 172    integer                  :: numberBodyRows, numberBodyColumns, numberIDsRows, numberIDsColumns
 173    integer                  :: columnIndex
 174    integer, parameter :: lenSqlName    = 60
 175    character(len=lenSqlName), allocatable :: headSqlNames(:), bodySqlNames(:)
 176    character(len=lenSqlName), parameter :: headSqlNamesToRead(32) = (/'ID_STN','LAT','LON','CODTYP', &
 177         'DATE','TIME','STATUS','ELEV','ANTENNA_ALTITUDE','LAND_SEA','ID_SAT','INSTRUMENT','SENSOR', &
 178         'ZENITH','SOLAR_ZENITH','AZIMUTH','SOLAR_AZIMUTH','TERRAIN_TYPE','CLOUD_COVER', &
 179         'FANION_QUAL_IASI_SYS_IND','INDIC_NDX_QUAL_GEOM','RO_QC_FLAG','GEOID_UNDULATION', &
 180         'EARTH_LOCAL_RAD_CURV','CENTER_AZIMUTH','CENTER_ELEVATION','RANGE_START','RANGE_END', &
 181         'ID_PROF','CHARTINDEX','TRACK_CELL_NO','MOD_WIND_SPD'/)
 182    real(8),                   allocatable :: bodyValues(:,:), codtypInFileList(:,:)
 183    logical :: beamRangeFound
 184    real(pre_obsReal)           :: missingValue
 185
 186    ! Namelist variables:
 187    integer                  :: numberElem        ! MUST NOT BE INCLUDED IN NAMELIST!
 188    character(len=256)       :: listElem          ! list of bufr element ids to read
 189    character(len=256)       :: sqlExtraDat       ! can be used e.g. for ' and id_obs in (select id_obs from header where...) '
 190    character(len=256)       :: sqlExtraDat_sat   ! same as sqlExtraDat, but only for SST satellite obs
 191    character(len=256)       :: sqlExtraHeader    ! can be used e.g. for ' id_stn in (...) '
 192    character(len=256)       :: sqlExtraHeader_sat! same as sqlExtraHeader, but only for SST satellite obs
 193    integer                  :: codtyp_sat(10)    ! list of codtyp values of SST satellite obs (default is 88)
 194    character(len=256)       :: sqlNull           ! can be used e.g. for ' and obsvalue is not null '
 195
 196    namelist /NAMSQLtovs/ numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 197    namelist /NAMSQLua/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 198    namelist /NAMSQLai/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 199    namelist /NAMSQLsw/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 200    namelist /NAMSQLro/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 201    namelist /NAMSQLsfc/  numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 202    namelist /NAMSQLsc/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 203    namelist /NAMSQLpr/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 204    namelist /NAMSQLal/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 205    namelist /NAMSQLgl/   numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 206    namelist /NAMSQLradar/numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull
 207    namelist /NAMSQLsst/  numberElem,listElem,sqlExtraDat,sqlExtraHeader,sqlNull, &
 208                          sqlExtraDat_sat,sqlExtraHeader_sat,codtyp_sat
 209
 210    missingValue = real(MPC_missingValue_R8,pre_obsReal)
 211
 212    write(*,*) 'sqlr_readSqlite: fileName   : ', trim(fileName)
 213    write(*,*) 'sqlr_readSqlite: familyType : ', trim(familyType)
 214
 215    ! Get the codtyp from the file to use in determining how to build the main queries
 216    call sqlu_getColumnValuesNum(codtypInFileList, fileName=trim(fileName), &
 217                                 tableName='header', sqlColumnNames=(/'codtyp'/), &
 218                                 extraQuery_opt='group by codtyp')
 219    write(*,*) 'sqlr_readSqlite: codtyp array = ', codtypInFileList(:,:)
 220
 221    ! Default namelist variable values
 222    sqlExtraHeader     = ''
 223    sqlExtraDat        = ''
 224    sqlExtraHeader_sat = ''
 225    sqlExtraDat_sat    = ''
 226    codtyp_sat(:)      = MPC_missingValue_INT
 227    codtyp_sat(1)      = 88
 228    sqlNull            = ''
 229    listElem           = ''
 230    numberElem         = MPC_missingValue_INT
 231    
 232    ! Set the type of vertical coordinate
 233    vertCoordType  = 1
 234    select case(trim(familyType))
 235      case('RA','PR','AL','RO','SF','TM','SC','GL','HY','GP')
 236        vertCoordType = 1
 237      case('UA','AI','SW')
 238        vertCoordType = 2
 239      case('TO')
 240        vertCoordType = 3
 241      case default
 242        call utl_abort('sqlr_readSqlite: unknown family '//trim(familyType))
 243    end select
 244
 245    ! Set multiplying factor for vertical coordinate
 246    select case(trim(familyType))
 247      case('SF','TM','SC')
 248        vertCoordFact = 0
 249      case default
 250        vertCoordFact = 1
 251    end select
 252
 253    ! Set multiplying factor used when adding elevation to vcoord
 254    select case(trim(familyType))
 255      case('PR','SF','TM','GP')
 256        elevFact=1.0
 257      case default
 258        elevFact=0.0
 259    end select
 260
 261    ! Read appropriate namelist based on obs family
 262    nulnam = 0
 263    ierr=fnom(nulnam, './flnml', 'FTN+SEQ+R/O', 0)
 264    select case(trim(familyType))
 265      case('TO')
 266        read(nulnam, nml = NAMSQLtovs, iostat = ierr)
 267        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLtovs')
 268        if (mmpi_myid == 0) write(*, nml = NAMSQLtovs)
 269      case('UA')
 270        read(nulnam, nml = NAMSQLua, iostat = ierr)
 271        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLua')
 272        if (mmpi_myid == 0) write(*, nml = NAMSQLua)
 273      case ('AI')
 274        read(nulnam, nml = NAMSQLai, iostat = ierr)
 275        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLai')
 276        if (mmpi_myid == 0) write(*, nml = NAMSQLai)
 277      case ('SW')
 278        read(nulnam, nml = NAMSQLsw, iostat = ierr)
 279        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLsw')
 280        if (mmpi_myid == 0) write(*, nml = NAMSQLsw)
 281      case ('PR')
 282        read(nulnam, nml = NAMSQLpr, iostat = ierr)
 283        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLpr')
 284        if (mmpi_myid == 0) write(*, nml =  NAMSQLpr)
 285      case ('AL')  
 286        read(nulnam, nml = NAMSQLal, iostat = ierr)
 287        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLal')
 288        if (mmpi_myid == 0) write(*, nml =  NAMSQLal)
 289      case ('RO')     
 290        read(nulnam, nml = NAMSQLro, iostat = ierr)
 291        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLro')
 292        if (mmpi_myid == 0) write(*, nml = NAMSQLro)
 293      case ('SF','GP','HY')
 294        read(nulnam, nml = NAMSQLsfc, iostat = ierr)
 295        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLsfc')
 296        if (mmpi_myid == 0) write(*, nml = NAMSQLsfc)        
 297      case ('SC')
 298        read(nulnam, nml = NAMSQLsc, iostat = ierr)
 299        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLsc')
 300        if (mmpi_myid == 0) write(*, nml = NAMSQLsc)
 301      case ('TM')
 302        read(nulnam, nml = NAMSQLsst, iostat = ierr)
 303        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLsst')
 304        if (mmpi_myid == 0) write(*, nml = NAMSQLsst)
 305        do rowIndex = 1, size(codtypInFileList,1)
 306          if (any(nint(codtypInFileList(rowIndex,1)) == codtyp_sat(:))) then
 307            write(*,*) 'sqlr_readSqlite: Found satellite SST observation in file:', &
 308                       nint(codtypInFileList(rowIndex,1))
 309            sqlExtraHeader = sqlExtraHeader_sat
 310            sqlExtraDat = sqlExtraDat_sat
 311          end if
 312        end do
 313      case ('GL')
 314        read(nulnam, nml = NAMSQLgl, iostat = ierr)
 315        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLgl')
 316        if (mmpi_myid == 0) write(*, nml =  NAMSQLgl)
 317      case('RA')
 318        read(nulnam, nml = NAMSQLradar, iostat = ierr)
 319        if (ierr /= 0) call utl_abort('sqlr_readSqlite: Error reading namelist: NAMSQLradar')
 320        if (mmpi_myid == 0) write(*, nml =  NAMSQLradar) 
 321      case default
 322        call utl_abort('sqlr_readSqlite: No namelist read for this family: '//trim(familyType))
 323    end select
 324    ierr=fclos(nulnam)
 325    
 326    if (numberElem /= MPC_missingValue_R4) then
 327      call utl_abort('sqlr_readSqlite: check namelist, numberElem should be removed')
 328    end if
 329    numberElem = count(transfer(listElem, 'a', len(listElem)) == ',') + 1
 330
 331    ! Set the observation variable transforms
 332    if (numberElem > 0) then
 333      call utl_splitString(listElem,',', listElemArray)
 334      call utl_stringArrayToIntegerArray(listElemArray, listElemArrayInteger)
 335      call ovt_setup(listElemArrayInteger)
 336      deallocate(listElemArrayInteger)
 337      deallocate(listElemArray)
 338    end if
 339
 340    ! Compose SQL queries
 341
 342    ! Get list of all header table columns in file, then remove those we don't read
 343    call sqlu_getSqlColumnNames(headSqlNames, fileName=trim(fileName), &
 344                                tableName='header', dataType='all')
 345    do columnIndex = 1, size(headSqlNames)
 346      if (utl_findloc(headSqlNamesToRead,headSqlNames(columnIndex)) == 0) then
 347        headSqlNames(columnIndex) = ''
 348      end if
 349    end do
 350    call utl_removeEmptyStrings(headSqlNames)
 351    do columnIndex = 1, size(headSqlNames)
 352      write(*,*) 'sqlr_readSqlite: headSqlNames   =', columnIndex, &
 353                 trim(headSqlNames(columnIndex))
 354    end do
 355    call utl_combineString(columnsHeader, ',', headSqlNames)
 356
 357    ! ordering of data that will get read in matdata, bodyPrimaryKeys and bodyHeadKeys
 358    sqlDataOrder = ' order by id_obs, varno, id_data' 
 359
 360    selectIDs  = 'select id_data, id_obs'
 361    csqlcrit = 'varno in ('//trim(listElem)//')'//trim(sqlExtraDat)//trim(SQLNull)
 362
 363    ! Determine names of body columns present in sqlite file
 364    call sqlu_getSqlColumnNames(bodySqlNames, fileName=trim(fileName), &
 365                                tableName='data', dataType='numeric')
 366    do columnIndex = 1, size(bodySqlNames)
 367      write(*,*) 'sqlr_readSqlite: bodySqlNames   =', columnIndex, &
 368                 trim(bodySqlNames(columnIndex))
 369    end do
 370
 371    ! Read all of the columns in the file, then we decide what to do with them later
 372    extraQueryData = 'where '//trim(csqlcrit)//trim(sqlDataOrder)
 373    call sqlu_getColumnValuesNum(bodyValues, fileName=trim(fileName), &
 374                                 tableName='data', sqlColumnNames=bodySqlNames, &
 375                                 extraQuery_opt=extraQueryData)
 376    numberBodyRows    = size(bodyValues,1)
 377    numberBodyColumns = size(bodyValues,2)
 378
 379    ! It is very important that queryIDs and queryData be identical except for the column names being selected
 380    queryIDs  = trim(selectIDs) //' from data where '//trim(csqlcrit)//trim(sqlDataOrder)//';'
 381    write(*,'(4a)') 'sqlr_readSqlite: queryIDs     --> ', trim(queryIDs)
 382
 383    ! Open the sqlite file
 384    call fSQL_open(db, trim(fileName) ,stat)
 385    if (fSQL_error(stat) /= FSQL_OK) then
 386      call utl_abort('sqlr_readSqlite: fSQL_open '//fSQL_errmsg(stat))
 387    end if
 388
 389    ! Read id_data and id_obs columns in the body table ("status" not set when getting integers)
 390    call fSQL_prepare(db, trim(queryIDs) , stmt2, status=stat)
 391    call fSQL_get_many(stmt2, nrows=numberIDsRows, ncols=numberIDsColumns, &
 392                       mode=FSQL_INT8)
 393    allocate(tempBodyKeys(numberIDsRows,numberIDsColumns))
 394    call fSQL_fill_matrix(stmt2, tempBodyKeys)
 395    allocate(bodyPrimaryKeys(numberIDsRows))
 396    allocate(bodyHeadKeys(numberIDsRows))
 397    bodyPrimaryKeys(:) = tempBodyKeys(:,1)
 398    bodyHeadKeys(:)    = tempBodyKeys(:,2)
 399    deallocate(tempBodyKeys)
 400    call fSQL_free_mem(stmt2)
 401    call fSQL_finalize(stmt2)
 402
 403    if (numberIDsRows /= numberBodyRows) then
 404      call utl_abort('sqlr_readSqlite: number of body keys not equal to number of rows in bodyValues')
 405    end if
 406
 407    headerIndex  = obs_numHeader(obsdat)
 408    bodyIndex = obs_numBody(obsdat)
 409    numHeader = obs_numHeader(obsdat)
 410    numBody   = obs_numBody(obsdat)
 411    write(*,*) 'sqlr_readSqlite: DEBUT numheader  =', numHeader
 412    write(*,*) 'sqlr_readSqlite: DEBUT numbody    =', numBody
 413    write(*,*) 'sqlr_readSqlite:  numberBodyRows numberBodyColumns =', numberBodyRows, numberBodyColumns
 414    write(*,*) 'sqlr_readSqlite: =========================================='
 415
 416    ! Here is a summary of what is going on in the rest of this routine:
 417    ! 
 418    ! for each row of the body table (which has already been read)
 419    !
 420    !   if this is the first id_data associated with a given id_obs
 421    !     -> read associated header and write to obsSpaceData
 422    !
 423    !   -> write body entry in obsSpaceData
 424    !
 425    obsNlv = 0
 426    call fSQL_begin(db)
 427    BODY: do rowIndex = 1, numberIDsRows
 428
 429      ! id_obs, bodyIndex and obsNlv values for this entry
 430      headPrimaryKey = bodyHeadKeys(rowIndex)
 431      bodyIndex = bodyIndex + 1
 432      obsNlv    = obsNlv + 1
 433      call obs_setBodyPrimaryKey(obsdat, bodyIndex, bodyPrimaryKeys(rowIndex))
 434
 435      if (obs_columnActive_RB(obsdat, OBS_OMA))  call obs_bodySet_r(obsdat, OBS_OMA , bodyIndex, missingValue)
 436      if (obs_columnActive_RB(obsdat, OBS_OMA0)) call obs_bodySet_r(obsdat, OBS_OMA0, bodyIndex, missingValue)
 437      if (obs_columnActive_RB(obsdat, OBS_OMP))  call obs_bodySet_r(obsdat, OBS_OMP , bodyIndex, missingValue)
 438      if (obs_columnActive_RB(obsdat, OBS_OMP6)) call obs_bodySet_r(obsdat, OBS_OMP6, bodyIndex, missingValue)
 439      if (obs_columnActive_RB(obsdat, OBS_OER))  call obs_bodySet_r(obsdat, OBS_OER , bodyIndex, missingValue)
 440      if (obs_columnActive_RB(obsdat, OBS_HPHT)) call obs_bodySet_r(obsdat, OBS_HPHT, bodyIndex, missingValue)
 441      if (obs_columnActive_RB(obsdat, OBS_HAHT)) call obs_bodySet_r(obsdat, OBS_HAHT, bodyIndex, missingValue)
 442      if (obs_columnActive_RB(obsdat, OBS_WORK)) call obs_bodySet_r(obsdat, OBS_WORK, bodyIndex, missingValue)
 443      if (obs_columnActive_RB(obsdat, OBS_SIGI)) call obs_bodySet_r(obsdat, OBS_SIGI, bodyIndex, missingValue)
 444      if (obs_columnActive_RB(obsdat, OBS_SIGO)) call obs_bodySet_r(obsdat, OBS_SIGO, bodyIndex, missingValue)
 445      if (obs_columnActive_RB(obsdat, OBS_ZHA))  call obs_bodySet_r(obsdat, OBS_ZHA , bodyIndex, missingValue)
 446      if (obs_columnActive_RB(obsdat, OBS_BCOR)) call obs_bodySet_r(obsdat, OBS_BCOR, bodyIndex, missingValue)
 447
 448      READHEADER: if (obsNlv == 1) then
 449        headerIndex = headerIndex + 1
 450
 451        ! This is the first body entry associated with a new headPrimaryKey
 452        call obs_headSet_i(obsdat, OBS_RLN, headerIndex, bodyIndex)
 453
 454        ! we read the associated header
 455        if (trim(sqlExtraHeader) == '') then
 456          queryHeader = 'select '//trim(columnsHeader)//' from header where id_obs = ? '
 457        else
 458          queryHeader = 'select '//trim(columnsHeader)//' from header where '//trim(sqlExtraHeader)//' and id_obs = ? '
 459        end if
 460
 461        if (rowIndex == 1) then
 462          write(*,'(4a)') 'sqlr_readSqlite: first queryHeader    --> ', trim(queryHeader)
 463        end if
 464        call fSQL_prepare(db, queryHeader, stmt, stat)
 465        if (fSQL_error(stat)  /= FSQL_OK) then
 466          write(*,*) 'sqlr_readSqlite: problem reading header entry. Query: ', trim(queryHeader)
 467          call sqlu_handleError(stat, 'fSQL_prepare')
 468        end if
 469
 470        call fSQL_bind_param(stmt, param_index = 1, int8_var = headPrimaryKey)
 471
 472        call fSQL_get_row(stmt, finished)
 473        if (finished) then
 474          write(*,*) 'sqlr_readSqlite: problem reading header entry. Query:', trim(queryHeader)
 475          call utl_abort('Problem with fSQL_get_row()')
 476        end if
 477
 478        ! The query result is inserted into variables
 479        terrainType = MPC_missingValue_INT
 480        landSea     = MPC_missingValue_INT
 481        sensor      = MPC_missingValue_INT
 482        cloudCoverReal = MPC_missingValue_R4
 483        elev = 0.
 484        elevReal = 0.
 485        solarAzimuthReal = MPC_missingValue_R4
 486        solarZenithReal  = MPC_missingValue_R4
 487        zenithReal = MPC_missingValue_R4
 488        instrument = MPC_missingValue_INT
 489        azimuthReal_R8 = MPC_missingValue_R8
 490        beamAzimuth    = MPC_missingValue_R4
 491        beamElevation  = MPC_missingValue_R4
 492        beamRangeStart = MPC_missingValue_R4
 493        beamRangeEnd   = MPC_missingValue_R4
 494        obsSat = MPC_missingValue_INT
 495
 496        ! Set some default values
 497        if (trim(familyType) == 'TO') then
 498          call obs_headSet_i(obsdat, OBS_TTYP, headerIndex, terrainType)
 499        end if
 500        call obs_headSet_r(obsdat, OBS_ALT, headerIndex, elevReal)
 501
 502        call obs_setFamily(obsdat, trim(familyType), headerIndex)
 503        call obs_setHeadPrimaryKey(obsdat, headerIndex, headPrimaryKey)
 504        call obs_headSet_i(obsdat, OBS_ONM, headerIndex, headerIndex)
 505        do columnIndex = 1, size(headSqlNames)
 506          select case(trim(headSqlNames(columnIndex)))
 507            case('LAT')
 508              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = obsLat)
 509              xlat = obsLat * MPC_RADIANS_PER_DEGREE_R8
 510              call obs_headSet_r(obsdat, OBS_LAT, headerIndex, xLat)
 511            case('LON')
 512              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = obsLon)
 513              if (obsLon < 0.) obsLon = obsLon + 360.
 514              xlon = obsLon * MPC_RADIANS_PER_DEGREE_R8
 515              call obs_headSet_r(obsdat, OBS_LON, headerIndex, xLon)
 516            case('CODTYP')
 517              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = codeType)
 518              call obs_headSet_i(obsdat, OBS_ITY, headerIndex, codeType)
 519            case('DATE')
 520              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = obsDate)
 521              call obs_headSet_i(obsdat, OBS_DAT, headerIndex, obsDate)
 522            case('TIME')
 523              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = obsTime)
 524              obsTime = obsTime/100
 525              call obs_headSet_i(obsdat, OBS_ETM, headerIndex, obsTime)
 526            case('ID_STN')
 527              call fSQL_get_column(stmt, COL_INDEX = columnIndex, char_var  = idStation)
 528              call obs_set_c(obsdat, 'STID' , headerIndex, trim(idStation))
 529            case('STATUS')
 530              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = obsStatus)
 531              call obs_headSet_i(obsdat, OBS_ST1, headerIndex, obsStatus)
 532            case('ELEV','ANTENNA_ALTITUDE')
 533              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = elev, REAL_MISSING=MPC_missingValue_R4)
 534              elevReal=elev
 535              if (trim(familyType) /= 'TO') then
 536                call obs_headSet_r(obsdat, OBS_ALT ,headerIndex, elevReal)
 537              end if
 538            case('LAND_SEA')
 539              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = landSea, INT_MISSING=MPC_missingValue_INT)
 540              call obs_headSet_i(obsdat, OBS_STYP, headerIndex, landSea)
 541            case('ID_SAT')
 542              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = obsSat, INT_MISSING=MPC_missingValue_INT)
 543              call obs_headSet_i(obsdat, OBS_SAT , headerIndex, obsSat)
 544            case('INSTRUMENT')
 545              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = instrument, INT_MISSING=MPC_missingValue_INT)
 546            case('ZENITH')
 547              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = zenithReal, REAL_MISSING=MPC_missingValue_R4)
 548              call obs_headSet_r(obsdat, OBS_SZA , headerIndex, real(zenithReal,kind=pre_obsReal))
 549            case('SOLAR_ZENITH')
 550              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = solarZenithReal, REAL_MISSING=MPC_missingValue_R4)
 551              call obs_headSet_r(obsdat, OBS_SUN , headerIndex, real(solarZenithReal,kind=pre_obsReal))
 552            case('AZIMUTH')
 553              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real8_var = azimuthReal_R8)
 554              call obs_headSet_r(obsdat, OBS_AZA , headerIndex, real(azimuthReal_R8,kind=pre_obsReal))
 555            case('TERRAIN_TYPE')
 556              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = terrainType, INT_MISSING=MPC_missingValue_INT)
 557              call obs_headSet_i(obsdat, OBS_TTYP, headerIndex, terrainType)
 558            case('CLOUD_COVER')
 559              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = cloudCoverReal, REAL_MISSING=MPC_missingValue_R4)
 560              call obs_headSet_r(obsdat, OBS_CLF , headerIndex, real(cloudCoverReal,kind=pre_obsReal))
 561            case('SOLAR_AZIMUTH')
 562              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = solarAzimuthReal, REAL_MISSING=MPC_missingValue_R4)
 563              call obs_headSet_r(obsdat, OBS_SAZ , headerIndex, real(solarAzimuthReal,kind=pre_obsReal))
 564            case('SENSOR')
 565              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = sensor, INT_MISSING=MPC_missingValue_INT)
 566            case('FANION_QUAL_IASI_SYS_IND')
 567              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = iasiGeneralQualityFlag, INT_MISSING=MPC_missingValue_INT)
 568              if (obs_columnActive_IH(obsdat,OBS_GQF)) then
 569                call obs_headSet_i(obsdat,OBS_GQF, headerIndex, iasiGeneralQualityFlag)
 570              end if
 571            case('INDIC_NDX_QUAL_GEOM')
 572              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = iasiImagerCollocationFlag, INT_MISSING=MPC_missingValue_INT)
 573              if (obs_columnActive_IH(obsdat,OBS_GQL)) then
 574                call obs_headSet_i(obsdat, OBS_GQL, headerIndex, iasiImagerCollocationFlag)
 575              end if
 576            case('RO_QC_FLAG')
 577              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = roQcFlag, INT_MISSING=MPC_missingValue_INT)
 578              call obs_headSet_i(obsdat, OBS_ROQF, headerIndex, roQcFlag)
 579            case('GEOID_UNDULATION')
 580              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real8_var = geoidUndulation_R8)
 581              geoidUndulation = geoidUndulation_R8
 582              call obs_headSet_r(obsdat, OBS_GEOI, headerIndex, geoidUndulation)
 583            case('EARTH_LOCAL_RAD_CURV')
 584              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real8_var = earthLocRadCurv_R8)
 585              earthLocRadCurv = earthLocRadCurv_R8
 586              call obs_headSet_r(obsdat, OBS_TRAD, headerIndex, earthLocRadCurv)
 587            case('CENTER_AZIMUTH')
 588              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = beamAzimuth)
 589              beamAzimuthReal   = beamAzimuth   * MPC_RADIANS_PER_DEGREE_R8
 590              call obs_headSet_r(obsdat, OBS_RZAM, headerIndex, beamAzimuthReal)
 591            case('CENTER_ELEVATION')
 592              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = beamElevation)
 593              beamElevationReal = beamElevation * MPC_RADIANS_PER_DEGREE_R8
 594              call obs_headSet_r(obsdat, OBS_RELE, headerIndex, beamElevationReal)
 595            case('RANGE_START')
 596              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = beamRangeStart)
 597              call obs_headSet_r(obsdat, OBS_RANS, headerIndex, real(beamRangeStart,kind=pre_obsReal))
 598            case('RANGE_END')
 599              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real_var  = beamRangeEnd)
 600              call obs_headSet_r(obsdat, OBS_RANE, headerIndex, real(beamRangeEnd,kind=pre_obsReal))
 601            case('ID_PROF')
 602              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = idProf)
 603              call obs_headSet_i(obsdat, OBS_PRFL, headerIndex, idProf)
 604            case('CHARTINDEX')
 605              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = iceChartID)
 606              call obs_headSet_i(obsdat, OBS_CHID, headerIndex, iceChartID)
 607            case('TRACK_CELL_NO')
 608              call fSQL_get_column(stmt, COL_INDEX = columnIndex, int_var   = trackCellNum)
 609              if (trackCellNum > 21) trackCellNum = 43 - trackCellNum
 610              call obs_headSet_i(obsdat, OBS_FOV, headerIndex, trackCellNum)
 611            case('MOD_WIND_SPD')
 612              call fSQL_get_column(stmt, COL_INDEX = columnIndex, real8_var = modelWindSpeed_R8)
 613              modelWindSpeed = modelWindSpeed_R8
 614              call obs_headSet_r(obsdat, OBS_MWS, headerIndex, modelWindSpeed)
 615          end select
 616        end do
 617
 618        ! we are done reading this header entry
 619        call fSQL_finalize(stmt)
 620
 621        ! adjust some values that were just read
 622        if (trim(familyType) == 'TO') then
 623          if (instrument == 420) obsSat  = 784
 624          if (codeType == codtyp_get_codtyp('crisfsr') .and. instrument == 620) instrument = 2046
 625          if (sensor == MPC_missingValue_INT) then
 626            sensor = 0
 627            if (instrument == MPC_missingValue_INT) instrument = 0
 628          else
 629            instrument = obsu_cvt_obs_instrum(sensor)
 630          end if
 631          call obs_headSet_i(obsdat, OBS_SAT , headerIndex, obsSat)
 632          call obs_headSet_i(obsdat, OBS_INS , headerIndex, instrument)
 633        end if
 634
 635      end if READHEADER
 636
 637      ! Initialize some obsSpaceData body values
 638      if (obs_columnActive_RB(obsdat,OBS_LATD)) then
 639        call obs_bodySet_r(obsdat, OBS_LATD, bodyIndex, obs_missingValue_R)
 640      end if
 641      if (obs_columnActive_RB(obsdat,OBS_LOND)) then
 642        call obs_bodySet_r(obsdat, OBS_LOND, bodyIndex, obs_missingValue_R)
 643      end if
 644      if (obs_columnActive_RB(obsdat,OBS_SEM)) then
 645        call obs_bodySet_r(obsdat, OBS_SEM, bodyIndex, 0.0)
 646      end if
 647
 648      ! Copy body row into obsspacedata by looping over all columns that were read
 649      beamRangeFound = .false. ! Needed to overwrite obs_ppp, regardless of column order
 650      do columnIndex = 1, size(bodySqlNames)
 651        select case(trim(bodySqlNames(columnIndex)))
 652          case('VCOORD')
 653            vertCoord = bodyValues(rowIndex,columnIndex)
 654            if (trim(familyType) /= 'RA' .and. trim(familyType) /= 'TO') then
 655              vertCoord = vertCoord * vertCoordFact + elevReal * elevFact
 656            end if
 657            call obs_bodySet_r(obsdat, OBS_PPP, bodyIndex, vertCoord)
 658          case('VCOORD_TYPE')
 659            ! This is set earlier based on familyType
 660            call obs_bodySet_i(obsdat, OBS_VCO, bodyIndex, vertCoordType)
 661          case('VARNO')
 662            obsVarno = int(bodyValues(rowIndex,columnIndex))
 663            call obs_bodySet_i(obsdat, OBS_VNM, bodyIndex, obsVarno)
 664          case('OBSVALUE')
 665            obsValue = bodyValues(rowIndex,columnIndex)
 666            if (trim(familyType) == 'TO' .and. obsValue == MPC_missingValue_R8) then
 667              ! Is this really needed???
 668              obsValue = real(MPC_missingValue_R8,pre_obsReal)
 669            end if
 670            call obs_bodySet_r(obsdat, OBS_VAR, bodyIndex, obsValue)
 671          case('OBS_ERROR')
 672            obsError = bodyValues(rowIndex,columnIndex)
 673            if (obsError > 0.0D0) then
 674              call obs_bodySet_r(obsdat, OBS_OER, bodyIndex, obsError)
 675            end if
 676          case('FLAG')
 677            obsFlag = int(bodyValues(rowIndex,columnIndex))
 678            if (obsFlag == mpc_missingValue_INT) obsFlag = 0
 679            call obs_bodySet_i(obsdat, OBS_FLG, bodyIndex, obsFlag)
 680          case('SURF_EMISS')
 681            if (obs_columnActive_RB(obsdat,OBS_SEM)) then
 682              call obs_bodySet_r(obsdat, OBS_SEM, bodyIndex, bodyValues(rowIndex,columnIndex) * zemFact)
 683            end if
 684          case('BIAS_CORR')
 685            if (obs_columnActive_RB(obsdat,OBS_BCOR)) then
 686              call obs_bodySet_r(obsdat, OBS_BCOR, bodyIndex, bodyValues(rowIndex,columnIndex))
 687            end if
 688          case('RANGE')
 689            beamRangeFound = .true.
 690            beamRange = bodyValues(rowIndex,columnIndex)
 691            call obs_bodySet_r(obsdat, OBS_LOCI, bodyIndex, beamRange)
 692            ! elevation and azimuths are converted to radians and pre_obsReal precision
 693            beamElevationReal = beamElevation * MPC_RADIANS_PER_DEGREE_R8
 694            beamAzimuthReal   = beamAzimuth   * MPC_RADIANS_PER_DEGREE_R8
 695
 696            call rdv_getlatlonHRfromRange(xlat, xlon, beamElevationReal, beamAzimuthReal, & !in
 697                                          elevReal, beamRange,                            & !in
 698                                          beamLat, beamLon, beamHeight, beamDistance)       !out
 699            call obs_bodySet_r(obsdat, OBS_LATD, bodyIndex, beamLat)
 700            call obs_bodySet_r(obsdat, OBS_LOND, bodyIndex, beamLon)
 701        end select
 702      end do
 703
 704      ! Replace the value of obs_ppp if a radar beam range value was found
 705      if (beamRangeFound) call obs_bodySet_r(obsdat, OBS_PPP, bodyIndex, beamHeight)
 706
 707      ! Activate the 'rejected by selection process' bit if observed value is missing
 708      if (obsValue == MPC_missingValue_R8) then
 709        obsFlag  = obs_bodyElem_i(obsdat, obs_flg, bodyIndex)
 710        call obs_bodySet_i(obsdat, OBS_FLG, bodyIndex, ibset(obsFlag,11))
 711      end if
 712
 713      ! In some cases we need to add extra row(s) to the BODY table
 714      if (.not. filt_bufrCodeAssimilated(obsVarno) .and. &
 715          .not. ovt_bufrCodeSkipped(obsVarno)) then
 716          
 717        ! Add an extra row to the obsSpaceData body table
 718        ! to contain quantity later calculated by ovt_transformObsValue
 719        call obs_setBodyPrimaryKey(obsdat, bodyIndex+1, -1)
 720        call sqlr_addExtraDataRow(obsdat, vertCoord * vertCoordFact + elevReal * elevFact, &
 721                                  ovt_getDestinationBufrCode(obsVarno), &
 722                                  vertCoordType, bodyIndex+1)
 723        bodyIndex = bodyIndex + 1
 724        obsNlv = obsNlv + 1
 725        if (ovt_isWindObs(obsVarno)) then
 726          ! Add an extra row for the other wind component
 727          call obs_setBodyPrimaryKey(obsdat, bodyIndex+1, -1)
 728          call sqlr_addExtraDataRow(obsdat, vertCoord * vertCoordFact + elevReal * elevFact, &
 729                                    ovt_getDestinationBufrCode(obsVarno,extra_opt=.true.), &
 730                                    vertCoordType, bodyIndex+1)
 731          bodyIndex = bodyIndex + 1
 732          obsNlv = obsNlv + 1
 733        end if
 734      end if
 735
 736      ! Logic to determine whether we are finished with this header entry
 737      finishedWithHeader = .false.
 738      if (rowIndex == numberIDsRows) then
 739        ! This is the last body entry, nothing comes after.
 740        finishedWithHeader = .true.
 741      else
 742        if (headPrimaryKey /= bodyHeadKeys(rowIndex+1)) then
 743          ! we just treated the last body entry associated with this id_obs (headPrimaryKey)
 744          ! make header for this id_obs
 745          finishedWithHeader = .true.
 746        end if
 747      end if
 748      if (finishedWithHeader) then
 749        ! Record the number of body rows associated with this header entry
 750        call obs_headSet_i(obsdat, OBS_NLV, headerIndex, obsNlv)
 751        obsNlv = 0 ! and reset counter for next batch of body entries
 752      end if
 753
 754    end do BODY
 755    call fSQL_commit(db)
 756
 757    numHeader = obs_numHeader(obsdat)
 758    numBody   = obs_numBody(obsdat)
 759    write(*,*) 'sqlr_readSqlite: FIN numheader  =', numHeader
 760    write(*,*) 'sqlr_readSqlite: FIN numbody    =', numBody
 761
 762    call fSQL_close(db, stat) 
 763    if (fSQL_error(stat) /= FSQL_OK) then
 764      write(*,*) 'sqlr_readSqlite: problem closing sqlite db', trim(fileName)
 765      call sqlu_handleError(stat, 'fSQL_close')
 766    end if
 767
 768  end subroutine sqlr_readSqlite
 769
 770  !--------------------------------------------------------------------------
 771  ! sqlr_addExtraDataRow
 772  !--------------------------------------------------------------------------
 773  subroutine sqlr_addExtraDataRow(obsdat, vertCoord, obsVarno, vertCoordType, numberData)
 774    !
 775    !:Purpose: Initialize data values for an extra row in data table.
 776    !
 777    implicit none
 778
 779    ! Arguments:
 780    type (struct_obs), intent(inout) :: obsdat
 781    real(pre_obsReal), intent(in)    :: vertCoord
 782    integer          , intent(in)    :: obsVarno
 783    integer          , intent(in)    :: vertCoordType
 784    integer          , intent(in)    :: numberData
 785
 786    call obs_bodySet_r(obsdat, OBS_PPP,  numberData, vertCoord)
 787    call obs_bodySet_r(obsdat, OBS_VAR,  numberData, obs_missingValue_R)
 788    call obs_bodySet_i(obsdat, OBS_VNM,  numberData, obsVarno)
 789    call obs_bodySet_i(obsdat, OBS_FLG,  numberData, 0)
 790    call obs_bodySet_i(obsdat, OBS_VCO,  numberData, vertCoordType)
 791    call obs_bodySet_r(obsdat, OBS_LATD, numberData, obs_missingValue_R)
 792    call obs_bodySet_r(obsdat, OBS_LOND, numberData, obs_missingValue_R)
 793    if (obs_columnActive_RB(obsdat, OBS_OMA))  call obs_bodySet_r(obsdat, OBS_OMA , numberData, obs_missingValue_R)
 794    if (obs_columnActive_RB(obsdat, OBS_OMA0)) call obs_bodySet_r(obsdat, OBS_OMA0, numberData, obs_missingValue_R)
 795    if (obs_columnActive_RB(obsdat, OBS_OMP))  call obs_bodySet_r(obsdat, OBS_OMP , numberData, obs_missingValue_R)
 796    if (obs_columnActive_RB(obsdat, OBS_OMP6)) call obs_bodySet_r(obsdat, OBS_OMP6, numberData, obs_missingValue_R)
 797    if (obs_columnActive_RB(obsdat, OBS_OER))  call obs_bodySet_r(obsdat, OBS_OER , numberData, obs_missingValue_R)
 798    if (obs_columnActive_RB(obsdat, OBS_HPHT)) call obs_bodySet_r(obsdat, OBS_HPHT, numberData, obs_missingValue_R)
 799    if (obs_columnActive_RB(obsdat, OBS_HAHT)) call obs_bodySet_r(obsdat, OBS_HAHT, numberData, obs_missingValue_R)
 800    if (obs_columnActive_RB(obsdat, OBS_WORK)) call obs_bodySet_r(obsdat, OBS_WORK, numberData, obs_missingValue_R)
 801    if (obs_columnActive_RB(obsdat, OBS_SIGI)) call obs_bodySet_r(obsdat, OBS_SIGI, numberData, obs_missingValue_R)
 802    if (obs_columnActive_RB(obsdat, OBS_SIGO)) call obs_bodySet_r(obsdat, OBS_SIGO, numberData, obs_missingValue_R)
 803    if (obs_columnActive_RB(obsdat, OBS_ZHA))  call obs_bodySet_r(obsdat, OBS_ZHA , numberData, obs_missingValue_R)
 804    if (obs_columnActive_RB(obsdat, OBS_BCOR)) call obs_bodySet_r(obsdat, OBS_BCOR, numberData, obs_missingValue_R)
 805
 806  end subroutine sqlr_addExtraDataRow
 807
 808  !--------------------------------------------------------------------------
 809  ! sqlr_addColumn
 810  !--------------------------------------------------------------------------
 811  subroutine sqlr_addColumn(obsSpaceColIndexSource, columnName, tableName, fileName)
 812    !
 813    !:Purpose: Add columns to sqlite tables that does not previously exists.
 814    !
 815    implicit none
 816
 817    ! Arguments:  
 818    character(len=*)   , intent(in)    :: fileName    
 819    character(len=*)   , intent(in)    :: tableName   
 820    character(len=*)   , intent(in)    :: columnName   
 821    integer,             intent(in)    :: obsSpaceColIndexSource  
 822
 823    ! Locals: 
 824    character(len=3000)         :: query
 825    character(len=10)           :: sqlDataType
 826    character(len=*), parameter :: myName = 'sqlr_addColumn'
 827    type(fSQL_STATUS)        :: stat ! sqlite error status
 828    type(fSQL_DATABASE)      :: db   ! sqlite file handle
 829
 830    ! open the obsDB file
 831    call fSQL_open( db, trim(fileName), status=stat )
 832    if ( fSQL_error(stat) /= FSQL_OK ) then
 833      call utl_abort( myName//': fSQL_open '//fSQL_errmsg(stat) )
 834    end if
 835
 836    if ( obs_columnDataType(obsSpaceColIndexSource) == 'real' ) then
 837      sqlDataType = 'double'
 838    else
 839      sqlDataType = 'integer'
 840    end if
 841
 842    query = 'alter table ' // trim(tableName) // ' add column ' // &
 843                trim(columnName) // ' ' // trim(sqlDataType) // ';'
 844
 845    write(*,*) myName//': query = ', trim(query)
 846    call fSQL_do_many( db, query, stat )
 847    if ( fSQL_error(stat) /= FSQL_OK ) then
 848      call utl_abort( myName//': Problem with fSQL_do_many '//fSQL_errmsg(stat) )
 849    end if
 850
 851    ! close the sqlite file
 852    call fSQL_close( db, stat ) 
 853
 854  end subroutine sqlr_addColumn  
 855
 856  !--------------------------------------------------------------------------
 857  ! sqlr_updateSqlite
 858  !--------------------------------------------------------------------------
 859  subroutine sqlr_updateSqlite(db, obsdat, familyType, fileName, fileNumber)
 860    !
 861    !:Purpose: update SQLite files. List of items to update is in the 
 862    !           namSQLUpdate namelist
 863    implicit none
 864    
 865    ! Arguments:
 866    type(fSQL_database), intent(inout) :: db         ! SQL database
 867    type (struct_obs)  , intent(inout) :: obsdat     ! obsSpaceData
 868    character(len=*)   , intent(in)    :: fileName   ! file name  
 869    character(len=*)   , intent(in)    :: familyType ! Observation Family Type 
 870    integer            , intent(in)    :: fileNumber ! FILE NUMBER ASSOCIATED WITH db
 871    
 872    ! Locals:
 873    type(fSQL_statement)        :: stmt ! prepared statement for  SQLite
 874    type(fSQL_status)           :: stat ! type error status
 875    integer                     :: obsRln, obsNlv, obsIdf, obsFlag
 876    integer                     :: obsStatus, last_question, landSea, terrainType
 877    integer(8)                  :: headPrimaryKey, bodyPrimaryKey
 878    integer                     :: itemIndex, headPrimaryKeyIndex, landSeaIndex
 879    integer                     :: headerIndex, bodyIndex
 880    character(len =   3)        :: item
 881    integer                     :: updateList(20), fnom, fclos, nulnam, ierr
 882    character(len =  10)        :: columnName
 883    character(len = 128)        :: query
 884    character(len = 356)        :: itemChar, columnNameChar
 885    logical                     :: back, nonEmptyColumn, nonEmptyColumn_mpiglobal
 886    real(4)                     :: romp, obsValue, scaleFactor, columnValue
 887    integer, parameter :: maxNumberUpdate = 15
 888
 889    ! Namelist variables:
 890    integer          :: numberUpdateItems                    ! MUST NOT BE INCLUDED IN NAMELIST!
 891    integer          :: numberUpdateItemsRadar               ! MUST NOT BE INCLUDED IN NAMELIST!
 892    character(len=3) :: itemUpdateList(maxNumberUpdate)      ! List of columns to be updated (e.g.'OMA','OMP')
 893    character(len=3) :: itemUpdateListRadar(maxNumberUpdate) ! List of columns to be updated for Radar data
 894
 895    namelist/namSQLUpdate/ numberUpdateItems,      itemUpdateList,     &
 896                           numberUpdateItemsRadar, itemUpdateListRadar
 897
 898    write(*,*) 'sqlr_updateSqlite: Starting ===================  '
 899
 900    ! set default values of namelist variables
 901    itemUpdateList(:) = ''
 902    itemUpdateListRadar(:) = ''
 903    numberUpdateItems = MPC_missingValue_INT
 904    numberUpdateItemsRadar = MPC_missingValue_INT
 905
 906    ! Read the namelist for directives
 907    nulnam = 0
 908    ierr   = fnom(nulnam, './flnml', 'FTN+SEQ+R/O', 0)
 909    read(nulnam,nml = namSQLUpdate, iostat = ierr)
 910    if (ierr /= 0) call utl_abort('sqlr_updateSqlite: Error reading namelist')
 911    if (mmpi_myid == 0) write(*, nml = namSQLUpdate)
 912    ierr = fclos(nulnam)
 913    if (numberUpdateItems /= MPC_missingValue_INT) then
 914      call utl_abort('sqlr_updateSqlite: check namelist section namSQLUpdate, numberUpdateItems should be removed')
 915    end if
 916    if (numberUpdateItemsRadar /= MPC_missingValue_INT) then
 917      call utl_abort('sqlr_updateSqlite: check namelist section namSQLUpdate, numberUpdateItemsRadar should be removed')
 918    end if
 919    numberUpdateItems = 0
 920    do itemIndex = 1, maxNumberUpdate
 921      if (trim(itemUpdateList(itemIndex)) == '') exit
 922      numberUpdateItems = numberUpdateItems + 1
 923    end do
 924    numberUpdateItemsRadar = 0
 925    do itemIndex = 1, maxNumberUpdate
 926      if (trim(itemUpdateListRadar(itemIndex)) == '') exit
 927      numberUpdateItemsRadar = numberUpdateItemsRadar + 1
 928    end do
 929    
 930    ! Append extra sqlite columns to update to itemUpdateList
 931    if (trim(familyType) == 'RA') then
 932      do itemIndex = 1, numberUpdateItemsRadar
 933        numberUpdateItems = numberUpdateItems + 1
 934        itemUpdateList(numberUpdateItems) = itemUpdateListRadar(itemIndex)
 935      end do
 936    end if
 937
 938    write(*,*) 'sqlr_updateSqlite: family type   = ', trim(familyType)
 939    write(*,*) 'sqlr_updateSqlite: number of items to update: ', numberUpdateItems
 940    write(*,*) 'sqlr_updateSqlite: file name     = ', trim(fileName)
 941    write(*,*) 'sqlr_updateSqlite: missing value = ', MPC_missingValue_R8    
 942
 943    ! create query
 944    itemChar='  '
 945
 946    do itemIndex = 1, numberUpdateItems
 947      
 948      item = itemUpdateList(itemIndex)
 949      write(*,*) 'sqlr_updateSqlite: updating ', itemIndex, trim(item)
 950
 951      select case(item)
 952      case('OMA')
 953        updateList(itemIndex) = OBS_OMA
 954        columnName = 'oma'
 955      case('OMP')
 956        updateList(itemIndex) = OBS_OMP
 957        columnName = 'omp'
 958      case('VAR')
 959        updateList(itemIndex) = OBS_VAR
 960        columnName = 'obsvalue'
 961      case('OER')
 962        updateList(itemIndex) = OBS_OER
 963        columnName = 'obs_error'
 964      case('FGE')
 965        updateList(itemIndex) = OBS_HPHT
 966        columnName = 'fg_error'
 967      case('EMI')
 968        updateList(itemIndex) = OBS_SEM
 969        columnName = 'surf_emiss'
 970      case('COR')
 971        updateList(itemIndex) = OBS_BCOR
 972        columnName = 'bias_corr'
 973      case('ALT')
 974        updateList(itemIndex) = OBS_PPP
 975        columnName = 'vcoord'
 976      case DEFAULT
 977        call utl_abort('sqlr_updateSqlite: invalid item '// columnName //' EXIT sqlr_updateSQL!!!')
 978      end select
 979
 980      ! Check if column exist. If not, add column when corresponding
 981      ! obsspacedata variable have non-missing values
 982      if (sqlu_sqlColumnExists(fileName, 'data', columnName)) then
 983        itemChar = trim(itemChar)//','// trim(columnName) // ' = ? '
 984      else
 985        write(*,*) 'sqlr_updateSqlite: WARNING: column '//columnName// &
 986                   ' does not exist in the file '//trim(fileName)
 987
 988        nonEmptyColumn = .false.
 989        ! Check if the ObsSpaceData variable contains non-missing values
 990        HEADERCHCK: do headerIndex = 1, obs_numHeader(obsdat)
 991
 992          obsIdf = obs_headElem_i(obsdat,OBS_IDF, headerIndex)
 993          if (obsIdf /= fileNumber) cycle HEADERCHCK
 994          
 995          obsRln = obs_headElem_i(obsdat, OBS_RLN, headerIndex)
 996          obsNlv = obs_headElem_i(obsdat, OBS_NLV, headerIndex)
 997    
 998          BODYCHCK: do bodyIndex = obsRln, obsNlv + obsRln - 1
 999            columnValue = obs_bodyElem_r(obsdat, updateList(itemIndex), bodyIndex)
1000
1001            if (columnValue /= obs_missingValue_R) then
1002              nonEmptyColumn = .true.         
1003              exit HEADERCHCK
1004            end if
1005          
1006          end do BODYCHCK
1007        end do HEADERCHCK
1008
1009        call rpn_comm_allreduce(nonEmptyColumn,nonEmptyColumn_mpiglobal,1, &
1010                              "MPI_LOGICAL","MPI_LOR","grid",ierr)
1011
1012        ! Add column into SQLite file if ObsSpaceData value containes non-missing values
1013        if (nonEmptyColumn_mpiglobal) then
1014          write(*,*) 'sqlr_updateSqlite: ' // trim(columnName) // ' column is not empty and will be added'
1015          call sqlr_addColumn(updateList( itemIndex ), columnName, 'data', fileName)
1016          itemChar = trim(itemChar)//','// trim(columnName) // ' = ? '
1017        end if
1018
1019      end if
1020    end do
1021
1022    back=.true.
1023    last_question  = scan(itemChar, '?', back)
1024    columnNameChar = itemChar(1:last_question)
1025    itemChar    = columnNameChar
1026    
1027    query = ' update data set flag = ? '//trim(itemChar)
1028    query = trim(query)//' where id_data = ?  ;'
1029    write(*,*) 'sqlr_updateSqlite: update query --->  ', query
1030    call fSQL_do_many(db, 'PRAGMA  synchronous = OFF; PRAGMA journal_mode = OFF;')
1031    call fSQL_prepare(db, query , stmt, stat)
1032    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'fSQL_prepare : ')
1033    call fSQL_begin(db)
1034
1035    HEADER: do headerIndex = 1, obs_numHeader(obsdat)
1036 
1037      obsIdf = obs_headElem_i(obsdat,OBS_IDF, headerIndex)
1038 
1039      if (obsIdf /= fileNumber) cycle HEADER
1040      headPrimaryKey = obs_headPrimaryKey(obsdat, headerIndex)
1041      obsRln = obs_headElem_i(obsdat, OBS_RLN, headerIndex)
1042      obsNlv = obs_headElem_i(obsdat, OBS_NLV, headerIndex)
1043
1044      BODY: do bodyIndex = obsRln, obsNlv + obsRln - 1
1045
1046        obsFlag = obs_bodyElem_i(obsdat, OBS_FLG, bodyIndex)
1047        bodyPrimaryKey  = obs_bodyPrimaryKey(obsdat, bodyIndex)
1048
1049        call fSQL_bind_param(stmt, param_index = 1, int_var = obsFlag)
1050        
1051        ITEMS: do itemIndex = 1, numberUpdateItems
1052
1053          obsValue = obs_bodyElem_r(obsdat, OBS_VAR, bodyIndex)
1054          if (obsValue /= obs_missingValue_R) then  
1055            romp = obs_bodyElem_r(obsdat, updateList(itemIndex), bodyIndex)
1056            if (romp == obs_missingValue_R) then
1057              call fSQL_bind_param(stmt, param_index = itemIndex + 1) ! sql null values
1058            else
1059              scaleFactor=1.0
1060              if (updateList(itemIndex) == OBS_SEM) scaleFactor=100.0
1061              call fSQL_bind_param(stmt, param_index = itemIndex + 1, real_var = romp*scaleFactor)
1062            end if
1063          else
1064            call fSQL_bind_param(stmt, param_index = itemIndex + 1)  ! sql null values
1065          end if
1066
1067        end do ITEMS
1068
1069        call fSQL_bind_param(stmt, param_index = numberUpdateItems + 2, int8_var  = bodyPrimaryKey)
1070        call fSQL_exec_stmt (stmt)
1071
1072      end do BODY
1073
1074    end do HEADER
1075
1076    call fSQL_finalize(stmt)
1077
1078    if (trim(familyType) /= 'GL'.and. trim(familyType) /= 'RA') then
1079
1080      ! UPDATES FOR THE STATUS FLAGS and land_sea (for satellites) IN THE HEADER TABLE
1081      ! The variables 'headPrimaryKeyIndex' and 'landSeaIndex' are defined here and used below.
1082      ! Any change in this logic must be coherent with the code below!
1083      if (trim(familyType) == 'TO') then
1084        query = ' update header set status  = ?, land_sea= ? where id_obs = ? '
1085        landSeaIndex = 2
1086        headPrimaryKeyIndex = 3
1087      else
1088        query = ' update header set status  = ?  where id_obs = ? '
1089        headPrimaryKeyIndex = 2
1090      end if
1091      call fSQL_prepare(db, query , stmt, stat)
1092      if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat,'fSQL_prepare : ')
1093
1094      HEADER2: do headerIndex = 1,obs_numHeader(obsdat)
1095         
1096        terrainType=MPC_missingValue_INT
1097        obsIdf = obs_headElem_i(obsdat, OBS_IDF, headerIndex)
1098        if (obsIdf /= fileNumber) cycle HEADER2
1099        headPrimaryKey = obs_headPrimaryKey(obsdat, headerIndex)
1100        obsStatus = obs_headElem_i(obsdat, OBS_ST1, headerIndex)
1101        landsea   = obs_headElem_i(obsdat, OBS_STYP,headerIndex)
1102        call fSQL_bind_param(stmt, param_index = 1, int_var  = obsStatus)
1103        ! The variables 'headPrimaryKeyIndex' and 'landSeaIndex' are defined above and
1104        ! they must be coherent with the query designed above
1105        call fSQL_bind_param(stmt, param_index = headPrimaryKeyIndex, int8_var = headPrimaryKey)
1106        if (trim(familyType) == 'TO') then
1107          call fSQL_bind_param(stmt, param_index = landSeaIndex, int_var  = landSea)
1108        end if
1109
1110        call fSQL_exec_stmt (stmt)
1111
1112      end do HEADER2
1113    
1114      call fSQL_finalize(stmt)
1115
1116    end if
1117
1118    call fSQL_commit(db, stat)
1119    if (fSQL_error(stat)  /= FSQL_OK) then
1120      call sqlu_handleError(stat, 'sqlr_updateSqlite: fSQL_commit')
1121    end if
1122    write(*,*) 'sqlr_updateSqlite: End ===================  ', trim(familyType)
1123
1124  end subroutine sqlr_updateSqlite
1125
1126  !--------------------------------------------------------------------------
1127  ! sqlr_addCloudParametersandEmissivity
1128  !--------------------------------------------------------------------------
1129  subroutine sqlr_addCloudParametersandEmissivity(db, obsdat, fileNumber)
1130    !
1131    !:Purpose: Add a new table if it doesn't already exist `cld_params` with
1132    !          cloud-related information.
1133    !
1134    implicit none
1135
1136    ! Arguments:
1137    type(fSQL_DATABASE), intent(inout) :: db   ! SQLite file handle
1138    type(struct_obs),    intent(in)    :: obsdat
1139    integer,             intent(in)    :: fileNumber
1140
1141    ! Locals:
1142    type(fSQL_STATEMENT)   :: stmt ! type for precompiled SQLite statements
1143    type(fSQL_STATUS)      :: stat !type for error status
1144    character(len = 256)   :: query
1145    integer                :: numberInsert, headerIndex, obsIdo, obsIdf
1146    integer                :: NCO2
1147    real                   :: ETOP,VTOP,ECF,VCF,HE,ZTSR,ZTM,ZTGM,ZLQM,ZPS
1148
1149    ! First create the table if it does not already exist
1150    query = 'create table if not exists cld_params(id_obs integer,ETOP real,VTOP real, &
1151             ECF real,VCF real,HE real,ZTSR real,NCO2 integer,ZTM real,ZTGM real,ZLQM real,ZPS real);'
1152    write(*,*) 'sqlr_addCloudParametersandEmissivity: Create query = ', trim(query)
1153
1154    call fSQL_do(db, trim(query), stat)
1155    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'fSQL_do : ')
1156
1157    ! Insert values in the table
1158    query = 'insert into cld_params(id_obs,ETOP,VTOP,ECF,VCF,HE,ZTSR,NCO2,ZTM,ZTGM,ZLQM,ZPS) &
1159             values(?,?,?,?,?,?,?,?,?,?,?,?);'
1160    write(*,*) 'sqlr_addCloudParametersandEmissivity: Insert query = ', trim(query)
1161
1162    call fSQL_prepare(db, query, stmt, stat)
1163    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'fSQL_prepare : ')
1164    call fSQL_begin(db)
1165    numberInsert=0
1166    HEADER: do headerIndex = 1, obs_numHeader(obsdat)
1167      obsIdf = obs_headElem_i(obsdat, OBS_IDF, headerIndex)
1168      if (obsIdf /= fileNumber) cycle HEADER
1169      obsIdo   = obs_headPrimaryKey(obsdat, headerIndex)
1170      ETOP = obs_headElem_r(obsdat, OBS_ETOP, headerIndex)
1171      VTOP = obs_headElem_r(obsdat, OBS_VTOP, headerIndex)
1172      ECF  = obs_headElem_r(obsdat, OBS_ECF,  headerIndex)
1173      VCF  = obs_headElem_r(obsdat, OBS_VCF,  headerIndex)
1174      HE   = obs_headElem_r(obsdat, OBS_HE,   headerIndex)
1175      ZTSR = obs_headElem_r(obsdat, OBS_ZTSR, headerIndex)
1176      NCO2 = obs_headElem_i(obsdat, OBS_NCO2, headerIndex)
1177      ZTM  = obs_headElem_r(obsdat, OBS_ZTM,  headerIndex)
1178      ZTGM = obs_headElem_r(obsdat, OBS_ZTGM, headerIndex)
1179      ZLQM = obs_headElem_r(obsdat, OBS_ZLQM, headerIndex)
1180      ZPS  = obs_headElem_r(obsdat, OBS_ZPS,  headerIndex)
1181
1182      call fSQL_bind_param(stmt, param_index = 1, int_var  = obsIdo)
1183      call fSQL_bind_param(stmt, param_index = 2, real_var = ETOP)
1184      call fSQL_bind_param(stmt, param_index = 3, real_var = VTOP)
1185      call fSQL_bind_param(stmt, param_index = 4, real_var = ECF)
1186      call fSQL_bind_param(stmt, param_index = 5, real_var = VCF)
1187      call fSQL_bind_param(stmt, param_index = 6, real_var = HE)
1188      call fSQL_bind_param(stmt, param_index = 7, real_var = ZTSR)
1189      call fSQL_bind_param(stmt, param_index = 8, int_var  = NCO2)
1190      call fSQL_bind_param(stmt, param_index = 9, real_var = ZTM)
1191      call fSQL_bind_param(stmt, param_index = 10,real_var = ZTGM)
1192      call fSQL_bind_param(stmt, param_index = 11,real_var = ZLQM)
1193      call fSQL_bind_param(stmt, param_index = 12,real_var = ZPS)
1194
1195      call fSQL_exec_stmt (stmt)
1196      numberInsert=numberInsert +1
1197    end do HEADER
1198
1199    call fSQL_finalize(stmt)
1200    call fSQL_commit(db)
1201    write(*,'(a,i8)') 'sqlr_addCloudParametersandEmissivity: NUMBER OF INSERTIONS ----> ', numberInsert
1202
1203  end subroutine sqlr_addCloudParametersandEmissivity
1204
1205  !--------------------------------------------------------------------------
1206  ! sqlr_insertSqlite
1207  !--------------------------------------------------------------------------
1208  subroutine sqlr_insertSqlite(db, obsdat, familyType, fileName, fileNumber)
1209    !
1210    !:Purpose: Insert rows in the sqlite file data table for bufr element IDs
1211    !          specified in the namelist block `namSQLInsert`.
1212    !
1213    implicit none
1214
1215    ! Arguments:
1216    type(fSQL_DATABASE), intent(inout) :: db   ! type for SQLIte  file handle
1217    type(struct_obs),    intent(in)    :: obsdat
1218    character(len=*),    intent(in)    :: familyType
1219    character(len=*),    intent(in)    :: fileName
1220    integer         ,    intent(in)    :: fileNumber
1221
1222    ! Locals:
1223    type(fSQL_STATEMENT)   :: stmt ! type for precompiled SQLite statements
1224    type(fSQL_STATUS)      :: stat !type for error status
1225    integer                :: obsVarno, obsFlag, vertCoordType, fnom, fclos, nulnam, ierr 
1226    real                   :: obsValue, OMA, OMP, OER, FGE, PPP
1227    integer                :: numberInsert, headerIndex, bodyIndex, numHeader, itemIndex
1228    integer                :: obsNlv, obsRln, obsIdf, insertItem
1229    integer(8)             :: bodyPrimaryKey, headPrimaryKey
1230    character(len = 256)   :: query
1231    logical                :: llok    
1232    integer, parameter     :: maxNumberInsertItems = 15
1233
1234    ! Namelist variables
1235    integer :: itemInsertList(maxNumberInsertItems) ! List of bufr element ids to insert in sql file data table
1236    integer :: numberInsertItems                    ! MUST NOT BE INCLUDED IN NAMELIST!
1237
1238    namelist/namSQLInsert/ numberInsertItems, itemInsertList
1239
1240    write(*,*)  'sqlr_insertSqlite: --- Starting ---   '
1241    numHeader = obs_numHeader(obsdat)
1242    write(*,*) ' FAMILY ---> ', trim(familyType), '  headerIndex  ----> ', numHeader
1243    write(*,*) ' fileName -> ', trim(fileName)
1244
1245    ! set default values of namelist variables
1246    numberInsertItems = MPC_missingValue_INT
1247    itemInsertList(:) = MPC_missingValue_INT
1248
1249    nulnam = 0
1250    ierr=fnom(nulnam, './flnml', 'FTN+SEQ+R/O', 0)
1251    read(nulnam, nml = namSQLInsert, iostat = ierr)
1252    if (ierr /= 0) call utl_abort('sqlr_insertSqlite: Error reading namelist')
1253    if (mmpi_myid == 0) write(*, nml = namSQLInsert)
1254    ierr=fclos(nulnam)
1255    if (numberInsertItems /= MPC_missingValue_INT) then
1256      call utl_abort('sqlr_insertSqlite: check namSQLInsert namelist section, you need to remove numberInsertItems')
1257    end if
1258    numberInsertItems = 0
1259    do itemIndex = 1, maxNumberInsertItems
1260      if ( itemInsertList(itemIndex) == MPC_missingValue_INT) exit
1261      numberInsertItems = numberInsertItems + 1
1262    end do
1263    
1264    write(*,*) ' INSERT INTO SQLITE FILE ELEMENTS :--> ',(itemInsertList(insertItem), insertItem = 1, numberInsertItems)
1265
1266    select case(trim(familyType))
1267
1268      case('SF', 'SC', 'GP')
1269
1270        query = 'insert into data (id_obs,varno,vcoord,obsvalue,flag,oma,omp,fg_error,obs_error) values(?,?,?,?,?,?,?,?,?);'
1271
1272      case DEFAULT
1273
1274        query = 'insert into data (id_obs,varno,vcoord,vcoord_type,obsvalue,flag,oma,omp,fg_error,obs_error) values(?,?,?,?,?,?,?,?,?,?);'
1275
1276    end select
1277
1278    query=trim(query)
1279    write(*,*) ' === Family Type === ',trim(familyType)
1280    write(*,*) ' Insert query = ', trim(query)
1281
1282    call fSQL_begin(db)
1283    call fSQL_prepare(db, query, stmt, stat)
1284    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'fSQL_prepare : ')
1285
1286    numberInsert=0
1287    HEADER: do headerIndex = 1, obs_numHeader(obsdat)
1288
1289      obsIdf = obs_headElem_i(obsdat, OBS_IDF, headerIndex)
1290      if (obsIdf /= fileNumber) cycle HEADER
1291      headPrimaryKey = obs_headPrimaryKey(obsdat, headerIndex)
1292      obsRln = obs_headElem_i(obsdat, OBS_RLN, headerIndex)
1293      obsNlv = obs_headElem_i(obsdat, OBS_NLV, headerIndex)
1294
1295      BODY: do bodyIndex = obsRln, obsNlv + obsRln -1
1296
1297        bodyPrimaryKey= obs_bodyPrimaryKey(obsdat, bodyIndex)
1298        obsVarno      = obs_bodyElem_i(obsdat, OBS_VNM , bodyIndex)
1299        obsFlag       = obs_bodyElem_i(obsdat, OBS_FLG , bodyIndex)
1300        vertCoordType = obs_bodyElem_i(obsdat, OBS_VCO , bodyIndex)
1301        obsValue      = obs_bodyElem_r(obsdat, OBS_VAR , bodyIndex)
1302        OMA           = obs_bodyElem_r(obsdat, OBS_OMA , bodyIndex)
1303        OMP           = obs_bodyElem_r(obsdat, OBS_OMP , bodyIndex)
1304        OER           = obs_bodyElem_r(obsdat, OBS_OER , bodyIndex)
1305        FGE           = obs_bodyElem_r(obsdat, OBS_HPHT, bodyIndex)
1306        PPP           = obs_bodyElem_r(obsdat, OBS_PPP , bodyIndex)
1307
1308        llok = .false.
1309        do insertItem = 1, numberInsertItems
1310          if (obsVarno == itemInsertList(insertItem)) llok=.true.
1311        end do
1312
1313        if (bodyPrimaryKey == -1 .and. llok) then
1314          select case(trim(familyType))
1315            case('SF', 'SC', 'GP')
1316              call fSQL_bind_param(stmt, param_index = 1, int8_var = headPrimaryKey)
1317              call fSQL_bind_param(stmt, param_index = 2, int_var  = obsVarno)
1318              call fSQL_bind_param(stmt, param_index = 3, real_var = PPP)
1319              if (obsValue == obs_missingValue_R) then          ! sql null values
1320                call fSQL_bind_param(stmt, param_index = 4)
1321              else
1322                call fSQL_bind_param(stmt, param_index = 4, real_var = obsValue)
1323              end if
1324              call fSQL_bind_param(stmt, param_index = 5, int_var  = obsFlag)
1325              if (OMA == obs_missingValue_R) then
1326                call fSQL_bind_param(stmt, param_index = 6) 
1327              else 
1328                call fSQL_bind_param(stmt, param_index = 6, real_var = OMA) 
1329              end if
1330              if (OMP == obs_missingValue_R) then
1331                call fSQL_bind_param(stmt, param_index = 7) 
1332              else
1333                call fSQL_bind_param(stmt, param_index = 7, real_var = OMP) 
1334              end if
1335              if (FGE == obs_missingValue_R) then
1336                call fSQL_bind_param(stmt, param_index = 8) 
1337              else
1338                call fSQL_bind_param(stmt, param_index = 8, real_var = FGE)
1339              end if
1340              if (OER == obs_missingValue_R) then
1341                call fSQL_bind_param(stmt, param_index = 9) 
1342              else
1343                call fSQL_bind_param(stmt, param_index = 9, real_var = OER) 
1344              end if
1345            case DEFAULT
1346              call fSQL_bind_param(stmt, param_index = 1, int8_var = headPrimaryKey)
1347              call fSQL_bind_param(stmt, param_index = 2, int_var  = obsVarno)
1348              call fSQL_bind_param(stmt, param_index = 3, real_var = PPP) 
1349              call fSQL_bind_param(stmt, param_index = 4, int_var  = vertCoordType)
1350              if (obsValue == obs_missingValue_R) then
1351                call fSQL_bind_param(stmt, param_index = 5)
1352              else
1353                call fSQL_bind_param(stmt, param_index = 5, real_var = obsValue)
1354              end if
1355              call fSQL_bind_param(stmt, param_index = 6, int_var  = obsFlag)
1356              if (OMA == obs_missingValue_R) then
1357                call fSQL_bind_param(stmt, param_index = 7) 
1358              else 
1359                call fSQL_bind_param(stmt, param_index = 7, real_var = OMA)
1360              end if
1361              if (OMP == obs_missingValue_R) then
1362                call fSQL_bind_param(stmt, param_index = 8) 
1363              else
1364                call fSQL_bind_param(stmt, param_index = 8, real_var = OMP) 
1365              end if
1366              if (FGE == obs_missingValue_R) then
1367                call fSQL_bind_param(stmt, param_index = 9) 
1368              else
1369                call fSQL_bind_param(stmt, param_index = 9, real_var = FGE) 
1370              end if
1371              if (OER == obs_missingValue_R) then
1372                call fSQL_bind_param(stmt, param_index = 10) 
1373              else
1374                call fSQL_bind_param(stmt, param_index = 10, real_var = OER)
1375              end if
1376          end select
1377          call fSQL_exec_stmt (stmt)
1378
1379          numberInsert = numberInsert + 1
1380        end if
1381
1382      end do BODY
1383
1384    end do HEADER
1385
1386    call fSQL_finalize(stmt)
1387    call fSQL_commit(db)
1388    write(*,'(3a,i8)') 'sqlr_insertSqlite: FAMILY ---> ' ,trim(familyType), &
1389         '  NUMBER OF INSERTIONS ----> ', numberInsert
1390
1391  end subroutine sqlr_insertSqlite
1392
1393  !--------------------------------------------------------------------------
1394  ! sqlr_cleanSqlite
1395  !--------------------------------------------------------------------------
1396  subroutine sqlr_cleanSqlite(db, fileName)
1397    !
1398    !:Purpose: Remove flagged (bit 11 set) observations in an SQLite file
1399    !
1400    implicit none
1401
1402    ! Arguments:
1403    type(fSQL_DATABASE), intent(inout) :: db   ! SQLite file handle
1404    character(len=*),    intent(in)    :: fileName
1405
1406    ! Locals:
1407    character(len = 128) :: query
1408    type(fSQL_STATEMENT) :: statement ! prepared statement for SQLite
1409    type(fSQL_STATUS)    :: status
1410
1411    call fSQL_open(db, fileName, status)
1412    if (fSQL_error(status) /= FSQL_OK) then
1413      write(*,*) 'sqlr_cleanSqlite: ERROR: ', fSQL_errmsg(status)
1414    end if
1415    ! Mark for deletion all records with bit 11 (2048) set
1416    query = ' delete from data where flag & 2048;'
1417    call fSQL_prepare(db, query, statement, status)
1418    if (fSQL_error(status) /= FSQL_OK) &
1419      call sqlu_handleError(status, 'thinning fSQL_prepare : ')
1420    call fSQL_begin(db)
1421    call fSQL_exec_stmt(statement)
1422    call fSQL_finalize(statement)
1423    call fSQL_commit(db)
1424    write(*,*) 'sqlr_cleanSqlite: closed database -->', trim(FileName)
1425    call fSQL_close(db, status)
1426
1427  end subroutine sqlr_cleanSqlite
1428
1429  !--------------------------------------------------------------------------
1430  ! sqlr_writePseudoSSTobs
1431  !--------------------------------------------------------------------------
1432  subroutine sqlr_writePseudoSSTobs(obsData, obsFamily, instrumentFileName)
1433    !
1434    !:Purpose: To write the obsSpaceData content into SQLite format files
1435    !
1436    implicit none
1437
1438    ! Arguments:
1439    type(struct_obs) , intent(inout) :: obsData
1440    character(len=*) , intent(in)    :: obsFamily
1441    character(len=*) , intent(in)    :: instrumentFileName
1442            
1443    ! Locals:
1444    type(fSQL_DATABASE)    :: db                        ! type for SQLIte  file handle
1445    type(fSQL_STATEMENT)   :: stmtData, stmtHeader      ! type for precompiled SQLite statements
1446    type(fSQL_STATUS)      :: stat                      ! type for error status
1447    integer                :: obsVarno, obsFlag, ASS, codeType, date, time, idObs, idData
1448    integer, parameter     :: obsStatus = 3072 
1449    real                   :: obsValue, OMA, OMP, OER, FGE, PPP, lon, lat, altitude
1450    integer                :: numberInsertions, numHeaders, headerIndex, bodyIndex, obsNlv, obsRln
1451    character(len = 512)   :: queryData, queryHeader, queryCreate
1452    character(len = 12)    :: idStation
1453    character(len=256)     :: fileName, fileNameDir
1454    character(len=4)       :: cmyidx, cmyidy
1455        
1456    write(*,*) 'sqlr_writePseudoSSTobs: starting...'
1457     
1458    ! determine initial idData,idObs to ensure unique values across mpi tasks
1459    call sqlu_getInitialIdObsData(obsData, obsFamily, idObs, idData)
1460
1461    ! return if this mpi task does not have any observations of this family
1462    if (trim(instrumentFileName) == 'XXXXX') return
1463
1464    ! check if any obs exist for this file, if not return
1465    numHeaders = 0
1466    call obs_set_current_header_list(obsData, obsFamily)
1467    HEADERCOUNT: do
1468      headerIndex = obs_getHeaderIndex(obsData)
1469      if (headerIndex < 0) exit HEADERCOUNT
1470      numHeaders = numHeaders + 1
1471    end do HEADERCOUNT
1472    if (numHeaders == 0) return
1473
1474    fileNameDir = trim(ram_getRamDiskDir())
1475    if (fileNameDir == ' ') then
1476      write(*,*) 'sqlr_writePseudoSSTobs: WARNING! The program may be slow creating many sqlite files in the same directory.'
1477      write(*,*) 'sqlr_writePseudoSSTobs: WARNING! Please, use the ram disk option prior to MIDAS run!'
1478    end if  
1479
1480    if (obs_mpiLocal(obsData)) then
1481      write(cmyidy,'(I4.4)') (mmpi_myidy + 1)
1482      write(cmyidx,'(I4.4)') (mmpi_myidx + 1)
1483      fileName = trim(fileNameDir)//'obs/'//trim(instrumentFileName)//'_'//trim(cmyidx)//'_'//trim(cmyidy)
1484    else
1485      if (mmpi_myid > 0) return
1486      fileName = trim(fileNameDir)//'obs/'//trim(instrumentFileName)
1487    end if
1488    
1489
1490    write(*,*) 'sqlr_writePseudoSSTobs: Creating file: ', trim(fileName)
1491    call fSQL_open(db, fileName, stat)
1492    if (fSQL_error(stat) /= FSQL_OK) write(*,*) 'sqlr_writePseudoSSTobs: fSQL_open: ', fSQL_errmsg(stat),' filename: '//trim(fileName)
1493
1494    ! Create the tables HEADER and DATA
1495    queryCreate = 'create table header (id_obs integer primary key, id_stn varchar(50), lat real, lon real, &
1496                  &codtyp integer, date integer, time integer, elev real, status integer); &
1497                  &create table data (id_data integer primary key, id_obs integer, varno integer, vcoord real, &
1498                  &vcoord_type integer, obsvalue real, flag integer, oma real, ompt real, oma0 real, omp real, &
1499                  &an_error real, fg_error real, obs_error real);'
1500    
1501    call fSQL_do_many(db, queryCreate, stat)
1502    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'sqlr_writePseudoSSTobs: fSQL_do_many with query: '//trim(queryCreate))
1503    
1504    queryHeader = ' insert into header (id_obs, id_stn, lat, lon, date, time, codtyp, elev, status) values(?,?,?,?,?,?,?,?,?); '
1505    queryData = 'insert into data (id_data, id_obs, varno, vcoord, vcoord_type, obsvalue, flag, oma, oma0, ompt, fg_error, &
1506                &obs_error) values(?,?,?,?,?,?,?,?,?,?,?,?);'
1507
1508    write(*,*) 'sqlr_writePseudoSSTobs: Insert query Data   = ', trim(queryData)
1509    write(*,*) 'sqlr_writePseudoSSTobs: Insert query Header = ', trim(queryHeader)
1510
1511    call fSQL_begin(db)
1512    call fSQL_prepare(db, queryData, stmtData, stat)
1513    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'sqlr_writePseudoSSTobs: fSQL_prepare:')
1514    call fSQL_prepare(db, queryHeader, stmtHeader, stat)
1515    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'sqlr_writePseudoSSTobs: fSQL_prepare:')
1516     
1517    numberInsertions = 0
1518
1519    call obs_set_current_header_list(obsData, obsFamily)
1520    HEADER: do
1521
1522      headerIndex = obs_getHeaderIndex(obsData)
1523      if (headerIndex < 0) exit HEADER
1524
1525      codeType  = obs_headElem_i(obsData, OBS_ITY, headerIndex)
1526      obsRln    = obs_headElem_i(obsData, OBS_RLN, headerIndex)
1527      obsNlv    = obs_headElem_i(obsData, OBS_NLV, headerIndex)
1528      idStation = obs_elem_c    (obsData, 'STID' , headerIndex)
1529      altitude  = obs_headElem_r(obsData, OBS_ALT, headerIndex)
1530      lon       = obs_headElem_r(obsData, OBS_LON, headerIndex) * MPC_DEGREES_PER_RADIAN_R8
1531      lat       = obs_headElem_r(obsData, OBS_LAT, headerIndex) * MPC_DEGREES_PER_RADIAN_R8
1532      if (lon > 180.) lon = lon - 360.
1533      date      = obs_headElem_i(obsData, OBS_DAT, headerIndex)
1534      time      = obs_headElem_i(obsData, OBS_ETM, headerIndex) * 100.
1535
1536      idObs = idObs + 1
1537      call fSQL_bind_param(stmtHeader, param_index = 1, int_var  = idObs)
1538      call fSQL_bind_param(stmtHeader, param_index = 2, char_var = idStation)
1539      call fSQL_bind_param(stmtHeader, param_index = 3, real_var = lat) 
1540      call fSQL_bind_param(stmtHeader, param_index = 4, real_var = lon) 
1541      call fSQL_bind_param(stmtHeader, param_index = 5, int_var  = date) 
1542      call fSQL_bind_param(stmtHeader, param_index = 6, int_var  = time) 
1543      call fSQL_bind_param(stmtHeader, param_index = 7, int_var  = codeType) 
1544      call fSQL_bind_param(stmtHeader, param_index = 8, real_var = altitude)
1545      call fSQL_bind_param(stmtHeader, param_index = 9, int_var = obsStatus)
1546      call fSQL_exec_stmt (stmtHeader)
1547
1548      BODY: do bodyIndex = obsRln, obsNlv + obsRln -1
1549         
1550        obsVarno      = obs_bodyElem_i(obsData, OBS_VNM , bodyIndex)
1551        obsFlag       = obs_bodyElem_i(obsData, OBS_FLG , bodyIndex)
1552        obsValue      = obs_bodyElem_r(obsData, OBS_VAR , bodyIndex)
1553        OMA           = obs_bodyElem_r(obsData, OBS_OMA , bodyIndex)
1554        OMP           = obs_bodyElem_r(obsData, OBS_OMP , bodyIndex)
1555        OER           = obs_bodyElem_r(obsData, OBS_OER , bodyIndex)
1556        FGE           = obs_bodyElem_r(obsData, OBS_HPHT, bodyIndex)
1557        PPP           = obs_bodyElem_r(obsData, OBS_PPP , bodyIndex)
1558        ASS           = obs_bodyElem_i(obsData, OBS_ASS , bodyIndex)
1559
1560        ! insert order: id_obs,varno,vcoord,vcoord_type,obsvalue,flag,oma,oma0,ompt,fg_error,obs_error
1561        idData = idData + 1
1562        call fSQL_bind_param(stmtData, param_index = 1, int_var  = idData)
1563        call fSQL_bind_param(stmtData, param_index = 2, int_var  = idObs)
1564        call fSQL_bind_param(stmtData, param_index = 3, int_var  = obsVarno)
1565        call fSQL_bind_param(stmtData, param_index = 4, real_var = PPP)
1566        call fSQL_bind_param(stmtData, param_index = 5) 
1567        call fSQL_bind_param(stmtData, param_index = 6, real_var = obsValue) 
1568        call fSQL_bind_param(stmtData, param_index = 7, int_var  = obsFlag)
1569        if (OMA == obs_missingValue_R) then
1570          call fSQL_bind_param(stmtData, param_index = 8) 
1571          call fSQL_bind_param(stmtData, param_index = 9) 
1572        else
1573          call fSQL_bind_param(stmtData, param_index = 8, real_var = OMA)
1574          call fSQL_bind_param(stmtData, param_index = 9, real_var = OMA)
1575        end if
1576        if (OMP == obs_missingValue_R) then
1577          call fSQL_bind_param(stmtData, param_index = 10) 
1578        else
1579          call fSQL_bind_param(stmtData, param_index = 10, real_var = OMP)
1580        end if
1581        if (FGE == obs_missingValue_R) then
1582          call fSQL_bind_param(stmtData, param_index = 11) 
1583        else
1584          call fSQL_bind_param(stmtData, param_index = 11, real_var = FGE)
1585        end if
1586        if (OER == obs_missingValue_R) then
1587          call fSQL_bind_param(stmtData, param_index = 12) 
1588        else
1589          call fSQL_bind_param(stmtData, param_index = 12, real_var = OER)
1590        end if 
1591        call fSQL_exec_stmt (stmtData)
1592
1593        numberInsertions = numberInsertions + 1
1594
1595      end do BODY
1596     
1597    end do HEADER
1598    
1599    call fSQL_finalize(stmtData)
1600
1601    write(*,*) 'sqlr_writePseudoSSTobs: Observation Family: ', obsFamily, &
1602               ', number of insertions: ', numberInsertions
1603
1604    call fSQL_commit(db)
1605    call fSQL_close(db, stat)
1606
1607  end subroutine sqlr_writePseudoSSTobs
1608  
1609  !--------------------------------------------------------------------------
1610  ! sqlr_writeEmptyPseudoSSTobsFile
1611  !--------------------------------------------------------------------------
1612  subroutine sqlr_writeEmptyPseudoSSTobsFile(obsData, obsFamily, instrumentFileName)
1613    !
1614    !:Purpose: to generate an empty SQLite SST pseudo obs file for mpi tasks,
1615    !           with no sea-ice on them.
1616    !
1617    implicit none
1618
1619    ! Arguments:
1620    type(struct_obs) , intent(inout) :: obsData   
1621    character(len=*) , intent(in)    :: obsFamily
1622    character(len=*) , intent(in)    :: instrumentFileName
1623            
1624    ! Locals:
1625    type(fSQL_DATABASE)    :: db                        ! type for SQLIte  file handle
1626    type(fSQL_STATEMENT)   :: stmtData, stmtHeader      ! type for precompiled SQLite statements
1627    type(fSQL_STATUS)      :: stat                      ! type for error status
1628    character(len = 512)   :: queryCreate
1629    character(len = 512)   :: queryHeader, queryData  
1630    integer                :: idObs, idData
1631    character(len=30)      :: fileNameExtention
1632    character(len=256)     :: fileName, fileNameDir
1633    character(len=4)       :: cmyidx, cmyidy
1634        
1635    ! determine initial idData,idObs to ensure unique values across mpi tasks
1636    call sqlu_getInitialIdObsData(obsData, obsFamily, idObs, idData)
1637    
1638    fileNameDir = trim(ram_getRamDiskDir())
1639    if (fileNameDir == ' ') &
1640    write(*,*) 'sqlr_writeEmptyPseudoSSTobsFile: WARNING! The program may be slow creating many sqlite files in the same directory.'
1641    write(*,*) 'sqlr_writeEmptyPseudoSSTobsFile: WARNING! Please, use the ram disk option prior to MIDAS run!'
1642
1643    if (obs_mpiLocal(obsData)) then
1644      write(cmyidy,'(I4.4)') (mmpi_myidy + 1)
1645      write(cmyidx,'(I4.4)') (mmpi_myidx + 1)
1646      fileNameExtention  = trim(cmyidx) // '_' // trim(cmyidy)
1647    else
1648      if (mmpi_myid > 0) return
1649      fileNameExtention = ' '
1650    end if
1651    
1652    fileName = trim(fileNameDir) // 'obs/' // trim(instrumentFileName) // '_' // trim(fileNameExtention)
1653
1654    write(*,*) 'sqlr_writeEmptyPseudoSSTobsFile: Creating file: ', trim(fileName)
1655    call fSQL_open(db, fileName, stat)
1656    if (fSQL_error(stat) /= FSQL_OK) write(*,*) 'sqlr_writeEmptyPseudoSSTobsFile: fSQL_open: ', fSQL_errmsg(stat),' filename: '//trim(fileName)
1657
1658    ! Create the tables HEADER and DATA
1659    queryCreate = 'create table header (id_obs integer primary key, id_stn varchar(50), lat real, lon real, &
1660                  &codtyp integer, date integer, time integer, elev real, status integer); &
1661                  &create table data (id_data integer primary key, id_obs integer, varno integer, vcoord real, &
1662                  &vcoord_type integer, obsvalue real, flag integer, oma real, ompt real, oma0 real, omp real, &
1663                  &an_error real, fg_error real, obs_error real);'
1664    
1665    call fSQL_do_many(db, queryCreate, stat)
1666    if (fSQL_error(stat) /= FSQL_OK) then
1667      call sqlu_handleError(stat, 'sqlr_writeEmptyPseudoSSTobsFile: fSQL_do_many with query: '//trim(queryCreate))
1668    end if
1669    
1670    queryHeader = ' insert into header (id_obs, id_stn, lat, lon, date, time, codtyp, elev, status) values(?,?,?,?,?,?,?,?,?); '
1671    queryData = 'insert into data (id_data, id_obs, varno, vcoord, vcoord_type, obsvalue, flag, oma, oma0, ompt, fg_error, &
1672                &obs_error) values(?,?,?,?,?,?,?,?,?,?,?,?);'
1673
1674    write(*,*) 'sqlr_writeEmptyPseudoSSTobsFile: Insert query Data   = ', trim(queryData)
1675    write(*,*) 'sqlr_writeEmptyPseudoSSTobsFile: Insert query Header = ', trim(queryHeader)
1676
1677    call fSQL_begin(db)
1678    call fSQL_prepare(db, queryData, stmtData, stat)
1679    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'sqlr_writeEmptyPseudoSSTobsFile: fSQL_prepare:')
1680    call fSQL_prepare(db, queryHeader, stmtHeader, stat)
1681    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'sqlr_writeEmptyPseudoSSTobsFile: fSQL_prepare:')
1682
1683    call fSQL_commit(db)
1684    call fSQL_close(db, stat)
1685
1686  end subroutine sqlr_writeEmptyPseudoSSTobsFile
1687
1688  !--------------------------------------------------------------------------
1689  ! sqlr_getColumnValuesDate
1690  !--------------------------------------------------------------------------
1691  subroutine sqlr_getColumnValuesDate(columnDateValues, columnTimeValues, fileName)
1692    !
1693    !:Purpose: Read the date and time column values from sqlite file.
1694    !
1695    implicit none
1696
1697    ! Arguments:
1698    integer, allocatable, intent(out) :: columnDateValues(:)
1699    integer, allocatable, intent(out) :: columnTimeValues(:)
1700    character(len=*),     intent(in)  :: fileName
1701
1702    ! Locals:
1703    integer              :: numRows, numColumns, rowIndex
1704    character(len=20), allocatable :: columnValuesStr(:,:)
1705    character(len=3000)  :: query
1706    type(fSQL_STATUS)    :: stat ! sqlite error status
1707    type(fSQL_DATABASE)  :: db   ! sqlite file handle
1708    type(fSQL_STATEMENT) :: stmt ! precompiled sqlite statements
1709
1710    ! open the sqlite file
1711    call fSQL_open( db, trim(fileName), status=stat )
1712    if ( fSQL_error(stat) /= FSQL_OK ) then
1713      write(*,*) 'sqlr_getColumnValuesDate: fSQL_open: ', fSQL_errmsg(stat)
1714      call utl_abort( 'sqlr_getColumnValuesDate: fSQL_open' )
1715    end if
1716
1717    ! Get the date and time
1718
1719    ! build the sqlite query
1720    query = 'select date, time from header;'
1721    write(*,*) 'sqlr_getColumnValuesDate: query ---> ', trim(query)
1722
1723    ! read the values from the file
1724    call fSQL_prepare( db, trim(query), stmt, status=stat )
1725    call fSQL_get_many( stmt, nrows=numRows, ncols=numColumns, &
1726                        mode=FSQL_CHAR, status=stat )
1727    if ( fSQL_error(stat) /= FSQL_OK ) then
1728      write(*,*) 'sqlr_getColumnValuesDate: fSQL_get_many: ', fSQL_errmsg(stat)
1729      call utl_abort('sqlr_getColumnValuesDate: problem with fSQL_get_many')
1730    end if
1731    write(*,*) 'sqlr_getColumnValuesDate: numRows = ', numRows, ', numColumns = ', numColumns
1732    allocate( columnValuesStr(numRows,2) )
1733    call fSQL_fill_matrix( stmt, columnValuesStr )
1734    allocate( columnDateValues(numRows) )
1735    allocate( columnTimeValues(numRows) )
1736    do rowIndex = 1, numRows
1737      read(columnValuesStr(rowIndex,1),*) columnDateValues(rowIndex)
1738      read(columnValuesStr(rowIndex,2),*) columnTimeValues(rowIndex)
1739      columnTimeValues(rowIndex) = columnTimeValues(rowIndex)/100
1740    end do
1741
1742    deallocate(columnValuesStr)
1743
1744    ! close the sqlite file
1745    call fSQL_free_mem( stmt )
1746    call fSQL_finalize( stmt )
1747    call fSQL_close( db, stat ) 
1748
1749  end subroutine sqlr_getColumnValuesDate
1750
1751end module sqliteRead_mod