Open Source Identity Resolution on Databricks for Customer 360

Modern enterprises operate through a variety of specialized applications for sales, marketing, and customer support. As a result, customer data gets siloed in different systems, which makes customer analytics extremely challenging. To understand customer behavior, we need to group the individual records spread across these systems into distinct customer identities. Identity resolution, building a single source of truth about customers, therefore becomes an important step in the data analysis pipeline to unify customer data from different sources.

In this post, we will learn to resolve identities and build a single source of truth about customers using Databricks and open-source Zingg. Zingg is an open source, ML-based identity and entity resolution framework. It takes away the complexity of scaling and of defining match logic so that we can focus on the business problem. To resolve our customer entities and build a 360-degree view, we will configure Zingg to run within Databricks, run its findTrainingData and label phases to build training data, and finally resolve the entities.

Let us take a look at what our customer data looks like. We have taken an example customer dataset with demographic information and loaded it into Databricks as a CSV file. This example dataset is borrowed from the Zingg repository and can be found here. Reading the first few lines of the customer data shows that many records have variations but belong to the same real-world customer. There are missing attributes and typos across different source systems, and unifying these records by hand-writing conditional logic is tough.

Besides first name, last name and address attributes, this dataset also contains SSN and date of birth. But these fields are unreliable, and records of the same customer from different sources have variations in them.
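
Before configuring anything, we can take a quick look at the data in a notebook cell. The following is a minimal sketch; it assumes the file has been uploaded to /FileStore/tables/data.csv (the same path used later in this post) and has no header row.

# peek at the example customer data; path and column names assumed from later in this post
customerDF = (spark.read
              .option("header", "false")
              .csv("/FileStore/tables/data.csv")
              .toDF("rec_id", "fname", "lname", "stNo", "add1",
                    "add2", "city", "state", "dob", "ssn"))
display(customerDF.limit(10))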

Step 1: Set Up a Databricks Account

If you are new to Databricks, the first step is creating an account. Do not worry; it is fairly straightforward to set up!

  1. Head over to Databricks and sign up for a free trial.
  2. Once inside, take a quick tour of the interface. The Workspace tab is where you’ll organize your work, and the Clusters tab is for creating the compute resources you’ll need.

Tip: If you’re unsure about cluster settings, start with a smaller size; with autoscaling enabled, Databricks will add workers when needed.

This guide is tested with Databricks Runtime 15.4, but newer versions with Spark 3.5 should work.

Step 2: Create a Spark Cluster and Install Zingg

A cluster is essentially a group of computers working together to process your data. Creating one on Databricks is as simple as clicking a button:

  1. Go to the Clusters tab, hit Create Cluster, and give it a name like “Zingg-Demo.”
  2. Set the runtime version to a current LTS (Long-Term Support) version for compatibility.
  3. Next, you’ll need to install Zingg. Start by downloading the latest JAR file from Zingg’s GitHub.
  4. Upload the file: Open the cluster details, navigate to the Libraries section, and click Install New > Upload JAR. Done!
Uploading Zingg to Databricks.

Step 3: Install Zingg Python Package

To run Zingg entity resolution on Databricks, you also need the Zingg Python package. Open a Databricks notebook and type this:

!pip install zingg

Installing Zingg on Databricks.

This command fetches and installs the Zingg Python library. Let us also double-check that Zingg dependencies are installed.

Use the following command to install the required dependencies, especially tabulate, and then restart the Python kernel:

!pip install tabulate

Installing tabulate on Databricks.
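
The tabulate package is used when displaying candidate pairs later. After installing packages from a notebook, restart the Python kernel so the new libraries are picked up; on recent Databricks Runtimes one way to do this is:

# restart the Python process so freshly installed packages become importable
dbutils.library.restartPython()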

Step 4: Organize Your Workspace

It is a good idea to keep things neat, especially when you’re working with multiple datasets and training models. Zingg requires specific folders for its labeled and unlabeled training data. Let us set up directory paths in our notebook for the model.

##you can change these to the locations of your choice
##these are the only two settings that need to change
zinggDir = "/models"
modelId = "zinggTrial26Nov_1"

Setting up directory paths in the Notebook.

Let us also add some standard code to get Zingg to work within Databricks. We do not need to modify anything here.

##please leave the following unchanged
MARKED_DIR = zinggDir + "/" + modelId + "/trainingData/marked/"
UNMARKED_DIR = zinggDir + "/" + modelId + "/trainingData/unmarked/"
MARKED_DIR_DBFS = "/dbfs" + MARKED_DIR
UNMARKED_DIR_DBFS = "/dbfs" + UNMARKED_DIR

import pandas as pd
import numpy as np
 
import time
import uuid
 
from tabulate import tabulate
from ipywidgets import widgets, interact, GridspecLayout
import base64
import pyspark.sql.functions as fn

##this code sets up the Zingg Python interface
from zingg.client import *
from zingg.pipes import *

def cleanModel():
    dbutils.fs.rm(MARKED_DIR, recurse=True)
    # drop unmarked data
    dbutils.fs.rm(UNMARKED_DIR, recurse=True)
    return

# assign label to candidate pair
def assign_label(candidate_pairs_pd, z_cluster, label):
  '''
  The purpose of this function is to assign a label to a candidate pair
  identified by its z_cluster value.  Valid labels include:
     0 - not matched
     1 - matched
     2 - uncertain
  '''
  
  # assign label
  candidate_pairs_pd.loc[ candidate_pairs_pd['z_cluster']==z_cluster, 'z_isMatch'] = label
  
  return
 
def count_labeled_pairs(marked_pd):
  '''
  The purpose of this function is to count the labeled pairs in the marked folder.
  '''
  n_total = len(np.unique(marked_pd['z_cluster']))
  n_positive = len(np.unique(marked_pd[marked_pd['z_isMatch']==1]['z_cluster']))
  n_negative = len(np.unique(marked_pd[marked_pd['z_isMatch']==0]['z_cluster']))
  
  return n_positive, n_negative, n_total
# setup widget 
available_labels = {
    'No Match':0,
    'Match':1,
    'Uncertain':2
    }
#dbutils.widgets.dropdown('label', 'Uncertain', available_labels.keys(), 'Is this pair a match?')

If you are unsure what this means, think of the model folders as “buckets” where Zingg will store its work. The rest of the code helps Zingg interact with the Databricks file system and sets up utilities to train Zingg.

Step 5: Pass Matching and Data Information to Zingg

Let us build the Zingg arguments to configure locations of input and output, field attributes and match types.

#build the arguments for zingg
args = Arguments()
# Set the modelid and the zingg dir. You can use this as is
args.setModelId(modelId)
args.setZinggDir(zinggDir)

Step 6: Load Your Data

Zingg supports multiple file formats like CSV, Parquet, or JSON. For this example, let’s use a CSV file. Upload your file to Databricks by dragging it into the Data tab. Define its schema (i.e., column names and types):

schema = "rec_id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, dob string, ssn string"
inputPipe = CsvPipe("testFebrl", "/FileStore/tables/data.csv", schema)
args.setData(inputPipe)

Replace /FileStore/tables/data.csv with your actual file path.
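
If you are unsure of the exact path after uploading, listing the default upload location can help. This is just a sketch assuming the standard FileStore location:

# list uploaded files to confirm the path (assumes the default upload location)
display(dbutils.fs.ls("/FileStore/tables/"))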

So our data looks something like this:

Loading data on Zingg.

Now let us configure the output. Just like the input, the output can be CSV, Parquet, Delta tables, etc.

#setting outputpipe in 'args'
outputPipe = CsvPipe("resultOutput", "/tmp/output26Nov_1")
args.setOutput(outputPipe)

 Configuring the Output.


Step 7: Define Matching Rules

Here’s where Zingg starts to shine. It uses your rules to decide how to compare records. Let’s say you want to match people based on first name, last name, and address, along with ssn and dob when present. Define these fields in your notebook:

# Set field definitions
rec_id = FieldDefinition("rec_id", "string", MatchType.DONT_USE)
fname = FieldDefinition("fname", "string", MatchType.FUZZY)  # First Name
lname = FieldDefinition("lname", "string", MatchType.FUZZY)  # Last Name
stNo = FieldDefinition("stNo", "string", MatchType.FUZZY)    # Street Number
add1 = FieldDefinition("add1", "string", MatchType.FUZZY)    # Address Line 1
add2 = FieldDefinition("add2", "string", MatchType.FUZZY)    # Address Line 2
city = FieldDefinition("city", "string", MatchType.FUZZY)    # City
state = FieldDefinition("state", "string", MatchType.FUZZY)  # State
dob = FieldDefinition("dob", "string", MatchType.EXACT)      # Date of Birth (prefer exact match)
ssn = FieldDefinition("ssn", "string", MatchType.EXACT)      # SSN (should use exact match)

# Create the field definitions list
fieldDefs = [rec_id, fname, lname, stNo, add1, add2, city, state, dob, ssn]
# Set field definitions in args
args.setFieldDefinition(fieldDefs)

Defining the rules for matching particular fields.

Field Definitions define which fields should appear in the output and whether and how they need to be used in matching. Some match types are:

  • EXACT means attributes must match perfectly.
  • FUZZY allows for differences in the field. For example “Jon” and “John.”

You can get creative here depending on your data!

Step 8: Let us tune Zingg Performance

The numPartitions setting defines how data is split across the cluster. Please change it as per your data and cluster size by referring to the performance section of the Zingg docs. The labelDataSampleSize is used for sampling in findTrainingData; it lets Zingg select pairs for labeling in a reasonable amount of time. If the findTrainingData phase is taking too long, reduce this value, for example to a tenth of its previous value, and try again.

# The numPartitions define how data is split across the cluster. 
# Please change the following as per your data and cluster size by referring to the docs.

args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

Step 9: Label Data for Training

Zingg can’t magically know how to match your data — it needs your guidance! Let us ask it to start learning our matching criteria. When we run Zingg’s findTrainingData phase, it generates candidate pairs for you to review:

options = ClientOptions([ClientOptions.PHASE,"findTrainingData"])
#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

Step 10: Fetching record pairs for user labelling

Let us see which record pairs Zingg found for labeling. We will review these pairs in Databricks and label them as matches, non-matches, or uncertain. Think of it as teaching Zingg what is an accurate match and what is not.

Let us start by running Zingg’s label phase which will read the records found by findTrainingData phase.

options = ClientOptions([ClientOptions.PHASE,"label"])
#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.init()

Preparing record pairs for user labeling.

The following code will read the pairs Zingg found and tell us how many were found.

# get candidate pairs
candidate_pairs_pd = getPandasDfFromDs(zingg.getUnmarkedRecords())
 
# if no candidate pairs, run job and wait
if candidate_pairs_pd.shape[0] == 0:
  print('No unlabeled candidate pairs found.  Run findTraining job ...')
else:
    # get list of pairs (as identified by z_cluster) to label 
    z_clusters = list(np.unique(candidate_pairs_pd['z_cluster'])) 
    # identify last reviewed cluster
    last_z_cluster = '' # none yet
    # print candidate pair stats
    print('{0} candidate pairs found for labeling'.format(len(z_clusters)))

Fetching records for Labeling.

Step 11: Start labeling to make Zingg learn how we want to match our data

# Label Training Set
# define variable to avoid duplicate saves
ready_for_save = False
print(candidate_pairs_pd)
# user-friendly labels and corresponding zingg numerical value
# (the order in the dictionary affects how displayed below)
LABELS = {
  'Uncertain':2,
  'Match':1,
  'No Match':0  
  }
# GET CANDIDATE PAIRS
# ========================================================
#candidate_pairs_pd = get_candidate_pairs()
n_pairs = int(candidate_pairs_pd.shape[0]/2)
# ========================================================
# DEFINE IPYWIDGET DISPLAY
# ========================================================
display_pd = candidate_pairs_pd.drop(
  labels=[
    'z_zid', 'z_prediction', 'z_score', 'z_isMatch', 'z_zsource'
    ], 
  axis=1)
# define header to be used with each displayed pair
html_prefix = "<p><span style='font-family:Courier New,Courier,monospace'>"
html_suffix = "</p></span>"
header = widgets.HTML(value=f"{html_prefix}<b>" + "<br />".join([str(i)+"&nbsp;&nbsp;" for i in display_pd.columns.to_list()]) + f"</b>{html_suffix}")
# initialize display
vContainers = []
vContainers.append(widgets.HTML(value=f'<h2>Indicate if each of the {n_pairs} record pairs is a match or not</h2></p>'))
# for each set of pairs
for n in range(n_pairs):
  # get candidate records
  candidate_left = display_pd.loc[2*n].to_list()
  print(candidate_left)
  candidate_right = display_pd.loc[(2*n)+1].to_list()
  print(candidate_right)
  # define grid to hold values
  html = ''
  for i in range(display_pd.shape[1]):
    # get column name
    column_name = display_pd.columns[i]
    # if field is image
    if column_name == 'image_path':
      # define row header
      html += '<tr>'
      html += '<td><b>image</b></td>'
      # read left image to encoded string
      l_encode = ''
      if candidate_left[i] != '':
        with open(candidate_left[i], "rb") as l_file:
          l_encode = base64.b64encode( l_file.read() ).decode()
      # read right image to encoded string
      r_encode = ''
      if candidate_right[i] != '':
        with open(candidate_right[i], "rb") as r_file:
          r_encode = base64.b64encode( r_file.read() ).decode()      
      # present images
      html += f'<td><img src="data:image/png;base64,{l_encode}"></td>'
      html += f'<td><img src="data:image/png;base64,{r_encode}"></td>'
      html += '</tr>'
    elif column_name != 'image_path':  # display text values
      if column_name == 'z_cluster': z_cluster = candidate_left[i]
      html += '<tr>'
      html += f'<td style="width:10%"><b>{column_name}</b></td>'
      html += f'<td style="width:45%">{str(candidate_left[i])}</td>'
      html += f'<td style="width:45%">{str(candidate_right[i])}</td>'
      html += '</tr>'
  # insert data table
  table = widgets.HTML(value=f'<table data-title="{z_cluster}" style="width:100%;border-collapse:collapse" border="1">'+html+'</table>')
  z_cluster = None
  # assign label options to pair
  label = widgets.ToggleButtons(
    options=LABELS.keys(), 
    button_style='info'
    )
  # define blank line between displayed pair and next
  blankLine=widgets.HTML(value='<br>')
  # append pair, label and blank line to widget structure
  vContainers.append(widgets.VBox(children=[table, label, blankLine]))
# present widget
display(widgets.VBox(children=vContainers))
# ========================================================
# mark flag to allow save 
ready_for_save = True

Then, we just have to save the labels assigned in the previous cell.

if not ready_for_save:
  print('No labels have been assigned. Run the previous cell to create candidate pairs and assign labels to them before re-running this cell.')
else:
  # ASSIGN LABEL VALUE TO CANDIDATE PAIRS IN DATAFRAME
  # ========================================================
  # for each pair in displayed widget
  for pair in vContainers[1:]:
    # get the label assigned to this pair via the toggle widget
    user_assigned_label = pair.children[1].get_interact_value()
    # extract candidate pair id from html pair content
    start = pair.children[0].value.find('data-title="')
    if start > 0: 
      start += len('data-title="') 
      end = pair.children[0].value.find('"', start+2)
    pair_id = pair.children[0].value[start:end]

    # assign label to candidate pair entry in dataframe
    candidate_pairs_pd.loc[candidate_pairs_pd['z_cluster']==pair_id, 'z_isMatch'] = LABELS.get(user_assigned_label)
  # ========================================================
  # SAVE LABELED DATA TO ZINGG FOLDER
  # ========================================================
  # make target directory if needed
  dbutils.fs.mkdirs(MARKED_DIR)
  
  # save label assignments
  # save labels
  zingg.writeLabelledOutputFromPandas(candidate_pairs_pd,args)
  # count labels accumulated
  marked_pd_df = getPandasDfFromDs(zingg.getMarkedRecords())
  n_pos, n_neg, n_tot = count_labeled_pairs(marked_pd_df)
  print(f'You have accumulated {n_pos} pairs labeled as positive matches.')
  print(f'You have accumulated {n_neg} pairs labeled as not matches.')
  print("If you need more pairs to label, re-run the cell for 'findTrainingData'")
  # ========================================================  
  # save completed
  ready_for_save = False

Step 12: Train the Entity Resolution Models

After labeling, it is time to let Zingg do the heavy lifting. The trainMatch phase builds the Zingg models from the labels provided earlier and then runs the match in the same job. The models are trained on your specific dataset. We can start this phase with the following command:

options = ClientOptions([ClientOptions.PHASE,"trainMatch"])
# Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

Training the Entity Resolution Zingg models.

Let us sit back and relax — Zingg will process the data and persist the models. These models can be used again and again on newer datasets.
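
Since the trained models are persisted under zinggDir/modelId, a later run on refreshed input data can skip training and execute only the match phase. A minimal sketch, assuming args is configured exactly as above:

# reuse the persisted models: run only the match phase on the configured input
options = ClientOptions([ClientOptions.PHASE, "match"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()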

Step 13: Predict Matching Records

Once the trainMatch phase completes, the match output is stored in the folder you specified earlier. Let us read it to see which records were matched:

outputDF = spark.read.csv("/tmp/output26Nov_1")
colNames = ["z_minScore", "z_maxScore", "z_cluster", "rec_id", "fname", "lname", "stNo", "add1", "add2", "city", "state", "dob", "ssn"]
outputDF.toDF(*colNames).show(100)

The above code will display the matched pairs, along with their match confidence scores. Review these to ensure accuracy. The following columns are added to the input data:

  • z_minScore: the minimum similarity score of the record with any other record in its cluster.
  • z_maxScore: the maximum similarity score of the record with any other record in the same cluster.
  • z_cluster: matching records share the same z_cluster value.

As matching records share the same z_cluster, we can resolve them and use this value as the customer identifier that represents a unique person.
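
For example, here is a minimal sketch of collapsing each cluster into a single representative customer record; keeping the row with the highest z_maxScore is an assumption made purely for illustration.

from pyspark.sql import functions as F, Window

# one representative record per resolved customer (z_cluster)
resolvedDF = outputDF.toDF(*colNames)
w = Window.partitionBy("z_cluster").orderBy(F.col("z_maxScore").cast("double").desc())
goldenDF = (resolvedDF
            .withColumn("rn", F.row_number().over(w))
            .filter("rn = 1")
            .drop("rn"))
goldenDF.show(10)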

The high-probability matches can be consumed in subsequent pipelines, and transactional data can be overlaid in conjunction with the z_cluster to build the customer 360 view. The low-probability matches can be manually reviewed and fed back into the pipeline.
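
One way to make that split is to filter on z_minScore. The 0.9 cutoff below is a hypothetical threshold for illustration and should be tuned after reviewing your scored output:

from pyspark.sql import functions as F

# hypothetical confidence threshold; tune it for your data
HIGH_CONFIDENCE = 0.9

scoredDF = outputDF.toDF(*colNames).withColumn("z_minScore", F.col("z_minScore").cast("double"))
highConfidenceDF = scoredDF.filter(F.col("z_minScore") >= HIGH_CONFIDENCE)  # consume downstream
reviewDF = scoredDF.filter(F.col("z_minScore") < HIGH_CONFIDENCE)           # route to manual review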

Step 14: Documenting the model

You can revisit the training data used to build the models to help you understand how Zingg makes decisions. Let us generate and view the docs with:

options = ClientOptions([ClientOptions.PHASE,"generateDocs"])
#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()
DOCS_DIR = zinggDir + "/" + modelId + "/docs/"
dbutils.fs.ls('file:'+DOCS_DIR)

# see the labels
displayHTML(open(DOCS_DIR+"model.html", 'r').read())

The code below will display your field metadata, such as the field name, field type, and whether it is nullable.

displayHTML(open(DOCS_DIR+"data.html", 'r').read())

Congratulations, we just implemented our first entity resolution pipeline on Databricks with Zingg!