Source code for w4h_db_utils

import datetime
from loguru import logger
import pandas as pd
from sqlalchemy import create_engine, text, MetaData, Table, Column, String, ForeignKey, DateTime, REAL
from sqlalchemy.orm import sessionmaker
from sqlalchemy_utils import database_exists, create_database


from geoalchemy2 import Geometry

from utils import load_config, get_db_engine


def create_tables(db_name: str, config_file='config.yaml'):
    """Create the W4H tables in the database with the given name based on the config file

    Args:
        db_name (str): Name of the database to create the tables in
        config_file (str, optional): Path to the config file. Defaults to 'config.yaml'.
    """
    metadata = MetaData()
    config = load_config(config_file=config_file)
    db_engine = get_db_engine(config_file, db_name=db_name)

    columns_config = config["mapping"]["columns"]

    # Create the user table
    user_table_config = config["mapping"]["tables"]["user_table"]
    # Convert each configured dtype string to an actual SQLAlchemy type
    user_columns = [
        eval(f'Column("{col_name}", {col_dtype}, primary_key={col_name == columns_config["user_id"]})')
        for col_name, col_dtype in user_table_config["columns"].items()
    ]
    user_table = Table(user_table_config["name"], metadata, *user_columns)

    # Create time series tables
    for table_name in config["mapping"]["tables"]["time_series"]:
        table = Table(table_name, metadata,
                      Column(columns_config["user_id"],
                             ForeignKey(user_table_config["name"] + '.' + columns_config["user_id"]),
                             primary_key=True),
                      Column(columns_config["timestamp"], DateTime, primary_key=True),
                      Column(columns_config["value"], REAL),
                      )

    # Create geo tables
    for table_name in config["mapping"]["tables"]["geo"]:
        table = Table(table_name, metadata,
                      Column(columns_config["user_id"],
                             ForeignKey(user_table_config["name"] + '.' + columns_config["user_id"]),
                             primary_key=True),
                      Column(columns_config["timestamp"], DateTime, primary_key=True),
                      Column(columns_config["value"], Geometry('POINT'))
                      )

    metadata.create_all(db_engine)
    db_engine.dispose()
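
# Usage sketch (illustrative, not part of the original module): assuming the
# target database 'w4h_demo' already exists and 'config.yaml' contains the
# "mapping" section consumed by load_config, the tables can be created with:
#
#   create_tables('w4h_demo', config_file='config.yaml')
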
def create_w4h_instance(db_name: str, config_file='config.yaml'):
    """Create a new W4H database instance with the given name and initialize the tables based on the config file

    Args:
        db_name (str): Name of the database to create
        config_file (str, optional): Path to the config file. Defaults to 'config.yaml'.
    """
    db_engine_tmp = get_db_engine(config_file)
    logger.info('Database engine created!')

    # Execute the SQL command to create the database if it doesn't exist
    if not database_exists(f'{db_engine_tmp.url}{db_name}'):
        create_database(f'{db_engine_tmp.url}{db_name}')
        logger.success(f"Database {db_name} created!")
        db_engine_tmp.dispose()
    else:
        logger.error(f"Database {db_name} already exists!")
        db_engine_tmp.dispose()
        return

    db_engine = get_db_engine(config_file, db_name=db_name)
    # Enable PostGIS extension
    with db_engine.connect() as connection:
        connection.execute(text("CREATE EXTENSION postgis;"))
        logger.success(f"PostGIS extension enabled for {db_name}!")
        connection.commit()
    db_engine.dispose()

    # Create the W4H tables
    create_tables(config_file=config_file, db_name=db_name)
    logger.success("W4H tables initialized!")
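
# Usage sketch (illustrative, not part of the original module): assuming
# 'config.yaml' points at a reachable PostgreSQL server with the PostGIS
# extension available, a fresh instance named 'w4h_demo' could be set up with:
#
#   create_w4h_instance('w4h_demo', config_file='config.yaml')
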
def get_existing_databases(config_file='config.yaml') -> list:
    """Get a list of all existing databases

    Args:
        config_file (str, optional): Path to the config file. Defaults to 'config.yaml'.

    Returns:
        list: List of all existing databases (strings)
    """
    config = load_config(config_file=config_file)
    db_engine = get_db_engine(config_file)
    with db_engine.connect() as connection:
        result = connection.execute(text("SELECT datname FROM pg_database WHERE datistemplate = false;"))
        databases = [row[0] for row in result]
    db_engine.dispose()
    return databases
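
# Usage sketch (illustrative, not part of the original module):
#
#   for name in get_existing_databases('config.yaml'):
#       print(name)
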
def populate_tables(df: pd.DataFrame, db_name: str, mappings: dict, config_path='config.yaml'):
    """Populate the W4H tables in the given database with the data from the given dataframe
    based on the mappings between the CSV columns and the database tables.

    Args:
        df (pd.DataFrame): Dataframe containing the data to be inserted into the database
        db_name (str): Name of the database to insert the data into
        mappings (dict): Dictionary containing the mappings between the CSV columns and the database tables
        config_path (str, optional): Path to the config file. Defaults to 'config.yaml'.
    """
    # Load the config
    config = load_config(config_path)

    # Extract default column names from the config
    default_user_id = config['mapping']['columns']['user_id']
    default_timestamp = config['mapping']['columns']['timestamp']
    default_value = config['mapping']['columns']['value']
    user_table_name = config['mapping']['tables']['user_table']['name']

    # Create a session
    engine = get_db_engine(config_path, db_name=db_name)
    Session = sessionmaker(bind=engine)
    session = Session()

    # Ensure all unique users from the dataframe exist in the user table
    unique_users = df[mappings[default_user_id]].unique().astype(str)
    existing_users = session.query(
        Table(user_table_name, MetaData(bind=engine), autoload=True).c[default_user_id]
    ).all()
    existing_users = [x[0] for x in existing_users]

    # Identify users that are not yet in the database
    new_users = set(unique_users) - set(existing_users)
    if new_users:
        # Convert the set of new users into a DataFrame
        all_new_users = pd.DataFrame({default_user_id: list(new_users)})
        # Use to_sql to insert all new users into the user table
        all_new_users.to_sql(user_table_name, engine, if_exists='append', index=False)

    # Get the subset of mappings that doesn't include default_user_id and default_timestamp
    table_mappings = {k: v for k, v in mappings.items() if k not in [default_user_id, default_timestamp]}

    # Loop through each table in table_mappings
    for table_name, csv_column in table_mappings.items():
        # Check if the mapping is not NULL and exists in the df
        if csv_column and csv_column in df.columns:
            # Ensure that the dataframe columns match the user_id, timestamp, and value from your CSV
            columns_to_insert = [mappings[default_user_id], mappings[default_timestamp], csv_column]
            subset_df = df[columns_to_insert].copy()

            # Rename columns to match the table's column names using the defaults from config
            subset_df.columns = [default_user_id, default_timestamp, default_value]

            # Drop duplicate (user_id, timestamp) pairs
            subset_df.drop_duplicates(subset=[default_user_id, default_timestamp], inplace=True)
            # subset_df = subset_df.groupby([default_user_id, default_timestamp]).mean().reset_index()

            # Handle geometry data
            if table_name in config["mapping"]["tables"]["geo"]:
                subset_df[default_value] = subset_df[default_value].apply(lambda x: f'POINT{x}'.replace(',', ''))

            # Insert data into the table
            subset_df.to_sql(table_name, engine, if_exists='append', index=False)

    # Commit the remaining changes and close the session
    session.commit()
    session.close()
    engine.dispose()
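
# Usage sketch (illustrative, not part of the original module): the keys of
# `mappings` are the default column names and time-series table names from the
# config, and the values are the matching CSV column names. The names below
# ('user_id', 'timestamp', 'heart_rates', 'subject', 'time', 'hr', and the CSV
# path) are hypothetical and depend on your config and data:
#
#   df = pd.read_csv('wearable_data.csv')
#   mappings = {'user_id': 'subject', 'timestamp': 'time', 'heart_rates': 'hr'}
#   populate_tables(df, 'w4h_demo', mappings, config_path='config.yaml')
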
def populate_subject_table(df: pd.DataFrame, db_name: str, config_path='config.yaml', user_tbl_name=None):
    """Populate the W4H subject table in the given database with the data from the given dataframe
    based on the given subject table name in the config file.

    Args:
        df (pd.DataFrame): Dataframe containing the subject data to be inserted into the database
        db_name (str): Name of the subject database to insert the data into
        config_path (str, optional): Path to the config file. Defaults to 'config.yaml'.
        user_tbl_name (str, optional): Name of the subject (user) table to write to. Defaults to None.
    """
    # Load the config
    config = load_config(config_path)

    # Create a database engine
    engine = get_db_engine(config_path, db_name=db_name)

    # Push the dataframe directly to the subject table, replacing the table if it already exists
    df.to_sql(user_tbl_name, engine, if_exists='replace', index=False)

    # Dispose of the engine once the write is complete
    engine.dispose()
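
# Usage sketch (illustrative, not part of the original module): the subject
# table name is passed explicitly; 'subjects' and the CSV path below are
# hypothetical:
#
#   subjects_df = pd.read_csv('subjects.csv')
#   populate_subject_table(subjects_df, 'w4h_demo', config_path='config.yaml',
#                          user_tbl_name='subjects')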