Skip to content

Commit

Permalink
Merge pull request #98 from Rfam/slurmize-dequeuer
Browse files Browse the repository at this point in the history
Slurmize dequeuer
  • Loading branch information
blakesweeney authored Jun 26, 2024
2 parents e0caf09 + e0ff9a7 commit d696710
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 39 deletions.
98 changes: 61 additions & 37 deletions Rfam/Lib/Bio/Rfam/View/Dequeuer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Bio::Rfam::View::Dequeuer - poll for and submit Rfam View process jobs
my $job_dequeuer = Bio::Rfam::View::Dequeuer->new( 'family' );
$job_dequeuer->daemonise;
$job_dequeuer->start_polling;
$job_dequeuer->start_polling($rfam_config);
=head1 DESCRIPTION
Expand Down Expand Up @@ -157,12 +157,12 @@ as in:
<view_process_job_dequeuer>
polling_interval 2
</view_process_job_dequeuer>
</view_process_job_dequeuer>
=cut

sub start_polling {
my ( $self ) = shift;
my ( $self, $rfam_config ) = @_;

my $delay = $self->_config->{view_process_job_dequeuer}->{polling_interval} || 2;
$self->_log->info( "starting submission loop for '" . $self->job_type .
Expand All @@ -188,26 +188,29 @@ sub start_polling {
# get the row for the next pending job from the tracking table
next unless my $job = $jobs->next;

# build a hash with the parameters describing the LSF job
# build a hash with the parameters describing the LSF or slurm job
my $job_spec = $self->_build_job_spec( $job );

# and actually submit that job to LSF
my $lsf_job = $self->_submit_lsf_job( $job_spec );
my $lsf_job_id = $lsf_job->id;

# and actually submit that job to LSF or slurm
my $job_id = $self->_submit_job( $rfam_config, $job_spec ); # job_id will be undefined if something went wrong
# make sure it submitted successfully
unless ( $lsf_job_id and $lsf_job_id =~ m/^\d+$/ ) {
unless ( $job_id and $job_id =~ m/^\d+$/ ) {
$self->_log->error( 'there was a problem submitting the view process for '
. $job->job_type . ' '
. $job->entity_acc );
$job->fail;
next;
}

$self->_log->debug( "job submitted with LSF ID $lsf_job_id" );

# update the job row with the LSF ID for the farm job
$job->lsf_id( $lsf_job_id );
if((defined $rfam_config->scheduler) && ($rfam_config->scheduler eq "slurm")) {
$self->_log->debug( "job submitted with slurm ID $job_id" );
}
else {
$self->_log->debug( "job submitted with LSF ID $job_id" );
}

# update the job row with the job ID for the farm job
$job->lsf_id( $job_id );

# and flag the job as running. Also sets the start time
$job->run;
Expand Down Expand Up @@ -310,23 +313,28 @@ sub _build_job_spec {

#-------------------------------------------------------------------------------

=head2 _submit_lsf_job
=head2 _submit_job
Given the job spec generated by L<_build_job_spec>, this method builds the
shell command needed to run the view process and submits it to LSF. Return
value is an LSF job object.
shell command needed to run the view process and submits it to LSF or slurm
depending on value of $rfam_config->scheduler. Returns the job id.
=cut

sub _submit_lsf_job {
my ( $self, $job_spec ) = @_;
sub _submit_job {
my ( $self, $rfam_config, $job_spec ) = @_;

my $scheduler = "LSF";
if((defined $rfam_config->scheduler) && ($rfam_config->scheduler eq "slurm")) {
$scheduler = "slurm";
}

$self->_log->debug( "building LSF job command" );
$self->_log->debug( "building $scheduler job command" );

my $working_dir = $job_spec->{tmp_dir} . '/'
. $job_spec->{lsf_user} . '/'
. $job_spec->{job_id};
$self->_log->debug( "LSF working directory: $working_dir" );
$self->_log->debug( "$scheduler working directory: $working_dir" );

my $view_script = $self->_config->{view_process_job_dequeuer}->{view_script}->{ $self->job_type };
$self->_log->debug( "view script: |$view_script|" );
Expand All @@ -337,32 +345,48 @@ sub _submit_lsf_job {
. ' ' . $self->view_set;
$self->_log->debug( "view command: |$view_command|" );

my $lsf_command = "mkdir -p $working_dir"
. " && cd $working_dir"
. " && $view_command"
. " && rm -rf $working_dir";
$self->_log->debug( "LSF command: |$lsf_command|" );
my $command = "mkdir -p $working_dir"
. " && cd $working_dir"
. " && $view_command"
. " && rm -rf $working_dir";
$self->_log->debug( "$scheduler command: |$command|" );

my $log_file = $job_spec->{tmp_dir} . '/'
. $job_spec->{lsf_user} . '/'
. $job_spec->{job_id} . '.log';
$self->_log->debug( "writing log to: |$log_file|" );

my $memory_resource = 'rusage[mem=' . $job_spec->{memory} . ']';
my $memory_resource = ($scheduler eq "LSF") ? 'rusage[mem=' . $job_spec->{memory} . ']' : '--mem-per-cpu=' . $job_spec->{memory};
$self->_log->debug( "memory resource string: |$memory_resource|" );

$self->_log->debug( "submitting LSF job" );
$self->_log->debug( "submitting $scheduler job" );

my $job_id; # stays undefined if the submission fails
if($scheduler eq "LSF") {
my $lsf_job = LSF::Job->submit(
-o => $log_file,
-q => $job_spec->{lsf_queue},
-R => $memory_resource,
-R => $job_spec->{tmp_space},
-M => $job_spec->{memory},
$command
);
$job_id = $lsf_job->id if defined $lsf_job;
}
else { # slurm
# stdout must NOT be redirected here: sbatch prints the job id on stdout
my $submit_cmd = "sbatch -o $log_file -n 1 $memory_resource --time=48:00:00 --wrap \"$command\"";
$self->_log->debug( "$scheduler submit command: |$submit_cmd|" );
# we need to determine the job id, so we capture the stdout of sbatch
my $slurm_output = `$submit_cmd`;
# Submitted batch job 102045
if(defined $slurm_output && $slurm_output =~ /^Submitted batch job (\d+)/) {
$job_id = $1;
}
else {
$job_id = undef;
}
}

my $lsf_job = LSF::Job->submit(
-o => $log_file,
-q => $job_spec->{lsf_queue},
-R => $memory_resource,
-R => $job_spec->{tmp_space},
-M => $job_spec->{memory},
$lsf_command
);

return $lsf_job;
return $job_id;
}

#-------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions Rfam/Scripts/view/job_dequeuer.pl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
$job_dequeuer->daemonise;
}

$job_dequeuer->start_polling;
$job_dequeuer->start_polling($config);

exit;

Expand All @@ -80,7 +80,7 @@ =head1 DESCRIPTION
This script polls the "RfamJobs" database for new view process jobs to run. If
it finds a pending job, it builds a view process command and submits it to the
farm using LSF.
farm using LSF or slurm.
Once a job is submitted, the dequeuer goes back to polling the database; it
does not attempt to keep track of the progress or status of jobs once they have
Expand Down

0 comments on commit d696710

Please sign in to comment.