Skip to content

Commit

Permalink
Merge pull request #340 from cumulus-nasa/CUMULUS-527---Parse-pdr-que…
Browse files Browse the repository at this point in the history
…ues-up-selected-granules

Cumulus 527   parse pdr queues up selected granules
  • Loading branch information
Marc authored May 14, 2018
2 parents 2d5e0c0 + 0142023 commit 31e506a
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 7 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- updated cmrjs.deleteConcept to return success if the record is not found in CMR.

### Added
- **CUMULUS-527 - "parse-pdr queues up all granules and ignores regex"**
- Add an optional config property to the ParsePdr task called
"granuleIdFilter". This property is a regular expression that is applied
against the filename of the first file of each granule contained in the
PDR. If the regular expression matches, then the granule is included in
the output. Defaults to '.', which will match all granules in the PDR.
- File checksums in PDRs now support MD5
- Deployment support to subscribe to an SNS topic that already exists
- **CUMULUS-470, CUMULUS-471** In-region S3 Policy lambda added to API to update bucket policy for in-region access.
Expand Down
6 changes: 2 additions & 4 deletions packages/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ const os = require('os');
/**
* Synchronously makes a temporary directory, smoothing over the differences between
* mkdtempSync in node.js for various platforms and versions
*
* @param {string} name - A base name for the temp dir, to be uniquified for the final name
* @return - The absolute path to the created dir
* @returns {string} - The absolute path to the created dir
*/
exports.mkdtempSync = (name) => {
if (fs.mkdtempSync) {
return fs.mkdtempSync(`gitc_${name}`);
}
const dirname = ['gitc', name, +new Date()].join('_');
const abspath = path.join(os.tmpdir(), dirname);
fs.mkdirSync(abspath, 0o700);
Expand Down
29 changes: 29 additions & 0 deletions packages/test-data/pdrs/MODAPSops7.1234567.PDR
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
ORIGINATING_SYSTEM = MODAPS_SIPS_TEST;
TOTAL_FILE_COUNT = 2;
EXPIRATION_TIME = 2018-03-31T00:00:47;
OBJECT=FILE_GROUP;
DATA_TYPE = MYG29_S1D_SIR;
DATA_VERSION = 6;
NODE_NAME = modpdr01;
OBJECT = FILE_SPEC;
DIRECTORY_ID = /s3bucket/MODAPS/DATA;
FILE_ID = MYG29_S1D_SIR.A2012254.tiled.006.2018082201326.tar.gz;
FILE_TYPE = TGZ;
FILE_SIZE = 1503297;
FILE_CKSUM_TYPE = MD5;
FILE_CKSUM_VALUE = c2edfbb8a777d4f24a90ff685b4271cc;
END_OBJECT = FILE_SPEC;
END_OBJECT = FILE_GROUP;
OBJECT=FILE_GROUP;
DATA_TYPE = MYG29_N1D_SIR;
DATA_VERSION = 6;
NODE_NAME = modpdr01;
OBJECT = FILE_SPEC;
DIRECTORY_ID = /s3bucket/MODAPS/DATA;
FILE_ID = MYG29_N1D_SIR.A2012254.tiled.006.2018082201354.tar.gz;
FILE_TYPE = TGZ;
FILE_SIZE = 2449346;
FILE_CKSUM_TYPE = MD5;
FILE_CKSUM_VALUE = 0512441cd2ac76092d1000fd14299c19;
END_OBJECT = FILE_SPEC;
END_OBJECT = FILE_GROUP;
28 changes: 27 additions & 1 deletion tasks/parse-pdr/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
'use strict';

const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js');
const cloneDeep = require('lodash.clonedeep');
const get = require('lodash.get');
const errors = require('@cumulus/common/errors');
const pdr = require('@cumulus/ingest/pdr');
const log = require('@cumulus/common/log');
const { justLocalRun } = require('@cumulus/common/local-helpers');

/**
* Parse a PDR
Expand Down Expand Up @@ -36,7 +38,23 @@ function parsePdr(event) {
return parse.ingest()
.then((payload) => {
if (parse.connected) parse.end();
return Object.assign({}, event.input, payload);

// Filter based on the granuleIdFilter, default to match all granules
const granuleIdFilter = config.granuleIdFilter || '.';
const granules = payload.granules.filter((g) => g.files[0].name.match(granuleIdFilter));
const granulesCount = granules.length;
const filesCount = granules.reduce((total, granule) => total + granule.files.length, 0);
const totalSize = granules.reduce((total, granule) => total + granule.granuleSize, 0);

return Object.assign(
cloneDeep(event.input),
{
granules,
granulesCount,
filesCount,
totalSize
}
);
})
.catch((e) => {
if (e.toString().includes('ECONNREFUSED')) {
Expand Down Expand Up @@ -68,3 +86,11 @@ function handler(event, context, callback) {
cumulusMessageAdapter.runCumulusTask(parsePdr, event, context, callback);
}
exports.handler = handler;

// use node index.js local to invoke this
justLocalRun(() => {
const payload = require('@cumulus/test-data/cumulus_messages/parse-pdr.json'); // eslint-disable-line global-require, max-len
handler(payload, {}, (e, r) => {
console.log(JSON.stringify(r, null, '\t')); // eslint-disable-line no-console
});
});
1 change: 1 addition & 0 deletions tasks/parse-pdr/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"@cumulus/cumulus-message-adapter-js": "^1.0.1",
"@cumulus/ingest": "^1.5.0",
"@cumulus/test-data": "^1.4.0",
"lodash.clonedeep": "^4.4.2",
"lodash.get": "^4.4.2"
},
"devDependencies": {
Expand Down
4 changes: 4 additions & 0 deletions tasks/parse-pdr/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
"description": "The name of the Task's CloudFormation Task, useful as a prefix",
"type": "string"
},
"granuleIdFilter": {
"description": "A regular expression that is applied against the filename of the first file of each granule contained in the PDR. If the regular expression matches, then the granule is included in the output. Defaults to '.', which will match all granules in the PDR.",
"type": "string"
},
"provider": {
"type": "object",
"properties": {
Expand Down
103 changes: 101 additions & 2 deletions tasks/parse-pdr/tests/parse_pdrs_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ test.beforeEach(async (t) => {
granuleIdExtraction: '^(.*)\.hdf'
};

const collectionConfigStore = new CollectionConfigStore(
t.context.collectionConfigStore = new CollectionConfigStore(
t.context.payload.config.bucket,
t.context.payload.config.stack
);
await collectionConfigStore.put('MOD09GQ', collectionConfig);
await t.context.collectionConfigStore.put('MOD09GQ', collectionConfig);
});

test.afterEach(async (t) => {
Expand Down Expand Up @@ -261,3 +261,102 @@ test('Parse a PDR from an S3 provider', async (t) => {
await recursivelyDeleteS3Bucket(t.context.payload.config.provider.host);
}
});

test('Parse a PDR without a granuleIdFilter in the config', async (t) => {
// Create the collections contained in this PDR
await Promise.all([
t.context.collectionConfigStore.put(
'MYG29_S1D_SIR',
{ name: 'MYG29_S1D_SIR', granuleIdExtraction: '^(.*)\.tar.gz' }
),
t.context.collectionConfigStore.put(
'MYG29_N1D_SIR',
{ name: 'MYG29_N1D_SIR', granuleIdExtraction: '^(.*)\.tar.gz' }
)
]);

// Set up the task config
t.context.payload.config.provider = {
id: 'MODAPS',
protocol: 'ftp',
host: 'localhost',
username: 'testuser',
password: 'testpass'
};
t.context.payload.config.useList = true;

// Set up the task input
t.context.payload.input.pdr.name = 'MODAPSops7.1234567.PDR';

await validateInput(t, t.context.payload.input);
await validateConfig(t, t.context.payload.config);

let output;
try {
output = await parsePdr(t.context.payload);

await validateOutput(t, output);

t.deepEqual(output.pdr, t.context.payload.input.pdr);
t.is(output.granules.length, 2);
t.is(output.granulesCount, 2);
t.is(output.filesCount, 2);
t.is(output.totalSize, 3952643);
}
catch (err) {
if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') {
t.pass('ignoring this test. Test server seems to be down');
}
else t.fail(err);
}
});

test('Parse a PDR with a granuleIdFilter in the config', async (t) => {
// Create the collections contained in this PDR
await Promise.all([
t.context.collectionConfigStore.put(
'MYG29_S1D_SIR',
{ name: 'MYG29_S1D_SIR', granuleIdExtraction: '^(.*)\.tar.gz' }
),
t.context.collectionConfigStore.put(
'MYG29_N1D_SIR',
{ name: 'MYG29_N1D_SIR', granuleIdExtraction: '^(.*)\.tar.gz' }
)
]);

// Set up the task config
t.context.payload.config.provider = {
id: 'MODAPS',
protocol: 'ftp',
host: 'localhost',
username: 'testuser',
password: 'testpass'
};
t.context.payload.config.useList = true;
t.context.payload.config.granuleIdFilter = '^MYG29_S1D_SIR.A2012254.tiled.006.2018082201326\..*';

// Set up the task input
t.context.payload.input.pdr.name = 'MODAPSops7.1234567.PDR';

await validateInput(t, t.context.payload.input);
await validateConfig(t, t.context.payload.config);

let output;
try {
output = await parsePdr(t.context.payload);

await validateOutput(t, output);

t.deepEqual(output.pdr, t.context.payload.input.pdr);
t.is(output.granules.length, 1);
t.is(output.granulesCount, 1);
t.is(output.filesCount, 1);
t.is(output.totalSize, 1503297);
}
catch (err) {
if (err instanceof errors.RemoteResourceError || err.code === 'AllAccessDisabled') {
t.pass('ignoring this test. Test server seems to be down');
}
else t.fail(err);
}
});

0 comments on commit 31e506a

Please sign in to comment.