# NOTICE: Multiprocessing does not work on Windows :(. So it is not used
self.__number_of_cpu=multiprocessing.cpu_count()
# TODO: Make the import and cleaning run in a newly created temporary database. Afterward, drop the old existing, and rename the temporary to the final database name.
# NOTE(review): the lines below are the tail of a record-counting helper
# (returns the COUNT(*) of `table`, or None when the query yields no rows);
# its `def` line falls outside this excerpt and token whitespace has been
# stripped (e.g. `forrowinresult:` = `for row in result:`). Restore the
# original spacing from the full file before editing.
sql=f'SELECT COUNT(*) as AmountRecords FROM {table}'
#self.__logmessage(sql)
result=conn.execute(sql)
forrowinresult:
returnrow[0]
returnNone
def __get_all_tables(self):
    """Return the names of all tables in the connected MySQL database.

    Runs ``SHOW TABLES`` inside a transaction and collects the first
    column of every returned row.

    Returns:
        list[str]: all table names; empty list when the schema has none.
    """
    data = []
    with self.db.connect() as conn, conn.begin():
        sql = 'SHOW TABLES'
        # self.__logmessage(sql)
        result = conn.execute(sql)
        data = [row[0] for row in result]
    return data
def __optimize_mysql_tables(self):
    """Add an index on every configured index field for faster processing.

    For each field name in ``dHealthImport.INDEX_FIELDS``, look up all
    tables in the temporary schema that contain a column with that name,
    and add an index on the column unless one already exists (checked via
    ``information_schema.statistics``).
    """
    with self.db.connect() as conn, conn.begin():
        self.__init_mysql_connection(conn)
        # Adding indexes to the tables for faster processing
        for index_field in dHealthImport.INDEX_FIELDS:
            # One row per table holding the column; `Amount` > 0 means an
            # index with the column's name already exists on that table.
            sql = f'SELECT a.TABLE_NAME, (SELECT COUNT(*) FROM information_schema.statistics AS b WHERE b.TABLE_NAME = a.TABLE_NAME AND b.index_name = a.COLUMN_NAME AND b.TABLE_SCHEMA = a.TABLE_SCHEMA) AS Amount FROM information_schema.COLUMNS AS a WHERE a.COLUMN_NAME = \'{index_field}\' and a.TABLE_SCHEMA = \'{self.__temp_db_name}\''
            result = conn.execute(sql)
            for row in result:
                # Index is already added. So skip
                if row[1] > 0:
                    continue
                table_name = row[0]
                try:
                    sql = f'ALTER TABLE {table_name} ADD INDEX ({index_field})'
                    self.__logmessage(sql)
                    conn.execute(sql)
                except Exception as ex:
                    # Best effort: log the failure and continue with the
                    # next table instead of silently swallowing it.
                    self.__logmessage(str(ex))
        # Optimize tables so all data is indexed and ready
