obsTimeInterp_mod sourceΒΆ

  1module obsTimeInterp_mod
  2  ! MODULE obsTimeInterp_mod (prefix='oti' category='4. Data Object transformations')
  3  !
  4  !:Purpose:  To store public variables and procedures related to the time
  5  !           coordinate.
  6  !
  7  use midasMpi_mod
  8  use utilities_mod
  9  use timecoord_mod
 10  use obsSpaceData_mod
 11  use obsFamilyList_mod
 12  
 13  implicit none
 14  save
 15  private
 16  
 17  ! public derived type
 18  public :: struct_oti
 19
 20  ! public procedures
 21  public :: oti_setup, oti_deallocate
 22  public :: oti_timeBinning
 23  public :: oti_setTimeInterpWeight, oti_getTimeInterpWeight, oti_getTimeInterpWeightMpiGlobal
 24  public :: oti_timeInterpWeightAllZero
 25
 26  type struct_oti
 27    real(8), pointer :: timeInterpWeight(:,:) => NULL() ! weights for temporal interpolation to obs times
 28    real(8), pointer :: timeInterpWeightMpiGlobal(:,:,:) => NULL() ! mpi global version of weights
 29  end type struct_oti
 30
 31  integer, parameter :: maxNumWrites = 50
 32
 33  integer, external :: get_max_rss
 34
 35contains
 36
 37  subroutine oti_timeBinning( obsSpaceData, nstepobs )
 38    !
 39    implicit none
 40
 41    ! Arguments:
 42    type(struct_obs), intent(in) :: obsSpaceData
 43    integer         , intent(in) :: nstepobs
 44
 45    ! Locals:
 46    integer :: stepIndex, headerIndex, familyIndex
 47    integer :: bodyIndex, bodyIndexBeg, bodyIndexEnd, nsize, ierr
 48    integer, allocatable :: idataass(:,:), inumheader(:,:)
 49    integer, allocatable :: my_idataass(:,:), my_inumheader(:,:)
 50    character(len=256)   :: formatspec, formatspec2
 51    real(8)              :: stepObsIndex
 52    integer, save :: numWrites = 0
 53
 54    if ( .not.tim_initialized() ) call utl_abort('oti_timeBinning: timeCoord module not initialized')
 55
 56    allocate(idataass(ofl_numFamily,nStepObs+1))
 57    allocate(my_idataass(ofl_numFamily,nStepObs+1))
 58    my_idataass(:,:) = 0
 59    allocate(inumheader(ofl_numFamily,nStepObs+1))
 60    allocate(my_inumheader(ofl_numFamily,nStepObs+1))
 61    my_inumheader(:,:) = 0
 62
 63    do headerIndex = 1, obs_numheader(obsSpaceData)
 64      call tim_getStepObsIndex(stepObsIndex,tim_getDatestamp(), &
 65           obs_headElem_i(obsSpaceData,OBS_DAT,headerIndex), &
 66           obs_headElem_i(obsSpaceData,OBS_ETM,headerIndex),nstepobs)
 67      if (stepObsIndex > 0.0d0) then
 68        stepIndex = nint(stepObsIndex)
 69        bodyIndexBeg = obs_headElem_i(obsSpaceData,OBS_RLN,headerIndex)
 70        bodyIndexEnd = obs_headElem_i(obsSpaceData,OBS_NLV,headerIndex) + bodyIndexBeg - 1          
 71        do familyIndex = 1, ofl_numFamily
 72          if (obs_getfamily(obsSpaceData,headerIndex_opt=headerIndex) == ofl_familyList(familyIndex)) then
 73            my_inumheader(familyIndex,stepIndex) = my_inumheader(familyIndex,stepIndex)+1
 74            my_inumheader(familyIndex,nStepObs+1) = my_inumheader(familyIndex,nStepObs+1)+1
 75            do bodyIndex = bodyIndexBeg, bodyIndexEnd
 76              if ( obs_bodyElem_i(obsSpaceData,OBS_ASS,bodyIndex) == obs_assimilated) then
 77                my_idataass(familyIndex,stepIndex) = my_idataass(familyIndex,stepIndex) + 1
 78                my_idataass(familyIndex,nStepObs+1) = &
 79                     my_idataass(familyIndex,nStepObs+1) + 1
 80              end if
 81            end do
 82          end if
 83        end do
 84      else
 85        numWrites = numWrites + 1
 86        if (numWrites < maxNumWrites) then
 87          write(*,*) 'oti_timeBinning: observation outside time window:',headerIndex,stepObsIndex
 88        else if (numWrites == maxNumWrites) then
 89          write(*,*) 'oti_timeBinning: more observations outside time window, but reached'
 90          write(*,*) '                 maximum number of writes to the listing.'
 91        end if
 92      end if
 93    end do
 94
 95    formatspec ='(1x,a6,":"'
 96    do stepIndex = 1,nStepObs
 97      formatspec = trim(formatspec)//',1x,i9' ! this is for each time bin
 98    end do
 99    formatspec = trim(formatspec)//',1x,i9' ! this is for the total
