
Record-Level Monitoring — ClickHouse Query Reference

Every record that flows through the connectors pipeline emits events into the record_level_monitoring ClickHouse table. This page is a copy-paste query cookbook for investigating record journeys, sync durations, failures, and more.

Always filter on log_timestamp

The table is partitioned by toYYYYMMDD(log_timestamp). Every query must include a log_timestamp range or ClickHouse will scan all partitions (30 days of data). All queries below default to the last 7 days — adjust the INTERVAL 7 DAY as needed.
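One way to confirm that a query actually prunes partitions is ClickHouse's `EXPLAIN indexes = 1`, which reports how many parts and granules each index keeps. The sketch below assumes the table is a MergeTree-family table queried directly, and it uses a fixed window instead of the rolling 7 days; the dates are placeholders.

```sql
-- Sketch: inspect partition pruning for an explicit time window.
-- The dates are placeholders; a half-open range [start, end) keeps adjacent windows from overlapping.
EXPLAIN indexes = 1
SELECT count()
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= toDateTime64('2024-01-01 00:00:00', 3, 'UTC')
  AND log_timestamp <  toDateTime64('2024-01-03 00:00:00', 3, 'UTC');
```

In the output, the selected parts reported for the partition-related indexes should be a small fraction of the total. If every part is selected, the log_timestamp filter is probably missing or wrapped in an expression that prevents pruning.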


Table Schema

| Column | Type | Notes |
|---|---|---|
| log_timestamp | DateTime64(3, 'UTC') | Partition key; always filter on this |
| base_id | String | Part of primary key |
| internal_table_id | String | Part of primary key |
| internal_rec_id | String | Part of primary key |
| app_id | String | |
| process_id | String | |
| external_rec_id | String | |
| change_type | Nullable(String) | 'n' (new), 'u' (updated), 'd' (deleted) |
| request_type | Nullable(String) | 'write', 'read_ping', 'read_rescrape', 'full_table_scan', 'read_stream' |
| pipeline_stage | Nullable(String) | 'read', 'change_queue_insert', 'change_queue_remove', 'write', 'lock', 'unlock' |
| write_status | Nullable(String) | 'success', 'failed', 'retry' |
| change_queue_insertion_time | Nullable(String) | When the change entered the queue |
| change_queue_removal_time | Nullable(String) | When the change was picked up for writing |
| lock_time | Nullable(String) | |
| unlock_time | Nullable(String) | |
| ongoing_write_conflict_retries | Nullable(Int32) | Number of conflict retries |
| ongoing_write_conflict_escalated_to_rescrape | Nullable(Bool) | True if conflict was escalated |
| error_message | Nullable(String) | |
| write_issue_id | Nullable(String) | |
| connector_type | Nullable(String) | App type (e.g. hubspot, salesforce) |
| source_app_id | Nullable(String) | |
| changed_columns_internal_column_ids | Nullable(String) | JSON array of changed column IDs |
| is_rescrape | Nullable(Bool) | |
| write_reclassified_to | Nullable(String) | Original change_type if reclassified |
| write_postponed | Nullable(Bool) | |
| write_stopped_early | Nullable(Bool) | |
| write_stop_reason | Nullable(String) | |

Partition: toYYYYMMDD(log_timestamp) · Primary key: (base_id, internal_table_id, internal_rec_id, log_timestamp) · TTL: 30 days
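Some queries further down select columns that are not listed in the table above (for example change_id and ongoing_write_conflict_oldest_write_time), so the listing may be incomplete. A quick way to check the deployed schema is sketched here; it assumes the table lives in the current database.

```sql
-- List the live columns and their types.
DESCRIBE TABLE record_level_monitoring;

-- The same information from the system catalog, filtered by name.
-- Assumption: the table is in the database you are connected to.
SELECT name, type
FROM system.columns
WHERE database = currentDatabase()
  AND table = 'record_level_monitoring'
  AND name LIKE '%conflict%'
ORDER BY position;
```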


Record Journey

"What happened to record X across the full pipeline?"

SELECT
    log_timestamp,
    pipeline_stage,
    request_type,
    change_type,
    write_status,
    process_id,
    error_message,
    write_reclassified_to
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND internal_rec_id = '<INTERNAL_REC_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
ORDER BY log_timestamp ASC;

Look for: gaps between pipeline stages, repeated 'retry' statuses, unexpected change_type reclassifications.
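Gaps between stages are easier to spot when the time since the previous event is computed explicitly. The following is a sketch using the `lagInFrame` window function; the alias names prev_timestamp and gap_ms are illustrative, and the first row reports a gap of 0 because it has no predecessor.

```sql
SELECT
    log_timestamp,
    pipeline_stage,
    write_status,
    -- Milliseconds elapsed since the previous event for this record.
    toUnixTimestamp64Milli(log_timestamp) - toUnixTimestamp64Milli(prev_timestamp) AS gap_ms
FROM
(
    SELECT
        log_timestamp,
        pipeline_stage,
        write_status,
        -- Third argument is the default for the first row: its own timestamp, so gap_ms = 0.
        lagInFrame(log_timestamp, 1, log_timestamp) OVER (
            ORDER BY log_timestamp ASC
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS prev_timestamp
    FROM record_level_monitoring
    WHERE base_id = '<BASE_ID>'
      AND internal_table_id = '<INTERNAL_TABLE_ID>'
      AND internal_rec_id = '<INTERNAL_REC_ID>'
      AND log_timestamp >= now() - INTERVAL 7 DAY
)
ORDER BY log_timestamp ASC;
```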


Sync Stats

"How many reads, changes detected, and writes happened for a base/table over time?"

SELECT
    toDate(log_timestamp) AS day,
    pipeline_stage,
    request_type,
    count() AS event_count,
    countIf(write_status = 'success') AS successful_writes,
    countIf(write_status = 'failed') AS failed_writes
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY day, pipeline_stage, request_type
ORDER BY day ASC, pipeline_stage;

Look for: sudden drops in read counts (source issue), rising failed writes (destination issue), change_queue_insert without corresponding change_queue_remove (queue backup).
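To check for a queue backup directly, the insert and remove events can be compared per day. This is a sketch that assumes every queue transition is logged as its own row with the matching pipeline_stage; the alias net_backlog is illustrative.

```sql
SELECT
    toDate(log_timestamp) AS day,
    countIf(pipeline_stage = 'change_queue_insert') AS queue_inserts,
    countIf(pipeline_stage = 'change_queue_remove') AS queue_removes,
    -- Positive values mean more changes entered the queue than left it that day.
    toInt64(queue_inserts) - toInt64(queue_removes) AS net_backlog
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY day
ORDER BY day ASC;
```

A change inserted late on one day and removed early the next shows up as a positive value followed by a negative one, so a sustained positive trend is more meaningful than a single day.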


Change Details

"Which columns changed in a specific sync cycle?"

SELECT
    internal_rec_id,
    change_type,
    changed_columns_internal_column_ids,
    write_reclassified_to,
    process_id
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND changed_columns_internal_column_ids IS NOT NULL
ORDER BY log_timestamp ASC;

Look for: records with many changed columns (bulk update or schema change), write_reclassified_to values (change type was overridden during write).
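To see which columns change most often in a sync cycle, the JSON array can be unpacked into one row per column ID. This sketch assumes the array elements are JSON strings; if they are numbers, the 'Array(String)' type argument would need to change.

```sql
SELECT
    -- ifNull converts the Nullable column to a plain String before parsing.
    arrayJoin(
        JSONExtract(ifNull(changed_columns_internal_column_ids, '[]'), 'Array(String)')
    ) AS column_id,
    count() AS change_events,
    uniqExact(internal_rec_id) AS records_touched
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND changed_columns_internal_column_ids IS NOT NULL
GROUP BY column_id
ORDER BY change_events DESC
LIMIT 20;
```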


Sync Duration

"How long does it take from read to write completion for each record?"

SELECT
    internal_rec_id,
    min(log_timestamp) AS first_event,
    max(log_timestamp) AS last_event,
    dateDiff('second', min(log_timestamp), max(log_timestamp)) AS duration_seconds,
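    -- groupArray alone does not guarantee order, so sort the stages by timestamp.
    arrayMap(x -> x.2, arraySort(x -> x.1, groupArray((log_timestamp, pipeline_stage)))) AS stages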
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY internal_rec_id
ORDER BY duration_seconds DESC
LIMIT 50;

Look for: outlier durations (lock contention or queue delay), records missing later pipeline stages (stuck mid-pipeline).
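Records stuck mid-pipeline can also be listed directly rather than read off the stages array. The sketch below assumes that a record which entered the change queue is expected to produce a 'write' event eventually; records that were only read are excluded.

```sql
SELECT
    internal_rec_id,
    max(log_timestamp) AS last_event,
    -- Stage of the most recent event for this record.
    argMax(pipeline_stage, log_timestamp) AS last_stage
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY internal_rec_id
-- Entered the queue at least once but never reached the write stage.
HAVING countIf(pipeline_stage = 'change_queue_insert') > 0
   AND countIf(pipeline_stage = 'write') = 0
ORDER BY last_event ASC
LIMIT 50;
```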


Failed Writes

"Which records failed to write, and why?"

SELECT
    internal_table_id,
    internal_rec_id,
    external_rec_id,
    change_type,
    error_message,
    write_issue_id,
    connector_type,
    process_id,
    log_timestamp
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
ORDER BY log_timestamp DESC
LIMIT 100;

Group by error to find the most common failure reason:

SELECT
    error_message,
    count() AS failure_count,
    uniqExact(internal_rec_id) AS unique_records
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
GROUP BY error_message
ORDER BY failure_count DESC
LIMIT 20;

Stuck in Queue

"Which records entered the change queue but were never picked up for writing?"

SELECT
    internal_rec_id,
    change_id,
    change_type,
    change_queue_insertion_time,
    process_id,
    log_timestamp
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND change_queue_insertion_time IS NOT NULL
  AND change_queue_removal_time IS NULL
ORDER BY change_queue_insertion_time ASC;

Look for: records stuck for hours (write process may be paused or rate-limited), large clusters at the same insertion time (batch that was never consumed).
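If each pipeline stage is logged as a separate row, the insert event itself may never carry a removal time, in which case the query above would return every insert. An alternative sketch, under that assumption, compares insert and remove events per record; the alias names are illustrative.

```sql
SELECT
    internal_rec_id,
    countIf(pipeline_stage = 'change_queue_insert') AS inserts,
    countIf(pipeline_stage = 'change_queue_remove') AS removes,
    maxIf(log_timestamp, pipeline_stage = 'change_queue_insert') AS last_insert,
    -- Rough age of the most recent insert, in minutes.
    dateDiff('minute', last_insert, now64(3)) AS minutes_since_last_insert
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY internal_rec_id
-- More inserts than removes: at least one change has not been picked up.
HAVING inserts > removes
ORDER BY last_insert ASC
LIMIT 100;
```

A change inserted just before the start of the window and removed inside it yields more removes than inserts, and is therefore not reported; widen the interval if the boundary matters.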


Write Conflicts

"Which records had ongoing-write conflicts during read?"

SELECT
    internal_rec_id,
    ongoing_write_conflict_retries,
    ongoing_write_conflict_escalated_to_rescrape,
    ongoing_write_conflict_oldest_write_time,
    skipped_already_locked_before_change_detection,
    skipped_already_locked_after_change_detection,
    process_id,
    log_timestamp
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND (
      ongoing_write_conflict_retries > 0
      OR ongoing_write_conflict_escalated_to_rescrape = true
  )
ORDER BY ongoing_write_conflict_retries DESC;

Look for: high retry counts (hot records being written to frequently), escalation to rescrape (read gave up waiting for the write lock).
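To rank hot records, the conflict events can be summarized per record. This is a sketch that uses only the two conflict columns listed in the schema table; the alias names are illustrative.

```sql
SELECT
    internal_rec_id,
    count() AS conflict_events,
    max(ongoing_write_conflict_retries) AS max_retries,
    countIf(ongoing_write_conflict_escalated_to_rescrape = true) AS escalations
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND (
      ongoing_write_conflict_retries > 0
      OR ongoing_write_conflict_escalated_to_rescrape = true
  )
GROUP BY internal_rec_id
ORDER BY escalations DESC, max_retries DESC
LIMIT 50;
```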


Problematic Records

"Which records fail most often?"

SELECT
    internal_rec_id,
    external_rec_id,
    count() AS failure_count,
    groupUniqArray(error_message) AS error_messages,
    min(log_timestamp) AS first_failure,
    max(log_timestamp) AS last_failure
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
GROUP BY internal_rec_id, external_rec_id
ORDER BY failure_count DESC
LIMIT 20;

Look for: records with diverse error messages (unstable data), records failing consistently with the same error (systematic issue — check the external platform).
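To separate the two patterns, the number of distinct error messages per record can be counted alongside the failure total. In this sketch the threshold of 3 failures is an arbitrary illustration, not a recommended value.

```sql
SELECT
    internal_rec_id,
    count() AS failure_count,
    -- 1 suggests a systematic issue; higher values suggest unstable data.
    uniqExact(error_message) AS distinct_errors
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
GROUP BY internal_rec_id
HAVING failure_count >= 3
ORDER BY distinct_errors DESC, failure_count DESC
LIMIT 20;
```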