# NOTE(review): the lines below are an interior fragment of the JSON
# data-file import loop; its enclosing `def` is outside this excerpt and
# token whitespace has been stripped (e.g. `records_done+=len(json_data)`).
# It logs progress, converts the parsed JSON batch to a pandas DataFrame,
# and frees the raw list. Restore from the full file before editing.
self.__logmessage('Processing at {:.3f}%{}/{}, {} records from JSON data file \'{}\'.'.format((filesize_done/file.stat().st_size)*100,self.__byte_size(filesize_done),self.__byte_size(file.stat().st_size),records_done,file))
# Remaining records
records_done+=len(json_data)
panda_data=pd.DataFrame(json_data)
del(json_data)
withself.db.connect()asconn,conn.begin():
self.__init_mysql_connection(conn)
# Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types
self.__logmessage('Processing at {:.3f}%{}/{}, {} records from JSON data file \'{}\'.'.format((filesize_done/file.stat().st_size)*100,self.__byte_size(filesize_done),self.__byte_size(file.stat().st_size),records_done,file))
self.__logmessage(f'Processing file {file} done in {timedelta(seconds=(time()-start))}')
def filter_on_consent(self):
    """Delete all imported records of users that did not give consent.

    Cleans the Dacadoo tables (keep only accepted users), converts Vektis
    insurance numbers to Netivity account ids, then cleans the Netivity
    tables against the consent table, and finally drops the sensitive
    fields listed in ``dHealthImport.DROP_FIELDS``.

    NOTE(review): this excerpt is whitespace-mangled and non-contiguous;
    the body below restores token spacing and structure from the visible
    lines only. Every name flagged with TODO(review) is defined in code
    that is missing from this excerpt — verify against the full file.
    """
    def fix_vektis_insurance_number(self):
        # Apparently it is difficult to combine two CSV files at the source. So we have to fix it here again....
        # And they are not able to upload easy to understand file names... bunch of rubbish..
        # Find the 'source' table with the insurance and accountid numbers
        with self.db.connect() as conn, conn.begin():
            self.__init_mysql_connection(conn)
            sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'VERZEKERDENUMMER\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
            # TODO(review): the code that executes the query above and binds
            # `source_table_name` is missing from this excerpt.
            # Find all the tables that holds the field ZCL_REL_NR
            convert_field = 'ZCL_REL_NR'
            sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{convert_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
            result = conn.execute(sql)
            for row in result:
                table_name = row[0]
                try:
                    # Map every insurance number (VERZEKERDENUMMER) to its AccountID
                    sql = f'SELECT AccountID, VERZEKERDENUMMER FROM {source_table_name} ORDER BY AccountID'
                    account_ids = conn.execute(sql)
                    total_records = account_ids.rowcount
                    self.__logmessage(f'Updating {total_records} accounts in the table {table_name}')
                    counter = 0
                    for account in account_ids:
                        counter += 1
                        sql = f'UPDATE {table_name} SET {convert_field} = {account[0]} WHERE {convert_field} = {account[1]}'
                        updates = conn.execute(sql)
                        self.__logmessage(f'[{counter} off {total_records}({(counter/total_records)*100:.2f}%)]: Updated {updates.rowcount} records for account id {account[0]} in table {table_name}')
                    self.__logmessage(f'Renaming field {convert_field} to AccountId')
                    # TODO(review): the ALTER TABLE ... RENAME statement that
                    # performs the announced rename is missing from this excerpt.
                except Exception as ex:
                    # TODO(review): the original `except` body is missing from
                    # this excerpt; log and continue so one broken table does
                    # not abort the whole run.
                    self.__logmessage(str(ex))

    # Here we clean up the records that should not be here. They have not given a consent
    # First we start with the Dacadoo data.
    with self.db.connect() as conn, conn.begin():
        self.__init_mysql_connection(conn)
        try:
            # Delete all dacadoo accounts that should not be here.
            sql = 'DELETE FROM dacadoo_user WHERE id NOT IN (SELECT id FROM dacadoo_acceptedUser)'
            self.__logmessage(sql)
            conn.execute(sql)
            # Drop the table, as we do not need it anymore
            sql = 'DROP TABLE dacadoo_acceptedUser'
            self.__logmessage(sql)
            conn.execute(sql)
        except Exception as ex:
            print(ex)
        # Now we clean all the other tables that contain the Dacadoo userId field (reference field)
        # And delete all records where the userId is not in the account table.
        id_field_name = 'userId'
        self.__drop_fields.append(id_field_name)
        sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
        result = conn.execute(sql)
        for row in result:
            table_name = row[0]
            sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT id FROM dacadoo_user)'
            self.__logmessage(sql)
            conn.execute(sql)
    # Convert Vektis insurance number to Netivity AccountID, so we can clean it later on
    fix_vektis_insurance_number(self)
    # Now we clean the Netivity data
    with self.db.connect() as conn, conn.begin():
        self.__init_mysql_connection(conn)
        # Delete all Netivity accounts that should not be here.
        # But this is based on either legacy or normal account id
        try:
            sql = 'DELETE FROM netivity_account WHERE AccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids) AND LegacyAccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids)'
            self.__logmessage(sql)
            conn.execute(sql)
            sql = 'DELETE FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids)'
            self.__logmessage(sql)
            conn.execute(sql)
            # Drop the table, as we do not need it anymore
            sql = 'DROP TABLE netivity_rugtoestemmingids'
            self.__logmessage(sql)
            conn.execute(sql)
        except Exception as ex:
            print(ex)
        # Now we clean all the other tables that contain the Netivity AccountId field (reference field)
        # And delete all records where the AccountId is not in the account table.
        id_field_name = 'AccountId'
        sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
        # TODO(review): the loop that consumes the query above is missing from
        # this excerpt, as is the creation/definition of `new_user_table`.
        sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id,email_address) SELECT AccountId, LegacyAccountId, DacadooId, EmailAddress FROM netivity_account'
        self.__logmessage('Get the legacy account ids based on Dacadoo Ids ...')
        sql = f'UPDATE {new_user_table} SET netivity_legacy_id = (SELECT AccountId FROM netivity_legacyaccountoud WHERE netivity_legacyaccountoud.DacadooId = {new_user_table}.dacadoo_id LIMIT 1) WHERE netivity_legacy_id = \'\''
        # Collect old IDS from legacy tables which holds 2 fields: AccountId, DacadooId. But now we only want NEW records in table rug_userid_conversion. So only request records which are not already seen.
        self.__logmessage('Inserting Netivity LEGACY account data to new rug ids ...')
        sql = f'INSERT INTO {new_user_table}(netivity_legacy_id,dacadoo_id) SELECT AccountId, DacadooId FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT netivity_legacy_id FROM {new_user_table}) AND DacadooId NOT IN (SELECT dacadoo_id FROM {new_user_table})'
        # Now we are looking for all tables that holds the old IDS. Based on table name (legacy or not) we choose the field name to match
        id_field_name = 'AccountId'
        self.__drop_fields.append(id_field_name)
        sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
        result = conn.execute(sql)
        table_counter = 0
        table_total = result.rowcount
        self.__logmessage(f'We found {table_total} tables that needs to be re-numbered for the field {id_field_name}')
        # TODO(review): the per-table re-numbering loop is missing from this
        # excerpt; the orphaned log line below references its loop variables
        # (`counter`, `total_accounts`, `updates`, `source_value`) and is kept
        # commented out to avoid a guaranteed NameError:
        # self.__logmessage(f'[{counter} off {total_accounts}({(counter/total_accounts)*100:.2f}%)]: Updated {updates.rowcount} records for account id {source_value} in table {table_name}')
        # self.__logmessage(f'Updated ')
        # sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.{source_id} = {table_name}.{id_field_name} LIMIT 1)'
        # self.__logmessage(sql)
        # conn.execute(sql)
        # with self.db.connect() as conn, conn.begin():
        #     self.__init_mysql_connection(conn)
        #     # Get all the Menzis tables which holds ids in field name ORIGINEEL_RELATIE_ID
        #     id_field_name = 'ORIGINEEL_RELATIE_ID'
        #     self.__drop_fields.append(id_field_name)
        #     sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
        #     result = conn.execute(sql)
        #     for row in result:
        #         table_name = row[0]
        #         # sql = f'INSERT INTO {new_user_table}(netivity_legacy_id) SELECT DISTINCT {id_field_name} FROM {table_name} WHERE {id_field_name} NOT IN (SELECT menzis_id FROM {new_user_table}) AND {id_field_name} NOT IN (SELECT netivity_legacy_id FROM {new_user_table})'
        #         sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.menzis_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL'
        #         self.__logmessage(sql)
        #         conn.execute(sql)
        #         sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.netivity_legacy_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL'
        ## sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)'
        ## self.__logmessage(sql)
        ## conn.execute(sql)
        ##
        ## id_field_name = 'userId'
        ## self.__drop_fields.append(id_field_name)
        ## sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
        ## result = conn.execute(sql)
        ## for row in result:
        ##     tables_to_process.append(row[0])
        ##
        ## # Here we loop over the tables that needs to be changed. We open a new DB connection for every table update.
        ## # This will hopefully reduce the undo log and commit earlier the changes
        ## for table_name in tables_to_process:
        ##     with self.db.connect() as conn, conn.begin():
        ##         self.__init_mysql_connection(conn)
        ##         try:
        ##             sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id'
        ##             self.__logmessage(sql)
        ##             conn.execute(sql)
        ##         except Exception:
        ##             pass
        ##
        ##         try:
        ##             sql = f'ALTER TABLE {table_name} DROP INDEX rug_id'
        ##             sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}'
        ##             sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
        ##             result = conn.execute(sql)
        ##             for row in result:
        ##                 table_name = row[0]
        ##                 sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL'
        ##                 self.__logmessage(sql)
        ##                 conn.execute(sql)
        ##
        ## # Special case. These are the original Dacadoo ids. Only in the user table they should be deleted.
        ## with self.db.connect() as conn, conn.begin():
        ##     self.__init_mysql_connection(conn)
        ##     drop_field = 'id'
        ##     table_name = 'dacadoo_user'
        ##     self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
        ##     sql = f'ALTER TABLE {table_name} DROP {drop_field}'
        ##     result = conn.execute(sql)
        ##     self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
        # Drop all the fields that contain data that is sensitive
        # For now, we keep all the different userid fields. As we do not re-create new rug-ids
        # for drop_field in self.__drop_fields:
        for drop_field in dHealthImport.DROP_FIELDS:
            sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{drop_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
            result = conn.execute(sql)
            for row in result:
                table_name = row[0]
                self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
                sql = f'ALTER TABLE {table_name} DROP {drop_field}'
                # TODO(review): the `drop` flag is defined in code missing from
                # this excerpt (presumably a dry-run switch) — confirm.
                if drop:
                    result = conn.execute(sql)
                    self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
def clean_birth_days(self):
    """Anonymize birthday columns by keeping only the year.

    For every field in ``dHealthImport.BIRTHDAY_FIELDS``, find all tables
    in the temporary schema holding that column and rewrite each value to
    January 1st of its year via ``DATE_FORMAT(field, '%Y-01-01')`` (the
    ``%%`` is unescaped by the SQLAlchemy/driver layer).
    """
    with self.db.connect() as conn, conn.begin():
        self.__init_mysql_connection(conn)
        for birthday_field in dHealthImport.BIRTHDAY_FIELDS:
            sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{birthday_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
            result = conn.execute(sql)
            for row in result:
                table_name = row[0]
                # Typo fix: was 'Updateing'
                self.__logmessage(f'Updating birthday field {birthday_field} for table {table_name}')
                sql = f'UPDATE {table_name} SET {birthday_field} = DATE_FORMAT({birthday_field},\'%%Y-01-01\')'
                # Do not rebind `result` here: it is the row iterator of the
                # outer loop (original shadowed it).
                conn.execute(sql)
                self.__logmessage(f'Updated birthday field {birthday_field} for table {table_name}')
# NOTE(review): fragment of a postal-code anonymization method; the enclosing
# `def`, the loop header that binds `postal_code_field`, and the UPDATE
# statement are all outside this excerpt, and token whitespace has been
# stripped. Restore from the full file before editing.
sql=f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{postal_code_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result=conn.execute(sql)
forrowinresult:
table_name=row[0]
self.__logmessage(f'Updateing postal code field {postal_code_field} for table {table_name}')