One of the main challenges in generating useful insights from data is poor data quality caused by siloed and disjointed datasets. Data inconsistencies and duplicate records are often introduced because enterprise data pipelines pull from multiple sources of information. This is a huge problem for data scientists working on fraud detection, personalization and recommendations. For data analysts, it is hard to accurately report customer lifetime value, most valuable customers, new customers added and other metrics because the underlying data is inconsistent.
For example, take a look at the customer data below:
We can see that there are multiple rows for the same customer, such as the following:
We have an ssn column; however, there is no guarantee that it will always be unique. In the example above, the ssn of the second record is not consistent with the other records. A quick human verification, however, confirms that these 3 records belong to the same customer.
Here’s another example of different records belonging to the same customer.
Defining rules across attributes and combining them all to assess matching records is time consuming and prone to errors. Building such a system can easily take months and delay the actual analysis. Performing identity resolution at scale across many data columns is not easy either.
Overall, building an identity resolution system has the following challenges:
1. As our dataset size N increases, the number of comparisons shoots up: a naive approach compares every record with every other record, which is N(N-1)/2 pairs (see the quick sketch after this list).
2. Understanding which attributes to use for identifying duplicates and deciding the matching criteria for large datasets with many attributes and variations is tough.
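To get a feel for how quickly naive pairwise matching blows up, here is a small, purely illustrative Python sketch (not part of the Zingg pipeline):

# Illustrative only: growth of naive pairwise comparisons with dataset size N
for n in (1_000, 100_000, 10_000_000):
    pairs = n * (n - 1) // 2
    print(f"{n:>12,} records -> {pairs:>26,} pairwise comparisons")

At ten million records we are already looking at roughly 50 trillion pairs, which is why identity resolution tools rely on blocking to compare only records that are likely to match.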
Building an identity resolution system generally involves the following steps:
Building this ourselves is certainly doable, but is it worth our time? Let us explore an open source option: Zingg. We will use Zingg's Python API to build an identity resolution pipeline for our customer data.
As an ML-based tool, Zingg takes care of the above steps so that we can perform identity resolution at scale.
In order to perform entity resolution, Zingg defines five phases — findTrainingData, label, train, match and link.
The findTrainingData and label phases are usually run a few times, with the user marking the samples. Through findTrainingData, Zingg carefully picks non-matching samples, obvious matches and edge cases to build a robust training set with enough variation to train the models.
Once we have about 30–40 matching samples, we can train and save the models by running the train phase. Once the models are trained, the match or link phase can be run whenever new datasets with the same attributes come in.
Let us use Zingg on the customer data and resolve identities.
The fastest way to get started with Zingg is to use the Zingg docker image. Let us get the docker image and start working with it.
docker pull zingg/zingg:0.3.4
docker run -u <uid> -it zingg/zingg:0.3.4 bash
For additional information on using the Zingg Docker image, check out the official documentation linked here.
We will use the febrl example loaded into a table named customers in Postgres.
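If you need to load the febrl sample into the customers table first, a rough sketch along the following lines works; the CSV file name below is hypothetical, its columns are assumed to match the field definitions we use later, and pandas plus SQLAlchemy (with a Postgres driver) are assumed to be installed:

# Minimal loading sketch, not part of the Zingg example itself.
# Assumes a hypothetical febrl_customers.csv whose header matches the columns we match on.
import pandas as pd
from sqlalchemy import create_engine

df = pd.read_csv("febrl_customers.csv", dtype=str)  # hypothetical file name
engine = create_engine("postgresql://suchandra:1234@localhost:5432/postgres")
df.to_sql("customers", engine, if_exists="replace", index=False)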
The Zingg Python package is already installed in the Docker container, so we will use it directly. One important point to remember is that Zingg Python programs are PySpark programs and have to be run with the zingg.sh script provided with the Zingg release.
Let us define the input for our customers table. We create a Python script named CustomerIdentityResolution.py and import the necessary packages:
from zingg.client import *
from zingg.pipes import *
For each column that we want to use as part of our output, we create a FieldDefinition object. The FieldDefinition takes the name of the attribute, its data type, and the match type. For example, we can define the first name column like this:
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
Match types tell Zingg how we want matching to be performed for each field. Zingg provides different matching criteria, some of which are:
There are many more match types that we can choose from based on our column data; see the documentation here.
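As a quick illustration, continuing in the same script where zingg.client and zingg.pipes are already imported, different match types could be applied per column like this; the columns below are hypothetical and the match types are taken from the Zingg documentation for the release used here:

# Hypothetical columns showing a few other match types
customer_id = FieldDefinition("customer_id", "string", MatchType.DONT_USE)  # keep in output, skip for matching
email = FieldDefinition("email", "string", MatchType.EMAIL)                 # match on the part before the @
gender = FieldDefinition("gender", "string", MatchType.EXACT)               # require identical values
nickname = FieldDefinition("nickname", "string", MatchType.FUZZY)           # tolerate typos and variations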
The following Python program builds the arguments for Zingg and adds all the field definitions to it.
#build the arguments for zingg
args = Arguments()

#set field definitions
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
lname = FieldDefinition("lname", "string", MatchType.FUZZY)
streetnumber = FieldDefinition("streetnumber", "string", MatchType.FUZZY)
street = FieldDefinition("street", "string", MatchType.FUZZY)
address = FieldDefinition("address", "string", MatchType.FUZZY)
locality = FieldDefinition("locality", "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state = FieldDefinition("state", "string", MatchType.FUZZY)
dateofbirth = FieldDefinition("dateofbirth", "string", MatchType.FUZZY)
ssn = FieldDefinition("ssn", "string", MatchType.FUZZY)

fieldDefs = [fname, lname, streetnumber, street, address, locality, areacode,
             state, dateofbirth, ssn]
args.setFieldDefinition(fieldDefs)
Zingg lets us connect to various data sources and sinks using relevant configurations for each type of data platform as described in the documentation. Zingg’s Python API provides a generic Pipe through which we can define the data sources and sinks for our pipeline.
To connect Zingg with our customers table in Postgres, we create a Pipe object and name it customerDataStaging. We set the format as jdbc.
customerDataStaging = Pipe("customerDataStaging", "jdbc")
We specify the properties for the Pipe to connect with our Postgres database.
customerDataStaging.addProperty("url", "jdbc:postgresql://localhost:5432/postgres")
customerDataStaging.addProperty("dbtable", "customers")
customerDataStaging.addProperty("driver", "org.postgresql.Driver")
customerDataStaging.addProperty("user", "suchandra")
customerDataStaging.addProperty("password", "1234")
To write our output to the customers_unified table, we create another Pipe object with relevant properties.
customerIdentitiesResolved = Pipe("customerIdentitiesResolved", "jdbc")
customerIdentitiesResolved.addProperty("url", "jdbc:postgresql://localhost:5432/postgres")
customerIdentitiesResolved.addProperty("dbtable", "customers_unified")
customerIdentitiesResolved.addProperty("driver", "org.postgresql.Driver")
customerIdentitiesResolved.addProperty("user", "suchandra")
customerIdentitiesResolved.addProperty("password", "1234")
We also need the JDBC driver for Zingg to connect with Postgres. Let us download the Postgres JDBC driver and add the path of the driver to the spark.jars property of zingg.conf, as described here.
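For example, the relevant property in zingg.conf would look roughly like the line below; the driver version and path are assumptions and should point to wherever you saved the jar:

spark.jars=/path/to/postgresql-42.6.0.jar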
We have defined our input schema as well as our data source and sink. Let us now specify the directory to store the models and the model identifier. We also specify the number of partitions and the fraction of the dataset to sample for labeling so that our training data can be created quickly.
args.setZinggDir("models")
args.setModelId("customer360")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)
Next, we invoke the Zingg client and specify the appropriate <PHASE_NAME>, which could be findTrainingData, label, train or match. We will take the phase name as an input on the command line.
options = ClientOptions([ClientOptions.PHASE, <PHASE_NAME>])

#Zingg execution for the given phase
zingg = Zingg(args, options)
zingg.initAndExecute()
Here is the complete code:
from zingg.client import *
from zingg.pipes import *
import sys

#build the arguments for zingg
args = Arguments()

#phase name to be passed as a command line argument
phase_name = sys.argv[1]

#set field definitions
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
lname = FieldDefinition("lname", "string", MatchType.FUZZY)
streetnumber = FieldDefinition("streetnumber", "string", MatchType.FUZZY)
street = FieldDefinition("street", "string", MatchType.FUZZY)
address = FieldDefinition("address", "string", MatchType.FUZZY)
locality = FieldDefinition("locality", "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state = FieldDefinition("state", "string", MatchType.FUZZY)
dateofbirth = FieldDefinition("dateofbirth", "string", MatchType.FUZZY)
ssn = FieldDefinition("ssn", "string", MatchType.FUZZY)

fieldDefs = [fname, lname, streetnumber, street, address, locality, areacode,
             state, dateofbirth, ssn]

#add field definitions to Zingg client arguments
args.setFieldDefinition(fieldDefs)

#define the input pipe
customerDataStaging = Pipe("customerDataStaging", "jdbc")
customerDataStaging.addProperty("url", "jdbc:postgresql://localhost:5432/postgres")
customerDataStaging.addProperty("dbtable", "customers")
customerDataStaging.addProperty("driver", "org.postgresql.Driver")
customerDataStaging.addProperty("user", "suchandra")
customerDataStaging.addProperty("password", "1234")

#add input pipe to arguments for Zingg client
args.setData(customerDataStaging)

#define the output pipe
customerIdentitiesResolved = Pipe("customerIdentitiesResolved", "jdbc")
customerIdentitiesResolved.addProperty("url", "jdbc:postgresql://localhost:5432/postgres")
customerIdentitiesResolved.addProperty("dbtable", "customers_unified")
customerIdentitiesResolved.addProperty("driver", "org.postgresql.Driver")
customerIdentitiesResolved.addProperty("user", "suchandra")
customerIdentitiesResolved.addProperty("password", "1234")

#add output pipe to arguments for Zingg client
args.setOutput(customerIdentitiesResolved)

#model identifier; models are saved under models/customer360
args.setModelId("customer360")
#directory in which all models are stored
args.setZinggDir("models")
#number of partitions for parallel execution
args.setNumPartitions(4)
#fraction of the total dataset used to select data for labelling
args.setLabelDataSampleSize(0.5)

options = ClientOptions([ClientOptions.PHASE, phase_name])

#Zingg execution for the given phase
zingg = Zingg(args, options)
zingg.initAndExecute()
We will run our customer identity resolution Python program using the Zingg command line. As mentioned earlier, we will use the Zingg script to take care of all the dependencies.
zingg.sh --properties-file /zingg-0.3.4-SNAPSHOT/config/zingg.conf --run /zingg-0.3.4-SNAPSHOT/examples/febrl/CustomerIdentityResolution.py findTrainingData
Here are some of the output logs during the findTrainingData phase.
We can now run the Zingg label phase and mark the records selected by the program earlier.
zingg.sh --properties-file /zingg-0.3.4-SNAPSHOT/config/zingg.conf --run /zingg-0.3.4-SNAPSHOT/examples/febrl/CustomerIdentityResolution.py label
We are shown pairs of records:
We need to specify whether the records match, do not match, or we are not sure. The records above do not look like they match, so we mark them as 0.
These records, on the other hand, look very similar, so we label them as 1.
What should we do with this pair?
It is unlikely that two different people live in the same locality and area with the same date of birth and ssn. These two records could belong to the same customer with an incorrect last name entry, but they could also be different people. For cases like this where we are unsure, we mark the pair as 2.
As our dataset was small, one round of findTrainingData and label was enough. We can now build our models by running:
zingg.sh --properties-file /zingg-0.3.4-SNAPSHOT/config/zingg.conf --run /zingg-0.3.4-SNAPSHOT/examples/febrl/CustomerIdentityResolution.py train
Training and cross validation parameters are printed on the screen.
The Zingg models are ready, so let's match:
zingg.sh --properties-file /zingg-0.3.4-SNAPSHOT/config/zingg.conf --run /zingg-0.3.4-SNAPSHOT/examples/febrl/CustomerIdentityResolution.py match
On completion, Zingg writes the output to the table we specified in our Python program.
Zingg adds 3 new columns:
Using these columns and applying suitable thresholds, we can resolve identities. We can also send uncertain records to annotators for human review.
Let's see how Zingg interpreted the duplicate records for eglinton and jaiden rollins we had seen earlier.
Consider cluster 15:
It has 3 records which are clear matches and should be resolved to the same customer. Each record shows a max and a min score greater than 0.95, which means both its strongest match and its weakest match within the cluster score above 0.95. We can set a threshold of 0.95 and automatically treat records above it as sure matches requiring no human intervention, as sketched below.
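Here is a rough post-processing sketch of that rule. It is not part of the Zingg run itself; it assumes the output columns are named z_cluster, z_minScore and z_maxScore (as in the Zingg release used here, but verify against your own output table) and reuses the same Postgres connection details:

# Split Zingg output into sure matches and records needing human review
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://suchandra:1234@localhost:5432/postgres")
unified = pd.read_sql_table("customers_unified", engine)

THRESHOLD = 0.95
# clusters where even the weakest match is above the threshold can be merged automatically
sure = unified.groupby("z_cluster").filter(lambda g: g["z_minScore"].min() >= THRESHOLD)
# everything else goes to annotators
review = unified[~unified["z_cluster"].isin(sure["z_cluster"])]
print(len(sure), "records auto-merged,", len(review), "records for human review")

The exact cut-off is a business decision; it is usually safer to start conservative and revisit it after reviewing a few clusters.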
Similarly, cluster 37 shows obviously matching records:
Let's look at cluster 20:
This cluster shows scores lower than our selected threshold, so we will pass it on to humans for review.
Identity resolution is a critical step in building our data platforms and products. It enables us to understand who our core business entities are. As a purpose-built tool for identity resolution, Zingg abstracts away the complexity and effort of building a fuzzy record matching system: it takes care of data preprocessing, attribute and record level similarity, and scalability. Thus we can use our customer, supplier, supplies, product, and parts data for critical business analytics and operations without worrying about duplicate or inconsistent information.