Skip to content

Commit 6b3293b

Browse files
jorgee, bentsherman, pditommaso
authored
PoC for CID store annotations and workflow outputs structure (#5885)
Signed-off-by: jorgee <[email protected]> Signed-off-by: Jorge Ejarque <[email protected]> Signed-off-by: Ben Sherman <[email protected]> Signed-off-by: Paolo Di Tommaso <[email protected]> Co-authored-by: Ben Sherman <[email protected]> Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent db79c43 commit 6b3293b

35 files changed

+1461
-195
lines changed

build.gradle

+3-3
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ task compile {
237237

238238
def getRuntimeConfigs() {
239239
def names = subprojects
240-
.findAll { prj -> prj.name in ['nextflow','nf-cid','nf-commons','nf-httpfs','nf-lang'] }
240+
.findAll { prj -> prj.name in ['nextflow','nf-commons','nf-httpfs','nf-lang', 'nf-cid'] }
241241
.collect { it.name }
242242

243243
FileCollection result = null
@@ -263,7 +263,7 @@ task exportClasspath {
263263
def home = System.getProperty('user.home')
264264
def all = getRuntimeConfigs()
265265
def libs = all.collect { File file -> /*println file.canonicalPath.replace(home, '$HOME');*/ file.canonicalPath; }
266-
['nextflow','nf-cid','nf-commons','nf-httpfs','nf-lang'].each {libs << file("modules/$it/build/libs/${it}-${version}.jar").canonicalPath }
266+
['nextflow','nf-commons','nf-httpfs','nf-lang', 'nf-cid'].each {libs << file("modules/$it/build/libs/${it}-${version}.jar").canonicalPath }
267267
file('.launch.classpath').text = libs.unique().join(':')
268268
}
269269
}
@@ -276,7 +276,7 @@ ext.nexusEmail = project.findProperty('nexusEmail')
276276
// `signing.keyId` property needs to be defined in the `gradle.properties` file
277277
ext.enableSignArchives = project.findProperty('signing.keyId')
278278

279-
ext.coreProjects = projects( ':nextflow', ':nf-cid', ':nf-commons', ':nf-httpfs', ':nf-lang' )
279+
ext.coreProjects = projects( ':nextflow', ':nf-commons', ':nf-httpfs', ':nf-lang', ':nf-cid' )
280280

281281
configure(coreProjects) {
282282
group = 'io.nextflow'

modules/nextflow/src/main/groovy/nextflow/Session.groovy

+4-4
Original file line numberDiff line numberDiff line change
@@ -1126,22 +1126,22 @@ class Session implements ISession {
11261126
}
11271127
}
11281128

1129-
void notifyWorkflowPublish(Object value) {
1129+
void notifyWorkflowPublish(String name, Object value) {
11301130
for( final observer : observers ) {
11311131
try {
1132-
observer.onWorkflowPublish(value)
1132+
observer.onWorkflowPublish(name, value)
11331133
}
11341134
catch( Exception e ) {
11351135
log.error "Failed to invoke observer on workflow publish: $observer", e
11361136
}
11371137
}
11381138
}
11391139

1140-
void notifyFilePublish(Path destination, Path source=null) {
1140+
void notifyFilePublish(Path destination, Path source, Map annotations) {
11411141
def copy = new ArrayList<TraceObserver>(observers)
11421142
for( TraceObserver observer : copy ) {
11431143
try {
1144-
observer.onFilePublish(destination, source)
1144+
observer.onFilePublish(destination, source, annotations)
11451145
}
11461146
catch( Exception e ) {
11471147
log.error "Failed to invoke observer on file publish: $observer", e

modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy

+30
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class CmdCid extends CmdBase implements UsageAware {
4343
void log(ConfigMap config)
4444
void show(ConfigMap config, List<String> args)
4545
void lineage(ConfigMap config, List<String> args)
46+
void diff(ConfigMap config, List<String> args)
4647
}
4748

4849
interface SubCmd {
@@ -62,6 +63,7 @@ class CmdCid extends CmdBase implements UsageAware {
6263
commands << new CmdLog()
6364
commands << new CmdShow()
6465
commands << new CmdLineage()
66+
commands << new CmdDiff()
6567
}
6668

6769
@Parameter(hidden = true)
@@ -228,4 +230,32 @@ class CmdCid extends CmdBase implements UsageAware {
228230
}
229231

230232
}
233+
234+
class CmdDiff implements SubCmd {
235+
236+
@Override
237+
String getName() { 'diff' }
238+
239+
@Override
240+
String getDescription() {
241+
return 'Show differences between two CID descriptions'
242+
}
243+
244+
void apply(List<String> args) {
245+
if (args.size() != 2) {
246+
println("ERROR: Incorrect number of parameters")
247+
usage()
248+
return
249+
}
250+
operation.diff(config, args)
251+
}
252+
253+
@Override
254+
void usage() {
255+
println description
256+
println "Usage: nextflow $NAME $name <CID 1> <CID 2>"
257+
}
258+
259+
}
260+
231261
}

modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy

+26-16
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class PublishOp {
3838

3939
private Session session
4040

41+
private String name
42+
4143
private DataflowReadChannel source
4244

4345
private Map opts
@@ -52,8 +54,9 @@ class PublishOp {
5254

5355
private volatile boolean complete
5456

55-
PublishOp(Session session, DataflowReadChannel source, Map opts) {
57+
PublishOp(Session session, String name, DataflowReadChannel source, Map opts) {
5658
this.session = session
59+
this.name = name
5760
this.source = source
5861
this.opts = opts
5962
this.path = opts.path as String
@@ -89,13 +92,14 @@ class PublishOp {
8992
if( targetResolver == null )
9093
return
9194

92-
// emit workflow publish event
93-
session.notifyWorkflowPublish(value)
94-
9595
// create publisher
9696
final overrides = targetResolver instanceof Closure
9797
? [saveAs: targetResolver]
9898
: [path: targetResolver]
99+
if (opts.annotations instanceof Closure){
100+
final annotations = opts.annotations as Closure
101+
overrides.annotations = annotations.call(value) as Map
102+
}
99103
final publisher = PublishDir.create(opts + overrides)
100104

101105
// publish files
@@ -106,13 +110,10 @@ class PublishOp {
106110
publisher.apply(files, sourceDir)
107111
}
108112

109-
// append record to index file
110-
if( indexOpts ) {
111-
final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value
112-
final normalized = normalizePaths(record, targetResolver)
113-
log.trace "Normalized record for index file: ${normalized}"
114-
indexRecords << normalized
115-
}
113+
// append record to index
114+
final normalized = normalizePaths(value, targetResolver)
115+
log.trace "Normalized record for index file: ${normalized}"
116+
indexRecords << normalized
116117
}
117118

118119
/**
@@ -151,12 +152,21 @@ class PublishOp {
151152
}
152153

153154
/**
154-
* Once all values have been published, write the
155-
* index file (if enabled).
155+
* Once all values have been published, publish the index
156+
* and write it to a file (if enabled).
156157
*/
157158
protected void onComplete(nope) {
158-
if( indexOpts && indexRecords.size() > 0 ) {
159-
log.trace "Saving records to index file: ${indexRecords}"
159+
// publish individual record if source is a value channel
160+
final index = CH.isValue(source)
161+
? indexRecords.first()
162+
: indexRecords
163+
164+
// publish workflow output
165+
session.notifyWorkflowPublish(name, index)
166+
167+
// write index file
168+
if( indexOpts && index ) {
169+
log.trace "Saving records to index file: ${index}"
160170
final indexPath = indexOpts.path
161171
final ext = indexPath.getExtension()
162172
indexPath.parent.mkdirs()
@@ -169,7 +179,7 @@ class PublishOp {
169179
else {
170180
log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be 'csv' or 'json'"
171181
}
172-
session.notifyFilePublish(indexPath)
182+
session.notifyFilePublish(indexPath, null, opts.tags as Map)
173183
}
174184

175185
log.trace "Publish operator complete"

modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy

+9-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ class PublishDir {
109109
*/
110110
private def tags
111111

112+
/**
113+
* Annotations to be associated with the target file
114+
*/
115+
private Map annotations
116+
112117
/**
113118
* The content type of the file. Currently only supported by AWS S3.
114119
* This can be either a MIME type content type string or a Boolean value
@@ -211,6 +216,9 @@ class PublishDir {
211216
if( params.tags != null )
212217
result.tags = params.tags
213218

219+
if( params.annotations != null )
220+
result.annotations = params.annotations as Map
221+
214222
if( params.contentType instanceof Boolean )
215223
result.contentType = params.contentType
216224
else if( params.contentType )
@@ -581,7 +589,7 @@ class PublishDir {
581589
}
582590

583591
protected void notifyFilePublish(Path destination, Path source=null) {
584-
session.notifyFilePublish(destination, source)
592+
session.notifyFilePublish(destination, source, annotations)
585593
}
586594

587595
}

modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy

+9-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class OutputDsl {
8484
final opts = publishOptions(name, defaults, overrides)
8585

8686
if( opts.enabled == null || opts.enabled )
87-
ops << new PublishOp(session, CH.getReadChannel(mixed), opts).apply()
87+
ops << new PublishOp(session, name, CH.getReadChannel(mixed), opts).apply()
8888
}
8989
}
9090

@@ -171,6 +171,14 @@ class OutputDsl {
171171
setOption('tags', value)
172172
}
173173

174+
void annotations(Map value) {
175+
setOption('annotations', value)
176+
}
177+
178+
void annotations(Closure value) {
179+
setOption('annotations', value)
180+
}
181+
174182
private void setOption(String name, Object value) {
175183
if( opts.containsKey(name) )
176184
throw new ScriptRuntimeException("Publish option `${name}` cannot be defined more than once for a given target")

modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy

+17-2
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,15 @@ trait TraceObserver {
123123
void onFlowError(TaskHandler handler, TraceRecord trace){}
124124

125125
/**
126-
* Method that is invoked when a value is published from a channel.
126+
* Method that is invoked when a workflow output is published.
127127
*
128+
* @param name
129+
* The name of the workflow output
128130
* @param value
131+
* A list if the published channel was a queue channel,
132+
* otherwise an object if the channel was a value channel
129133
*/
130-
void onWorkflowPublish(Object value){}
134+
void onWorkflowPublish(String name, Object value){}
131135

132136
/**
133137
* Method that is invoked when an output file is published
@@ -150,4 +154,15 @@ trait TraceObserver {
150154
void onFilePublish(Path destination, Path source){
151155
onFilePublish(destination)
152156
}
157+
/**
158+
* Method that is invoked when an output file is published, including its annotations
159+
* @param destination
160+
* The destination path at `publishDir` folder.
161+
* @param annotations
162+
* The annotations attached to this file
163+
*/
164+
void onFilePublish(Path destination, Path source, Map annotations){
165+
onFilePublish(destination, source)
166+
}
167+
153168
}

modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy

+47-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package nextflow.cli
1919

20+
import nextflow.data.cid.serde.CidEncoder
21+
2022
import java.nio.file.Files
2123

2224
import nextflow.SysEnv
@@ -28,11 +30,13 @@ import nextflow.data.cid.model.Parameter
2830
import nextflow.data.cid.model.TaskOutput
2931
import nextflow.data.cid.model.TaskRun
3032
import nextflow.data.cid.model.WorkflowOutput
31-
import nextflow.data.cid.serde.CidEncoder
3233
import nextflow.plugin.Plugins
3334
import org.junit.Rule
3435
import spock.lang.Specification
3536
import test.OutputCapture
37+
38+
import java.time.Instant
39+
3640
/**
3741
* CLI cid Tests
3842
*
@@ -132,9 +136,10 @@ class CmdCidTest extends Specification {
132136
def launcher = Mock(Launcher){
133137
getOptions() >> new CliOptions(config: [configFile.toString()])
134138
}
139+
def time = Instant.ofEpochMilli(123456789).toString()
135140
def encoder = new CidEncoder().withPrettyPrint(true)
136141
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
137-
"cid://123987/file.bam", 1234, 123456789, 123456789, null)
142+
"cid://123987/file.bam", 1234, time, time, null)
138143
def jsonSer = encoder.encode(entry)
139144
def expectedOutput = jsonSer
140145
cidFile.text = jsonSer
@@ -177,7 +182,7 @@ class CmdCidTest extends Specification {
177182

178183
then:
179184
stdout.size() == 1
180-
stdout[0] == "No entry found for cid://12345."
185+
stdout[0] == "No entries found for cid://12345."
181186

182187
cleanup:
183188
folder?.deleteDir()
@@ -203,19 +208,20 @@ class CmdCidTest extends Specification {
203208
Files.createDirectories(cidFile4.parent)
204209
Files.createDirectories(cidFile5.parent)
205210
def encoder = new CidEncoder()
211+
def time = Instant.ofEpochMilli(123456789).toString()
206212
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
207-
"cid://123987/file.bam", 1234, 123456789, 123456789, null)
213+
"cid://123987/file.bam", 1234, time, time, null)
208214
cidFile.text = encoder.encode(entry)
209215
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
210-
"cid://123987", 1234, 123456789, 123456789, null)
216+
"cid://123987", 1234, time, time, null)
211217
cidFile2.text = encoder.encode(entry)
212218
entry = new TaskRun("u345-2346-1stw2", "foo", new Checksum("abcde2345","nextflow","standard"),
213219
[new Parameter( "ValueInParam", "sample_id","ggal_gut"),
214220
new Parameter("FileInParam","reads",["cid://45678/output.txt"])],
215221
null, null, null, null, [:],[], null)
216222
cidFile3.text = encoder.encode(entry)
217223
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
218-
"cid://45678", 1234, 123456789, 123456789, null)
224+
"cid://45678", 1234, time, time, null)
219225
cidFile4.text = encoder.encode(entry)
220226
entry = new TaskRun("u345-2346-1stw2", "bar", new Checksum("abfs2556","nextflow","standard"),
221227
null,null, null, null, null, [:],[], null)
@@ -258,4 +264,39 @@ class CmdCidTest extends Specification {
258264

259265
}
260266

267+
def 'should show query results'(){
268+
given:
269+
def folder = Files.createTempDirectory('test').toAbsolutePath()
270+
def configFile = folder.resolve('nextflow.config')
271+
configFile.text = "workflow.data.enabled = true\nworkflow.data.store.location = '$folder'".toString()
272+
def cidFile = folder.resolve(".meta/12345/.data.json")
273+
Files.createDirectories(cidFile.parent)
274+
def launcher = Mock(Launcher){
275+
getOptions() >> new CliOptions(config: [configFile.toString()])
276+
}
277+
def encoder = new CidEncoder().withPrettyPrint(true)
278+
def time = Instant.ofEpochMilli(123456789).toString()
279+
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
280+
"cid://123987/file.bam", 1234, time, time, null)
281+
def jsonSer = encoder.encode(entry)
282+
def expectedOutput = jsonSer
283+
cidFile.text = jsonSer
284+
when:
285+
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid:///?type=WorkflowOutput"])
286+
cidCmd.run()
287+
def stdout = capture
288+
.toString()
289+
.readLines()// remove the log part
290+
.findResults { line -> !line.contains('DEBUG') ? line : null }
291+
.findResults { line -> !line.contains('INFO') ? line : null }
292+
.findResults { line -> !line.contains('plugin') ? line : null }
293+
294+
then:
295+
stdout.size() == expectedOutput.readLines().size()
296+
stdout.join('\n') == expectedOutput
297+
298+
cleanup:
299+
folder?.deleteDir()
300+
}
301+
261302
}

0 commit comments

Comments
 (0)