From 0c33f6f62f60300eed3dc11d5d1861ba08e82de5 Mon Sep 17 00:00:00 2001 From: "J.G. Rubingh" Date: Mon, 11 Jan 2021 16:12:46 +0100 Subject: [PATCH] Update repository with new code --- importer.py | 899 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 + settings.cfg | 14 + 3 files changed, 916 insertions(+) create mode 100644 importer.py create mode 100644 requirements.txt create mode 100644 settings.cfg diff --git a/importer.py b/importer.py new file mode 100644 index 0000000..c9513be --- /dev/null +++ b/importer.py @@ -0,0 +1,899 @@ +import multiprocessing +from pathlib import Path +from datetime import datetime, timedelta +from time import time +import json +import pandas as pd +import re +import gzip +import configparser +import copy +import math + +from sqlalchemy import create_engine +from sqlalchemy.types import String, Text, DateTime, Date, BigInteger +from sqlalchemy.exc import IntegrityError, ProgrammingError + +class dHealthImport(): + + CSV_SEPERATOR = ';' + CSV_CHUNK_SIZE = 10 ** 5 + + JSON_BATCH_SIZE = 10000 + EXPORT_BATCH_SIZE = 1000000 + + SQL_INSERT_BATCH = 10 + + # Put the fields in a set, so we will not end up with duplicate field names + BIRTHDAY_FIELDS = list(set(['Geboortedatum','DateOfBirth','dateOfBirth','ZCL_DATUM_GEBOORTE'])) + + DATE_FIELDS = list(set(BIRTHDAY_FIELDS + ['DatumAangemeld','DatumAfgemeld','DateAnonymized','DateRegistered','DateUnregistered','_anonymized_timestamp', + 'LastAppUseDate','Timestamp','DateTime','LoginTime','Date','StartDate','LoginDate','StartDatum','DatumAangemaakt','DatumTot','DatumVan', + 'Datum','DatumAangemaakt','DatumGeplaatst','ActiefTot','ActiefVan','DatumToegevoegd','MutationDate','Date','EndTime','StartTime','LastLoginDate','EindDatum', + 'CONTACT_DTTM','CONTACT_DT','PROCESSED_DTTM','KENNISDATUM','TSRECORD','DATUM_TOESTAND_VANAF','DATUM_TOESTAND_TM','CONTACT_DATUM_TIJD'])) + + POSTAL_CODE_FIELDS = list(set(['postalCode','ZipCode','ZCL_POSTCODE'])) + DROP_FIELDS = list(set(['EmailAddress','rug_id'])) + # Index fields will only set on those fields that are available when optimizing. So the rug_id field will not be added the first optimize run + INDEX_FIELDS = list(set(['AccountId','LegacyAccountId','DacadooId','ORIGINEEL_RELATIE_ID','userId','rug_id','ZCL_REL_NR'])) + + # Panda sql export needs to know what the name of the date fields are + PANDA_TYPE_FIELDS = {} + for date_field in DATE_FIELDS: + PANDA_TYPE_FIELDS[date_field] = DateTime() + + PANDA_TYPE_FIELDS['AccountId'] = BigInteger() + # Omdat Menzis geen getallen kunnen exporten, zijn de 'getallen' in het veld VERZEKERDENUMMER TEKST. Hoe dom! + # Daardoor kunnen we ze niet als INT importeren, want dan krijg je een error bij het getal '6,019e+09' + # Hierdoor moeten we de data CASTEN tijdens query, wat weer tijd kost. + # PANDA_TYPE_FIELDS['VERZEKERDENUMMER'] = BigInteger() + PANDA_TYPE_FIELDS['DacadooId'] = String(50) + + def __init__(self, mysql_user, mysql_pass, mysql_host, mysql_db, reinit_db = True): + self.db_name = mysql_db + self.export_location = None + + self.__drop_fields = copy.copy(dHealthImport.DROP_FIELDS) + self.__source_files = [] + self.__source_folders = { + 'dacadoo' : None, + 'menzis' : None, + 'netivity' : None, + 'vektis' : None, + } + + self.__error_log = Path('errors.log') + if self.__error_log.exists(): + self.__error_log.unlink() + + # NOTICE: Multiprocessing does not work on Windows :(. 
So it is not used + self.__number_of_cpu = multiprocessing.cpu_count() + # TODO: Make the import and cleaning run in a newly created temporary database. Afterward, drop the old existing, and rename the temporary to the final datbase name. + self.__temp_db_name = f'{self.db_name}_temp' + + self.db = create_engine(f'mysql://{mysql_user}:{mysql_pass}@{mysql_host}/{self.__temp_db_name}?charset=utf8mb4') + + def __logmessage(self,message): + print(f'[{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}] {message}',flush=True) + + def __byte_size(self,size): + sizes = ['b','Kb','Mb','Gb','Tb'] + counter = 0 + while size > 1024: + size /= 1024 + counter += 1 + + return f'{size:.3f} {sizes[counter]}' + + def __init_mysql_connection(self,conn): + mysql_import_settings = [ + 'set global max_allowed_packet=1048576000', + 'set global connect_timeout=36000', + 'set global wait_timeout=36000', + 'set global interactive_timeout=36000', + 'set global mysqlx_connect_timeout=36000', + 'set global mysqlx_read_timeout=36000', + 'set global net_read_timeout=36000', + 'set global innodb_buffer_pool_size=568435456', + 'set global join_buffer_size=64*1024*1024', + 'flush tables' + ] + for mysql_setting in mysql_import_settings: + try: + conn.execute(mysql_setting) + except Exception as ex: + pass + #print(ex) + + def __record_count(self,table): + with self.db.connect() as conn, conn.begin(): + sql = f'SELECT COUNT(*) as AmountRecords FROM {table}' + #self.__logmessage(sql) + result = conn.execute(sql) + for row in result: + return row[0] + + return None + + def __get_all_tables(self): + data = [] + with self.db.connect() as conn, conn.begin(): + sql = 'SHOW TABLES' + #self.__logmessage(sql) + result = conn.execute(sql) + for row in result: + data.append(row[0]) + + return data + + def __optimize_mysql_tables(self): + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + + # Adding indexes to the tables for faster processing + for index_field in dHealthImport.INDEX_FIELDS: + sql = f'SELECT a.TABLE_NAME, (SELECT COUNT(*) FROM information_schema.statistics AS b WHERE b.TABLE_NAME = a.TABLE_NAME AND b.index_name = a.COLUMN_NAME AND b.TABLE_SCHEMA = a.TABLE_SCHEMA) AS Amount FROM information_schema.COLUMNS AS a WHERE a.COLUMN_NAME = \'{index_field}\' and a.TABLE_SCHEMA = \'{self.__temp_db_name}\'' + + result = conn.execute(sql) + for row in result: + # Index is alreay added. So skip + if row[1] > 0: + continue + + table_name = row[0] + try: + sql = f'ALTER TABLE {table_name} ADD INDEX ({index_field})' + self.__logmessage(sql) + conn.execute(sql) + except Exception as ex: + pass + + # Optimize tables so all data is indexed and ready + result = conn.execute('show tables') + for row in result: + table_name = row[0] + sql = f'OPTIMIZE TABLE {table_name}' + self.__logmessage(sql) + conn.execute(sql) + + + def __load_files(self): + self.__source_files = [] + for _, source in self.__source_folders.items(): + if source is not None: + self.__source_files += sorted([child for child in source.iterdir()]) + + def __file_belongs_to_source(self,file): + for name, source in self.__source_folders.items(): + if file.parent == source: + return name + + def __process_csv_file(self,file,table): + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + conn.execute(f'drop table if exists {table}') + + panda_data = pd.read_csv(file, sep=dHealthImport.CSV_SEPERATOR, error_bad_lines=False, warn_bad_lines=True) + # Some fields are not detected properly. 
So the dict dtype holds field names which are not properly detected and specify the needed types + try: + panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=dHealthImport.SQL_INSERT_BATCH, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS) + except Exception as ex: + # Something went wrong. The data is not committed to db, so we do it now one by one, to filter out the error records. + try: + panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=1, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS) + except Exception as ex: + with self.__error_log.open('a') as logfile: + logfile.write(f'{table}, {file}: {ex}\n') + + def __process_gz_file(self,file,table = None): + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + if table is not None: + conn.execute('drop table if exists {}'.format(table)) + + with gzip.open(file, 'rt', encoding='utf-8') as f: + sql_statement = '' + for line in f: + # Were we skip the drop table lines + sql_line = line.strip() + + if sql_line.startswith('--') or sql_line.startswith('CREATE DATABASE') or sql_line.startswith('USE') or sql_line.startswith('DROP TABLE'): + # Cleanup step 1 + if 'table' in sql_line: + print('') + self.__logmessage(f'[{file.name}] {sql_line}') + continue + + if sql_line.startswith('CREATE TABLE'): + # Cleanup step 2 + sql_line = sql_line.replace('CREATE TABLE','CREATE TABLE /*!32312 IF NOT EXISTS*/') + + line = sql_line + if '' != sql_line: + sql_statement = '{} {}'.format(sql_statement,sql_line) + if sql_line.endswith(';'): + try: + conn.execute(sql_statement) + conn.execute('commit') + except IntegrityError as ex: + # Duplicate key errors + with self.__error_log.open('a') as logfile: + logfile.write('{}, {}: {}\n'.format(file, ex, sql_statement)) + except Exception as ex: + print('\nError: {}'.format(ex)) + print(file) + print(sql_statement) + + #with self.__error_log.open('a') as logfile: + # logfile.write('{}, {}: {}\n'.format(file, ex, sql_statement)) + + sql_statement = '' + print('.', end='',flush=True) + + print('') + self.__logmessage('Processing file {} is done!'.format(file)) + + def __process_json_file(self,file,table): + with self.db.connect() as conn, conn.begin(): + conn.execute('drop table if exists {}'.format(table)) + + json_data = [] + records_done = 0 + filesize_done = 0 + with file.open() as f: + for line in f: + filesize_done += len(line) + json_line = json.loads(line) + # Stupid date fixes..... + for date_field in dHealthImport.DATE_FIELDS: + try: + json_line[date_field] = json_line[date_field].strip().strip('Z').strip() + if '' == json_line[date_field]: + json_line[date_field] = None + elif ' ' in json_line[date_field]: + temp = json_line[date_field].split(' ') + temp[0] = temp[0].split('-') + + # Swap year and day field.... mixing up dates here.... + if int(temp[0][0]) > 1000: + json_line[date_field] = '{}-{}-{}T{}'.format(temp[0][0],temp[0][1],temp[0][2],temp[1]) + else: + json_line[date_field] = '{}-{}-{}T{}'.format(temp[0][2],temp[0][1],temp[0][0],temp[1]) + + except Exception as ex: + #print(ex) + pass + + json_data.append(json_line) + + if len(json_data) == dHealthImport.JSON_BATCH_SIZE: + records_done += dHealthImport.JSON_BATCH_SIZE + panda_data = pd.DataFrame(json_data) + del(json_data) + json_data = [] + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + # Some fields are not detected properly. 
So the dict dtype holds field names which are not properly detected and specify the needed types + try: + panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=dHealthImport.SQL_INSERT_BATCH, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS) + except Exception as ex: + # Something went wrong. The data is not committed to db, so we do it now one by one, to filter out the error records. + try: + panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=1, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS) + except Exception as ex: + with self.__error_log.open('a') as logfile: + logfile.write('{}, {}: {}\n'.format(table,file,ex)) + + self.__logmessage('Processing at {:.3f}% {}/{}, {} records from JSON data file \'{}\'.'.format((filesize_done / file.stat().st_size) * 100,self.__byte_size(filesize_done),self.__byte_size(file.stat().st_size),records_done,file)) + + # Remaining records + records_done += len(json_data) + panda_data = pd.DataFrame(json_data) + del(json_data) + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + # Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types + try: + panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=dHealthImport.SQL_INSERT_BATCH, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS) + except Exception as ex: + # Something went wrong. The data is not commited to db, so we do it now one by one, to filter out the error records. + + try: + panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=1, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS) + except Exception as ex: + with self.__error_log.open('a') as logfile: + logfile.write('{}, {}: {}\n'.format(table,file,ex)) + + self.__logmessage('Processing at {:.3f}% {}/{}, {} records from JSON data file \'{}\'.'.format((filesize_done / file.stat().st_size) * 100,self.__byte_size(filesize_done),self.__byte_size(file.stat().st_size),records_done,file)) + + + def set_export_location(self, path): + path = Path(path) + if not path.exists(): + self.__logmessage(f'Creating CSV export folder {path}') + path.mkdir() + + self.export_location = path + + def run(self, reinit_db = True): + if reinit_db: + temp_con = create_engine(str(self.db.url).replace(self.__temp_db_name,'')) + + try: + self.__logmessage(f'Dropping existing temporary database: {self.__temp_db_name}') + temp_con.execute(f'DROP DATABASE IF EXISTS {self.__temp_db_name}') + self.__logmessage(f'Create new temporary database: {self.__temp_db_name}') + temp_con.execute(f'CREATE DATABASE {self.__temp_db_name}') + except Exception as ex: + print(ex) + + # TODO: Make it multiprocessing.... that is way faster then one file at the time + for file in self.__source_files: + self.process_file(file) + + # Start renaming the dacadoo tables. 
As the original names are coming from the SQL import + # Were we prefix all the tables that do not have a known prefix + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + result = conn.execute('show tables') + for row in result: + table_name = row[0] + rename_table = f'dacadoo_{table_name}'.lower() + if table_name.startswith('menzis_') or table_name.startswith('netivity_') or table_name.startswith('vektis_') or table_name.startswith('dacadoo_'): + continue + + self.__logmessage(f'rename table {table_name} TO {rename_table}') + try: + conn.execute(f'rename table {table_name} TO {rename_table}') + except Exception as ex: + print(f'Error executing: {ex}') + + self.__optimize_mysql_tables() + + def process_file(self,file = None): + self.__logmessage(f'Processing file: {file} with filesize: {self.__byte_size(file.stat().st_size)}') + start = time() + table = self.__file_belongs_to_source(file) + '_' + + if '.json' == file.suffix: + # Stupid Windows does not understand casing.... so force lowercase... :( + table = '{}{}'.format(table,re.sub(r'_\d+-\d+-\d+T.*\.json', '', file.name)).lower() + self.__process_json_file(file,table) + elif '.csv' == file.suffix: + # Stupid Windows does not understand casing.... so force lowercase... :( + table = '{}{}'.format(table,re.sub(r'(_\d+_\d+)?\.csv', '', file.name)).lower() + self.__process_csv_file(file,table) + elif '.gz' == file.suffix: + self.__process_gz_file(file) + + self.__logmessage(f'Processing file {file} done in {timedelta(seconds=(time()-start))}') + + def filter_on_consent(self): + + def fix_vektis_insurance_number(self): + print('fix_vektis_insurance_number') + print(self) + # Apperently it is difficult to combine to CSV files at the source. So we have to fix it here again.... + # And they are not able to upload easy to understand file names... bunch of rubbish.. + + # Find the 'source' table with the insurance and accountid numbers + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'VERZEKERDENUMMER\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name' + self.__logmessage(sql) + source_table_name = None + result = conn.execute(sql) + for row in result: + source_table_name = row[0] + if not source_table_name.lower().startswith('vektis_'): + continue + + # Do some cleanup... 
Menzis rommel + sql = f'DELETE FROM {source_table_name} WHERE VERZEKERDENUMMER LIKE "%%e+%%"' + result = conn.execute(sql) + self.__logmessage(f'Deleted {result.rowcount} rows from table {source_table_name}') + + self.__logmessage(f'Found source insurrance table at: {source_table_name}') + + if source_table_name is None: + return + + # Find all the tables that holds the field ZCL_REL_NR + convert_field = 'ZCL_REL_NR' + sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{convert_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name' + result = conn.execute(sql) + for row in result: + table_name = row[0] + try: + sql = f'SELECT AccountID, VERZEKERDENUMMER FROM {source_table_name} ORDER BY AccountID' + account_ids = conn.execute(sql) + total_records = account_ids.rowcount + self.__logmessage(f'Updating {total_records} accounts in the table {table_name}') + + counter = 0 + for account in account_ids: + counter += 1 + sql = f'UPDATE {table_name} SET {convert_field} = {account[0]} WHERE {convert_field} = {account[1]}' + updates = conn.execute(sql) + self.__logmessage(f'[{counter} off {total_records}({(counter/total_records)*100:.2f}%)]: Updated {updates.rowcount} records for account id {account[0]} in table {table_name}') + + self.__logmessage(f'Renaming field {convert_field} to AccountId') + sql = f'ALTER TABLE {table_name} CHANGE COLUMN {convert_field} AccountId Bigint(20)' + self.__logmessage(sql) + conn.execute(sql) + + sql = f'ALTER TABLE {table_name} ADD INDEX (AccountId)' + self.__logmessage(sql) + conn.execute(sql) + + except Exception as ex: + print('Fix vektis exception') + print(ex) + + if source_table_name is None: + try: + # Drop the table, as we do not need it anymore + sql = f'DROP TABLE {source_table_name}' + self.__logmessage(sql) + conn.execute(sql) + except Exception as ex: + print(ex) + + + # Here we clean up the records that should not be here. They have not given a consent + # First we start with the Dacadoo data. + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + + try: + # Delete all dacadoo accounts that should not be here. + sql = 'DELETE FROM dacadoo_user WHERE id NOT IN (SELECT id FROM dacadoo_acceptedUser)' + self.__logmessage(sql) + conn.execute(sql) + + # Drop the table, as we do not need it anymore + sql = 'DROP TABLE dacadoo_acceptedUser' + self.__logmessage(sql) + conn.execute(sql) + + except Exception as ex: + print(ex) + + # Now we clean all the other tables that contain the Dacadoo userId field (reference field) + # And delete all records where the userId is not in the account table. + id_field_name = 'userId' + self.__drop_fields.append(id_field_name) + sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' + result = conn.execute(sql) + for row in result: + table_name = row[0] + sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT id FROM dacadoo_user)' + self.__logmessage(sql) + conn.execute(sql) + + # Convert Vektis insurance number to Netvitiy AccoundID, so we can clean if later on + fix_vektis_insurance_number(self) + + # Now we clean the Netivity data + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + # Delete all Netivity accounts that should not be here. 
+            # This is based on either the legacy or the normal account id
+
+            try:
+                sql = 'DELETE FROM netivity_account WHERE AccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids) AND LegacyAccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids)'
+                self.__logmessage(sql)
+                conn.execute(sql)
+
+                sql = 'DELETE FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids)'
+                self.__logmessage(sql)
+                conn.execute(sql)
+
+                # Drop the table, as we do not need it anymore
+                sql = 'DROP TABLE netivity_rugtoestemmingids'
+                self.__logmessage(sql)
+                conn.execute(sql)
+
+            except Exception as ex:
+                print(ex)
+
+            # Now we clean all the other tables that contain the Netivity AccountId field (reference field)
+            # and delete all records where the AccountId is not in the account table.
+            id_field_name = 'AccountId'
+            sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
+            result = conn.execute(sql)
+            for row in result:
+                table_name = row[0]
+                if table_name in ['netivity_account','netivity_legacyaccountoud']:
+                    continue
+
+                source_table = 'netivity_legacyaccountoud' if table_name.startswith('legacy') else 'netivity_account'
+                sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT {id_field_name} FROM {source_table})'
+                self.__logmessage(sql)
+                conn.execute(sql)
+
+        # And what about the Menzis data? Still unclear...
+
+    def create_new_rug_ids(self):
+        self.__drop_fields.append('LegacyAccountId')
+        self.__drop_fields.append('DacadooId')
+
+        new_user_table = 'rug_userid_conversion'
+
+        table_sql = '''CREATE TABLE `''' + new_user_table + '''` (
+            `Index` BIGINT(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
+            `menzis_id` BIGINT(20) NULL DEFAULT '0',
+            `netivity_legacy_id` BIGINT(20) NULL DEFAULT '0',
+            `dacadoo_id` VARCHAR(100) NULL DEFAULT '' COLLATE 'utf8_general_ci',
+            `rug_id` VARCHAR(50) DEFAULT (uuid()),
+            INDEX (`menzis_id`),
+            INDEX (`netivity_legacy_id`),
+            INDEX (`dacadoo_id`),
+            UNIQUE INDEX `rug_id` (`rug_id`) USING BTREE)'''
+
+        with self.db.connect() as conn, conn.begin():
+            self.__init_mysql_connection(conn)
+            conn.execute(f'DROP TABLE IF EXISTS {new_user_table}')
+            conn.execute(table_sql)
+
+            # Collect the account IDs from the Netivity data, which holds 3 fields: AccountId, LegacyAccountId and DacadooId. This allows us to join all the data sets
+            self.__logmessage('Inserting Netivity account data to new rug ids ...')
+            sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id) SELECT AccountId, LegacyAccountId, DacadooId FROM netivity_account'
+            self.__logmessage(sql)
+            conn.execute(sql)
+            self.__logmessage('Inserting Netivity account data to new rug ids is done!')
+
+            self.__logmessage('Get the legacy account ids based on Dacadoo Ids ...')
+            sql = f'UPDATE {new_user_table} SET netivity_legacy_id = (SELECT AccountId FROM netivity_legacyaccountoud WHERE netivity_legacyaccountoud.DacadooId = {new_user_table}.dacadoo_id LIMIT 1) WHERE netivity_legacy_id = \'\''
+            self.__logmessage(sql)
+            conn.execute(sql)
+
+            # Collect the old IDs from the legacy table, which holds 2 fields: AccountId and DacadooId. We only want NEW records in rug_userid_conversion, so only request records that have not already been seen.
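+            # NOTE: the INSERT below relies on NOT IN subqueries. In MySQL, NOT IN evaluates to NULL
+            # (not TRUE) as soon as the subquery returns a NULL, so a single NULL dacadoo_id or
+            # netivity_legacy_id already present in the conversion table would make this statement
+            # silently insert nothing. If that can happen in this data, a rough NOT EXISTS sketch
+            # (using the same tables and columns) side-steps the NULL behaviour:
+            #
+            # sql = f'''INSERT INTO {new_user_table}(netivity_legacy_id,dacadoo_id)
+            #           SELECT l.AccountId, l.DacadooId
+            #           FROM netivity_legacyaccountoud AS l
+            #           WHERE NOT EXISTS (SELECT 1 FROM {new_user_table} AS c
+            #                             WHERE c.netivity_legacy_id = l.AccountId
+            #                                OR c.dacadoo_id = l.DacadooId)'''
+            # conn.execute(sql)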
+            self.__logmessage('Inserting Netivity LEGACY account data to new rug ids ...')
+            sql = f'INSERT INTO {new_user_table}(netivity_legacy_id,dacadoo_id) SELECT AccountId, DacadooId FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT netivity_legacy_id FROM {new_user_table}) AND DacadooId NOT IN (SELECT dacadoo_id FROM {new_user_table})'
+            self.__logmessage(sql)
+            conn.execute(sql)
+            self.__logmessage('Inserting Netivity LEGACY account data to new rug ids is done!')
+
+            # Get all Dacadoo IDs which have not been seen yet
+            self.__logmessage('Loading the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data')
+            sql = f'INSERT INTO {new_user_table}(dacadoo_id) SELECT id FROM dacadoo_user WHERE id NOT IN (SELECT dacadoo_id FROM {new_user_table})'
+            self.__logmessage(sql)
+            conn.execute(sql)
+            self.__logmessage('Loaded the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data')
+
+            # Load all the accounts from the temp table into memory, so we only have to query the source table once
+            accounts = []
+            sql = f'SELECT menzis_id, netivity_legacy_id, dacadoo_id, rug_id FROM {new_user_table} ORDER BY `Index`'
+            result = conn.execute(sql)
+            for row in result:
+                accounts.append((row[0],row[1],row[2],row[3]))
+
+            total_accounts = len(accounts)
+            self.__logmessage(f'Loaded in total {total_accounts} accounts to re-number')
+
+        with self.db.connect() as conn, conn.begin():
+            self.__init_mysql_connection(conn)
+            # Now we are looking for all tables that hold the old IDs. Based on the table name (legacy or not) we choose the field name to match
+            id_field_name = 'AccountId'
+            self.__drop_fields.append(id_field_name)
+            sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
+            result = conn.execute(sql)
+            table_counter = 0
+            table_total = result.rowcount
+            self.__logmessage(f'We found {table_total} tables that need to be re-numbered for the field {id_field_name}')
+            for row in result:
+                table_counter += 1
+                table_name = row[0]
+                try:
+                    sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
+                    self.__logmessage(sql)
+                    conn.execute(sql)
+                except Exception:
+                    pass
+
+                # Loop over all accounts to update
+                self.__logmessage(f'Re-numbering table {table_name} ({table_counter} of {table_total} - {(table_counter/table_total)*100:.2f}%)')
+                counter = 0
+                for account in accounts:
+                    counter += 1
+                    source_id = 'netivity_legacy_id' if 'legacy' in table_name else 'menzis_id'
+                    source_value = account[1] if 'legacy' in table_name else account[0]
+
+                    sql = f'UPDATE {table_name} SET rug_id = \'{account[3]}\' WHERE {source_id} = {source_value}'
+                    self.__logmessage(sql)
+                    updates = conn.execute(sql)
+                    self.__logmessage(f'[{counter} of {total_accounts}({(counter/total_accounts)*100:.2f}%)]: Updated {updates.rowcount} records for account id {source_value} in table {table_name}')
+
+                    # self.__logmessage(f'Updated ')
+
+                # sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.{source_id} = {table_name}.{id_field_name} LIMIT 1)'
+                # self.__logmessage(sql)
+                # conn.execute(sql)
+
+        # with self.db.connect() as conn, conn.begin():
+        #     self.__init_mysql_connection(conn)
+        #     # Get all the Menzis tables which hold ids in the field ORIGINEEL_RELATIE_ID
+        #     id_field_name = 'ORIGINEEL_RELATIE_ID'
+        #     self.__drop_fields.append(id_field_name)
+        #     sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE
COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name' + # result = conn.execute(sql) + # for row in result: + # table_name = row[0] + # # sql = f'INSERT INTO {new_user_table}(netivity_legacy_id) SELECT DISTINCT {id_field_name} FROM {table_name} WHERE {id_field_name} NOT IN (SELECT menzis_id FROM {new_user_table}) AND {id_field_name} NOT IN (SELECT netivity_legacy_id FROM {new_user_table})' + # # self.__logmessage(sql) + # # conn.execute(sql) + # try: + # sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL' + # self.__logmessage(sql) + # conn.execute(sql) + # except Exception: + # pass + + # sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.menzis_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL' + # self.__logmessage(sql) + # conn.execute(sql) + + # sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.netivity_legacy_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL' + # self.__logmessage(sql) + # conn.execute(sql) + + tables_to_process = [] + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + # Get all the Dacadoo tables which holds ids in field name userId + table_name = 'dacadoo_user' + try: + sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL' + self.__logmessage(sql) + conn.execute(sql) + except Exception: + pass + sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)' + self.__logmessage(sql) + conn.execute(sql) + + id_field_name = 'userId' + self.__drop_fields.append(id_field_name) + sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' + result = conn.execute(sql) + for row in result: + tables_to_process.append(row[0]) + + # Here we loop over the tables that needs to be changed. We open a new DB connection for every table update. 
+ # This will hopefully reduce the undo log and commit earlier the changes + for table_name in tables_to_process: + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + try: + sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id' + self.__logmessage(sql) + conn.execute(sql) + except Exception: + pass + + try: + sql = f'ALTER TABLE {table_name} DROP INDEX rug_id' + #self.__logmessage(sql) + conn.execute(sql) + except Exception: + pass + + try: + sql = f'ALTER TABLE {table_name} ADD COLUMN rug_id VARCHAR(50) NULL DEFAULT NULL' + self.__logmessage(sql) + conn.execute(sql) + except Exception: + pass + + sql = f'FLUSH TABLES' + #self.__logmessage(sql) + conn.execute(sql) + + batch_size = 100000 + amount_of_records = round(self.__record_count(table_name)/batch_size)+1 + + for i in range(amount_of_records): + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + + sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}' + self.__logmessage(f'({i+1}/{amount_of_records}) {sql}') + try: + result = conn.execute(sql) + except Exception as ex: + result = conn.execute(sql) + + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + sql = f'DROP TABLE {new_user_table}' + self.__logmessage(sql) + conn.execute(sql) + + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + id_field_name = 'rug_id' + sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' + result = conn.execute(sql) + for row in result: + table_name = row[0] + sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL' + self.__logmessage(sql) + conn.execute(sql) + + # Special case. These are the original Dacadoo ids. Only in the user table they should be deleted. + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + drop_field = 'id' + table_name = 'dacadoo_user' + self.__logmessage(f'Dropping field {drop_field} from table {table_name}') + sql = f'ALTER TABLE {table_name} DROP {drop_field}' + result = conn.execute(sql) + self.__logmessage(f'Dropped field {drop_field} from table {table_name}') + + + def drop_fields(self, drop = True): + + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + + # Drop all the fields that contain data that is sensitive + # For now, we keep all the different userid fields. 
As we do not re-create new rug-ids.
+            # for drop_field in self.__drop_fields:
+            for drop_field in dHealthImport.DROP_FIELDS:
+                sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{drop_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
+                result = conn.execute(sql)
+                for row in result:
+                    table_name = row[0]
+                    self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
+                    sql = f'ALTER TABLE {table_name} DROP {drop_field}'
+                    if drop:
+                        result = conn.execute(sql)
+                    self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
+
+    def clean_birth_days(self):
+        with self.db.connect() as conn, conn.begin():
+            self.__init_mysql_connection(conn)
+            for birthday_field in dHealthImport.BIRTHDAY_FIELDS:
+                sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{birthday_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
+                result = conn.execute(sql)
+                for row in result:
+                    table_name = row[0]
+                    self.__logmessage(f'Updating birthday field {birthday_field} for table {table_name}')
+                    sql = f'UPDATE {table_name} SET {birthday_field} = DATE_FORMAT({birthday_field},\'%%Y-01-01\')'
+                    result = conn.execute(sql)
+                    self.__logmessage(f'Updated birthday field {birthday_field} for table {table_name}')
+
+    def clean_postal_codes(self):
+        with self.db.connect() as conn, conn.begin():
+            self.__init_mysql_connection(conn)
+            for postal_code_field in dHealthImport.POSTAL_CODE_FIELDS:
+                sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{postal_code_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
+                result = conn.execute(sql)
+                for row in result:
+                    table_name = row[0]
+                    self.__logmessage(f'Updating postal code field {postal_code_field} for table {table_name}')
+                    # MySQL SUBSTRING positions start at 1; position 0 would return an empty string
+                    sql = f'UPDATE {table_name} SET {postal_code_field} = SUBSTRING({postal_code_field},1,4)'
+                    result = conn.execute(sql)
+                    self.__logmessage(f'Updated postal code field {postal_code_field} for table {table_name}')
+
+    def create_csv_exports(self, summary = True, clean = True):
+        # The export location for all the CSV files must be set first
+        if self.export_location is None:
+            return
+
+        if clean:
+            self.__logmessage(f'Clean up export location: {self.export_location}')
+            for child in self.export_location.iterdir():
+                child.unlink()
+                self.__logmessage(f'Deleted file {child.name}')
+
+        summary = '' if not summary else '_summary'
+        batch_size = dHealthImport.EXPORT_BATCH_SIZE
+        for table_name in self.__get_all_tables():
+            export_file = f'{self.export_location}/{table_name}{summary}.csv'
+            self.__logmessage(f'Exporting to {export_file}')
+            if summary:
+                batches = 1
+                batch_size = 1000
+            else:
+                batches = math.ceil(self.__record_count(table_name) / batch_size)
+
+            for x in range(batches):
+                with self.db.connect() as conn, conn.begin():
+                    self.__init_mysql_connection(conn)
+
+                    sql = f'SELECT * FROM {table_name} LIMIT {x*batch_size}, {batch_size}'
+                    sql_data = pd.read_sql(sql, conn)
+                    # Add headers when x == 0.
This is the first batch, which should create and add headers + sql_data.to_csv(export_file, index=False, header=(x==0), mode='a', encoding='utf-8', sep=dHealthImport.CSV_SEPERATOR) + print('.', end='',flush=True) + + print('') + + def addDacadooData(self,location): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['dacadoo'] = location + self.__load_files() + else: + raise RuntimeError(f'Location {location} is not a valid Dacadoo source location') + + def addMenzisData(self,location): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['menzis'] = location + self.__load_files() + else: + raise RuntimeError(f'Location {location} is not a valid Menzis source location') + + def addNetivityData(self,location): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['netivity'] = location + self.__load_files() + else: + raise RuntimeError(f'Location {location} is not a valid Netivity source location') + + def addVektisData(self,location): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['vektis'] = location + self.__load_files() + else: + raise RuntimeError(f'Location {location} is not a valid Vektis source location') + + +if __name__ == "__main__": + + config = configparser.ConfigParser() + try: + config.read_file(open('settings.cfg')) + except FileNotFoundError as ex: + print('Please create a settings.cfg file based on the settings.sample.cfg file.') + exit() + + importer = dHealthImport(config['database']['user'],config['database']['pass'],config['database']['host'],config['database']['db']) + + try: + importer.addDacadooData(config['datasources']['dacadoo']) + print('Loaded Dacadoo data') + except KeyError as ex: + print('Not loading Dacadoo data') + + try: + importer.addMenzisData(config['datasources']['menzis']) + print('Loaded Menzis data') + except KeyError as ex: + print('Not loading Menzis data') + + try: + importer.addNetivityData(config['datasources']['netivity']) + print('Loaded Netivity data') + except KeyError as ex: + print('Not loading Netivity data') + + try: + importer.addVektisData(config['datasources']['vektis']) + print('Loaded Vektis data') + except KeyError as ex: + print('Not loading Vektis data') + + try: + importer.set_export_location(config['export']['location']) + print(f'Export location is set to {importer.export_location}') + except KeyError as ex: + print('Not exporting CSV data') + + #importer.run(True) + #importer.filter_on_consent() + # importer.create_new_rug_ids() + #importer.clean_birth_days() + #importer.clean_postal_codes() + #importer.drop_fields() + importer.create_csv_exports() + importer.create_csv_exports(False,False) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..433f041 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +sqlalchemy +MySQLdb +pandas diff --git a/settings.cfg b/settings.cfg new file mode 100644 index 0000000..fd48978 --- /dev/null +++ b/settings.cfg @@ -0,0 +1,14 @@ +[database] +host=129.125.108.73 +user=datamanager +pass=datamanager +db=healthpro + +[datasources] +dacadoo=G:\HealthPro\RAW\Received 2020-06-30\Dacadoo +menzis=G:\HealthPro\RAW\Received 2019-11-06\Menzis +netivity=G:\HealthPro\RAW\Received 2020-09-17\Netivity +vektis=G:\HealthPro\RAW\Received 2020-11-18\Vektis + +[export] +location=G:\HealthPro\CLEAN