Source code for import_hub_main

import streamlit as st
import pandas as pd
import difflib
import re
from fuzzywuzzy import process

from utils import load_config

from w4h_db_utils import create_w4h_instance, get_existing_databases, populate_tables, populate_subject_table


# Path of the config file. main() loads it with load_config() and also passes
# the same path, as config_path, to the w4h_db_utils helpers.
CONFIG_FILE = 'config.yaml'

def preprocess_string(s: str) -> str:
    """Normalize a name into lowercase, space-separated word tokens.

    Underscores are treated as word separators, every other non-word
    character is dropped, and the remaining tokens are joined with a single
    space.

    Args:
        s (str): String to preprocess

    Returns:
        str: Preprocessed string
    """
    # Underscores count as word characters for the regex below, so they must
    # be turned into spaces before tokenizing.
    spaced = s.replace('_', ' ')
    lowered = spaced.lower()

    # Keep only runs of word characters; punctuation and extra whitespace
    # disappear here.
    words = re.findall(r'\b\w+\b', lowered)

    return ' '.join(words)
def find_closest_name(col_names: list, targets: str) -> str:
    """Pick the column name that best fuzzy-matches the target words.

    Both the column names and the target string are normalized with
    ``preprocess_string`` before being compared with fuzzywuzzy. If the best
    score does not exceed the threshold, the first column name is returned.

    Args:
        col_names (list): List of column names to search through
        targets (str): String containing the target column names

    Returns:
        str: Closest column name
    """
    fallback = col_names[0]  # returned when nothing matches well enough
    min_score = 60

    # Map each normalized name back to the original column name. When two
    # columns normalize to the same text, the later one wins.
    original_by_normalized = {}
    for name in col_names:
        original_by_normalized[preprocess_string(name)] = name
    candidates = list(original_by_normalized)

    # Best fuzzy match among the normalized column names.
    query = preprocess_string(targets)
    best_name, best_score = process.extractOne(query, candidates)

    if best_score > min_score:
        return original_by_normalized[best_name]
    return fallback
def populate_db(df: pd.DataFrame, db_name: str, mappings: dict, config_path: str = 'config.yaml'):
    """Populate the database with the given dataframe.

    Shows a progress message and the mappings in the Streamlit page, then
    hands everything over to ``populate_tables``.

    Args:
        df (pd.DataFrame): Dataframe containing the data to be inserted into the database
        db_name (str): Name of the database to insert the data into
        mappings (dict): Dictionary containing the mappings between the CSV columns and the database tables
        config_path (str, optional): Path to the config file. Defaults to 'config.yaml'.
    """
    st.write("Populating database...")
    # Echo the mappings so the user can see what is about to be imported.
    st.write(mappings)
    populate_tables(df, db_name, mappings, config_path)
def main():
    """Main function for the streamlit app.

    Flow of the page:

    1. Pick the target database: either select an existing W4H instance or
       create a new one.
    2. Upload a CSV file.
    3. Either populate the subject table from the CSV, or map the CSV columns
       to the W4H feature tables listed in the config and populate those.

    Streamlit re-runs this function from the top on every widget interaction,
    and ``st.button`` only returns True on the run triggered by its click. The
    name of a newly created database is therefore kept in
    ``st.session_state`` so it is still the selected database when the user
    later uploads a file or presses "Populate Database".
    """
    # Load the config
    config = load_config(config_file=CONFIG_FILE)
    config_path = CONFIG_FILE

    st.title("W4H Import Hub")

    selected_db = None
    # session_state key holding the name of the database created in this session
    created_db_key = "w4h_created_db_name"

    # Choose between existing or new database
    db_selection_options = ["Choose existing W4H database instance", "Create new W4H database instance"]
    database_option = st.radio(
        "Select an option",
        db_selection_options
    )

    # Handling the chosen option
    if database_option == db_selection_options[0]:
        existing_databases = get_existing_databases(config_path)
        selected_db = st.selectbox("**Select an existing database**", existing_databases)
    elif database_option == db_selection_options[1]:
        new_db_name = st.text_input("Enter new w4h database instance name")
        if st.button("Create"):
            create_w4h_instance(new_db_name, config_path)
            st.success(f"Database '{new_db_name}' created!")
            # Remember the name: on the next rerun the button is False again.
            st.session_state[created_db_key] = new_db_name
        # The text box stays the source of truth, but only a name that was
        # actually created in this session counts as selected.
        if new_db_name and st.session_state.get(created_db_key) == new_db_name:
            selected_db = new_db_name

    uploaded_file = st.file_uploader("Choose a CSV file", type="csv")

    # option to populate subject table or feature time series tables
    is_subjects_populated = st.checkbox("Populate subject table?")

    if uploaded_file and is_subjects_populated:
        # set subject table name
        subject_tbl_name = st.text_input("Enter subject table name", value="subjects")
        st.success("File uploaded!")
        df = pd.read_csv(uploaded_file)
        st.write("Columns in your CSV:")
        st.write(df.columns)

        if st.button("Populate Database") and _require_selected_db(selected_db):
            populate_subject_table(df, selected_db, config_path, user_tbl_name=subject_tbl_name)
            st.success("Database populated!")

    if uploaded_file and not is_subjects_populated:
        st.success("File uploaded!")
        df = pd.read_csv(uploaded_file)
        st.write("Columns in your CSV:")
        st.write(df.columns)

        # Map columns
        st.subheader("Mapping")

        # Default selections based on column name similarity
        default_timestamp = find_closest_name(df.columns, 'time timestamp date start_time end_time')
        default_user_id = find_closest_name(df.columns, 'user id email')

        timestamp_col = st.selectbox("**Select Timestamp Column**", df.columns,
                                     index=df.columns.get_loc(default_timestamp))
        user_id_col = st.selectbox("**Select User ID Column**", df.columns,
                                   index=df.columns.get_loc(default_user_id))

        # Required mappings, keyed by the column names given in the config
        mappings = {
            config['mapping']['columns']['timestamp']: timestamp_col,
            config["mapping"]['columns']['user_id']: user_id_col,
        }

        # Foldable block for optional mappings
        table_mappings = {}
        with st.expander("**Map Features to W4H Tables**", expanded=True):
            st.write("Map your CSV columns to corresponding W4H tables.")
            # "None" lets the user leave a table unmapped.
            choices = ["None"] + list(df.columns)

            for target_table_name in config['mapping']['tables']['time_series'] + config['mapping']['tables']['geo']:
                # e.g. "heart_rate" -> "Heart Rate"
                target_table_label = ' '.join(
                    [label.capitalize() for label in target_table_name.replace('_', ' ').split()]
                )
                st.subheader(target_table_label)
                def_choice = find_closest_name(choices, target_table_label)
                mapped_col = st.selectbox("Select Corresponding Column", choices, key=target_table_name,
                                          index=choices.index(def_choice))
                table_mappings[target_table_name] = mapped_col if mapped_col != "None" else None

        # Once mappings are set, allow the user to populate the database
        if st.button("Populate Database") and _require_selected_db(selected_db):
            mappings = {**mappings, **table_mappings}
            populate_db(df, selected_db, mappings, config_path)
            st.success("Database populated!")


def _require_selected_db(selected_db) -> bool:
    """Return True if a target database is chosen; otherwise show an error in the page."""
    if selected_db:
        return True
    st.error("No database selected. Select an existing database or create a new one first.")
    return False
# Launch the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()