Skip to content

Commit

Permalink
clean up batches
Browse files Browse the repository at this point in the history
  • Loading branch information
alnutile committed Jun 23, 2024
1 parent 0ee6bf9 commit a904f75
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 109 deletions.
119 changes: 57 additions & 62 deletions app/Jobs/ProcessFileJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Log;
use LlmLaraHub\LlmDriver\LlmDriverFacade;
use LlmLaraHub\TagFunction\Jobs\TagDocumentJob;

class ProcessFileJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

public array $jobs = [];

public array $finally = [];

/**
* Create a new job instance.
*/
Expand All @@ -40,69 +43,61 @@ public function handle(): void
*/
$document = $this->document;

if ($document->type === TypesEnum::Pptx) {
Log::info('Processing PPTX Document');
$batch = Bus::batch([
new ParsePowerPointJob($this->document),
])
->name('Process PPTX Document - '.$document->id)
->finally(function (Batch $batch) use ($document) {
Bus::batch([
[
new SummarizeDocumentJob($document),
new TagDocumentJob($document),
new DocumentProcessingCompleteJob($document),
],
])
->name("Summarizing and Tagging Document - {$document->id}")
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();

} elseif ($document->type === TypesEnum::Txt) {

Log::info('Processing Text Document');
Bus::batch([
new ProcessTextFilesJob($this->document),
])
->name('Processing Text Document - '.$document->id)
->finally(function (Batch $batch) use ($document) {
DocumentProcessingCompleteJob::dispatch($document);
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();
} elseif ($document->type === TypesEnum::HTML) {
$options = [
TypesEnum::Pptx->value => [
'jobs' => [
ParsePowerPointJob::class,
],
'finally' => [
SummarizeDocumentJob::class,
TagDocumentJob::class,
DocumentProcessingCompleteJob::class,
],
],
TypesEnum::Txt->value => [
'jobs' => [
ProcessTextFilesJob::class,
],
'finally' => [
DocumentProcessingCompleteJob::class,
],
],
TypesEnum::HTML->value => [
'jobs' => [
WebPageDocumentJob::class,
],
'finally' => [
DocumentProcessingCompleteJob::class,
],
],
TypesEnum::PDF->value => [
'jobs' => [
ParsePdfFileJob::class,
],
'finally' => [
DocumentProcessingCompleteJob::class,
],
],
];

Log::info('Processing Html Document');
$option = $options[$document->type->value];

Bus::batch([
new WebPageSourceJob($this->document->source, $this->document->file_path),
])
->name('Processing Html Document - '.$document->id)
->finally(function (Batch $batch) use ($document) {
DocumentProcessingCompleteJob::dispatch($document);
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();
} elseif ($document->type === TypesEnum::PDF) {
Log::info('Processing PDF Document');
Bus::batch([
new ParsePdfFileJob($this->document),
])
->name('Process PDF Document - '.$document->id)
->finally(function (Batch $batch) use ($document) {
DocumentProcessingCompleteJob::dispatch($document);
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();
}
Bus::batch(collect($option['jobs'])->map(function ($job) use ($document) {
return new $job($document);
})->toArray())
->name(sprintf('Process %s Document - %d', $document->type->value, $document->id))
->finally(function (Batch $batch) use ($document, $option) {
Bus::batch(collect($option['finally'])->map(function ($job) use ($document) {
return new $job($document);
})->toArray())
->name(sprintf('Part 2 of Process for %s Document - %d', $document->type->value, $document->id))
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->getDriver())->onQueue())
->dispatch();

}
}
65 changes: 65 additions & 0 deletions app/Jobs/WebHelperTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

namespace App\Jobs;

use App\Helpers\TextChunker;
use App\Models\Document;
use App\Models\DocumentChunk;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Log;
use LlmLaraHub\LlmDriver\LlmDriverFacade;
use LlmLaraHub\TagFunction\Jobs\TagDocumentJob;

trait WebHelperTrait
{
protected function processDocument(Document $document): void
{
$jobs = [];

$page_number = 1;

$html = $document->original_content;

$chunked_chunks = TextChunker::handle($html);

foreach ($chunked_chunks as $chunkSection => $chunkContent) {

$guid = md5($chunkContent);

$DocumentChunk = DocumentChunk::updateOrCreate(
[
'document_id' => $document->id,
'sort_order' => $page_number,
'section_number' => $chunkSection,
],
[
'guid' => $guid,
'content' => to_utf8($chunkContent), //still having issues.
]
);

Log::info('[LaraChain] adding to new batch');

$jobs[] = [
new VectorlizeDataJob($DocumentChunk),
new TagDocumentJob($document),
new SummarizeDocumentJob($document),
];

$page_number++;
}

if (! empty($jobs)) {
Bus::batch($jobs)
->name('Web Pages to Documents - '.$document->subject)
->finally(function (Batch $batch) use ($document) {
DocumentProcessingCompleteJob::dispatch($document);
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($document->collection->getDriver())->onQueue())
->dispatch();
}

}
}
42 changes: 42 additions & 0 deletions app/Jobs/WebPageDocumentJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace App\Jobs;

use App\Models\Document;
use Illuminate\Bus\Batch;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class WebPageDocumentJob implements ShouldQueue
{
use Batchable;
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
use WebHelperTrait;

/**
* Create a new job instance.
*/
public function __construct(public Document $document)
{
//
}

/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...

return;
}

$this->processDocument($this->document);

}
}
51 changes: 4 additions & 47 deletions app/Jobs/WebPageSourceJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

use App\Domains\Documents\StatusEnum;
use App\Domains\Documents\TypesEnum;
use App\Helpers\TextChunker;
use App\Models\Document;
use App\Models\DocumentChunk;
use App\Models\Source;
use Facades\App\Domains\Sources\WebSearch\GetPage;
use Illuminate\Bus\Batch;
Expand All @@ -16,15 +14,12 @@
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Log;
use LlmLaraHub\LlmDriver\LlmDriverFacade;
use LlmLaraHub\TagFunction\Jobs\TagDocumentJob;

class WebPageSourceJob implements ShouldQueue
{
use Batchable;
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
use WebHelperTrait;

/**
* Create a new job instance.
Expand Down Expand Up @@ -66,13 +61,13 @@ public function handle(): void
$document = Document::updateOrCreate(
[
'source_id' => $this->source->id,
'type' => TypesEnum::HTML,
'subject' => to_utf8($title),
'link' => $this->url,
'collection_id' => $this->source->collection_id,
],
[
'status' => StatusEnum::Pending,
'type' => TypesEnum::HTML,
'subject' => to_utf8($title),
'file_path' => $this->url,
'summary' => str($html)->limit(254)->toString(),
'status_summary' => StatusEnum::Pending,
Expand All @@ -81,45 +76,7 @@ public function handle(): void
]
);

$page_number = 1;

$chunked_chunks = TextChunker::handle($html);

foreach ($chunked_chunks as $chunkSection => $chunkContent) {

$guid = md5($chunkContent);

$DocumentChunk = DocumentChunk::updateOrCreate(
[
'document_id' => $document->id,
'sort_order' => $page_number,
'section_number' => $chunkSection,
],
[
'guid' => $guid,
'content' => to_utf8($chunkContent), //still having issues.
]
);

Log::info('[LaraChain] adding to new batch');

$jobs[] = [
new VectorlizeDataJob($DocumentChunk),
new TagDocumentJob($document),
new SummarizeDocumentJob($document),
];

$page_number++;
}

Bus::batch($jobs)
->name('Web Pages to Documents - '.$this->source->subject)
->finally(function (Batch $batch) use ($document) {
DocumentProcessingCompleteJob::dispatch($document);
})
->allowFailures()
->onQueue(LlmDriverFacade::driver($this->source->getDriver())->onQueue())
->dispatch();
$this->processDocument($document);

}
}
Loading

0 comments on commit a904f75

Please sign in to comment.