100    formatspec = trim(formatspec)//')'
101
102    formatspec2 = '(1x,a6,":"'
103    do stepIndex = 1,nStepObs
104      formatspec2 = trim(formatspec2)//',1x,i9'
105    end do
106    formatspec2 = trim(formatspec2)//',1x,a9)'
107
108    write(*,*) '-----------------------------------------------------------------'
109    write(*,*) 'Distribution of number of headers over stepobs ON LOCAL PROCESSOR'
110    write(*,trim(formatspec2)) 'Bin#',(stepIndex, stepIndex = 1, nStepObs),'Total'
111    do familyIndex = 1, ofl_numFamily
112      write(*,trim(formatspec)) ofl_familyList(familyIndex),(my_inumheader(familyIndex,stepIndex), &
113            stepIndex = 1, nStepObs+1)
114    end do
115    write(*,trim(formatspec)) 'ALL',(sum(my_inumheader(:,stepIndex)), stepIndex = 1, nStepObs+1)
116    write(*,*) '----------------------------------------------------------------'
117    write(*,*) 'Distribution of assimilated data over stepobs ON LOCAL PROCESSOR'
118    write(*,trim(formatspec2)) 'Bin#', (stepIndex, stepIndex = 1, nStepObs),'Total'
119    do familyIndex = 1, ofl_numFamily
120      write(*,trim(formatspec)) ofl_familyList(familyIndex), (my_idataass(familyIndex,stepIndex), &
121            stepIndex = 1, nStepObs+1)
122    end do
123    write(*,trim(formatspec)) 'ALL',(sum(my_idataass(:,stepIndex)),stepIndex=1,nStepObs+1)
124    write(*,*) '----------------------------------------------------------------'
125
126    nsize = size(inumheader)
127    call rpn_comm_allreduce(my_inumheader, inumheader, nsize, &
128         "mpi_integer", "mpi_sum", "GRID", ierr)
129    deallocate(my_inumheader) 
130    nsize = size(idataass)
131    call rpn_comm_allreduce(my_idataass, idataass, nsize, &
132         "mpi_integer", "mpi_sum", "GRID", ierr)
133    deallocate(my_idataass) 
134    if (mmpi_myid == 0) then
135      write(*,*) '----------------------------------------------------------------'
136      write(*,*) 'Distribution of number of headers over stepobs ON ALL PROCESSORS'
137      write(*,trim(formatspec2)) 'Bin#', (stepIndex, stepIndex = 1, nStepObs), 'Total'
138      do familyIndex = 1, ofl_numFamily
139        write(*,trim(formatspec)) ofl_familyList(familyIndex), (inumheader(familyIndex,stepIndex), &
140              stepIndex = 1, nStepObs+1)
141      end do
142      write(*,trim(formatspec)) 'ALL', (sum(inumheader(:,stepIndex)), stepIndex = 1, nStepObs+1)
143      write(*,*) '---------------------------------------------------------------'
144      write(*,*) 'Distribution of assimilated data over stepobs ON ALL PROCESSORS'
145      write(*,trim(formatspec2)) 'Bin#', (stepIndex, stepIndex = 1, nStepObs), 'Total'
146      do familyIndex = 1, ofl_numFamily
147        write(*,trim(formatspec)) ofl_familyList(familyIndex), (idataass(familyIndex,stepIndex), &
148              stepIndex = 1, nStepObs+1)
149      end do
150      write(*,trim(formatspec)) 'ALL', (sum(idataass(:,stepIndex)), stepIndex = 1, nStepObs+1)
151      write(*,*) '---------------------------------------------------------------'
152    end if
153
154    deallocate(idataass)
155    deallocate(inumheader)
156
157  end subroutine oti_timeBinning
158
159
160  subroutine oti_setup(oti, obsSpaceData, numStep, headerIndexBeg, headerIndexEnd, &
161                       interpType_opt, flagObsOutside_opt)
162    !
163    implicit none
164
165    ! Arguments:
166    type(struct_oti), pointer , intent(out)   :: oti
167    type(struct_obs)          , intent(inout) :: obsSpaceData
168    integer                   , intent(in)    :: numStep
169    integer                   , intent(in)    :: headerIndexBeg
170    integer                   , intent(in)    :: headerIndexEnd
171    character(len=*), optional, intent(in)    :: interpType_opt
172    logical,          optional, intent(in)    :: flagObsOutside_opt
173
174    ! Locals:
175    integer             :: headerIndex
176    real(8)             :: stepObsIndex
177    integer, save       :: numWrites = 0
178    
179    if ( associated(oti) ) then
180      call utl_abort('oti_setup: the supplied oti pointer is not null!')
181    endif
182
183    allocate(oti)
184
185    if ( .not.tim_initialized() ) call utl_abort('oti_setup: timeCoord module not initialized')
186
187    if ( numStep > 1 .and. .not. present(interpType_opt) ) then
188      call utl_abort('oti_setup: interpType_opt must be specified when numStep > 1')
189    end if
190
191    if ( trim(interpType_opt) == 'LINEAR' .and. tim_fullyUseExtremeTimeBins) then
192      call utl_abort('oti_setup: LINEAR time interpolation is not compatible with ' // &
193                     'tim_fullyUseExtremeTimeBins==.true.')
194    end if
195
196    if (mmpi_myid == 0) write(*,*) ' '
197    if (mmpi_myid == 0) write(*,*) '-------- Entering oti_setup ---------'
198    if (mmpi_myid == 0) write(*,*) ' '
199
200    if (mmpi_myid == 0) write(*,*) 'oti_setup: Number of step obs for time interpolation : ', numStep
201
202    allocate(oti%timeInterpWeight(headerIndexBeg:headerIndexEnd,numStep))
203    oti%timeInterpWeight(:,:) = 0.0d0
204
205    do headerIndex = headerIndexBeg, headerIndexEnd
206
207      ! building floating point step index
208      call tim_getStepObsIndex(stepObsIndex,tim_getDatestamp(),  &
209                               obs_headElem_i(obsSpaceData,OBS_DAT,headerIndex),  &
210                               obs_headElem_i(obsSpaceData,OBS_ETM,headerIndex), numStep)
211      
212      ! leave all weights zero if obs time is out of range, otherwise set weights
213      if (.not.tim_fullyUseExtremeTimeBins .and. (ceiling(stepObsIndex) > numStep .or. floor(stepObsIndex) < 1)) then
214        numWrites = numWrites + 1
215        if (numWrites < maxNumWrites) then
216          write(*,'(a,i10,f8.2,i7,2a,i10,i6)') 'oti_setup: observation outside time window, headerIndex =', &
217                                               headerIndex, stepObsIndex, &
218                                               obs_headElem_i(obsSpaceData, OBS_ITY, headerIndex), ' ', &
219                                               obs_elem_c    (obsSpaceData, 'STID' , headerIndex), &
220                                               obs_headElem_i(obsSpaceData,OBS_DAT,headerIndex), &
221                                               obs_headElem_i(obsSpaceData,OBS_ETM,headerIndex)  
222        else if (numWrites == maxNumWrites) then
223          write(*,*) 'oti_setup: More obs outside time window, but reached maximum number of writes to the listing.'
224        end if
225      else if (tim_fullyUseExtremeTimeBins .and. (nint(stepObsIndex) > numStep .or. nint(stepObsIndex) < 1)) then
226        numWrites = numWrites + 1
227        if (numWrites < maxNumWrites) then
228          write(*,'(a,2i,2a,i10,i6)') 'oti_setup: observation outside time window, headerIndex =',headerIndex, &
229                                      obs_headElem_i(obsSpaceData, OBS_ITY, headerIndex), ' ', &
230                                      obs_elem_c    (obsSpaceData, 'STID' , headerIndex), &
231                                      obs_headElem_i(obsSpaceData,OBS_DAT,headerIndex), &
232                                      obs_headElem_i(obsSpaceData,OBS_ETM,headerIndex)  
233        else if (numWrites == maxNumWrites) then
234          write(*,*) 'oti_setup: More obs outside time window, but reached maximum number of writes to the listing.'
235        end if
236      else
237        if (numStep == 1) then
238          call oti_setTimeInterpWeight(oti, 1.0d0, headerIndex, 1)
239        else
240          if (trim(interpType_opt) == 'LINEAR') then
241            if (stepObsIndex >= real(numStep,8)) then
242              ! special case not handled by general approach
243              call oti_setTimeInterpWeight(oti, 1.0d0, headerIndex, numStep)
244            else
245              ! general approach for most observations
246              call oti_setTimeInterpWeight(oti, 1.0d0-(stepObsIndex-floor(stepObsIndex)), headerIndex, floor(stepObsIndex))
247              call oti_setTimeInterpWeight(oti, stepObsIndex-floor(stepObsIndex), headerIndex, floor(stepObsIndex)+1)
248            end if
249          else if ( trim(interpType_opt) == 'NEAREST' ) then
250            if (nint(stepObsIndex) > numStep) then
251              write(*,*) 'stepObsIndex = ', stepObsIndex
252              call utl_abort('oti_setup: stepObsIndex is too large!')
253            end if
254            call oti_setTimeInterpWeight(oti, 1.0d0, headerIndex, nint(stepObsIndex))
255          else
256            call utl_abort('oti_setup: unknown interpolation type : ' // trim(interpType_opt))
257          end if
258        end if
259      end if
260
261    end do
262
263    if ( present(flagObsOutside_opt) ) then
264      if (flagObsOutside_opt) call oti_flagObsOutsideWindow(oti, obsSpaceData, headerIndexBeg, headerIndexEnd)
265    end if
266
267    ! also setup MPI global version of weights, needed for s2c_nl
268    call oti_setupMpiGlobal(oti)
269
270    if (mmpi_myid == 0) write(*,*) ' '
271    if (mmpi_myid == 0) write(*,*) '-------- End of oti_setup ---------'
272    if (mmpi_myid == 0) write(*,*) ' '
273
274  end subroutine oti_setup
275
276
277  subroutine oti_deallocate(oti)
278    implicit none
279
280    ! Arguments:
281    type(struct_oti), pointer, intent(inout) :: oti
282
283    if (associated(oti%timeInterpWeight)) deallocate(oti%timeInterpWeight)
284    if (associated(oti%timeInterpWeightMpiGlobal)) deallocate(oti%timeInterpWeightMpiGlobal)
285    if (associated(oti)) deallocate(oti)
286    nullify(oti)
287
288  end subroutine oti_deallocate
289
290
291  subroutine oti_setupMpiGlobal( oti )
292    !
293    implicit none
294
295    ! Arguments:
296    type(struct_oti), pointer, intent(inout) :: oti
297
298    ! Locals:
299    integer              :: numHeader, numHeaderMax, numStep, nsize, ierr
300    real(8), allocatable :: timeInterpWeightMax(:,:)
301
302    if ( .not.associated(oti%timeInterpWeight) ) then
303      call utl_abort('oti_setupMpiGlobal: oti_setup must first be called')
304    end if
305
306    numHeader = size(oti%timeInterpWeight,1)
307    numStep = size(oti%timeInterpWeight,2)
308    write(*,*) 'oti_setupMpiGlobal: before allreduce ', numHeader, numStep
309    call rpn_comm_allreduce(numHeader, numHeaderMax, 1,  &
310                            'MPI_INTEGER', 'MPI_MAX', 'GRID', ierr)
311
312    write(*,*) 'oti_setupMpiGlobal: allocating array of dimension ', &
313               numHeaderMax, numStep, mmpi_nprocs 
314    allocate(oti%timeInterpWeightMpiGlobal(numHeaderMax,numStep,mmpi_nprocs))
315
316    ! copy over timeInterpWeight into a local array with same size for all mpi tasks
317    allocate(timeInterpWeightMax(numHeaderMax,numStep))
318    timeInterpWeightMax(:,:) = 0.0d0
319    timeInterpWeightMax(1:numHeader,1:numStep) = oti%timeInterpWeight(:,1:numStep)
320
321    nsize = numHeaderMax * numStep 
322    call rpn_comm_allgather(timeInterpWeightMax,           nsize, 'MPI_REAL8',  &
323                            oti%timeInterpWeightMpiGlobal, nsize, 'MPI_REAL8',  &
324                            'GRID', ierr)
325
326    deallocate(timeInterpWeightMax)
327
328    write(*,*) 'Memory Used: ',get_max_rss()/1024,'Mb'
329
330  end subroutine oti_setupMpiGlobal
331
332
333  subroutine oti_setTimeInterpWeight( oti, weight_in, headerIndex, stepObs )
334    !
335    implicit none
336
337    ! Arguments:
338    type(struct_oti), pointer, intent(inout) :: oti
339    integer,                   intent(in)    :: headerIndex
340    integer,                   intent(in)    :: stepObs
341    real(8),                   intent(in)    :: weight_in
342
343    oti%timeInterpWeight(headerIndex, stepObs) = weight_in
344
345  end subroutine oti_setTimeInterpWeight
346
347
348  function oti_getTimeInterpWeight( oti, headerIndex, stepObs ) result(weight_out)
349    !
350    implicit none
351
352    ! Arguments:
353    type(struct_oti), pointer, intent(inout) :: oti
354    integer,                   intent(in)    :: headerIndex
355    integer,                   intent(in)    :: stepObs
356    ! Result:
357    real(8)                   :: weight_out
358
359    weight_out = oti%timeInterpWeight(headerIndex, stepObs)
360
361  end function oti_getTimeInterpWeight
362
363
364  function oti_getTimeInterpWeightMpiGlobal( oti, headerIndex, stepObs, procIndex ) result(weight_out)
365    !
366    implicit none
367  
368    ! Arguments:
369    type(struct_oti), pointer, intent(inout) :: oti
370    integer,                   intent(in)    :: headerIndex
371    integer,                   intent(in)    :: stepObs
372    integer,                   intent(in)    :: procIndex
373    ! Result:
374    real(8)                   :: weight_out
375
376    weight_out = oti%timeInterpWeightMpiGlobal(headerIndex, stepObs, procIndex)
377
378  end function oti_getTimeInterpWeightMpiGlobal
379
380
381  function oti_timeInterpWeightAllZero( oti, headerIndex ) result(allZero)
382    !
383    implicit none
384
385    ! Arguments:
386    type(struct_oti), pointer, intent(inout) :: oti
387    integer,                   intent(in)    :: headerIndex
388    ! Result:
389    logical                   :: allZero
390
391    if ( .not.associated(oti%timeInterpWeight) ) then
392      call utl_abort('oti_timeInterpWeightAllZero: oti_setup must first be called')
393    end if
394
395    allZero = all(oti%timeInterpWeight(headerIndex, :) == 0.0d0)
396
397  end function oti_timeInterpWeightAllZero
398
399
400  subroutine oti_flagObsOutsideWindow( oti, obsSpaceData, headerIndexBeg, headerIndexEnd )
401    !
402    implicit none
403
404    ! Arguments:
405    type(struct_oti), pointer, intent(inout) :: oti
406    type(struct_obs)         , intent(inout) :: obsSpaceData
407    integer                  , intent(in)    :: headerIndexBeg
408    integer                  , intent(in)    :: headerIndexEnd
409
410    ! Locals:
411    integer :: headerIndex, bodyIndex, bodyIndexBeg, bodyIndexEnd
412    integer :: obsDAT, obsETM
413    integer, save :: numWrites = 0
414
415    if ( .not.associated(oti%timeInterpWeight) ) then
416      call utl_abort('oti_flagObsOutsideWindow: oti_setup must first be called')
417    end if
418
419    do headerIndex = headerIndexBeg, headerIndexEnd
420
421      if ( oti_timeInterpWeightAllZero(oti, headerIndex) ) then
422        ! obs is outside of assimilation window
423
424        numWrites = numWrites + 1
425        if (numWrites < maxNumWrites) then
426          obsDAT = obs_headElem_i(obsSpaceData,OBS_DAT,headerIndex)
427          obsETM = obs_headElem_i(obsSpaceData,OBS_ETM,headerIndex)
428          write(*,*) 'oti_flagObsOutsideWindow: Observation time outside assimilation window: ',  &
429               obsDAT, obsETM
430        else if (numWrites == maxNumWrites) then
431          write(*,*) 'oti_flagObsOutsideWindow: More rejects, but reached maximum number of writes to the listing.'
432        end if
433
434        ! flag these observations as out of time domain and turn off its assimilation flag
435        bodyIndexBeg = obs_headElem_i(obsSpaceData, OBS_RLN, headerIndex)
436        bodyIndexEnd = obs_headElem_i(obsSpaceData, OBS_NLV, headerIndex) + bodyIndexBeg -1
437        do bodyIndex = bodyIndexBeg, bodyIndexEnd
438          call obs_bodySet_i(obsSpaceData, OBS_ASS, bodyIndex, obs_notAssimilated)
439        end do
440        call obs_headSet_i(obsSpaceData, OBS_ST1, headerIndex,  &
441             ibset( obs_headElem_i(obsSpaceData, OBS_ST1, headerIndex), 05))
442
443        ! set the weight to 1 for time step 1 so that the column will be interpolated
444        call oti_setTimeInterpWeight(oti, 1.0d0, headerIndex, 1)
445      end if
446
447    end do
448
449  end subroutine oti_flagObsOutsideWindow
450
451end module obsTimeInterp_mod