diff --git a/REQUIREMENTS b/REQUIREMENTS
index 2853901..0b43b90 100644
--- a/REQUIREMENTS
+++ b/REQUIREMENTS
@@ -5,3 +5,4 @@ PyYAML
 Jinja2
 argcomplete
 networkx
+six
diff --git a/examples/model-correlations/src/cdf.py b/examples/model-correlations/src/cdf.py
index f25826b..0bee7e7 100644
--- a/examples/model-correlations/src/cdf.py
+++ b/examples/model-correlations/src/cdf.py
@@ -25,4 +25,4 @@
 data.sort()
 for i, x in enumerate(data):
     cdf = float(i)/len(data)
-    print x, "%0.4f" % cdf
+    print(x, "%0.4f" % cdf)
diff --git a/examples/model-correlations/src/correlation.py b/examples/model-correlations/src/correlation.py
index fe32e7e..fbb2aa6 100644
--- a/examples/model-correlations/src/correlation.py
+++ b/examples/model-correlations/src/correlation.py
@@ -2,7 +2,6 @@
 """
 
 import sys
-import csv
 import math
 
 # use headless matplotlib
@@ -33,4 +32,4 @@
 for i, j in zip(x, y):
     r += (i - xave)/xsig * (j - yave)/ysig
 r /= len(x) - 1
-print r
+print(r)
diff --git a/examples/sp500/code/parse_sp500.py b/examples/sp500/code/parse_sp500.py
index 676a0e9..229c906 100644
--- a/examples/sp500/code/parse_sp500.py
+++ b/examples/sp500/code/parse_sp500.py
@@ -5,7 +5,7 @@
 
 # read in html data
 with open(sys.argv[1]) as stream:
-    soup = BeautifulSoup(stream.read())
+    soup = BeautifulSoup(stream.read(), 'lxml')
 
 # extract the table into a csv
 writer = csv.writer(sys.stdout)
diff --git a/examples/sp500/flo.yaml b/examples/sp500/flo.yaml
index d813122..45ece8f 100644
--- a/examples/sp500/flo.yaml
+++ b/examples/sp500/flo.yaml
@@ -8,12 +8,12 @@ tasks:
     creates: "{{input_name}}"
     command:
       - mkdir -p $(dirname {{creates}})
-      - wget http://en.wikipedia.org/wiki/List_of_S%26P_500_companies -O {{creates}}
+      - wget http://en.wikipedia.org/wiki/List_of_S%26P_500_companies --no-check-certificate -O {{creates}}
 
   # extract the data
   -
     creates: data/sp500.csv
-    depends: 
+    depends:
       - code/parse_sp500.py
       - "{{input_name}}"
-    command: python {{depends|join(' ')}} > {{creates}}
\ No newline at end of file
+    command: python {{depends|join(' ')}} > {{creates}}
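The example-script changes above follow the standard Python 3 migration pattern: `print` becomes a function and BeautifulSoup is given an explicit parser instead of its guess-based default. A minimal sketch of the parsing call, assuming bs4 is installed and using a hypothetical `page.html` path (the `html.parser` fallback is an assumption, not part of this patch):

```python
from bs4 import BeautifulSoup, FeatureNotFound

# hypothetical input file; parse_sp500.py reads the path from sys.argv[1]
with open("page.html") as stream:
    html = stream.read()

try:
    # explicit parser, as in the patched parse_sp500.py
    soup = BeautifulSoup(html, "lxml")
except FeatureNotFound:
    # fall back to the stdlib parser when lxml is not installed
    soup = BeautifulSoup(html, "html.parser")

print(soup.title)
```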
""" - command = args.__dict__.pop("command") try: + command = args.__dict__.pop("command") command.execute(**args.__dict__) - except CommandLineException, error: + except (CommandLineException, KeyError) as error: print(colors.red(error)) sys.exit(getattr(error, 'exit_code', 1)) diff --git a/flo/commands/status.py b/flo/commands/status.py index 7ebc109..0881a01 100644 --- a/flo/commands/status.py +++ b/flo/commands/status.py @@ -1,5 +1,5 @@ -import BaseHTTPServer -import SocketServer +from six.moves import BaseHTTPServer +from six.moves import socketserver import socket from .run import Command as RunCommand @@ -40,8 +40,8 @@ def serve_status_page(self, port): Handler.task_graph = self.task_graph print("Starting server at http://localhost:%d" % port) try: - httpd = SocketServer.TCPServer(("", port), Handler) - except socket.error, error: + httpd = socketserver.TCPServer(("", port), Handler) + except socket.error as error: raise CommandLineException(error.strerror) try: httpd.serve_forever() diff --git a/flo/exceptions.py b/flo/exceptions.py index f3e1297..e8d3571 100644 --- a/flo/exceptions.py +++ b/flo/exceptions.py @@ -1,5 +1,6 @@ import yaml import jinja2 +import six class CommandLineException(Exception): @@ -75,7 +76,7 @@ def __init__(self, template_string, error, **context_dict): def __str__(self): context_str = "{\n" - for k, v in self.context_dict.iteritems(): + for k, v in six.iteritems(self.context_dict): context_str += "%s%s: %s,\n" % (self.tab, k, v) context_str += "}" msg = "\n\nContext:\n\n%s" % self.indent(context_str) diff --git a/flo/logger.py b/flo/logger.py index 4944106..91ea5d9 100644 --- a/flo/logger.py +++ b/flo/logger.py @@ -13,20 +13,27 @@ # achieves exactly what we want, albeit not using the standard # library. If you have other ideas for how to do this, check out the # code before issue #53 was resolved. -class Logger(file): +# class Logger(file): +class Logger(): """Log output to stdout in color and to a log file in plain text. """ + log_file = None + def __init__(self, task_graph): - super(Logger, self).__init__(task_graph.abs_log_path, 'w') + self.log_file = open(task_graph.abs_log_path, 'w') def write(self, content): + if isinstance(content, bytes): + content = content.decode() sys.stdout.write(content) sys.stdout.flush() - super(Logger, self).write(colorless(content)) + self.log_file.write(colorless(content)) def info(self, content): self.write(content + '\n') + def close(self): + self.log_file.close() # _logger is a singleton instance of the logger that is a local cache # of the one and only logger instance for all TaskGraphs. 
diff --git a/flo/parser.py b/flo/parser.py
index b0cee83..3c3019a 100644
--- a/flo/parser.py
+++ b/flo/parser.py
@@ -91,7 +91,7 @@ def get_task_kwargs_list(config=None):
         config_yaml = yaml.load_all(stream.read())
     try:
         return config_yaml2task_kwargs_list(config_yaml)
-    except yaml.constructor.ConstructorError, error:
+    except yaml.constructor.ConstructorError as error:
         raise exceptions.YamlError(config_path, error)
 
 
diff --git a/flo/resources/base.py b/flo/resources/base.py
index 70dabaa..75f954c 100644
--- a/flo/resources/base.py
+++ b/flo/resources/base.py
@@ -72,7 +72,12 @@ def get_stream_state(self, stream, block_size=2**20):
             data = stream.read(block_size)
             if not data:
                 break
-            state.update(data)
+            try:
+                state.update(data)
+            except TypeError:
+                # Assuming this is caused by getting Unicode
+                # Unicode must be encoded before hashing
+                state.update(data.encode('utf8'))
         return state.hexdigest()
 
     def get_previous_state(self):
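The `get_stream_state` change above exists because hashlib objects only accept bytes on Python 3; text read from a file has to be encoded before it is hashed. A small illustration of that behavior (sha1 and the literal string are used purely for demonstration):

```python
import hashlib

state = hashlib.sha1()
chunk = "text read from a tracked file"  # str, not bytes, on Python 3

try:
    state.update(chunk)
except TypeError:
    # Python 3: Unicode must be encoded before hashing
    state.update(chunk.encode('utf8'))

print(state.hexdigest())
```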
diff --git a/flo/tasks/graph.py b/flo/tasks/graph.py
index e00c060..80416b9 100644
--- a/flo/tasks/graph.py
+++ b/flo/tasks/graph.py
@@ -7,6 +7,8 @@
 import glob
 from distutils.util import strtobool
 import json
+import six
+from six.moves import input
 
 import networkx as nx
 
@@ -106,7 +108,8 @@ def iter_tasks(self, tasks=None):
             if task in source_tasks:
                 distance = 0
             else:
-                distance = max(map(distances.get, task.upstream_tasks)) + 1
+                distance = max([distances.get(x)
+                                for x in task.upstream_tasks]) + 1
             distances[task] = distance
             horizon.remove(task)
             horizon_set.discard(task)
@@ -119,7 +122,7 @@ def iter_tasks(self, tasks=None):
         # now create a decorated list of based on the distance and
         # ordering in the YAML file
         decorated_list = []
-        for task, distance in distances.iteritems():
+        for task, distance in six.iteritems(distances):
             decorated_list.append((
                 distance, self.task_list.index(task), task,
             ))
@@ -191,7 +194,7 @@ def subgraph_needed_for(self, start_at, end_at):
         new graph.
         """
         assert start_at or end_at, "one of {start_at,end_at} must be a task id"
-        start, end = map(self.task_dict.get, [start_at, end_at])
+        start, end = [self.task_dict.get(start_at), self.task_dict.get(end_at)]
         if None in [start, end]:
             graph = self.get_networkx_graph()
             if start:
@@ -273,7 +276,7 @@ def get_user_clean_confirmation(self, task_list=None,
         self.logger.info(green(self.internals_path))
         for task in task_list:
             self.logger.info(task.creates_message())
-        yesno = raw_input(colors.red("Delete aforementioned files? [Y/n] "))
+        yesno = input(colors.red("Delete aforementioned files? [Y/n] "))
         if yesno == '':
             yesno = 'y'
         return strtobool(yesno)
@@ -377,7 +380,7 @@ def _run_helper(self, starting_tasks, do_run_func, mock_run):
             else:
                 try:
                     task.timed_run()
-                except (KeyboardInterrupt, ShellError), error:
+                except (KeyboardInterrupt, ShellError) as error:
                     self.save_state(
                         override_resource_states={task.name: ''},
                     )
@@ -432,7 +435,7 @@ def read_from_storage(self, storage_location):
     def write_to_storage(self, dictionary, storage_location):
         with open(storage_location, 'w') as stream:
             writer = csv.writer(stream)
-            for item in dictionary.iteritems():
+            for item in six.iteritems(dictionary):
                 writer.writerow(item)
 
     def get_state_from_storage(self, resource):
@@ -453,7 +456,7 @@ def _load_state(self):
         )
 
         # typecast the task_durations
-        for task_id, duration in self.task_durations.iteritems():
+        for task_id, duration in six.iteritems(self.task_durations):
             self.task_durations[task_id] = float(duration)
 
     def save_state(self, override_resource_states=None):
@@ -469,7 +472,7 @@ def save_state(self, override_resource_states=None):
         # CSV. this is important for situations where a subgraph is
        # selected to run
         after_resource_states = self.read_from_storage(self.abs_state_path)
-        for name, resource in self.resource_dict.iteritems():
+        for name, resource in six.iteritems(self.resource_dict):
             after_resource_states[name] = resource.get_current_state()
 
         # if override states are provided, update the resources
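Most of the `graph.py` hunks replace Python 2-only dictionary methods and built-ins (`iteritems`, `raw_input`) with their `six` equivalents, which resolve to the right name on either interpreter. A quick, flo-independent sketch of the `six.iteritems` pattern used for the task-duration typecasting (the values are made up):

```python
import six

# made-up durations keyed by task id, mirroring flo's task_durations dict
task_durations = {'data/x_y.dat': '0.12', 'data/sp500.csv': '3.4'}

# six.iteritems uses dict.iteritems() on Python 2 and dict.items() on Python 3
for task_id, duration in six.iteritems(task_durations):
    task_durations[task_id] = float(duration)

print(task_durations)
```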
diff --git a/flo/tasks/task.py b/flo/tasks/task.py
index 30bcd47..79fe2c4 100644
--- a/flo/tasks/task.py
+++ b/flo/tasks/task.py
@@ -1,6 +1,7 @@
 import os
 import time
-import StringIO
+import six
+from six import StringIO
 import copy
 
 from ..exceptions import InvalidTaskDefinition, CommandLineException
@@ -10,13 +11,18 @@
 from .. import templates
 from ..types import UniqueOrderedList
 
+if six.PY2:
+    str_types = (str, unicode)
+else:
+    str_types = str
+
 
 def _cast_as_list(obj):
     if isinstance(obj, (list, tuple)):
         return obj
     elif obj is None:
         return []
-    elif isinstance(obj, (str, unicode)):
+    elif isinstance(obj, str_types):
         return [obj]
     else:
         raise TypeError("unexpected type passed to _cast_as_list")
@@ -39,7 +45,7 @@ def __init__(self, graph, creates=None, depends=None,
                 "every task must define a `creates`",
                 self.yaml_data,
             )
-        if not isinstance(self._creates, (str, unicode)):
+        if not isinstance(self._creates, str_types):
             raise InvalidTaskDefinition(
                 "each task must define a single `creates` as a string",
                 self.yaml_data,
             )
@@ -123,7 +129,7 @@ def id(self):
     def config_resource_id(self):
         """Canonical way to identify the resource id associated with this Task
         """
-        return 'config:'+self.id
+        return 'config:' + self.id
 
     @property
     def root_directory(self):
@@ -174,7 +180,6 @@ def reset_task_dependencies(self):
         self.downstream_tasks.clear()
 
     def get_all_filenames(self):
-
         """Identify the set of all filenames that pertain to this task
         """
         # TODO: when we allow for non-filesystem targets, this will
@@ -190,11 +195,10 @@ def get_current_state(self):
         # the machinery in self.get_stream_state to calculate the
         # state
         msg = self.creates + str(self.depends) + str(self._command)
-        keys = self.attrs.keys()
-        keys.sort()
+        keys = sorted(self.attrs.keys())
         for k in keys:
             msg += k + str(self.attrs[k])
-        return self.get_stream_state(StringIO.StringIO(msg))
+        return self.get_stream_state(StringIO(msg))
 
     def in_sync(self):
         """Test whether this task is in sync with the stored state and
@@ -242,8 +246,14 @@ def timed_run(self):
         # confirm that all depends resources exist on the filesystem
         if not all(resource.exists() for
                    resource in self.depends_resources):
-            raise CommandLineException(self.depends + (
-                " does not exist just before running this task. "
+            unmet = self.depends
+            if isinstance(unmet, list):
+                unmet = '{} and {} do'.format(', '.join(unmet[:-1]),
+                                              unmet[-1])
+            else:
+                unmet = unmet + ' does'
+            raise CommandLineException(unmet + (
+                " not exist just before running this task. "
                 "Double check the `depends` to confirm that these "
                 "dependencies are correct for this task."
             ))
diff --git a/flo/templates/__init__.py b/flo/templates/__init__.py
index 3d45b1b..3be84e6 100644
--- a/flo/templates/__init__.py
+++ b/flo/templates/__init__.py
@@ -9,11 +9,11 @@ def render_from_string(template_string, **context_dict):
     env = jinja2.Environment(undefined=jinja2.StrictUndefined)
     try:
         template_obj = env.from_string(template_string)
-    except jinja2.exceptions.TemplateSyntaxError, error:
+    except jinja2.exceptions.TemplateSyntaxError as error:
         raise JinjaTemplateError(template_string, error)
     try:
         return template_obj.render(**context_dict)
-    except jinja2.exceptions.UndefinedError, error:
+    except jinja2.exceptions.UndefinedError as error:
         raise JinjaRenderError(template_string, error, **context_dict)
 
 
diff --git a/flo/types.py b/flo/types.py
index 081e378..9cabad3 100644
--- a/flo/types.py
+++ b/flo/types.py
@@ -1,4 +1,5 @@
 import collections
+import six
 
 
 class FrozenDict(collections.Mapping):
@@ -21,7 +22,7 @@ def __getitem__(self, key):
         return self._d[key]
 
     def __hash__(self):
-        return hash(tuple(sorted(self._d.iteritems())))
+        return hash(tuple(sorted(six.iteritems(self._d))))
 
 
 class UniqueOrderedList(list):
diff --git a/tests/run_functional_tests.sh b/tests/run_functional_tests.sh
index 5a95f97..2df56f4 100755
--- a/tests/run_functional_tests.sh
+++ b/tests/run_functional_tests.sh
@@ -21,7 +21,7 @@ if [ $? -ne 0 ]; then
 fi
 
 # formatting functions
-red () { 
+red () {
     echo $'\033[31m'"$1"$'\033[0m'
 }
 
@@ -80,20 +80,22 @@ update_status $? "--start-at tests failed"
 # modifying a specific task that would otherwise branch to other tasks
 # and make sure that skipping it does not trigger the workflow to run
 cd ${EXAMPLE_ROOT}/model-correlations
-flo run -f
-sed -i 's/\+1/+2/g' flo.yaml
+flo run --force
+sed 's/\+1/+2/g' flo.yaml > new_flo.yaml
+mv flo.yaml old_flo.yaml
+mv new_flo.yaml flo.yaml
 flo run --skip data/x_y.dat
 grep "No tasks are out of sync" .flo/flo.log > /dev/null
 update_status $? "Nothing should have been run when --skip'ping changed task"
 flo run
 grep "|-> cut " .flo/flo.log > /dev/null
 update_status $? "data/x_y.dat command was not re-run even though it changed"
-sed -i 's/\+2/+1/g' flo.yaml
+mv old_flo.yaml flo.yaml
 cd ${EXAMPLE_ROOT}
 
 # test the --only option
 cd ${EXAMPLE_ROOT}/hello-world
-flo run -f
+flo run --force
 flo run --only data/hello_world.txt
 grep "No tasks are out of sync" .flo/flo.log > /dev/null
 update_status $? "data/hello_world.txt shouldn't have been run"
@@ -111,7 +113,7 @@ flo clean --force
 update_status $? "force cleaning failed on deterministic-order example"
 flo run --force
 update_status $? "deterministic-order example failed somewhere along the way"
-sed -n '/|-> /{g;1!p;};h' .flo/flo.log | sort -C
+sed -n '/|-> /{g;1!p;};h' .flo/flo.log | sort -c
 update_status $? "flo not running in expected deterministic order"
 cd ${EXAMPLE_ROOT}
 
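The `str_types` shim added at the top of `task.py` is what keeps the `isinstance` checks working on both interpreters (`six.string_types` would be an equivalent off-the-shelf choice). A standalone sketch of the same idea applied to flo's string-or-list `depends` handling:

```python
import six

if six.PY2:
    str_types = (str, unicode)  # noqa: F821 -- unicode only exists on Python 2
else:
    str_types = str


def cast_as_list(obj):
    """Wrap a bare string in a list, mirroring flo's _cast_as_list helper."""
    if isinstance(obj, (list, tuple)):
        return list(obj)
    elif obj is None:
        return []
    elif isinstance(obj, str_types):
        return [obj]
    raise TypeError("unexpected type passed to cast_as_list")


print(cast_as_list("data/sp500.csv"))    # ['data/sp500.csv']
print(cast_as_list(["a.txt", "b.txt"]))  # ['a.txt', 'b.txt']
```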