obsDiagFiles_mod sourceΒΆ

  1module obsDiagFiles_mod
  2  ! MODULE obsDiagFiles_mod (prefix='diaf' category='3. Observation input/output')
  3  !
  4  !:Purpose:  To write the "diag" format SQLITE observation files. Data is stored in 
  5  !           obsSpaceData object.
  6  !
  7  use obsSpaceData_mod
  8  use midasMpi_mod
  9  use fSQLite
 10  use mathPhysConstants_mod
 11  use utilities_mod
 12  use ramDisk_mod
 13  use tovsNL_mod
 14  use rttov_const, only : ninst
 15  use codtyp_mod
 16  use sqliteUtilities_mod
 17  use ensembleObservations_mod
 18
 19  implicit none
 20
 21  save
 22
 23  private
 24
 25  public :: diaf_writeAllSqlDiagFiles
 26
 27  contains
 28
 29  !--------------------------------------------------------------------------
 30  ! diaf_writeAllSqlDiagFiles
 31  !--------------------------------------------------------------------------
 32  subroutine diaf_writeAllSqlDiagFiles(obsdat, sfFileName, onlyAssimObs, addFSOdiag, ensObs_opt)
 33    !
 34    !:Purpose: To prepare the writing of obsSpaceData content into SQLite format files
 35    !  
 36    implicit none
 37
 38    ! Arguments:
 39    type(struct_obs),           intent(inout) :: obsdat         ! obsSpaceData object
 40    character(len=*),           intent(in)    :: sfFileName     ! fileName acronym used for surface obs file
 41    logical         ,           intent(in)    :: onlyAssimObs   ! only write assimilated obs
 42    logical         ,           intent(in)    :: addFSOdiag     ! include FSO column in body table
 43    type(struct_eob), optional, intent(in)    :: ensObs_opt     ! ensObs object
 44    
 45    ! Locals:
 46    integer                :: familyIndex, codeTypeIndex, fileIndex
 47    character(len=2)       :: obsFamilyList(50)
 48    integer                :: obsFamilyListSize
 49    integer                :: tovsAllCodeTypeListSize, tovsAllCodeTypeList(ninst)
 50    integer                :: tovsCodeTypeListSize, tovsCodeTypeList(10)
 51    integer                :: tovsFileNameListSize
 52    character(len=20)      :: tovsFileNameList(30)
 53    character(len=20)      :: fileName
 54
 55    ! ensure all mpi tasks have same list of common obs family names
 56    call diaf_getObsFamilyListMpiGlobal(obsdat, obsFamilyListSize, obsFamilyList)
 57
 58    ! get list of all possible tovs codetype values and unique list of corresponding filenames
 59    call tvs_getAllIdBurpTovs(tovsAllCodeTypeListSize, tovsAllCodeTypeList)
 60    write(*,*) 'tovsAllCodeTypeListSize = ', tovsAllCodeTypeListSize
 61    write(*,*) 'tovsAllCodeTypeList = ', tovsAllCodeTypeList(1:tovsAllCodeTypeListSize)
 62    
 63    tovsFileNameListSize = 0
 64    tovsFileNameList(:) = 'XXXXX'
 65    do codeTypeIndex = 1, tovsAllCodeTypeListSize
 66      fileName = diaf_getObsFileName('TO', codeType_opt=tovsAllCodeTypeList(codeTypeIndex))
 67      if (all(tovsFileNameList(:) /= fileName)) then
 68        tovsFileNameListSize = tovsFileNameListSize + 1
 69        tovsFileNameList(tovsFileNameListSize) = fileName
 70      end if
 71    end do
 72    write(*,*) 'tovsFileNameListSize = ', tovsFileNameListSize
 73    write(*,*) 'tovsFileNameList = ', tovsFileNameList(1:tovsFileNameListSize)
 74    
 75    do familyIndex = 1, obsFamilyListSize
 76
 77      write(*,*) 'diaf_writeAllSqlDiagFiles: Family = ', familyIndex, obsFamilyList(familyIndex)
 78
 79      if (obsFamilyList(familyIndex) == 'TO') then
 80
 81        do fileIndex = 1, tovsFileNameListSize
 82          fileName = tovsFileNameList(fileIndex)
 83          write(*,*) 'tovs filename = ', fileName
 84
 85          ! get list of codetypes associated with this filename
 86          tovsCodeTypeListSize = 0
 87          tovsCodeTypeList(:) = MPC_missingValue_INT
 88          do codeTypeIndex = 1, tovsAllCodeTypeListSize
 89            if (fileName == diaf_getObsFileName('TO', codeType_opt=tovsAllCodeTypeList(codeTypeIndex))) then
 90              tovsCodeTypeListSize = tovsCodeTypeListSize + 1
 91              tovsCodeTypeList(tovsCodeTypeListSize) = tovsAllCodeTypeList(codeTypeIndex)
 92            end if
 93          end do
 94
 95          write(*,*) 'tovsCodeTypeListSize = ', tovsCodeTypeListSize
 96          write(*,*) 'tovsCodeTypeList = ', tovsCodeTypeList(1:tovsCodeTypeListSize) 
 97          call diaf_writeSqlDiagFile(obsdat, 'TO', onlyAssimObs, addFSOdiag, &
 98                                     tovsFileNameList(fileIndex), &
 99                                     tovsCodeTypeList(1:tovsCodeTypeListSize), & 
100                                     ensObs_opt=ensObs_opt ) 
101        end do
102
103      else
104
105        fileName = diaf_getObsFileName(obsFamilyList(familyIndex), sfFileName_opt=sfFileName)
106        call diaf_writeSqlDiagFile(obsdat, obsFamilyList(familyIndex), &
107                                   onlyAssimObs, addFSOdiag, fileName, & 
108                                   ensObs_opt=ensObs_opt ) 
109
110      end if   
111      
112    end do
113
114  end subroutine diaf_writeAllSqlDiagFiles
115 
116  !--------------------------------------------------------------------------
117  ! diaf_writeSqlDiagFile
118  !--------------------------------------------------------------------------
119  subroutine diaf_writeSqlDiagFile(obsdat, obsFamily, onlyAssimObs, addFSOdiag, instrumentFileName, codeTypeList_opt, ensObs_opt)
120    !
121    !:Purpose: To write the obsSpaceData content into SQLite format files
122    !
123    implicit none
124
125    ! Arguments:
126    type(struct_obs)           , intent(inout) :: obsdat
127    character(len=*)           , intent(in)    :: obsFamily
128    logical                    , intent(in)    :: onlyAssimObs
129    logical                    , intent(in)    :: addFSOdiag
130    character(len=*)           , intent(in)    :: instrumentFileName
131    integer          , optional, intent(in)    :: codeTypeList_opt(:)
132    type(struct_eob) , optional, intent(in)    :: ensObs_opt 
133
134    ! Locals:
135    type(fSQL_DATABASE)    :: db                                   ! type for SQLIte  file handle
136    type(fSQL_STATEMENT)   :: stmtData, stmtHeader, stmtEnsObs     ! type for precompiled SQLite statements
137    type(fSQL_STATUS)      :: stat                                 ! type for error status
138    integer                :: obsVarno, obsFlag, ASS, vertCoordType, codeType, date, time, idObs, idData, memberIndex
139    real                   :: obsValue, OMA, OMP, OER, FGE, PPP, lon, lat, altitude, ENSOBSTRL, ENSOBSANL
140    real                   :: latData, lonData
141    real                   :: ensInnovStdDev, ensObsErrStdDev, zhad, fso
142    integer                :: numberInsertions, numHeaders, headerIndex, bodyIndex, obsNlv, obsRln
143    character(len = 512)   :: queryData, queryHeader, queryCreate, queryCreateEnsObs 
144    character(len = 12)    :: idStation
145    character(len=30)      :: fileNameExtention
146    character(len=256)     :: fileName, fileNameDir
147    character(len=4)       :: cmyidx, cmyidy
148    logical                :: writeHeader
149        
150    ! determine initial idData,idObs to ensure unique values across mpi tasks
151    call sqlu_getInitialIdObsData(obsDat, obsFamily, idObs, idData, codeTypeList_opt)
152
153    ! return if this mpi task does not have any observations of this family
154    if (trim(instrumentFileName) == 'XXXXX') return
155
156    ! check if any obs exist for this file, if not return
157    numHeaders = 0
158    call obs_set_current_header_list(obsdat, obsFamily)
159    HEADERCOUNT: do
160      headerIndex = obs_getHeaderIndex(obsdat)
161      if (headerIndex < 0) exit HEADERCOUNT
162      if (present(codeTypeList_opt)) then
163        codeType  = obs_headElem_i(obsdat, OBS_ITY, headerIndex)
164        if (all(codeTypeList_opt(:) /= codeType)) cycle HEADERCOUNT
165      end if
166      numHeaders = numHeaders + 1
167    end do HEADERCOUNT
168    if (numHeaders == 0) return
169
170    fileNameDir = trim(ram_getRamDiskDir())
171    if (fileNameDir == ' ') &
172    write(*,*) 'diaf_writeSqlDiagFile: WARNING! The program may be slow creating many sqlite files in the same directory.'
173    write(*,*) 'diaf_writeSqlDiagFile: WARNING! Please, use the ram disk option prior to MIDAS run!'
174
175    if (obs_mpiLocal(obsdat)) then
176      write(cmyidy,'(I4.4)') (mmpi_myidy + 1)
177      write(cmyidx,'(I4.4)') (mmpi_myidx + 1)
178      fileNameExtention  = trim(cmyidx) // '_' // trim(cmyidy)
179    else
180      if (mmpi_myid > 0) return
181      fileNameExtention = ' '
182    end if
183    
184    fileName = trim(fileNameDir) // 'obs/dia' // trim(instrumentFileName) // '_' // trim(fileNameExtention)
185
186    write(*,*) 'diaf_writeSqlDiagFile: Creating file: ', trim(fileName)
187    call fSQL_open(db, fileName, stat)
188    if (fSQL_error(stat) /= FSQL_OK) write(*,*) 'diaf_writeSqlDiagFile: fSQL_open: ', fSQL_errmsg(stat),' filename: '//trim(fileName)
189
190    ! Create the tables HEADER and DATA
191    if (addFSOdiag) then
192      queryCreate = 'create table header (id_obs integer primary key, id_stn varchar(50), lat real, lon real, &
193                    &codtyp integer, date integer, time integer, elev real); &
194                    &create table data (id_data integer primary key, id_obs integer, varno integer, vcoord real, &
195                    &vcoord_type integer, obsvalue real, flag integer, oma real, ompt real, oma0 real, omp real, &
196                    &an_error real, fg_error real, obs_error real, sigi real, sigo real, zhad real, latd real, lond real, &
197                    &fso real);'
198    else
199      queryCreate = 'create table header (id_obs integer primary key, id_stn varchar(50), lat real, lon real, &
200                    &codtyp integer, date integer, time integer, elev real); &
201                    &create table data (id_data integer primary key, id_obs integer, varno integer, vcoord real, &
202                    &vcoord_type integer, obsvalue real, flag integer, oma real, ompt real, oma0 real, omp real, &
203                    &an_error real, fg_error real, obs_error real, sigi real, sigo real, zhad real, latd real, lond real);'
204    end if
205
206    call fSQL_do_many(db, queryCreate, stat)
207    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'diaf_writeSqlDiagFile: fSQL_do_many with query: '//trim(queryCreate))
208    
209    ! If the analysis members in obs space are allocated, make table queries for trial members and analysis members
210    if ( present( ensObs_opt ) ) then
211      ! Create
212      queryCreateEnsObs = 'create table ensobs (id_data integer, id_obs integer, id_member integer, obstrl real, obsanl real);'
213      call fSQL_do_many(db, queryCreateEnsObs, stat)
214      if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'diaf_writeSqlDiagFile: fSQL_do_many with query: '//trim(queryCreateEnsObs))
215    end if
216    
217    if (addFSOdiag) then
218      queryData = 'insert into data (id_data, id_obs, varno, vcoord, vcoord_type, obsvalue, flag, oma, oma0, ompt, fg_error, &
219                   &obs_error, sigi, sigo, zhad, latd, lond, fso) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);'
220    else
221      queryData = 'insert into data (id_data, id_obs, varno, vcoord, vcoord_type, obsvalue, flag, oma, oma0, ompt, fg_error, &
222                  &obs_error, sigi, sigo, zhad, latd, lond) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);'
223    end if
224    
225    queryHeader = 'insert into header (id_obs, id_stn, lat, lon, date, time, codtyp, elev) values(?,?,?,?,?,?,?,?); '
226
227    write(*,*) 'diaf_writeSqlDiagFile: Insert query Data   = ', trim(queryData)
228    write(*,*) 'diaf_writeSqlDiagFile: Insert query Header = ', trim(queryHeader)
229
230    call fSQL_begin(db)
231    call fSQL_prepare(db, queryData, stmtData, stat)
232    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'diaf_writeSqlDiagFile: fSQL_prepare: ')
233    call fSQL_prepare(db, queryHeader, stmtHeader, stat)
234    if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'diaf_writeSqlDiagFile: fSQL_prepare: ')
235    
236    if ( present( ensObs_opt ) ) then
237      ! Insert
238      queryCreateEnsObs = 'insert into ensobs (id_data, id_obs, id_member, obstrl, obsanl) values(?,?,?,?,?);'
239      write(*,*) 'diaf_writeSqlDiagFile: Insert query EnsObs   = ', trim(queryCreateEnsObs)
240
241      call fSQL_prepare(db, queryCreateEnsObs, stmtEnsObs, stat)
242      
243      if (fSQL_error(stat) /= FSQL_OK) call sqlu_handleError(stat, 'diaf_writeSqlDiagFile: fSQL_prepare: ')
244    end if
245    
246    numberInsertions = 0
247    call obs_set_current_header_list(obsdat, obsFamily)
248    HEADER: do
249
250      headerIndex = obs_getHeaderIndex(obsdat)
251      if (headerIndex < 0) exit HEADER
252        
253      codeType  = obs_headElem_i(obsdat, OBS_ITY, headerIndex)
254      if (present(codeTypeList_opt)) then
255        if (all(codeTypeList_opt(:) /= codeType)) cycle HEADER
256      end if
257
258      obsRln    = obs_headElem_i(obsdat, OBS_RLN, headerIndex)
259      obsNlv    = obs_headElem_i(obsdat, OBS_NLV, headerIndex)
260      idStation = obs_elem_c    (obsdat, 'STID' , headerIndex) 
261      altitude  = obs_headElem_r(obsdat, OBS_ALT, headerIndex)      
262      lon       = obs_headElem_r(obsdat, OBS_LON, headerIndex) * MPC_DEGREES_PER_RADIAN_R8
263      lat       = obs_headElem_r(obsdat, OBS_LAT, headerIndex) * MPC_DEGREES_PER_RADIAN_R8
264      if (lon > 180.) lon = lon - 360.
265      date      = obs_headElem_i(obsdat, OBS_DAT, headerIndex)
266      time      = obs_headElem_i(obsdat, OBS_ETM, headerIndex) * 100.
267
268      ! check if at least one observation will be written, if not skip this header
269      if (onlyAssimObs) then
270        writeHeader = .false.
271        BODYCHECK: do bodyIndex = obsRln, obsNlv + obsRln -1
272          OMP = obs_bodyElem_r(obsdat, OBS_OMP , bodyIndex)
273          FGE = obs_bodyElem_r(obsdat, OBS_HPHT, bodyIndex)
274          ASS = obs_bodyElem_i(obsdat, OBS_ASS , bodyIndex)
275          if ((ASS == obs_assimilated) .and.     &
276              (OMP /= obs_missingValue_R) .and.  &
277              (FGE /= obs_missingValue_R)) writeHeader = .true.
278        end do BODYCHECK
279        if (.not. writeHeader) cycle HEADER
280      end if
281
282      idObs = idObs + 1
283      call fSQL_bind_param(stmtHeader, param_index = 1, int_var  = idObs)
284      call fSQL_bind_param(stmtHeader, param_index = 2, char_var = idStation)
285      call fSQL_bind_param(stmtHeader, param_index = 3, real_var = lat) 
286      call fSQL_bind_param(stmtHeader, param_index = 4, real_var = lon) 
287      call fSQL_bind_param(stmtHeader, param_index = 5, int_var  = date) 
288      call fSQL_bind_param(stmtHeader, param_index = 6, int_var  = time) 
289      call fSQL_bind_param(stmtHeader, param_index = 7, int_var  = codeType) 
290      call fSQL_bind_param(stmtHeader, param_index = 8, real_var = altitude)
291      call fSQL_exec_stmt (stmtHeader)
292
293      BODY: do bodyIndex = obsRln, obsNlv + obsRln -1
294         
295        obsVarno      = obs_bodyElem_i(obsdat, OBS_VNM , bodyIndex)
296        obsFlag       = obs_bodyElem_i(obsdat, OBS_FLG , bodyIndex)
297        vertCoordType = obs_bodyElem_i(obsdat, OBS_VCO , bodyIndex)
298        obsValue      = obs_bodyElem_r(obsdat, OBS_VAR , bodyIndex)
299        OMA           = obs_bodyElem_r(obsdat, OBS_OMA , bodyIndex)
300        OMP           = obs_bodyElem_r(obsdat, OBS_OMP , bodyIndex)
301        OER           = obs_bodyElem_r(obsdat, OBS_OER , bodyIndex)
302        FGE           = obs_bodyElem_r(obsdat, OBS_HPHT, bodyIndex)
303        PPP           = obs_bodyElem_r(obsdat, OBS_PPP , bodyIndex)
304        ASS           = obs_bodyElem_i(obsdat, OBS_ASS , bodyIndex)
305        latData       = obs_bodyElem_r(obsdat, OBS_LATD, bodyIndex)
306        lonData       = obs_bodyElem_r(obsdat, OBS_LOND, bodyIndex)
307
308        ! skip obs if it was not assimilated
309        if (onlyAssimObs) then
310          if ((ASS /= obs_assimilated) .or.     &
311              (OMP == obs_missingValue_R) .or.  &
312              (FGE == obs_missingValue_R)) cycle BODY
313        end if
314
315        if (obs_columnActive_RB(obsdat, OBS_SIGI)) then
316          ensInnovStdDev = obs_bodyElem_r(obsdat, OBS_SIGI, bodyIndex)
317        else
318          ensInnovStdDev = obs_missingValue_R
319        end if
320        if (obs_columnActive_RB(obsdat, OBS_SIGO)) then
321          ensObsErrStdDev = obs_bodyElem_r(obsdat, OBS_SIGO, bodyIndex)
322        else
323          ensObsErrStdDev = obs_missingValue_R
324        end if
325        if (obs_columnActive_RB(obsdat, OBS_ZHA)) then
326          zhad = obs_bodyElem_r(obsdat, OBS_ZHA, bodyIndex)
327        else
328          zhad = obs_missingValue_R
329        end if
330        if (addFSOdiag) then
331          if (obs_columnActive_RB(obsdat, OBS_FSO)) then
332            fso = obs_bodyElem_r(obsdat, OBS_FSO, bodyIndex)
333          else
334            fso = obs_missingValue_R
335          end if
336        end if
337
338        select case(obsFamily)
339          case ('UA', 'AI', 'SW')
340            if (vertCoordType == 2) vertCoordType = 7004
341          case ('RO')
342            vertCoordType = 7007
343          case ('RA')
344            vertCoordType = 7007
345          case ('PR')
346            vertCoordType = 7006
347            PPP = PPP - altitude
348          case ('TO')
349            vertCoordType = 5042
350            if(codeType == codtyp_get_codtyp('amsua') .or. &
351               codeType == codtyp_get_codtyp('amsub') .or. &
352               codeType == codtyp_get_codtyp('mhs')) vertCoordType = 2150
353          case ('SF', 'SC', 'GP')
354            vertCoordType = MPC_missingValue_INT 
355        end select
356
357        ! insert order: id_data, id_obs, varno, vcoord, vcoord_type, obsvalue, flag, oma, oma0, ompt, fg_error, obs_error, sigi, sigo, zhad, latd, lond, fso
358        idData = idData + 1
359        call fSQL_bind_param(stmtData, param_index = 1, int_var  = idData)
360        call fSQL_bind_param(stmtData, param_index = 2, int_var  = idObs)
361        call fSQL_bind_param(stmtData, param_index = 3, int_var  = obsVarno)
362        call fSQL_bind_param(stmtData, param_index = 4, real_var = PPP)
363        if (vertCoordType == MPC_missingValue_INT) then
364          call fSQL_bind_param(stmtData, param_index = 5) 
365        else
366          call fSQL_bind_param(stmtData, param_index = 5, int_var  = vertCoordType) 
367        end if
368        call fSQL_bind_param(stmtData, param_index = 6, real_var = obsValue) 
369        call fSQL_bind_param(stmtData, param_index = 7, int_var  = obsFlag)
370        if (OMA == obs_missingValue_R) then
371          call fSQL_bind_param(stmtData, param_index = 8) 
372          call fSQL_bind_param(stmtData, param_index = 9) 
373        else
374          call fSQL_bind_param(stmtData, param_index = 8, real_var = OMA)
375          call fSQL_bind_param(stmtData, param_index = 9, real_var = OMA)
376        end if
377        if (OMP == obs_missingValue_R) then
378          call fSQL_bind_param(stmtData, param_index = 10) 
379        else
380          call fSQL_bind_param(stmtData, param_index = 10, real_var = OMP)
381        end if
382        if (FGE == obs_missingValue_R) then
383          call fSQL_bind_param(stmtData, param_index = 11) 
384        else
385          call fSQL_bind_param(stmtData, param_index = 11, real_var = FGE)
386        end if
387        if (OER == obs_missingValue_R) then
388          call fSQL_bind_param(stmtData, param_index = 12) 
389        else
390          call fSQL_bind_param(stmtData, param_index = 12, real_var = OER)
391        end if 
392        if (ensInnovStdDev == obs_missingValue_R) then
393          call fSQL_bind_param(stmtData, param_index = 13) 
394        else
395          call fSQL_bind_param(stmtData, param_index = 13, real_var = ensInnovStdDev)
396        end if 
397        if (ensObsErrStdDev == obs_missingValue_R) then
398          call fSQL_bind_param(stmtData, param_index = 14) 
399        else
400          call fSQL_bind_param(stmtData, param_index = 14, real_var = ensObsErrStdDev)
401        end if 
402        if (zhad == obs_missingValue_R) then
403          call fSQL_bind_param(stmtData, param_index = 15) 
404        else
405          call fSQL_bind_param(stmtData, param_index = 15, real_var = zhad)
406        end if 
407        if (latData == obs_missingValue_R) then
408          call fSQL_bind_param(stmtData, param_index = 16) 
409        else
410          latData = latData * MPC_DEGREES_PER_RADIAN_R8
411          call fSQL_bind_param(stmtData, param_index = 16, real_var = latData)
412        end if 
413        if (lonData == obs_missingValue_R) then
414          call fSQL_bind_param(stmtData, param_index = 17) 
415        else
416          lonData = lonData * MPC_DEGREES_PER_RADIAN_R8
417          call fSQL_bind_param(stmtData, param_index = 17, real_var = lonData)
418        end if 
419        if (addFSOdiag) then
420          if (fso == obs_missingValue_R) then
421            call fSQL_bind_param(stmtData, param_index = 18)
422          else
423            call fSQL_bind_param(stmtData, param_index = 18, real_var = fso)
424          end if
425        end if
426
427        call fSQL_exec_stmt (stmtData)
428
429        numberInsertions = numberInsertions + 1
430
431        if ( present( ensObs_opt ) ) then
432          if ( .not. allocated(ensObs_opt%Yb_r4) ) then
433            call utl_abort('diaf_writeSqlDiagFile: ensObs%Yb_r4 must be allocated and it is not')
434          end if
435          ! Loop over members. insert order: id_data, id_obs, id_member, obstrl, obsanl
436          do memberIndex = 1, ensObs_opt%numMembers
437            ENSOBSTRL = ensObs_opt%Yb_r4(memberIndex,bodyIndex)
438            if (ensObs_opt%meanRemoved) then
439              ENSOBSTRL = ENSOBSTRL + ensObs_opt%meanYb(bodyIndex) ! Yb_r4 has mean removed, so add back
440            end if
441            call fSQL_bind_param(stmtEnsObs, param_index = 1, int_var  = idData)
442            call fSQL_bind_param(stmtEnsObs, param_index = 2, int_var  = idObs)
443            call fSQL_bind_param(stmtEnsObs, param_index = 3, int_var  = memberIndex)
444            call fSQL_bind_param(stmtEnsObs, param_index = 4, real_var = ENSOBSTRL)
445            if ( allocated(ensObs_opt%Ya_r4) ) then
446              ENSOBSANL = ensObs_opt%Ya_r4(memberIndex,bodyIndex)
447              call fSQL_bind_param(stmtEnsObs, param_index = 5, real_var = ENSOBSANL)
448            else
449              call fSQL_bind_param(stmtEnsObs, param_index = 5)
450            end if
451            call fSQL_exec_stmt (stmtEnsObs)
452          end do
453        end if  
454          
455      end do BODY
456     
457    end do HEADER
458    
459    call fSQL_finalize (stmtData)
460
461    write(*,*) 'diaf_writeSqlDiagFile: Observation Family: ', obsFamily,', number of insertions: ', numberInsertions
462
463    call fSQL_commit(db)
464    call fSQL_close(db, stat)
465
466  end subroutine diaf_writeSqlDiagFile
467
468  !--------------------------------------------------------------------------
469  ! diaf_getObsFileName
470  !--------------------------------------------------------------------------
471  function diaf_getObsFileName(obsFamily, sfFileName_opt, codetype_opt) result(fileName)
472    !
473    !:Purpose: Return the part of the observation file name associated
474    !           with the type of observation it contains.
475    !
476    implicit none
477
478    ! Arguments:
479    character(len=*)          , intent(in) :: obsFamily
480    character(len=*), optional, intent(in) :: sfFileName_opt ! fileName acronym used for surface obs file
481    integer,          optional, intent(in) :: codetype_opt
482    ! Result:
483    character(len=20)          :: fileName
484
485    if (obsFamily == 'TO') then
486      if (.not. present(codetype_opt)) then
487        call utl_abort('diaf_getObsFileName: codetype_opt must be specified for TO family')
488      end if
489
490      if (codtyp_get_name(codeType_opt) == 'radianceclear') then
491        fileName  = 'csr'
492      else if (codtyp_get_name(codeType_opt) == 'mhs' .or. codtyp_get_name(codeType_opt) == 'amsub') then
493        if (tvs_isInstrumAllskyHuAssim(tvs_getInstrumentId(codtyp_get_name(codeType_opt)))) then
494          fileName = 'to_amsub_allsky'
495        else
496          fileName = 'to_amsub'
497        end if
498      else if (codtyp_get_name(codeType_opt) == 'amsua') then
499        if (tvs_isInstrumAllskyTtAssim(tvs_getInstrumentId(codtyp_get_name(codeType_opt)))) then
500          fileName = 'to_amsua_allsky'
501        else
502          fileName = 'to_amsua'
503        end if
504      else if (codtyp_get_name(codeType_opt) == 'ssmi') then
505        fileName = 'ssmis'
506      else if (codtyp_get_name(codeType_opt) == 'crisfsr') then
507        fileName = 'cris'
508      else if (codtyp_get_name(codeType_opt) == 'atms') then
509        if (tvs_isInstrumAllskyTtAssim(tvs_getInstrumentId(codtyp_get_name(codeType_opt))) .or. &
510            tvs_isInstrumAllskyHuAssim(tvs_getInstrumentId(codtyp_get_name(codeType_opt)))) then
511          fileName = 'atms_allsky'
512        else
513          fileName = 'atms'        
514        end if
515      else
516        fileName = codtyp_get_name(codeType_opt)
517      end if
518    else
519      if (.not. present(sfFileName_opt)) then
520        call utl_abort('diaf_getObsFileName: sfFileName_opt must be specified')
521      end if
522      call up2low(obsFamily, fileName)
523      if (fileName == 'ra') fileName = 'radar'
524      if (fileName == 'sf') then
525        ! use either 'sf' or 'sfc' for filename with surface obs
526        fileName = sfFileName_opt
527      end if
528    end if
529
530  end function diaf_getObsFileName
531
532  !--------------------------------------------------------------------------
533  ! diaf_getObsFamilyListMpiGlobal
534  !--------------------------------------------------------------------------
535  subroutine diaf_getObsFamilyListMpiGlobal(obsdat, obsFamilyListSizeCommon,  &
536                                            obsFamilyListCommon)
537    !
538    !:Purpose: Obtain a common set of obs family names over all mpi tasks
539    !
540    implicit none
541      
542    ! Arguments:
543    type(struct_obs), intent(inout) :: obsdat
544    integer         , intent(out)   :: obsFamilyListSizeCommon
545    character(len=*), intent(out)   :: obsFamilyListCommon(:)
546
547    ! Locals:
548    integer                       :: headerIndex, familyIndex, charIndex, procIndex, nsize, ierr
549    integer                       :: obsFamilyListSizeMpiLocal, obsFamilyListSizeMaxMpiLocal, obsFamilyListSizeMax
550    character(len=2), allocatable :: obsFamilyListMpiLocal(:)
551    character(len=2), allocatable :: obsFamilyListMpiGlobal(:,:)
552    character(len=2)              :: currentObsFamily
553    integer, allocatable          :: intObsFamilyListMpiLocal(:,:)
554    integer, allocatable          :: intObsFamilyListMpiGlobal(:,:,:)
555    integer, allocatable          :: allObsFamilyListSizeMpiLocal(:)
556
557    obsFamilyListSizeMax = size(obsFamilyListCommon)
558    write(*,*) 'obsFamilyListSizeMax =', obsFamilyListSizeMax
559
560    ! get family list for this mpi task
561    obsFamilyListSizeMpiLocal = 0
562    allocate(obsFamilyListMpiLocal(obsFamilyListSizeMax))
563    obsFamilyListMpiLocal(:) = 'XX'
564    HEADER: do headerIndex = 1, obs_numHeader(obsdat)
565      currentObsFamily = obs_getFamily(obsdat, headerIndex_opt=headerIndex) 
566      if (any(obsFamilyListMpiLocal(:) == currentObsFamily)) cycle HEADER
567      obsFamilyListSizeMpiLocal = obsFamilyListSizeMpiLocal + 1
568      obsFamilyListMpiLocal(obsFamilyListSizeMpiLocal) = currentObsFamily
569      write(*,*) 'add the family: ', currentObsFamily
570    end do HEADER
571    write(*,*) 'obsFamilyListSizeMpiLocal =', obsFamilyListSizeMpiLocal
572    write(*,*) 'obsFamilyListMpiLocal = ', obsFamilyListMpiLocal(1:obsFamilyListSizeMpiLocal)
573
574    allocate(allObsFamilyListSizeMpiLocal(mmpi_nprocs))
575    call rpn_comm_allgather(obsFamilyListSizeMpiLocal,    1, 'mpi_integer',  &
576                            allObsFamilyListSizeMpiLocal, 1, 'mpi_integer', 'GRID', ierr)
577    call rpn_comm_allreduce(obsFamilyListSizeMpiLocal, obsFamilyListSizeMaxMpiLocal,1,'mpi_integer','mpi_max','GRID',ierr)
578
579    ! convert local family list from characters to integers
580    allocate(intObsFamilyListMpiLocal(len(currentObsFamily),obsFamilyListSizeMaxMpiLocal))
581    intObsFamilyListMpiLocal(:,:)=0
582    do familyIndex = 1, obsFamilyListSizeMpiLocal
583      do charIndex = 1, len(currentObsFamily)
584        intObsFamilyListMpiLocal(charIndex,familyIndex) =  &
585             iachar(obsFamilyListMpiLocal(familyIndex)(charIndex:charIndex))
586      end do
587    end do
588
589    ! communicate obs family list to all mpi tasks as integers
590    allocate(intObsFamilyListMpiGlobal(len(currentObsFamily),obsFamilyListSizeMaxMpiLocal,mmpi_nprocs))
591    nsize = size(intObsFamilyListMpiLocal)
592    call rpn_comm_allgather(intObsFamilyListMpiLocal,  nsize, 'mpi_integer',  &
593                            intObsFamilyListMpiGlobal, nsize, 'mpi_integer', 'GRID', ierr)
594
595    ! convert global family lists from integers to characters
596    allocate(obsFamilyListMpiGlobal(obsFamilyListSizeMaxMpiLocal,mmpi_nprocs))
597    obsFamilyListMpiGlobal(:,:) = 'XX'
598    do procIndex = 1, mmpi_nprocs
599      do familyIndex = 1, allObsFamilyListSizeMpiLocal(procIndex)
600        do charIndex=1,len(currentObsFamily)
601          obsFamilyListMpiGlobal(familyIndex,procIndex)(charIndex:charIndex) =  &
602               achar(intObsFamilyListMpiGlobal(charIndex,familyIndex,procIndex))
603        end do
604      end do
605      write(*,*) 'obsFamilyListMpiGlobal = ', procIndex,  &
606           obsFamilyListMpiGlobal(1:allObsFamilyListSizeMpiLocal(procIndex),procIndex)
607    end do
608
609    ! construct single common list of families to be used for all mpi tasks
610    obsFamilyListCommon(:) = 'YY'
611    obsFamilyListSizeCommon = 0
612    do procIndex = 1, mmpi_nprocs
613      FAMILY: do familyIndex = 1, obsFamilyListSizeMaxMpiLocal
614        if (obsFamilyListMpiGlobal(familyIndex,procIndex) == 'XX') cycle FAMILY
615        if (any(obsFamilyListCommon(:) == obsFamilyListMpiGlobal(familyIndex,procIndex))) cycle FAMILY
616        obsFamilyListSizeCommon = obsFamilyListSizeCommon + 1
617        obsFamilyListCommon(obsFamilyListSizeCommon) = obsFamilyListMpiGlobal(familyIndex,procIndex)
618      end do FAMILY
619    end do
620    write(*,*) 'obsFamilyListSizeCommon = ', obsFamilyListSizeCommon
621    write(*,*) 'obsFamilyListCommon = ', obsFamilyListCommon(1:obsFamilyListSizeCommon)
622
623    deallocate(allObsFamilyListSizeMpiLocal)
624    deallocate(obsFamilyListMpiGlobal)
625    deallocate(intObsFamilyListMpiGlobal)
626    deallocate(intObsFamilyListMpiLocal)
627    deallocate(obsFamilyListMpiLocal)
628
629  end subroutine diaf_getObsFamilyListMpiGlobal
630
631end module obsDiagFiles_mod