Source Sink URIs
A `Flow` allows us, through its `#createInput` and `#createOutput` methods, to obtain data sets representing external sources to read data from or sinks to write data to. These methods come in two flavors:
- Methods accepting `DataSource`s or `DataSink`s are the raw building blocks. Their parameters fully describe to the runtime how to read data in for processing or how to write data out.
- Methods accepting `URI`s are a convenience that shortens the boilerplate required to set up sources and sinks. Behind the scenes, such a URI is replaced with a new `DataSource` or `DataSink` instance. The rest of this page explains the details of this process.
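A minimal sketch of how the two flavors can relate, assuming heavily simplified, hypothetical `DataSource`, `Dataset` and `SketchFlow` types (euphoria's real classes have richer signatures). The URI overload turns the URI into a source through an assumed resolver hook and then delegates to the raw overload, which is all the convenience flavor adds.

```java
import java.net.URI;
import java.util.List;
import java.util.function.Function;

// Hypothetical, heavily simplified stand-in for euphoria's DataSource.
interface DataSource<T> {
  List<T> read();
}

// Hypothetical stand-in for a data set: here it merely wraps its source.
final class Dataset<T> {
  final DataSource<T> source;

  Dataset(DataSource<T> source) {
    this.source = source;
  }
}

// Hypothetical flow showing only the two createInput flavors.
final class SketchFlow {
  // Assumed hook that turns a URI into a source (the lookup itself is elided).
  private final Function<URI, DataSource<?>> resolver;

  SketchFlow(Function<URI, DataSource<?>> resolver) {
    this.resolver = resolver;
  }

  // Raw building block: the caller fully describes the source.
  <T> Dataset<T> createInput(DataSource<T> source) {
    return new Dataset<>(source);
  }

  // Convenience flavor: the URI is replaced by a DataSource behind the
  // scenes and handed to the raw overload. The cast is unchecked.
  @SuppressWarnings("unchecked")
  <T> Dataset<T> createInput(URI uri) {
    return createInput((DataSource<T>) resolver.apply(uri));
  }
}
```

Both calls end up in the same raw overload, so a source reached via URI behaves exactly like one passed in directly.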
Instead of directly instantiating a particular `DataSource` in our programs, we can make them more general by letting users specify the source at runtime. Of course, as program authors we are free to implement our own code that lets users choose a particular `DataSource` implementation. As part of euphoria, however, we have arrived at a reusable strategy based on URIs:
```java
// register schemes to determine a particular data source factory
Settings s = new Settings();
s.setClass("euphoria.io.datasource.factory.file", SimpleHadoopTextFileSource.Factory.class);
s.setClass("euphoria.io.datasource.factory.hdfs", SimpleHadoopTextFileSource.Factory.class);
// the same concept applies to data sinks as well
s.setClass("euphoria.io.datasink.factory.stdout", StdoutSink.Factory.class);
...
// a user-defined URI specifying some data to read; note the
// "file" scheme here: it determines which factory is chosen
// to create the data source
URI inputUri = URI.create("file:///tmp/input.txt");
// create the flow as usual, passing in the configuration
Flow f = Flow.create("WordCount", s);
// create the input data set
Dataset<String> lines = f.createInput(inputUri);
...
// equivalently, we can specify data sinks indirectly
dataset.persist(URI.create("stdout:///"));
```
We, as program authors, specify the set of supported data sources by registering them with the flow-wide configuration instance and leave the rest of bootstrapping the data source implementation instances to the underlying protocol. By carefully choosing the supported data sources, we can guarantee type safety, even though at one point the code technically isn't type-safe: in the example, this is the implicit "cast" to `Dataset<String>` in the `f.createInput(inputUri)` line.
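To make the "technically isn't type-safe" point concrete, here is a small sketch in plain Java, not euphoria code. `UncheckedCastDemo` and its string-keyed map of lists are hypothetical stand-ins for sources looked up by URI scheme. The caller chooses the element type, the compiler cannot check it, and only the discipline of registering matching sources keeps the cast correct.

```java
import java.util.List;
import java.util.Map;

final class UncheckedCastDemo {
  // Hypothetical registry: element types are erased once sources are looked
  // up by a key, just as they are when a source is looked up by URI scheme.
  static final Map<String, List<?>> SOURCES = Map.<String, List<?>>of(
      "file", List.of("line 1", "line 2"),
      "numbers", List.of(1, 2, 3));

  // The implicit "cast": the caller picks T and the compiler cannot check it.
  @SuppressWarnings("unchecked")
  static <T> List<T> createInput(String scheme) {
    return (List<T>) SOURCES.get(scheme);
  }

  // Consumer that trusts the declared element type.
  static int totalLength(List<String> lines) {
    int total = 0;
    for (String line : lines) { // a wrong registration fails here, not earlier
      total += line.length();
    }
    return total;
  }
}
```

Assigning `createInput("numbers")` to a `List<String>` compiles without complaint; the mismatch only surfaces as a `ClassCastException` once an element is actually used as a `String`.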
Given a URI, the underlying protocol does the following:
- Determine the scheme of the given URI.
- Look up the factory registered for that scheme (in the example above, under the key `euphoria.io.datasource.factory.<scheme>`):
  - fail if no factory is registered for the scheme;
  - fail if the registered class doesn't implement `DataSourceFactory`.
- Instantiate the factory using its public default constructor.
- Invoke the `#get` method of the newly created factory instance, passing along the original URI and the flow-wide configuration instance. This leaves instantiating the actual data source implementation up to the factory, allowing it to interpret the given URI in a source-specific format.
- All of these steps are carried out in the launcher program while the flow is being constructed. A flow by itself does not retain references to the URIs; it merely references the finally constructed data source.
- The same protocol is implemented for data sinks, with `DataSinkFactory` as the factory interface and `euphoria.io.datasink.factory` as the configuration key prefix for registration.
- All of the IO data source and sink implementations that are part of euphoria should come with a nested `Factory` class for use out of the box, just as shown in the example above.
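The lookup steps above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not euphoria's code: `DataSource`, `DataSourceFactory`, `UriResolver` and the toy `PathSource` are simplified, hypothetical types, and a `Map<String, Class<?>>` stands in for euphoria's `Settings`. Only the key prefix `euphoria.io.datasource.factory.` is taken from the text.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; euphoria's real signatures may differ.
interface DataSource<T> {
  List<T> read();
}

interface DataSourceFactory {
  DataSource<?> get(URI uri, Map<String, Class<?>> settings);
}

final class UriResolver {
  static final String SOURCE_PREFIX = "euphoria.io.datasource.factory.";

  static DataSource<?> resolve(URI uri, Map<String, Class<?>> settings) {
    // Determine the scheme, e.g. "file" for file:///tmp/input.txt.
    String scheme = uri.getScheme();
    // Look up the factory class registered for that scheme.
    Class<?> factoryClass = settings.get(SOURCE_PREFIX + scheme);
    if (factoryClass == null) {
      throw new IllegalArgumentException("No factory registered for scheme: " + scheme);
    }
    if (!DataSourceFactory.class.isAssignableFrom(factoryClass)) {
      throw new IllegalArgumentException(
          factoryClass.getName() + " does not implement DataSourceFactory");
    }
    // Instantiate the factory through its public no-argument constructor.
    DataSourceFactory factory;
    try {
      factory = (DataSourceFactory) factoryClass.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + factoryClass.getName(), e);
    }
    // Let the factory interpret the URI and build the actual source.
    return factory.get(uri, settings);
  }
}

// Toy source following the nested-Factory convention: it reads back the URI path.
final class PathSource implements DataSource<String> {
  private final String path;

  PathSource(String path) {
    this.path = path;
  }

  @Override
  public List<String> read() {
    return List.of(path);
  }

  public static final class Factory implements DataSourceFactory {
    public Factory() {}

    @Override
    public DataSource<?> get(URI uri, Map<String, Class<?>> settings) {
      return new PathSource(uri.getPath());
    }
  }
}
```

A sink-side counterpart would presumably have the same shape, with a `DataSinkFactory` interface and the `euphoria.io.datasink.factory.` prefix.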
In the above sections we've seen that IO factories are registered for specific schemes under a specific key prefix. This key prefix is hard-coded.
However, the algorithm described above can itself be changed. It is just one possible implementation of the task of "getting from a URI to a data source/sink." The class that carries out this task is `IORegistry`, and it can be replaced using the configuration key `euphoria.io.registry.impl`, which names a subclass of `IORegistry`. If the key is defined, an instance of the named class is responsible for implementing the strategy for "getting from a URI to a data source/sink."
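To illustrate how such a switch could work, here is a minimal sketch assuming a hypothetical shape for `IORegistry`. Only the class name and the `euphoria.io.registry.impl` key come from the text; the `get` and `openSource` methods and the `SchemeRegistry` and `InMemoryRegistry` classes are invented for illustration, and a `Map<String, Class<?>>` again stands in for `Settings`.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;

// Hypothetical minimal source type; the real euphoria interfaces differ.
interface DataSource<T> {
  List<T> read();
}

// Hypothetical shape of the registry: only the class name and the
// configuration key come from the text, the methods are assumptions.
abstract class IORegistry {
  static final String IMPL_KEY = "euphoria.io.registry.impl";

  abstract DataSource<?> openSource(URI uri, Map<String, Class<?>> settings);

  // Use the configured subclass if the key is defined, else the default strategy.
  static IORegistry get(Map<String, Class<?>> settings) {
    Class<?> impl = settings.get(IMPL_KEY);
    if (impl == null) {
      return new SchemeRegistry();
    }
    if (!IORegistry.class.isAssignableFrom(impl)) {
      throw new IllegalArgumentException(impl.getName() + " is not a subclass of IORegistry");
    }
    try {
      return (IORegistry) impl.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + impl.getName(), e);
    }
  }
}

// Placeholder for the default scheme/key-prefix strategy (lookup not shown).
class SchemeRegistry extends IORegistry {
  @Override
  DataSource<?> openSource(URI uri, Map<String, Class<?>> settings) {
    throw new UnsupportedOperationException("scheme-based lookup not shown here");
  }
}

// Alternative strategy: ignore schemes and serve every URI from memory.
class InMemoryRegistry extends IORegistry {
  public InMemoryRegistry() {}

  @Override
  DataSource<?> openSource(URI uri, Map<String, Class<?>> settings) {
    DataSource<String> source = () -> List.of(uri.toString());
    return source;
  }
}
```

The design choice this illustrates is that callers only ever talk to `IORegistry`, so replacing the URI strategy is a configuration change rather than a code change in the flow.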