From 26a6d13e12f1712d58a30dae3c1a1f392ccf6216 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:31:31 -0300 Subject: [PATCH 01/22] add reenqueue item property and fallbacktask --- iped-api/src/main/java/iped/data/IItem.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/iped-api/src/main/java/iped/data/IItem.java b/iped-api/src/main/java/iped/data/IItem.java index 6af1bef799..1561d6be15 100644 --- a/iped-api/src/main/java/iped/data/IItem.java +++ b/iped-api/src/main/java/iped/data/IItem.java @@ -468,4 +468,9 @@ public interface IItem extends IItemReader { @Override String toString(); + void setReEnqueueItem(boolean val); + boolean isReEnqueueItem(); + void setFallBackTask(boolean val); + boolean isFallBackTask(); + } From dd6570f395832252d3b82233bb0205eac3b8037f Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:32:07 -0300 Subject: [PATCH 02/22] add heuristic config variables to be set on RemoteWav2VectTranscript --- .../config/conf/AudioTranscriptConfig.txt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/iped-app/resources/config/conf/AudioTranscriptConfig.txt b/iped-app/resources/config/conf/AudioTranscriptConfig.txt index 67ac350618..7de56ca38f 100644 --- a/iped-app/resources/config/conf/AudioTranscriptConfig.txt +++ b/iped-app/resources/config/conf/AudioTranscriptConfig.txt @@ -97,6 +97,21 @@ minWordScore = 0.5 # IP:PORT of the service/central node used by the RemoteWav2Vec2TranscriptTask implementation. # wav2vec2Service = 127.0.0.1:11111 +#Performs a heuristic for dynamic thread allocation and spaced requeue. Helps improve performance of slow transcription servers. +#clientDynamicThreadRequeueHeuristics = true + +#If active, the client will also help the server with the transcription task, only if the client has no other tasks to do. The heuristic must be turned on +#clientTranscriptHelp = true + +#Defines the implementation class for client help, must be a local implementation ( not remote transcript task ) +#clientTranscriptHelpImplementationClass = iped.engine.task.transcript.Wav2Vec2TranscriptTask + +#Advanced Parameter. Defines which part of the queue the items will be sent to. 4 = 1/4 size. Values ​​greater than or equal to 1 +#clientSplitQueueRatio = 4 + +#Advanced Parameter. Sets the delta time in milliseconds when consecutive items are requested to be requeued, provides better spacing. +#clientRequeueDeltaTime = 5000 + ######################################### # MicrosoftTranscriptTask options ######################################### From 0adf8c8e9164ad66647277065c34e067c494ae3d Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:32:54 -0300 Subject: [PATCH 03/22] add config variables treatment --- .../engine/config/AudioTranscriptConfig.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java index 198d73e62b..4a47580272 100644 --- a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java +++ b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java @@ -32,6 +32,12 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private static final String LANG_AUTO_VAL = "auto"; private static final String SKIP_KNOWN_FILES = "skipKnownFiles"; + private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics"; + private static final String CLIENTE_HELP = "clientTranscriptHelp"; + private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass"; + private static final String REQUEUE_RATIO = "clientSplitQueueRatio"; + private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime"; + private List languages = new ArrayList<>(); private List mimesToProcess = new ArrayList<>(); private String className; @@ -47,6 +53,12 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private String googleModel; private boolean skipKnownFiles = true; + private boolean requeueHeuristic = false; + private boolean clientTranscriptHelp = false; + private String classNameFallBack = ""; + private int requeueRatio = 4; + private long requeueDeltaTime = 5000; + public boolean getSkipKnownFiles() { return this.skipKnownFiles; } @@ -117,6 +129,27 @@ public String getGoogleModel() { return googleModel; } + public boolean getRequeueHeuristic() { + return requeueHeuristic; + } + + public boolean getClientTranscriptHelp() { + return clientTranscriptHelp; + } + + public String getClassNameFallBack() { + return classNameFallBack; + } + + public int getRequeueRatio() { + return requeueRatio; + } + + public long getRequeueDeltaTime() { + return requeueDeltaTime; + } + + @Override public void processProperties(UTF8Properties properties) { @@ -165,6 +198,31 @@ public void processProperties(UTF8Properties properties) { if (value != null) { timeoutPerSec = Integer.valueOf(value.trim()); } + + value = properties.getProperty(REQUEUE_HEURISTICS); + if (value != null) { + requeueHeuristic = Boolean.valueOf(value.trim()); + } + + value = properties.getProperty(CLIENTE_HELP); + if (value != null) { + clientTranscriptHelp = Boolean.valueOf(value.trim()); + } + + value = properties.getProperty(IMPL_CLASS_KEY_CLIENT); + if (value != null) { + classNameFallBack = value.trim(); + } + + value = properties.getProperty(REQUEUE_RATIO); + if (value != null) { + requeueRatio = Integer.valueOf(value.trim()); + } + + value = properties.getProperty(REQUEUE_DELTA_TIME); + if (value != null) { + requeueDeltaTime = Long.valueOf(value.trim()); + } } /** From b74ab222c758846e54d6dc9f57e41b1798bf23d5 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:33:33 -0300 Subject: [PATCH 04/22] ass requeue item property and fallbacktask --- .../src/main/java/iped/engine/data/Item.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/data/Item.java b/iped-engine/src/main/java/iped/engine/data/Item.java index 7213c518c4..42e38e9551 100644 --- a/iped-engine/src/main/java/iped/engine/data/Item.java +++ b/iped-engine/src/main/java/iped/engine/data/Item.java @@ -201,6 +201,10 @@ public static void setStartID(int start) { static final int BUF_LEN = 8 * 1024 * 1024; + private boolean reEnqueueItem = false; + + private boolean fallBackTask = false; + /** * Adiciona o item a uma categoria. * @@ -1265,4 +1269,21 @@ public Object getTempAttribute(String key) { public void setTempAttribute(String key, Object value) { tempAttributes.put(key, value); } + + public void setReEnqueueItem(boolean val){ + this.reEnqueueItem = val; + } + + public boolean isReEnqueueItem(){ + return this.reEnqueueItem; + } + + public void setFallBackTask(boolean val){ + this.fallBackTask = val; + } + + public boolean isFallBackTask(){ + return this.fallBackTask; + } + } From ab38841224472b6373c2ae1120ce9d45b9811fac Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:33:56 -0300 Subject: [PATCH 05/22] compatibility fix for new properties --- .../parsers/util/BaseItemSearchContext.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/iped-parsers/iped-parsers-impl/src/test/java/iped/parsers/util/BaseItemSearchContext.java b/iped-parsers/iped-parsers-impl/src/test/java/iped/parsers/util/BaseItemSearchContext.java index 3b202beb16..1218d22d47 100644 --- a/iped-parsers/iped-parsers-impl/src/test/java/iped/parsers/util/BaseItemSearchContext.java +++ b/iped-parsers/iped-parsers-impl/src/test/java/iped/parsers/util/BaseItemSearchContext.java @@ -50,6 +50,32 @@ protected ParseContext getContext(String testFilePath) throws IOException { File file = getFile(testFilePath); IItem item = new IItem() { + + @Override + public void setReEnqueueItem(boolean val) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isReEnqueueItem() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setFallBackTask(boolean val) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isFallBackTask() { + // TODO Auto-generated method stub + return false; + } + + @Override public boolean isTimedOut() { // TODO Auto-generated method stub From 5b2748f0c6ea0c92c32b7e8be90dc4fb585eddd5 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:35:57 -0300 Subject: [PATCH 06/22] setup config parameters --- .../task/transcript/RemoteWav2Vec2TranscriptTask.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java index d7e1627af0..c6fa6b6f78 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java @@ -81,6 +81,13 @@ public void init(ConfigurationManager configurationManager) throws Exception { super.init(configurationManager); + this.setRemoteTask(true); + this.setRequeueHeuristic(transcriptConfig.getRequeueHeuristic()); + this.setClientTranscriptHelp(transcriptConfig.getClientTranscriptHelp()); + this.setClassNameFallBack(transcriptConfig.getClassNameFallBack()); + this.setRequeueRatio(transcriptConfig.getRequeueRatio()); + this.setRequeueDeltaTime(transcriptConfig.getRequeueDeltaTime()); + if (!this.isEnabled()) { return; } From 57e49ad63f7bcb89120d010db0e6a3a2e1e3b256 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:37:54 -0300 Subject: [PATCH 07/22] setup fallback task and call it when necessary --- .../task/transcript/AudioTranscriptTask.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/AudioTranscriptTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/AudioTranscriptTask.java index ad07705e32..559e353f05 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/AudioTranscriptTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/AudioTranscriptTask.java @@ -12,6 +12,8 @@ public class AudioTranscriptTask extends AbstractTask { private AbstractTranscriptTask impl; + private AbstractTranscriptTask implFallBack; + private boolean enableClientFallBack = false; @Override public List> getConfigurables() { @@ -25,6 +27,21 @@ public void init(ConfigurationManager configurationManager) throws Exception { impl = (AbstractTranscriptTask) Class.forName(transcriptConfig.getClassName()).getDeclaredConstructor().newInstance(); impl.setWorker(worker); impl.init(configurationManager); + + if (impl.getRequeueHeuristic() && impl.getClientTranscriptHelp()){ + String classNameFallBack = impl.getClassNameFallBack(); + if (classNameFallBack != null && !classNameFallBack.isEmpty()){ + implFallBack = (AbstractTranscriptTask) Class.forName(classNameFallBack).getDeclaredConstructor().newInstance(); + if (implFallBack.isRemoteTask()){ + implFallBack = null; + return; + } + enableClientFallBack = true; + implFallBack.setWorker(worker); + implFallBack.init(configurationManager); + } + } + } public boolean isEnabled() { @@ -34,11 +51,21 @@ public boolean isEnabled() { @Override public void finish() throws Exception { impl.finish(); + if (enableClientFallBack){ + implFallBack.finish(); + } + } @Override protected void process(IItem evidence) throws Exception { - impl.process(evidence); + + if(evidence.isFallBackTask() && enableClientFallBack){ + implFallBack.process(evidence); + }else{ + impl.process(evidence); + } + } } From ad1586fcaa6242ec930a5efe43e5689564d7b4db Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:40:16 -0300 Subject: [PATCH 08/22] add new reenqueue method with spaced positioning --- .../main/java/iped/engine/task/AbstractTask.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/task/AbstractTask.java b/iped-engine/src/main/java/iped/engine/task/AbstractTask.java index 70e60fde4d..6519498aeb 100644 --- a/iped-engine/src/main/java/iped/engine/task/AbstractTask.java +++ b/iped-engine/src/main/java/iped/engine/task/AbstractTask.java @@ -18,6 +18,8 @@ import iped.engine.data.CaseData; import iped.engine.io.TimeoutException; import iped.parsers.util.CorruptedCarvedException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Classe que representa uma tarefa de procesamento (assinatura, hash, carving, @@ -248,6 +250,20 @@ protected void sendToNextTask(IItem evidence) throws Exception { } } + protected void reEnqueueItemSpaced(IItem item, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException { + reEnqueueItemSpaced(item, worker.manager.getProcessingQueues().getCurrentQueuePriority(), numWorkers, lastQueueIndex, lastQueueTime, queueSplit, queueDeltaTime); + throw new ItemReEnqueuedException(); + } + + private void reEnqueueItemSpaced(IItem item, int queue, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException { + item.dispose(); + SkipCommitedTask.checkAgainLaterProcessedParents(item); + worker.manager.getProcessingQueues().addItemToQueueSpaced(item, queue, numWorkers, lastQueueIndex, lastQueueTime, queueSplit, queueDeltaTime); + if (!item.isQueueEnd()) { + worker.decItemsBeingProcessed(); + } + } + protected void reEnqueueItem(IItem item) throws InterruptedException { reEnqueueItem(item, worker.manager.getProcessingQueues().getCurrentQueuePriority()); throw new ItemReEnqueuedException(); From de1fcf064057b2966af381cea0166ba579ad0546 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:45:52 -0300 Subject: [PATCH 09/22] add new method to add itens on middle of the queue --- .../iped/engine/core/ProcessingQueues.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/core/ProcessingQueues.java b/iped-engine/src/main/java/iped/engine/core/ProcessingQueues.java index cbf4d74a04..3192595076 100644 --- a/iped-engine/src/main/java/iped/engine/core/ProcessingQueues.java +++ b/iped-engine/src/main/java/iped/engine/core/ProcessingQueues.java @@ -6,6 +6,8 @@ import iped.data.IItem; import iped.engine.data.CaseData; import iped.engine.util.Util; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class ProcessingQueues { @@ -61,6 +63,48 @@ public void addItemToQueue(IItem item, int queuePriority) throws InterruptedExce addItemToQueue(item, queuePriority, false, false); } + public void addItemToQueueSpaced(IItem item, int queuePriority, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException { + addItemToQueueSpaced(item, queuePriority, true, numWorkers, lastQueueIndex, lastQueueTime, queueSplit,queueDeltaTime); + } + + private void addItemToQueueSpaced(IItem item, int queuePriority, boolean blockIfFull, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) + throws InterruptedException { + + Util.calctrackIDAndUpdateID(caseData, item); + + LinkedList queue = queues.get(queuePriority); + boolean sleep = false; + while (true) { + if (sleep) { + sleep = false; + Thread.sleep(1000); + } + synchronized (this) { + if (blockIfFull && queuePriority == 0 && queue.size() >= maxQueueSize) { + sleep = true; + continue; + } else { + queueSplit = (queueSplit <= 0)?4:queueSplit; + queueDeltaTime = (queueDeltaTime <= 0)?5000:queueDeltaTime; + int queueSplitInteger = (int)(queue.size()/queueSplit); + if (lastQueueIndex.get() == -1){ + lastQueueIndex.set(queueSplitInteger); + lastQueueTime.set(System.currentTimeMillis()); + }else{ + if (lastQueueIndex.get() + numWorkers < queue.size() && ((System.currentTimeMillis() - lastQueueTime.get()) < queueDeltaTime)){ + lastQueueIndex.addAndGet(numWorkers); + }else{ + lastQueueIndex.set(queueSplitInteger); + } + lastQueueTime.set(System.currentTimeMillis()); + } + queue.add(lastQueueIndex.get(),item); + break; + } + } + } + } + private void addItemToQueue(IItem item, int queuePriority, boolean addFirst, boolean blockIfFull) throws InterruptedException { From e5687f64b07db6d1f75e13531c5c850e9369f30d Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:53:22 -0300 Subject: [PATCH 10/22] add static variables and logic to control the requeue heuristic --- .../transcript/AbstractTranscriptTask.java | 146 +++++++++++++++++- 1 file changed, 144 insertions(+), 2 deletions(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/AbstractTranscriptTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/AbstractTranscriptTask.java index 8c66a3c569..92df0debe9 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/AbstractTranscriptTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/AbstractTranscriptTask.java @@ -84,6 +84,26 @@ public abstract class AbstractTranscriptTask extends AbstractTask { protected IItem evidence; + private boolean remoteTask = false; + private boolean requeueHeuristic = false; + private boolean clientTranscriptHelp = false; + private String classNameFallBack = ""; + private int requeueRatio = 4; + private long requeueDeltaTime = 5000; + + private static int numRemoteTasks = 1; + private static int minRemoteTasks = 2; + private static int maxRemoteTasks = 2; + private static int currentRemoteTasks = 0; + private static int queueTranscriptions = 0; + private static int tasksCount = 0; + private static int tasksChars = 0; + private static int tasksTime = 0; + private static int numWorkers = 1; + private static int currentFallBackTasks = 0; + private static AtomicInteger lastQueueIndex = new AtomicInteger(-1); + private static AtomicLong lastQueueTime = new AtomicLong(-1); + @Override public boolean isEnabled() { return transcriptConfig.isEnabled(); @@ -181,6 +201,12 @@ public void init(ConfigurationManager configurationManager) throws Exception { transcriptConfig = configurationManager.findObject(AudioTranscriptConfig.class); MIN_TIMEOUT = transcriptConfig.getMinTimeout(); + numWorkers = this.worker.manager.getNumWorkers(); + numRemoteTasks = (((int)numWorkers/2)>0)?(int)numWorkers/2:1; + minRemoteTasks = (numRemoteTasks>2)?2:1; // at leat 2 workers for remote tasks + maxRemoteTasks = (numWorkers>1)?numWorkers-1:1; // at leat 1 worker for client usage + tasksCount = numRemoteTasks; + // clear default config service address in output this.transcriptConfig.clearTranscriptionServiceAddress(output); // clear profile config service address in output @@ -380,6 +406,36 @@ protected void process(IItem evidence) throws Exception { return; } + synchronized(this){ + if (this.getRequeueHeuristic() && this.isRemoteTask() && !evidence.isFallBackTask()){ + + if (currentRemoteTasks >= numRemoteTasks){ + + if (!evidence.isReEnqueueItem()){ + queueTranscriptions++; + } + + int remaingQueues = this.worker.manager.getProcessingQueues().getCurrentQueueSize(); + + if (this.getClientTranscriptHelp() && remaingQueues <= (queueTranscriptions + (numWorkers*2))){ + + if (currentFallBackTasks==0){ + currentFallBackTasks++; + evidence.setFallBackTask(true); + } + + } + + evidence.setReEnqueueItem(true); + super.reEnqueueItemSpaced(evidence, numWorkers, lastQueueIndex, lastQueueTime, this.getRequeueRatio(), this.getRequeueDeltaTime()); + return; + + }else if (currentRemoteTasks < numRemoteTasks){ + currentRemoteTasks ++; + } + } + } + try { evidence.getTempFile(); } catch (IOException e) { @@ -393,18 +449,22 @@ protected void process(IItem evidence) throws Exception { return; } + int chars = 0; + long time = 0; try { this.evidence = evidence; long t = System.currentTimeMillis(); TextAndScore result = transcribeAudio(tmpFile); - transcriptionTime.addAndGet(System.currentTimeMillis() - t); + time = System.currentTimeMillis() - t; + transcriptionTime.addAndGet(time); if (result != null) { evidence.getMetadata().set(ExtraProperties.CONFIDENCE_ATTR, Double.toString(result.score)); evidence.getMetadata().set(ExtraProperties.TRANSCRIPT_ATTR, result.text); storeTextInDb(evidence.getHash(), result.text, result.score); transcriptionSuccess.incrementAndGet(); if (result.text != null) { - transcriptionChars.addAndGet(result.text.length()); + chars = result.text.length(); + transcriptionChars.addAndGet(chars); } } else { transcriptionFail.incrementAndGet(); @@ -416,11 +476,93 @@ protected void process(IItem evidence) throws Exception { } LOGGER.error("Unexpected exception while transcribing: " + evidence.getPath(), e); } finally { + synchronized(this){ + if (this.getRequeueHeuristic() && this.isRemoteTask() && !evidence.isFallBackTask()){ + if (evidence.isReEnqueueItem()){ + queueTranscriptions--; + } + currentRemoteTasks--; + tasksCount--; + tasksChars += chars; + tasksTime += time; + if (tasksCount==0){ + String info = ""; + double speed = tasksChars; + speed = speed/(tasksTime/1000)/numRemoteTasks; + info = "Transcript "+tasksChars+" chars in "+(tasksTime/1000)+" seconds with "+numRemoteTasks+" tasks. Speed "+speed; + if (speed < 0.9){ + numRemoteTasks--; + numRemoteTasks = (minRemoteTasks < numRemoteTasks)?numRemoteTasks:minRemoteTasks; + info += ". Decreasing number of tasks to "+numRemoteTasks; + }else if (speed > 1.1){ + numRemoteTasks++; + numRemoteTasks = (maxRemoteTasks > numRemoteTasks)?numRemoteTasks:maxRemoteTasks; + info += ". Increasing number of tasks to "+numRemoteTasks; + } + LOGGER.info("{}", info); + tasksCount = numRemoteTasks; + tasksChars = 0; + tasksTime = 0; + } + }else { + if (currentFallBackTasks > 0){ + currentFallBackTasks--; + } + } + } tmp.close(); } } + public void setRemoteTask(boolean remoteTask){ + this.remoteTask = remoteTask; + } + + public boolean isRemoteTask(){ + return this.remoteTask; + } + + public void setRequeueHeuristic(boolean requeueHeuristic){ + this.requeueHeuristic = requeueHeuristic; + } + + public boolean getRequeueHeuristic(){ + return this.requeueHeuristic; + } + + public void setClientTranscriptHelp(boolean clientTranscriptHelp){ + this.clientTranscriptHelp = clientTranscriptHelp; + } + + public boolean getClientTranscriptHelp(){ + return this.clientTranscriptHelp; + } + + public void setClassNameFallBack(String classNameFallBack){ + this.classNameFallBack = classNameFallBack; + } + + public String getClassNameFallBack(){ + return this.classNameFallBack; + } + + public void setRequeueRatio(int requeueRatio){ + this.requeueRatio = requeueRatio; + } + + public int getRequeueRatio(){ + return this.requeueRatio; + } + + public void setRequeueDeltaTime(Long requeueDeltaTime){ + this.requeueDeltaTime = requeueDeltaTime; + } + + public Long getRequeueDeltaTime(){ + return this.requeueDeltaTime; + } + protected abstract TextAndScore transcribeAudio(File tmpFile) throws Exception; } From 6e54dda7126e1d8828eedf63832649c58024c0a7 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:07:33 -0300 Subject: [PATCH 11/22] resolve conflict --- .../RemoteWav2Vec2TranscriptTask.java | 332 +----------------- 1 file changed, 9 insertions(+), 323 deletions(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java index c6fa6b6f78..83fb1b17c0 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java @@ -1,325 +1,11 @@ package iped.engine.task.transcript; -import java.io.BufferedOutputStream; -import java.io.BufferedReader; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.net.ConnectException; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.tika.io.TemporaryResources; - -import iped.configuration.IConfigurationDirectory; -import iped.data.IItem; -import iped.engine.config.AudioTranscriptConfig; -import iped.engine.config.ConfigurationManager; -import iped.engine.core.Manager; -import iped.engine.io.TimeoutException; -import iped.engine.task.transcript.RemoteWav2Vec2Service.MESSAGES; -import iped.exception.IPEDException; - -public class RemoteWav2Vec2TranscriptTask extends AbstractTranscriptTask { - - private static Logger logger = LogManager.getLogger(Wav2Vec2TranscriptTask.class); - - private static final int MAX_CONNECT_ERRORS = 60; - - private static final int UPDATE_SERVERS_INTERVAL_MILLIS = 60000; - - private static List servers = new ArrayList<>(); - - private static int currentServer = -1; - - private static AtomicInteger numConnectErrors = new AtomicInteger(); - - private static AtomicLong audioSendingTime = new AtomicLong(); - - private static AtomicLong transcriptReceiveTime = new AtomicLong(); - - private static AtomicBoolean statsPrinted = new AtomicBoolean(); - - private static long lastUpdateServersTime = 0; - - private static class Server { - - String ip; - int port; - - public String toString() { - return ip + ":" + port; - } - } - - // See https://github.com/sepinf-inc/IPED/issues/1576 - private int getRetryIntervalMillis() { - // This depends on how much time worker nodes need to consume their queue. - // Of course audios duration, nodes queue size and performance affect this. - // This tries to be fair with clients independent of their number of threads. - return Manager.getInstance().getNumWorkers() * 100; - } - - @Override - public void init(ConfigurationManager configurationManager) throws Exception { - - super.init(configurationManager); - - this.setRemoteTask(true); - this.setRequeueHeuristic(transcriptConfig.getRequeueHeuristic()); - this.setClientTranscriptHelp(transcriptConfig.getClientTranscriptHelp()); - this.setClassNameFallBack(transcriptConfig.getClassNameFallBack()); - this.setRequeueRatio(transcriptConfig.getRequeueRatio()); - this.setRequeueDeltaTime(transcriptConfig.getRequeueDeltaTime()); - - if (!this.isEnabled()) { - return; - } - - if (!servers.isEmpty()) { - return; - } - - boolean disable = false; - if (transcriptConfig.getWav2vec2Service() == null) { - String ipedRoot = System.getProperty(IConfigurationDirectory.IPED_ROOT); - if (ipedRoot != null) { - Path path = new File(ipedRoot, "conf/" + AudioTranscriptConfig.CONF_FILE).toPath(); - configurationManager.getConfigurationDirectory().addPath(path); - configurationManager.addObject(transcriptConfig); - configurationManager.loadConfig(transcriptConfig); - // maybe user changed installation configs - if (transcriptConfig.getWav2vec2Service() == null) { - disable = true; - } else { - transcriptConfig.setEnabled(true); - transcriptConfig.setClassName(this.getClass().getName()); - } - } else { - disable = true; - } - } - - if (disable) { - transcriptConfig.setEnabled(false); - logger.warn("Remote transcription module disabled, service address not configured."); - return; - } - - requestServers(true); - - } - - private static synchronized void requestServers(RemoteWav2Vec2TranscriptTask task, boolean now) throws IOException { - if (!now && System.currentTimeMillis() - lastUpdateServersTime < UPDATE_SERVERS_INTERVAL_MILLIS) { - return; - } - String[] ipAndPort = task.transcriptConfig.getWav2vec2Service().split(":"); - String ip = ipAndPort[0]; - int port = Integer.parseInt(ipAndPort[1]); - try (Socket client = new Socket(ip, port); - InputStream is = client.getInputStream(); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); - PrintWriter writer = new PrintWriter(new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) { - - client.setSoTimeout(10000); - writer.println(MESSAGES.DISCOVER); - int numServers = Integer.parseInt(reader.readLine()); - List servers = new ArrayList<>(); - for (int i = 0; i < numServers; i++) { - String[] ipPort = reader.readLine().split(":"); - Server server = new Server(); - server.ip = ipPort[0]; - server.port = Integer.parseInt(ipPort[1]); - servers.add(server); - logger.info("Transcription server discovered: {}:{}", server.ip, server.port); - } - RemoteWav2Vec2TranscriptTask.servers = servers; - lastUpdateServersTime = System.currentTimeMillis(); - } catch (ConnectException e) { - String msg = "Central transcription node refused connection, is it online? " + e.toString(); - if (servers.isEmpty()) { - throw new IPEDException(msg); - } else { - logger.warn(msg); - } - } - } - - private void requestServers(boolean now) throws IOException { - requestServers(this, now); - } - - - @Override - public void finish() throws Exception { - super.finish(); - if (!statsPrinted.getAndSet(true)) { - int numWorkers = this.worker.manager.getNumWorkers(); - DecimalFormat df = new DecimalFormat(); - logger.info("Time spent to send audios: {}s", df.format(audioSendingTime.get() / (1000 * numWorkers))); - logger.info("Time spent to receive transcriptions: {}s", df.format(transcriptReceiveTime.get() / (1000 * numWorkers))); - } - } - - /** - * Returns a transcription server between the discovered ones using a simple - * circular approach. - * - * @return Server instance to use - */ - private static synchronized Server getServer() { - if (servers.isEmpty()) { - throw new IPEDException("No transcription server available!"); - } - currentServer++; - if (currentServer >= servers.size()) { - currentServer = 0; - } - return servers.get(currentServer); - } - - /** - * Don't convert to WAV on client side, return the audio as is. - */ - @Override - protected File getTempFileToTranscript(IItem evidence, TemporaryResources tmp) throws IOException, InterruptedException { - return evidence.getTempFile(); - } - - @Override - protected TextAndScore transcribeAudio(File tmpFile) throws Exception { - - while (true) { - requestServers(false); - Server server = getServer(); - long requestTime = System.currentTimeMillis(); - try (Socket serverSocket = new Socket(server.ip, server.port); - InputStream is = serverSocket.getInputStream(); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); - BufferedOutputStream bos = new BufferedOutputStream(serverSocket.getOutputStream())) { - - numConnectErrors.set(0); - - int timeoutSecs = (int) (MIN_TIMEOUT + TIMEOUT_PER_MB * tmpFile.length() / (1 << 20)); - serverSocket.setSoTimeout(1000 * timeoutSecs); - - String response = reader.readLine(); - if (response == null || MESSAGES.BUSY.toString().equals(response)) { - logger.debug("Transcription server {} busy, trying another one.", server); - sleepBeforeRetry(requestTime); - continue; - } - if (!MESSAGES.ACCEPTED.toString().equals(response)) { - logger.error("Error 0 in communication with {}. The audio will be retried.", server); - continue; - } - - logger.debug("Transcription server {} accepted connection", server); - - long t0 = System.currentTimeMillis(); - - bos.write(MESSAGES.VERSION_1_2.toString().getBytes()); - // bos.write("\n".getBytes()); - - bos.write(MESSAGES.AUDIO_SIZE.toString().getBytes()); - - DataOutputStream dos = new DataOutputStream(bos); - // Must use long see #1833 - dos.writeLong(tmpFile.length()); - dos.flush(); - - Files.copy(tmpFile.toPath(), bos); - bos.flush(); - - long t1 = System.currentTimeMillis(); - - response = reader.readLine(); - - while (MESSAGES.PING.toString().equals(response)) { - logger.debug("ping {}", response); - response = reader.readLine(); - } - - if (MESSAGES.WARN.toString().equals(response)) { - String warn = reader.readLine(); - boolean tryAgain = false; - if (warn.contains(TimeoutException.class.getName())) { - // Timeout converting audio to wav, possibly it's corrupted - evidence.setTimeOut(true); - stats.incTimeouts(); - } else if (warn.contains(SocketTimeoutException.class.getName()) || warn.contains(SocketException.class.getName())) { - tryAgain = true; - } - logger.warn("Fail to transcribe on server: {} audio: {} error: {}.{}", server, evidence.getPath(), warn, (tryAgain ? " The audio will be retried." : "")); - if (tryAgain) { - continue; - } - return null; - } - if (MESSAGES.ERROR.toString().equals(response) || response == null) { - String error = response != null ? reader.readLine() : "Remote server process crashed or node was turned off!"; - logger.error("Error 1 in communication with {}: {}. The audio will be retried.", server, error); - throw new SocketException(error); - } - - TextAndScore textAndScore = new TextAndScore(); - textAndScore.score = Double.parseDouble(response); - textAndScore.text = reader.readLine(); - - long t2 = System.currentTimeMillis(); - - if (!MESSAGES.DONE.toString().equals(reader.readLine())) { - logger.error("Error 2 in communication with {}. The audio will be retried.", server); - throw new SocketException("Error receiving transcription."); - } - - audioSendingTime.addAndGet(t1 - t0); - transcriptReceiveTime.addAndGet(t2 - t1); - - return textAndScore; - - } catch (SocketTimeoutException | SocketException e) { - if (e instanceof ConnectException) { - numConnectErrors.incrementAndGet(); - if (numConnectErrors.get() / this.worker.manager.getNumWorkers() >= MAX_CONNECT_ERRORS) { - throw new TooManyConnectException(); - } - sleepBeforeRetry(requestTime); - requestServers(true); - } else { - logger.warn("Network error communicating to server: " + server + ", retrying audio: " + evidence.getPath(), e); - } - } - } - - } - - private void sleepBeforeRetry(long lastRequestTime) { - long sleep = getRetryIntervalMillis() - (System.currentTimeMillis() - lastRequestTime); - if (sleep > 0) { - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - -} +/** + * Used just for backwards compatibility with old config files. + * + * @author Nassif + * + */ +public class RemoteWav2Vec2TranscriptTask extends RemoteTranscriptionTask { + +} \ No newline at end of file From 36a48d96be05ec1b36079f24f821f573164cb8a3 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:10:12 -0300 Subject: [PATCH 12/22] fix conflict --- .../engine/task/transcript/RemoteWav2Vec2TranscriptTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java index 83fb1b17c0..eda8b715de 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteWav2Vec2TranscriptTask.java @@ -8,4 +8,4 @@ */ public class RemoteWav2Vec2TranscriptTask extends RemoteTranscriptionTask { -} \ No newline at end of file +} From 95be1abf0d22d388577048100421690b1d58e238 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:17:08 -0300 Subject: [PATCH 13/22] fix fork conflicts --- .../engine/config/AudioTranscriptConfig.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java index 4a47580272..0b85161de2 100644 --- a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java +++ b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java @@ -31,6 +31,8 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private static final String GOOGLE_MODEL = "googleModel"; private static final String LANG_AUTO_VAL = "auto"; private static final String SKIP_KNOWN_FILES = "skipKnownFiles"; + private static final String PRECISION = "precision"; + private static final String BATCH_SIZE = "batchSize"; private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics"; private static final String CLIENTE_HELP = "clientTranscriptHelp"; @@ -52,6 +54,8 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private String wav2vec2Service; private String googleModel; private boolean skipKnownFiles = true; + private String precision = "int8"; + private int batchSize = 1; private boolean requeueHeuristic = false; private boolean clientTranscriptHelp = false; @@ -59,6 +63,14 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private int requeueRatio = 4; private long requeueDeltaTime = 5000; + public String getPrecision() { + return precision; + } + + public int getBatchSize() { + return batchSize; + } + public boolean getSkipKnownFiles() { return this.skipKnownFiles; } @@ -223,6 +235,13 @@ public void processProperties(UTF8Properties properties) { if (value != null) { requeueDeltaTime = Long.valueOf(value.trim()); } + value = properties.getProperty(PRECISION); + if (value != null) { + precision = value.trim(); + } + value = properties.getProperty(BATCH_SIZE); + if (value != null) { + batchSize = Integer.parseInt(value.trim()); } /** From 5bf47e4d78ac0a61775cec19a5ae1ad5a4471324 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:18:51 -0300 Subject: [PATCH 14/22] fix conflits --- .../engine/config/AudioTranscriptConfig.java | 92 +++++-------------- 1 file changed, 25 insertions(+), 67 deletions(-) diff --git a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java index 0b85161de2..7e118e70d4 100644 --- a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java +++ b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java @@ -27,18 +27,14 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private static final String MAX_REQUESTS_KEY = "maxConcurrentRequests"; private static final String MIN_WORD_SCORE = "minWordScore"; public static final String HUGGING_FACE_MODEL = "huggingFaceModel"; + public static final String WHISPER_MODEL = "whisperModel"; public static final String WAV2VEC2_SERVICE = "wav2vec2Service"; + public static final String REMOTE_SERVICE = "remoteServiceAddress"; private static final String GOOGLE_MODEL = "googleModel"; private static final String LANG_AUTO_VAL = "auto"; private static final String SKIP_KNOWN_FILES = "skipKnownFiles"; private static final String PRECISION = "precision"; - private static final String BATCH_SIZE = "batchSize"; - - private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics"; - private static final String CLIENTE_HELP = "clientTranscriptHelp"; - private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass"; - private static final String REQUEUE_RATIO = "clientSplitQueueRatio"; - private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime"; + private static final String BATCH_SIZE = "batchSize"; private List languages = new ArrayList<>(); private List mimesToProcess = new ArrayList<>(); @@ -51,17 +47,12 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private int maxConcurrentRequests; private float minWordScore = 0.7f; private String huggingFaceModel; - private String wav2vec2Service; + private String whisperModel; + private String remoteService; private String googleModel; private boolean skipKnownFiles = true; private String precision = "int8"; - private int batchSize = 1; - - private boolean requeueHeuristic = false; - private boolean clientTranscriptHelp = false; - private String classNameFallBack = ""; - private int requeueRatio = 4; - private long requeueDeltaTime = 5000; + private int batchSize = 1; public String getPrecision() { return precision; @@ -133,35 +124,18 @@ public String getHuggingFaceModel() { return huggingFaceModel; } - public String getWav2vec2Service() { - return wav2vec2Service; - } - - public String getGoogleModel() { - return googleModel; + public String getWhisperModel() { + return whisperModel; } - public boolean getRequeueHeuristic() { - return requeueHeuristic; + public String getRemoteService() { + return remoteService; } - public boolean getClientTranscriptHelp() { - return clientTranscriptHelp; - } - - public String getClassNameFallBack() { - return classNameFallBack; - } - - public int getRequeueRatio() { - return requeueRatio; + public String getGoogleModel() { + return googleModel; } - public long getRequeueDeltaTime() { - return requeueDeltaTime; - } - - @Override public void processProperties(UTF8Properties properties) { @@ -189,9 +163,17 @@ public void processProperties(UTF8Properties properties) { if (huggingFaceModel != null) { huggingFaceModel = huggingFaceModel.trim(); } - wav2vec2Service = properties.getProperty(WAV2VEC2_SERVICE); - if (wav2vec2Service != null) { - wav2vec2Service = wav2vec2Service.trim(); + whisperModel = properties.getProperty(WHISPER_MODEL); + if (whisperModel != null) { + whisperModel = whisperModel.strip(); + } + + remoteService = properties.getProperty(REMOTE_SERVICE); + if (remoteService == null) { + remoteService = properties.getProperty(WAV2VEC2_SERVICE); + } + if (remoteService != null) { + remoteService = remoteService.trim(); } googleModel = properties.getProperty(GOOGLE_MODEL); if (googleModel != null) { @@ -210,38 +192,14 @@ public void processProperties(UTF8Properties properties) { if (value != null) { timeoutPerSec = Integer.valueOf(value.trim()); } - - value = properties.getProperty(REQUEUE_HEURISTICS); - if (value != null) { - requeueHeuristic = Boolean.valueOf(value.trim()); - } - - value = properties.getProperty(CLIENTE_HELP); - if (value != null) { - clientTranscriptHelp = Boolean.valueOf(value.trim()); - } - - value = properties.getProperty(IMPL_CLASS_KEY_CLIENT); - if (value != null) { - classNameFallBack = value.trim(); - } - - value = properties.getProperty(REQUEUE_RATIO); - if (value != null) { - requeueRatio = Integer.valueOf(value.trim()); - } - - value = properties.getProperty(REQUEUE_DELTA_TIME); - if (value != null) { - requeueDeltaTime = Long.valueOf(value.trim()); - } value = properties.getProperty(PRECISION); if (value != null) { precision = value.trim(); } value = properties.getProperty(BATCH_SIZE); if (value != null) { - batchSize = Integer.parseInt(value.trim()); + batchSize = Integer.parseInt(value.trim()); + } } /** From 728c713ae1953180fd59b937a7ec1ce63cdab39c Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:24:19 -0300 Subject: [PATCH 15/22] fix conflicts --- .../engine/config/AudioTranscriptConfig.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java index 7e118e70d4..ca951889c7 100644 --- a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java +++ b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java @@ -35,6 +35,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private static final String SKIP_KNOWN_FILES = "skipKnownFiles"; private static final String PRECISION = "precision"; private static final String BATCH_SIZE = "batchSize"; + private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics"; + private static final String CLIENTE_HELP = "clientTranscriptHelp"; + private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass"; + private static final String REQUEUE_RATIO = "clientSplitQueueRatio"; + private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime"; private List languages = new ArrayList<>(); private List mimesToProcess = new ArrayList<>(); @@ -53,6 +58,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private boolean skipKnownFiles = true; private String precision = "int8"; private int batchSize = 1; + private boolean requeueHeuristic = false; + private boolean clientTranscriptHelp = false; + private String classNameFallBack = ""; + private int requeueRatio = 4; + private long requeueDeltaTime = 5000; public String getPrecision() { return precision; @@ -136,6 +146,26 @@ public String getGoogleModel() { return googleModel; } + public boolean getRequeueHeuristic() { + return requeueHeuristic; + } + + public boolean getClientTranscriptHelp() { + return clientTranscriptHelp; + } + + public String getClassNameFallBack() { + return classNameFallBack; + } + + public int getRequeueRatio() { + return requeueRatio; + } + + public long getRequeueDeltaTime() { + return requeueDeltaTime; + } + @Override public void processProperties(UTF8Properties properties) { @@ -200,6 +230,22 @@ public void processProperties(UTF8Properties properties) { if (value != null) { batchSize = Integer.parseInt(value.trim()); } + value = properties.getProperty(CLIENTE_HELP); + if (value != null) { + clientTranscriptHelp = Boolean.valueOf(value.trim()); + } + value = properties.getProperty(IMPL_CLASS_KEY_CLIENT); + if (value != null) { + classNameFallBack = value.trim(); + } + value = properties.getProperty(REQUEUE_RATIO); + if (value != null) { + requeueRatio = Integer.valueOf(value.trim()); + } + value = properties.getProperty(REQUEUE_DELTA_TIME); + if (value != null) { + requeueDeltaTime = Long.valueOf(value.trim()); + } } /** From e2b1f05f3734acfebeca0f1e1399ed54fea0d44d Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:26:00 -0300 Subject: [PATCH 16/22] fix conflicts --- .../engine/config/AudioTranscriptConfig.java | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java index ca951889c7..7e118e70d4 100644 --- a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java +++ b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java @@ -35,11 +35,6 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private static final String SKIP_KNOWN_FILES = "skipKnownFiles"; private static final String PRECISION = "precision"; private static final String BATCH_SIZE = "batchSize"; - private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics"; - private static final String CLIENTE_HELP = "clientTranscriptHelp"; - private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass"; - private static final String REQUEUE_RATIO = "clientSplitQueueRatio"; - private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime"; private List languages = new ArrayList<>(); private List mimesToProcess = new ArrayList<>(); @@ -58,11 +53,6 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private boolean skipKnownFiles = true; private String precision = "int8"; private int batchSize = 1; - private boolean requeueHeuristic = false; - private boolean clientTranscriptHelp = false; - private String classNameFallBack = ""; - private int requeueRatio = 4; - private long requeueDeltaTime = 5000; public String getPrecision() { return precision; @@ -146,26 +136,6 @@ public String getGoogleModel() { return googleModel; } - public boolean getRequeueHeuristic() { - return requeueHeuristic; - } - - public boolean getClientTranscriptHelp() { - return clientTranscriptHelp; - } - - public String getClassNameFallBack() { - return classNameFallBack; - } - - public int getRequeueRatio() { - return requeueRatio; - } - - public long getRequeueDeltaTime() { - return requeueDeltaTime; - } - @Override public void processProperties(UTF8Properties properties) { @@ -230,22 +200,6 @@ public void processProperties(UTF8Properties properties) { if (value != null) { batchSize = Integer.parseInt(value.trim()); } - value = properties.getProperty(CLIENTE_HELP); - if (value != null) { - clientTranscriptHelp = Boolean.valueOf(value.trim()); - } - value = properties.getProperty(IMPL_CLASS_KEY_CLIENT); - if (value != null) { - classNameFallBack = value.trim(); - } - value = properties.getProperty(REQUEUE_RATIO); - if (value != null) { - requeueRatio = Integer.valueOf(value.trim()); - } - value = properties.getProperty(REQUEUE_DELTA_TIME); - if (value != null) { - requeueDeltaTime = Long.valueOf(value.trim()); - } } /** From 4c434b80ec1cabcb831f6786729b222163e060ca Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:28:58 -0300 Subject: [PATCH 17/22] fix conflicts --- .../engine/config/AudioTranscriptConfig.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java index 7e118e70d4..ca951889c7 100644 --- a/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java +++ b/iped-engine/src/main/java/iped/engine/config/AudioTranscriptConfig.java @@ -35,6 +35,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private static final String SKIP_KNOWN_FILES = "skipKnownFiles"; private static final String PRECISION = "precision"; private static final String BATCH_SIZE = "batchSize"; + private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics"; + private static final String CLIENTE_HELP = "clientTranscriptHelp"; + private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass"; + private static final String REQUEUE_RATIO = "clientSplitQueueRatio"; + private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime"; private List languages = new ArrayList<>(); private List mimesToProcess = new ArrayList<>(); @@ -53,6 +58,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig { private boolean skipKnownFiles = true; private String precision = "int8"; private int batchSize = 1; + private boolean requeueHeuristic = false; + private boolean clientTranscriptHelp = false; + private String classNameFallBack = ""; + private int requeueRatio = 4; + private long requeueDeltaTime = 5000; public String getPrecision() { return precision; @@ -136,6 +146,26 @@ public String getGoogleModel() { return googleModel; } + public boolean getRequeueHeuristic() { + return requeueHeuristic; + } + + public boolean getClientTranscriptHelp() { + return clientTranscriptHelp; + } + + public String getClassNameFallBack() { + return classNameFallBack; + } + + public int getRequeueRatio() { + return requeueRatio; + } + + public long getRequeueDeltaTime() { + return requeueDeltaTime; + } + @Override public void processProperties(UTF8Properties properties) { @@ -200,6 +230,22 @@ public void processProperties(UTF8Properties properties) { if (value != null) { batchSize = Integer.parseInt(value.trim()); } + value = properties.getProperty(CLIENTE_HELP); + if (value != null) { + clientTranscriptHelp = Boolean.valueOf(value.trim()); + } + value = properties.getProperty(IMPL_CLASS_KEY_CLIENT); + if (value != null) { + classNameFallBack = value.trim(); + } + value = properties.getProperty(REQUEUE_RATIO); + if (value != null) { + requeueRatio = Integer.valueOf(value.trim()); + } + value = properties.getProperty(REQUEUE_DELTA_TIME); + if (value != null) { + requeueDeltaTime = Long.valueOf(value.trim()); + } } /** From 0d870f3c6fea7284f93c8e9758e9c69c10edb27b Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:31:50 -0300 Subject: [PATCH 18/22] fix conflicts --- .../engine/task/transcript/RemoteTranscriptionTask.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java index 8c321db98f..fe7b8fab51 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java @@ -81,6 +81,13 @@ public void init(ConfigurationManager configurationManager) throws Exception { super.init(configurationManager); + this.setRemoteTask(true); + this.setRequeueHeuristic(transcriptConfig.getRequeueHeuristic()); + this.setClientTranscriptHelp(transcriptConfig.getClientTranscriptHelp()); + this.setClassNameFallBack(transcriptConfig.getClassNameFallBack()); + this.setRequeueRatio(transcriptConfig.getRequeueRatio()); + this.setRequeueDeltaTime(transcriptConfig.getRequeueDeltaTime()); + if (!this.isEnabled()) { return; } From 7eaa8c8335a2469c72a1101be85db74c232c2481 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:28:27 -0300 Subject: [PATCH 19/22] fix conflict --- .../iped/engine/task/transcript/RemoteTranscriptionTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java index fe7b8fab51..adeb5e60d1 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java @@ -88,7 +88,7 @@ public void init(ConfigurationManager configurationManager) throws Exception { this.setRequeueRatio(transcriptConfig.getRequeueRatio()); this.setRequeueDeltaTime(transcriptConfig.getRequeueDeltaTime()); - if (!this.isEnabled()) { + if (!isEnabled()) { return; } From 5002684aa855ec69fcfd7cc3e9535479dc0e2793 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:29:33 -0300 Subject: [PATCH 20/22] fix conflict --- .../task/transcript/RemoteTranscriptionTask.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java index adeb5e60d1..b5d91e11a4 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java @@ -81,16 +81,7 @@ public void init(ConfigurationManager configurationManager) throws Exception { super.init(configurationManager); - this.setRemoteTask(true); - this.setRequeueHeuristic(transcriptConfig.getRequeueHeuristic()); - this.setClientTranscriptHelp(transcriptConfig.getClientTranscriptHelp()); - this.setClassNameFallBack(transcriptConfig.getClassNameFallBack()); - this.setRequeueRatio(transcriptConfig.getRequeueRatio()); - this.setRequeueDeltaTime(transcriptConfig.getRequeueDeltaTime()); - - if (!isEnabled()) { - return; - } + if (!servers.isEmpty()) { return; From 2c26c82a71e21968469a4cba2257644cd050525f Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:30:48 -0300 Subject: [PATCH 21/22] fix conflict --- .../iped/engine/task/transcript/RemoteTranscriptionTask.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java index b5d91e11a4..46970396d8 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java @@ -81,7 +81,9 @@ public void init(ConfigurationManager configurationManager) throws Exception { super.init(configurationManager); - + if (!isEnabled()) { + return; + } if (!servers.isEmpty()) { return; From 7878e604a0535b1e41eed584c3429df14559f4a8 Mon Sep 17 00:00:00 2001 From: gfd2020 <59742865+gfd2020@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:33:18 -0300 Subject: [PATCH 22/22] fix conflict --- .../engine/task/transcript/RemoteTranscriptionTask.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java index 678f835b37..c47773e122 100644 --- a/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java +++ b/iped-engine/src/main/java/iped/engine/task/transcript/RemoteTranscriptionTask.java @@ -83,7 +83,14 @@ public void init(ConfigurationManager configurationManager) throws Exception { super.init(configurationManager); - if (!isEnabled()) { + this.setRemoteTask(true); + this.setRequeueHeuristic(transcriptConfig.getRequeueHeuristic()); + this.setClientTranscriptHelp(transcriptConfig.getClientTranscriptHelp()); + this.setClassNameFallBack(transcriptConfig.getClassNameFallBack()); + this.setRequeueRatio(transcriptConfig.getRequeueRatio()); + this.setRequeueDeltaTime(transcriptConfig.getRequeueDeltaTime()); + + if (!this.isEnabled()) { return; }