
Changes queue table

  • Name format: base_{baseIdNoDashes}.changes_table_{internal_table_id}
  • Scope: per base and per internal table
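The naming scheme above can be sketched as a small helper. This is an illustrative stand-in, not the real `table_name` helper from `stacksync_utils/table_name.py`; the function name and signature here are assumptions.

```python
# Hypothetical sketch of how the changes-table name is derived.
# The actual logic lives in stacksync_utils/table_name.py.
def changes_table_name(base_id: str, internal_table_id: str) -> str:
    # Dashes are stripped from the base UUID to form the schema name.
    base_id_no_dashes = base_id.replace("-", "")
    return f"base_{base_id_no_dashes}.changes_table_{internal_table_id}"

print(changes_table_name("3f2a9b10-1c2d-4e5f-8a9b-0c1d2e3f4a5b", "42"))
```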

What it does

  • Acts as the durable queue for detected changes (new/updated/deleted records) discovered by readers (e.g., event-forward → connectors /read_stream).
  • Stores the minimal, encrypted payload that writers need to perform downstream writes to destination apps.
  • Drives writer scheduling and metrics (count is tracked in base_stats under changes).

How it’s written

  • Source: send_detected_changes_to_queue(...) in connectors/utils/changes.py.
  • For each batch with detected changes, we build a DataFrame with these columns:
      • source_app_id: the originating app id
      • change_id: a UUID generated per change row
      • internal_rec_id: internal record id
      • change_type: n (new), u (update), or d (delete)
      • data: encrypted JSON payload; {raw_internal_column_id, changes} for n/u, or {external_rec_id} for d
      • process_id: null on insert (writers claim rows by setting this)
      • created_time: timestamp (can be forced far into the past to prioritize rows in specific scenarios)
  • The insert target table is computed via table_name(table_type="changes", base_id, app_id?, internal_table_id), which for changes resolves to base_{baseIdNoDashes}.changes_table_{internal_table_id}.
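The row-building step above can be sketched as follows. This is a minimal illustration, not the actual `send_detected_changes_to_queue` implementation: `encrypt` is a placeholder for the real payload encryption, and the shape of the `detected` input records is an assumption.

```python
import json
import uuid
from datetime import datetime, timezone

import pandas as pd


def encrypt(payload: dict) -> str:
    """Placeholder for the real payload encryption step."""
    return json.dumps(payload)


def build_changes_df(source_app_id: str, detected: list[dict]) -> pd.DataFrame:
    """Build one queue row per detected change, mirroring the table's columns."""
    now = datetime.now(timezone.utc)
    rows = []
    for change in detected:
        if change["change_type"] in ("n", "u"):
            # New/updated rows carry the column mapping and field changes.
            data = {"raw_internal_column_id": change["raw_internal_column_id"],
                    "changes": change["changes"]}
        else:  # "d"
            # Deletes only need the external record id.
            data = {"external_rec_id": change["external_rec_id"]}
        rows.append({
            "source_app_id": source_app_id,
            "change_id": str(uuid.uuid4()),
            "internal_rec_id": change["internal_rec_id"],
            "change_type": change["change_type"],
            "data": encrypt(data),
            "process_id": None,   # writers claim rows by setting this
            "created_time": now,
        })
    return pd.DataFrame(rows)


df = build_changes_df("app_123", [
    {"change_type": "n", "internal_rec_id": "rec_1",
     "raw_internal_column_id": "col_9", "changes": {"Name": "Ada"}},
    {"change_type": "d", "internal_rec_id": "rec_2", "external_rec_id": "ext_7"},
])
print(list(df.columns))
```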

How it’s used

  • Claiming rows: get_changes_from_queue(...) marks a batch by setting process_id and process_start_time (SELECT ... FOR UPDATE SKIP LOCKED), applying size and batching rules.
  • Decryption and split: writers decrypt data, split into new_records_df, updated_records_df, deleted_records_df.
  • Writing: writers publish to destination app, then delete processed rows via delete_changes_from_db(...).
  • Retry/control helpers:
      • unmark_changes_from_queue(...): clears process_id/process_start_time to release rows.
      • postpone_changes_processing(...): pushes process_start_time into the future.
  • Stats: base_{baseIdNoDashes}.base_stats (stat_name='changes') tracks the live row count; it is updated on insert and delete.
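The claiming step can be sketched as the classic Postgres skip-locked pattern. This is a hypothetical rendering of what `get_changes_from_queue` might issue, not the actual query; the function name, column ordering, and parameter names here are assumptions.

```python
import uuid

# Hypothetical sketch of the claim query: atomically mark an unclaimed batch
# with a fresh process_id, skipping rows already locked by other writers.
def build_claim_sql(table: str, batch_size: int) -> tuple[str, dict]:
    process_id = str(uuid.uuid4())
    sql = f"""
    UPDATE {table} AS c
    SET process_id = %(process_id)s,
        process_start_time = now()
    WHERE c.change_id IN (
        SELECT change_id FROM {table}
        WHERE process_id IS NULL
        ORDER BY created_time          -- far-past timestamps sort first
        LIMIT %(batch_size)s
        FOR UPDATE SKIP LOCKED         -- concurrent writers never block
    )
    RETURNING c.*;
    """
    return sql, {"process_id": process_id, "batch_size": batch_size}


sql, params = build_claim_sql(
    "base_3f2a9b101c2d4e5f8a9b0c1d2e3f4a5b.changes_table_42", batch_size=100)
```

Ordering by `created_time` is what makes the far-past-timestamp prioritization trick work: those rows sort ahead of everything else and get claimed first.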

Key code references

  • Write into queue: connectors/utils/changes.py: send_detected_changes_to_queue
  • Read/claim from queue: connectors/utils/changes.py: get_changes_from_queue
  • Table name helper: python-utils-3/stacksync_utils/table_name.py: table_name(..., table_type="changes", ...)