from dolfin import * import dolfin import argparse from common import inout import numpy as np import sys # import ruamel.yaml as yaml from pathlib import Path import csv import shutil # parameters['allow_extrapolation'] = True # careful with this option... can give nonsensical results outside if '2017' in dolfin.__version__: class MPI(MPI): comm_world = MPI.comm_world def solution_function_space(options): ''' Create function space to read in solution checkpoints, as specified in options. Args: options (dict): Options dictionary (from input file) Returns: FunctionSpace: Solution checkpoint function space ''' mesh, _, _ = inout.read_mesh(options['mesh']) if 'fluid' in options: if options['fem']['velocity_space'].lower() == 'p2': Evel = VectorElement('P', mesh.ufl_cell(), 2) elif options['fem']['velocity_space'].lower() == 'p1': Evel = VectorElement('P', mesh.ufl_cell(), 1) else: raise Exception('Velocity space {} not yet implemented!'.format( options['fem']['velocity_space'].lower())) elif 'material' in options: if options['incompressibility']['mixed_formulation']: raise Exception('Only compressible hyperelasticity data is ' 'supported here, for simplicity') deg = options['solver']['fe_degree'] Evel = VectorElement('P', mesh.ufl_cell(), deg) V = FunctionSpace(mesh, Evel) return V def measurement_function_spaces(options): ''' Create function space for measurements, as specified in options. Args: options (dict): Options dictionary (from input file) Returns: list((Vector)FunctionSpace): list of function spaces on measurement mesh, scalar if measurements are scalar, else VectorFunctionSpace list(VectorFunctionSpace): if scalar measurements, a list of corresponding VectorFunctionSpace for interpolation, else empty list ''' measurement_lst = options['estimation']['measurements'] if not isinstance(measurement_lst, list): measurement_lst = [measurement_lst] meshes = [meas['mesh'] for meas in measurement_lst] if 'fe_degree' in options['estimation']['measurements'][0]: degree = options['estimation']['measurements'][0]['fe_degree'] else: degree = 1 for measurement in measurement_lst[1:]: if not degree == measurement['fe_degree']: raise Exception('fe_degree must the the same for all ' 'measurements!') if degree in (1, 2): element_family = 'P' elif degree == 0: element_family = 'DG' else: raise Exception('Unsupported measurement FE degree: {}' .format(degree)) scalar = [False]*len(measurement_lst) for i, measurement in enumerate(measurement_lst): if 'velocity_direction' in measurement: direction = measurement['velocity_direction'] if direction and None not in direction and sum(direction) > 0: scalar[i] = True V = [] V_aux = [] for scal, file in zip(scalar, meshes): mesh, _, _ = inout.read_mesh(file) if scal: V.append(FunctionSpace(mesh, element_family, degree)) # 1. interpolate velocity vector onto measurement grid (V_aux) # 2. perform component projection in the measurement space # ---> need to store both scalar and vector spaces V_aux.append(VectorFunctionSpace(mesh, element_family, degree)) else: V.append(VectorFunctionSpace(mesh, element_family, degree)) return V, V_aux def find_checkpoints(options): ''' Find all checkpoints of a simulation from configuration in options dictionary. Args: options (dict): Options dictionary (from YAML input file) Returns: list: List of found checkpoint indices list: List of found checkpoint times list: List of found checkpoint files ''' if MPI.rank(MPI.comm_world) > 0: return chkpt_root = options['io']['write_path'] + '/checkpoint/{i}/u.h5' indices = options['estimation']['measurements'][0]['indices'] if not indices: path_all = list(Path().glob(chkpt_root.format(i='*'))) indices = sorted(int(str(s).split('/')[-2]) for s in path_all) # dt_meas = options['timemarching']['checkpoint_dt'] # times = np.concatenate(([options['timemarching']['dt']], # dt_meas*np.array(indices[1:]))) dt = options['timemarching']['dt'] times = dt*np.array(indices) print('indices: \n') print('\t', indices) print('times: \n') print('\t', times) files = [chkpt_root.format(i=i) for i in indices] # check if all files are found for f in files: if not Path(f).is_file(): raise FileNotFoundError(f) return indices, times, files def generate(options, seed_lst, print_norms=False): ''' Generate measurements. Args: options (dict): Options dictionary (from input file) seed_lst (list): list of random seeds print_norms (bool): switch for printing norms of measurements ''' V = solution_function_space(options) ndim = V.mesh().topology().dim() u = Function(V) V_meas_lst, V_aux_lst = measurement_function_spaces(options) u_meas_lst = [Function(V_meas, name='measurement') for V_meas in V_meas_lst] u_meas_cpy = [Function(V_meas, name='measurement') for V_meas in V_meas_lst] LI = LagrangeInterpolator if V_aux_lst: u_aux_lst = [Function(V_aux) for V_aux in V_aux_lst] comp_assigner = [FunctionAssigner([V]*ndim, V_aux) for V, V_aux in zip(V_meas_lst, V_aux_lst)] measurement_lst = options['estimation']['measurements'] noise_sd = [meas['noise_stddev'] for meas in measurement_lst] # if 'project' in options['estimation']['measurements']: # project_switch = bool(options['estimation']['measurements'] # ['project']) # else: # project_switch = False project_switch = False indices, times, files = find_checkpoints(options) outfile_root_lst = [meas['file_root'] for meas in measurement_lst] xdmf_paths = [meas['xdmf_file'] for meas in measurement_lst] # don't automatically append seed{s} do path anymore! # suppose seed{s} is included # if seed_lst[0] > 0: # for i, (out, xdmf) in enumerate(zip(outfile_root_lst, xdmf_paths)): # # if isinstance(out, str) and 'seed{s}' not in out: # # out_ = out.split('/') # # outfile_root_lst[i] = '/'.join(out_[:-1] + ['seed{s}', # # out_[-1]]) # if isinstance(xdmf, str) and 'seed{s}' not in xdmf: # xdmf_ = xdmf.split('/') # xdmf_paths[i] = '/'.join(xdmf_[:-1] + ['seed{s}', xdmf_[-1]]) # check if a list of seeds is given that {s} is included in path if len(seed_lst) > 1: for out in outfile_root_lst: assert '{s}' in out, ('For a list of seeds, the string \'{s}\' ' 'must be included in the file_root and will ' 'be replaced by the seed number') if any(xdmf_paths): xdmf_lst = [] for pth in xdmf_paths: if not isinstance(pth, str): raise TypeError('xdmf_file setting must be None or a string ' 'indicating the target XDMF file. Got type {}'. format(type(pth))) seed_dict = {} for seed in seed_lst: file = XDMFFile(pth.format(s=seed)) file.parameters['rewrite_function_mesh'] = False seed_dict[seed] = file xdmf_lst.append(seed_dict) else: xdmf_lst = [None]*len(u_meas_lst) for count, (index, time, infile) in enumerate(zip(indices, times, files)): if MPI.rank(MPI.comm_world) == 0: print('Processing {} at t = {}'.format(infile, time)) t_ = inout.read_HDF5_data(u.function_space().mesh().mpi_comm(), infile, u, '/u') assert np.allclose(time, t_), ('Timestamps do not match! {} vs {} ' '(HDF5 file)'.format(time, t_)) # interpolate u to measurement meshes for k, (u_meas, outfile_root, xdmf, sd) in enumerate(zip( u_meas_lst, outfile_root_lst, xdmf_lst, noise_sd)): if project_switch: u_meas.assign(project(u, u_meas.function_space())) else: if V_aux_lst: if MPI.rank(MPI.comm_world) == 0: print('- Scalar velocity component') direction = (options['estimation']['measurements'][k] ['velocity_direction']) if direction.count(0) == 2 and direction.count(1) == 1: LI.interpolate(u_meas, u.sub(direction.index(1))) else: assert u_meas.value_shape() == [] # normalize projection direction direction = np.array(direction, dtype=np.float64) direction /= np.sqrt(np.dot(direction, direction)) if MPI.rank(MPI.comm_world) == 0: print('- direction: {}'.format(direction)) LagrangeInterpolator.interpolate(u_aux_lst[k], u) # This is faster than simply Xobs_aux.split(True) ! u_i = [u_meas] + [u_meas.copy(True) for j in range(ndim - 1)] comp_assigner[k].assign(u_i, u_aux_lst[k]) u_meas.vector()[:] *= direction[0] for ui, d in zip(u_i[1:], direction[1:]): if d: u_meas.vector().axpy(d, ui.vector()) else: if MPI.rank(MPI.comm_world) == 0: print('- Full velocity vector') LI.interpolate(u_meas, u) if sd: u_meas_cpy[k].assign(u_meas) # add noise for seed in seed_lst: if sd: if seed > 0: np.random.seed(seed + count) if MPI.rank(MPI.comm_world) == 0: print('- Add noise with stddev = {}, seed = {}' .format(sd, seed + count)) noise = np.random.normal(0., sd, u_meas.vector().local_size()) u_meas.assign(u_meas_cpy[k]) u_meas.vector()[:] += noise if MPI.rank(MPI.comm_world) == 0: if print_norms: print('Writing file at t = {}\t |u_m| = {}'.format( time, norm(u_meas))) else: print('Writing file at t = {}\t'.format(time)) outfile = outfile_root.format(i=index, s=seed) inout.write_HDF5_data( u_meas.function_space().mesh().mpi_comm(), outfile, u_meas, '/u', time) if xdmf: xdmf[seed].write(u_meas, time) # THIS IS OBSOLETE NOW # write indices and timesteps # write_timestamps(options, indices, times, seed_lst) def write_timestamps(options, indices, times, seed_lst): ''' Write time stamps of measurements to csv file. Args: options (dict): Options dictionary (from YAML input file) indices (list): List of checkpoint indices times (list): List of checpoint times seed_lst (list): List of random seeds ''' if MPI.rank(MPI.comm_world) > 0: return warning('timestamps.csv is OBSOLETE!') file_root_lst = [meas['file_root'] for meas in options['estimation'] ['measurements']] # if seed_lst[0]: # for i, out in enumerate(file_root_lst): # if isinstance(out, str) and 'seed{s}' not in out: # out_ = out.split('/') # file_root_lst[i] = '/'.join(out_[:-1] # + ['seed{s}', out_[-1]]) for file_root in file_root_lst: for seed in seed_lst: path = Path(file_root.format(s=seed, i=-1)).parent if seed > 0 and 'seed{s}'.format(s=seed) not in str(path): path = path.joinpath('seed{s}'.format(s=seed)) path = path.joinpath('timestamps.csv') print('Writing timestamps to file: {}'.format(path)) with path.open('w') as file: writer = csv.writer(file, delimiter=' ') for i, t in zip(indices, times): writer.writerow((i, t)) def copy_inputfile(options, inputfile, seed_lst): ''' Copy input file of reference solution to measurements directory. Args: options (dict): Options dictionary (from YAML input file) inputfile (str): Path of input file to be copied seed_lst (list): List of random seeds ''' if MPI.rank(MPI.comm_world) > 0: return file_root_lst = [meas['file_root'] for meas in options['estimation'] ['measurements']] for file_root in file_root_lst: for seed in seed_lst: path = Path(file_root.format(s=seed, i=-1)).parent # if seed > 0 and 'seed{s}'.format(s=seed) not in str(path): # path = path.joinpath('seed{s}'.format(s=seed)) path = path.joinpath('input.yaml') # path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(str(inputfile), str(path)) print('Copied input file to {}'.format(path)) def dump_example_options(path): ''' Dump example options to given path and exit. Args: path (str): Path to input file ''' example = '''\ # example fractional step inputfile for gen_measurements_from_checkpoints.py mesh: './meshes/mesh.h5' io: # Path containing checkpoints in a checkpoints/ folder write_path: './results/test/' timemarching: # final time of simulation to be processed T: 0.4 # simulation time step size (not dt_checkpoint) dt: 0.001 fem: # Velocity and presse (for monolithic) function spaces of checkpoints velocity_space: 'p1' pressure_space: 'p1' estimation: measurements: # List of measurements - # Mesh of 1. measurement set mesh: './measurements/mesh_meas.h5' # degree of finite element function space # (1 for linear, 0 for discontinuous piece-wise constant) fe_degree: 0 # velocity measurement files to be written by this tool # {i} will be replaced by the corresponding index of the checkpoint # if given, {s} may be replaced by the seed number, for instance # path/seed{s}/u{i}.h5 file_root: './measurements/u{i}.h5' # XDMF file where measurements are optionally stored for # visualization (one file for all time steps) xdmf_file: './measurements/meas.xdmf' # Select a velocity component: # * None (~) means "use complete vector" # * else, use a scalar velocity by projecting onto the given # direction vector, [x, y, (z)] velocity_direction: ~ # (absolute) standard deviation of Gaussian noise noise_stddev: 10. # indices of checkpoints to be processed: # * 0 == all # * a list of integes indices: 0 - # second measurement ... - # further measurements ... ''' with open(path, 'w') as f: f.write(example) print('Example options:\n\n') print(example) print('\ndumped example options to file: {}'.format(path)) sys.exit(0) def get_parser(): parser = argparse.ArgumentParser(description='''\ Generate measurements from HDF5 checkpoints, generated by the Fractional-Step or the monolithic Navier-Stokes solvers. Reads options from the given input file. See the example options for further explanations:: (`gen_measurements_from_checkpoints.py -d example.yaml`). ''', formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('inputfile', type=str, help='path to yaml input file') # parser.add_argument('-s', '--seed', type=int, default=-1, # help='seed for random generator') parser.add_argument('-s', '--seed', nargs='+', help='seed or list of seeds' ' for noise random number generator. ' 'Will take the given seed for the first time step ' 'and increment by +1 for all subsequent time steps, ' 'so that noise is different for all times ' 'but reproducible', default=[-1], type=int) parser.add_argument('-n', '--print_norms', action='store_true', help='print norms') parser.add_argument('-d', '--dump', action='store_true', help='dump minimal example parameters to inputfile') return parser if __name__ == '__main__': args = get_parser().parse_args() if args.dump: dump_example_options(args.inputfile) seed = args.seed if seed[0] <= 0: assert len(seed) == 1, 'if multiple seeds are given, all should be > 0' try: options = inout.read_parameters(args.inputfile) except IOError: raise IOError('File could not be read: {}'.format(args.inputfile)) generate(options, seed, args.print_norms) copy_inputfile(options, args.inputfile, seed)