NuMRI/kalman/other/gen_measurements_from_checkpoints.py
2020-11-23 13:41:11 +01:00

377 lines
14 KiB
Python
Executable File

from dolfin import *
import dolfin
import argparse
from common import inout
import numpy as np
import sys
# import ruamel.yaml as yaml
from pathlib import Path
import csv
import shutil
# parameters['allow_extrapolation'] = True
# careful with this option... can give nonsensical results outside
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
def solution_function_space(options):
mesh, _, _ = inout.read_mesh(options['mesh'])
if options['fem']['velocity_space'].lower() == 'p2':
Evel = VectorElement('P', mesh.ufl_cell(), 2)
elif options['fem']['velocity_space'].lower() == 'p1':
Evel = VectorElement('P', mesh.ufl_cell(), 1)
else:
raise Exception('Velocity space {} not yet implemented!'.format(
options['fem']['velocity_space'].lower()))
V = FunctionSpace(mesh, Evel)
return V
def measurement_function_spaces(options):
measurement_lst = options['estimation']['measurements']
if not isinstance(measurement_lst, list):
measurement_lst = [measurement_lst]
meshes = [meas['mesh'] for meas in measurement_lst]
if 'fe_degree' in options['estimation']['measurements'][0]:
degree = options['estimation']['measurements'][0]['fe_degree']
else:
degree = 1
for measurement in measurement_lst[1:]:
if not degree == measurement['fe_degree']:
raise Exception('fe_degree must the the same for all '
'measurements!')
if degree == 1:
element_family = 'P'
elif degree == 0:
element_family = 'DG'
else:
raise Exception('Unsupported measurement FE degree: {}'
.format(degree))
scalar = [False]*len(measurement_lst)
for i, measurement in enumerate(measurement_lst):
if 'velocity_direction' in measurement:
direction = measurement['velocity_direction']
if direction and None not in direction and sum(direction) > 0:
scalar[i] = True
V = []
V_aux = []
for scal, file in zip(scalar, meshes):
mesh, _, _ = inout.read_mesh(file)
if scal:
V.append(FunctionSpace(mesh, element_family, degree))
# 1. interpolate velocity vector onto measurement grid (V_aux)
# 2. perform component projection in the measurement space
# ---> need to store both scalar and vector spaces
V_aux.append(VectorFunctionSpace(mesh, element_family, degree))
else:
V.append(VectorFunctionSpace(mesh, element_family, degree))
return V, V_aux
def find_checkpoints(options):
chkpt_root = options['io']['write_path'] + '/checkpoint/{i}/u.h5'
indices = options['estimation']['measurements'][0]['indices']
if not indices:
# its seems not work now
#path_all = list(Path().glob(chkpt_root.format(i='*')))
#indices = sorted(int(str(s).split('/')[-2]) for s in path_all)
from glob import glob
chkpt_root2 = options['io']['write_path'] + '/checkpoint/*'
paths = glob(chkpt_root2)
pathsclean = paths
pathff = []
for l in range(len(paths)):
pathsclean[l] = paths[l].replace(options['io']['write_path'] + '/checkpoint/','')
pathff.append(int(pathsclean[l]))
indices = [int(x) for x in pathff]
indices.sort()
# dt_meas = options['timemarching']['checkpoint_dt']
# times = np.concatenate(([options['timemarching']['dt']],
# dt_meas*np.array(indices[1:])))
dt = options['timemarching']['dt']
times = dt*np.array(indices)
if rank == 0:
print('indices: \n')
print('\t', indices)
print('times: \n')
print('\t', times)
files = [chkpt_root.format(i=i) for i in indices]
# check if all files are found
# try:
# for f in files:
# # Path(f).resolve(strict=True)
# Path(f).is_file()
# except FileNotFoundError:
# raise
for f in files:
if not Path(f).is_file():
raise FileNotFoundError(f)
return indices, times, files
def generate(options, seed_lst, print_norms=False):
V = solution_function_space(options)
ndim = V.mesh().topology().dim()
u = Function(V)
V_meas_lst, V_aux_lst = measurement_function_spaces(options)
u_meas_lst = [Function(V_meas) for V_meas in V_meas_lst]
u_meas_cpy = [Function(V_meas) for V_meas in V_meas_lst]
#LI = LagrangeInterpolator()
if V_aux_lst:
u_aux_lst = [Function(V_aux) for V_aux in V_aux_lst]
comp_assigner = [FunctionAssigner([V]*ndim, V_aux) for V, V_aux in
zip(V_meas_lst, V_aux_lst)]
measurement_lst = options['estimation']['measurements']
noise_sd = [meas['noise_stddev'] for meas in measurement_lst]
# if 'project' in options['estimation']['measurements']:
# project_switch = bool(options['estimation']['measurements']
# ['project'])
# else:
# project_switch = False
project_switch = False
indices, times, files = find_checkpoints(options)
outfile_root_lst = [meas['file_root'] for meas in measurement_lst]
xdmf_paths = [meas['xdmf_file'] for meas in measurement_lst]
if seed_lst[0] > 0:
for i, (out, xdmf) in enumerate(zip(outfile_root_lst, xdmf_paths)):
if isinstance(out, str) and 'seed{s}' not in out:
out_ = out.split('/')
outfile_root_lst[i] = '/'.join(out_[:-1] + ['seed{s}',
out_[-1]])
if isinstance(xdmf, str) and 'seed{s}' not in xdmf:
xdmf_ = xdmf.split('/')
xdmf_paths[i] = '/'.join(xdmf_[:-1] + ['seed{s}', xdmf_[-1]])
if any(xdmf_paths):
xdmf_lst = []
for pth in xdmf_paths:
seed_dict = {}
for seed in seed_lst:
file = XDMFFile(pth.format(s=seed))
file.parameters['rewrite_function_mesh'] = False
seed_dict[seed] = file
xdmf_lst.append(seed_dict)
else:
xdmf_lst = [None]*len(u_meas_lst)
for i, t, infile in zip(indices, times, files):
print('Processing {} at t = {}'.format(infile, t))
t_ = inout.read_HDF5_data(u.function_space().mesh().mpi_comm(), infile,
u, '/u')
assert np.allclose(t, t_), ('Timestamps do not match! {} vs {} (HDF5 '
'file)'.format(t, t_))
# interpolate u to measurement meshes
for k, (u_meas, outfile_root, xdmf, sd) in enumerate(zip(
u_meas_lst, outfile_root_lst, xdmf_lst, noise_sd)):
if project_switch:
u_meas.assign(project(u, u_meas.function_space()))
else:
if V_aux_lst:
print('scalar')
direction = (options['estimation']['measurements'][k]
['velocity_direction'])
if direction.count(0) == 2 and direction.count(1) == 1:
LagrangeInterpolator.interpolate(u_meas, u.sub(direction.index(1)))
else:
assert u_meas.value_size() == 1
# normalize projection direction
direction = np.array(direction, dtype=np.float64)
direction /= np.sqrt(np.dot(direction, direction))
print('d = {}'.format(direction))
LagrangeInterpolator().interpolate(u_aux_lst[k], u)
# This is faster than simply Xobs_aux.split(True) !
u_i = [u_meas] + [u_meas.copy(True) for j in
range(ndim - 1)]
comp_assigner[k].assign(u_i, u_aux_lst[k])
u_meas.vector()[:] *= direction[0]
for ui, d in zip(u_i[1:], direction[1:]):
if d:
u_meas.vector().axpy(d, ui.vector())
else:
LagrangeInterpolator.interpolate(u_meas, u)
if sd:
u_meas_cpy[k].assign(u_meas)
# add noise
for seed in seed_lst:
if sd:
# this makes no sense, sorry...
# if seed > 0:
# np.random.seed(seed + i)
noise = np.random.normal(0., sd,
u_meas.vector().local_size())
u_meas.assign(u_meas_cpy[k])
u_meas.vector()[:] += noise
if rank == 0:
if print_norms:
print('Writing file at t = {}\t |u_m| = {}'.format(
t, norm(u_meas)))
else:
print('Writing file at t = {}\t'.format(t))
outfile = outfile_root.format(i=i, s=seed)
inout.write_HDF5_data(
u_meas.function_space().mesh().mpi_comm(), outfile, u_meas,
'/u', t)
if xdmf:
xdmf[seed].write(u_meas, t)
# write indices and timesteps
write_timestamps(options, indices, times, seed_lst)
def write_timestamps(options, indices, times, seed_lst):
file_root_lst = [meas['file_root'] for meas in options['estimation']
['measurements']]
# if seed_lst[0]:
# for i, out in enumerate(file_root_lst):
# if isinstance(out, str) and 'seed{s}' not in out:
# out_ = out.split('/')
# file_root_lst[i] = '/'.join(out_[:-1] + ['seed{s}', out_[-1]])
for file_root in file_root_lst:
for seed in seed_lst:
path = Path(file_root.format(s=seed, i=-1)).parent
if seed > 0 and 'seed{s}'.format(s=seed) not in str(path):
path = path.joinpath('seed{s}'.format(s=seed))
path = path.joinpath('timestamps.csv')
if rank == 0:
print('Writing timestamps to file: {}'.format(path))
with path.open('w') as file:
writer = csv.writer(file, delimiter=' ')
for i, t in zip(indices, times):
writer.writerow((i, t))
def copy_inputfile(options, inputfile, seed_lst):
if rank == 0:
file_root_lst = [meas['file_root'] for meas in options['estimation']
['measurements']]
for file_root in file_root_lst:
for seed in seed_lst:
path = Path(file_root.format(s=seed, i=-1)).parent
if seed > 0 and 'seed{s}'.format(s=seed) not in str(path):
path = path.joinpath('seed{s}'.format(s=seed))
path = path.joinpath('input.yaml')
shutil.copy2(str(inputfile), str(path))
print('Copied input file to {}'.format(path))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='''\
Generate measurements from HDF5 checkpoints, generated by the Fractional-Step
or the monolithic Navier-Stokes solvers.
Reads options from the given input file.
''', formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('inputfile', type=str, help='path to yaml input file')
# parser.add_argument('-s', '--seed', type=int, default=-1,
# help='seed for random generator')
parser.add_argument('-s', '--seed', nargs='+', help='seed or list of seeds'
' for repitions of RNG. not really a seed, noise is '
' different for all timesteps', default=[-1])
parser.add_argument('-n', '--print_norms', action='store_true',
help='print norms')
parser.add_argument('-d', '--dump', action='store_true',
help='dump minimal default parameters to inputfile')
args = parser.parse_args()
if args.dump:
default = '''\
# default inputfile for gen_measurements_from_checkpoints.py
mesh: ./meshes/mesh.h5
io:
write_path: ./results/test/ # path to checkpoints
timemarching:
T: 0.4
dt: 0.001
fem:
velocity_space: p1 # p1, p2
pressure_space: p1
estimation:
measurements:
- mesh: ./measurements/mesh_meas.h5 # measurement mesh
fe_degree: 0 # 0 or 1
file_root: ./measurements/u{{i}}.h5 # velocity measurements \
to be written by the program, {{i}} will be replaced by the corresponding \
index of the checkpoint
xdmf_file: ./measurements/meas.xdmf # path for optional XDMF output
noise_stddev: 10. # absolute standard deviation of Gaussian \
noise
indices: 0 # indices of checkpoints to be processed. 0 == all
- # second measurement ...
- ...
'''
with open(args.inputfile, 'w') as f:
f.write(default)
print('dumped default options to file: {}'.format(args.inputfile))
sys.exit(0)
seed = args.seed
if seed[0] <= 0:
assert len(seed) == 1, 'if multiple seeds are given, all should be > 0'
try:
options = inout.read_parameters(args.inputfile)
except IOError:
raise IOError('File could not be read: {}'.format(args.inputfile))
generate(options, seed, args.print_norms)
copy_inputfile(options, args.inputfile, seed)