Custom Connector Example

Setup Guide

Download the example code from GitHub and open the folder in Visual Studio Code.

Your custom connector must implement two endpoints: / and /schema.

  • / is called for each table name to fetch that table's data;
  • /schema is called by Weld to get the connector's schema (table names, field names, and types).
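The split between the two endpoints can be pictured as a small dispatcher. This is a minimal, framework-independent sketch: `route` and `get_rows` are hypothetical names, `getSchema` is stubbed with the placeholder table used in section 1, and a real connector would fetch rows from its API instead of returning a fixed list.

```python
def getSchema():
    # Stub schema with one placeholder table (see section 1 for the structure)
    return {"schema": {"table_name": {"primary_key": "id", "fields": []}}}


def get_rows(payload):
    # Hypothetical data fetch; a real connector would call its API here
    return {"insert": [{"id": 1}], "state": {}, "hasMore": False}


def route(path, payload=None):
    """Dispatch a request path to the matching connector endpoint."""
    endpoint = path.strip("/")
    if endpoint == "schema":
        return getSchema()
    if endpoint == "":
        return get_rows(payload or {})
    raise ValueError(f"Unknown endpoint: {path}")
```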

1 - What should /schema return?

Weld will infer the schema of your table, but you must declare the table name, primary key, and fields in the schema:

Define schema

def getSchema():
    # Replace "table_name" with your table's name; add one entry per table
    return {
        "schema": {
            "table_name": {
                "primary_key": "id",
                "fields": [],
            }
        }
    }
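Because the schema is keyed by table name, a connector with several tables needs one entry per table. A minimal sketch that builds the dictionary from a mapping of table names to primary keys; `build_schema` and the table names `customers` and `orders` are hypothetical, and `fields` is left empty as in the example above.

```python
def build_schema(tables):
    """Build the /schema response from a {table_name: primary_key} mapping."""
    return {
        "schema": {
            name: {"primary_key": primary_key, "fields": []}
            for name, primary_key in tables.items()
        }
    }


# Hypothetical tables for illustration
schema = build_schema({"customers": "id", "orders": "order_id"})
```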


2 - What should the root endpoint / return?

Each call to / should return a dictionary with three keys:

  • insert - rows to insert into the destination, following the schema
  • state - the current state of the function, such as the next page to fetch
  • hasMore - a boolean indicating whether there are more pages to fetch

Dictionary keys

return {
    "insert": rows,
    "state": {
        "next_page": next_page if has_more else None,
        "ratelimit": new_ratelimit,
    },
    "hasMore": has_more,
}
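The fragment above assumes `rows`, `next_page`, `has_more` and `new_ratelimit` have already been computed. A minimal sketch that wraps it in a function and also checks that every row carries the primary key declared in /schema; `build_response` and its `primary_key` argument are hypothetical additions, not part of the example code.

```python
def build_response(rows, next_page, has_more, new_ratelimit, primary_key="id"):
    """Assemble the dictionary returned by the / endpoint."""
    # Every row should contain the primary key declared in /schema
    missing = [row for row in rows if primary_key not in row]
    if missing:
        raise ValueError(f"{len(missing)} row(s) lack primary key {primary_key!r}")
    return {
        "insert": rows,
        "state": {
            # Only remember the next page if there is one to fetch
            "next_page": next_page if has_more else None,
            "ratelimit": new_ratelimit,
        },
        "hasMore": has_more,
    }
```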

2.1 - Authentication

Depending on the API, authentication can work in several ways:

1 - API keys: a simple and widely used method in which a unique key is generated for each API client. The client includes this key in each API request to authenticate itself.

API keys

import os
api = "&api_token=" + os.environ["API_BEARER_TOKEN"]

2 - OAuth: an industry-standard authorization protocol. The client obtains an access token from an authorization server and uses it to access protected resources.

3 - JSON Web Tokens (JWT): a popular method of authentication for APIs. A JWT is a compact, signed token whose JSON payload carries claims about the client and its authorization scopes. The client includes the JWT in each request to authenticate itself.

4 - Basic authentication: a simple method in which the API client sends a username and password (base64-encoded) in the request's Authorization header. The server verifies the credentials before allowing access to protected resources.


In each case, the client authenticates by including the relevant credentials in the request, usually in a header or, as in the API key example above, in the query string.
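A minimal sketch of both placements, using only the standard library. The base URL passed in is up to you, and the function names are illustrative; `api_token` and `page` are the parameter names used in this guide's example, and Basic authentication base64-encodes `username:password` as defined in RFC 7617. Which scheme applies depends on your API's documentation.

```python
import base64
import os
from urllib.parse import urlencode


def build_url(base_url, page, token=None):
    """Query-string auth: append the page number and API token as parameters."""
    # Default to the API_BEARER_TOKEN environment variable used in the example
    if token is None:
        token = os.environ["API_BEARER_TOKEN"]
    # urlencode escapes characters such as "&" or spaces inside the token
    return f"{base_url}?{urlencode({'page': page, 'api_token': token})}"


def bearer_headers(token):
    """Header auth for an OAuth access token or a JWT sent as a bearer token."""
    return {"Authorization": f"Bearer {token}"}


def basic_auth_headers(username, password):
    """Header auth for HTTP Basic authentication (RFC 7617)."""
    credentials = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(credentials).decode("ascii")}
```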


2.2 - Rate Limiting

Rate limiting controls how much traffic a client can send to an API over a period of time. Each API sets its own limits and has its own way of reporting them.

In our example, the API allows 3,600 requests per hour, and the response headers report how many requests remain in the current hour. When fewer than 5 requests remain, the function waits one hour before trying again; otherwise it pauses 0.5 seconds between requests.

x-ratelimit-remaining: 3587

x-ratelimit-limit: 3600
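The rate-limit check below reads `state["ratelimit"]["remaining"]`, so these header values need to be saved into the state after each request. A minimal sketch; `ratelimit_from_headers` is a hypothetical helper, and it matches header names case-insensitively because HTTP header names are case-insensitive.

```python
def ratelimit_from_headers(headers):
    """Extract the rate-limit headers into the dict stored under state["ratelimit"]."""
    # HTTP header names are case-insensitive, so normalise them first
    lowered = {name.lower(): value for name, value in headers.items()}
    return {
        "remaining": lowered.get("x-ratelimit-remaining"),
        "limit": lowered.get("x-ratelimit-limit"),
    }
```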

Rate limit

    # Check rate limit (requires `import time` at module level)
    ratelimit = state.get("ratelimit")
    if ratelimit:
        ratelimit_remaining = int(ratelimit.get("remaining"))
        if ratelimit_remaining < 5:
            # Nearly out of requests: wait an hour for the limit to reset
            time.sleep(3600)
        else:
            # Brief pause between requests to stay under the limit
            time.sleep(0.5)
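Separating the waiting decision from the sleep itself makes the rule easy to test. A minimal sketch of the same rule as the check above (one hour when fewer than 5 requests remain, otherwise 0.5 seconds); `seconds_to_wait` and `throttle` are hypothetical names, and no pause is needed when the state holds no rate-limit information yet, as on the first call.

```python
import time


def seconds_to_wait(state, threshold=5):
    """How long to pause before the next request, based on saved rate-limit state."""
    ratelimit = state.get("ratelimit")
    if not ratelimit or ratelimit.get("remaining") is None:
        return 0.0  # first call: no rate-limit information yet
    if int(ratelimit["remaining"]) < threshold:
        return 3600.0  # nearly out of requests: wait for the hourly window to reset
    return 0.5  # small pause between normal requests


def throttle(state):
    """Sleep for the computed duration."""
    time.sleep(seconds_to_wait(state))
```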

2.3 - Pagination


Pagination breaks large amounts of data into smaller, more manageable chunks, or pages. A query or API returns a limited number of results per page, and the client navigates to other pages to retrieve the rest.

API pagination can be done in several ways, including:

  • Offset-based: skips a fixed number of items to retrieve the next page
  • Cursor-based: uses a pointer to the last item retrieved
  • Time-based: retrieves data based on timestamps
  • Keyset-based: retrieves data based on a unique sort key
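The first two styles differ mainly in what the client sends to request the next page. A minimal sketch; the parameter names `offset`, `limit` and `cursor` are hypothetical, since every API names them differently.

```python
def offset_params(page, per_page):
    """Offset-based: skip the items on all earlier pages (pages start at 1)."""
    return {"offset": (page - 1) * per_page, "limit": per_page}


def cursor_params(last_cursor, per_page):
    """Cursor-based: pass back the pointer the API returned with the last page."""
    params = {"limit": per_page}
    if last_cursor is not None:
        params["cursor"] = last_cursor
    return params
```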

Pagination part 1

    # Resume from the page saved in state, or start at page 1
    current_page = state.get("next_page") or 1

Pagination part 2

    # Handle pagination
    next_page = current_page + 1
    has_more = len(rows) >= 79

Our example uses offset-based pagination: by default, each page returns up to 25 results, and the client requests other pages by passing the page number in the page parameter.
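The two pagination parts fit together as a loop: each call returns one page plus the state for the next call, and hasMore signals whether another call is needed. A minimal, self-contained simulation, assuming the state returned by one call is sent back on the next (as the examples' use of `state.get("next_page")` implies); `fetch_page`, `handle` and `sync_all` are hypothetical, and the in-memory data stands in for a real API with 25 results per page.

```python
PER_PAGE = 25
DATA = [{"id": i} for i in range(1, 61)]  # hypothetical: 60 records on the server


def fetch_page(page):
    """Stand-in for the API: return the records on a 1-based page."""
    start = (page - 1) * PER_PAGE
    return DATA[start:start + PER_PAGE]


def handle(state):
    """One call to /: fetch a page and return rows plus the state for the next call."""
    current_page = state.get("next_page") or 1
    rows = fetch_page(current_page)
    # A full page means there may be more results to fetch
    has_more = len(rows) >= PER_PAGE
    return {
        "insert": rows,
        "state": {"next_page": current_page + 1 if has_more else None},
        "hasMore": has_more,
    }


def sync_all():
    """Simulate repeated calls, feeding each response's state into the next call."""
    state, collected = {}, []
    while True:
        response = handle(state)
        collected.extend(response["insert"])
        state = response["state"]
        if not response["hasMore"]:
            return collected
```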

3 - Running your function

Using the Functions Framework, you can write lightweight functions that run in many different environments, including:

  • Google Cloud Functions
  • Your local development machine
  • Cloud Run
  • Knative-based environments

Install the Functions Framework in your terminal via pip: pip install functions-framework


In your code, define a Python function named handler and decorate it with @functions_framework.http.

Functions Framework

import functions_framework
@functions_framework.http
def handler(req):
    ...  # dispatch to / or /schema and return the response

When you are ready to test your function in Postman, start it locally by running this command in the terminal:

functions-framework --target=handler --debug

4 - Testing your function in Postman


In Postman, create a GET request to your localhost URL (the Functions Framework listens on http://localhost:8080 by default).

In the request body, replicate the payload Weld sends when it calls the custom connector.

If the connector runs successfully, you should see your data returned inside the insert key:

[Screenshot: Postman response]
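To check a response programmatically rather than by eye, a small validator can confirm that the three keys described in section 2 are present with the expected types. A minimal sketch; `response_problems` is a hypothetical helper you could run against the JSON body returned in Postman.

```python
def response_problems(response):
    """Return a list of problems with a / response; an empty list means it looks valid."""
    problems = []
    if not isinstance(response.get("insert"), list):
        problems.append("insert must be a list of rows")
    if not isinstance(response.get("state"), dict):
        problems.append("state must be a dictionary")
    if not isinstance(response.get("hasMore"), bool):
        problems.append("hasMore must be a boolean")
    return problems
```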

4.1 - Setting up your function on Google Cloud Functions

Create a new cloud function.

Allow unauthenticated invocations; authentication is handled in the cloud function code itself.

[Screenshot: Google Cloud Function setup]

Add your runtime environment variables:

API_BEARER_TOKEN: if your API requires an API key, store it here.

WELD_API_KEY: an API key you create yourself. You will enter it when creating the connector in the Weld app.

[Screenshot: Google Cloud Function environment variables]
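Because the function accepts unauthenticated invocations, the code should reject requests that do not carry the key stored in WELD_API_KEY. A minimal sketch, assuming Weld sends the key as a bearer token in the `Authorization` header; check how your Weld connector actually transmits it before relying on this. `is_authorized` is a hypothetical helper.

```python
import hmac
import os


def is_authorized(headers, expected_key=None):
    """Return True if the request's bearer token matches WELD_API_KEY."""
    expected_key = expected_key or os.environ.get("WELD_API_KEY")
    if not expected_key:
        return False  # refuse everything if no key is configured
    scheme, _, token = headers.get("Authorization", "").partition(" ")
    # compare_digest avoids leaking information through timing differences
    return scheme.lower() == "bearer" and hmac.compare_digest(
        token.encode("utf-8"), expected_key.encode("utf-8")
    )
```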

Copy and paste your code into the cloud function. Make sure the entry point matches the function name in your code (handler in this example):

[Screenshot: Google Cloud Function code and entry point]

Click Deploy.

4.2 - Setting up your function on AWS Lambda

Create a new function from scratch:

[Screenshot: AWS Lambda function setup]

Add a layer that provides the packages your code depends on.

We added the AWSSDKPandas-Python39 layer because it includes the packages needed to run this function.

[Screenshot: AWS Lambda layer]

Copy and paste your code into the Code source editor.

If you have been testing with the Functions Framework, you will need to replace the handler entry point with a lambda_handler function:

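import json

def lambda_handler(event, context):
    # Function URL events carry the request path in rawPath (e.g. "/schema")
    path = event["rawPath"][1:]
    if path == "schema":
        return {
            "statusCode": 200,
            "body": json.dumps(getSchema()),
        }
    # Any other path is a data request; the payload arrives as a JSON string
    body = json.loads(event["body"])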
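The snippet above stops after parsing the body. A minimal end-to-end sketch of the rest, assuming a Lambda function URL event (which carries `rawPath` and a JSON `body` string) and returning the insert/state/hasMore dictionary from section 2 as the response body. `getSchema` and `get_rows` are stubbed with hypothetical data so the example runs on its own.

```python
import json


def getSchema():
    # Stub: replace with your real schema
    return {"schema": {"table_name": {"primary_key": "id", "fields": []}}}


def get_rows(body):
    # Stub: a real connector would call its API here, presumably resuming
    # from state sent in the request body (an assumption in this sketch)
    return {"insert": [{"id": 1}], "state": {"next_page": None}, "hasMore": False}


def lambda_handler(event, context):
    path = event["rawPath"][1:]
    if path == "schema":
        return {"statusCode": 200, "body": json.dumps(getSchema())}
    # Treat a missing body as an empty JSON object
    body = json.loads(event.get("body") or "{}")
    return {"statusCode": 200, "body": json.dumps(get_rows(body))}
```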


Add your runtime environment variables in the Configuration tab:

API_BEARER_TOKEN: if your API requires an API key, store it here.

WELD_API_KEY: an API key you create yourself. You will enter it when creating the connector in the Weld app.

Create a function URL in the Configuration tab. You will use this URL when setting up the connector in Weld.

For Auth type, select NONE.

5 - Creating your custom connector in Weld

[Screenshot: Weld custom connector setup]

In Weld, add a new custom connector. You will need your function's URL (the trigger URL on Google Cloud Functions or the function URL on AWS Lambda).

You will also need the API key you set in the WELD_API_KEY environment variable.

Once the connection has been made, select the tables you want to sync and how often the sync should run.

[Screenshot: Weld sync settings]
