Custom Connector Example
The information below is currently being updated. For examples of already-built connectors, see our Community Connectors GitHub.
Prerequisites
Watch the video tutorial
Prefer to follow along in a video? Here's a full walkthrough from start to finish:
Let's dive in!
1. Set up your development environment
We're writing the pipeline in Python and hosting it with Google Cloud Run, using the functions-framework package. Start by creating and activating a virtual environment:
python3 -m venv .venv
source .venv/bin/activate
Then install dependencies:
pip install requests functions-framework
Create your main.py file and begin with a basic "Hello world" endpoint.
import functions_framework
import flask

@functions_framework.http
def handler(request: flask.Request):
    return "Hello World"
Start the local server using:
functions-framework --target=handler --debug --port 8080
Test it with a curl or Postman request to localhost:8080. You should see "Hello World" as the response.
2. Review the Weld schema and root endpoint requirements
In the Weld documentation, you'll find requirements for:
- /schema (GET) - Defines available tables, primary keys, and optionally fields
- / (POST) - Handles data requests with support for state and pagination
This example pulls data from Typeform, so you'll also want to explore the Typeform API docs to understand its endpoints, authorization, and data structure.
3. Implement the schema endpoint
Create a get_schema() function that defines one or more tables.
This function should return a JSON object with the available table names, primary keys, and optional fields. Weld auto-detects available properties as string fields, but defining them here lets you assign more specific types.
For example:
def get_schema():
    return {
        "tables": {
            "form": {
                "primary_key": ["id"],
                "fields": {
                    "id": "string"
                }
            }
        }
    }
Add a GET handler for /schema to return this structure.
@functions_framework.http
def handler(request: flask.Request):
    if request.path == '/schema':
        if request.method == 'GET':
            return get_schema()
        else:
            return 'Method not allowed', 405
Test that your endpoint returns the correct JSON format.
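One way to check the format before deploying is a small structural check on the dictionary that get_schema() returns. The sketch below is a hypothetical helper, not part of Weld's API; the rules it enforces (a non-empty tables object, a non-empty primary_key list per table, fields as an object when present) are assumptions inferred from the example above, so adjust them to the Weld documentation.

```python
import json


def check_schema(schema: dict) -> list[str]:
    """Return a list of problems found in a get_schema()-style dict.

    Hypothetical helper: the rules are inferred from the tutorial's example,
    not taken from an official Weld specification.
    """
    tables = schema.get("tables")
    if not isinstance(tables, dict) or not tables:
        return ["'tables' must be a non-empty object"]

    problems = []
    for name, table in tables.items():
        if not isinstance(table, dict):
            problems.append(f"{name}: table definition must be an object")
            continue
        keys = table.get("primary_key")
        if not isinstance(keys, list) or not keys:
            problems.append(f"{name}: 'primary_key' must be a non-empty list")
        # 'fields' is optional, so a missing key is fine
        if not isinstance(table.get("fields", {}), dict):
            problems.append(f"{name}: 'fields' must be an object")

    # The endpoint returns JSON, so the structure must be serializable
    try:
        json.dumps(schema)
    except (TypeError, ValueError) as exc:
        problems.append(f"schema is not JSON-serializable: {exc}")
    return problems


example = {"tables": {"form": {"primary_key": ["id"], "fields": {"id": "string"}}}}
print(check_schema(example))  # []
```

An empty list means the structure matches these assumed rules; it does not guarantee that Weld accepts every field type you declare.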
4. Implement the data flow endpoint
Next, implement the / POST handler. Weld will send the table name and current state in the request body. You'll fetch data from the Typeform API accordingly.
You'll need to:
- Add your Typeform API key as an environment variable
- Make authorized API calls to api.typeform.com
import os
import requests

def call_api(endpoint, params, retries=0):
    # Note: retries is accepted but not used in this version
    base_url = "https://api.typeform.com"
    url = f"{base_url}/{endpoint}"
    params = {**params}
    headers = {"Authorization": f"Bearer {os.environ.get('TYPEFORM_API_KEY')}"}
    response = requests.get(url, params=params, headers=headers)
    # Raises requests.HTTPError for 4xx/5xx responses
    response.raise_for_status()
    return response.json()
- Handle pagination via page_count and page parameters:
- Weld expects the data in chunks, so for a paginated API you only need to return one page per request. Use the state to track where the sync should continue: include a state object in your response and update it according to your pagination logic.
- The next time Weld syncs, it will send this state object in the request.
- When there is no more data, return an empty state object so that the next sync starts from a fresh state.
- Return results in the format Weld expects, including insert, hasMore, and state fields.
def get_endpoint_config(table_name, state):
    endpoint = None
    new_state = {}
    additional_data = {}
    params = {}
    match table_name:
        case "form":
            endpoint = "forms"
            current_page = state.get('page', 1)
            params = {"page": current_page}
            new_state["page"] = current_page + 1
        case _:
            raise Exception('Invalid or missing table name')
    return endpoint, new_state, additional_data, params

def get_data(request_data):
    state = request_data.get('state', {})
    table_name = request_data.get('name', None)
    endpoint, new_state, additional_data, params = get_endpoint_config(table_name, state)
    data = call_api(endpoint, params)
    has_more = True
    match table_name:
        case 'form':
            # More pages remain while the current page is below page_count
            has_more = data.get('page_count', 0) > state.get('page', 1)
        case _:
            raise Exception('Invalid or missing table name')
    return {
        "insert": data.get('items', []),
        "hasMore": has_more,
        # An empty state makes the next sync start from the first page
        "state": new_state if has_more else {}
    }
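To see how the state travels between requests, it can help to simulate the caller locally. The sketch below replaces the Typeform API with a made-up three-page dataset (FAKE_PAGES and fake_call_api are hypothetical stand-ins) and imitates a client that re-sends the returned state until hasMore is false. How Weld actually schedules these requests is an assumption based on the description above.

```python
# Made-up stand-in for the Typeform forms endpoint: three pages of items.
FAKE_PAGES = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}


def fake_call_api(endpoint, params):
    """Hypothetical replacement for call_api(); returns one page of fake data."""
    page = params["page"]
    items = [{"id": item_id} for item_id in FAKE_PAGES.get(page, [])]
    return {"page_count": len(FAKE_PAGES), "items": items}


def get_data_sketch(request_data):
    """Same page logic as get_data() for the 'form' table, without network calls."""
    state = request_data.get("state", {})
    current_page = state.get("page", 1)
    data = fake_call_api("forms", {"page": current_page})
    has_more = data["page_count"] > current_page
    return {
        "insert": data["items"],
        "hasMore": has_more,
        "state": {"page": current_page + 1} if has_more else {},
    }


def run_sync(table_name):
    """Imitates the caller: re-sends the returned state until hasMore is False."""
    rows, state, calls = [], {}, 0
    while True:
        response = get_data_sketch({"name": table_name, "state": state})
        calls += 1
        rows.extend(response["insert"])
        state = response["state"]
        if not response["hasMore"]:
            return rows, state, calls


rows, final_state, calls = run_sync("form")
print([row["id"] for row in rows], final_state, calls)
# ['a', 'b', 'c', 'd', 'e'] {} 3
```

The final state is empty, so a later sync in this simulation would begin again at page 1.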
You can also clean the data by stripping out unnecessary fields:
def clean_data(items):
    cleaned_items = []
    for item in items:
        # Drop fields we don't want to sync
        item.pop('theme', None)
        item.pop('self', None)
        item.pop('_links', None)
        cleaned_items.append(item)
    return cleaned_items
Add the handler POST endpoint for the root path:
import json

@functions_framework.http
def handler(request: flask.Request):
    ...
    if request.path == '/':
        if request.method == 'POST':
            request_data = json.loads(request.data)
            try:
                return get_data(request_data)
            except requests.HTTPError as e:
                # Pass the upstream API error through to the caller
                return e.response.json(), e.response.status_code
            except Exception as e:
                return str(e), 500
        else:
            return 'Method not allowed', 405
5. Add additional tables and logic
In our Typeform example, we add both a form table and a form_response table.
Form responses are tied to form IDs, so we handle them differently: we collect the form IDs first, then loop through them with state tracking.
The logic ensures Weld continues syncing where it left off, even across multiple endpoints.
def get_form_ids() -> list[str]:
    endpoint = 'forms'
    page_num = 0
    form_ids = []
    has_more = True
    while has_more:
        page_num += 1
        data = call_api(endpoint, {"page": page_num, 'page_size': 200})
        form_ids.extend(item["id"] for item in data["items"])
        has_more = data["page_count"] > page_num
    return form_ids

def get_endpoint_config(table_name, state):
    ...
    match table_name:
        ...
        case "form_response":
            # Fetch the form IDs only when the state does not already carry them;
            # state.get('form_ids', get_form_ids()) would call the API on every request
            form_ids = state['form_ids'] if 'form_ids' in state else get_form_ids()
            form_id = form_ids[0]
            endpoint = f'forms/{form_id}/responses'
            # In the account used for this demo there are no more than 1000 responses
            # in this case setting the page size to the maximum ensures we get all responses in 1 request
            params = {"page_size": 1000}
            new_state["form_ids"] = form_ids[1:]
            additional_data["form_id"] = form_id
    ...

def get_data(request_data):
    ...
    match table_name:
        ...
        case 'form_response':
            remaining_form_ids = new_state.get('form_ids')
            has_more = len(remaining_form_ids) > 0
        ...
    ...
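The way the form_ids list shrinks across requests can be traced with a small offline simulation. Everything below is a sketch: FAKE_RESPONSES and fake_get_form_ids are made-up stand-ins for the Typeform API, and attaching form_id to each row is an assumption about how additional_data is used, since that part of the code is elided above. The sketch also returns early when there are no forms at all, a case the excerpt does not show.

```python
# Made-up data: responses keyed by form ID.
FAKE_RESPONSES = {
    "f1": [{"response_id": "r1"}, {"response_id": "r2"}],
    "f2": [],
    "f3": [{"response_id": "r3"}],
}


def fake_get_form_ids():
    """Hypothetical replacement for get_form_ids()."""
    return list(FAKE_RESPONSES)


def form_response_step(state):
    """Handle one request for the form_response table."""
    # Reuse the IDs carried in the state; fetch them only on the first request
    form_ids = state["form_ids"] if "form_ids" in state else fake_get_form_ids()
    if not form_ids:
        return {"insert": [], "hasMore": False, "state": {}}

    form_id, remaining = form_ids[0], form_ids[1:]
    # Assumption: each row is tagged with the form it belongs to
    rows = [{**row, "form_id": form_id} for row in FAKE_RESPONSES[form_id]]
    has_more = len(remaining) > 0
    return {
        "insert": rows,
        "hasMore": has_more,
        "state": {"form_ids": remaining} if has_more else {},
    }


state, all_rows, steps = {}, [], 0
while True:
    result = form_response_step(state)
    steps += 1
    all_rows.extend(result["insert"])
    state = result["state"]
    if not result["hasMore"]:
        break

print(steps, [row["response_id"] for row in all_rows], state)
# 3 ['r1', 'r2', 'r3'] {}
```

Each request consumes one form ID, so the number of requests per sync equals the number of forms in this simulation.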
6. Add authentication
To secure your pipeline, validate that requests include an Authorization header matching your Weld token:
auth = request.headers.get("Authorization")
if auth != f"Bearer {WELD_AUTH_TOKEN}":
    return "Unauthorized", 401
This prevents unauthorized access to your endpoint.
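A slightly more defensive variant of this check is sketched below. It assumes the token is stored in an environment variable (the name WELD_AUTH_TOKEN is an assumption here) and uses hmac.compare_digest from the standard library for a constant-time comparison. The is_authorized helper is hypothetical, not something Weld provides.

```python
import hmac
import os


def is_authorized(auth_header, expected_token):
    """Constant-time check of an 'Authorization: Bearer <token>' header."""
    # Reject requests when the header is missing or no token is configured
    if not auth_header or not expected_token:
        return False
    expected = f"Bearer {expected_token}"
    return hmac.compare_digest(auth_header.encode(), expected.encode())


# In the handler, the token would come from the environment (assumed variable name):
#   expected_token = os.environ.get("WELD_AUTH_TOKEN")
#   if not is_authorized(request.headers.get("Authorization"), expected_token):
#       return "Unauthorized", 401

print(is_authorized("Bearer example-token", "example-token"))  # True
print(is_authorized("Bearer wrong", "example-token"))          # False
print(is_authorized(None, "example-token"))                    # False
```

Rejecting requests when no token is configured avoids accidentally accepting the literal header "Bearer None" if the environment variable is missing.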
7. Deploy the pipeline to Google Cloud
Go to Google Cloud Console → Cloud Functions and:
- Choose Python 3.13
- Disable Cloud IAM Auth
- Add environment variables for your API keys and Weld token
- Upload your main.py and requirements.txt
- Set the entry point to your handler function
Once deployed, test the function using Cloud Shell to ensure it returns the expected results.
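For a scripted smoke test, the request can be built with the standard library. This is a sketch under assumptions: the URL and token are placeholders, build_sync_request is a hypothetical helper, and the body shape (name and state) mirrors what get_data() reads rather than a documented Weld payload.

```python
import json
import urllib.request


def build_sync_request(base_url, token, table_name, state=None):
    """Build a POST request to the connector's root path."""
    body = json.dumps({"name": table_name, "state": state or {}}).encode()
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder endpoint and token; replace with your deployed function's values
req = build_sync_request("https://example.invalid/my-connector", "example-token", "form")
print(req.get_method(), req.full_url)

# To actually send it once the function is deployed:
#   with urllib.request.urlopen(req, timeout=30) as resp:
#       print(json.loads(resp.read()))
```

A successful response should contain the insert, hasMore, and state fields described in step 4.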
8. Set it up in Weld
Now go to Weld:
- Choose Custom Connector as your data source
- Paste in your Cloud Function endpoint
- Add your bearer token for authentication
- Weld will automatically discover available tables and fields
- Choose what to sync and configure deduplication if needed
Start the sync, and once complete, your Typeform data will be available in your data warehouse.
Find the final version of this data pipeline here.