From 6be2a55341bf4c6dc2a97b26891b09dc4a12ab29 Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Wed, 13 Nov 2013 16:07:11 -0800 Subject: [PATCH 01/12] Catching socket exceptions and restarting stream --- track_keywords.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index 010720e..675e13e 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -13,6 +13,7 @@ import datetime import pymongo import requests +import socket import webbrowser from urlparse import parse_qs from requests_oauthlib import OAuth1, OAuth1Session @@ -181,8 +182,8 @@ def dump_to_mongo(tracker, collection): # Insert each json as an entry in the mongodb collection entry = collection.insert(tweet) -def dump_to_stdout(tracker, encoding='utf-16', tracer=0): - """ Loop over tweets in tracker and print them to stdout +def process(tracker, encoding='utf-16', tracer=0): + """ Yield tweets from tracker stream If tracer a non-zero integer, then the text of every tracer-th tweet will be printed to stderr """ @@ -194,7 +195,10 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): for n, tweet in enumerate(tracker): j = json.dumps(tweet, encoding=encoding) - print j + # Any other pre-processing can happen here + # For example, removing unwanted keys to shrink the dict + yield j + minuteCounter += 1 # Print tracer to stderr @@ -294,13 +298,27 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0): sys.stderr.write(uid) sys.stderr.write('\n') - sys.stderr.write('\nAuthorizing tracker with Twitter...') - sesh = get_session(consumer_key, - consumer_secret, - access_token, - access_token_secret) - stream = track(sesh, keywords, user_ids) - sys.stderr.write('done!\n') + # TODO + # these lines should be in a loop that catches errors + while True: + sys.stderr.write('\nAuthorizing tracker with Twitter...') + sesh = get_session(consumer_key, + consumer_secret, + access_token, + access_token_secret) + stream = track(sesh, keywords, user_ids) + sys.stderr.write('done!\n') + + sys.stderr.write('\nStarting tracker...\n') + try: + for cleantweet in process(stream, tracer=args.tracer): + print cleantweet + except socket.error, (value, message): + sys.stderr.write(message) + sys.stderr.write('\nRestarting tracker...\n') + except: + sys.stderr.write('Unknown exception') + sys.stderr.write('\nRestarting tracker...\n') + + - sys.stderr.write('\nStarting tracker...\n') - dump_to_stdout(stream, tracer=args.tracer) From 5e29cf041f24fcec759cd2f7e0f6e1404d79b61a Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Wed, 13 Nov 2013 16:20:36 -0800 Subject: [PATCH 02/12] Added commandline option for max retries --- track_keywords.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index 675e13e..44e4097 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -14,6 +14,7 @@ import pymongo import requests import socket +import time import webbrowser from urlparse import parse_qs from requests_oauthlib import OAuth1, OAuth1Session @@ -258,6 +259,10 @@ def process(tracker, encoding='utf-16', tracer=0): type=str, default='', help='Path to file with user IDs, one per line') + parser.add_argument('--retries', + type=int, + default=0, + help="Maximum retries upon error") parser.add_argument('--tracer', type=int, default=0, @@ -300,7 +305,8 @@ def process(tracker, encoding='utf-16', tracer=0): # TODO # these lines should be in a loop that catches errors - while True: + retries = 0 + while retries <= args.retries: sys.stderr.write('\nAuthorizing tracker with Twitter...') sesh = get_session(consumer_key, consumer_secret, @@ -315,10 +321,14 @@ def process(tracker, encoding='utf-16', tracer=0): print cleantweet except socket.error, (value, message): sys.stderr.write(message) - sys.stderr.write('\nRestarting tracker...\n') + except KeyboardInterrupt: + sys.exit(1) except: sys.stderr.write('Unknown exception') - sys.stderr.write('\nRestarting tracker...\n') + retries += 1 + sys.stderr.write('\nTrying to restart tracker ({0})...\n'.format(retries)) + sys.stderr.write('Nope. Maximum retries reached.\n') + From 74f4b3e37b137894d8dd55754a7f266a561ed204 Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Wed, 13 Nov 2013 16:30:28 -0800 Subject: [PATCH 03/12] Cleaning up comments --- track_keywords.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index 44e4097..0b1a41e 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -2,6 +2,12 @@ """ Track keywords on Twitter using the public Streaming API +TODO + Add hooks for extracting info about incoming tweets + e.g., identifying new users + Better monitoring + E-mail admins on error + 2013 """ @@ -303,8 +309,10 @@ def process(tracker, encoding='utf-16', tracer=0): sys.stderr.write(uid) sys.stderr.write('\n') - # TODO - # these lines should be in a loop that catches errors + # The purpose of this loop is to restart the tracker + # when an error comes down the stream. + # It's pretty dumb about which errors it catches + # which is why there is a maximum number of retries retries = 0 while retries <= args.retries: sys.stderr.write('\nAuthorizing tracker with Twitter...') From e2123ee9873e4f0c0c11ebaae52504cfc99aff4a Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Wed, 13 Nov 2013 16:38:51 -0800 Subject: [PATCH 04/12] Added a little sleeper to avoid clobbering the server --- track_keywords.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index 0b1a41e..207b636 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -17,6 +17,7 @@ import json import time import datetime +import math import pymongo import requests import socket @@ -315,6 +316,10 @@ def process(tracker, encoding='utf-16', tracer=0): # which is why there is a maximum number of retries retries = 0 while retries <= args.retries: + if retries: + naptime = int(round(math.log(retries)) * 10) + sys.stderr.write('Sleeping for {0} seconds...\n'.format(naptime)) + time.sleep(naptime) sys.stderr.write('\nAuthorizing tracker with Twitter...') sesh = get_session(consumer_key, consumer_secret, @@ -329,12 +334,13 @@ def process(tracker, encoding='utf-16', tracer=0): print cleantweet except socket.error, (value, message): sys.stderr.write(message) - except KeyboardInterrupt: - sys.exit(1) + sys.stderr.write('\n') + #except KeyboardInterrupt: + # sys.exit(1) except: - sys.stderr.write('Unknown exception') + sys.stderr.write('Unknown exception.\n') retries += 1 - sys.stderr.write('\nTrying to restart tracker ({0})...\n'.format(retries)) + sys.stderr.write('Trying to restart tracker ({0})...\n'.format(retries)) sys.stderr.write('Nope. Maximum retries reached.\n') From c3ec6bb559eea6396b2fdc8b67d98e2c8a7f78e2 Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Wed, 13 Nov 2013 16:41:48 -0800 Subject: [PATCH 05/12] Cleaning up debug code --- track_keywords.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index 207b636..d50b338 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -335,8 +335,8 @@ def process(tracker, encoding='utf-16', tracer=0): except socket.error, (value, message): sys.stderr.write(message) sys.stderr.write('\n') - #except KeyboardInterrupt: - # sys.exit(1) + except KeyboardInterrupt: + sys.exit(1) except: sys.stderr.write('Unknown exception.\n') retries += 1 From f4b0eb317d03a6186de3b5dc80e3cc0ede6ebd1d Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Mon, 25 Nov 2013 15:55:42 -0800 Subject: [PATCH 06/12] Added option to track the Twitter streaming sample --- track_keywords.py | 46 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index d50b338..d4646b1 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -101,6 +101,32 @@ def authorize(consumer_key, consumer_secret): # API wrappers # +def track_sample(twitter): + """Yields tweets one dict at a time from "sample" endpoint + See: https://dev.twitter.com/docs/api/1.1/get/statuses/sample + + twitter: OAuth1Session object authenticated already + """ + + # Prepare for GET request + sample_url = "https://stream.twitter.com/1.1/statuses/sample.json" + + # Create Request.get object + r = twitter.get(url=sample_url, params={}, stream=True) + + # Iterate over the request + for line in r.iter_lines(): + if line : + try: + # TODO + # Sometimes it returns a "disconnect" obj + # before closing the stream + tweet = json.loads(line) + yield tweet + except ValueError: + # Couldn't construct a valid tweet + pass + def track(twitter, keywords=[], user_ids=[]): """Iterator that yields tweets as dicts one at a time @@ -258,6 +284,10 @@ def process(tracker, encoding='utf-16', tracer=0): type=str, default=u"", help="Access Token Secret") + parser.add_argument('--sample', + action='store_true', + default=False, + help="Use Twitter's sample endpoint") parser.add_argument('--keywords', type=str, default='', @@ -282,8 +312,8 @@ def process(tracker, encoding='utf-16', tracer=0): access_token = args.resourcekey access_token_secret = args.resourcesecret - if not args.keywords and not args.userids: - sys.stderr.write("Nothing to track! Please supply keywords or user IDs.\n") + if not args.keywords and not args.userids and not args.sample: + sys.stderr.write("Nothing to track! Please supply keywords or user IDs or use the Twitter sample.\n") sys.exit(1) keywords = [] @@ -325,7 +355,12 @@ def process(tracker, encoding='utf-16', tracer=0): consumer_secret, access_token, access_token_secret) - stream = track(sesh, keywords, user_ids) + if args.sample: + sys.stderr.write('\nReading from the Twitter sample endpoint...') + stream = track_sample(sesh) + else: + sys.stderr.write('\nReading from the Twitter filter endpoint...') + stream = track(sesh, keywords, user_ids) sys.stderr.write('done!\n') sys.stderr.write('\nStarting tracker...\n') @@ -333,6 +368,7 @@ def process(tracker, encoding='utf-16', tracer=0): for cleantweet in process(stream, tracer=args.tracer): print cleantweet except socket.error, (value, message): + sys.stderr.write(value) sys.stderr.write(message) sys.stderr.write('\n') except KeyboardInterrupt: @@ -342,7 +378,3 @@ def process(tracker, encoding='utf-16', tracer=0): retries += 1 sys.stderr.write('Trying to restart tracker ({0})...\n'.format(retries)) sys.stderr.write('Nope. Maximum retries reached.\n') - - - - From 7a98ffe7571cdd962ff6a4e7abb1fd69b8bc1a0d Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Mon, 25 Nov 2013 16:03:04 -0800 Subject: [PATCH 07/12] Handling "delete" messages --- track_keywords.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/track_keywords.py b/track_keywords.py index d4646b1..4cd59a9 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -229,9 +229,11 @@ def process(tracker, encoding='utf-16', tracer=0): for n, tweet in enumerate(tracker): j = json.dumps(tweet, encoding=encoding) - # Any other pre-processing can happen here - # For example, removing unwanted keys to shrink the dict - yield j + # Check that this is a tweet + if not u'delete' in j: + # Any other pre-processing can happen here + # For example, removing unwanted keys to shrink the dict + yield j minuteCounter += 1 From 812812e0a4cc0e866c3a9378d2010f9a440cb4a6 Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Tue, 25 Feb 2014 17:09:51 -0800 Subject: [PATCH 08/12] Simple pyinotify script to monitor output --- monitor.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 monitor.py diff --git a/monitor.py b/monitor.py new file mode 100644 index 0000000..4bb9c40 --- /dev/null +++ b/monitor.py @@ -0,0 +1,47 @@ +# -*- coding: utf -*- +""" +monitor.py + +Simple script to monitor one or more output files from track_keywords.py + +Based on: +https://github.com/seb-m/pyinotify/blob/master/python2/examples/transient_file.py + +Kevin Driscoll, 2014 +""" + +import colors +import pyinotify +import sys +import time +from collections import Counter + +global nextcolor + +class ProcessTransientFile(pyinotify.ProcessEvent): + + def __init__(self, n): + global nextcolor + self.fg = nextcolor + self.lastupdate = time.time() + + def process_IN_MODIFY(self, event): + now = time.time() + report = "{0}s".format(str(round(now-self.lastupdate, 2))) + self.lastupdate = now + print colors.color(' '.join((event.name, report)), fg=self.fg) + + def process_default(self, event): + print 'default: ', event.maskname + + +if __name__=="__main__": + + wm = pyinotify.WatchManager() + notifier = pyinotify.Notifier(wm, read_freq=60) + + for n, fn in enumerate(outfiles): + nextcolor = (n % 6) + 1 + wm.watch_transient_file(fn, pyinotify.IN_MODIFY, ProcessTransientFile) + + notifier.loop() From d54ed313bb50b170bdd3832ef73dd24cee1a032b Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Tue, 25 Feb 2014 17:11:44 -0800 Subject: [PATCH 09/12] Removed debug code. --- monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor.py b/monitor.py index 4bb9c40..e8b3a83 100644 --- a/monitor.py +++ b/monitor.py @@ -40,7 +40,7 @@ def process_default(self, event): wm = pyinotify.WatchManager() notifier = pyinotify.Notifier(wm, read_freq=60) - for n, fn in enumerate(outfiles): + for n, fn in enumerate(fileinput.input()): nextcolor = (n % 6) + 1 wm.watch_transient_file(fn, pyinotify.IN_MODIFY, ProcessTransientFile) From 5c4071d62cfe6787a00bbb9af900f7448d81619a Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Tue, 25 Feb 2014 17:12:49 -0800 Subject: [PATCH 10/12] Missing import --- monitor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/monitor.py b/monitor.py index e8b3a83..31db6d8 100644 --- a/monitor.py +++ b/monitor.py @@ -11,10 +11,9 @@ """ import colors +import fileinput import pyinotify -import sys import time -from collections import Counter global nextcolor From ef2e17a23083d1681b0def4554ab752e5b924a9a Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Tue, 25 Feb 2014 17:23:41 -0800 Subject: [PATCH 11/12] Removed the delay. --- monitor.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/monitor.py b/monitor.py index 31db6d8..cf969e2 100644 --- a/monitor.py +++ b/monitor.py @@ -11,8 +11,8 @@ """ import colors -import fileinput import pyinotify +import sys import time global nextcolor @@ -23,24 +23,30 @@ def __init__(self, n): global nextcolor self.fg = nextcolor self.lastupdate = time.time() + self.freq = 0 def process_IN_MODIFY(self, event): now = time.time() report = "{0}s".format(str(round(now-self.lastupdate, 2))) self.lastupdate = now - print colors.color(' '.join((event.name, report)), fg=self.fg) + self.freq += 1 + if not self.freq % 50: + print colors.color(' '.join((event.name, report)), fg=self.fg) def process_default(self, event): print 'default: ', event.maskname - if __name__=="__main__": + filenames = sys.argv[1:] wm = pyinotify.WatchManager() - notifier = pyinotify.Notifier(wm, read_freq=60) + notifier = pyinotify.Notifier(wm) - for n, fn in enumerate(fileinput.input()): + print "Tracking..." + for n, fn in enumerate(filenames): + print fn nextcolor = (n % 6) + 1 wm.watch_transient_file(fn, pyinotify.IN_MODIFY, ProcessTransientFile) + print notifier.loop() From 3f4958bf8a690d6b5e115d8f8ac06254927d389b Mon Sep 17 00:00:00 2001 From: Kevin Driscoll Date: Thu, 27 Feb 2014 17:52:31 -0800 Subject: [PATCH 12/12] Fixed bug that was appending userIDs to the keywords --- track_keywords.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/track_keywords.py b/track_keywords.py index 4cd59a9..3dfd6c9 100644 --- a/track_keywords.py +++ b/track_keywords.py @@ -8,6 +8,9 @@ Better monitoring E-mail admins on error +Limitations: + Max 400 keywords, 5000 users + https://dev.twitter.com/discussions/4120 2013 """ @@ -337,7 +340,7 @@ def process(tracker, encoding='utf-16', tracer=0): for line in f: uid = line.strip() if uid: - keywords.append(uid) + user_ids.append(uid) sys.stderr.write('\t') sys.stderr.write(uid) sys.stderr.write('\n')