Custom Connector Example

🎥 Watch the video tutorial

Prefer to follow along in a video? Here's a full walkthrough from start to finish:


Let's dive in!

1. Set up your development environment

We're writing the pipeline in Python and hosting it with Google Cloud Run, using the functions-framework package. Start by creating and activating a virtual environment:

python3 -m venv .venv
source .venv/bin/activate


Then install dependencies:

pip install requests functions-framework


Create your main.py file and begin with a basic "Hello world" endpoint.

import functions_framework
import flask

@functions_framework.http
def handler(request: flask.Request):
	return "Hello World"


Start the local server using:

functions-framework --target=handler --debug --port 8080


Test it with a curl or Postman request to localhost:8080. You should see "Hello World" as the response.


2. Review the Weld schema and root endpoint requirements

In the Weld documentation, you'll find requirements for:

  • /schema (GET) - Defines available tables, primary keys, and optionally fields
  • / (POST) - Handles data requests with support for state and pagination

You'll also want to explore the Typeform API docs to understand endpoints, authorization, and data structure.


3. Implement the schema endpoint

Create a get_schema() function that defines one or more tables.

This function should return a JSON object with the available table names, their primary keys and, optionally, their fields. Weld auto-detects available properties as string fields, but defining them here lets you declare more specific types for them.

For example:

def get_schema():
    return {
        "tables": {
            "form": {
                "primary_key": ["id"],
                "fields": {
                    "id": "string"
                }
            }
        }
    }


Add a GET handler for /schema to return this structure.

@functions_framework.http
def handler(request: flask.Request):
	if(request.path == '/schema'):
		if(request.method == 'GET'):
			return get_schema()
		else:
			return 'Method not allowed', 405


Test that your endpoint returns the correct JSON format.
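A quick structural check can catch typos before Weld ever calls the endpoint. The sketch below uses a hypothetical helper, `check_schema`, that only verifies the shape used in this tutorial (a `tables` object whose entries have a `primary_key` list and an optional `fields` object). It is not Weld's own validation, and Weld may accept or require more than this.

```python
def check_schema(schema: dict) -> list:
    """Return a list of problems found in a schema dict (empty if none).

    Hypothetical helper: it only checks the structure used in this tutorial.
    """
    tables = schema.get("tables")
    if not isinstance(tables, dict) or not tables:
        return ["'tables' must be a non-empty object"]

    problems = []
    for name, table in tables.items():
        if not isinstance(table, dict):
            problems.append(f"{name}: table definition must be an object")
            continue
        # Every table needs at least one primary key column
        primary_key = table.get("primary_key")
        if not isinstance(primary_key, list) or not primary_key:
            problems.append(f"{name}: 'primary_key' must be a non-empty list")
        # Fields are optional, but if present they must map names to types
        fields = table.get("fields", {})
        if not isinstance(fields, dict):
            problems.append(f"{name}: 'fields' must be an object")
    return problems


example = {
    "tables": {
        "form": {"primary_key": ["id"], "fields": {"id": "string"}}
    }
}
print(check_schema(example))  # []
```

You could run it on the JSON your local /schema endpoint returns; an empty list means the basic shape is in place.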


4. Implement the data flow endpoint

Next, implement the POST handler for the root path (/). Weld sends the table name and the current state in the JSON request body, and you fetch the matching data from the Typeform API.

You'll need to:

  • Add your Typeform API key as an environment variable
  • Make authorized API calls to api.typeform.com

import os
import requests

def call_api(endpoint, params):
    base_url = "https://api.typeform.com"
    url = f"{base_url}/{endpoint}"

    # Typeform expects the personal access token as a bearer token
    headers = {"Authorization": f"Bearer {os.environ.get('TYPEFORM_API_KEY')}"}

    response = requests.get(url, params=params, headers=headers, timeout=30)

    # Raise requests.HTTPError on 4xx/5xx so the handler can report it to Weld
    response.raise_for_status()
    return response.json()

  • Handle pagination via the page_count and page parameters:
    • Weld expects the data in chunks, so if the API is paginated you only need to send one page per request. Include a state object in your response and update it as you page through the results, so it records where the sync should continue.
    • On its next request, Weld sends this state object back to you in the request body.
    • When there is no more data, return an empty state object so the next sync starts fresh.
  • Return results in the format Weld expects, with insert, hasMore, and state fields.
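The pagination bookkeeping can be tried out without Typeform or Weld. In this sketch, `fake_forms_page` is a hypothetical stand-in for the paginated /forms endpoint, and `run_sync` plays Weld's role by sending back whatever state the previous response returned until `hasMore` is false. It mirrors the page logic of get_endpoint_config and get_data in this step, but it is a simulation under those assumptions, not a description of how Weld actually schedules requests.

```python
import math

ALL_FORMS = [{"id": f"form_{i}"} for i in range(1, 6)]  # five fake forms
PAGE_SIZE = 2


def fake_forms_page(page: int) -> dict:
    """Hypothetical stand-in for GET /forms: one page plus the total page count."""
    start = (page - 1) * PAGE_SIZE
    return {
        "items": ALL_FORMS[start:start + PAGE_SIZE],
        "page_count": math.ceil(len(ALL_FORMS) / PAGE_SIZE),
    }


def sync_page(state: dict) -> dict:
    """One connector call: fetch a single page and record where to resume."""
    current_page = state.get("page", 1)
    data = fake_forms_page(current_page)
    has_more = data["page_count"] > current_page
    return {
        "insert": data["items"],
        "hasMore": has_more,
        # An empty state tells the next sync to start again from page 1
        "state": {"page": current_page + 1} if has_more else {},
    }


def run_sync():
    """Play Weld's role: call repeatedly, passing back the returned state."""
    state, rows, calls = {}, [], 0
    while True:
        response = sync_page(state)
        calls += 1
        rows.extend(response["insert"])
        state = response["state"]
        if not response["hasMore"]:
            return rows, state, calls


rows, final_state, calls = run_sync()
print(len(rows), final_state, calls)  # 5 {} 3
```

Three calls cover the five fake forms, and the final empty state means a later sync would begin from page 1 again.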

def get_endpoint_config(table_name, state):
    endpoint = None
    new_state = {}
    additional_data = {}
    params = {}

    match(table_name):
        case "form":
            endpoint = "forms"
            current_page = state.get('page', 1)
            params = {"page": current_page}
            new_state["page"] = current_page+1
        case _:
            raise Exception('Invalid or missing table name')

    return endpoint,new_state,additional_data,params


def get_data(request_data):
    state = request_data.get('state', {})
    table_name = request_data.get('name', None)

    endpoint, new_state, additional_data, params = get_endpoint_config(table_name, state)

    data = call_api(endpoint, params)
    has_more = True

    match(table_name):
        case 'form':
            # More pages remain if Typeform reports more pages than the one just fetched
            has_more = data.get('page_count', 0) > state.get('page', 1)
        case _:
            raise Exception('Invalid or missing table name')

    return {
        "insert": data.get('items', []),
        "hasMore": has_more,
        "state": new_state if has_more else {}
    }


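You can also clean the data by stripping out unnecessary fields before returning it, for example by returning "insert": clean_data(data.get('items', [])) in get_data: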

def clean_data(items):
    cleaned_items = []
    for item in items:
        # Drop fields that aren't useful in the warehouse
        item.pop('theme', None)
        item.pop('self', None)
        item.pop('_links', None)
        cleaned_items.append(item)

    return cleaned_items
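If you would rather not modify the API response in place, a variant can build new dictionaries instead. `DROPPED_FIELDS` and `clean_items` are hypothetical names; the dropped keys are the same three that clean_data removes.

```python
DROPPED_FIELDS = {"theme", "self", "_links"}


def clean_items(items: list) -> list:
    """Return copies of the items without fields we don't want in the warehouse."""
    return [
        {key: value for key, value in item.items() if key not in DROPPED_FIELDS}
        for item in items
    ]


raw = [{"id": "abc", "title": "Survey", "theme": {"href": "..."}, "_links": {}}]
print(clean_items(raw))  # [{'id': 'abc', 'title': 'Survey'}]
```

The original `raw` list still contains `theme` and `_links` afterwards, which can make debugging easier when you want to compare the raw and cleaned rows.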


Add a POST handler for the root path:

@functions_framework.http
def handler(request: flask.Request):
    ...
    if request.path == '/':
        if request.method == 'POST':
            # Weld sends the table name and state as JSON in the request body
            request_data = request.get_json(silent=True) or {}
            try:
                return get_data(request_data)
            except requests.HTTPError as e:
                # Pass the upstream Typeform error back to Weld
                return e.response.text, e.response.status_code
            except Exception as e:
                return str(e), 500
        else:
            return 'Method not allowed', 405


5. Add additional tables and logic

In our Typeform example, we add both a form table and a form_response table.

Form responses are tied to form IDs, so we handle them differently, collecting the form IDs first, then looping through them with state tracking.

The logic ensures Weld continues syncing where it left off, even across multiple endpoints.

def get_form_ids() -> list[str]:
    endpoint = 'forms'
    page_num = 0
    form_ids = []
    has_more = True

    # Walk every page of /forms and collect just the IDs
    while has_more:
        page_num += 1
        data = call_api(endpoint, {"page": page_num, "page_size": 200})
        form_ids.extend(item["id"] for item in data["items"])
        has_more = data["page_count"] > page_num

    return form_ids

def get_endpoint_config(table_name, state):
    ...
    match(table_name):
        ...
        case "form_response":
            # Fetch the form list only at the start of a sync; after that,
            # the remaining form IDs come back to us in the state
            form_ids = state["form_ids"] if "form_ids" in state else get_form_ids()
            form_id = form_ids[0]
            endpoint = f'forms/{form_id}/responses'
            # In the account used for this demo there are no more than 1000 responses,
            # so setting the page size to the maximum fetches all of them in one request
            params = {"page_size": 1000}
            new_state["form_ids"] = form_ids[1:]
            additional_data["form_id"] = form_id
        ...

def get_data(request_data):
    ...
    match(table_name):
        ...
        case 'form_response':
            remaining_form_ids = new_state.get('form_ids')
            has_more = len(remaining_form_ids) > 0
        ...
    ...
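The form-by-form state can be simulated the same way without calling Typeform. In this sketch, `FAKE_RESPONSES`, `fake_list_form_ids` and `sync_form_responses` are hypothetical stand-ins: the first call fetches the full list of form IDs, each call consumes one ID from the front of the list, and the remaining IDs travel in the state. Tagging each response with its `form_id` is one possible use of `additional_data`, which the code above collects but does not show being applied, so treat that part as an assumption.

```python
FAKE_RESPONSES = {
    "f1": [{"response_id": "r1"}, {"response_id": "r2"}],
    "f2": [],
    "f3": [{"response_id": "r3"}],
}
api_calls = {"list_forms": 0}


def fake_list_form_ids() -> list:
    """Hypothetical stand-in for get_form_ids(): one pass over all forms."""
    api_calls["list_forms"] += 1
    return list(FAKE_RESPONSES)


def sync_form_responses(state: dict) -> dict:
    # Fetch the form list only at the start of a sync; later calls reuse state
    form_ids = state["form_ids"] if "form_ids" in state else fake_list_form_ids()
    form_id, remaining = form_ids[0], form_ids[1:]
    # Assumption: attach the form ID so responses can be joined to forms later
    rows = [{**r, "form_id": form_id} for r in FAKE_RESPONSES[form_id]]
    has_more = len(remaining) > 0
    return {
        "insert": rows,
        "hasMore": has_more,
        "state": {"form_ids": remaining} if has_more else {},
    }


# Play Weld's role: keep calling with the returned state until hasMore is false
state, collected = {}, []
while True:
    result = sync_form_responses(state)
    collected.extend(result["insert"])
    state = result["state"]
    if not result["hasMore"]:
        break

print([(r["form_id"], r["response_id"]) for r in collected])
# [('f1', 'r1'), ('f1', 'r2'), ('f3', 'r3')]
print(api_calls["list_forms"])  # 1
```

A form with no responses (f2 here) still costs one request but simply inserts nothing, and the form list is fetched only once per sync.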


6. Add authentication

To secure your pipeline, validate that requests include an Authorization header matching your Weld token:

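# Inside handler(), before routing; requires `import hmac` and `import os`
token = os.environ.get("WELD_AUTH_TOKEN")
auth = request.headers.get("Authorization", "")
if not token or not hmac.compare_digest(auth.encode(), f"Bearer {token}".encode()):
    return "Unauthorized", 401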


This prevents unauthorized access to your endpoint.
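Keeping the check in a small pure function makes it easy to unit-test without starting the server. `is_authorized` is a hypothetical name. It compares in constant time with `hmac.compare_digest` and refuses every request when no token is configured, so a missing environment variable cannot accidentally let requests through.

```python
import hmac
from typing import Optional


def is_authorized(auth_header: Optional[str], expected_token: Optional[str]) -> bool:
    """Check an Authorization header against the configured bearer token."""
    if not expected_token or not auth_header:
        # No configured token (or no header at all) means no access
        return False
    expected = f"Bearer {expected_token}"
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(auth_header.encode(), expected.encode())


print(is_authorized("Bearer s3cret", "s3cret"))  # True
print(is_authorized("Bearer wrong", "s3cret"))   # False
print(is_authorized("Bearer None", None))        # False
```

In the handler, the call would be `is_authorized(request.headers.get("Authorization"), os.environ.get("WELD_AUTH_TOKEN"))`, returning 401 when it is false.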


7. Deploy the pipeline to Google Cloud

Go to Google Cloud Console → Cloud Functions and:

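  • Choose Python 3.13
  • Allow unauthenticated invocations (that is, disable Cloud IAM authentication), since Weld authenticates with the bearer token from step 6 instead
  • Add environment variables for your API keys and Weld token
  • Upload your main.py and a requirements.txt listing your dependencies (functions-framework and requests)
  • Set the entry point to your handler function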

Once deployed, test the function using Cloud Shell to ensure it returns the expected results.
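If you would rather script that check than type curl commands, a request similar to Weld's can be built with the standard library. `build_sync_request` and the URL are placeholders, not values from this tutorial; the body uses the `name` and `state` keys that get_data reads, and the JSON content type is an assumption about what the handler needs.

```python
import json
import urllib.request
from typing import Optional


def build_sync_request(
    base_url: str, token: str, table: str, state: Optional[dict] = None
) -> urllib.request.Request:
    """Build a POST to the connector's root path, similar to Weld's data requests."""
    body = json.dumps({"name": table, "state": state or {}}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


req = build_sync_request("https://example.com/weld-connector", "my-token", "form", {"page": 2})
print(req.get_method(), req.full_url)  # POST https://example.com/weld-connector/
print(json.loads(req.data))  # {'name': 'form', 'state': {'page': 2}}

# To actually send it (needs network access and a deployed function):
# with urllib.request.urlopen(req) as resp:
#     print(json.loads(resp.read()))
```

Passing the returned `state` into the next `build_sync_request` call lets you step through pages by hand, the same way Weld would.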


8. Set it up in Weld

Now go to Weld:

  • Choose Custom Connector as your data source
  • Paste in your Cloud Function endpoint
  • Add your bearer token for authentication
  • Weld will automatically discover available tables and fields
  • Choose what to sync and configure deduplication if needed

Start the sync, and once complete, your Typeform data will be available in your data warehouse.

Find the final version of this data pipeline here.
