Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audio Transcript heuristic for dynamic thread allocation on client #2023

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
26a6d13
add reenqueue item property and fallbacktask
gfd2020 Dec 7, 2023
dd6570f
add heuristic config variables to be set on RemoteWav2VectTranscript
gfd2020 Dec 7, 2023
0adf8c8
add config variables treatment
gfd2020 Dec 7, 2023
b74ab22
ass requeue item property and fallbacktask
gfd2020 Dec 7, 2023
ab38841
compatibility fix for new properties
gfd2020 Dec 7, 2023
5b2748f
setup config parameters
gfd2020 Dec 7, 2023
57e49ad
setup fallback task and call it when necessary
gfd2020 Dec 7, 2023
ad1586f
add new reenqueue method with spaced positioning
gfd2020 Dec 7, 2023
de1fcf0
add new method to add itens on middle of the queue
gfd2020 Dec 7, 2023
e5687f6
add static variables and logic to control the requeue heuristic
gfd2020 Dec 7, 2023
6e54dda
resolve conflict
gfd2020 Jun 17, 2024
36a48d9
fix conflict
gfd2020 Jun 17, 2024
95be1ab
fix fork conflicts
gfd2020 Jun 17, 2024
5bf47e4
fix conflits
gfd2020 Jun 17, 2024
728c713
fix conflicts
gfd2020 Jun 17, 2024
e2b1f05
fix conflicts
gfd2020 Jun 17, 2024
2abc1ca
Merge branch 'sepinf-inc:master' into audio-transcript-heuristic
gfd2020 Jun 17, 2024
4c434b8
fix conflicts
gfd2020 Jun 17, 2024
0d870f3
fix conflicts
gfd2020 Jun 17, 2024
063ebef
Merge branch 'sepinf-inc:master' into audio-transcript-heuristic
gfd2020 Jun 20, 2024
7eaa8c8
fix conflict
gfd2020 Dec 30, 2024
5002684
fix conflict
gfd2020 Dec 30, 2024
2c26c82
fix conflict
gfd2020 Dec 30, 2024
378fd55
Merge branch 'sepinf-inc:master' into audio-transcript-heuristic
gfd2020 Dec 30, 2024
7878e60
fix conflict
gfd2020 Dec 30, 2024
fa70309
Merge branch 'sepinf-inc:master' into audio-transcript-heuristic
gfd2020 Jan 2, 2025
1571ccc
Merge branch 'sepinf-inc:master' into audio-transcript-heuristic
gfd2020 Feb 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iped-api/src/main/java/iped/data/IItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,9 @@ public interface IItem extends IItemReader {
@Override
String toString();

/**
 * Sets the re-enqueue flag of this item.
 * NOTE(review): presumably marks an item that was put back on the processing
 * queue by the audio transcription requeue heuristic - confirm against callers.
 *
 * @param val new value of the flag
 */
void setReEnqueueItem(boolean val);

/**
 * @return the current value of the re-enqueue flag
 */
boolean isReEnqueueItem();

/**
 * Sets the fallback-task flag of this item.
 * NOTE(review): presumably tells the transcription task to use the local
 * fallback implementation for this item - confirm against callers.
 *
 * @param val new value of the flag
 */
void setFallBackTask(boolean val);

/**
 * @return the current value of the fallback-task flag
 */
boolean isFallBackTask();

}
15 changes: 15 additions & 0 deletions iped-app/resources/config/conf/AudioTranscriptConfig.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ batchSize = 1
# IP:PORT of the service/central node used by the RemoteTranscriptionTask implementation.
# remoteServiceAddress = 127.0.0.1:11111

#Performs a heuristic for dynamic thread allocation and spaced requeue. Helps improve performance of slow transcription servers.
#clientDynamicThreadRequeueHeuristics = true

# If enabled, the client also helps the server with the transcription task, but only when the client has no other tasks to do. Requires clientDynamicThreadRequeueHeuristics to be enabled.
#clientTranscriptHelp = true

# Defines the implementation class used for client help. It must be a local implementation (not a remote transcription task).
#clientTranscriptHelpImplementationClass = iped.engine.task.transcript.Wav2Vec2TranscriptTask

# Advanced parameter. Defines the queue position where requeued items are inserted, as a fraction of the queue size: 4 = 1/4 of the queue size. Must be greater than or equal to 1.
#clientSplitQueueRatio = 4

