Skip to content

Commit

Permalink
Merge pull request #117 from DataONEorg/feature-105-hashstore
Browse files Browse the repository at this point in the history
Feature 105 hashstore
  • Loading branch information
taojing2002 authored Aug 30, 2024
2 parents 2c66f94 + 9bc801c commit 408a5c4
Show file tree
Hide file tree
Showing 17 changed files with 656 additions and 791 deletions.
9 changes: 9 additions & 0 deletions helm/config/dataone-indexer.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ index.resourcemap.waitingComponent.time={{ default 800 .Values.idxworker.resourc
index.resourcemap.waitingComponent.max.attempts={{ default 25 .Values.idxworker.resourcemapMaxTries }}
index.solr.versionConflict.waiting.time={{ default 1000 .Values.idxworker.solrVerConflictWaitMs }}
index.solr.versionConflict.max.attempts={{ default 50 .Values.idxworker.solrVerConflictMaxTries }}

# Storage properties
storage.className={{ default "org.dataone.hashstore.filehashstore.FileHashStore" .Values.idxworker.storage.hashStoreClassName }}
storage.hashstore.rootDirectory={{ default "./target/hashstore" .Values.idxworker.storage.hashStoreRootDir }}
storage.hashstore.defaultNamespace={{ default "https://ns.dataone.org/service/types/v2.0#SystemMetadata" .Values.idxworker.storage.hashStoreDefaultNamespace }}
# The following three properties must NOT be modified after the hash store is initialized
storage.hashstore.fileNameAlgorithm={{ default "SHA-256" .Values.idxworker.storage.hashStoreAlgorithm }}
storage.hashstore.directory.width={{ default 2 .Values.idxworker.storage.hashStoreDirWidth }}
storage.hashstore.directory.depth={{ default 3 .Values.idxworker.storage.hashStoreDirDepth }}
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@
<artifactId>jaxb-runtime</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.dataone</groupId>
<artifactId>hashstore</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/org/dataone/cn/indexer/IndexWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,23 +427,21 @@ private void indexObject(IndexQueueMessageParser parser, boolean multipleThread)
Identifier pid = parser.getIdentifier();
String indexType = parser.getIndexType();
int priority = parser.getPriority();
String finalFilePath = parser.getObjectPath();
try {
long threadId = Thread.currentThread().getId();
logger.info("IndexWorker.consumer.indexObject by multiple thread? " + multipleThread
+ ", with the thread id " + threadId
+ " - Received the index task from the index queue with the identifier: "
+ pid.getValue() + " , the index type: " + indexType
+ ", the file path (null means not to have): " + finalFilePath
+ ", the priority: " + priority);
switch (indexType) {
case CREATE_INDEXT_TYPE -> {
boolean sysmetaOnly = false;
solrIndex.update(pid, finalFilePath, sysmetaOnly);
solrIndex.update(pid, sysmetaOnly);
}
case SYSMETA_CHANGE_TYPE -> {
boolean sysmetaOnly = true;
solrIndex.update(pid, finalFilePath, sysmetaOnly);
solrIndex.update(pid, sysmetaOnly);
}
case DELETE_INDEX_TYPE -> solrIndex.remove(pid);
default -> throw new InvalidRequest(
Expand All @@ -455,7 +453,6 @@ private void indexObject(IndexQueueMessageParser parser, boolean multipleThread)
logger.info("IndexWorker.indexOjbect with the thread id " + threadId
+ " - Completed the index task from the index queue with the identifier: "
+ pid.getValue() + " , the index type: " + indexType
+ ", the file path (null means not to have): " + finalFilePath
+ ", the priority: " + priority + " and the time taking is "
+ (end - start) + " milliseconds");

Expand Down
401 changes: 158 additions & 243 deletions src/main/java/org/dataone/cn/indexer/SolrIndex.java

Large diffs are not rendered by default.

309 changes: 109 additions & 200 deletions src/main/java/org/dataone/cn/indexer/object/ObjectManager.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,6 @@ public static Identifier getPid(Identifier identifier)
return pid;
}

/**
* Check if the given identifier is a PID or a SID
*
* @param identifier
* @return true if the identifier is a SID, false if a PID
* @throws NotFound
* @throws ServiceFailure
* @throws NotImplemented
* @throws NotAuthorized
* @throws InvalidToken
* @throws MarshallingException
* @throws IOException
* @throws IllegalAccessException
* @throws InstantiationException
*/
public static boolean isSeriesId(Identifier identifier)
throws InvalidToken, NotAuthorized, NotImplemented, ServiceFailure, NotFound,
InstantiationException, IllegalAccessException, IOException, MarshallingException {

// if we have system metadata available via HZ map, then it's a PID
String relativeObjPath = null;//we don't know the path
SystemMetadata systemMetadata =
ObjectManager.getInstance().getSystemMetadata(identifier.getValue(), relativeObjPath);
if (systemMetadata != null) {
return false;
}

//TODO: check that it's not just bogus value by looking up the pid?
// Identifier pid = getPid(identifier);
// if (pid.equals(identifier)) {
// return false;
// }

// okay, it's a SID
return true;

}

}
Original file line number Diff line number Diff line change
@@ -1,25 +1,3 @@
/**
* This work was created by participants in the DataONE project, and is
* jointly copyrighted by participating institutions in DataONE. For
* more information on DataONE, see our web site at http://dataone.org.
*
* Copyright ${year}
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* $Id$
*/

package org.dataone.cn.indexer.resourcemap;

import java.io.ByteArrayOutputStream;
Expand All @@ -29,6 +7,7 @@
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -218,60 +197,65 @@ private void _init(InputStream is) throws OREException, URISyntaxException,
public static boolean representsResourceMap(String formatId) {
return RESOURCE_MAP_FORMAT.equals(formatId);
}

private boolean isHeadVersion(Identifier pid, Identifier sid) {
boolean isHead = true;
if(pid != null && sid != null) {
/*Identifier newId = new Identifier();
newId.setValue("peggym.130.5");
if(pid.getValue().equals("peggym.130.4") && HazelcastClientFactory.getSystemMetadataMap().get(newId) != null) {
isHead =false;
} else if (pid.getValue().equals("peggym.130.4") && HazelcastClientFactory.getSystemMetadataMap().get(newId) == null) {
isHead = true;
}*/
Identifier head = null;
try {
head = SeriesIdResolver.getPid(sid);//if the passed sid actually is a pid, the method will return the pid.
//if the passed sid actually is a pid, the method will return the pid.
head = SeriesIdResolver.getPid(sid);
} catch (Exception e) {
System.out.println(""+e.getStackTrace());
isHead = true;
}
if(head != null ) {
//System.out.println("||||||||||||||||||| the head version is "+ head.getValue()+" for sid "+sid.getValue());
logger.info("||||||||||||||||||| the head version is "+ head.getValue()+" for sid "+sid.getValue());

logger.info("||||||||||||||||||| the head version is " + head.getValue()
+ " for sid " + sid.getValue());
if(head.equals(pid)) {
logger.info("||||||||||||||||||| the pid "+ pid.getValue()+" is the head version for sid "+sid.getValue());
logger.info("||||||||||||||||||| the pid " + pid.getValue()
+ " is the head version for sid " + sid.getValue());
isHead=true;
} else {
logger.info("||||||||||||||||||| the pid "+ pid.getValue()+" is NOT the head version for sid "+sid.getValue());
logger.info("||||||||||||||||||| the pid " + pid.getValue()
+ " is NOT the head version for sid " + sid.getValue());
isHead=false;
}
} else {
//System.out.println("||||||||||||||||||| can't find the head version for sid "+sid.getValue());
logger.info("||||||||||||||||||| can't find the head version for sid "+sid.getValue() + " and we think the given pid "+pid.getValue()+" is the head version.");
logger.info("||||||||||||||||||| can't find the head version for sid "
+ sid.getValue() + " and we think the given pid " + pid.getValue()
+ " is the head version.");
}
}
return isHead;
}

private SolrDoc _mergeMappedReference(ResourceEntry resourceEntry, SolrDoc mergeDocument) throws InvalidToken, NotAuthorized, NotImplemented,
ServiceFailure, NotFound, InstantiationException, IllegalAccessException, IOException, MarshallingException {

Identifier identifier = new Identifier();
identifier.setValue(mergeDocument.getIdentifier());
//SystemMetadata sysMeta = HazelcastClientFactory.getSystemMetadataMap().get(identifier);
String relativeObjPath = null; //we don't know the path
SystemMetadata sysMeta = ObjectManager.getInstance().getSystemMetadata(identifier.getValue(), relativeObjPath);
if (sysMeta.getSeriesId() != null && sysMeta.getSeriesId().getValue() != null && !sysMeta.getSeriesId().getValue().trim().equals("")) {
// skip this one
if(!isHeadVersion(identifier, sysMeta.getSeriesId())) {
//System.out.println("The id "+identifier+" is not the head of the serial id "+sysMeta.getSeriesId().getValue()+" So, skip merge this one!!!!!!!!!!!!!!!!!!!!!!"+mergeDocument.getIdentifier());
logger.info("The id "+identifier+" is not the head of the serial id "+sysMeta.getSeriesId().getValue()+" So, skip merge this one!!!!!!!!!!!!!!!!!!!!!!"+mergeDocument.getIdentifier());
return mergeDocument;
}

}

private SolrDoc _mergeMappedReference(ResourceEntry resourceEntry, SolrDoc mergeDocument)
throws InvalidToken, NotAuthorized, NotImplemented,
NoSuchAlgorithmException, ServiceFailure, NotFound, InstantiationException,
IllegalAccessException, IOException, MarshallingException {

Identifier identifier = new Identifier();
identifier.setValue(mergeDocument.getIdentifier());
try {
SystemMetadata sysMeta = (SystemMetadata) ObjectManager.getInstance()
.getSystemMetadata(identifier.getValue());
if (sysMeta.getSeriesId() != null && sysMeta.getSeriesId().getValue() != null
&& !sysMeta.getSeriesId().getValue().trim().equals("")) {
// skip this one
if(!isHeadVersion(identifier, sysMeta.getSeriesId())) {
logger.info("The id " + identifier + " is not the head of the serial id "
+ sysMeta.getSeriesId().getValue()
+ " So, skip merge this one!!!!!!!!!!!!!!!!!!!!!!"
+ mergeDocument.getIdentifier());
return mergeDocument;
}
}
} catch (ClassCastException e) {
logger.warn("The systemmetadata is a v1 object and we need to do nothing");
}


if (mergeDocument.hasField(SolrElementField.FIELD_ID) == false) {
mergeDocument.addField(new SolrElementField(SolrElementField.FIELD_ID, resourceEntry
.getIdentifier()));
Expand Down Expand Up @@ -362,19 +346,22 @@ public List<SolrDoc> mergeIndexedDocuments(List<SolrDoc> docs) {
List<SolrDoc> mergedDocuments = new ArrayList<SolrDoc>();
for (ResourceEntry resourceEntry : this.resourceMap.values()) {
for (SolrDoc doc : docs) {
//System.out.println(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc id is "+doc.getIdentifier() +" in the thread "+Thread.currentThread().getId());
//System.out.println(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc series id is "+doc.getSeriesId()+" in the thread "+Thread.currentThread().getId());
//System.out.println(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the resource entry id is "+resourceEntry.getIdentifier()+" in the thread "+Thread.currentThread().getId());
logger.debug(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc id is "+doc.getIdentifier() +" in the thread "+Thread.currentThread().getId());
logger.debug(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the doc series id is "+doc.getSeriesId()+" in the thread "+Thread.currentThread().getId());
logger.debug(">>>>>>>>in mergeIndexedDocuments of ForesiteResourceMap, the resource entry id is "+resourceEntry.getIdentifier()+" in the thread "+Thread.currentThread().getId());

logger.debug("in mergeIndexedDocuments of ForesiteResourceMap, the doc id is "
+ doc.getIdentifier() + " in the thread "+Thread.currentThread().getId());
logger.debug("in mergeIndexedDocuments of ForesiteResourceMap, the doc series id is "
+ doc.getSeriesId() + " in the thread "+Thread.currentThread().getId());
logger.debug("in mergeIndexedDocuments of ForesiteResourceMap, the resource entry id is "
+ resourceEntry.getIdentifier() + " in the thread "
+ Thread.currentThread().getId());

if (doc.getIdentifier().equals(resourceEntry.getIdentifier())
|| resourceEntry.getIdentifier().equals(doc.getSeriesId())) {
try {
mergedDocuments.add(_mergeMappedReference(resourceEntry, doc));
} catch (Exception e) {
logger.error("ForestieResourceMap.mergeIndexedDocuments - cannot merge the document since " + e.getMessage());
logger.error("ForestieResourceMap.mergeIndexedDocuments - cannot merge the document since "
+ e.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.dataone.cn.indexer.resourcemap;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;

import org.apache.log4j.Logger;
import org.dataone.cn.indexer.object.ObjectManager;
Expand All @@ -12,7 +13,7 @@
import org.dataone.service.exceptions.NotImplemented;
import org.dataone.service.exceptions.ServiceFailure;
import org.dataone.service.types.v1.Identifier;
import org.dataone.service.types.v2.SystemMetadata;
import org.dataone.service.types.v1.SystemMetadata;

public class IndexVisibilityDelegateImpl implements IndexVisibilityDelegate {

Expand All @@ -25,10 +26,8 @@ public IndexVisibilityDelegateImpl() {
public boolean isDocumentVisible(Identifier pid) {
boolean visible = false;
try {

//SystemMetadata systemMetadata = HazelcastClientFactory.getSystemMetadataMap().get(pid);
String relativeObjPath = null; //we don't know the path
SystemMetadata systemMetadata = ObjectManager.getInstance().getSystemMetadata(pid.getValue(), relativeObjPath);
SystemMetadata systemMetadata = ObjectManager.getInstance()
.getSystemMetadata(pid.getValue());
// TODO: Is pid Identifier a SID?
if (systemMetadata == null) {
return true;
Expand Down Expand Up @@ -56,16 +55,16 @@ public boolean isDocumentVisible(Identifier pid) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (MarshallingException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (NoSuchAlgorithmException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
}
return visible;
}

public boolean documentExists(Identifier pid) {
boolean exists = false;
try {
//SystemMetadata systemMetadata = HazelcastClientFactory.getSystemMetadataMap().get(pid);
String relativeObjPath = null; //we don't know the path
SystemMetadata systemMetadata = ObjectManager.getInstance().getSystemMetadata(pid.getValue(), relativeObjPath);
SystemMetadata systemMetadata = ObjectManager.getInstance().getSystemMetadata(pid.getValue());
if (systemMetadata != null) {
exists = true;
} else {
Expand All @@ -92,6 +91,8 @@ public boolean documentExists(Identifier pid) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (MarshallingException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
} catch (NoSuchAlgorithmException e) {
logger.warn("Could not get visible value for pid: " + pid.getValue() + " since " +e.getMessage());
}
return exists;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataone/cn/indexer/solrhttp/SolrDoc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.dataone.service.types.v2.SystemMetadata;
import org.dataone.service.types.v1.SystemMetadata;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
Expand Down
Loading

0 comments on commit 408a5c4

Please sign in to comment.