Merge pull request #3 from botify-labs/collector/schedule-type
collector: add `schedule_type` dimension for job and connection sync metrics
virtualtam authored Jan 16, 2024
2 parents a2fc577 + ba87722 commit cb6e611
Showing 4 changed files with 30 additions and 24 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -7,15 +7,15 @@

## Metrics exposed

- | Metric | Type | Labels |
- | ---------------------------------------------------- | --------- | ----------------------------------------------------- |
- | `airbyte_jobs_completed_total` | Counter | destination_connector, source_connector, type, status |
- | `airbyte_connections` | Gauge | destination_connector, source_connector, status |
- | `airbyte_sources` | Gauge | source_connector, tombstone |
- | `airbyte_destinations` | Gauge | destination_connector, tombstone |
- | `airbyte_jobs_pending` | Gauge | destination_connector, source_connector, type |
- | `airbyte_jobs_running` | Gauge | destination_connector, source_connector, type |
- | `airbyte_connections_last_successful_sync_age_hours` | Histogram | destination_connector, source_connector |
+ | Metric | Type | Labels |
+ | ---------------------------------------------------- | --------- | -------------------------------------------------------------------- |
+ | `airbyte_jobs_completed_total` | Counter | destination_connector, source_connector, schedule_type, type, status |
+ | `airbyte_connections` | Gauge | destination_connector, source_connector, status |
+ | `airbyte_sources` | Gauge | source_connector, tombstone |
+ | `airbyte_destinations` | Gauge | destination_connector, tombstone |
+ | `airbyte_jobs_pending` | Gauge | destination_connector, source_connector, schedule_type, type |
+ | `airbyte_jobs_running` | Gauge | destination_connector, source_connector, schedule_type, type |
+ | `airbyte_connections_last_successful_sync_age_hours` | Histogram | destination_connector, source_connector, schedule_type |


## Configuration
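Below is a minimal, hedged sketch of what the updated label set looks like at scrape time. Only the metric name, help text, and label names come from this change; the stand-in CounterVec, connector names, and label values are illustrative assumptions, not the exporter's actual code.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-in for the exporter's jobs counter, using the label set from the
	// updated README table. The label values below are purely illustrative.
	jobsCompleted := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "airbyte_jobs_completed_total",
			Help: "Completed jobs (total)",
		},
		[]string{"destination_connector", "source_connector", "schedule_type", "type", "status"},
	)
	jobsCompleted.WithLabelValues("BigQuery", "Postgres", "manual", "sync", "succeeded").Add(42)

	// What a scrape of this series would look like in Prometheus exposition
	// format (labels render in alphabetical order).
	expected := "# HELP airbyte_jobs_completed_total Completed jobs (total)\n" +
		"# TYPE airbyte_jobs_completed_total counter\n" +
		`airbyte_jobs_completed_total{destination_connector="BigQuery",schedule_type="manual",source_connector="Postgres",status="succeeded",type="sync"} 42` + "\n"

	if err := testutil.CollectAndCompare(jobsCompleted, strings.NewReader(expected)); err != nil {
		fmt.Println("unexpected exposition:", err)
		return
	}
	fmt.Println("exposition matches the expected label set")
}
```

Running this prints a confirmation when the rendered exposition matches; the real exporter emits these series from its Collect method rather than from a CounterVec.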
12 changes: 8 additions & 4 deletions cmd/airbyte_exporter/collector.go
@@ -62,19 +62,19 @@ func NewCollector(airbyteService *airbyte.Service) *collector {
jobsCompleted: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "jobs_completed_total"),
"Completed jobs (total)",
[]string{"destination_connector", "source_connector", "type", "status"},
[]string{"destination_connector", "source_connector", "schedule_type", "type", "status"},
nil,
),
jobsPending: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "jobs_pending"),
"Pending jobs",
[]string{"destination_connector", "source_connector", "type"},
[]string{"destination_connector", "source_connector", "schedule_type", "type"},
nil,
),
jobsRunning: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "jobs_running"),
"Running jobs",
[]string{"destination_connector", "source_connector", "type"},
[]string{"destination_connector", "source_connector", "schedule_type", "type"},
nil,
),
}
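Why every Collect call site below inserts ScheduleType at the same position: prometheus.NewDesc fixes the label names and their order, and MustNewConstMetric then takes the label values positionally. Here is a small illustrative sketch; the values, including "cron", are assumptions and not taken from this commit.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Descriptor with the same label names, in the same order, as jobsCompleted above.
	desc := prometheus.NewDesc(
		"airbyte_jobs_completed_total",
		"Completed jobs (total)",
		[]string{"destination_connector", "source_connector", "schedule_type", "type", "status"},
		nil,
	)

	// Label values are positional: they must line up with the names above,
	// which is why ScheduleType is added at the same index in every
	// MustNewConstMetric call in Collect. Values here are illustrative.
	m := prometheus.MustNewConstMetric(
		desc,
		prometheus.CounterValue,
		42,
		"BigQuery", "Postgres", "cron", "sync", "succeeded",
	)

	var out dto.Metric
	if err := m.Write(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.String()) // label pairs plus the counter value
}
```

If the number of values ever drifts from the descriptor's label names, MustNewConstMetric panics at collection time, which is why the ordering matters.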
@@ -106,6 +106,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
float64(jobsCompleted.Count),
jobsCompleted.DestinationConnector,
jobsCompleted.SourceConnector,
+ jobsCompleted.ScheduleType,
jobsCompleted.Type,
jobsCompleted.Status,
)
@@ -150,6 +151,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
float64(jobsPending.Count),
jobsPending.DestinationConnector,
jobsPending.SourceConnector,
+ jobsPending.ScheduleType,
jobsPending.Type,
)
}
@@ -161,6 +163,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
float64(jobsRunning.Count),
jobsRunning.DestinationConnector,
jobsRunning.SourceConnector,
+ jobsRunning.ScheduleType,
jobsRunning.Type,
)
}
@@ -173,7 +176,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
Help: "Age of the last successful sync job (hours)",
Buckets: []float64{6, 12, 18, 24, 48, 72, 168},
},
[]string{"destination_connector", "source_connector"},
[]string{"destination_connector", "source_connector", "schedule_type"},
)

for _, connectionLastSuccessfulSyncAge := range metrics.ConnectionsLastSuccessfulSyncAges {
@@ -191,6 +194,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
WithLabelValues(
connectionLastSuccessfulSyncAge.DestinationConnector,
connectionLastSuccessfulSyncAge.SourceConnector,
+ connectionLastSuccessfulSyncAge.ScheduleType,
).
Observe(age.Hours())
}
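The sync-age histogram now carries schedule_type as a third label. As a rough illustration of how an observed age interacts with the bucket layout above, here is a standalone sketch with made-up connector and schedule-type values (not the exporter's code):

```go
package main

import (
	"fmt"
	"log"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same name, buckets and (new) label set as the HistogramVec above.
	syncAge := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "airbyte_connections_last_successful_sync_age_hours",
			Help:    "Age of the last successful sync job (hours)",
			Buckets: []float64{6, 12, 18, 24, 48, 72, 168},
		},
		[]string{"destination_connector", "source_connector", "schedule_type"},
	)

	// A connection whose last successful sync finished ~30h ago (illustrative values).
	syncAge.WithLabelValues("BigQuery", "Postgres", "manual").Observe(30)

	reg := prometheus.NewRegistry()
	reg.MustRegister(syncAge)
	families, err := reg.Gather()
	if err != nil {
		log.Fatal(err)
	}

	// Histogram buckets are cumulative: a 30h observation is counted in every
	// bucket whose upper bound is >= 30 (48, 72, 168) and in the implicit +Inf bucket.
	for _, b := range families[0].Metric[0].Histogram.Bucket {
		fmt.Printf("le=%g count=%d\n", b.GetUpperBound(), b.GetCumulativeCount())
	}
}
```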
4 changes: 3 additions & 1 deletion internal/airbyte/metrics.go
@@ -39,7 +39,8 @@ type ConnectionSyncAge struct {
ID string `db:"id"`
DestinationConnector string `db:"destination"`
SourceConnector string `db:"source"`
- Hours float64 `db:"hours"` // no Scanner for time.Duration, storing as a raw string
+ ScheduleType string `db:"connection_schedule_type"`
+ Hours float64 `db:"hours"` // no Scanner for time.Duration, storing as a raw value
}

// Age returns the duration since the last job attempt.
@@ -58,6 +59,7 @@ type ActorCount struct {
type JobCount struct {
DestinationConnector string `db:"destination"`
SourceConnector string `db:"source"`
+ ScheduleType string `db:"connection_schedule_type"`
Type string `db:"config_type"`
Status string `db:"status"`
Count uint `db:"count"`
20 changes: 10 additions & 10 deletions internal/airbyte/repository.go
@@ -115,7 +115,7 @@ func (r *Repository) ConnectionsLastSuccessfulSyncAge() ([]ConnectionSyncAge, error) {
AND status = 'succeeded'
GROUP BY scope
)
- SELECT c.id, ad1.name as destination, ad2.name as source, EXTRACT(EPOCH FROM AGE(NOW(), j.updated_at))/3600 as hours
+ SELECT c.id, COALESCE(c.schedule_type, 'manual') AS connection_schedule_type, ad1.name as destination, ad2.name as source, EXTRACT(EPOCH FROM AGE(NOW(), j.updated_at))/3600 as hours
FROM connection c
JOIN j ON j.scope = CAST(c.id AS VARCHAR(255))
JOIN actor a1 ON c.destination_id = a1.id
@@ -157,16 +157,16 @@ func (r *Repository) DestinationsCount() ([]ActorCount, error) {
// JobsCompletedCount returns the count of completed Airbyte jobs, grouped by destination, source, type and status.
func (r *Repository) JobsCompletedCount() ([]JobCount, error) {
query := `
- SELECT ad1.name as destination, ad2.name as source, j.config_type, j.status, COUNT(j.status)
+ SELECT ad1.name as destination, ad2.name as source, COALESCE(c.schedule_type, 'manual') AS connection_schedule_type, j.config_type, j.status, COUNT(j.status)
FROM jobs j
JOIN connection c ON j.scope = CAST(c.id AS VARCHAR(255))
JOIN actor a1 ON c.destination_id = a1.id
JOIN actor_definition ad1 ON a1.actor_definition_id = ad1.id
JOIN actor a2 ON c.source_id = a2.id
JOIN actor_definition ad2 ON a2.actor_definition_id = ad2.id
WHERE j.status IN ('cancelled', 'failed', 'succeeded')
- GROUP BY ad1.name, ad2.name, j.config_type, j.status
- ORDER BY ad1.name, ad2.name, j.config_type, j.status
+ GROUP BY ad1.name, ad2.name, connection_schedule_type, j.config_type, j.status
+ ORDER BY ad1.name, ad2.name, connection_schedule_type, j.config_type, j.status
`

return r.jobCountQuery(query)
@@ -175,16 +175,16 @@ func (r *Repository) JobsCompletedCount() ([]JobCount, error) {
// JobsPendingCount returns the count of pending Airbyte jobs, grouped by destination, source and type.
func (r *Repository) JobsPendingCount() ([]JobCount, error) {
query := `
- SELECT ad1.name as destination, ad2.name as source, j.config_type, j.status, COUNT(j.status)
+ SELECT ad1.name as destination, ad2.name as source, COALESCE(c.schedule_type, 'manual') AS connection_schedule_type, j.config_type, j.status, COUNT(j.status)
FROM jobs j
JOIN connection c ON CAST(c.id AS VARCHAR(255)) = j.scope
JOIN actor a1 ON c.destination_id = a1.id
JOIN actor_definition ad1 ON a1.actor_definition_id = ad1.id
JOIN actor a2 ON c.source_id = a2.id
JOIN actor_definition ad2 ON a2.actor_definition_id = ad2.id
WHERE j.status = 'pending'
- GROUP BY ad1.name, ad2.name, j.config_type, j.status
- ORDER BY ad1.name, ad2.name, j.config_type, j.status
+ GROUP BY ad1.name, ad2.name, connection_schedule_type, j.config_type, j.status
+ ORDER BY ad1.name, ad2.name, connection_schedule_type, j.config_type, j.status
`

return r.jobCountQuery(query)
@@ -193,7 +193,7 @@ func (r *Repository) JobsPendingCount() ([]JobCount, error) {
// JobsRunningCount returns the count of running Airbyte jobs, grouped by destination, source and type.
func (r *Repository) JobsRunningCount() ([]JobCount, error) {
query := `
- SELECT ad1.name as destination, ad2.name as source, j.config_type, j.status, COUNT(j.status)
+ SELECT ad1.name as destination, ad2.name as source, COALESCE(c.schedule_type, 'manual') AS connection_schedule_type, j.config_type, j.status, COUNT(j.status)
FROM jobs j
JOIN attempts att ON att.job_id = j.id
JOIN connection c ON j.scope = CAST(c.id AS VARCHAR(255))
Expand All @@ -203,8 +203,8 @@ func (r *Repository) JobsRunningCount() ([]JobCount, error) {
JOIN actor_definition ad2 ON a2.actor_definition_id = ad2.id
WHERE j.status = 'running'
AND att.status = 'running'
- GROUP BY ad1.name, ad2.name, j.config_type, j.status
- ORDER BY ad1.name, ad2.name, j.config_type, j.status
+ GROUP BY ad1.name, ad2.name, connection_schedule_type, j.config_type, j.status
+ ORDER BY ad1.name, ad2.name, connection_schedule_type, j.config_type, j.status
`

return r.jobCountQuery(query)
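The jobCountQuery helper that these methods return through is not part of this diff. Assuming the repository uses a struct mapper such as sqlx with a Postgres driver (suggested by the db struct tags in metrics.go, though not confirmed here), the path from the new COALESCE(c.schedule_type, 'manual') AS connection_schedule_type alias to JobCount.ScheduleType plausibly looks like the sketch below; the constant-row query only keeps the example self-contained.

```go
package main

import (
	"fmt"
	"log"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"
)

// JobCount mirrors internal/airbyte/metrics.go: each db tag must match a column
// name or alias produced by the SQL above, including the new
// connection_schedule_type alias introduced by COALESCE(c.schedule_type, 'manual').
type JobCount struct {
	DestinationConnector string `db:"destination"`
	SourceConnector      string `db:"source"`
	ScheduleType         string `db:"connection_schedule_type"`
	Type                 string `db:"config_type"`
	Status               string `db:"status"`
	Count                uint   `db:"count"`
}

// jobCountQuery sketches what such a helper could do: run a grouped query and
// let the mapper scan each row into a JobCount.
func jobCountQuery(db *sqlx.DB, query string) ([]JobCount, error) {
	var counts []JobCount
	if err := db.Select(&counts, query); err != nil {
		return nil, fmt.Errorf("job count query: %w", err)
	}
	return counts, nil
}

func main() {
	// Placeholder DSN; point it at an Airbyte config database to try this out.
	db, err := sqlx.Connect("postgres", "postgres://airbyte:airbyte@localhost:5432/airbyte?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A constant-row query exercising the same column aliases as the
	// repository queries above.
	counts, err := jobCountQuery(db, `
		SELECT 'BigQuery' AS destination, 'Postgres' AS source,
		       'manual' AS connection_schedule_type, 'sync' AS config_type,
		       'succeeded' AS status, 42 AS count`)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", counts)
}
```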
