Record-Level Monitoring: ClickHouse Query Reference
Every record that flows through the connectors pipeline emits events into the record_level_monitoring ClickHouse table. This page is a copy-paste query cookbook for investigating record journeys, sync durations, failures, and more.
Always filter on log_timestamp
The table is partitioned by toYYYYMMDD(log_timestamp). Every query must include a log_timestamp range; otherwise ClickHouse scans every partition (up to 30 days of data, given the 30-day TTL). All queries below default to the last 7 days; adjust INTERVAL 7 DAY as needed.
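To check that a filter really prunes partitions, one option is to bound the range on both sides and ask ClickHouse for the query plan. The sketch below assumes a ClickHouse version that supports EXPLAIN with the indexes setting; the two timestamps are hypothetical and stand in for your own incident window.

```sql
-- Sketch: show which index steps (including the partition key) narrow the read.
-- The dates are placeholders, not values from this page.
EXPLAIN indexes = 1
SELECT count()
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= toDateTime64('2024-01-15 00:00:00', 3, 'UTC')
  AND log_timestamp <  toDateTime64('2024-01-16 00:00:00', 3, 'UTC');
```

If the plan shows all parts being selected, the log_timestamp condition is probably missing or wrapped in an expression that prevents pruning.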
Table Schema
| Column | Type | Notes |
|---|---|---|
| log_timestamp | DateTime64(3, 'UTC') | Partition key; always filter on this |
| base_id | String | Part of primary key |
| internal_table_id | String | Part of primary key |
| internal_rec_id | String | Part of primary key |
| app_id | String | |
| process_id | String | |
| external_rec_id | String | |
| change_type | Nullable(String) | 'n' (new), 'u' (updated), 'd' (deleted) |
| request_type | Nullable(String) | 'write', 'read_ping', 'read_rescrape', 'full_table_scan', 'read_stream' |
| pipeline_stage | Nullable(String) | 'read', 'change_queue_insert', 'change_queue_remove', 'write', 'lock', 'unlock' |
| write_status | Nullable(String) | 'success', 'failed', 'retry' |
| change_queue_insertion_time | Nullable(String) | When the change entered the queue |
| change_queue_removal_time | Nullable(String) | When the change was picked up for writing |
| lock_time | Nullable(String) | |
| unlock_time | Nullable(String) | |
| ongoing_write_conflict_retries | Nullable(Int32) | Number of conflict retries |
| ongoing_write_conflict_escalated_to_rescrape | Nullable(Bool) | True if conflict was escalated |
| error_message | Nullable(String) | |
| write_issue_id | Nullable(String) | |
| connector_type | Nullable(String) | App type (e.g. hubspot, salesforce) |
| source_app_id | Nullable(String) | |
| changed_columns_internal_column_ids | Nullable(String) | JSON array of changed column IDs |
| is_rescrape | Nullable(Bool) | |
| write_reclassified_to | Nullable(String) | Original change_type if reclassified |
| write_postponed | Nullable(Bool) | |
| write_stopped_early | Nullable(Bool) | |
| write_stop_reason | Nullable(String) | |

Partition: toYYYYMMDD(log_timestamp) · Primary key: (base_id, internal_table_id, internal_rec_id, log_timestamp) · TTL: 30 days
Record Journey
"What happened to record X across the full pipeline?"
SELECT
    log_timestamp,
    pipeline_stage,
    request_type,
    change_type,
    write_reclassified_to,
    write_status,
    process_id,
    error_message
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND internal_rec_id = '<INTERNAL_REC_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
ORDER BY log_timestamp ASC;
Look for: gaps between pipeline stages, repeated 'retry' statuses, unexpected change_type reclassifications.
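Gaps between stages are easier to spot when the elapsed time since the previous event is a column of its own. The following is a sketch, assuming a ClickHouse version with window function support; ms-level precision is deliberately avoided and the gap is reported in whole seconds.

```sql
-- Sketch: seconds elapsed since the previous event for one record.
-- lagInFrame falls back to the row's own timestamp on the first row, so its gap is 0.
SELECT
    log_timestamp,
    pipeline_stage,
    write_status,
    dateDiff(
        'second',
        lagInFrame(log_timestamp, 1, log_timestamp) OVER w,
        log_timestamp
    ) AS seconds_since_previous_event
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND internal_rec_id = '<INTERNAL_REC_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
WINDOW w AS (ORDER BY log_timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY log_timestamp ASC;
```

Large values in seconds_since_previous_event mark where the record sat idle between two events.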
Sync Stats
"How many reads, changes detected, and writes happened for a base/table over time?"
SELECT
    toDate(log_timestamp) AS day,
    pipeline_stage,
    request_type,
    count() AS event_count,
    countIf(write_status = 'success') AS successful_writes,
    countIf(write_status = 'failed') AS failed_writes
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY day, pipeline_stage, request_type
ORDER BY day ASC, pipeline_stage;
Look for: sudden drops in read counts (source issue), rising failed writes (destination issue), change_queue_insert without corresponding change_queue_remove (queue backup).
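The queue-backup check can be made explicit by comparing insert and remove events per day. This is a minimal sketch, assuming each queue insert and each queue removal is logged as its own row with the matching pipeline_stage value; net_backlog is a name introduced here for illustration.

```sql
-- Sketch: daily queue inserts vs. removes; a growing positive net_backlog suggests a backup.
SELECT
    toDate(log_timestamp) AS day,
    countIf(pipeline_stage = 'change_queue_insert') AS queue_inserts,
    countIf(pipeline_stage = 'change_queue_remove') AS queue_removes,
    toInt64(queue_inserts) - toInt64(queue_removes) AS net_backlog
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY day
ORDER BY day ASC;
```

Changes inserted late on one day and removed early the next will show up as a small imbalance on both days, so look at the trend rather than a single row.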
Change Details
"Which columns changed in a specific sync cycle?"
SELECT
    internal_rec_id,
    change_type,
    changed_columns_internal_column_ids,
    write_reclassified_to,
    process_id
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND changed_columns_internal_column_ids IS NOT NULL
ORDER BY log_timestamp ASC;
Look for: records with many changed columns (bulk update or schema change), write_reclassified_to values (change type was overridden during write).
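To see which columns change most often, the JSON array can be unpacked into one row per column ID. The sketch below assumes the column holds a JSON array of strings, as the schema notes suggest; if the IDs are stored as numbers, the 'Array(String)' type would need adjusting.

```sql
-- Sketch: change frequency per column ID for one sync cycle.
-- ifNull guards the JSON function against the Nullable column type.
SELECT
    arrayJoin(
        JSONExtract(ifNull(changed_columns_internal_column_ids, '[]'), 'Array(String)')
    ) AS column_id,
    count() AS change_count,
    uniqExact(internal_rec_id) AS records_affected
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND changed_columns_internal_column_ids IS NOT NULL
GROUP BY column_id
ORDER BY change_count DESC;
```

A single column_id that dominates the counts usually points at one noisy field rather than a genuine bulk update.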
Sync Duration
"How long does it take from read to write completion for each record?"
SELECT
    internal_rec_id,
    min(log_timestamp) AS first_event,
    max(log_timestamp) AS last_event,
    dateDiff('second', min(log_timestamp), max(log_timestamp)) AS duration_seconds,
    groupArray(pipeline_stage) AS stages
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND process_id = '<PROCESS_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY internal_rec_id
ORDER BY duration_seconds DESC
LIMIT 50;
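Look for: outlier durations (lock contention or queue delay), records missing later pipeline stages (stuck mid-pipeline). groupArray does not guarantee chronological order, so read stages as the set of stages seen, not as a sequence.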
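Queue delay can be isolated from the overall duration by comparing the two queue timestamps directly. This is a sketch under two assumptions that this page does not confirm: that some rows carry both change_queue_insertion_time and change_queue_removal_time, and that those string columns hold timestamps in a format parseDateTime64BestEffortOrNull can read.

```sql
-- Sketch: time spent waiting in the change queue, longest waits first.
-- Unparseable timestamp strings become NULL instead of raising an error.
SELECT
    internal_rec_id,
    process_id,
    parseDateTime64BestEffortOrNull(change_queue_insertion_time, 3, 'UTC') AS inserted_at,
    parseDateTime64BestEffortOrNull(change_queue_removal_time, 3, 'UTC') AS removed_at,
    dateDiff('second', inserted_at, removed_at) AS queue_wait_seconds
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND change_queue_insertion_time IS NOT NULL
  AND change_queue_removal_time IS NOT NULL
ORDER BY queue_wait_seconds DESC
LIMIT 50;
```

If queue_wait_seconds comes back NULL for every row, the stored format is likely not one the parser recognizes.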
Failed Writes
"Which records failed to write, and why?"
SELECT
    internal_rec_id,
    external_rec_id,
    change_type,
    error_message,
    write_issue_id,
    connector_type,
    process_id,
    log_timestamp
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
ORDER BY log_timestamp DESC
LIMIT 100;
Group by error to find the most common failure reason:
SELECT
    error_message,
    count() AS failure_count,
    uniqExact(internal_rec_id) AS unique_records
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
GROUP BY error_message
ORDER BY failure_count DESC
LIMIT 20;
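Raw failure counts can mislead when one connector simply writes more than another, so a ratio per connector_type may be more telling. A minimal sketch; failure_ratio is a name introduced here, and 'retry' events are left out of the denominator by choice.

```sql
-- Sketch: share of failed writes among completed write attempts, per connector type.
-- The WHERE clause guarantees at least one row per group, so the division is safe.
SELECT
    connector_type,
    countIf(write_status = 'failed') AS failed_writes,
    countIf(write_status = 'success') AS successful_writes,
    round(failed_writes / (failed_writes + successful_writes), 4) AS failure_ratio
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status IN ('failed', 'success')
GROUP BY connector_type
ORDER BY failure_ratio DESC;
```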
Stuck in Queue
"Which records entered the change queue but were never picked up for writing?"
SELECT
    internal_rec_id,
    change_id,
    change_type,
    change_queue_insertion_time,
    process_id,
    log_timestamp
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND change_queue_insertion_time IS NOT NULL
  AND change_queue_removal_time IS NULL
ORDER BY change_queue_insertion_time ASC;
Look for: records stuck for hours (write process may be paused or rate-limited), large clusters at the same insertion time (batch that was never consumed).
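An alternative check works from pipeline_stage events instead of the timestamp columns: find records whose most recent queue insert has no later queue removal. This sketch assumes inserts and removals are logged as separate rows, which the pipeline_stage values suggest but this page does not state outright.

```sql
-- Sketch: records whose latest change_queue_insert was not followed by a change_queue_remove.
-- maxIf returns the epoch when no row matches, so "never removed" also satisfies the HAVING.
SELECT
    internal_rec_id,
    maxIf(log_timestamp, pipeline_stage = 'change_queue_insert') AS last_insert,
    maxIf(log_timestamp, pipeline_stage = 'change_queue_remove') AS last_remove,
    dateDiff('minute', last_insert, now()) AS minutes_waiting
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
GROUP BY internal_rec_id
HAVING last_insert > last_remove
ORDER BY last_insert ASC
LIMIT 100;
```

Records inserted just before the query runs will appear here too, so filter on minutes_waiting to separate genuinely stuck changes from ones still in flight.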
Write Conflicts
"Which records had ongoing-write conflicts during read?"
SELECT
    internal_rec_id,
    ongoing_write_conflict_retries,
    ongoing_write_conflict_escalated_to_rescrape,
    ongoing_write_conflict_oldest_write_time,
    skipped_already_locked_before_change_detection,
    skipped_already_locked_after_change_detection,
    process_id,
    log_timestamp
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND (
      ongoing_write_conflict_retries > 0
      OR ongoing_write_conflict_escalated_to_rescrape = true
  )
ORDER BY ongoing_write_conflict_retries DESC;
Look for: high retry counts (hot records being written to frequently), escalation to rescrape (read gave up waiting for the write lock).
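To rank hot records rather than individual events, the conflict columns can be rolled up per record. A minimal sketch using only the columns listed in the schema table; the output names are introduced here for illustration.

```sql
-- Sketch: per-record totals of conflict retries and rescrape escalations.
SELECT
    internal_rec_id,
    count() AS conflict_events,
    sum(ongoing_write_conflict_retries) AS total_retries,
    max(ongoing_write_conflict_retries) AS worst_single_retry_count,
    countIf(ongoing_write_conflict_escalated_to_rescrape = true) AS escalations
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND (
      ongoing_write_conflict_retries > 0
      OR ongoing_write_conflict_escalated_to_rescrape = true
  )
GROUP BY internal_rec_id
ORDER BY total_retries DESC
LIMIT 50;
```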
Problematic Records
"Which records fail most often?"
SELECT
    internal_rec_id,
    external_rec_id,
    count() AS failure_count,
    groupUniqArray(error_message) AS error_messages,
    min(log_timestamp) AS first_failure,
    max(log_timestamp) AS last_failure
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status = 'failed'
GROUP BY internal_rec_id, external_rec_id
ORDER BY failure_count DESC
LIMIT 20;
Look for: records with varying error messages (unstable data), records failing consistently with the same error (systematic issue; check the external platform).
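A high failure count does not say whether a record is still broken. One way to separate current problems from resolved ones is to look at each record's most recent write status, sketched here with argMax; latest_write_status and latest_attempt are names introduced for illustration.

```sql
-- Sketch: records whose most recent write event in the window is a failure.
SELECT
    internal_rec_id,
    argMax(write_status, log_timestamp) AS latest_write_status,
    argMax(error_message, log_timestamp) AS latest_error,
    max(log_timestamp) AS latest_attempt
FROM record_level_monitoring
WHERE base_id = '<BASE_ID>'
  AND internal_table_id = '<INTERNAL_TABLE_ID>'
  AND log_timestamp >= now() - INTERVAL 7 DAY
  AND write_status IS NOT NULL
GROUP BY internal_rec_id
HAVING latest_write_status = 'failed'
ORDER BY latest_attempt DESC
LIMIT 50;
```

Records that appear in the failure ranking but not here have since written successfully or are mid-retry.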