
Source Sink URIs

xitep edited this page Feb 20, 2017 · 1 revision

A Flow lets us use the #createInput and #createOutput methods to obtain data sets representing external sources to read data from, or sinks to write data to. These methods come in two flavors:

  • Methods accepting DataSources or DataSinks are the raw building blocks. The parameters fully describe to the runtime how to read data in for processing or how to write data out.
  • Methods accepting URIs are a convenience for the user to ease and shorten the boilerplate required to set up sources and sinks. Behind the scenes the code replaces such a URI with a new DataSource or DataSink instance. We'll explain the details of this process here.

Example

Instead of directly instantiating a particular DataSource in our programs, we can make them more general by letting users specify the source at runtime. Of course, as program authors we are free to implement our own code that lets users choose a particular DataSource implementation. As part of euphoria, we've come across a reusable strategy that relies on URIs:

    // register schemes to determine a particular data source factory
    Settings s = new Settings();
    s.setClass("euphoria.io.datasource.factory.file", SimpleHadoopTextFileSource.Factory.class);
    s.setClass("euphoria.io.datasource.factory.hdfs", SimpleHadoopTextFileSource.Factory.class);
    // the same concept applies to data sinks as well
    s.setClass("euphoria.io.datasink.factory.stdout", StdoutSink.Factory.class);
    ...

    // a user-defined URI specifying some data to read; note the
    // "file" scheme here: it determines which factory is chosen
    // to create the data source
    URI inputUri = URI.create("file:///tmp/input.txt");

    // create the flow as usual, passing on the configuration
    Flow f = Flow.create("WordCount", s);

    // create the input data set
    Dataset<String> lines = f.createInput(inputUri);
    ...

    // equivalently, we can specify data sinks indirectly
    dataset.persist(URI.create("stdout:///"));

How it works

We, as program authors, specify the set of supported data sources by registering them with the flow-wide configuration instance, and leave the rest of bootstrapping the data source instances to the underlying protocol. By carefully choosing the supported data sources we can guarantee type safety, even though at one point the code technically isn't type-safe: in the example, this is the implicit "cast" to Dataset<String> in the line calling f.createInput(inputUri).
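The caveat about the implicit cast can be illustrated with a small, self-contained sketch. The `Dataset` class and the `createInput` method below are hypothetical stand-ins, not euphoria's API: like a URI-based `createInput`, the method lets the caller pick the element type `T` while the URI picks the actual source, so declaring the wrong element type compiles fine and only fails at runtime.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

// A minimal sketch of why a URI-based createInput cannot be checked by the
// compiler. Dataset and createInput are hypothetical stand-ins, not euphoria's API.
class UncheckedSourceDemo {

  /** Stand-in data set; elements are stored untyped, as a runtime would see them. */
  static class Dataset<T> {
    private final List<Object> elements;

    Dataset(List<Object> elements) { this.elements = elements; }

    @SuppressWarnings("unchecked")
    T first() {
      return (T) elements.get(0); // unchecked: T is erased, nothing is verified here
    }
  }

  /** Like createInput(URI): the caller picks T, the URI picks the actual source. */
  static <T> Dataset<T> createInput(URI uri) {
    // pretend every URI resolves to a text source producing lines
    return new Dataset<>(Arrays.asList((Object) ("line from " + uri.getPath())));
  }

  /** Declaring the wrong element type compiles; the error only appears at runtime. */
  static boolean wrongTypeFailsAtRuntime() {
    Dataset<Integer> numbers = createInput(URI.create("file:///tmp/input.txt"));
    try {
      Integer n = numbers.first(); // the compiler-inserted cast to Integer throws here
      return false; // not reached: the source produces Strings
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Dataset<String> lines = createInput(URI.create("file:///tmp/input.txt"));
    System.out.println(lines.first());
    System.out.println(wrongTypeFailsAtRuntime());
  }
}
```

This is why the choice of registered factories matters: the compiler trusts the declared `Dataset<String>`, and only the program author can make sure the sources behind the supported schemes really produce that type.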

Given a URI, the underlying protocol does the following:

  1. Determine the scheme of the given URI
  2. Look up a factory for the scheme
    • fail if no factory is registered for the scheme
    • fail if it doesn't implement DataSourceFactory
  3. Instantiate the factory using its public default constructor
  4. Invoke the #get method of the newly created factory instance, passing along the original URI and the flow-wide configuration instance. This leaves the instantiation of the actual data source up to the factory, allowing it to interpret the given URI in a source-specific format.
  • All of these steps are carried out in the launcher program while the flow is being constructed. A flow by itself does not retain references to the URIs; it merely references the finally constructed data source.
  • The same protocol is implemented for data sinks, with DataSinkFactory as the factory interface and euphoria.io.datasink.factory as the configuration key prefix used for registration.
  • All of the IO data source and sink implementations that are part of euphoria should come with a nested Factory class, ready for use out of the box, as shown in the example above.
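The four steps above can be sketched in plain Java. This is a minimal, self-contained approximation, not euphoria's code: `Settings`, `DataSource` and `DataSourceFactory` are hypothetical stand-ins that only mirror the names used on this page, and `lookupClass`, `PathSource` and `FileFactory` are invented for illustration. Only the `euphoria.io.datasource.factory.` prefix is taken from the text.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of the URI-to-source protocol. Settings, DataSource and
// DataSourceFactory are hypothetical stand-ins, not euphoria's real classes.
class UriSourceResolution {

  /** Stand-in for euphoria's Settings: just a map from keys to classes. */
  static class Settings {
    private final Map<String, Class<?>> classes = new HashMap<>();

    void setClass(String key, Class<?> cls) { classes.put(key, cls); }

    Class<?> lookupClass(String key) { return classes.get(key); }
  }

  interface DataSource<T> {
    String describe();
  }

  interface DataSourceFactory {
    <T> DataSource<T> get(URI uri, Settings settings);
  }

  static final String PREFIX = "euphoria.io.datasource.factory.";

  /** Follows the four steps listed above. */
  static <T> DataSource<T> createSource(URI uri, Settings settings) {
    // 1. determine the scheme of the given URI
    String scheme = uri.getScheme();
    if (scheme == null) {
      throw new IllegalArgumentException("URI has no scheme: " + uri);
    }
    // 2. look up the factory class registered for that scheme
    Class<?> cls = settings.lookupClass(PREFIX + scheme);
    if (cls == null) {
      throw new IllegalArgumentException("No factory registered for scheme: " + scheme);
    }
    if (!DataSourceFactory.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException(cls.getName() + " does not implement DataSourceFactory");
    }
    // 3. instantiate the factory through its public no-arg constructor
    DataSourceFactory factory;
    try {
      factory = (DataSourceFactory) cls.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
    }
    // 4. hand the original URI and settings to the factory
    return factory.get(uri, settings);
  }

  /** Hypothetical source that only remembers which path it was created for. */
  static class PathSource<T> implements DataSource<T> {
    private final String path;

    PathSource(String path) { this.path = path; }

    @Override
    public String describe() { return "file source for " + path; }
  }

  /** Hypothetical factory, in the spirit of euphoria's nested Factory classes. */
  public static class FileFactory implements DataSourceFactory {
    @Override
    public <T> DataSource<T> get(URI uri, Settings settings) {
      return new PathSource<>(uri.getPath());
    }
  }

  public static void main(String[] args) {
    Settings s = new Settings();
    s.setClass(PREFIX + "file", FileFactory.class);
    DataSource<String> src = createSource(URI.create("file:///tmp/input.txt"), s);
    System.out.println(src.describe());
    try {
      createSource(URI.create("hdfs:///data"), s);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Because the factory is created reflectively, any class registered under the prefix needs a public no-argument constructor. A data sink version would presumably follow the same shape, with a `DataSinkFactory` interface and the `euphoria.io.datasink.factory.` prefix.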

IORegistry

In the above sections we've seen that IO factories are registered for specific schemes under a specific key prefix. This key prefix is hard-coded.

However, the algorithm described above can itself be changed. It is just one possible implementation of the task of "getting from a URI to a data source/sink." The class that carries out this task is IORegistry, and it can be replaced by setting the configuration key euphoria.io.registry.impl to the name of a sub-class of IORegistry. If the key is defined, an instance of the named class is responsible for implementing the strategy for "getting from a URI to a data source/sink."
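A pluggable strategy of this kind could look roughly like the following sketch. The shape of `IORegistry` here (a single `factoryKeyFor` method), the `DefaultIORegistry` and `FileByDefaultIORegistry` classes, and the plain `Map` used as configuration are all hypothetical; only the `euphoria.io.registry.impl` key and the idea of naming a sub-class of IORegistry come from this page.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of a pluggable "URI to source/sink" strategy. IORegistry and
// its subclasses are hypothetical stand-ins; the real IORegistry API may differ.
class PluggableRegistryDemo {

  static final String REGISTRY_IMPL_KEY = "euphoria.io.registry.impl";
  static final String SOURCE_PREFIX = "euphoria.io.datasource.factory.";

  /** The strategy: decides which factory key a URI maps to. */
  public abstract static class IORegistry {
    abstract String factoryKeyFor(URI uri);
  }

  /** Default strategy: hard-coded prefix plus the URI's scheme. */
  public static class DefaultIORegistry extends IORegistry {
    @Override
    String factoryKeyFor(URI uri) {
      return SOURCE_PREFIX + uri.getScheme();
    }
  }

  /** Hypothetical custom strategy: URIs without a scheme are treated as local files. */
  public static class FileByDefaultIORegistry extends IORegistry {
    @Override
    String factoryKeyFor(URI uri) {
      String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
      return SOURCE_PREFIX + scheme;
    }
  }

  /** Uses the class named under REGISTRY_IMPL_KEY if present, else the default. */
  static IORegistry load(Map<String, String> settings) {
    String impl = settings.get(REGISTRY_IMPL_KEY);
    if (impl == null) {
      return new DefaultIORegistry();
    }
    try {
      Class<?> cls = Class.forName(impl);
      if (!IORegistry.class.isAssignableFrom(cls)) {
        throw new IllegalArgumentException(impl + " is not a sub-class of IORegistry");
      }
      return (IORegistry) cls.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + impl, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> settings = new HashMap<>();
    System.out.println(load(settings).factoryKeyFor(URI.create("hdfs:///data")));
    settings.put(REGISTRY_IMPL_KEY, FileByDefaultIORegistry.class.getName());
    System.out.println(load(settings).factoryKeyFor(URI.create("/tmp/input.txt")));
  }
}
```

Keeping the strategy behind one configurable class means the hard-coded key prefix only binds the default implementation; a custom registry is free to resolve URIs in an entirely different way.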
