How to Build a Data Pipeline Using Python
Weld supports hundreds of data sources out of the box. But what if your tool isn’t yet supported? That’s where custom data pipelines come in.
In this tutorial, we’ll walk through how to build a custom data pipeline for Weld using the Typeform API as an example. You’ll learn how to:
- Set up your local development environment
- Implement schema and data flow endpoints
- Handle pagination and state
- Secure the endpoint with authentication
- Deploy your pipeline to Google Cloud
- Connect it to Weld
Whether you're syncing from a niche platform or building an internal API integration, this guide shows you how to do it, step by step.
🎥 Watch the video tutorial
Prefer to follow along in a video? Here’s a full walkthrough from start to finish:
Let's dive in!
1. Set up your development environment
We’re writing the pipeline in Python and hosting it with Google Cloud Run, using the functions-framework package. Start by creating and activating a virtual environment:
python3 -m venv .venv
source .venv/bin/activate
Then install dependencies:
pip install requests functions-framework
Create your main.py file and begin with a basic "Hello world" endpoint.
import functions_framework
import flask

@functions_framework.http
def handler(request: flask.Request):
    return "Hello World"
Start the local server using:
functions-framework --target=handler --debug --port 8080
Test it with a curl or Postman request to localhost:8080. You should see “Hello World” as the response.
2. Review the Weld schema and root endpoint requirements
In the Weld documentation, you’ll find requirements for:
- /schema (GET) – Defines available tables, primary keys, and optionally fields
- / (POST) – Handles data requests with support for state and pagination
You’ll also want to explore the Typeform API docs to understand endpoints, authorization, and data structure.
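As a rough sketch of the contract described above, the request and response bodies can be modelled as plain dictionaries. The field names (name, state, insert, hasMore) are the ones used by the code later in this tutorial; the helper name build_response and the sample values are illustrative assumptions, so treat the Weld documentation as the authoritative source for the format.

```python
# Hypothetical example of what Weld might POST to "/" (values are made up).
example_request = {"name": "form", "state": {"page": 2}}


def build_response(rows, has_more, next_state):
    """Assemble a response body with the insert, hasMore and state fields."""
    return {
        "insert": rows,
        "hasMore": has_more,
        # An empty state lets the next sync start from scratch.
        "state": next_state if has_more else {},
    }


response = build_response([{"id": "abc123"}], True, {"page": 3})
```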
3. Implement the schema endpoint
Create a get_schema() function that defines one or more tables.
This function should return a JSON object with the available table names, their primary keys, and optionally their fields. Weld auto-detects properties as string fields, but by declaring them here you can give them more specific types.
For example:
def get_schema():
    return {
        "tables": {
            "form": {
                "primary_key": ["id"],
                "fields": {
                    "id": "string"
                }
            }
        }
    }
Add a GET handler for /schema to return this structure.
@functions_framework.http
def handler(request: flask.Request):
    if request.path == '/schema':
        if request.method == 'GET':
            return get_schema()
        else:
            return 'Method not allowed', 405
Test that your endpoint returns the correct JSON format.
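One way to check the shape locally, before Weld ever calls the endpoint, is a small validation helper. This is only a sketch: validate_schema is a hypothetical name, and the rules it checks are inferred from the example in this tutorial rather than taken from Weld's specification.

```python
def validate_schema(schema):
    """Return a list of problems found in a schema dict (empty if none)."""
    tables = schema.get("tables")
    if not isinstance(tables, dict) or not tables:
        return ["'tables' must be a non-empty object"]

    problems = []
    for name, table in tables.items():
        if not isinstance(table, dict):
            problems.append(f"{name}: table definition must be an object")
            continue
        primary_key = table.get("primary_key")
        if not isinstance(primary_key, list) or not primary_key:
            problems.append(f"{name}: 'primary_key' must be a non-empty list")
        # Fields are optional, so a missing key is fine.
        if not isinstance(table.get("fields", {}), dict):
            problems.append(f"{name}: 'fields' must be an object")
    return problems


schema = {"tables": {"form": {"primary_key": ["id"], "fields": {"id": "string"}}}}
print(validate_schema(schema))  # prints []
```

In practice you would pass the result of get_schema() to the helper instead of the literal dictionary.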
4. Implement the data flow endpoint
Next, implement the / POST handler. Weld will send the table name and current state in the request body. You’ll fetch data from the Typeform API accordingly.
You’ll need to:
- Add your Typeform API key as an environment variable
- Make authorized API calls to api.typeform.com
import os
import requests

def call_api(endpoint, params, retries=0):
    # Note: retries is accepted but not used by this version.
    base_url = "https://api.typeform.com"
    url = f"{base_url}/{endpoint}"

    # Copy the params so the caller's dict is never modified.
    params = {**params}
    headers = {"Authorization": f"Bearer {os.environ.get('TYPEFORM_API_KEY')}"}

    response = requests.get(url, params=params, headers=headers)

    # Raises requests.HTTPError for 4xx and 5xx responses.
    response.raise_for_status()
    return response.json()
- Handle pagination via page_count and page parameters:
- Weld expects the data in chunks, so if the API is paginated, you only need to return one page per request. Use the state to track where the sync should continue: include a state object in your response and update it according to your pagination logic.
- The next time Weld syncs, it will send this state object in the request.
- When there is no more data, return an empty state object so that the next sync starts from a fresh state.
- Return results in the format Weld expects, including insert, hasMore, and state fields.
def get_endpoint_config(table_name, state):
    endpoint = None
    new_state = {}
    additional_data = {}
    params = {}

    # match/case requires Python 3.10 or newer.
    match table_name:
        case "form":
            endpoint = "forms"
            current_page = state.get('page', 1)
            params = {"page": current_page}
            new_state["page"] = current_page + 1
        case _:
            raise Exception('Invalid or missing table name')

    return endpoint, new_state, additional_data, params


def get_data(request_data):
    state = request_data.get('state', {})
    table_name = request_data.get('name', None)

    endpoint, new_state, additional_data, params = get_endpoint_config(table_name, state)

    data = call_api(endpoint, params)
    has_more = True

    match table_name:
        case 'form':
            # More pages remain while page_count exceeds the page just fetched.
            has_more = data.get('page_count', 0) > state.get('page', 1)
        case _:
            raise Exception('Invalid or missing table name')

    return {
        "insert": data.get('items', []),
        "hasMore": has_more,
        # Reset the state once everything has been synced.
        "state": new_state if has_more else {}
    }
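To see how the state round-trips, the following sketch simulates a caller that keeps requesting data while hasMore is true and feeds each returned state back in. The three-page PAGES dictionary stands in for the Typeform API, and fake_get_data and simulate_sync are hypothetical names; how Weld actually schedules its requests is an assumption here.

```python
# Fake three-page API standing in for Typeform (hypothetical data).
PAGES = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}


def fake_get_data(request_data):
    """Mimic get_data for the "form" table against the fake API."""
    state = request_data.get("state", {})
    page = state.get("page", 1)
    has_more = len(PAGES) > page
    return {
        "insert": PAGES[page],
        "hasMore": has_more,
        "state": {"page": page + 1} if has_more else {},
    }


def simulate_sync(table_name):
    """Call the endpoint repeatedly, feeding each returned state back in."""
    rows, state = [], {}
    while True:
        response = fake_get_data({"name": table_name, "state": state})
        rows.extend(response["insert"])
        state = response["state"]
        if not response["hasMore"]:
            return rows, state


rows, final_state = simulate_sync("form")
print(rows, final_state)  # prints ['a', 'b', 'c', 'd', 'e'] {}
```

The final state is empty, so a later sync would start again from page 1.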
You can also clean the data by stripping out unnecessary fields:
def clean_data(items):
    cleaned_items = []
    for item in items:
        # Drop fields we do not want to sync.
        item.pop('theme', None)
        item.pop('self', None)
        item.pop('_links', None)
        cleaned_items.append(item)

    return cleaned_items
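If you would rather not modify the dictionaries returned by the API, a variation is to build cleaned copies instead. This is a sketch only: strip_fields and UNWANTED_FIELDS are illustrative names, and the sample form is made up.

```python
UNWANTED_FIELDS = {"theme", "self", "_links"}


def strip_fields(items, unwanted=UNWANTED_FIELDS):
    """Return copies of the items without the unwanted top-level keys."""
    return [
        {key: value for key, value in item.items() if key not in unwanted}
        for item in items
    ]


forms = [{"id": "abc123", "title": "Survey", "theme": {"href": "..."}, "_links": {}}]
print(strip_fields(forms))  # prints [{'id': 'abc123', 'title': 'Survey'}]
```

The original forms list is left unchanged, which helps if you want to log the raw payload while debugging.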
Add the handler POST endpoint for the root path:
@functions_framework.http
def handler(request: flask.Request):
    ...
    if request.path == '/':
        if request.method == 'POST':
            # Requires "import json" at the top of main.py.
            request_data = json.loads(request.data)
            try:
                return get_data(request_data)
            except requests.HTTPError as e:
                # Pass the upstream API error and status code through.
                return e.response.json(), e.response.status_code
            except Exception as e:
                return str(e), 500
        else:
            return 'Method not allowed', 405
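The call_api function above accepts a retries parameter but never uses it. If you want transient failures to be retried, one possible approach is a small wrapper with exponential backoff. The helper below is a hypothetical sketch, not part of the original pipeline; it retries on any exception, which is cruder than you would want in production, where you would typically retry only rate-limit and server errors.

```python
import time


def with_retries(fetch, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fetch() and retry with exponential backoff if it raises."""
    for attempt in range(max_retries + 1):
        try:
            return fetch()
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts: surface the last error
            # Wait 1x, 2x, 4x, ... the base delay between attempts.
            sleep(base_delay * 2 ** attempt)
```

It could be used as with_retries(lambda: call_api("forms", {"page": 1})). The sleep argument exists only so the delay can be replaced in tests.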
5. Add additional tables and logic
In our Typeform example, we add both a form and a form_response table.
Form responses are tied to form IDs, so we handle them differently, collecting the form IDs first, then looping through them with state tracking.
The logic ensures Weld continues syncing where it left off, even across multiple endpoints.
def get_form_ids() -> list[str]:
    endpoint = 'forms'
    page_num = 0
    form_ids = []
    has_more = True

    # Walk every page of forms and collect the IDs.
    while has_more:
        page_num += 1
        data = call_api(endpoint, {"page": page_num, "page_size": 200})
        form_ids.extend(item["id"] for item in data["items"])
        has_more = data["page_count"] > page_num

    return form_ids

def get_endpoint_config(table_name, state):
    ...
    match table_name:
        ...
        case "form_response":
            # Only fetch the form IDs when the state does not carry them yet;
            # state.get('form_ids', get_form_ids()) would call the API on every request.
            form_ids = state.get('form_ids')
            if form_ids is None:
                form_ids = get_form_ids()
            form_id = form_ids[0]
            endpoint = f'forms/{form_id}/responses'
            # The account used for this demo has no more than 1000 responses per form,
            # so setting the page size to the maximum returns them all in one request.
            params = {"page_size": 1000}
            new_state["form_ids"] = form_ids[1:]
            additional_data["form_id"] = form_id
    ...

def get_data(request_data):
    ...
    match table_name:
        ...
        case 'form_response':
            # Keep syncing while there are forms left in the queue.
            remaining_form_ids = new_state.get('form_ids')
            has_more = len(remaining_form_ids) > 0
        ...
    ...
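The queue-style state used for form_response can be tried in isolation. The sketch below mirrors the logic above with two hypothetical helpers, next_form and attach_form_id. The tutorial collects additional_data["form_id"] but does not show how it is applied, so merging it into every response row is an assumption, as is the response_id key in the sample data. The sketch also assumes the account has at least one form.

```python
def next_form(state, all_form_ids):
    """Pick the next form to sync and the state to return afterwards."""
    form_ids = state.get("form_ids")
    if form_ids is None:  # fresh sync: start from the full list
        form_ids = list(all_form_ids)
    form_id, remaining = form_ids[0], form_ids[1:]
    return form_id, {"form_ids": remaining}, len(remaining) > 0


def attach_form_id(items, form_id):
    """Copy each response and tag it with the form it belongs to."""
    return [{**item, "form_id": form_id} for item in items]


form_id, new_state, has_more = next_form({}, ["f1", "f2"])
print(form_id, new_state, has_more)  # prints f1 {'form_ids': ['f2']} True
print(attach_form_id([{"response_id": "r1"}], form_id))
```

Tagging each row with its form ID makes it possible to join responses back to forms in the warehouse.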
6. Add authentication
To secure your pipeline, validate that requests include an Authorization header matching your Weld token:
auth = request.headers.get("Authorization")
if auth != f"Bearer {WELD_AUTH_TOKEN}":
    return "Unauthorized", 401
This prevents unauthorized access to your endpoint.
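A slightly more defensive variant of this check is sketched below. It reads the token from an environment variable (the variable name WELD_AUTH_TOKEN is an assumption, as is the helper name is_authorized), rejects requests when either value is missing, and uses hmac.compare_digest from the standard library for a constant-time comparison.

```python
import hmac
import os


def is_authorized(auth_header, expected_token):
    """Check a 'Bearer <token>' header against the expected token."""
    if not auth_header or not expected_token:
        return False  # reject when either side is missing
    expected = f"Bearer {expected_token}"
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(auth_header.encode(), expected.encode())


# Assumed environment variable name; use whatever you configure at deploy time.
WELD_AUTH_TOKEN = os.environ.get("WELD_AUTH_TOKEN")
```

Inside the handler, the check would become: if not is_authorized(request.headers.get("Authorization"), WELD_AUTH_TOKEN), return "Unauthorized", 401. Rejecting a missing token also avoids accidentally accepting the literal header "Bearer None" when the variable is unset.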
7. Deploy the pipeline to Google Cloud
Go to Google Cloud Console → Cloud Functions and:
- Choose Python 3.13
- Disable Cloud IAM Auth
- Add environment variables for your API keys and Weld token
- Upload your main.py and requirements.txt
- Set the entry point to your handler function
Once deployed, test the function using Cloud Shell to ensure it returns the expected results.
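For a quick manual test, you could build the same kind of request Weld sends using only the standard library. This is a sketch under assumptions: build_sync_request is a hypothetical helper, and the URL and token are placeholders to replace with your deployed function's values.

```python
import json
import urllib.request


def build_sync_request(url, token, table_name, state=None):
    """Build (but do not send) a POST request like the one Weld would make."""
    body = json.dumps({"name": table_name, "state": state or {}}).encode()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder URL and token: replace with your deployed function's values.
req = build_sync_request("https://example.com/", "my-weld-token", "form")
# To send it: print(urllib.request.urlopen(req).read())
```

Sending the request with a wrong token should return a 401, which is a simple way to confirm the authentication check is active.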
8. Set it up in Weld
Now go to Weld:
- Choose Custom Connector as your data source
- Paste in your Cloud Function endpoint
- Add your bearer token for authentication
- Weld will automatically discover available tables and fields
- Choose what to sync and configure deduplication if needed
Start the sync, and once complete, your Typeform data will be available in your data warehouse.
Wrap-up
Custom connectors give you the flexibility to integrate any API with Weld. This setup with Typeform shows the core concepts: schema and data handling, state tracking, and secure hosting.