diff --git a/monitor.py b/monitor.py new file mode 100644 index 0000000..cf969e2 --- /dev/null +++ b/monitor.py @@ -0,0 +1,52 @@ +# -*- coding: utf -*- +""" +monitor.py + +Simple script to monitor one or more output files from track_keywords.py + +Based on: +https://github.com/seb-m/pyinotify/blob/master/python2/examples/transient_file.py + +Kevin Driscoll, 2014 +""" + +import colors +import pyinotify +import sys +import time + +global nextcolor + +class ProcessTransientFile(pyinotify.ProcessEvent): + + def __init__(self, n): + global nextcolor + self.fg = nextcolor + self.lastupdate = time.time() + self.freq = 0 + + def process_IN_MODIFY(self, event): + now = time.time() + report = "{0}s".format(str(round(now-self.lastupdate, 2))) + self.lastupdate = now + self.freq += 1 + if not self.freq % 50: + print colors.color(' '.join((event.name, report)), fg=self.fg) + + def process_default(self, event): + print 'default: ', event.maskname + +if __name__=="__main__": + + filenames = sys.argv[1:] + wm = pyinotify.WatchManager() + notifier = pyinotify.Notifier(wm) + + print "Tracking..." 
def track_sample(twitter):
    """Yields tweets one dict at a time from "sample" endpoint
    See: https://dev.twitter.com/docs/api/1.1/get/statuses/sample

    twitter: OAuth1Session object authenticated already

    Empty lines and lines that are not valid JSON are skipped.
    The streaming response is closed when the generator is exhausted
    or closed, so restarting the tracker does not leave the previous
    connection open.
    """

    # Prepare for GET request
    sample_url = "https://stream.twitter.com/1.1/statuses/sample.json"

    # Create Request.get object
    r = twitter.get(url=sample_url, params={}, stream=True)

    try:
        # Iterate over the request
        for line in r.iter_lines():
            if not line:
                continue
            try:
                # TODO
                # Sometimes it returns a "disconnect" obj
                # before closing the stream
                tweet = json.loads(line)
            except ValueError:
                # Couldn't construct a valid tweet
                continue
            # Yield outside the inner try so that only JSON decoding
            # errors are swallowed, not exceptions raised at the yield
            yield tweet
    finally:
        # Release the underlying connection even if the consumer stops
        # early or an error comes down the stream
        r.close()
encoding='utf-16', tracer=0): + """ Yield tweets from tracker stream If tracer a non-zero integer, then the text of every tracer-th tweet will be printed to stderr """ @@ -194,7 +232,12 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): for n, tweet in enumerate(tracker): j = json.dumps(tweet, encoding=encoding) - print j + # Check that this is a tweet + if not u'delete' in j: + # Any other pre-processing can happen here + # For example, removing unwanted keys to shrink the dict + yield j + minuteCounter += 1 # Print tracer to stderr @@ -246,6 +289,10 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): type=str, default=u"", help="Access Token Secret") + parser.add_argument('--sample', + action='store_true', + default=False, + help="Use Twitter's sample endpoint") parser.add_argument('--keywords', type=str, default='', @@ -254,6 +301,10 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): type=str, default='', help='Path to file with user IDs, one per line') + parser.add_argument('--retries', + type=int, + default=0, + help="Maximum retries upon error") parser.add_argument('--tracer', type=int, default=0, @@ -266,8 +317,8 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): access_token = args.resourcekey access_token_secret = args.resourcesecret - if not args.keywords and not args.userids: - sys.stderr.write("Nothing to track! Please supply keywords or user IDs.\n") + if not args.keywords and not args.userids and not args.sample: + sys.stderr.write("Nothing to track! 
Please supply keywords or user IDs or use the Twitter sample.\n") sys.exit(1) keywords = [] @@ -289,18 +340,46 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): for line in f: uid = line.strip() if uid: - keywords.append(uid) + user_ids.append(uid) sys.stderr.write('\t') sys.stderr.write(uid) sys.stderr.write('\n') - sys.stderr.write('\nAuthorizing tracker with Twitter...') - sesh = get_session(consumer_key, - consumer_secret, - access_token, - access_token_secret) - stream = track(sesh, keywords, user_ids) - sys.stderr.write('done!\n') - - sys.stderr.write('\nStarting tracker...\n') - dump_to_stdout(stream, tracer=args.tracer) + # The purpose of this loop is to restart the tracker + # when an error comes down the stream. + # It's pretty dumb about which errors it catches + # which is why there is a maximum number of retries + retries = 0 + while retries <= args.retries: + if retries: + naptime = int(round(math.log(retries)) * 10) + sys.stderr.write('Sleeping for {0} seconds...\n'.format(naptime)) + time.sleep(naptime) + sys.stderr.write('\nAuthorizing tracker with Twitter...') + sesh = get_session(consumer_key, + consumer_secret, + access_token, + access_token_secret) + if args.sample: + sys.stderr.write('\nReading from the Twitter sample endpoint...') + stream = track_sample(sesh) + else: + sys.stderr.write('\nReading from the Twitter filter endpoint...') + stream = track(sesh, keywords, user_ids) + sys.stderr.write('done!\n') + + sys.stderr.write('\nStarting tracker...\n') + try: + for cleantweet in process(stream, tracer=args.tracer): + print cleantweet + except socket.error, (value, message): + sys.stderr.write(value) + sys.stderr.write(message) + sys.stderr.write('\n') + except KeyboardInterrupt: + sys.exit(1) + except: + sys.stderr.write('Unknown exception.\n') + retries += 1 + sys.stderr.write('Trying to restart tracker ({0})...\n'.format(retries)) + sys.stderr.write('Nope. Maximum retries reached.\n')