from pathlib import Path
import pandas as pd
import shlex
import subprocess
from zipfile import ZipFile
import json

# TODO: Make a setting for these paths
SYNTHEA_ROOT = Path('/opt/development/synthea_webservice/synthea')
GEOGRAPHY_DIR = SYNTHEA_ROOT / 'src' / 'main' / 'resources' / 'geography'
MODULES_DIR = SYNTHEA_ROOT / 'src' / 'main' / 'resources' / 'modules'

# Line fragment in the Synthea output that marks a successful run.
SUCCESS_MARKER = 'BUILD SUCCESSFUL'


def available_states(location=None):
    """Return the sorted list of states found in ``timezones.csv``.

    Args:
        location: Directory containing ``timezones.csv``. Defaults to
            ``GEOGRAPHY_DIR``.

    Returns:
        The values of the first CSV column, sorted ascending.
    """
    geography_dir = GEOGRAPHY_DIR if location is None else Path(location)
    df = pd.read_csv(geography_dir / 'timezones.csv', index_col=False)
    # The state information is expected in the first column
    return sorted(df[df.columns[0]].to_list())


def available_modules(location=None):
    """Return the modules found directly inside the modules directory.

    Every ``.json`` file directly inside the directory is treated as one
    module. Sub-folders are not descended into and are not listed.

    Args:
        location: Directory holding the module ``.json`` files. Defaults to
            ``MODULES_DIR``.

    Returns:
        A list of ``{'module': <file name without .json>, 'name': <"name"
        value from the JSON>}`` dicts, sorted case-insensitively by name.

    Raises:
        KeyError: If a module file has no top-level ``"name"`` key.
        json.JSONDecodeError: If a module file is not valid JSON.
    """
    modules_dir = MODULES_DIR if location is None else Path(location)
    modules = []
    for module in modules_dir.iterdir():
        if module.is_file() and module.suffix == '.json':
            data = json.loads(module.read_text(encoding='utf-8'))
            # ``stem`` drops only the trailing ".json"; a blanket
            # str.replace would also eat ".json" inside the name.
            modules.append({'module': module.stem, 'name': data['name']})
    return sorted(modules, key=lambda entry: entry['name'].lower())


def run_synthea(state=None, population=None, gender=None, age=None,
                module=None, *, synthea_dir=None):
    """Run Synthea and zip the generated FHIR STU3 output.

    Falsy arguments are omitted from both the command line and the archive
    name.

    Args:
        state: State name, passed as the positional argument.
        population: Population size, passed with ``-p``.
        gender: Gender, upper-cased and passed with ``-g``.
        age: Age (or age range), passed with ``-a``.
        module: Module name, passed with ``-m``.
        synthea_dir: Synthea checkout containing the ``run_synthea`` script.
            Defaults to ``SYNTHEA_ROOT``.

    Returns:
        Path of the zip archive, created inside the Synthea directory. It
        contains every entry currently present in ``output/fhir_stu3``.

    Raises:
        RuntimeError: If the output never contains ``BUILD SUCCESSFUL``. The
            message is the complete captured output (stdout and stderr).
    """
    root = SYNTHEA_ROOT if synthea_dir is None else Path(synthea_dir).resolve()
    synthea_cmd = [str(root / 'run_synthea')]
    zip_file = 'Synthea_'

    if population:
        synthea_cmd += ['-p', str(population)]
        zip_file += f'population_{population}_'
    if gender:
        synthea_cmd += ['-g', gender.upper()]
        zip_file += f'gender_{gender}_'
    if age:
        # Popen only accepts str/bytes/path arguments, so convert numbers.
        synthea_cmd += ['-a', str(age)]
        zip_file += f'age_{age}_'
    if module:
        synthea_cmd += ['-m', str(module)]
        zip_file += f'module_{module}_'
    if state:
        synthea_cmd.append(str(state))
        zip_file += f'state_{state}'

    process_ok = False
    log_lines = []
    # stderr is merged into stdout: an unread stderr pipe can fill up and
    # block the child forever, and its content belongs in the failure log.
    with subprocess.Popen(synthea_cmd, cwd=str(root),
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT) as process:
        for raw_line in process.stdout:
            line = raw_line.decode('utf8', errors='replace')
            log_lines.append(line)
            if not process_ok and SUCCESS_MARKER in line:
                process_ok = True

    if not process_ok:
        raise RuntimeError(''.join(log_lines))

    zip_path = root / f'{zip_file}.zip'
    with ZipFile(zip_path, 'w') as export:
        for file in (root / 'output' / 'fhir_stu3').iterdir():
            export.write(file, file.name)
    return zip_path