Manage Data Lineage as Code with the Data Lineage API

by Grant
October 10, 2020

Best practices for integrating the data lineage API into everything from CI/CD pipelines to Faust stream processing jobs!

Overview

There are a LOT of ways to move your data: open source tools (e.g. Kafka Connect, Debezium) enable configuration-driven stream integrations, SQL triggers can be efficient mechanisms for moving data within a database, and serverless functions continue to take over the world with practical applications in everything from batch file movement to, well, just about everything. The Tree Schema Data Lineage API aims to provide you with the easiest possible way to create and maintain your data lineage as code.

In this article I’ll walk you through how to use the Tree Schema Python client (a lightweight wrapper around the REST API) to manage your data lineage. I assume that you have already populated your data catalog with your data stores, schemas and fields; if you haven’t, make sure to check out Tree Schema 101 (after reading this), which walks through how to connect Tree Schema to your data stores to automatically populate your catalog.

Before we begin, make sure you have the documentation for the Tree Schema Python client and the REST API handy; everything covered below is documented there.

Manage your Data Lineage as Code

The Tree Schema Python client is intended to provide you with the easiest possible way to define your data lineage in a way that can be versioned, maintained and managed collaboratively. Creating data lineage is as simple as defining the source and target schemas and then linking fields together. A simple but fully working example looks like this:

            
  from treeschema import TreeSchema
  ts = TreeSchema('YOUR_EMAIL', 'YOUR_SECRET_KEY')
  
  # Define the schemas that have at least 1 field in the transformation
  dvc_sess_schema = ts.data_store('Kafka').schema('dvc.session:v1:avro')
  user_dvc_schema = ts.data_store('Kafka').schema('user.clickstream:v1:avro')
  usr_mngo_schema = ts.data_store('Mongo DB').schema('user.session')
  ses_rsft_schema = ts.data_store('Redshift').schema('user.session')
  usr_rsft_schema = ts.data_store('Redshift').schema('user.page_view')

  # Define links between the fields (Source, Target)
  transform_links = [
    (dvc_sess_schema.field('session_id'), ses_rsft_schema.field('session_id')),
    (dvc_sess_schema.field('event_ts'),   ses_rsft_schema.field('event_ts')),

    (dvc_sess_schema.field('user_id'),  user_dvc_schema.field('user_id')),
    (dvc_sess_schema.field('event_ts'), user_dvc_schema.field('event_ts')),

    (user_dvc_schema.field('user_id'),  usr_mngo_schema.field('user_id')),
    (user_dvc_schema.field('event_ts'), usr_mngo_schema.field('event_ts')),

    (user_dvc_schema.field('user_id'),  usr_rsft_schema.field('user_id')),
    (user_dvc_schema.field('page_id'),  usr_rsft_schema.field('page_id')),
    (user_dvc_schema.field('event_ts'), usr_rsft_schema.field('event_ts'))
  ]
  
  # Retrieve the transformation
  t = ts.transformation('User-Session Analytics Pipelines')

  # Create your links
  t.set_links_state(transform_links)
            
          

This would create the following data lineage in Tree Schema:

Data Lineage Created by Tree Schema Python Client
Breaking this down, there are three steps to creating data lineage:
  1. Define each of the schemas that have at least one source or target field
                    
      # Define the schemas that have at least 1 field in the transformation
      dvc_sess_schema = ts.data_store('Kafka').schema('dvc.session:v1:avro')
      user_dvc_schema = ts.data_store('Kafka').schema('user.clickstream:v1:avro')
      usr_mngo_schema = ts.data_store('Mongo DB').schema('user.session')
      ses_rsft_schema = ts.data_store('Redshift').schema('user.session')
      usr_rsft_schema = ts.data_store('Redshift').schema('user.page_view')
                   
                  

    A schema resides within a data store, so in order to select a schema we access it through the data store that it sits in. In this example there are a total of five schemas involved and, for readability, each has been created as its own object.

    Data lineage is all about expressing data movement from one field within a schema to another field within a different schema, and Tree Schema captures those object relationships.

  2. Create links between your source and target fields
    
      # Define links between the fields (Source, Target)
      transform_links = [
        (dvc_sess_schema.field('session_id'), ses_rsft_schema.field('session_id')),
        (dvc_sess_schema.field('event_ts'),   ses_rsft_schema.field('event_ts')),
    
        (dvc_sess_schema.field('user_id'),  user_dvc_schema.field('user_id')),
        (dvc_sess_schema.field('event_ts'), user_dvc_schema.field('event_ts')),
    
        (user_dvc_schema.field('user_id'),  usr_mngo_schema.field('user_id')),
        (user_dvc_schema.field('event_ts'), usr_mngo_schema.field('event_ts')),
    
        (user_dvc_schema.field('user_id'),  usr_rsft_schema.field('user_id')),
        (user_dvc_schema.field('page_id'),  usr_rsft_schema.field('page_id')),
        (user_dvc_schema.field('event_ts'), usr_rsft_schema.field('event_ts'))
      ]
                  

    Links are just tuples of (source, target), and you can add as many as you’d like. Since each of the schema objects has already been defined, we can simply access fields within a schema by referring to their names.

  3. Select the transformation and add the links
    
      # Retrieve the transformation
      t = ts.transformation('User-Session Analytics Pipelines')
      
      # Create your links
      t.set_links_state(transform_links)
                  

    The transformation is just a shell that holds transformation links; it’s up to you to decide how to group links into transformations. Divvy them up by use-case, source schema, or whatever works for you!
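
    For instance, here is a minimal sketch of splitting the links above into one transformation per use-case, reusing the schema objects from step 1. The transformation names are purely illustrative and this assumes those transformations already exist in Tree Schema:

      # Hypothetical split: one transformation per use-case
      session_links = [
          (dvc_sess_schema.field('session_id'), ses_rsft_schema.field('session_id')),
          (dvc_sess_schema.field('event_ts'),   ses_rsft_schema.field('event_ts'))
      ]
      page_view_links = [
          (user_dvc_schema.field('user_id'),  usr_rsft_schema.field('user_id')),
          (user_dvc_schema.field('page_id'),  usr_rsft_schema.field('page_id')),
          (user_dvc_schema.field('event_ts'), usr_rsft_schema.field('event_ts'))
      ]

      # Each transformation holds only the links for its own use-case
      # (assumes 'Session Load' and 'Page View Load' already exist in Tree Schema)
      ts.transformation('Session Load').set_links_state(session_links)
      ts.transformation('Page View Load').set_links_state(page_view_links)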

Reviewing Data Lineage with GIT

Looking at this in GitHub, we can start to see how the Tree Schema Python client makes collaboration and data lineage management simple. When fields are added to or removed from a transformation, your team can easily review the changes, and being able to see the downstream impact to your data scientists, business analysts and other data users helps build a tremendous amount of trust in your data.

Manage Data Lineage in GitHub

Tree Schema in Your CI/CD Process

We recommend that you integrate your Tree Schema data lineage into your CI/CD pipeline or build process, since this is the single point of control that exists before your actual data lineage changes. As seen above, reviewing changes to data lineage can be rather easy, and the next step is simply to run the script right before your code is moved into production. Each of the APIs and functions in the Python client that create data assets is idempotent, so you don't have to worry about creating duplicate data lineage if the script is executed multiple times!

Each of the examples above has used the transformation method set_links_state(). With this method you can call the API over and over with a set of links and the transformation will update to contain exactly the set of links that you provided. There are other functions to create and delete links, either individually or in batches, but the recommended method is to set the entire state of the transformation. Just set it and forget it!

All of the other methods in the Python client that create new data assets are also idempotent. For example, if you are creating a transformation you can pass in the same set of inputs from your CI/CD pipeline each time you deploy. Only the first execution will create a new transformation object within Tree Schema; all subsequent requests will return the existing transformation.
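
As a rough sketch of what that deploy step could look like (the transformation name is illustrative, and the 'pub_sub_event' type is borrowed from the Faust example below purely for illustration; pick the type that matches your pipeline):

  from treeschema import TreeSchema
  ts = TreeSchema('YOUR_EMAIL', 'YOUR_SECRET_KEY')

  # Passing the same inputs on every deploy either creates the transformation
  # (first run) or returns the existing one (every run after that)
  transform_inputs = {
      'name': 'User-Session Analytics Pipelines',  # illustrative name
      'type': 'pub_sub_event'  # type value reused from the Faust example below
  }
  t = ts.transformation(transform_inputs)

  # Re-setting the full state on every deploy simply converges the
  # transformation to exactly the links defined in your code
  t.set_links_state(transform_links)  # transform_links built as in the first example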

Make sure to look into the examples referenced in the documentation for more details on how to do this.

Advanced Usage

For a lot of use-cases, manually defining the source and target links will work quite well, especially if you are creating new fields (e.g. a count of user_id), have multiple target fields per source, or have multiple source fields per target. However, there will be times when you do not want to manually define the full data lineage. In fact, if you've already used Tree Schema to automatically populate your data catalog then you can use existing metadata to automate your data lineage!

Here are some use-cases where we use the Tree Schema Python client to automate data lineage creation:

  • Lift and shift: taking a complete extract of a table and moving it to an analytical environment / data lake
  • Data cube creation: many aggregations are applied at an entity level across a specified time period, for example: count of page views per day per user, count of sessions per day per user, count of mobile logins per day per user, etc. The naming conventions are consistent enough that scripting can build a large number of links for us automatically
  • Inside of application code: registering data lineage directly from the code that creates the data, for example using the same Faust models that define our stream processing jobs to create the schemas, fields and lineage in Tree Schema

Since the basics have been covered above, we’ll walk through each of these mostly with code, adding a little bit of commentary where necessary.

Lift and shift

This process simply creates data lineage by matching field names in one schema with field names in another schema and then creating links where the field names match.

            
  from treeschema import TreeSchema
  ts = TreeSchema('YOUR_EMAIL', 'YOUR_SECRET_KEY')
  
  # Get the source and target schemas
  src_schema = ts.data_store('My Source DS').schema('src.schema')
  tgt_schema = ts.data_store('Target DS').schema('tgt.schema')

  # Pre fetch all of the fields to reduce API overhead
  src_schema.get_fields()
  tgt_schema.get_fields()
  
  transform_links = []
  
  # For each field in the source, check if the field name exists in the target
  for src_field in src_schema.fields.values():
      tgt_field = tgt_schema.field(src_field.name)
      if tgt_field:
        link_tuple = (src_field, tgt_field)
        transform_links.append(link_tuple)
  
  # Set the state to the current value, this will create new links and
  # deprecate old links
  
  t = ts.transformation('My Second Transform')
  t.set_links_state(transform_links)
            
          

This can create many transformation links at once:

Many links created by Data Lineage API

Data cube creation

We run a lot of analyses, and many of the pre-computed features that we create follow a very similar paradigm. If some of these processes were written in SQL, they might look something like this:

            
  select
    user_id,
    sum(case when dvc_type = 'ios' then 1 else 0 end) as user_dvc_ios_login,
    sum(case when dvc_type = 'android' then 1 else 0 end) as user_dvc_android_login,
    ...
  from some.table
  group by user_id
            
          

This can give us metrics such as total iOS logins per user, total Android logins per user, total session duration per user, etc. As you can see, there are two inputs into each of these metrics: the user and the field that is being counted. We can recreate this with a simple script. It should be noted that, at least for this example, you will need to have consistent naming conventions throughout your data pipeline to support matching fields that share the same name pattern.


  from treeschema import TreeSchema
  ts = TreeSchema('YOUR_EMAIL', 'YOUR_SECRET_KEY')
  
  # Get the source and target schemas
  src_schema = ts.data_store('Postgres').schema('usr.session_events')
  tgt_schema = ts.data_store('Redshift').schema('usr.page_view_analytics')
  
  # Pre fetch all of the fields to reduce API overhead
  src_schema.get_fields()
  tgt_schema.get_fields()

  # In this example there are multiple source fields for each target field,
  # we will infer the two source fields from the target field name 
  field_descriptors = {'user': 'user_id', 'dvc': 'dvc_type'}
  def get_src_field_names(tgt_field):
      tgt_field_pieces = tgt_field.split('_')
      src_field_1 = field_descriptors.get(tgt_field_pieces[0])
      src_field_2 = field_descriptors.get(tgt_field_pieces[1])
      
      # Field names are all stored internally in lowercase to remove 
      # case sensitivity, convert them here to conform to the standard
      if src_field_1:
        src_field_1 = src_field_1.lower()
      if src_field_2: 
        src_field_2 = src_field_2.lower()

      return src_field_1, src_field_2 
      
  # For each field in the target, infer its source fields and confirm they exist in the source
  transform_links = []
  for tgt_field in tgt_schema.fields.values():
      src_1_name, src_2_name = get_src_field_names(tgt_field.name)
      # Guard against target fields that do not follow the naming convention
      src_1_field = src_schema.field(src_1_name) if src_1_name else None
      src_2_field = src_schema.field(src_2_name) if src_2_name else None
      if src_1_field and src_2_field:
          link_tuple_1 = (src_1_field, tgt_field)
          link_tuple_2 = (src_2_field, tgt_field)
          transform_links.append(link_tuple_1)
          transform_links.append(link_tuple_2)
  
  # Set the state to the current value, this will create new links and
  # deprecate old links
  
  t = ts.transformation('Data Cube Transform')
  t.set_links_state(transform_links)
          

And again, the corresponding transformation:

Create Links from Aggregations with the Tree Schema API

This example is also somewhat simplistic, but it demonstrates the more complex matching logic that you can apply with the API.

Inside of application code

What is more fun than managing metadata? Actually having it integrated into the code that creates the data!

We use Faust for our stream processing. In this example we use the same Faust models that define our serialization to register our data lineage. Now, it should be noted that you could potentially run into a rate limit exception by doing this. Our Faust apps run in Docker containers and we invoke the Tree Schema client when a new container is spun up; this only happens after deployments and a handful of times in the middle of the day when autoscaling kicks in.

The main benefit here is that it’s a no-forget solution: if we update our app but don’t update the data lineage, the app will break, so the code itself does not allow us to forget to make the change. The downside is that it’s another dependency for a production app, which also means this may not be a good idea if your app is mission critical. This is a fun and somewhat quirky way to integrate the Tree Schema data lineage API, intended to show off its flexibility, but it probably should not be your primary integration pattern.


  import faust
  from kafka import KafkaProducer
  
  # Faust Models - these will also be used to create our schemas in Tree Schema
  class User(faust.Record, serializer='json'):
      user_id: str
      rand_val: float
  
  class UserOutput(faust.Record, serializer='json'):
      user_id: str
      user_cnt: int
      user_max: int
      user_min: int
  
  # Kafka configs
  INPUT_TOPIC_NAME = 'user-input-data.v1'
  OUTPUT_TOPIC_NAME = 'user-output-aggregations.v1'
  BROKER = 'localhost:9092'
  
  # Faust app
  app = faust.App(
      'UserProcess',
      broker=f'kafka://{BROKER}', 
      version=1,
      consumer_auto_offset_reset='latest'
  )
  
  # Producer that will write the output
  KAFKA_PRODUCER = KafkaProducer(bootstrap_servers=[BROKER])

  # Function to send messages back to another kafka topic
  def publish_message(user_id, user_cnt, user_max, user_min):
      user_output = UserOutput(
          user_id=user_id, 
          user_cnt=user_cnt,
          user_max=user_max,
          user_min=user_min
      )
      key = bytes(user_id, encoding='utf-8')
      KAFKA_PRODUCER.send(
          OUTPUT_TOPIC_NAME, 
          key=key, 
          value=user_output.dumps()
      )
      KAFKA_PRODUCER.flush()
  
  
  # Incoming Faust topic definition
  INPUT_TOPIC = app.topic(INPUT_TOPIC_NAME, value_type=User)
  
  # Table to keep track of per-user events to calculate metrics
  TABLE = app.Table(
      'UserAggregationTable', 
      default=list,
      partitions=1
  )
  
  # The actual stream process
  @app.agent(INPUT_TOPIC)
  async def print_totals(stream):
      async for user in stream:
          user_list = TABLE[user.user_id]
          user_list.append(user.rand_val)
          TABLE[user.user_id] = user_list
          publish_message(
              user_id=user.user_id, 
              user_cnt=len(TABLE[user.user_id]),
              user_max=max(TABLE[user.user_id]),
              user_min=min(TABLE[user.user_id])
          )
  
  
  # Tree Schema Configs
  from treeschema import TreeSchema
  ts = TreeSchema('YOUR_EMAIL', 'YOUR_SECRET_KEY')
  
  # In this example, we get or create the schemas.
  # If the schemas do not exist in Tree Schema then they will be created,
  # otherwise the existing schema will be returned
  src_schema_inputs = {'name': INPUT_TOPIC_NAME, 'type': 'json'}
  tgt_schema_inputs = {'name': OUTPUT_TOPIC_NAME, 'type': 'json'}
  src_schema = ts.data_store('Kafka').schema(src_schema_inputs)
  tgt_schema = ts.data_store('Kafka').schema(tgt_schema_inputs)
  
  # We need to get or create the fields for each schema, this function ensures
  # that each field is created before creating the transformation link. 
  # You can pass in the native Python data type and the Tree Schema library 
  # will infer the correct values which works well since Faust gives us the 
  # field names and data types!
  def get_create_fields(schema, faust_model):
      for field, dtype in faust_model._options.fields.items():
          # This will create the field if it does not exist or retrieve it if it does
          schema.field({'name': field, 'type': dtype})
  
  # Make sure the fields are created for each schema.
  # This will only create the fields the first time it is executed
  get_create_fields(src_schema, User)
  get_create_fields(tgt_schema, UserOutput)
  
  # Create the data lineage - notice the Faust Models are used to get the fields
  transform_links = [
      (src_schema.field(User.user_id.field), tgt_schema.field(UserOutput.user_id.field)),
      
      (src_schema.field(User.user_id.field), tgt_schema.field(UserOutput.user_cnt.field)),
      
      (src_schema.field(User.user_id.field), tgt_schema.field(UserOutput.user_max.field)),
      (src_schema.field(User.rand_val.field), tgt_schema.field(UserOutput.user_max.field)),
      
      (src_schema.field(User.user_id.field), tgt_schema.field(UserOutput.user_min.field)),
      (src_schema.field(User.rand_val.field), tgt_schema.field(UserOutput.user_min.field))
  ]
  
  # Again, this will create or retrieve the transformation
  transform_inputs = {'name': 'Faust Transform', 'type': 'pub_sub_event'}
  t = ts.transformation(transform_inputs)

  # Finally, set the state of the transformation!
  t.set_links_state(transform_links)
            
          

If you are interested in the full working example, including sending and processing messages, integrating with Tree Schema and then outputting the events back to Kafka, check out the Faust examples in the GitHub repo.

And, of course, the code above would create the following data lineage within Tree Schema:

Data Lineage with Faust and Tree Schema
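
If the extra production dependency mentioned above is a concern for a mission-critical app, one mitigation (our own sketch, not a Tree Schema feature) is to make the registration best-effort by gating it behind an environment variable and a try/except, at the cost of the no-forget guarantee described above:

  import os
  from treeschema import TreeSchema

  # REGISTER_LINEAGE is a hypothetical flag you would set in your deployment config
  if os.environ.get('REGISTER_LINEAGE', 'true').lower() == 'true':
      try:
          ts = TreeSchema('YOUR_EMAIL', 'YOUR_SECRET_KEY')
          t = ts.transformation({'name': 'Faust Transform', 'type': 'pub_sub_event'})
          t.set_links_state(transform_links)  # transform_links built exactly as above
      except Exception as exc:
          # Log and continue so a lineage hiccup never takes down the stream processor
          print(f'Tree Schema lineage registration failed: {exc}')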

Closing Thoughts

We’d love to know how you’re using the Tree Schema client or what features you’d like to see! Drop us a line and we’ll happily trade feedback for discounts :)

