
Trigger and Monitor Data Factory Jobs from Databricks Workflows

In data engineering on the Azure cloud, a common setup is to use Azure Data Factory to orchestrate data pipelines. If you wanted to orchestrate Databricks pipelines, Data Factory gave you a powerful tool to schedule and trigger them. In this article, I will show you the other way around: how to start and monitor Data Factory pipelines from Databricks Workflows.

Intro

With Databricks Workflows, Databricks offers a powerful ETL orchestration suite that can be used to orchestrate and trigger all data pipelines in Databricks. Its functionality has been expanded continuously in recent months.

If I implement a data platform in Azure with Azure Databricks today, I have two options for where to implement my orchestration. Two years ago, this would not even have been a question: it would have been Data Factory, simply because it offered all the required functionality. Nowadays, my decision is different. If my data platform is built with Databricks and 95% of the pipelines are created in Databricks, then I also want the orchestration in Databricks. Of course, it would be nice if I could create 100% of the pipelines with Databricks, but there are still situations in which Data Factory is more suitable, for example when I need to copy large amounts of data or integrate data sources from on-prem networks. Databricks Lakeflow Connect is on the way but not yet GA, so I will have to be patient a little longer. As soon as Lakeflow Connect is ready for use, this decision will have to be revisited.

You could now create a few Data Factory pipelines and orchestrate them in Data Factory, while all Databricks pipelines are orchestrated in Workflows. However, this would be a sub-optimal setup, because there may be dependencies between the pipelines. For example, I have to copy data from an on-prem system to the data lake with an ADF pipeline (ADF = Azure Data Factory) and then start a Databricks pipeline. How do I know that the Data Factory pipeline is finished?

I could create a schedule that starts the ADF pipeline at 04:00 in the morning. I know from experience that the ADF job takes around 30 minutes, so I plan in a little reserve and start the Databricks pipeline at 05:00. That is bad because I lose 30 minutes, and it is also bad because it fails when there are problems: the ADF pipeline may take longer for some reason and not be finished at 05:00, which leads to conflicts.

Instead of scheduling the Databricks pipeline as well, you could use a different trigger, e.g., a file arrival trigger. The destination to which the ADF pipeline writes the files in the data lake can be monitored with Databricks Workflows; as soon as a file is written, the Databricks pipeline is started. This allows me to start processing the files in Databricks straight away. However, it must be remembered that the files are still being written, so I cannot start Databricks as soon as the first file appears. You have to use a trick and write a kind of control file after the copy job in ADF once everything has run successfully; this file then becomes the trigger for Databricks to start its job. But that does not sound good either.

Another possibility would be to execute a task after the copy job in ADF that triggers Databricks. The options there are limited, however: in ADF I can execute Databricks notebooks, Databricks JARs, or Databricks Python code.

This means that I would have to write code in one of these three tasks to trigger the actual Databricks job. Another possibility that makes more sense would be to trigger the job in Databricks workflows from ADF via REST API. But what I don’t like about this setup is that I have two different orchestrators. What if something goes wrong? I have to look in Databricks Workflows to see what happened, as well as in ADF. The logic also has to be created, operated, and managed in both places.

So, my goal is to do the central orchestration exclusively with Databricks Workflows. All scheduling and triggering take place in Databricks. Data Factory is only used to execute simple pipelines and to provide the Self-Hosted Integration Runtime. The pipelines are written generically and started from Databricks with parameters. The aim is that the pipelines only have to be created once and do not have to be adapted and deployed for each new data source. All logging should then also be fed back to Databricks.

Data Factory Pipeline

In my example, I have created a simple ADF pipeline.

The pipeline has a single task that waits a defined number of seconds. I do not hard-code the number of seconds but pass it as a parameter. This parameter must therefore be passed when the pipeline is run:

Creating an Entra Service Principal

If I now want to run this ADF pipeline in an automatic ETL process, I do not run it under my user but use a service principal. There are detailed instructions for Azure on how to create a service principal: https://learn.microsoft.com/en-us/power-apps/developer/data-platform/walkthrough-register-app-azure-active-directory

In the Azure Portal, you can search for app registrations and then create a new one. I choose a descriptive name so that the purpose of this service principal is roughly recognizable.

You can leave the Redirect URI empty.

After creating it, you will see the important information in the Overview. We need the Application (client) ID and the Directory (tenant) ID.

Now we need a secret, which we can create under “Certificates & secrets.”

The created secret is then displayed in the Value field. It is only shown once, so the secret must be copied immediately and saved in a safe place.

I store this information in my KeyVault.

Authorizing the service principal on the data factory

In order for the service principal to have the correct authorizations to start an ADF pipeline, it must be authorized on the ADF resource. To do this, go to the corresponding resource in the Azure portal.

Note: Of course, I would not set such an authorization manually in a real data project, but via IaC such as Terraform.

When creating the role assignment, I give the service principal “Data Factory Contributor” rights. You can search for the name of the service principal under “Select members.”

Databricks Secret Scope with KeyVault integration

Later on in the example, I don’t want to save the secrets for the app registration in plain text in the code. For this reason, I will create a secret scope in Databricks that points to the KeyVault in which I have entered the corresponding secrets. There are also detailed instructions on how to create the secret scope and connect it to the KeyVault: https://learn.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes

In my example, I call the secret scope “kv”.
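To quickly verify that the scope works, I can list and read secrets from a notebook. This is just a minimal check, assuming the scope name “kv” and the secret names I created in the KeyVault:

# List the secrets available in the KeyVault-backed scope "kv"
dbutils.secrets.list("kv")

# Read a single secret; the value is redacted when displayed in a notebook
dbutils.secrets.get(scope="kv", key="app-reg-adf-job-client-id")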

In addition, Databricks must be authorized on the KeyVault; otherwise, you will receive an error message like this:

This authorization is set on the KeyVault resource, similar to how it was set on the Data Factory resource. At the moment, I don’t want to change any secrets via Databricks, only read them, so I assign the “Key Vault Secrets User” role and assign it to the “AzureDatabricks” application.

Databricks Notebook for ADF REST API Call

In this example, I would like to make a simple REST call. First, I import the required Python libraries and then set various variables so that I address the correct Data Factory. The pipeline parameters for the Data Factory are packed into a dictionary and can thus be extended as required.

import requests
import json

subscription_id = 'f43d1b9d-e6a3-49d5-abcd-6f952ee6e67f'
resource_group  = 'rg-stefan'
factory_name    = 'adf-stefan-nona'
pipeline_name   = 'DummyWaitPipeline'
parameters      = {
                    'seconds_to_wait': 1
                  }

To interact with the Azure management API, you must first generate an access token using the service principal’s credentials. I have written a function for this purpose.

def create_access_token():
    """
    This function creates an access token using client credentials.
    
    Returns:
        str: Access token if successful, None otherwise.
    """
    
    client_id = dbutils.secrets.get(scope="kv", key="app-reg-adf-job-client-id")
    client_secret = dbutils.secrets.get(scope="kv", key="app-reg-adf-job-client-secret")
    tenant_id = dbutils.secrets.get(scope="kv", key="app-reg-adf-job-tenant-id")

    token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/token'
    resource = 'https://management.core.windows.net/'

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    data = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': resource
    }
    try:
        response = requests.post(token_url, headers=headers, data=data)
        if response.status_code == 200:
            access_token = response.json()['access_token']
            return access_token
        else:
            error_message = response.json().get('error_description', 'No additional error message provided.')
            print(f'Failed to obtain access token with status code {response.status_code}. Error: {error_message}')

    except requests.exceptions.RequestException as e:
        print(f"An error occurred while obtaining the access token: {e}")

Now that the access token has been generated, the next step is to implement the start of the pipeline. To keep the whole thing a little generic, I have again written a simple function.

def trigger_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, parameters):
    """
    Triggers an Azure Data Factory pipeline run.

    Args:
        subscription_id (str): The subscription ID for the Azure account.
        resource_group (str): The resource group name where the Data Factory is located.
        factory_name (str): The name of the Data Factory.
        pipeline_name (str): The name of the pipeline to trigger.
        parameters (dict): A dictionary of parameters to pass to the pipeline run.

    Returns:
        str: The run ID of the triggered pipeline if successful, None otherwise.
    """
    
    access_token = create_access_token()

    api_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.DataFactory/factories/{factory_name}/pipelines/{pipeline_name}/createRun?api-version=2018-06-01"

    # Define headers with Authorization
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    try:
        # Make the POST request
        response = requests.post(api_url, headers=headers, json=parameters)

        # Check response
        if response.status_code == 200:
            run_id = response.json().get('runId')
            print(f'Pipeline run triggered successfully. Run ID: {run_id}')
            return run_id
        else:
            # Output specific error message for better debugging
            error_message = response.json().get('error', {}).get('message', 'No additional error message provided.')
            print(f'Pipeline run failed with status code {response.status_code}. Error: {error_message}')
            
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while triggering the pipeline: {e}")

To start the ADF pipeline, I call the function:

adf_run_id = trigger_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, parameters)

I then receive a success message in Databricks that the pipeline has been started:

A look at the Data Factory also shows the successful run. Since I played around a bit with the Databricks notebook, you can see several runs:

When I sort by run-id, I find the corresponding run:

The Databricks notebook then looks like this:

Databricks Workflows Job

Now I’m putting the whole thing into a Databricks job. But before I configure the job, I will customize the notebook a bit: instead of hard-coding the variables in the notebook, I add them as widgets. This way, I can also reuse the notebook for other pipelines.

dbutils.widgets.text("subscription_id", "f43d1b9d-e6a3-49d5-abcd-6f952ee6e67f")
dbutils.widgets.text("resource_group", "rg-stefan")
dbutils.widgets.text("factory_name", "adf-stefan-nona")
dbutils.widgets.text("pipeline_name", "DummyWaitPipeline")
dbutils.widgets.text("parameters", '{"seconds_to_wait": 1}')

subscription_id = dbutils.widgets.get("subscription_id")
resource_group = dbutils.widgets.get("resource_group")
factory_name = dbutils.widgets.get("factory_name")
pipeline_name = dbutils.widgets.get("pipeline_name")
parameters = json.loads(dbutils.widgets.get("parameters"))

It looks like this:

In the Workflows tab, I create a new job and give it a descriptive name. I select Notebook as the task and enter the path to the previously created notebook.

I also enter the required information in the parameters.

I can start the job by clicking on “Run now”. After a few seconds, the job runs successfully.

If you click on the run, you can also see the details of the run.

What about the runtime? Is Databricks waiting for ADF?

The question I am asking myself now is: what happens if the Data Factory pipeline has a long runtime? Does the Databricks workflow then wait until the ADF pipeline is finished? That would cause unnecessary costs. To answer this question, I set the waiting time of the dummy pipeline to 5 minutes. All I have to do is change the corresponding value in the parameters.
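Five minutes corresponds to 300 seconds, so the only change is the value of the pipeline parameter (shown here for both variants of the notebook; the value itself is just my example):

# Hard-coded variant: let the ADF dummy pipeline wait 5 minutes instead of 1 second
parameters = {'seconds_to_wait': 300}

# Widget variant: set the "parameters" widget (or job parameter) to
# '{"seconds_to_wait": 300}'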

Well, as it turns out, the runtime has not changed as a result: the Databricks job runs for exactly 32 seconds and then finishes (the first run took 33 seconds). This means that the Databricks job does not wait for the ADF pipeline to complete.

And here is the screenshot of the ADF execution:

This means that compute resources are not wasted unnecessarily. On the other hand, I don’t know whether the job in the ADF has been successfully completed.

Check ADF Pipeline Run via REST API

In another example, I would like to extend the notebook and see what the current status of the pipeline is. For this purpose, I rewrite the Databricks notebook a little and add a function to it. The Microsoft documentation describes in detail how to get the status of an ADF pipeline run via REST: https://learn.microsoft.com/en-us/rest/api/datafactory/pipeline-runs/get?view=rest-datafactory-2018-06-01&tabs=HTTP

First, I add two widgets: adf_run_id and action.

dbutils.widgets.text("subscription_id", "f43d1b9d-e6a3-49d5-abcd-6f952ee6e67f")
dbutils.widgets.text("resource_group", "rg-stefan")
dbutils.widgets.text("factory_name", "adf-stefan-nona")
dbutils.widgets.text("pipeline_name", "DummyWaitPipeline")
dbutils.widgets.text("parameters", '{"seconds_to_wait": 1}')
dbutils.widgets.text("adf_run_id", "")
dbutils.widgets.dropdown("action", "start_adf_pipeline", ["start_adf_pipeline", "check_adf_pipeline"])

subscription_id = dbutils.widgets.get("subscription_id")
resource_group = dbutils.widgets.get("resource_group")
factory_name = dbutils.widgets.get("factory_name")
pipeline_name = dbutils.widgets.get("pipeline_name")
parameters = json.loads(dbutils.widgets.get("parameters"))
adf_run_id = dbutils.widgets.get("adf_run_id")
action = dbutils.widgets.get("action")

I copied the previous function trigger_adf_pipeline_run and created a new function that queries the status.

def check_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, adf_run_id):
    """
    Check status of an Azure Data Factory pipeline run.

    Args:
        subscription_id (str): The subscription ID for the Azure account.
        resource_group (str): The resource group name where the Data Factory is located.
        factory_name (str): The name of the Data Factory.
        pipeline_name (str): The name of the pipeline (kept for consistency; not used in the request).
        adf_run_id (str): The ID of the ADF pipeline run to check.

    Returns:
        tuple: The run status and the full JSON response if successful, None otherwise.
    """
    
    access_token = create_access_token()

    api_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.DataFactory/factories/{factory_name}/pipelineruns/{adf_run_id}?api-version=2018-06-01"

    # Define headers with Authorization
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    try:
        # Make the GET request
        response = requests.get(api_url, headers=headers)

        # Check response
        if response.status_code == 200:
            adf_pipe_status = response.json()['status']
            adf_pipe_full_status = response.json()
            print(f'Got pipeline run status: {adf_pipe_status}')
            return adf_pipe_status, adf_pipe_full_status
        else:
            # Output specific error message for better debugging
            error_message = response.json().get('error', {}).get('message', 'No additional error message provided.')
            print(f'Something went wrong with status code {response.status_code}. Error: {error_message}')
            
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while get the status of the pipeline: {e}")

I have also rewritten the function call accordingly.

if action == 'start_adf_pipeline':
    adf_run_id = trigger_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, parameters)

elif action == 'check_adf_pipeline':
    adf_pipe_status, adf_pipe_full_status = check_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, adf_run_id)
    print(adf_pipe_status)
    print(adf_pipe_full_status)

If I now run the notebook with the action ‘check_adf_pipeline’, I get the following response:

Now I can query the status of an ADF pipeline programmatically. But I have to bear in mind that this is only half the battle. What I don’t want to do is trigger an ADF pipeline and then check every 5 minutes with a Databricks job to see if the pipeline is complete.
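For completeness, this is roughly what such a synchronous wait would look like, built on the check_adf_pipeline_run function above. It is only a sketch; the terminal states are my assumption of the usual ADF run status values, and the whole point is that I want to avoid blocking compute like this:

import time

def wait_for_adf_pipeline_run(subscription_id, resource_group, factory_name,
                              pipeline_name, adf_run_id, poll_seconds=300):
    """
    Polls the ADF run status until it reaches a terminal state.
    Keeps the Databricks compute busy for the whole runtime of the ADF pipeline.
    """
    terminal_states = {'Succeeded', 'Failed', 'Cancelled'}  # assumed terminal ADF run states

    while True:
        result = check_adf_pipeline_run(subscription_id, resource_group, factory_name,
                                        pipeline_name, adf_run_id)
        if result is None:
            return None  # status call failed, give up
        status, _ = result
        print(f'ADF run {adf_run_id} is in state {status}')
        if status in terminal_states:
            return status
        time.sleep(poll_seconds)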

What would make more sense here is that at the end of the ADF pipeline, when the relevant tasks have been processed, the ADF pipeline triggers a Databricks job. This can be realized by creating an additional task at the end. In this way, you would be informed immediately and can then start another job in Databricks. Calling a Databricks job would also be implemented via a REST call.
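As a rough idea of what this REST call does, here is a Python sketch of the Jobs API 2.1 run-now request that the web activity will perform later. The workspace URL, job ID, and token are placeholders; the token would be an Entra token issued for the Azure Databricks resource, which I come back to in the web activity configuration below:

import requests

# Placeholders: adjust to your workspace, job, and token
workspace_url = 'https://adb-1234567890123456.7.azuredatabricks.net'
job_id = 123456789
databricks_token = '<entra-token-for-the-azure-databricks-resource>'

response = requests.post(
    f'{workspace_url}/api/2.1/jobs/run-now',
    headers={
        'Authorization': f'Bearer {databricks_token}',
        'Content-Type': 'application/json'
    },
    json={
        'job_id': job_id,
        'notebook_params': {'adf_run_id': '<adf-run-id>'}
    }
)
print(response.status_code, response.json())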

Preparation for Triggering a Databricks Job from Azure Data Factory

Before I turn to the actual REST call, I make a few small preparations. First, I create a simple notebook that will be executed in a job.

The notebook does nothing other than accept and print two parameters: the ADF run ID and the Databricks run ID. To execute the notebook automatically, I create a corresponding Databricks job.
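For reference, a minimal sketch of what this dummy notebook could look like; the widget names match the notebook parameters that the web activity will send later:

# Dummy notebook: accepts the two run IDs as widgets and prints them
dbutils.widgets.text("adf_run_id", "")
dbutils.widgets.text("dbx_run_id", "")

adf_run_id = dbutils.widgets.get("adf_run_id")
dbx_run_id = dbutils.widgets.get("dbx_run_id")

print(f"ADF run ID: {adf_run_id}")
print(f"Databricks run ID: {dbx_run_id}")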

Service principal to start Databricks workflow job

The job in Databricks must be executed via a service principal. We also need this service principal to make the REST call. This means we have machine-to-machine authentication.

For the sake of simplicity, I use the same service principal with which I start the ADF pipeline. In practice, however, it makes sense to create another service principal.

I will add the service principal in the Databricks account console. To open it, I go to:

https://accounts.azuredatabricks.net/

As the service principal has already been created, I select “Microsoft Entra ID managed”.

Then I go to the workspace and add the service principal to the workspace.

Now I have to grant the service principal permission on the job so that it is able to start the job at all.

Customizing the ADF pipeline

In the Data Factory, I now need to customize the pipeline. I add an additional task, a web activity.

In the pipeline parameters, I add two additional parameters:

  • dbx_workspace_id: The ID of the Databricks workspace.
  • dbx_job_id: The ID of the Databricks job that I want to trigger from ADF.

I can read the Databricks Workspace ID from the URL.

I can see the job ID when I open the Databricks job.

Now I configure the web activity.

I make the URL dynamic so that I can address any Databricks workspaces. The content looks like this:

@concat(
      'https://'
    , pipeline().parameters.dbx_workspace_id
    , '.azuredatabricks.net/api/2.1/jobs/run-now'
    )

I also make the body of the REST call dynamic. There I enter the Databricks job ID, dbx_run_id, and adf_run_id as parameters. The job ID tells Databricks which job is to be executed; the other two parameters are notebook parameters. The pipeline expression builder in Data Factory takes a bit of getting used to and is not exactly user-friendly. I have gotten into the habit of writing each substring of a concat expression on its own line. The content of the body then looks like this:

@concat(
        '{"job_id": '
        , pipeline().parameters.dbx_job_id
        , ', '
        , '"notebook_params": {'
        , '"dbx_run_id": "'
        , pipeline().parameters.dbx_run_id
        , '", '
        , '"adf_run_id": "'
        , pipeline().RunId
        , '"'
        , '}}'
)

During authentication, I select the service principal and enter the relevant information:

You could now integrate the KeyVault as a linked service in ADF and then link the secret from there. This certainly makes sense in productive environments. For the sake of simplicity, I will omit this in my example.

I enter the following hard-coded ID for the resource:

2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

The Azure resource ID 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d refers to the Databricks API resource in Entra. This ID is the unique application ID for Databricks and is used when a service principal or user identity in Azure accesses Databricks via the API.
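For completeness, this is roughly how the same service principal could request such a token itself in Python; a sketch mirroring the create_access_token function from earlier, with only the resource swapped:

def create_databricks_access_token():
    """
    Requests an Entra access token for the Azure Databricks resource.
    Sketch only; mirrors create_access_token with the resource swapped.
    """
    client_id = dbutils.secrets.get(scope="kv", key="app-reg-adf-job-client-id")
    client_secret = dbutils.secrets.get(scope="kv", key="app-reg-adf-job-client-secret")
    tenant_id = dbutils.secrets.get(scope="kv", key="app-reg-adf-job-tenant-id")

    token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/token'

    data = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        # Fixed application ID of the Azure Databricks resource
        'resource': '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d'
    }

    response = requests.post(token_url, data=data)
    response.raise_for_status()
    return response.json()['access_token']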

I will now run a first test by starting the pipeline in debug mode. And it runs successfully.

In Databricks Workflows, I can also see that the job has run successfully.

And then I take a look at the notebook that was executed by this dummy job. There I can also see the run ID of the ADF.

This matches the corresponding information in ADF.

Complete example pipeline

Now I would like to combine the individual steps into an overall picture. I would like to implement the following logic:

  1. Start a Databricks job.
  2. The Databricks job triggers an ADF pipeline.
    • When the trigger call is made, the run ID of the Data Factory run is retrieved.
    • The job writes its own run ID and that of the ADF to a Delta table.
  3. The ADF pipeline runs.
    • The ADF pipeline executes its intended tasks.
    • At the end of the pipeline, a web activity is executed, which starts a Databricks follow-up job.
    • When the follow-up job is triggered, the current run ID of the ADF pipeline is passed along.
  4. The Databricks follow-up job runs.
    • The run ID of the Databricks follow-up job is written to the Delta table.
    • The status of the ADF run is retrieved using a function and written to the Delta table.

With this mechanism, the ADF jobs and the Databricks jobs can be linked and monitored.

For this purpose, I have created a rudimentary delta table and assigned the corresponding authorizations to the service principal.

CREATE CATALOG IF NOT EXISTS demo;
CREATE SCHEMA IF NOT EXISTS demo.demo;
CREATE TABLE IF NOT EXISTS demo.demo.job_run (
    id BIGINT GENERATED ALWAYS AS IDENTITY,
    dbx_start_run_id STRING,
    dbx_follow_run_id STRING,
    adf_run_id STRING,
    adf_run_status STRING, 
    PRIMARY KEY (id)
);

I also need to extend the notebook that triggers the ADF job. I create two additional widgets: one that receives the Databricks run ID from the start job and a second that receives the name of the Delta table.

dbutils.widgets.text("dbx_start_run_id", "test")
dbutils.widgets.text("job_run_table", "demo.demo.job_run")
dbx_start_run_id = dbutils.widgets.get("dbx_start_run_id")
job_run_table = dbutils.widgets.get("job_run_table")

I have written a simple function so that the data can be written to the delta table.

def write_run_id_to_table(dbx_start_run_id, adf_run_id, job_run_table):
    """
    Inserts the Databricks start run ID and Azure Data Factory run ID into a specified table.

    Args:
        dbx_start_run_id (str): The Databricks start run ID.
        adf_run_id (str): The Azure Data Factory run ID.
        job_run_table (str): The name of the table where the IDs should be inserted.

    """
    sql = f"""INSERT INTO {job_run_table} (dbx_start_run_id, adf_run_id)
                VALUES
                ('{dbx_start_run_id}', '{adf_run_id}')          
            
            """
    print(f"Insert Run-ID's into table {job_run_table}")
    spark.sql(sql).display()
    spark.sql(f"SELECT * FROM {job_run_table} ORDER BY id DESC LIMIT 1").display()

I then added this function call to the branch that triggers the ADF pipeline.

if action == 'start_adf_pipeline':
    adf_run_id = trigger_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, parameters)
    write_run_id_to_table(dbx_start_run_id, adf_run_id, job_run_table)

elif action == 'check_adf_pipeline':
    adf_pipe_status, adf_pipe_full_status = check_adf_pipeline_run(subscription_id, resource_group, factory_name, pipeline_name, adf_run_id)
    print(adf_pipe_status)
    print(adf_pipe_full_status)

Now I’ll adjust the ADF job accordingly. I’ve noticed that I don’t need to pass the Databricks Run ID if I write it to the Delta Table, so I’ll remove it.

I also adapt the body of the web activity accordingly and remove this parameter so that it looks like the following:

@concat(
        '{"job_id": '
        , pipeline().parameters.dbx_job_id
        , ', '
        , '"notebook_params": {'
        , '"adf_run_id": "'
        , pipeline().RunId
        , '"'
        , '}}'
)

I now adapt the follow-up job in Databricks so that the current run ID of the job is passed to the notebook in the parameter dbx_run_id. I can do this by entering the following string as the value: {{job.run_id}}

It will look like this

If I now start the job that triggers the ADF pipeline, the ADF pipeline is executed; at the end it starts the dummy job, which then outputs the run IDs.

Now I’m going to write a function in that notebook that writes this information to the Delta table. The name “Dummy” for the job and the notebook is no longer quite appropriate, so I rename it “Follow Job”. The function no longer performs an INSERT but an UPDATE: as the Data Factory run ID is already in the table, I update that row with the ADF run status and the run ID of the Databricks follow job. The function then looks like this:

def update_run_id_table(dbx_run_id, adf_run_id, job_run_table, adf_pipe_full_status):
    """
    Updates the job-run table with the Databricks follow-job run ID and the ADF run status.

    Args:
        dbx_run_id (str): The run ID of the Databricks follow job.
        adf_run_id (str): The Azure Data Factory run ID used to identify the row to update.
        job_run_table (str): The name of the table to update.
        adf_pipe_full_status (dict): The full status response of the ADF pipeline run.

    """

    sql = f"""UPDATE {job_run_table}
              SET 
              dbx_follow_run_id = "{dbx_run_id}", 
              adf_run_status = "{adf_pipe_full_status}"
              WHERE adf_run_id = "{adf_run_id}"
            """

    # print(sql)        
    print(f"Update table {job_run_table}")
    spark.sql(sql).display()

Now I start a final test. I go to my initial job in Databricks Workflows and start the pipeline. The dbx_start_run_id and the adf_run_id are written to the delta table.

The corresponding pipeline was started in the Data Factory.

This in turn started the follow job in Databricks, which then wrote the missing information to the delta table.

Now the runs from the Data Factory and Databricks can be brought into relation by querying the Delta Table.

In addition, this Delta table could be joined with the Databricks system tables, which contain the Databricks job runs.
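As a sketch, such a join could look like the following. I am assuming here that the relevant system table is system.lakeflow.job_run_timeline and that it exposes run_id and result_state columns; check the system tables enabled in your workspace before using this:

# Join the custom run table with the Databricks job-run system table (assumed name and columns)
spark.sql("""
    SELECT
        jr.dbx_start_run_id,
        jr.dbx_follow_run_id,
        jr.adf_run_id,
        jr.adf_run_status,
        rt.result_state AS dbx_follow_result_state
    FROM demo.demo.job_run AS jr
    LEFT JOIN system.lakeflow.job_run_timeline AS rt
        ON jr.dbx_follow_run_id = CAST(rt.run_id AS STRING)
""").display()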

Conclusion

Using REST queries, it is easy to start Data Factory jobs from Databricks workflows and vice versa. With these methods, you have a lot of freedom to set up central orchestration and monitoring in Databricks.

In each of my examples, I have used an Azure Service Principal (aka App Registration). Of course, you could also use a Managed Identity to authenticate yourself in Databricks or Data Factory.

The setup works best with serverless clusters. This is because the Databricks job that starts the ADF job only takes a few seconds. So it makes no sense to use a classic cluster.

The code examples are just that: examples. They were created for illustrative purposes and should of course be hardened before being used in production. The processes should also be adapted to your own environment; the artifacts created here should only serve as input.

I provide the created artifacts on the following Github repo: https://github.com/stefanko-ch/Databricks_Dojo/tree/main/Trigger_and_Monitor_ADF_from_Databricks

I take no responsibility for the content and accuracy of the code. It is what it is and you are welcome to customize it. In addition to the notebooks created, I also provide the definitions of the Databricks jobs and the ADF pipeline.

I am happy to receive questions, tips or comments. 😊