# Advanced parameter. Time window in milliseconds: requeue requests arriving within this interval of the previous one are treated as consecutive and are spaced apart in the queue instead of being inserted at the same position.
#clientRequeueDeltaTime = 5000

#########################################
# MicrosoftTranscriptTask options
#########################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig {
// Property key names looked up in the task properties.
private static final String SKIP_KNOWN_FILES = "skipKnownFiles";
private static final String PRECISION = "precision";
private static final String BATCH_SIZE = "batchSize";
// Keys of the client-side requeue heuristic / client transcription help options.
// Enables the dynamic thread allocation and spaced requeue heuristic.
private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics";
// Enables client help with the transcription task.
private static final String CLIENTE_HELP = "clientTranscriptHelp";
// Implementation class name used for client help.
private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass";
// Divisor of the queue size that selects the insertion position of requeued items.
private static final String REQUEUE_RATIO = "clientSplitQueueRatio";
// Time window, in milliseconds, between consecutive requeue requests.
private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime";

// Values collected from list-type properties.
private List<String> languages = new ArrayList<>();
private List<String> mimesToProcess = new ArrayList<>();
Expand All @@ -53,6 +58,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig {
// Option values with their defaults.
private boolean skipKnownFiles = true;
private String precision = "int8";
private int batchSize = 1;
// Requeue heuristic switch; disabled by default.
private boolean requeueHeuristic = false;
// Client transcription help switch; disabled by default.
private boolean clientTranscriptHelp = false;
// Class name of the implementation used for client help; empty by default.
private String classNameFallBack = "";
// Queue split ratio; default 4.
private int requeueRatio = 4;
// Requeue delta time; default 5000 (milliseconds according to the config file comment).
private long requeueDeltaTime = 5000;

public String getPrecision() {
return precision;
Expand Down Expand Up @@ -136,6 +146,26 @@ public String getGoogleModel() {
return googleModel;
}

/**
 * @return the value of the requeue heuristic switch
 */
public boolean getRequeueHeuristic() {
    return this.requeueHeuristic;
}

/**
 * @return the value of the client transcription help switch
 */
public boolean getClientTranscriptHelp() {
    return this.clientTranscriptHelp;
}

/**
 * @return the configured class name of the client help (fallback) implementation
 */
public String getClassNameFallBack() {
    return this.classNameFallBack;
}

/**
 * @return the configured queue split ratio
 */
public int getRequeueRatio() {
    return this.requeueRatio;
}

/**
 * @return the configured requeue delta time
 */
public long getRequeueDeltaTime() {
    return this.requeueDeltaTime;
}

@Override
public void processProperties(UTF8Properties properties) {

Expand Down Expand Up @@ -200,6 +230,22 @@ public void processProperties(UTF8Properties properties) {
if (value != null) {
batchSize = Integer.parseInt(value.trim());
}
// Client-side requeue heuristic options. Every key is optional: when it is
// absent the corresponding field keeps its default value.
value = properties.getProperty(REQUEUE_HEURISTICS);
if (value != null) {
    // Without this read the heuristic switch would always stay at its
    // default (false), regardless of what the configuration file says.
    requeueHeuristic = Boolean.parseBoolean(value.trim());
}
value = properties.getProperty(CLIENTE_HELP);
if (value != null) {
    clientTranscriptHelp = Boolean.parseBoolean(value.trim());
}
value = properties.getProperty(IMPL_CLASS_KEY_CLIENT);
if (value != null) {
    classNameFallBack = value.trim();
}
value = properties.getProperty(REQUEUE_RATIO);
if (value != null) {
    // parseInt/parseLong yield primitives directly (no boxing), matching
    // how batchSize is parsed just above.
    requeueRatio = Integer.parseInt(value.trim());
}
value = properties.getProperty(REQUEUE_DELTA_TIME);
if (value != null) {
    requeueDeltaTime = Long.parseLong(value.trim());
}
}

/**
Expand Down
44 changes: 44 additions & 0 deletions iped-engine/src/main/java/iped/engine/core/ProcessingQueues.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import iped.data.IItem;
import iped.engine.data.CaseData;
import iped.engine.util.Util;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ProcessingQueues {

Expand Down Expand Up @@ -61,6 +63,48 @@ public void addItemToQueue(IItem item, int queuePriority) throws InterruptedExce
addItemToQueue(item, queuePriority, false, false);
}

/**
 * Inserts an item into the queue of the given priority at a spaced position,
 * delegating to the private overload with {@code blockIfFull} set to
 * {@code true}.
 *
 * @param item           item to insert
 * @param queuePriority  priority of the target queue
 * @param numWorkers     step added to the last insertion index for consecutive requests
 * @param lastQueueIndex caller-owned holder of the last insertion index
 * @param lastQueueTime  caller-owned holder of the last insertion timestamp (milliseconds)
 * @param queueSplit     divisor of the queue size used to compute the base insertion index
 * @param queueDeltaTime time window, in milliseconds, for requests to count as consecutive
 * @throws InterruptedException propagated from the delegate
 */
public void addItemToQueueSpaced(IItem item, int queuePriority, int numWorkers,
        AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime)
        throws InterruptedException {
    final boolean blockIfFull = true;
    addItemToQueueSpaced(item, queuePriority, blockIfFull, numWorkers, lastQueueIndex, lastQueueTime,
            queueSplit, queueDeltaTime);
}

/**
 * Inserts an item in the middle of the queue of the given priority, spacing
 * consecutive insertions apart.
 * <p>
 * The base insertion index is {@code queue.size() / queueSplit}. If the
 * previous insertion (tracked by {@code lastQueueIndex} and
 * {@code lastQueueTime}) happened less than {@code queueDeltaTime}
 * milliseconds ago and advancing by {@code numWorkers} still falls inside the
 * queue, the index advances by {@code numWorkers}; otherwise it goes back to
 * the base index. A {@code lastQueueIndex} of {@code -1} means no previous
 * insertion.
 *
 * @param item           item to insert
 * @param queuePriority  priority of the target queue
 * @param blockIfFull    when {@code true} and the priority is 0, waits (polling once per
 *                       second, outside the lock) while the queue has reached maxQueueSize
 * @param numWorkers     step added to the last insertion index for consecutive requests
 * @param lastQueueIndex caller-owned holder of the last insertion index, updated here
 * @param lastQueueTime  caller-owned holder of the last insertion timestamp, updated here
 * @param queueSplit     divisor of the queue size; non-positive values fall back to 4
 * @param queueDeltaTime spacing window in milliseconds; non-positive values fall back to 5000
 * @throws InterruptedException if interrupted while waiting for room in the queue
 */
private void addItemToQueueSpaced(IItem item, int queuePriority, boolean blockIfFull, int numWorkers,
        AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime)
        throws InterruptedException {

    Util.calctrackIDAndUpdateID(caseData, item);

    final LinkedList<IItem> targetQueue = queues.get(queuePriority);
    // Normalize the tuning parameters once; invalid values use the defaults.
    final int splitRatio = queueSplit > 0 ? queueSplit : 4;
    final long spacingWindow = queueDeltaTime > 0 ? queueDeltaTime : 5000;

    boolean queueWasFull = false;
    while (true) {
        if (queueWasFull) {
            // Wait without holding the lock so consumers can drain the queue.
            Thread.sleep(1000);
        }
        synchronized (this) {
            queueWasFull = blockIfFull && queuePriority == 0 && targetQueue.size() >= maxQueueSize;
            if (queueWasFull) {
                continue;
            }

            final int baseIndex = targetQueue.size() / splitRatio;
            // Keep spacing only when there was a previous insertion, the next
            // slot is still inside the queue and the request is recent enough.
            final boolean advance = lastQueueIndex.get() != -1
                    && lastQueueIndex.get() + numWorkers < targetQueue.size()
                    && System.currentTimeMillis() - lastQueueTime.get() < spacingWindow;
            if (advance) {
                lastQueueIndex.addAndGet(numWorkers);
            } else {
                lastQueueIndex.set(baseIndex);
            }
            lastQueueTime.set(System.currentTimeMillis());

            targetQueue.add(lastQueueIndex.get(), item);
            return;
        }
    }
}

private void addItemToQueue(IItem item, int queuePriority, boolean addFirst, boolean blockIfFull)
throws InterruptedException {

Expand Down
21 changes: 21 additions & 0 deletions iped-engine/src/main/java/iped/engine/data/Item.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ public static void setStartID(int start) {

private static final int maxImageLength = 128 << 20;

// Re-enqueue flag, exposed through setReEnqueueItem/isReEnqueueItem; false by default.
private boolean reEnqueueItem = false;

// Fallback-task flag, exposed through setFallBackTask/isFallBackTask; false by default.
private boolean fallBackTask = false;

/**
* Adiciona o item a uma categoria.
*
Expand Down Expand Up @@ -1316,4 +1320,21 @@ public Object getTempAttribute(String key) {
public void setTempAttribute(String key, Object value) {
tempAttributes.put(key, value);
}

/**
 * Sets the re-enqueue flag of this item.
 *
 * @param val new value of the flag
 */
public void setReEnqueueItem(boolean val) {
    reEnqueueItem = val;
}

/**
 * @return the current value of the re-enqueue flag
 */
public boolean isReEnqueueItem() {
    return reEnqueueItem;
}

/**
 * Sets the fallback-task flag of this item.
 *
 * @param val new value of the flag
 */
public void setFallBackTask(boolean val) {
    fallBackTask = val;
}

/**
 * @return the current value of the fallback-task flag
 */
public boolean isFallBackTask() {
    return fallBackTask;
}

}
16 changes: 16 additions & 0 deletions iped-engine/src/main/java/iped/engine/task/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import iped.engine.data.CaseData;
import iped.engine.io.TimeoutException;
import iped.parsers.util.CorruptedCarvedException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Classe que representa uma tarefa de procesamento (assinatura, hash, carving,
Expand Down Expand Up @@ -248,6 +250,20 @@ protected void sendToNextTask(IItem evidence) throws Exception {
}
}

/**
 * Re-enqueues the item into the queue of the current priority using spaced
 * positioning, then always throws {@link ItemReEnqueuedException}.
 *
 * @param item           item to put back on the queue
 * @param numWorkers     step used to space consecutive insertions
 * @param lastQueueIndex caller-owned holder of the last insertion index
 * @param lastQueueTime  caller-owned holder of the last insertion timestamp
 * @param queueSplit     divisor of the queue size used for the base insertion index
 * @param queueDeltaTime time window, in milliseconds, for requests to count as consecutive
 * @throws InterruptedException if the enqueue operation is interrupted
 */
protected void reEnqueueItemSpaced(IItem item, int numWorkers, AtomicInteger lastQueueIndex,
        AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException {
    int currentPriority = worker.manager.getProcessingQueues().getCurrentQueuePriority();
    reEnqueueItemSpaced(item, currentPriority, numWorkers, lastQueueIndex, lastQueueTime, queueSplit,
            queueDeltaTime);
    // Thrown unconditionally once the item is back on the queue.
    throw new ItemReEnqueuedException();
}

/**
 * Disposes the item, asks {@code SkipCommitedTask} to check its processed
 * parents again later, and inserts it into the given queue at a spaced
 * position. Unless the item is a queue-end marker, the worker's count of
 * items being processed is decremented afterwards.
 *
 * @param item           item to put back on the queue
 * @param queue          priority of the target queue
 * @param numWorkers     step used to space consecutive insertions
 * @param lastQueueIndex caller-owned holder of the last insertion index
 * @param lastQueueTime  caller-owned holder of the last insertion timestamp
 * @param queueSplit     divisor of the queue size used for the base insertion index
 * @param queueDeltaTime time window, in milliseconds, for requests to count as consecutive
 * @throws InterruptedException if the enqueue operation is interrupted
 */
private void reEnqueueItemSpaced(IItem item, int queue, int numWorkers, AtomicInteger lastQueueIndex,
        AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException {
    item.dispose();
    SkipCommitedTask.checkAgainLaterProcessedParents(item);
    worker.manager.getProcessingQueues().addItemToQueueSpaced(item, queue, numWorkers, lastQueueIndex,
            lastQueueTime, queueSplit, queueDeltaTime);
    if (item.isQueueEnd()) {
        return;
    }
    worker.decItemsBeingProcessed();
}

protected void reEnqueueItem(IItem item) throws InterruptedException {
reEnqueueItem(item, worker.manager.getProcessingQueues().getCurrentQueuePriority());
throw new ItemReEnqueuedException();
Expand Down
Loading