Skip to content

Commit

Permalink
chore(CE): added error handling to log duplicated primary key (#484)
Browse files Browse the repository at this point in the history
Co-authored-by: TivonB-AI2
  • Loading branch information
github-actions[bot] authored Nov 22, 2024
1 parent 83be837 commit 5481b08
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 18 deletions.
47 changes: 29 additions & 18 deletions server/lib/reverse_etl/extractors/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ def log_sync_run_error(sync_run)
}.to_s)
end

def log_skip_sync_run(sync_record, record, sync_run, error = nil)
primary_key = record.data.with_indifferent_access[sync_run.sync.model.primary_key]
message = error ? error.message : "Skipping sync record"

Rails.logger.info({
message:,
primary_key:,
sync_id: sync_run.sync_id,
sync_run_id: sync_run.id,
sync_record_id: sync_record&.id
}.to_s)
end

def process_record(record, sync_run, model)
primary_key = record.data.with_indifferent_access[model.primary_key]
raise StandardError, "Primary key cannot be blank" if primary_key.blank?
Expand Down Expand Up @@ -97,26 +110,24 @@ def action(sync_record)
end

def update_or_create_sync_record(sync_record, record, sync_run, fingerprint)
unless sync_record && new_record?(sync_record, fingerprint)
primary_key = record.data.with_indifferent_access[sync_run.sync.model.primary_key]
Rails.logger.info({
message: "Skipping sync record",
primary_key:,
sync_id: sync_run.sync_id,
sync_run_id: sync_run.id,
sync_record_id: sync_record&.id
}.to_s)
begin
unless sync_record && new_record?(sync_record, fingerprint)
log_skip_sync_run(sync_record, record, sync_run)

return false
return false
end
sync_record.assign_attributes(
sync_run_id: sync_run.id,
action: action(sync_record),
fingerprint:,
record: record.data,
status: "pending"
)
sync_record.save!
end
sync_record.assign_attributes(
sync_run_id: sync_run.id,
action: action(sync_record),
fingerprint:,
record: record.data,
status: "pending"
)
sync_record.save!
rescue StandardError => e
log_skip_sync_run(sync_record, record, sync_run, e)
false
end
end
end
Expand Down
31 changes: 31 additions & 0 deletions server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,37 @@
expect(updated_sync_record.status).to eq("success")
end
end

context "when error is raised skip the record" do
it "skip the record" do
sync_record = subject.send(:process_record, record.record, sync_run1, sync_run1.model)
fingerprint = ReverseEtl::Extractors::Base.new.send(:generate_fingerprint, record.record.data)
sync_record.update!(status: "success", fingerprint:, action: "destination_insert",
record: record.record.data)
fingerprint = ReverseEtl::Extractors::Base.new.send(:generate_fingerprint, record_dup.record.data)
sync_record_dup = subject.send(:process_record, record_dup.record, sync_run2, sync_run2.model)

expect(Rails.logger).to receive(:info).with({
message: "Test error",
primary_key: record_dup.record.data.with_indifferent_access[sync_run1.sync.model.primary_key],
sync_id: sync_run1.sync_id,
sync_run_id: sync_run1.id,
sync_record_id: sync_record_dup.id
}.to_s)

allow(sync_record_dup).to receive(:new_record?).and_raise(StandardError, "Test error")
result = subject.send(:update_or_create_sync_record, sync_record_dup, record_dup.record,
sync_run1, fingerprint)

expect(result).to eq(false)
updated_sync_record = SyncRecord.find(sync_record_dup.id)
expect(updated_sync_record).not_to be_nil
expect(updated_sync_record.sync_run_id).to eq(sync_run1.id)
expect(updated_sync_record.action).to eq("destination_insert")
expect(updated_sync_record.fingerprint).to eq(fingerprint)
expect(updated_sync_record.status).to eq("success")
end
end
end
end
end

0 comments on commit 5481b08

Please sign in to comment.