diff --git a/importer.py b/importer.py index c9513be..c742352 100644 --- a/importer.py +++ b/importer.py @@ -33,7 +33,7 @@ class dHealthImport(): 'CONTACT_DTTM','CONTACT_DT','PROCESSED_DTTM','KENNISDATUM','TSRECORD','DATUM_TOESTAND_VANAF','DATUM_TOESTAND_TM','CONTACT_DATUM_TIJD'])) POSTAL_CODE_FIELDS = list(set(['postalCode','ZipCode','ZCL_POSTCODE'])) - DROP_FIELDS = list(set(['EmailAddress','rug_id'])) + DROP_FIELDS = list(set(['EmailAddress','rug_id','email_address'])) # Index fields will only set on those fields that are available when optimizing. So the rug_id field will not be added the first optimize run INDEX_FIELDS = list(set(['AccountId','LegacyAccountId','DacadooId','ORIGINEEL_RELATIE_ID','userId','rug_id','ZCL_REL_NR'])) @@ -56,10 +56,10 @@ class dHealthImport(): self.__drop_fields = copy.copy(dHealthImport.DROP_FIELDS) self.__source_files = [] self.__source_folders = { - 'dacadoo' : None, - 'menzis' : None, - 'netivity' : None, - 'vektis' : None, + 'dacadoo' : [], + 'menzis' : [], + 'netivity' : [], + 'vektis' : [], } self.__error_log = Path('errors.log') @@ -160,18 +160,20 @@ class dHealthImport(): def __load_files(self): self.__source_files = [] for _, source in self.__source_folders.items(): - if source is not None: - self.__source_files += sorted([child for child in source.iterdir()]) + for location in source: + self.__source_files += sorted([child for child in location.iterdir()]) def __file_belongs_to_source(self,file): for name, source in self.__source_folders.items(): - if file.parent == source: + if file.parent in source: return name def __process_csv_file(self,file,table): with self.db.connect() as conn, conn.begin(): self.__init_mysql_connection(conn) - conn.execute(f'drop table if exists {table}') + sql = f'drop table if exists {table}' + self.__logmessage(sql) + conn.execute(sql) panda_data = pd.read_csv(file, sep=dHealthImport.CSV_SEPERATOR, error_bad_lines=False, warn_bad_lines=True) # Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types @@ -240,7 +242,7 @@ class dHealthImport(): json_data = [] records_done = 0 filesize_done = 0 - with file.open() as f: + with file.open(encoding='utf-8') as f: for line in f: filesize_done += len(line) json_line = json.loads(line) @@ -351,9 +353,9 @@ class dHealthImport(): self.__optimize_mysql_tables() def process_file(self,file = None): - self.__logmessage(f'Processing file: {file} with filesize: {self.__byte_size(file.stat().st_size)}') - start = time() table = self.__file_belongs_to_source(file) + '_' + self.__logmessage(f'Processing file: {file} with filesize: {self.__byte_size(file.stat().st_size)} with table prefix {table}') + start = time() if '.json' == file.suffix: # Stupid Windows does not understand casing.... so force lowercase... :( @@ -361,7 +363,8 @@ class dHealthImport(): self.__process_json_file(file,table) elif '.csv' == file.suffix: # Stupid Windows does not understand casing.... so force lowercase... :( - table = '{}{}'.format(table,re.sub(r'(_\d+_\d+)?\.csv', '', file.name)).lower() + # Mensiz is no capable of producing correct export names.... 
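+                # i.e. strip an optional date/sequence suffix (one or more '_' or '-', digits, '_' or '-', digits, then anything) together with the extension, so names like 'Stappen_20200101_20200131.csv' and 'Stappen-2020-01.csv' both reduce to 'stappen'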
+ table = '{}{}'.format(table,re.sub(r'([_-]+\d+[_-]+\d+(?:.*)?)?.csv', '', file.name)).lower() self.__process_csv_file(file,table) elif '.gz' == file.suffix: self.__process_gz_file(file) @@ -371,8 +374,6 @@ class dHealthImport(): def filter_on_consent(self): def fix_vektis_insurance_number(self): - print('fix_vektis_insurance_number') - print(self) # Apperently it is difficult to combine to CSV files at the source. So we have to fix it here again.... # And they are not able to upload easy to understand file names... bunch of rubbish.. @@ -388,7 +389,7 @@ class dHealthImport(): if not source_table_name.lower().startswith('vektis_'): continue - # Do some cleanup... Menzis rommel + # Do some cleanup... Menzis rommel. They stored numbers in tekst, and apperently forced to scientific format. This we cannot process sql = f'DELETE FROM {source_table_name} WHERE VERZEKERDENUMMER LIKE "%%e+%%"' result = conn.execute(sql) self.__logmessage(f'Deleted {result.rowcount} rows from table {source_table_name}') @@ -430,7 +431,7 @@ class dHealthImport(): print('Fix vektis exception') print(ex) - if source_table_name is None: + if source_table_name is not None: try: # Drop the table, as we do not need it anymore sql = f'DROP TABLE {source_table_name}' @@ -507,13 +508,13 @@ class dHealthImport(): if table_name in ['netivity_account','netivity_legacyaccountoud']: continue - source_table = 'netivity_legacyaccountoud' if table_name.startswith('legacy') else 'netivity_account' + source_table = 'netivity_legacyaccountoud' if 'legacy' in table_name else 'netivity_account' sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT {id_field_name} FROM {source_table})' self.__logmessage(sql) conn.execute(sql) - # And now how about the Menzis data??? Unclear... + # And now how about the Menzis data??? Unclear... Looks unrelated marketing data def create_new_rug_ids(self): self.__drop_fields.append('LegacyAccountId') @@ -526,10 +527,12 @@ class dHealthImport(): `menzis_id` BIGINT(20) NULL DEFAULT '0', `netivity_legacy_id` BIGINT(20) NULL DEFAULT '0', `dacadoo_id` VARCHAR(100) NULL DEFAULT '' COLLATE 'utf8_general_ci', - `rug_id` VARCHAR(50) DEFAULT (uuid()), + `email_address` VARCHAR(250) NULL DEFAULT '' COLLATE 'utf8_general_ci', + `rug_id` VARCHAR(50) NOT NULL DEFAULT (uuid()), INDEX (`menzis_id`), INDEX (`netivity_legacy_id`), INDEX (`dacadoo_id`), + INDEX (`email_address`), UNIQUE INDEX `rug_id` (`rug_id`) USING BTREE)''' with self.db.connect() as conn, conn.begin(): @@ -539,39 +542,43 @@ class dHealthImport(): # Collect the account IDs from Netivity data which holds 3 fields: AccountId, LegacyAccountId and DacadooId. 
This allow us to join all the data sets self.__logmessage('Inserting Netivity account data to new rug ids ...') - sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id) SELECT AccountId, LegacyAccountId, DacadooId FROM netivity_account' + sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id,email_address) SELECT AccountId, LegacyAccountId, DacadooId, EmailAddress FROM netivity_account' self.__logmessage(sql) - conn.execute(sql) - self.__logmessage('Inserting Netivity account data to new rug ids is done!') + count = conn.execute(sql) + self.__logmessage(f'Inserted {count.rowcount} Netivity accounts.') self.__logmessage('Get the legacy account ids based on Dacadoo Ids ...') sql = f'UPDATE {new_user_table} SET netivity_legacy_id = (SELECT AccountId FROM netivity_legacyaccountoud WHERE netivity_legacyaccountoud.DacadooId = {new_user_table}.dacadoo_id LIMIT 1) WHERE netivity_legacy_id = \'\'' self.__logmessage(sql) - conn.execute(sql) + count = conn.execute(sql) + self.__logmessage(f'Updated {count.rowcount} Netivity legacy accounts based on the Dacadoo IDs.') + # Collect old IDS from legacy tables which holds 2 fields: AccountId, DacadooId. But now we only want NEW records in table rug_userid_conversion. So only request records which are not al ready seen. self.__logmessage('Inserting Netivity LEGACY account data to new rug ids ...') sql = f'INSERT INTO {new_user_table}(netivity_legacy_id,dacadoo_id) SELECT AccountId, DacadooId FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT netivity_legacy_id FROM {new_user_table}) AND DacadooId NOT IN (SELECT dacadoo_id FROM {new_user_table})' self.__logmessage(sql) - conn.execute(sql) - self.__logmessage('Inserting Netivity LEGACY account data to new rug ids is done!') + count = conn.execute(sql) + self.__logmessage(f'Added {count.rowcount} legacy Netivity accounts.') # Get all Dacadoo IDs which are not seen yet.... 
self.__logmessage('Loading the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data') sql = f'INSERT INTO {new_user_table}(dacadoo_id) SELECT id FROM dacadoo_user WHERE id NOT IN (SELECT dacadoo_id FROM {new_user_table})' self.__logmessage(sql) - conn.execute(sql) - self.__logmessage('Loaded the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data') + count = conn.execute(sql) + self.__logmessage(f'Loaded the remaining Dacadoo Ids ({count.rowcount}) which do not have a link with the Menzis or Netivity data') - # Load all the accounts from temp table to memory, so we only have to query once the source table - accounts = [] - sql = f'SELECT menzis_id, netivity_legacy_id, dacadoo_id, rug_id FORM {new_user_table} ORDER BY Index' - result = conn.execute(sql) - for row in result: - accounts.append((row[0],row[1],row[2],row[3])) + with self.db.connect() as conn, conn.begin(): + self.__init_mysql_connection(conn) + # Load all the accounts from temp table to memory, so we only have to query once the source table + accounts = [] + sql = f'SELECT menzis_id, netivity_legacy_id, dacadoo_id, email_address, rug_id FROM {new_user_table} ORDER BY `Index`' + result = conn.execute(sql) + for row in result: + accounts.append((row[0],row[1],row[2],row[3],row[4])) - total_accounts = len(accounts) - self.__logmessage(f'Loaded in total {total_accounts} accounts to re-number') + total_accounts = len(accounts) + self.__logmessage(f'Loaded in total {total_accounts} accounts to re-number') with self.db.connect() as conn, conn.begin(): self.__init_mysql_connection(conn) @@ -586,6 +593,7 @@ class dHealthImport(): for row in result: table_counter += 1 table_name = row[0] + try: sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL' self.__logmessage(sql) @@ -598,11 +606,15 @@ class dHealthImport(): counter = 0 for account in accounts: counter += 1 - source_id = 'netivity_legacy_id' if 'legacy' in table_name else 'menzis_id' source_value = account[1] if 'legacy' in table_name else account[0] + + if source_value is None: + # This happens due to the fact, that Netivity users are sharing their Dacadoo account. We cannot handle that. 
+ print(account) + continue - sql = f'UPDATE {table_name} SET rug_id = \'{account[3]}\' WHERE {source_id} = {source_value}' - self.__logmessage(sql) + sql = f'UPDATE {table_name} SET rug_id = \'{account[4]}\' WHERE accountid = {source_value}' + #self.__logmessage(sql) updates = conn.execute(sql) self.__logmessage(f'[{counter} off {total_accounts}({(counter/total_accounts)*100:.2f}%)]: Updated {updates.rowcount} records for account id {source_value} in table {table_name}') @@ -640,99 +652,99 @@ class dHealthImport(): # self.__logmessage(sql) # conn.execute(sql) - tables_to_process = [] - with self.db.connect() as conn, conn.begin(): - self.__init_mysql_connection(conn) - # Get all the Dacadoo tables which holds ids in field name userId - table_name = 'dacadoo_user' - try: - sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL' - self.__logmessage(sql) - conn.execute(sql) - except Exception: - pass - sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)' - self.__logmessage(sql) - conn.execute(sql) - - id_field_name = 'userId' - self.__drop_fields.append(id_field_name) - sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' - result = conn.execute(sql) - for row in result: - tables_to_process.append(row[0]) - - # Here we loop over the tables that needs to be changed. We open a new DB connection for every table update. - # This will hopefully reduce the undo log and commit earlier the changes - for table_name in tables_to_process: - with self.db.connect() as conn, conn.begin(): - self.__init_mysql_connection(conn) - try: - sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id' - self.__logmessage(sql) - conn.execute(sql) - except Exception: - pass - - try: - sql = f'ALTER TABLE {table_name} DROP INDEX rug_id' - #self.__logmessage(sql) - conn.execute(sql) - except Exception: - pass - - try: - sql = f'ALTER TABLE {table_name} ADD COLUMN rug_id VARCHAR(50) NULL DEFAULT NULL' - self.__logmessage(sql) - conn.execute(sql) - except Exception: - pass - - sql = f'FLUSH TABLES' - #self.__logmessage(sql) - conn.execute(sql) - - batch_size = 100000 - amount_of_records = round(self.__record_count(table_name)/batch_size)+1 - - for i in range(amount_of_records): - with self.db.connect() as conn, conn.begin(): - self.__init_mysql_connection(conn) - - sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}' - self.__logmessage(f'({i+1}/{amount_of_records}) {sql}') - try: - result = conn.execute(sql) - except Exception as ex: - result = conn.execute(sql) - - with self.db.connect() as conn, conn.begin(): - self.__init_mysql_connection(conn) - sql = f'DROP TABLE {new_user_table}' - self.__logmessage(sql) - conn.execute(sql) - - with self.db.connect() as conn, conn.begin(): - self.__init_mysql_connection(conn) - id_field_name = 'rug_id' - sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' - result = conn.execute(sql) - for row in result: - table_name = row[0] - sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL' - self.__logmessage(sql) - conn.execute(sql) - - # Special case. These are the original Dacadoo ids. 
Only in the user table they should be deleted. - with self.db.connect() as conn, conn.begin(): - self.__init_mysql_connection(conn) - drop_field = 'id' - table_name = 'dacadoo_user' - self.__logmessage(f'Dropping field {drop_field} from table {table_name}') - sql = f'ALTER TABLE {table_name} DROP {drop_field}' - result = conn.execute(sql) - self.__logmessage(f'Dropped field {drop_field} from table {table_name}') - +## tables_to_process = [] +## with self.db.connect() as conn, conn.begin(): +## self.__init_mysql_connection(conn) +## # Get all the Dacadoo tables which holds ids in field name userId +## table_name = 'dacadoo_user' +## try: +## sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL' +## self.__logmessage(sql) +## conn.execute(sql) +## except Exception: +## pass +## sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)' +## self.__logmessage(sql) +## conn.execute(sql) +## +## id_field_name = 'userId' +## self.__drop_fields.append(id_field_name) +## sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' +## result = conn.execute(sql) +## for row in result: +## tables_to_process.append(row[0]) +## +## # Here we loop over the tables that needs to be changed. We open a new DB connection for every table update. +## # This will hopefully reduce the undo log and commit earlier the changes +## for table_name in tables_to_process: +## with self.db.connect() as conn, conn.begin(): +## self.__init_mysql_connection(conn) +## try: +## sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id' +## self.__logmessage(sql) +## conn.execute(sql) +## except Exception: +## pass +## +## try: +## sql = f'ALTER TABLE {table_name} DROP INDEX rug_id' +## #self.__logmessage(sql) +## conn.execute(sql) +## except Exception: +## pass +## +## try: +## sql = f'ALTER TABLE {table_name} ADD COLUMN rug_id VARCHAR(50) NULL DEFAULT NULL' +## self.__logmessage(sql) +## conn.execute(sql) +## except Exception: +## pass +## +## sql = f'FLUSH TABLES' +## #self.__logmessage(sql) +## conn.execute(sql) +## +## batch_size = 100000 +## amount_of_records = round(self.__record_count(table_name)/batch_size)+1 +## +## for i in range(amount_of_records): +## with self.db.connect() as conn, conn.begin(): +## self.__init_mysql_connection(conn) +## +## sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}' +## self.__logmessage(f'({i+1}/{amount_of_records}) {sql}') +## try: +## result = conn.execute(sql) +## except Exception as ex: +## result = conn.execute(sql) +## +## with self.db.connect() as conn, conn.begin(): +## self.__init_mysql_connection(conn) +## sql = f'DROP TABLE {new_user_table}' +## self.__logmessage(sql) +## conn.execute(sql) +## +## with self.db.connect() as conn, conn.begin(): +## self.__init_mysql_connection(conn) +## id_field_name = 'rug_id' +## sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name' +## result = conn.execute(sql) +## for row in result: +## table_name = row[0] +## sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL' +## self.__logmessage(sql) +## conn.execute(sql) +## +## # Special case. These are the original Dacadoo ids. 
Only in the user table they should be deleted. +## with self.db.connect() as conn, conn.begin(): +## self.__init_mysql_connection(conn) +## drop_field = 'id' +## table_name = 'dacadoo_user' +## self.__logmessage(f'Dropping field {drop_field} from table {table_name}') +## sql = f'ALTER TABLE {table_name} DROP {drop_field}' +## result = conn.execute(sql) +## self.__logmessage(f'Dropped field {drop_field} from table {table_name}') +## def drop_fields(self, drop = True): @@ -776,7 +788,7 @@ class dHealthImport(): for row in result: table_name = row[0] self.__logmessage(f'Updateing postal code field {postal_code_field} for table {table_name}') - sql = f'UPDATE {table_name} SET {postal_code_field} = SUBSTRING({postal_code_field},0,4)' + sql = f'UPDATE {table_name} SET {postal_code_field} = SUBSTRING({postal_code_field},1,4)' result = conn.execute(sql) self.__logmessage(f'Updated postal code field {postal_code_field} for table {table_name}') @@ -816,36 +828,46 @@ class dHealthImport(): print('') def addDacadooData(self,location): - location = Path(location) - if location.exists() and location.is_dir(): - self.__source_folders['dacadoo'] = location - self.__load_files() - else: - raise RuntimeError(f'Location {location} is not a valid Dacadoo source location') + for location in location.split(','): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['dacadoo'].append(location) + else: + print(f'Location {location} is not a valid Dacadoo source location') + + self.__load_files() def addMenzisData(self,location): - location = Path(location) - if location.exists() and location.is_dir(): - self.__source_folders['menzis'] = location - self.__load_files() - else: - raise RuntimeError(f'Location {location} is not a valid Menzis source location') + for location in location.split(','): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['menzis'].append(location) + else: + print(f'Location {location} is not a valid Menzis source location') + + self.__load_files() def addNetivityData(self,location): - location = Path(location) - if location.exists() and location.is_dir(): - self.__source_folders['netivity'] = location - self.__load_files() - else: - raise RuntimeError(f'Location {location} is not a valid Netivity source location') + for location in location.split(','): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['netivity'].append(location) + else: + print(f'Location {location} is not a valid Netivity source location') + + self.__load_files() + def addVektisData(self,location): - location = Path(location) - if location.exists() and location.is_dir(): - self.__source_folders['vektis'] = location - self.__load_files() - else: - raise RuntimeError(f'Location {location} is not a valid Vektis source location') + for location in location.split(','): + location = Path(location) + if location.exists() and location.is_dir(): + self.__source_folders['vektis'].append(location) + else: + print(f'Location {location} is not a valid Vektis source location') + + self.__load_files() + if __name__ == "__main__": @@ -889,11 +911,11 @@ if __name__ == "__main__": except KeyError as ex: print('Not exporting CSV data') - #importer.run(True) - #importer.filter_on_consent() - # importer.create_new_rug_ids() - #importer.clean_birth_days() - #importer.clean_postal_codes() - #importer.drop_fields() + importer.run(reinit_db=True) + importer.filter_on_consent() + #importer.create_new_rug_ids() 
+ importer.clean_birth_days() + importer.clean_postal_codes() + importer.drop_fields() importer.create_csv_exports() importer.create_csv_exports(False,False)
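
Below is a minimal sketch (not part of the patch) of how the reworked pattern in process_file() turns CSV export file names into table names. The helper table_name() and the sample file names are illustrative assumptions only; the prefix 'menzis_' stands in for the value returned by __file_belongs_to_source().

```python
import re

# Pattern copied from the patched process_file(); note the trailing '.csv' is
# matched with an unescaped dot, exactly as it appears in the diff.
PATTERN = r'([_-]+\d+[_-]+\d+(?:.*)?)?.csv'

def table_name(prefix, file_name):
    # Strip an optional date/sequence suffix plus the extension, prepend the
    # source prefix, and lowercase the result (Windows-supplied names vary in case).
    return '{}{}'.format(prefix, re.sub(PATTERN, '', file_name)).lower()

print(table_name('menzis_', 'Stappen.csv'))                    # -> menzis_stappen
print(table_name('menzis_', 'Stappen_20200101_20200131.csv'))  # -> menzis_stappen
print(table_name('menzis_', 'Stappen-2020-01-v2.csv'))         # -> menzis_stappen
```

Because the dot before 'csv' is unescaped, a name such as 'Stappen_csv' would also have '_csv' stripped; escaping it as r'\.csv' would be slightly stricter, should that ever matter.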