HubSpot Associations Incremental Sync

Overview

Associations in HubSpot link two objects together (e.g., contacts-to-deals, companies-to-tickets). Reading associations is expensive because HubSpot doesn't provide a direct "get changed associations" API.

We implemented two methods:

1. Full Scan (read_full_ping) - Iterates through ALL parent object IDs
2. Incremental (read_incremental_ping) - Only reads associations for parent objects that changed

Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                              SETUP PHASE                                     │
│  setup.py                                                                    │
│  └── Detects rollup fields → Sets table_options.ping_type                   │
│      - If rollup fields found → ping_type = "read_incremental_ping"         │
│      - If no rollup fields   → ping_type = "read_full_ping"                 │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│                           SWITCH ON PHASE                                    │
│  switch_app.py                                                               │
│  └── Creates read_ping task with options:                                   │
│      - cursor: timestamp or "0"                                              │
│      - ping_type: from table_options (defaults to read_full_ping)           │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│                            READ PING PHASE                                   │
│  read_ping.py (lines 855-890)                                               │
│  └── Routes based on ping_type:                                             │
│      ┌─────────────────────────────────────────────────────────────────┐    │
│      │ if ping_type == 'read_incremental_ping' AND cursor is not None │    │
│      │   → read_incremental_ping_associations()                        │    │
│      │ else                                                            │    │
│      │   → read_ping_associations() (full scan)                        │    │
│      └─────────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────────┘

File Structure

apps/hubspot/
├── setup.py                          # Detects rollup fields, sets ping_type
├── switch_app.py                     # Creates read_ping options
├── read_ping.py                      # Routes to correct method
├── read_ping_objects/
│   ├── associations.py               # Full scan method
│   └── associations_incremental.py   # Incremental method
├── functions/
│   ├── associations.py               # get_changed_parent_record_ids()
│   ├── properties.py                 # get_all_sync_association_properties()
│   └── rollup_fields.py              # Rollup field verification
└── https/
    ├── search_api.py                 # HubSpot Search API wrapper
    └── properties_api.py             # HubSpot Properties API wrapper

How It Works

1. Setup Phase (setup.py lines 110-163)

When a base is set up, we check if rollup fields exist for each association:

# Get all rollup properties matching pattern:
# sync_associations_{parent}_{child}_label_*
rollup_field_labels_found = get_all_sync_association_properties(
    parent_external_table_id=parent_external_table_id,
    child_external_table_id=child_external_table_id,
    ...
)

if rollup_field_labels_found:
    table["table_options"]["ping_type"] = "read_incremental_ping"
    table["table_options"]["rollup_field_labels"] = rollup_field_labels_found
else:
    table["table_options"]["ping_type"] = "read_full_ping"

2. Switch On Phase (switch_app.py lines 1040-1066)

When the base is switched on, we create read_ping options:

# Get ping_type from table_schema (set during setup)
ping_type = table_schema.get('table_options', {}).get('ping_type', 'read_full_ping')

if ping_type == "read_incremental_ping" and cursor is not None:
    # Use incremental method
    return {
        "cursor": cursor,  # lastmodifieddate timestamp in milliseconds
        "ping_type": "read_incremental_ping",
    }
else:
    # Use full scan method
    return {
        "cursor": cursor if cursor is not None else "0",
        "ping_type": "read_full_ping",
    }

3. Read Ping Routing (read_ping.py lines 806-890)

# For associations, get ping_type from table_schema (not from options)
# This fixes old tasks that had incorrect ping_type
if hubspot_object_type == 'association':
    ping_type = table_schema.get('table_options', {}).get('ping_type', 'read_full_ping')

# Route to appropriate method
if ping_type == 'read_incremental_ping' and cursor is not None:
    changes_df, updated_options = read_incremental_ping_associations(...)
else:
    changes_df, updated_options = read_ping_associations(...)  # full scan
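The routing rule can be isolated as a small pure function, which makes the two fallbacks easy to see: a missing ping_type and a missing cursor both lead to the full scan. This is a minimal sketch; the function name resolve_association_reader is hypothetical and does not appear in read_ping.py.

```python
from typing import Optional

FULL = "read_full_ping"
INCREMENTAL = "read_incremental_ping"


def resolve_association_reader(table_schema: dict, cursor: Optional[str]) -> str:
    """Return the name of the reader that would handle an association table.

    Hypothetical helper for illustration only.
    """
    # ping_type is taken from the schema, so stale task options cannot override it.
    ping_type = table_schema.get("table_options", {}).get("ping_type", FULL)
    if ping_type == INCREMENTAL and cursor is not None:
        return "read_incremental_ping_associations"
    return "read_ping_associations"  # full scan


# An old base without ping_type falls back to the full scan.
print(resolve_association_reader({}, "1702857600000"))
```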

Full Scan Method (read_ping_associations)

File: read_ping_objects/associations.py

Flow:

1. Query datastore to get batch of parent object IDs (cursor-based pagination)
2. For each parent ID, fetch associations from HubSpot
3. Compare with existing associations in datastore to detect deletes
4. When all parent IDs processed, transition to incremental (if supported)

Cursor: Object ID from datastore (for pagination through parent objects)

Datastore: Contacts Table          HubSpot Associations API
┌─────────────────────┐           ┌─────────────────────────┐
│ contact_id: 123     │ ──────────│ GET /associations       │
│ contact_id: 456     │           │ for contact_id=123      │
│ contact_id: 789     │           └─────────────────────────┘
│ ...                 │
└─────────────────────┘
     │ cursor = last contact_id read
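The full scan flow can be sketched with in-memory stand-ins for the datastore and the HubSpot call. Everything here is an assumption made for illustration: the function name full_scan_batch, numeric object IDs, the (operation, parent, child) tuples, and the simplification that only new and removed associations are reported. It is not the code in read_ping_objects/associations.py.

```python
from typing import Callable, Dict, List, Set, Tuple

Change = Tuple[str, str, str]  # (operation, parent_id, child_id)


def full_scan_batch(
    parent_ids: List[str],                       # ascending IDs, stands in for the datastore
    cursor: str,                                 # last parent ID processed, "0" at start
    batch_size: int,
    fetch_associations: Callable[[str], Set[str]],  # stands in for the HubSpot call
    stored: Dict[str, Set[str]],                 # associations already in the datastore
) -> Tuple[List[Change], str, bool]:
    # Cursor-based pagination: take the next batch of parents after the cursor.
    batch = [pid for pid in parent_ids if int(pid) > int(cursor)][:batch_size]
    changes: List[Change] = []
    for pid in batch:
        current = fetch_associations(pid)
        known = stored.get(pid, set())
        changes += [("upsert", pid, child) for child in sorted(current - known)]
        # Anything stored but no longer returned by HubSpot is a delete.
        changes += [("delete", pid, child) for child in sorted(known - current)]
    next_cursor = batch[-1] if batch else cursor
    done = len(batch) < batch_size  # a short batch means no parents are left
    return changes, next_cursor, done
```

When done is true, the caller would swap the object-ID cursor for a timestamp cursor, as described under Cursor Format.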

Incremental Method (read_incremental_ping_associations)

File: read_ping_objects/associations_incremental.py

Flow:

1. Query HubSpot Search API for parent objects where lastmodifieddate > cursor
2. For each changed parent, fetch its associations
3. Compare with datastore to detect deletes
4. Update cursor to latest lastmodifieddate

Cursor: Unix timestamp in milliseconds (lastmodifieddate of parent object)

HubSpot Search API                 HubSpot Associations API
┌─────────────────────────────┐   ┌─────────────────────────┐
│ POST /contacts/search       │   │ GET /associations       │
│ filter: lastmodifieddate >  │   │ for changed contacts    │
│         cursor (timestamp)  │   │ only!                   │
└─────────────────────────────┘   └─────────────────────────┘
         Changed parent IDs
         (e.g., 123, 789)
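A minimal sketch of the incremental flow, again with in-memory stand-ins. The function name incremental_batch, the record shape (id plus lastmodifieddate as integer milliseconds), and the change tuples are assumptions for illustration, not the contents of associations_incremental.py.

```python
from typing import Callable, Dict, List, Set, Tuple

Change = Tuple[str, str, str]  # (operation, parent_id, child_id)


def incremental_batch(
    parents: List[dict],                            # stands in for Search API results
    cursor: str,                                    # Unix timestamp in ms, as a string
    fetch_associations: Callable[[str], Set[str]],  # stands in for the HubSpot call
    stored: Dict[str, Set[str]],                    # associations already in the datastore
) -> Tuple[List[Change], str]:
    # Step 1: keep only parents modified after the cursor, oldest first.
    changed = sorted(
        (p for p in parents if p["lastmodifieddate"] > int(cursor)),
        key=lambda p: p["lastmodifieddate"],
    )
    changes: List[Change] = []
    for parent in changed:
        pid = parent["id"]
        current = fetch_associations(pid)            # step 2
        known = stored.get(pid, set())
        changes += [("upsert", pid, c) for c in sorted(current - known)]
        changes += [("delete", pid, c) for c in sorted(known - current)]  # step 3
    # Step 4: advance the cursor to the newest lastmodifieddate seen.
    new_cursor = str(changed[-1]["lastmodifieddate"]) if changed else cursor
    return changes, new_cursor
```

Unchanged parents are never fetched, which is where the savings over the full scan come from.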

Cursor Format

| Method | Cursor Format | Example |
|---|---|---|
| Full Scan | Object ID (string) | "123456" |
| Incremental | Unix timestamp ms (string) | "1702857600000" |

Transition: When full scan completes, cursor changes from object ID to timestamp:

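# When full scan completes (no more parent IDs)
# Use an aware UTC datetime: .timestamp() on a naive datetime assumes local time.
from datetime import datetime, timezone
cursor = str(int(datetime.now(timezone.utc).timestamp() * 1000))  # e.g., "1702857600000"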
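When debugging, it helps to turn a cursor back into a readable date. The sketch below uses only the standard library; the helper names now_ms_cursor and decode_ms_cursor are hypothetical.

```python
from datetime import datetime, timezone


def now_ms_cursor() -> str:
    """Current UTC time as a millisecond timestamp string."""
    return str(int(datetime.now(timezone.utc).timestamp() * 1000))


def decode_ms_cursor(cursor: str) -> datetime:
    """Interpret a cursor string as milliseconds since the Unix epoch (UTC)."""
    return datetime.fromtimestamp(int(cursor) / 1000, tz=timezone.utc)


print(decode_ms_cursor("1702857600000").isoformat())  # 2023-12-18T00:00:00+00:00
print(decode_ms_cursor("123456").year)                # 1970
```

A cursor that decodes to 1970 is a strong hint that an object ID was left in place where a timestamp was expected.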

Rollup Fields

Rollup fields are HubSpot properties that aggregate data from associated objects. They are created in the HubSpot UI; they cannot be created via the API.

Naming Convention: sync_associations_{parent}_{child}_label_*

Example:

  • Association: contacts → tickets
  • Rollup field label: sync_associations_contacts_tickets_label_count

When a rollup field changes on a parent object (e.g., contact), the parent's lastmodifieddate is updated. This allows us to detect association changes by monitoring parent object changes.
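The naming convention amounts to a prefix match on property labels. The following sketch shows that match in isolation; both helper names are hypothetical, and the parent and child names are taken as given, with no attempt to reproduce how the real detection orders them.

```python
from typing import Iterable, List


def rollup_label_prefix(parent: str, child: str) -> str:
    """Prefix shared by all rollup labels for one association (hypothetical helper)."""
    return f"sync_associations_{parent}_{child}_label_"


def matching_rollup_labels(labels: Iterable[str], parent: str, child: str) -> List[str]:
    """Keep only the labels that follow the naming convention for parent -> child."""
    prefix = rollup_label_prefix(parent, child)
    return [label for label in labels if label.startswith(prefix)]


labels = [
    "sync_associations_contacts_tickets_label_count",
    "sync_associations_contacts_deals_label_count",
    "Ticket count",
]
print(matching_rollup_labels(labels, "contacts", "tickets"))
# ['sync_associations_contacts_tickets_label_count']
```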

Key Functions

get_changed_parent_record_ids()

File: functions/associations.py

Queries HubSpot Search API to find parent objects that changed since cursor.

search_body = {
    "filterGroups": [{
        "filters": [{
            "propertyName": "lastmodifieddate",  # or "hs_lastmodifieddate"
            "operator": "GT",
            "value": cursor  # Unix timestamp in milliseconds
        }]
    }],
    "sorts": [{"propertyName": "lastmodifieddate", "direction": "ASCENDING"}],
    "properties": ["hs_object_id", "lastmodifieddate"],
    "limit": batch_size
}

search_objects()

File: https/search_api.py

Wrapper for HubSpot Search API with:

  • Automatic pagination
  • Rate limiting
  • Retry logic
  • Token refresh
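The pagination part can be sketched as a generator that follows the paging.next.after token returned in HubSpot search responses. This is an illustration under assumptions, not the code in https/search_api.py: the name iter_search_results and the injected post callable are hypothetical, and rate limiting, retries, and token refresh are left out.

```python
from typing import Callable, Iterator, Optional


def iter_search_results(post: Callable[[dict], dict], body: dict) -> Iterator[dict]:
    """Yield every result across pages.

    `post` is a hypothetical callable that sends one search request body
    and returns the decoded JSON response.
    """
    after: Optional[str] = None
    while True:
        page_body = dict(body)          # do not mutate the caller's body
        if after is not None:
            page_body["after"] = after  # resume where the previous page ended
        response = post(page_body)
        yield from response.get("results", [])
        after = response.get("paging", {}).get("next", {}).get("after")
        if after is None:               # no next page token: last page reached
            return
```

Passing post in as a parameter keeps the loop testable with a fake that returns canned pages.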

Debugging CDC Boost Setup

Step 1: Check what rollup fields exist in HubSpot

Query the HubSpot Properties API for each parent object:

TOKEN="<hubspot_access_token>"

# Check each standard object
for obj in companies contacts deals; do
  echo "=== $obj ==="
  curl -s "https://api.hubapi.com/crm/v3/properties/$obj" \
    -H "Authorization: Bearer $TOKEN" \
    | jq "[.results[] | select(.label | startswith(\"sync_associations_$obj\"))] | .[].label"
done

Step 2: Check what Stacksync detected

Query the base schema and look for supports_roll_up and rollup_field_labels in each association table's table_options.

Step 3: Compare

For each association table, verify:

  • supports_roll_up is true if rollup fields exist in HubSpot
  • rollup_field_labels contains all the labels from HubSpot
  • rollup_fields_object_id points to the alphabetically first object
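The first two checks can be automated once the labels from Step 1 and the table_options from Step 2 are at hand. This is a sketch only: find_rollup_mismatches is a hypothetical helper, and the rollup_fields_object_id check is omitted because it depends on the object names of each association.

```python
from typing import Iterable, List


def find_rollup_mismatches(hubspot_labels: Iterable[str], table_options: dict) -> List[str]:
    """Compare labels found in HubSpot with what was stored in table_options."""
    problems: List[str] = []
    expected = set(hubspot_labels)
    detected = set(table_options.get("rollup_field_labels") or [])
    if expected and not table_options.get("supports_roll_up", False):
        problems.append("supports_roll_up is not true although rollup fields exist")
    missing = sorted(expected - detected)
    if missing:
        problems.append("missing from rollup_field_labels: " + ", ".join(missing))
    return problems


options = {"supports_roll_up": False, "rollup_field_labels": []}
for problem in find_rollup_mismatches(
    ["sync_associations_contacts_tickets_label_count"], options
):
    print(problem)
```

An empty result means the stored configuration agrees with HubSpot on these two points.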

Step 4: If mismatched, re-save the sync

Stop → Edit → Next through all steps → Save → Start. This re-runs setup.py, which detects the rollup fields again.

Common Issues

1. Invalid Cursor Format (400 Error)

Symptom: HTTPError: 400 Client Error: Bad Request for url: .../search

Cause: Cursor is in the wrong format for the incremental flow (e.g., object ID instead of timestamp)

Fix: The cursor must be a Unix timestamp in milliseconds for the incremental flow.

2. ping_type Not Set

Symptom: Bases without rollup fields are routed to the incremental flow

Cause: Old bases created before ping_type was added to table_options

Fix: In read_ping.py, we now read ping_type from table_schema (not from stored options), defaulting to read_full_ping:

if hubspot_object_type == 'association':
    ping_type = table_schema.get('table_options', {}).get('ping_type', 'read_full_ping')

3. Rollup Fields Not Detected

Symptom: Base has rollup fields in HubSpot but supports_roll_up is false

Possible causes:

  • Rollup fields were created after the last sync save → re-save the sync
  • Rollup field label doesn't match pattern sync_associations_{parent}_{child}_label_*
  • (Fixed in PR #1507) Parent/child ordering didn't match docs convention → now uses alphabetical sort

4. Objects That Cannot Use CDC Boost

Engagement objects (calls, emails, meetings, notes, tasks, communications, postal_mail) do not support rollup properties in HubSpot. supports_roll_up will always be false for these.

Transition Flow

┌─────────────────┐     Full scan      ┌─────────────────┐
│  First Sync     │ ─────────────────► │  read_full_ping │
│  (no cursor)    │                    │  cursor = "0"   │
└─────────────────┘                    └────────┬────────┘
                    Iterate through all parent IDs
                                    ┌─────────────────────┐
                                    │ Full scan complete  │
                                    │ cursor = timestamp  │
                                    └──────────┬──────────┘
                        If table has ping_type = "read_incremental_ping"
                                    ┌─────────────────────┐
                                    │read_incremental_ping│
                                    │ cursor = timestamp  │
                                    └─────────────────────┘

Performance Comparison

| Scenario | Full Scan | Incremental |
|---|---|---|
| 100K contacts, 10 changed | ~1000 API calls | ~1 API call |
| Read all associations | Always | Only for changed parents |
| First sync | Required | N/A |
| Ongoing sync | Slow | Fast |

Related Commits

  • 47bef0c5 (Dec 15, 2025): "Associations read incremental" - Initial implementation
  • ad8e843a: "changed code to use the correct rollup_field_labels" - Bug fix

Environment Requirements

No additional environment variables needed. Uses existing HubSpot credentials and rate limit configuration from app_schema.