
Commit

Implemented akubra migration
vlahoda committed Dec 6, 2019
1 parent 719a6ba commit 2187af5
Showing 4 changed files with 118 additions and 164 deletions.
@@ -1,26 +1,49 @@
package cz.incad.migration;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import com.qbizm.kramerius.imp.jaxb.DigitalObject;
import cz.incad.kramerius.FedoraAccess;
import cz.incad.kramerius.fedora.RepoModule;
import cz.incad.kramerius.resourceindex.ProcessingIndexFeeder;
import cz.incad.kramerius.resourceindex.ResourceIndexModule;
import cz.incad.kramerius.solr.SolrModule;
import cz.incad.kramerius.statistics.NullStatisticsModule;
import cz.incad.kramerius.utils.RESTHelper;
import cz.incad.kramerius.utils.XMLUtils;
import cz.incad.kramerius.utils.conf.KConfiguration;
import org.akubraproject.map.IdMapper;
import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.fcrepo.server.storage.lowlevel.akubra.HashPathIdMapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import java.io.*;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

import static cz.incad.kramerius.resourceindex.ProcessingIndexRebuild.rebuildProcessingIndex;
import static cz.incad.migration.LegacyMigrationParts.LOG_MESSAGE_ITERATION;
import static cz.incad.migration.Utils.BUILDER;
import static cz.incad.migration.Utils.MD5;
import static cz.incad.kramerius.utils.XMLUtils.*;
@@ -30,172 +53,94 @@ public enum AkubraMigrationParts {
 
     OBJECT_AND_STREAMS {
         @Override
-        public void doMigrationPart() throws SQLException, IOException, SAXException {
+        public void doMigrationPart(String[] args) throws SQLException, IOException, SAXException {
             long start = System.currentTimeMillis();
             try {
-                String objectsSource = KConfiguration.getInstance().getProperty("akubrafs.objects.source");
-                String objectsTarget = KConfiguration.getInstance().getProperty("akubrafs.objects.target");
-
-                String streamsSource = KConfiguration.getInstance().getProperty("akubrafs.streams.source");
-                String streamsTarget = KConfiguration.getInstance().getProperty("akubrafs.streams.target");
-
-                solrSelect(new File(objectsTarget), (pid) -> {
-                    try {
-                        if (!pid.contains("/@")) {
-                            String hex = Utils.asHex(MD5.digest(("info:fedora/" + pid).getBytes(Charset.forName("UTF-8"))));
-
-                            File sourceDirectory = Utils.directory(new File(objectsSource), hex, 2, 1);
-                            File sourceFile = new File(sourceDirectory, Utils.encode("info:fedora/" + pid));
-
-                            File targetDirectory = Utils.directory(new File(objectsTarget), hex, 2, 3);
-                            File targetFile = new File(targetDirectory, sourceFile.getName());
-
-                            if (!targetDirectory.exists()) { targetDirectory.mkdirs(); }
-
-                            FileUtils.moveFile(sourceFile, targetFile);
-
-                            Document parsed = BUILDER.parse(targetFile);
-                            Element rootElement = parsed.getDocumentElement();
-
-                            NodeList childNodes = rootElement.getChildNodes();
-                            for (int j = 0, lj = childNodes.getLength(); j < lj; j++) {
-                                Node n = childNodes.item(j);
-                                if (n.getNodeType() == Node.ELEMENT_NODE) {
-                                    Element elm = (Element) n;
-                                    String state = elm.getAttribute("STATE");
-                                    String controlGroup = elm.getAttribute("CONTROL_GROUP");
-                                    if (state.equals("A") && controlGroup.equals("M")) {
-                                        NodeList contentLocation = elm.getElementsByTagNameNS("info:fedora/fedora-system:def/foxml#", "contentLocation");
-                                        if (contentLocation.getLength() == 1) {
-                                            Element elemLocation = (Element) contentLocation.item(0);
-                                            String type = elemLocation.getAttribute("TYPE");
-                                            if (type.equals("INTERNAL_ID")) {
-                                                String ref = elemLocation.getAttribute("REF");
-
-                                                String s = ref.replaceAll("\\+", "/");
-
-                                                String hexStream = Utils.asHex(MD5.digest(("info:fedora/" + s).getBytes(Charset.forName("UTF-8"))));
-                                                File sourceStreamFolder = Utils.directory(new File(streamsSource), hexStream, 2, 1);
-                                                File destStreamFolder = Utils.directory(new File(streamsTarget), hexStream, 2, 3);
-
-                                                FileUtils.moveFileToDirectory(new File(sourceStreamFolder, Utils.encode("info:fedora/" + s)), destStreamFolder, true);
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    } catch (SAXException e) {
-                        LOGGER.log(Level.SEVERE, e.getMessage(), e);
-                    } catch (IOException e) {
-                        LOGGER.log(Level.SEVERE, e.getMessage(), e);
-                    }
-                });
-            } catch (IOException ex) {
+                String objectSource = KConfiguration.getInstance().getProperty("objectStore.migrationsource");
+                String objectPaths = KConfiguration.getInstance().getProperty("objectStore.path");
+                String objectPattern = KConfiguration.getInstance().getProperty("objectStore.pattern");
+
+                String datastreamSource = KConfiguration.getInstance().getProperty("datastreamStore.migrationsource");
+                String datastreamPaths = KConfiguration.getInstance().getProperty("datastreamStore.path");
+                String datastreamPattern = KConfiguration.getInstance().getProperty("datastreamStore.pattern");
+
+                Injector injector = Guice.createInjector(new SolrModule(), new ResourceIndexModule(), new RepoModule(), new NullStatisticsModule());
+                final ProcessingIndexFeeder feeder = injector.getInstance(ProcessingIndexFeeder.class);
+                final boolean rebuildProcessingIndex = "true".equalsIgnoreCase(args[1]);
+                processRoot(feeder, datastreamSource, datastreamPaths, datastreamPattern, false);
+                processRoot(feeder, objectSource, objectPaths, objectPattern, rebuildProcessingIndex);
+
+            } catch (Exception ex) {
                 throw new RuntimeException(ex);
             } finally {
                 long stop = System.currentTimeMillis();
-                LOGGER.info("POKUS " + (stop - start) + " ms");
+                LOGGER.info("AKubra repository restructured in " + (stop - start) + " ms");
             }
         }
     };
 
-    abstract void doMigrationPart() throws SQLException, IOException, SAXException;
-
-    static void fileSelect(File targetDir, Consumer<String> consumer) throws IOException {
-        File file = new File("pids_1.txt");
-        FileReader freader = new FileReader(file);
-        BufferedReader bufferedReader = new BufferedReader(freader);
-        String line = null;
-        while ((line = bufferedReader.readLine()) != null) {
-            consumer.accept(line);
-        }
-    }
-
-    static void solrSelect(File targetDir, Consumer<String> consumer) throws SQLException, IOException, SAXException {
-        int start = 0;
-        int numFound = Integer.MAX_VALUE;
-        Element rootElm = returnStream(start);
-        Element result = results(rootElm);
-        numFound = Integer.parseInt(result.getAttribute("numFound"));
-        do {
-            List<Element> docs = getElements(result, new ElementsFilter() {
-                @Override
-                public boolean acceptElement(Element element) {
-                    String localName = element.getLocalName();
-                    return localName.equals("doc");
-                }
-            });
-            docs.stream().forEach((doc) -> {
-                Element pidElm = XMLUtils.findElement(doc, new ElementsFilter() {
-                    @Override
-                    public boolean acceptElement(Element field) {
-                        if (field.getLocalName().equals("str")) {
-                            return (field.getAttribute("name").equals("PID"));
-                        }
-                        return false;
-                    }
-                });
-                consumer.accept(pidElm.getTextContent());
-            });
-            start += ROWS;
-            result = results(returnStream(start));
-        } while (start < numFound);
-    }
-
-    private static Element results(Element rootElm) {
-        Element result = findElement(rootElm, new ElementsFilter() {
-            @Override
-            public boolean acceptElement(Element element) {
-                return element.getLocalName().equals("result");
-            }
-        });
-        return result;
-    }
-
-    private static Element returnStream(int start) throws IOException, SAXException {
-        long startRequest = System.currentTimeMillis();
-
-        String reduce = "\"" + Arrays.asList(SOURCES).stream().reduce("", (i, t) -> {
-            if (i.length() == 0) {
-                return t;
-            } else {
-                return i + "\" OR \"" + t;
-            }
-        }) + "\"";
-
-        StringTemplate template = new StringTemplate(CDK_ADDRESS);
-        template.setAttribute("condition", URLEncoder.encode(reduce, "UTF-8"));
-        template.setAttribute("start", start);
-        template.setAttribute("rows", ROWS);
-
-        InputStream inputStream = RESTHelper.inputStream(template.toString(), "", "");
-        Document parsed = Utils.BUILDER.parse(inputStream);
-        LOGGER.info("\tRequest " + template.toString() + " took " + (System.currentTimeMillis() - startRequest) + " ms ");
-        return parsed.getDocumentElement();
-    }
-
-    public static String[] SOURCES = new String[]{
-            "vc:3c06120c-ffc0-4b96-b8df-80bc12e030d9",
-            "vc:b7b1b67a-25d1-4055-905d-45fedfc6a2b5",
-            "vc:b7b1b67a-25d1-4055-905d-45fedfc6a2b5",
-            "vc:c4bb27af-3a51-4ac2-95c7-fd393b489e26",
-            "vc:cd324f70-c034-46f1-9674-e0df4f93de86",
-            "vc:d34ba74b-026a-4c60-aee7-9250a307952c",
-            "vc:d4b466de-5435-4b76-bff7-2838bbae747b",
-            "vc:f750b424-bda4-4113-849a-5e9dbbfb5846"
-    };
-
-    public static final int ROWS = 2000;
-
-    public static final String CDK_ADDRESS = "https://cdk.lib.cas.cz/search/api/v5.0/search?q=collection:($condition$)&fl=collection,PID&rows=$rows$&start=$start$";
+    private static void processRoot(ProcessingIndexFeeder feeder, String datastreamSource, String datastreamPaths, String datastreamPattern, boolean rebuildProcessingIndex) throws IOException, SolrServerException {
+        try {
+            if (rebuildProcessingIndex) {
+                feeder.deleteProcessingIndex();
+            }
+            Path objectStoreRoot = Paths.get(datastreamSource);
+            IdMapper idMapper = new HashPathIdMapper(datastreamPattern);
+            final AtomicInteger currentIteration = new AtomicInteger(0);
+            Files.walk(objectStoreRoot).parallel().filter(Files::isRegularFile).forEach(path -> {
+                try {
+                    if ((currentIteration.incrementAndGet() % LOG_MESSAGE_ITERATION) == 0) {
+                        LOGGER.info("Migrated " + currentIteration + " items.");
+                    }
+                    String filename = "";
+                    try {
+                        filename = java.net.URLDecoder.decode(path.getFileName().toString(), StandardCharsets.UTF_8.name());
+                        filename = filename.replace("info:fedora/", "");
+                        filename = filename.replace("/", "+");
+                    } catch (UnsupportedEncodingException e) {
+                        // not going to happen - value came from JDK's own StandardCharsets
+                    }
+                    String internalId = idMapper.getInternalId(LegacyMigrationParts.getBlobId(filename)).toString();
+                    String subdirPath = internalId.substring(internalId.indexOf(":") + 1, internalId.lastIndexOf("/"));
+                    String targetFileName = internalId.substring(internalId.lastIndexOf("/") + 1);
+                    File directory = new File(datastreamPaths, subdirPath);
+                    directory.mkdirs();
+
+                    File targetFile = new File(directory, targetFileName);
+                    boolean renamed = path.toFile().renameTo(targetFile);
+                    if (!renamed) {
+                        throw new RuntimeException("Cannot rename file " + path + " to " + targetFile.getAbsolutePath());
+                    }
+
+                    if (rebuildProcessingIndex) {
+                        FileInputStream inputStream = new FileInputStream(targetFile);
+                        DigitalObject digitalObject = LegacyMigrationParts.createDigitalObject(inputStream);
+                        rebuildProcessingIndex(feeder, digitalObject);
+                    }
+                } catch (Exception ex) {
+                    LOGGER.log(Level.SEVERE, "Error processing file: ", ex);
+                }
+            });
+        } catch (Exception ex) {
+            LOGGER.log(Level.SEVERE, "Error processing file: ", ex);
+        } finally {
+            if (feeder != null) {
+                feeder.commit();
+                LOGGER.info("Feeder commited.");
+            }
+        }
+    }
+
+    abstract void doMigrationPart(String[] args) throws SQLException, IOException, SAXException;
 
     static Logger LOGGER = Logger.getLogger(AkubraMigrationParts.class.getName());
 
 }
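
For orientation, the restructuring above re-keys every file by the MD5 of its Akubra blob id (info:fedora/<pid> for FOXML objects, and the slash-separated internal id for managed datastreams) and spreads files across nested directories named by leading pairs of hex digits: the old code built the levels with Utils.directory(..., hex, 2, 3), while the new code delegates to HashPathIdMapper driven by the objectStore.pattern / datastreamStore.pattern properties. A minimal, self-contained sketch of that mapping follows; the three-level "##/##/##" pattern and the class name are illustrative assumptions, not the project's Utils/HashPathIdMapper API.

    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public class HashPathSketch {

        // Hash the blob id with MD5 and take the first "levels" pairs of hex
        // digits as directory names; the URL-encoded blob id is the file name.
        static String hashPath(String blobId, int levels) throws Exception {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(blobId.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            StringBuilder path = new StringBuilder();
            for (int i = 0; i < levels; i++) {
                path.append(hex, 2 * i, 2 * i + 2).append('/');
            }
            path.append(URLEncoder.encode(blobId, StandardCharsets.UTF_8.name()));
            return path.toString();
        }

        public static void main(String[] args) throws Exception {
            // Prints a path of the form xx/yy/zz/info%3Afedora%2Fuuid%3A1234
            System.out.println(hashPath("info:fedora/uuid:1234", 3));
        }
    }

Note that processRoot is invoked twice in the diff: once for the datastreamStore and once for the objectStore, and only the objectStore pass optionally rebuilds the processing index.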
@@ -10,6 +10,7 @@
 import cz.incad.kramerius.utils.database.JDBCQueryTemplate;
 import org.akubraproject.map.IdMapper;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.fcrepo.common.Constants;
 import org.fcrepo.common.FaultException;
 import org.fcrepo.common.PID;
@@ -69,6 +70,11 @@ public void doMigrationPart(Connection db, String[] args) throws SQLException {
             String objectPattern = KConfiguration.getInstance().getProperty("objectStore.pattern");
             Consumer<File> consumer = null;
             if ("true".equalsIgnoreCase(args[3])) {
+                try {
+                    feeder.deleteProcessingIndex();
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Error in deleteProcessingIndex: ", e);
+                }
                 consumer = (f) -> {
                     try {
                         FileInputStream inputStream = new FileInputStream(f);
@@ -140,7 +146,7 @@ public boolean handleRow(ResultSet rs, List<Pair<String, String>> returnsList) t
         }.executeQuery(sqlCommand);
     }
 
-    private static DigitalObject createDigitalObject(InputStream inputStream) {
+    static DigitalObject createDigitalObject(InputStream inputStream) {
         DigitalObject obj = null;
         try {
             synchronized (unmarshaller) {
@@ -170,9 +176,9 @@ private static DigitalObject createDigitalObject(InputStream inputStream) {
 
 
     // Message after 60 iterations
-    static int LOG_MESSAGE_ITERATION = 10000;
+    static int LOG_MESSAGE_ITERATION = KConfiguration.getInstance().getConfiguration().getInt("akubra.migration.logfrequency", 10000);
 
-    private static URI getBlobId(String token) {
+    static URI getBlobId(String token) {
         try {
             int i = token.indexOf('+');
             if (i == -1) {
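
A note on the '+'-separated tokens that getBlobId parses: Fedora's internal datastream ids concatenate the object pid, the datastream id, and the datastream version with '+'. The diff converts them to slash-separated blob ids with replaceAll("\\+", "/") in the removed code, and back with replace("/", "+") after URL-decoding in the new walker. A hedged sketch of that round trip; the sample id is made up.

    public class BlobIdTokenSketch {

        // Internal token -> Akubra blob id, mirroring replaceAll("\\+", "/").
        static String tokenToBlobId(String token) {
            return "info:fedora/" + token.replace('+', '/');
        }

        // Blob id -> internal token, mirroring the URL-decode plus
        // replace("/", "+") steps in processRoot.
        static String blobIdToToken(String blobId) {
            return blobId.replace("info:fedora/", "").replace('/', '+');
        }

        public static void main(String[] args) {
            String token = "uuid:1234+IMG_FULL+IMG_FULL.0"; // illustrative id
            String blobId = tokenToBlobId(token);
            System.out.println(blobId);                // info:fedora/uuid:1234/IMG_FULL/IMG_FULL.0
            System.out.println(blobIdToToken(blobId)); // round-trips back to the token
        }
    }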
