Data Masking with Azure Databricks

Alice: “Would you tell me, please, which way I ought to go from here?”
The Cheshire Cat: “That depends a good deal on where you want to get to.”
— Lewis Carroll, Alice’s Adventures in Wonderland

(2025-Jan-13) Working on the data masking project has been a long journey; writing about it will take some time. I’m fine if you’re not a big fan of long reads and prefer scrolling, skipping, or engaging in shorter, bite-sized learning experiences. I will keep this writing for myself as a chronicle of my journey along a path that may seem overwhelming. However, the experience throughout this journey and the joy of reaching the final destination will be hard to forget.

When I think of data masking, or the term "masking" in particular — the intention to “hide” something from someone comes to mind. I picture a man in a mask, presumably to conceal his identity, or an image blurred beyond recognition. In the world of computer data, masking represents the process of obfuscating, redacting, altering, hiding, encrypting, anonymizing, or substituting sensitive information that requires protection.

Image by Atner Yegorov from Pixabay

One way to protect sensitive information from end users in a database is through dynamic masking. In this process, the actual data is not altered; however, when the data is exposed or queried, the results are returned with modified values, or the actual values are replaced with special characters or notes indicating that the requested data is hidden for protection purposes.
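
For illustration only (this is not the approach taken in this project), a minimal PySpark sketch of such query-time masking might look like the following; the table and column values are made up, and the stored data is never touched.

# Illustration only: query-time ("dynamic") masking in PySpark, where the stored
# data is never changed and only the query result is masked.
# The table and column values below are made up.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

customers = spark.createDataFrame(
    [(1, "Alice Smith", "alice@example.com")],
    ["customer_id", "full_name", "email"],
)

masked = customers.select(
    "customer_id",
    F.lit("***MASKED***").alias("full_name"),                            # full redaction
    F.regexp_replace(F.col("email"), r"^[^@]+", "xxxx").alias("email"),  # keep only the domain part
)

masked.show(truncate=False)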

In this blog, we will discuss a different approach to protecting data, where personally identifiable information (PII – a term you will frequently encounter when reading about data protection and data governance) is actually changed or updated in the database / persistent storage. This ensures that even if someone gains access to the data, nothing will be compromised. This is usually needed when refreshing a lower environment from a production database or dataset that contains PII data elements. Your QA team will appreciate having a realistic data volume that resembles the production environment, but with masked data.

Data Masking Workflow

Ok, let’s put on a developer’s hat. You’ve been assigned to create a solution to mask tables in a relational database. You know which tables are involved, and someone has informed you that not all columns in those tables need to be masked. You’ve also heard about different methods to mask or alter the data.

Your first instinct as a developer might be to run a few UPDATE SQL statements to permanently change and protect the PII data. It all sounds good, but it lacks a controlled masking workflow, and relying on manual efforts as a routine practice is not ideal. This approach can lead to inconsistencies and fails to ensure the reliable protection of sensitive data across all environments.

Not a problem. Let’s move forward with the understanding that we can automate those UPDATE statements and incorporate them into several scripts or a more sophisticated data transformation solution. You’ve been well-trained and can choose the tools that suit you best.  

The process remains the same: references to tables, their columns, and different masking methods are embedded in your crafted solution. The number of scripts or data flow pipelines corresponds to the number of tables that need protection. I’m hysterically filled with joy at the sight of explicit table column names hardcoded in my solution.

I’m being sarcastic because I’ve seen this approach many times in other people’s work, and I’m guilty of doing similar things myself in the early days of my IT career.

Why this approach is bad: any future schema changes will require additional effort to update the data flow pipelines. If I need to replicate the same generic data update mechanism for multiple tables, I’ll have to apply it to every single table. Now imagine being deep into development and discovering a mistake that affects all your database table routines. At that point, your maintenance efforts can quickly spiral into a nightmare, and screaming for help might not offer much relief.

What do we do? We need to build a solution that allows for configuring tables, table columns, masking functions or methods, and assigning those masking functions to specific table columns. This data masking solution should also be scalable to support many tables – whether it's dozens, or even hundreds. Your database and business requirements for data protection and governance will define the limits.

An additional but very important requirement is to make this data masking solution as generic as possible: avoid hardcoded references to any database objects within the solution. The external data masking scope should define all efforts for data protection. If the underlying database schema changes, no alterations are made to the masking solution; only the external configuration is updated accordingly.

To keep it short – this data masking solution was built using the Azure Databricks platform to mask 290 Oracle database tables, with more than 1,200 specific columns assigned to 10 different masking methods. These methods ranged from simple data substitution and lookup dictionary redactions to regulated randomization.

Why was Databricks chosen? Simple: it’s a scalable cloud data platform that can handle data flow solutions for a single table or scale up to many more as needed. We only pay for the actual workload, with no additional licensing fees for third-party solutions. Plus, Databricks supports multiple programming languages, and when it comes to data masking, Python naturally comes to mind.

The data masking process follows a strict three-step order to ensure data integrity and consistency in a relational database. Disabling database constraints (Step 1) prevents errors during masking when constraints like foreign keys or unique indexes might block updates. The masking process (Step 2) then modifies sensitive data as required. Finally, re-enabling constraints (Step 3) restores the database's structural safeguards, ensuring that all data adheres to the original relational rules after masking is complete.
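
As a minimal sketch of that order, the hypothetical driver below simply calls three placeholder functions; the real Databricks jobs that implement each step are described later in this post.

# A minimal sketch of the run order only; the function bodies are placeholders
# standing in for the Databricks jobs described later in this post.

def disable_database_constraints():
    """Step 1: disable triggers, foreign keys, primary keys, and indexes."""
    print("Disabling constraints...")  # placeholder for the real DDL execution

def run_masking_process():
    """Step 2: clone, validate, and mask every table in the configured scope."""
    print("Masking tables...")  # placeholder for the real masking workflow

def enable_database_constraints():
    """Step 3: recreate indexes and re-enable keys and triggers in reverse order."""
    print("Re-enabling constraints...")  # placeholder for the real DDL execution

if __name__ == "__main__":
    disable_database_constraints()   # Step 1
    run_masking_process()            # Step 2
    enable_database_constraints()    # Step 3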

Masking Scope Configuration

Let's have a look at the actual data masking scope configuration.

CREATE TABLE [cfg].[MaskingConfigDatabase] (
    [DatabaseID] INT IDENTITY(1,1) NOT NULL PRIMARY KEY CLUSTERED,
    [WorkspaceName] VARCHAR(50) NULL,
    [HostName] VARCHAR(50) NULL,
    [Port] INT NULL,
    [DatabaseName] VARCHAR(50) NULL
) ON [PRIMARY];
GO

A single entry in the configuration table represents the Oracle database configuration for masking. The table includes critical information such as WorkspaceName, HostName, Port, and DatabaseName. 

CREATE TABLE cfg.MaskingConfigTable (
    ID INT IDENTITY(1,1) NOT NULL PRIMARY KEY CLUSTERED,
    DatabaseName VARCHAR(50) NULL,
    SchemaName VARCHAR(50) NULL,
    TableName VARCHAR(50) NULL,
    PrimaryKeyColumn VARCHAR(50) NULL,
    ColumnName VARCHAR(50) NULL,
    MaskingRule VARCHAR(100) NULL,
    MaskingType VARCHAR(50) NULL
);
GO

The masking process for a target database is configured through the table [cfg].[MaskingConfigTable], which defines the specific tables and columns targeted for masking. Each entry outlines key attributes such as SchemaName, TableName, and PrimaryKeyColumn, ensuring that data masking operations are precisely scoped at the table and column level. The ColumnName field identifies sensitive data elements that require masking, while the MaskingRule specifies the approach used, such as data_redaction_simple for straightforward text redaction or data_lookup_substitution for city and postal code substitution. The MaskingType further categorizes the nature of masking, including text, email, city, and postal code, aligning with the sensitivity of the data.
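
To make this concrete, here is a hypothetical example of what a few scope entries might look like once loaded into a Spark DataFrame; the table, column, and rule values are made up, and the real metadata DataFrame also carries database-level details (such as the port) from [cfg].[MaskingConfigDatabase].

# Hypothetical example of masking scope entries loaded into a Spark DataFrame;
# the table, column, and rule values are illustrative only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

config_rows = [
    # database, schema, table, primary key, column(s), masking rule, masking type
    ("HRDB", "HR", "EMPLOYEES", "EMPLOYEE_ID", "LAST_NAME", "data_redaction_simple", "text"),
    ("HRDB", "HR", "EMPLOYEES", "EMPLOYEE_ID", "EMAIL", "data_redaction_simple", "email"),
    ("HRDB", "HR", "ADDRESSES", "ADDRESS_ID", "(CITY, POSTAL_CODE, PROVINCE)",
     "data_lookup_substitution_city_postal_code", "city"),
]

df_masking_config = spark.createDataFrame(
    config_rows,
    ["database_name", "schema_name", "table_name", "primary_key_column",
     "column_name", "masking_rule", "masking_type"],
)

df_masking_config.show(truncate=False)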

The currently scoped masking methods were custom-developed as Python functions in Databricks notebooks; they range from simple redaction and randomization to nullification and lookup-based substitutions for names, addresses, employers, and city/postal codes (the full set of masking rules appears later in the dm_flow_table_masking function). Please note that this list may differ from what is needed for your particular case. You are free to develop your own masking functions as needed.


Cloning Data for Masking

Working with real data involves the feeling, “Oh, what if I make a mistake and do something wrong with the actual data? What should I do to correct it, and how can I prevent it from happening again?”

This natural concern leads us to clone and make copies of the tables required for masking in a separate schema. We don’t clone the entire database, only the objects scoped for masking.

Additionally, to protect against data corruption during schema cloning within the database, a decision was made to create a copy of all the scoped tables in a separate, secure Azure Blob Storage as well.

Python Function for Table Cloning: Initial Stage

def dm_flow_table_backup(database_name, table_name, operations_timestamp, backup_types, df_target_table_data, df_target_table_metadata):
    try:
        run_logging_to_blob_history('Info', f'dm_flow_table_backup({database_name})', None, None, None, None, None, 'Started')

        target_table_name = table_name
        source_table_name = f"{target_table_name.split('.')[0]}_src.{target_table_name.split('.')[1]}"

        if 'database' in backup_types:
            update_masking_table_status(table_name, str(operations_timestamp), "CloneDBStatus", "Started")
            dm_flow_table_backup_database(target_table_name, source_table_name)           
            update_masking_table_status(table_name, str(operations_timestamp), "CloneDBStatus", "Completed")

        if 'file_storage' in backup_types:
            update_masking_table_status(table_name, str(operations_timestamp), "CloneStorageStatus", "Started")
            dm_flow_table_backup_blob(database_name, target_table_name, operations_timestamp, df_target_table_data)            
            update_masking_table_status(table_name, str(operations_timestamp), "CloneStorageStatus", "Completed")

        run_logging_to_blob_history('Info', f'dm_flow_table_backup({database_name}, {backup_types})', None, None, None, None, None, 'Completed')

    except Exception as e:
        error_message = str(e)
        run_logging_to_blob_history('Error', f"Error occurred while cloning to either Azure Blob or Database: {error_message}", 'Dataset Cloning', None, None, None, None, 'Failed')
        run_logging_to_sql_history()
        raise

The dm_flow_table_backup function automates database table backups by cloning them within the database or exporting their data to Azure Blob Storage based on the specified backup types. It identifies the target table and derives the source table by modifying the schema name. If 'database' is in the backup types, the function clones the table from the source schema to the target schema. If 'file_storage' is specified, the table’s data is exported to Azure Blob Storage. The function processes each backup type independently, ensuring tables are duplicated or saved as required.
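
A hedged usage sketch of this function is shown below; the parameter values are illustrative, and the two DataFrames are assumed to have been loaded earlier in the calling notebook (as the dm_flow_masking_table cell later in this post does).

# Hypothetical usage sketch: parameter values are illustrative, and the two
# DataFrames are assumed to have been loaded earlier in the calling notebook.
from datetime import datetime

operations_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")   # e.g. "20250113093000"

dm_flow_table_backup(
    database_name="HRDB",                          # hypothetical database name
    table_name="HR.EMPLOYEES",                     # schema-qualified target table
    operations_timestamp=operations_timestamp,
    backup_types="['file_storage', 'database']",   # clone to both the _src schema and Blob storage
    df_target_table_data=df_target_table_data,     # Spark DataFrame with the table's data
    df_target_table_metadata=df_target_table_metadata,  # masking scope metadata for this table
)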

Python Function for Table Cloning: Database

def dm_flow_table_backup_database(table_name_to_clone, table_name_to_create):
    try:
        # Log the start of the operation
        run_logging_to_blob_history('Info', 
                                    f'dm_flow_table_backup_database({database_name}:{table_name_to_clone}): Database schema creation', 
                                    'Dataset Cloning', database_name, None, None, None, 'Started')

        # Split schema and table name
        schema_name, table_name = table_name_to_create.split('.')
        
        # Check if the table already exists
        table_exists_query = f"""
            SELECT COUNT(*) AS TABLE_COUNT
            FROM all_tables 
            WHERE owner = '{schema_name.upper()}' AND table_name = '{table_name.upper()}'
        """
        mss_user, mss_password, jdbcUrl, jdbc_driver_class = dm_oracle_connection()
        df = spark.read \
            .format("jdbc") \
            .option("url", jdbcUrl) \
            .option("dbtable", f"({table_exists_query})") \
            .option("user", mss_user) \
            .option("password", mss_password) \
            .option("driver", jdbc_driver_class) \
            .load()

        df_count = int(df.collect()[0]['TABLE_COUNT'])        
        
        if df_count > 0: # Table exists
            run_logging_to_blob_history('Info', 
                                        f"dm_flow_table_backup_database({database_name}:{table_name_to_clone}): Table {table_name_to_create} already exists, skipping creation", 
                                        'Dataset Cloning', database_name, None, None, None, 'Skipped')
            print(f"{datetime.now()} Table {table_name_to_create} already exists, skipping creation.")
            return  # Exit the function
        
        # Table does not exist; proceed with creation
        dm_execute_oracle_command(f"CREATE TABLE {table_name_to_create} AS SELECT * FROM {table_name_to_clone}")
        
        # Log successful completion
        run_logging_to_blob_history('Info', 
                                    f'dm_flow_table_backup_database({database_name}:{table_name_to_clone}): Database table creation', 
                                    'Dataset Cloning', database_name, None, None, None, 'Completed')
        print(f"{datetime.now()} Table '{table_name_to_create}' has been cloned in the database successfully.")
    
    except Exception as e:
        # Handle exceptions and log the error
        error_message = str(e)
        run_logging_to_blob_history('Error', 
                                    f"dm_flow_table_backup_database({database_name}:{table_name_to_clone}): Database table creation: {error_message}", 
                                    'Dataset Cloning', database_name, schema_name, None, None, 'Failed')
        run_logging_to_sql_history()
        raise

The dm_flow_table_backup_database function clones a database table by creating a new table with the same structure and data as the original. It first checks if the target table already exists by querying the database. If the table exists, the function skips the creation process. If the table does not exist, it executes a SQL command to create the table by copying data from the source table. The function handles schema and table name separation, ensuring the correct target location for the cloned table.

Python Function for Table Cloning: Azure Blob Storage

def dm_flow_table_backup_blob(database_name, table_name, operations_timestamp, df_target_table_data):
    try:
        # Constants
        SMALL_DATA_THRESHOLD = 1000
        LARGE_DATA_THRESHOLD = 10000        
        records_per_partition = 1000000
        
        df = df_target_table_data    
        output_blob_path = MOUNTPOINT + "/mss/backup/" + operations_timestamp + "/" + table_name        

        # Count the number of records in the DataFrame
        record_count = df.count()        
        print(f"{datetime.now()} Number of records in the DataFrame: {record_count}")
        
        # Calculate the number of partitions, ensuring at least one partition
        num_partitions = max(math.ceil(record_count / records_per_partition), 1)
        print(f"{datetime.now()} Number of partitions in the DataFrame: {num_partitions}")
   
        # Adjust partitions
        if record_count < SMALL_DATA_THRESHOLD:
            df = df.coalesce(1)  # Small DataFrame
        else:
            df = df.repartition(num_partitions)

        # Adjust Spark configurations
        if record_count < LARGE_DATA_THRESHOLD:
            spark.conf.set("spark.sql.shuffle.partitions", str(max(1, num_partitions)))
        else:
            spark.conf.set("spark.sql.shuffle.partitions", "200")

        spark.conf.set("spark.sql.files.maxRecordsPerFile", "10000000")
        spark.conf.set("spark.sql.parquet.mergeSchema", "false")
        spark.conf.set("spark.sql.parquet.filterPushdown", "true")

        # Write data to output path
        df.write.format("parquet") \
            .option("batchsize", "10000") \
            .option("socketTimeout", "600000") \
            .option("connectTimeout", "60000") \
            .mode("overwrite").save(output_blob_path)
   
        
        print(f"{datetime.now()} Table '{table_name}' has been cloned in Azure Blob storage successfully:{output_blob_path}")
         
    except Exception as e:
        error_message = str(e)        
        raise

The dm_flow_table_backup_blob function exports a database table's data to Azure Blob Storage by writing it in Parquet format. It first calculates the number of records in the provided DataFrame and adjusts the number of partitions based on the data size. For small tables, the data is combined into a single partition, while larger tables are split across multiple partitions to optimize performance. The function dynamically configures Spark settings to control shuffle partitions, record limits per file, and schema merging. Finally, the data is written to a specified Blob Storage path, ensuring efficient and organized backup of table data.
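
As a quick sanity check (a sketch under the assumption that MOUNTPOINT and the path layout match the function above), the backup can be read back from Blob storage to confirm its record count.

# Hedged verification sketch: read a completed Parquet backup back from the mounted
# Blob container and report its size. The sample timestamp and table name are made up.
operations_timestamp = "202501130930"     # hypothetical run timestamp
table_name = "HR.EMPLOYEES"               # hypothetical schema-qualified table name

backup_path = MOUNTPOINT + "/mss/backup/" + operations_timestamp + "/" + table_name
df_backup = spark.read.format("parquet").load(backup_path)

print(f"Backup for {table_name}: {df_backup.count()} records "
      f"in {df_backup.rdd.getNumPartitions()} partitions at {backup_path}")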

Masking Process

As previously mentioned, the process includes different stages (such as (0) Database Constraints Metadata Collection, (1) Disable Database Constraints, (2) Data Masking Process, and (3) Enable Database Constraints). Let’s now review the individual steps, workflows, and jobs involved in the masking process.

Azure Databricks Job for Database Constraints

Database constraint script collection

Dynamic collection of database constraint scripts facilitates preparation before the masking process. Appropriate dynamic SQL statements are provided to generate the following: (1) Disable Triggers, (2) Disable Table Foreign Keys, (3) Disable Table Primary Keys, and (4) Disable/Drop Table Indexes.

Dynamic collection of database constraint scripts also facilitates the re-enabling and creation of constraints after the masking process. Appropriate dynamic SQL statements are provided to generate the following: (1) Enable/Create Table Indexes, (2) Enable Table Primary Keys, (3) Enable Table Foreign Keys, and (4) Enable Triggers.

Depending on your database platform (Oracle, SQL Server, PostgreSQL, etc.), the individual DDL scripts required to generate the necessary statements will vary. The main idea is to dynamically create all the "ALTER" statements for the available database objects and then save them into a script file using the following code:

def collect_database_script_single(script_query, script_file_name, operations_timestamp):
  # connection details
  user = dbutils.secrets.get(scope="dm", key="mss-oracle-user")
  password = dbutils.secrets.get(scope="dm", key="mss-oracle-password")
  jdbcHostname = dbutils.secrets.get(scope="dm", key="mss-oracle-hostname")
  jdbcDatabase = dbutils.secrets.get(scope="dm", key="mss-oracle-database")
  jdbcPort = 1521

  # Step 1: Define Oracle JDBC connection properties
  jdbc_url = f"jdbc:oracle:thin:@{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"

  connection_properties = {
      "user": user,
      "password": password,
      "driver": "oracle.jdbc.OracleDriver"
  }

  # Step 2: Execute the query using JDBC and collect the result
  df = spark.read.jdbc(url=jdbc_url, table=f"({script_query})", properties=connection_properties)

  # Step 3: Collect the result into a list of strings
  sql_statements = df.collect() 

  # Step 4: Prepare the final SQL script as a string
  sql_script = "\n".join([row[0] for row in sql_statements])

  # Step 5: Define the file path in mounted storage
  output_file_path = f"{MOUNTPOINT}/mss/operations/{operations_timestamp}/scripts/{script_file_name}"

  # Step 6: Write the SQL script to the file in Azure Blob Storage
  dbutils.fs.put(output_file_path, sql_script, overwrite=True)

  print(f"SQL script saved to {output_file_path}")

This function, collect_database_script_single, connects to an Oracle database using JDBC to execute a SQL query (script_query) and retrieves the resulting SQL statements. It dynamically retrieves database connection details (user, password, hostname, and database name) from Databricks secrets. After executing the query, the results are collected into a list, formatted into a single SQL script string, and saved to an Azure Blob Storage location specified by script_file_name and operations_timestamp. 
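
For example, one plausible script_query (an assumption for illustration, not the project's exact SQL) generates the foreign-key disable statements for a given schema from Oracle's all_constraints dictionary view:

# Hypothetical example: generate "ALTER TABLE ... DISABLE CONSTRAINT" statements for
# every foreign key (constraint_type = 'R') owned by an assumed schema, then save
# them to Blob storage through collect_database_script_single.
disable_fk_query = """
    SELECT 'ALTER TABLE ' || owner || '.' || table_name ||
           ' DISABLE CONSTRAINT ' || constraint_name || ';' AS ddl_statement
    FROM all_constraints
    WHERE constraint_type = 'R'
      AND owner = 'HR'
"""

collect_database_script_single(
    script_query=disable_fk_query,
    script_file_name="DisableConstraint_FK.sql",   # aligns with the DisableConstraint_FK job step
    operations_timestamp="202501130930",           # hypothetical run timestamp
)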

We reviewed two types of database collection scripts. The initial approach was highly dynamic, allowing us to dive deep into each table's metadata and collect all immediate and direct elements (such as indexes, primary keys, and foreign keys, which were relatively straightforward). However, dealing with potentially related triggers presented challenges, and there was always a risk of missing something. Considering the large number of tables requiring masking, we ultimately decided to adopt a strategy of massively disabling all constraints initially and then re-enabling everything after the entire masking process is complete.

Disable database table constraints

To disable all database constraints, a specific workflow/job was created in Databricks:

  • DisableTrigger: This step is responsible for disabling all triggers in the database.
  • DisableConstraint_FK: This job focuses on disabling all foreign key constraints.
  • DisableConstraint_UK: This job handles the disabling of unique constraints. However, not all unique constraints were disabled—only those used in primary key constraints.
  • DisableConstraint_PK: This job disables primary key constraints.
  • DropIndex: The final job in the sequence drops or disables indexes.

Different database platforms offer varying levels of support for these operations. For example, disabling triggers in Oracle databases can be challenging due to the platform's archaic structure, and adding new SQL DDL capabilities in Oracle often faces resistance from its foundational design and the philosophy of its original creators.

Each job step processes the previously saved SQL DDL scripts, which are then passed to the function below for execution in parallel chunks. This approach reasonably speeds up the process, prevents overloading the backend database, and avoids sequential execution of individual DDL statements, especially given the large number of statements involved.

def execute_oracle_sql_script_jaydebeapi(operations_timestamp, script_file_name, jdbc_url, username, password):
    # Step 1: Define file paths
    sql_file_path = f"{MOUNTPOINT}/mss/operations/{operations_timestamp}/scripts/{script_file_name}"
    log_file_name_part = os.path.splitext(script_file_name)[0]
    log_file_path = f"{MOUNTPOINT}/mss/operations/{operations_timestamp}/logs/sql_execution_log_{log_file_name_part}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    
    # Step 2: Read the SQL script from file
    df = spark.read.text(sql_file_path)
    lines = df.collect()  # Collect the DataFrame rows (lines of the SQL script)

    # Step 3: Parse SQL statements by splitting them by semicolons
    sql_statements = []
    current_statement = []

    for row in lines:
        line = row['value'].strip()
        current_statement.append(line)
        if line.endswith(";"):  # End of an SQL statement
            full_statement = " ".join(current_statement)  # Combine parts into a full statement
            sql_statements.append(full_statement)  # Add it to the list
            current_statement = []  # Reset for next statement

    # Step 4: Split the SQL statements into chunks using numpy (for parallel execution)
    num_chunks = 8  # Number of parallel chunks
    sql_chunks = np.array_split(sql_statements, num_chunks)

    # Step 5: Prepare (chunk, chunk_id) tuples for parallel execution
    chunk_with_ids = [(chunk, idx) for idx, chunk in enumerate(sql_chunks)]

    # Define the function to execute a list of SQL statements and return logs from each worker
    def execute_sql_list_and_collect_logs(sql_list_with_id):
        sql_list, chunk_id = sql_list_with_id  # Unpack the tuple (list of SQLs, chunk ID)
        worker_logs = []  # Collect logs in a local list for each worker
        
        # Add an iteration ID to track the execution of each statement
        for idx, sql in enumerate(sql_list, start=1):  # enumerate adds the iteration ID starting from 1
            try:
                # Remove any semicolons from the SQL line
                sql_statement = sql.replace(';', '') 
                
                # Execute the SQL statement using the provided function
                sql_log_message = execute_oracle_ddl_statement_jaydebeapi(sql_statement, jdbc_url, username, password)
                
                # Add iteration ID and log message
                log_message = f"{datetime.now()} - Chunk {chunk_id}, Iteration {idx} - Executed: {sql_log_message}"
                worker_logs.append(log_message)  # Collect the log message
                
            except Exception as e:
                # Collect error message, including iteration ID
                log_message = f"{datetime.now()} - Chunk {chunk_id}, # {idx} - Error executing: {sql_statement[:50]} - {str(e)}"
                worker_logs.append(log_message)  # Collect the log message
        
        return worker_logs  # Return the collected logs from each worker


    # Step 6: Run the chunks in parallel using sc.parallelize()
    rdd = sc.parallelize(chunk_with_ids, numSlices=num_chunks)

    # Collect logs from all workers
    collected_logs = rdd.flatMap(execute_sql_list_and_collect_logs).collect()

    # Step 7: Write all collected logs to the log file
    if collected_logs:
        log_content = "\n".join(collected_logs)
        dbutils.fs.put(log_file_path, log_content, overwrite=True)
        print(f"Logs for {log_file_name_part} are written to {log_file_path}")
    else:
        print("No logs were collected.")

# Example call to the function (replace with actual values)
# execute_oracle_sql_script_jaydebeapi("202410062107", "DisableIndex_TEST.sql", jdbc_url, username, password)

The execute_oracle_sql_script_jaydebeapi function executes SQL statements from a file against an Oracle database using the jaydebeapi Python library. It reads the SQL script from the mounted Azure Blob Storage location, parses it into individual statements, and splits them into chunks for parallel execution using Spark. Each chunk is processed with detailed logging of successes and errors, including timestamps and iteration details. The logs are aggregated and saved back to Blob Storage for auditing.

The actual execute_oracle_ddl_statement_jaydebeapi function that executes individual SQL DDL statements is presented here:

def execute_oracle_ddl_statement_jaydebeapi(sql_statement, jdbc_url, username, password):
    try:
        # Connect without JPype using pure JDBC mode
        connection = jaydebeapi.connect(
            "oracle.jdbc.OracleDriver",    # JDBC driver class
            jdbc_url,                      # JDBC URL for Oracle
            [username, password],          # Credentials
            "/Workspace/Shared/MDM/Library/ojdbc8.jar"  # Path to the JDBC JAR
        )

        connection.jconn.setAutoCommit(False)  # Disable auto-commit
        cursor = connection.cursor()           # Create a cursor object
        cursor.execute(sql_statement)          # Execute the DDL statement
        connection.commit()                    # Commit the transaction
        return f"Success: DDL statement: {sql_statement[:100]}"

    except Exception as e:
        if 'connection' in locals():
            connection.rollback()
        return f"Failed: DDL statement: {sql_statement[:100]}, Error: {str(e)}"

    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'connection' in locals():
            connection.close()

The execute_oracle_ddl_statement_jaydebeapi function executes a single Oracle DDL statement using the jaydebeapi library. It establishes a JDBC connection to the Oracle database with credentials and a specified JDBC driver, disables auto-commit, and executes the provided SQL statement. If successful, the transaction is committed, and a success message is returned. In case of an error, the transaction is rolled back, and an error message is returned, including the failed SQL snippet.

Enable database table constraints

To re-enable all database constraints, a specific workflow/job was created in Databricks. The process of enabling constraints follows the reverse order of disabling them, ensuring the database is restored to its original state:

  • CreateIndex: The first step in the sequence recreates or enables all indexes. Indexes are reintroduced to optimize query performance and ensure efficient database operations.
  • EnableConstraint_PK: This job restores primary key constraints, re-establishing the unique identification of rows within each table.
  • EnableConstraint_UK: This step enables unique constraints that were previously disabled, ensuring the integrity of columns requiring unique values. Only those associated with primary key constraints are re-enabled.
  • EnableConstraint_FK: This job restores all foreign key constraints, re-establishing the relationships between tables and ensuring referential integrity.
  • EnableTrigger: The final step in the sequence re-enables all triggers, restoring any automated database behaviours that respond to specific events.

Similar Python functions (execute_oracle_sql_script_jaydebeapi & execute_oracle_ddl_statement_jaydebeapi) are used in each Enable step to execute their corresponding SQL DDL scripts.

Azure Databricks Job for Masking

The job responsible for running the masking process on all required databases is called by its parent job but can also be executed independently. Although it includes all the necessary initialization and logging steps, its purpose is straightforward: it takes a table name as an input parameter and passes it on to execute all the required steps for that table (data copy, pre-processing validation, masking, and post-processing validation). Multiple tables are processed this way in parallel, as described below.

Job parameters


The job_operations_list parameter offers flexibility, allowing you to run all operations: (1) data copy, (2) pre-processing validation, (3) masking, and (4) post-processing validation together or specify their individual execution as needed.

Another important parameter is job_table_list, which is a JSON array defining the scope of tables for masking.
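
One way these two parameters might be received and parsed inside a notebook is sketched below; the widget names and sample values are assumptions for illustration.

# Hedged sketch: widget names and sample values are assumptions for illustration.
import json

job_operations_list = dbutils.widgets.get("job_operations_list")   # e.g. "backup,validation_begin,masking,validation_end"
job_table_list = dbutils.widgets.get("job_table_list")             # e.g. '["HR.EMPLOYEES", "HR.ADDRESSES"]'

operations_list = [op.strip() for op in job_operations_list.split(",")]
table_list = json.loads(job_table_list)   # JSON array of schema-qualified table names

print(f"Operations to run: {operations_list}")
print(f"Tables in scope:   {table_list}")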

For Each Masking Table iteration

The job_table_list parameter is used to pass table names to a for-each-loop container, which processes up to 20 tables in parallel for masking. This limit of 20 parallel iterations was determined through testing to balance performance: it avoids overloading the target database environment while ensuring efficient utilization of the Spark Databricks compute resources.

Then, each individual element of the {{input}} array is passed to the dm_flow_masking_table Python notebook to sequentially execute the following steps: (1) data copy, (2) pre-processing validation, (3) masking, and (4) post-processing validation.



Python dm_flow_masking_table Notebook

One of the cells in the dm_flow_masking_table notebook parses the job_operations_list to execute the steps mentioned above:

# Split `table_name` into `schema_name` and `table_name_only`
schema_name, table_name_only = table_name.split(".")

dm_databricks_workspacename = dbutils.secrets.get(scope="dm", key="dm-databricks-workspacename")
df = reading_dm_config_table(dm_databricks_workspacename)

# Filter the DataFrame and select the required columns, keeping it as a DataFrame
df_target_table_metadata = df.filter(
    (df["schema_name"] == schema_name) & (df["table_name"] == table_name_only)
)

# Extract the values for `database_name` and `host_port`
row = df_target_table_metadata.first()
database_name = row["database_name"] if row is not None else None
host_port = str(row["port"]) if row is not None else None
table_name_source = f"{schema_name}_src.{table_name_only}"

# Uninitialized DataFrames
df_target_table_data = None
df_sourcing_table_data = None
df_masked_table_data = None

if 'backup' in operations_list:
    print(f"{datetime.now()} Table '{table_name}' backup started")

    # Read target table into DataFrame
    df_target_table_data = dm_read_table_into_dataframe(database_name, table_name, host_port)

    dm_flow_table_backup(
        database_name, 
        table_name, 
        operations_timestamp, 
        "['file_storage', 'database']", 
        df_target_table_data, 
        df_target_table_metadata
    )

if 'validation_begin' in operations_list:
    print(f"{datetime.now()} Table '{table_name}' validation started")
    update_masking_table_status(table_name, str(operations_timestamp), "PreValidation", "Started")

    # Read target table into DataFrame
    if df_target_table_data is None or not df_target_table_data.schema.fields:
        df_target_table_data = dm_read_table_into_dataframe(database_name, table_name, host_port)

    # Read sourcing table into DataFrame
    df_sourcing_table_data = dm_read_table_into_dataframe(database_name, table_name_source, host_port)

    dm_flow_table_validation(
        database_name, 
        table_name, 
        'begin', 
        df_target_table_data, 
        df_target_table_metadata, 
        df_sourcing_table_data
    )

    update_masking_table_status(table_name, str(operations_timestamp), "PreValidation", "Completed")

if 'masking' in operations_list:
    print(f"{datetime.now()} Table '{table_name}' masking started")
    update_masking_table_status(table_name, str(operations_timestamp), "MaskingStatus", "Started")
 
    # Read sourcing table into DataFrame
    if df_sourcing_table_data is None or not df_sourcing_table_data.schema.fields:
        df_sourcing_table_data = dm_read_table_into_dataframe(database_name, table_name_source, host_port)

    df_masked_table_data  = dm_flow_table_masking(
        database_name, 
        table_name, 
        df_sourcing_table_data, 
        df_target_table_metadata
    )

    print(f"{datetime.now()} df_masked_table_data '{df_masked_table_data.count()}' records.")

    update_masking_table_status(table_name, str(operations_timestamp), "MaskingStatus", "Completed")

if 'validation_end' in operations_list:
    print(f"{datetime.now()} Table '{table_name}' validation started")
    update_masking_table_status(table_name, str(operations_timestamp), "PostValidation", "Started")

    # Read target table into DataFrame
    if df_masked_table_data is None or not df_masked_table_data.schema.fields:
        df_masked_table_data = dm_read_table_into_dataframe(database_name, table_name, host_port)

    # Read sourcing table into DataFrame
    if df_sourcing_table_data is None or not df_sourcing_table_data.schema.fields:
        df_sourcing_table_data = dm_read_table_into_dataframe(database_name, table_name_source, host_port)

    print(f"{datetime.now()} df_masked_table_data '{df_masked_table_data.count()}' records.")
    print(f"{datetime.now()} df_sourcing_table_data '{df_sourcing_table_data.count()}' records.")

    dm_flow_table_validation(
        database_name, 
        table_name, 
        'end', 
        df_masked_table_data, 
        df_target_table_metadata, 
        df_sourcing_table_data
    )

    update_masking_table_status(table_name, str(operations_timestamp), "PostValidation", "Completed")

This code performs a series of operations for processing and masking data in a Databricks environment. It begins by splitting a fully qualified table name into its schema and table components, retrieving configuration metadata using a secret-scoped workspace name, and filtering this metadata for the specified table. Depending on the specified operations_list, it performs several tasks: backing up the target table, validating the data before and after masking, and applying masking transformations. Each task involves reading the target and sourcing tables into DataFrames, processing them using dedicated functions (e.g., dm_flow_table_backup, dm_flow_table_validation, dm_flow_table_masking), and updating the masking table's status to track progress. Conditional checks ensure that data is reloaded only when necessary, optimizing resource usage. Throughout, timestamps and status updates are logged for operational transparency.

Python dm_flow_table_masking Function

Each scoped table masking operation is performed using the dm_flow_table_masking Python function, which is called from the notebook cell described above.

def dm_flow_table_masking(database_name, target_table_name, df_target_table_data, df_target_table_metadata):
    try:
        run_logging_to_blob_history('Info', f'dm_flow_table_masking({database_name}:{target_table_name})', None, None, None, None, None, 'Started')

        source_table_name = f"{target_table_name.split('.')[0]}_src.{target_table_name.split('.')[1]}"
        print(f"Tables to mask: {target_table_name} from {source_table_name}.")

        mss_user, mss_password, jdbcUrl, jdbc_driver_class = dm_oracle_connection()
        current_table_data_df = df_target_table_data
        print(f"current_table_data_df Table: {current_table_data_df.count()}...")

        current_table_metadata_df = df_target_table_metadata.select("column_name", "masking_rule", "masking_type")

        source_schema, source_table = source_table_name.split('.')

        for row in current_table_metadata_df.collect():
            column_name, masking_rule, masking_type = row["column_name"], row["masking_rule"], row["masking_type"]
            width = get_column_width(jdbcUrl, mss_user, mss_password, source_table_name, column_name)

            if masking_rule == "data_redaction_simple":
                current_table_data_df = current_table_data_df.withColumn("masking_type_temp", lit(masking_type))
                current_table_data_df = current_table_data_df.withColumn(
                    column_name,
                    dm_masking_rule_data_redaction_simple_udf(col(column_name), col("masking_type_temp"), lit(width))
                ).drop("masking_type_temp")
                current_table_data_df = current_table_data_df.drop("masking_type_temp")

            elif masking_rule == "data_randomization":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                    dm_masking_rule_data_randomization_udf(col(column_name).cast("string"), lit(masking_type))
                )

            elif masking_rule == "data_nullificaion":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                    lit(None).cast(current_table_data_df.schema[column_name].dataType)
                )

            elif masking_rule == "data_lookup_substitution_name_simple":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                    dm_masking_rule_data_substitution_name_simple_udf(col(column_name), lit(width), lit(masking_type))
                )

            elif masking_rule == "data_lookup_substitution_address_street":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                    dm_masking_rule_data_substitution_address_street_udf(col(column_name), lit(width))
                )

            elif masking_rule == "data_lookup_substitution_address_other":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                    dm_masking_rule_data_substitution_address_other_udf(col(column_name))
                )

            elif masking_rule == "data_lookup_substitution_employer":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                    dm_masking_rule_data_substitution_employer_udf(col(column_name), lit(width))
                )

            elif masking_rule == "data_lookup_substitution_fullname":
                current_table_data_df = current_table_data_df.withColumn(
                    column_name, 
                dm_masking_rule_data_substitution_fullname_udf(col(column_name), lit(width))
                )
            elif masking_rule == "data_lookup_substitution_city_postal_code":
                column_name = [item.strip() for item in column_name[1:-1].split(',')] 
                
                if masking_type == 'postalcode':
                    columnname = column_name[1]
                else: #CITY name case
                    columnname = column_name[0]
                   
                if len(column_name) ==3: #case when we have a complete set of attributes (City, Postal Code, Province/State)
                    current_table_data_df = current_table_data_df.withColumn(
                    columnname, 
                    dm_masking_rule_data_substitution_city_postalcode_province_udf(col(column_name[0]), 
                                                                                    col(column_name[1]), lit(masking_type),
                                                                                    col(column_name[2]))
                )
                else:
                    # otherwise City, Masking Type, Province/State
                    current_table_data_df = current_table_data_df.withColumn(
                    columnname, 
                    dm_masking_rule_data_substitution_city_postalcode_province_udf(col(column_name[0]), lit(masking_type),col(column_name[1]))
                )                

            run_logging_to_blob_history(
                'Info', f'DataMasking Process Started on {source_table_name}', 
                'Data Masking Operation', database_name, source_schema, source_table, None, 'Started'
            )

        # Writing masked data to the original target table 
        backup_table_name = f"{target_table_name.split('.')[0]}_src.{target_table_name.split('.')[1]}"
             
        dm_append_dataframe_to_table(current_table_data_df, target_table_name, backup_table_name, jdbcUrl, mss_user, mss_password)

        run_logging_to_blob_history(
            'Info', f'DataMasking Process Completed on {source_table_name} from {target_table_name}', 
            'Data Masking Operation', database_name, source_schema, source_table, None, 'Completed'
        )

        return current_table_data_df

    except Exception as e:
        error_message = str(e)
        run_logging_to_blob_history(
            'Error', f"dm_flow_table_masking({database_name}) : {error_message}",
            'Data Masking Operation', database_name, None, source_table_name, None, 'Failed'
        )
        run_logging_to_sql_history()
        raise

The dm_flow_table_masking function performs data masking on a specified target table by applying various masking rules to its columns. It starts by logging the initiation of the process and retrieves metadata and connection details for the database and table. The function iterates through the metadata to identify the masking rules for each column, such as redaction, randomization, nullification, and lookup-based substitution (e.g., for names, addresses, employers, and city/postal codes). Depending on the rule, it applies a corresponding masking transformation using predefined UDFs (user-defined functions). The masked data is then written back to the target table while maintaining a backup.

Masking logic explained

The simplicity, beauty, and elegance of this meta-driven solution for table data masking, data transformation, and data obfuscation lie in its design. The entire set of tables for masking is processed dynamically. Each table has a unique set of columns requiring masking, and each column is associated with a specific masking rule.


The current_table_data_df represents a Spark DataFrame corresponding to a particular table. There is no need to hardcode table names, column names, or masking rules. Instead, this metadata is provided dynamically through the df_target_table_metadata DataFrame, which contains information about the columns and their respective masking rules. The masking process iterates through this metadata, applying the appropriate transformations to each column.

For each column, the process involves (1) replacing the column with a new version of the same name, where (2) the values are transformed using a corresponding masking function. These masking functions are implemented as custom Spark UDFs (User-Defined Functions) in Python. Each UDF is designed to handle complex transformations specific to the masking rule, taking the column value and additional attributes as input.

Thanks to Spark's lazy evaluation, the transformations are not executed immediately. Instead, they are logically defined using a chain of .withColumn operations, which modify the DataFrame schema and logic incrementally. The actual execution of these transformations occurs only at the point where the resulting DataFrame is written back to the target database table. This approach ensures that Spark optimizes the execution plan for efficiency, leveraging Databricks' distributed compute capabilities to handle large-scale data transformations effectively.
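
To make the pattern concrete, here is a stripped-down, hypothetical sketch of the same idea: a tiny masking UDF applied through a metadata-driven chain of .withColumn calls. The UDF logic and metadata values are simplified stand-ins, not the project's actual masking functions.

# A stripped-down, hypothetical sketch of the metadata-driven pattern: a simple
# masking UDF applied through a loop of .withColumn calls. The UDF and metadata
# below are simplified stand-ins, not the project's actual masking functions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

@udf(returnType=StringType())
def simple_redaction(value, masking_type):
    # Keep NULLs as-is; otherwise replace the value with a type-specific placeholder
    if value is None:
        return None
    return "masked@example.com" if masking_type == "email" else "XXXXX"

# Hypothetical table data and masking metadata (normally driven by the config tables)
df = spark.createDataFrame(
    [(1, "Alice Smith", "alice@example.com")],
    ["customer_id", "full_name", "email"],
)
masking_metadata = [
    ("full_name", "data_redaction_simple", "text"),
    ("email", "data_redaction_simple", "email"),
]

# Chain of lazy transformations: nothing executes until an action is triggered
for column_name, masking_rule, masking_type in masking_metadata:
    if masking_rule == "data_redaction_simple":
        df = df.withColumn(column_name, simple_redaction(col(column_name), lit(masking_type)))

df.show(truncate=False)  # in the real flow, this would be a JDBC write back to the target table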

And this approach works seamlessly for every table, every column, and every masking rule, all dynamically driven by the external metadata of the masking configuration. It is the "holy grail" of metadata-driven masking, efficiently realized through Databricks. Without this capability, recreating a similar solution would be significantly more challenging and time-consuming.

Pre- and post-validation were not that complicated, primarily checking table schema consistency and data changes at the end. These steps were less important since the most critical part was ensuring that the masked data changes were successfully written to persistent storage. The complete Databricks solution included 13 Python notebooks with over a hundred Python functions, ranging from simple to complex. At the end of the day, it proved that Azure Databricks can be used for a metadata-driven masking process, and it can do this job really well!

