Skip to content

Batch upload external subjects from a CSV manifest with error handling

Campbell Allen edited this page Dec 12, 2016 · 1 revision
import csv
import os
import signal
import sys

from panoptes_client import SubjectSet, Subject, Project, Panoptes
from panoptes_client.panoptes import PanoptesAPIException
# debugger with breakpoints set by pdb.set_trace()
#import pdb

# Script configuration and run state
# ----------------------------------
# Where to read the manifest from and where the subjects should end up.
csv_input_file = 'data/input_file_manifest.csv'
project_id = 1
# project_slug = 'owner-name/project-name'
set_name = 'subject_set_name'

# How many csv rows are processed before the saved subjects are linked to
# the subject set.
# NOTE: if an api failure occurs, this is also the maximum number of
# subjects that get rolled back before the script stops and reports the error.
CSV_BATCH_SIZE = 100

# Mutable run state: subjects saved in the current (not yet linked) batch
# and the running total of subjects linked so far.
saved_subjects = list()
uploaded_subjects_count = 0

# Define functions for re-use
# ---------------------------
# create a new subject and set the metadata
# and the remote URL for the externally hosted images
# e.g. not via zooniverse s3
def create_external_subject(project, row):
    """Build and save one subject whose image is hosted at an external URL.

    Args:
        project: the project the new subject is linked to.
        row: mapping for one manifest row; must provide the keys ``url``,
            ``origin``, ``subject_id``, ``image_name`` and ``licence``.

    Returns:
        The subject after ``save()`` has been called on it.
    """
    new_subject = Subject()
    new_subject.links.project = project

    # The image is not uploaded; only its remote location is recorded.
    remote_location = {'image/jpeg': row['url']}
    new_subject.locations.append(remote_location)

    # NOTE: modify this list to set whatever metadata you want
    for column in ('origin', 'subject_id', 'image_name', 'licence'):
        new_subject.metadata[column] = row[column]

    new_subject.save()
    return new_subject

def handle_batch_failure(saved_subjects):
    """Roll back a batch by deleting every subject that was already saved.

    Deletion is best-effort: if the API rejects the removal of one subject,
    the failure is reported and the remaining subjects are still attempted,
    so a single bad delete cannot leave the rest of the batch behind.

    Args:
        saved_subjects: iterable of saved subjects; each one must expose
            ``id`` and ``etag``.

    Returns:
        None. Progress and any subjects that could not be removed are
        reported on stdout.
    """
    print('\nRolling back, attempting to clean up the the current batch of uploaded subjects.')
    failed_ids = []
    for subject in saved_subjects:
        print('Removing the subject with id: {}'.format(subject.id))
        try:
            # this method may change in the future
            # https://github.com/zooniverse/panoptes-python-client/issues/39
            Subject.delete(subject.id, headers={'If-Match': subject.etag})
        except PanoptesAPIException as e:
            # Keep going: the other subjects in the batch still need removing.
            print('Failed to remove the subject with id: {} ({})'.format(subject.id, e))
            failed_ids.append(subject.id)

    if failed_ids:
        print('Could not remove {} subject(s), please remove these ids manually: {}'.format(
            len(failed_ids), failed_ids))

def add_batch_to_subject_set(subject_set, subjects):
    """Report the batch size, then link the whole batch to the subject set.

    Args:
        subject_set: the target set; must expose ``id`` and ``add()``.
        subjects: sized collection of subjects handed to ``subject_set.add``.
    """
    batch_size = len(subjects)
    message = 'Linking {} subjects to the set with id: {}'.format(batch_size, subject_set.id)
    print(message)
    subject_set.add(subjects)

# handle (Ctrl+C) keyboard interrupt
def signal_handler(*args):
    """Interrupt handler: roll back the in-progress batch, then exit.

    Args:
        *args: whatever the signal machinery passes in; ignored here.

    Raises:
        SystemExit: always, once the rollback attempt has returned.
    """
    print('You pressed Ctrl+C! - attempting to clean up gracefully')
    # saved_subjects is the module-level list of subjects saved so far in
    # the current batch; it is looked up at the time the signal fires.
    handle_batch_failure(saved_subjects)
    raise SystemExit()
# register the handler for the interrupt signal (Ctrl+C)
signal.signal(signal.SIGINT, signal_handler)

# get an API connection with our user creds
# NOTE: both environment variables must be set, otherwise this raises KeyError
Panoptes.connect(username=os.environ['USERNAME'], password=os.environ['PASSWORD'])

# get a ref to the project we're uploading to
# change this for the correct project
# project = Project.find(project_slug)
project = Project.find(project_id)

# find / create the subject set to upload to
try:
    # Use the builtin next() rather than the Python 2 only .next() method,
    # so that taking the first match works on both Python 2 and Python 3.
    subject_set = next(iter(
        SubjectSet.where(project_id=project.id, display_name=set_name)
    ))
except StopIteration:
    # no set with that name yet: create a new subject set for the new data
    # and link it to the project above
    subject_set = SubjectSet()
    subject_set.links.project = project
    subject_set.display_name = set_name
    subject_set.save()
    print("Created a new subject set with id: {}.".format(subject_set.id))
else:
    print("Using the existing subject set with id: {}.".format(subject_set.id))

# read the manifest and create externally linked subjects


def _link_batch_or_rollback(subject_set, subjects):
    """Link a batch to the subject set; on API failure roll it back and exit."""
    try:
        add_batch_to_subject_set(subject_set, subjects)
    except PanoptesAPIException as e:
        print('\nError occurred linking the current batch to the subject set')
        print('Details of error: {}'.format(e))
        # The subjects were saved but never linked, so remove them again.
        handle_batch_failure(subjects)
        raise SystemExit(1)


# The csv module needs a binary-mode file on Python 2 but a text-mode file
# opened with newline='' on Python 3 (a binary file makes the reader fail).
if sys.version_info[0] >= 3:
    csvfile = open(csv_input_file, newline='')
else:
    csvfile = open(csv_input_file, 'rb')

with csvfile:
    subjects_to_upload = csv.DictReader(csvfile)
    print("\nRead the csv maninfest, now building subjects for project id: {}".format(project.id))

    # count is the 1-based data row number (the header row is not counted)
    for count, row in enumerate(subjects_to_upload, start=1):
        # expected header columns (the ones create_external_subject reads):
        # image_name, origin, licence, subject_id, url
        # print(row['image_name'], row['origin'], row['licence'], row['url'])

        # try and handle api failures, intermittent network, etc.
        try:
            subject = create_external_subject(project, row)
        except PanoptesAPIException as e:
            print('\nError occurred on row: {} of the csv file'.format(count))
            print('Details of error: {}'.format(e))
            handle_batch_failure(saved_subjects)
            # non-zero status so callers can tell the upload did not finish
            raise SystemExit(1)

        # save the list of subjects to add to the subject set above
        saved_subjects.append(subject)

        # for each full batch of new subjects
        if count % CSV_BATCH_SIZE == 0:
            _link_batch_or_rollback(subject_set, saved_subjects)
            uploaded_subjects_count += len(saved_subjects)
            # rebind (rather than clear) so the just-linked list is left
            # untouched; the interrupt handler reads this module-level name
            saved_subjects = []

# catch any left over (partial) batch at the end of the file
if saved_subjects:
    _link_batch_or_rollback(subject_set, saved_subjects)
    uploaded_subjects_count += len(saved_subjects)

print("Finished uploading {} subjects".format(uploaded_subjects_count))