Skip to content

Commit

Permalink
Merge pull request #49 from DataDog/matt/mango
Browse files Browse the repository at this point in the history
Matt/mango
  • Loading branch information
clutchski authored Aug 11, 2016
2 parents 9df15cd + 966f128 commit 0fd7416
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 72 deletions.
14 changes: 12 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@

# Run the whole test matrix through tox. The superseded
# `python setup.py test` invocation is dropped so the suite is not run twice.
desc "run all tests"
task :test do
  sh "tox"
end

# Usage: rake test:envs["py27.*mongo"]
# Lists the tox environments, keeps those matching the grep pattern and
# hands them to `tox -e`. Without a pattern only a usage hint is printed.
desc "Run tests with envs matching the given pattern."
task :"test:envs", [:grep] do |_task, args|
  pattern = args[:grep]
  if pattern
    sh "tox -l | grep '#{pattern}' | xargs tox -e"
  else
    puts 'specify a pattern like rake test:envs["py27.*mongo"]'
  end
end

desc "install the library in dev mode"
Expand Down
7 changes: 7 additions & 0 deletions ddtrace/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ def iteritems(obj, **kwargs):
func = obj.items
return func(**kwargs)

def to_unicode(s):
    """ Return a unicode string for the given bytes or string instance.

        Text that is already unicode is returned untouched, objects exposing
        ``decode`` (bytes) are decoded as utf-8 and anything else is passed
        to ``stringify``.
    """
    # Already text: hand it back as is. On python 2 unicode objects also
    # have a `decode` method, and calling it first encodes to ascii, which
    # blows up on any non-ascii character.
    if isinstance(s, type(u"")):
        return s

    if hasattr(s, "decode"):
        return s.decode("utf-8")

    return stringify(s)

if PY2:
numeric_types = (int, long, float)
else:
Expand Down
138 changes: 133 additions & 5 deletions ddtrace/contrib/pymongo/parse.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,140 @@

import ctypes
import logging
import struct

# 3p
import bson
from bson.codec_options import CodecOptions
from bson.son import SON

# project
from ...compat import to_unicode
from ...ext import net as netx


log = logging.getLogger(__name__)


# MongoDB wire protocol commands
# http://docs.mongodb.com/manual/reference/mongodb-wire-protocol
# Maps the numeric op code read from a message header to a readable name;
# parse_msg ignores any message whose op code is not listed here.
OP_CODES = {
1 : "reply",
1000 : "msg",
2001 : "update",
2002 : "insert",
2003 : "reserved",
2004 : "query",
2005 : "get_more",
2006 : "delete",
2007 : "kill_cursors",
2010 : "command",
2011 : "command_reply",
}

# The maximum message length we'll try to parse
# (compared against len(msg_bytes), so the unit is bytes: 1 MiB).
MAX_MSG_PARSE_LEN = 1024 * 1024

# Fixed-size message header: four little-endian 32-bit signed ints, unpacked
# by parse_msg as (length, req_id, response_to, op_code).
header_struct = struct.Struct("<iiii")


class Command(object):
    """ Command stores information about a pymongo network command. """

    # name:    the command name (e.g. "query")
    # coll:    the collection the command targets
    # db:      the database the command targets
    # tags:    dict of tags to set on the span
    # metrics: dict of metrics to set on the span
    # query:   the query spec, or None when the command carries none
    __slots__ = ['name', 'coll', 'db', 'tags', 'metrics', 'query']

    def __init__(self, name, db, coll):
        self.name = name
        self.coll = coll
        self.db = db
        self.tags = {}
        self.metrics = {}
        self.query = None

    def __repr__(self):
        return (
            "Command("
            "name=%s,"
            "db=%s,"
            "coll=%s)"
        ) % (self.name, self.db, self.coll)


def parse_msg(msg_bytes):
    """ Return a command from a binary mongo db message or None if we shouldn't
        trace it. The protocol is documented here:
        http://docs.mongodb.com/manual/reference/mongodb-wire-protocol

        None is returned for empty or truncated messages, for unknown op
        codes, for op codes other than "query" and when the decoded spec
        doesn't yield a command. Errors raised while decoding the body are
        left to the caller.
    """
    # NOTE[matt] this is used for queries in pymongo <= 3.0.0 and for inserts
    # in up to date versions.
    msg_len = len(msg_bytes)
    if msg_len <= 0:
        return None

    # a message shorter than the fixed header can't be unpacked; skip it
    # rather than letting struct raise.
    if msg_len < header_struct.size:
        log.debug("message too short to contain a header: %s bytes", msg_len)
        return None

    # header fields are (length, req_id, response_to, op_code); only the
    # op code is needed here.
    op_code = header_struct.unpack_from(msg_bytes, 0)[3]

    op = OP_CODES.get(op_code)
    if not op:
        log.debug("unknown op code: %s", op_code)
        return None

    if op != "query":
        # only the query op code is parsed, there is nothing to report for
        # the other ones.
        return None

    # NOTE[matt] inserts, updates and queries can all use this opcode
    offset = header_struct.size
    offset += 4  # skip flags
    ns = _cstring(msg_bytes[offset:])
    offset += len(ns) + 1  # include null terminator

    # note: here coll could be '$cmd' because it can be overridden in the
    # query itself (like {"insert":"songs"})
    db, coll = _split_namespace(ns)

    offset += 8  # skip numberToSkip & numberToReturn
    if msg_len <= MAX_MSG_PARSE_LEN:
        # FIXME[matt] don't try to parse large messages for performance
        # reasons. ideally we'd just peek at the first bytes to get
        # the critical info (op type, collection, query, # of docs)
        # rather than parse the whole thing. i suspect only massive
        # inserts will be affected.
        codec = CodecOptions(SON)
        spec = next(bson.decode_iter(msg_bytes[offset:], codec_options=codec))
        cmd = parse_spec(spec, db)
    else:
        # let's still note that a command happened.
        cmd = Command("command", db, "untraced_message_too_large")

    # parse_spec returns None for a spec it can't use: nothing to trace.
    if cmd is None:
        return None

    # If the command didn't contain namespace info, set it here.
    if not cmd.coll:
        cmd.coll = coll

    cmd.metrics[netx.BYTES_OUT] = msg_len
    return cmd

def parse_query(query):
    """ Return a command parsed from the given mongo db query.

        The database and collection are read from the query's `ns` attribute
        when it has one, otherwise from its `db` and `coll` attributes
        (either may end up None). The query's `spec` is stored on the
        returned command.
    """
    ns = getattr(query, "ns", None)
    if ns:
        # version < 3.1 stores the full namespace
        db, coll = _split_namespace(ns)
    else:
        # version >= 3.1 stores the db and coll separately
        coll = getattr(query, "coll", None)
        db = getattr(query, "db", None)

    # FIXME[matt] mongo < 3.1 _Query doesn't have a name field,
    # so hardcode to query.
    cmd = Command("query", db, coll)
    cmd.query = query.spec
    return cmd

def parse_spec(spec):
def parse_spec(spec, db=None):
""" Return a Command that has parsed the relevant detail for the given
pymongo SON spec.
"""
Expand All @@ -29,7 +144,7 @@ def parse_spec(spec):
if not items:
return None
name, coll = items[0]
cmd = Command(name, coll)
cmd = Command(name, db, coll)

if 'ordered' in spec: # in insert and update
cmd.tags['mongodb.ordered'] = spec['ordered']
Expand All @@ -52,4 +167,17 @@ def parse_spec(spec):

return cmd

def _cstring(raw):
""" Return the first null terminated cstring from the bufffer. """
return ctypes.create_string_buffer(raw).value

def _split_namespace(ns):
""" Return a tuple of (db, collecton) from the "db.coll" string. """
if ns:
# NOTE[matt] ns is unicode or bytes depending on the client version
# so force cast to unicode
split = to_unicode(ns).split(".", 1)
if len(split) == 1:
raise Exception("namespace doesn't contain period: %s" % ns)
return split
return (None, None)
73 changes: 48 additions & 25 deletions ddtrace/contrib/pymongo/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from wrapt import ObjectProxy

# project
from ...compat import iteritems
from ...compat import iteritems, json
from ...ext import AppTypes
from ...ext import mongo as mongox
from ...ext import net as netx
from .parse import parse_spec, parse_query, Command
from .parse import parse_spec, parse_query, parse_msg


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -39,38 +39,44 @@ def __init__(self, tracer, service, sock):
def command(self, dbname, spec, *args, **kwargs):
    """ Run the command on the wrapped object, tracing it when the spec can
        be parsed.

        The call is always forwarded with the arguments it was given; when
        the database name is missing or the spec can't be parsed it is
        forwarded without a span.
    """
    cmd = None
    try:
        cmd = parse_spec(spec, dbname)
    except Exception:
        # best effort: a parsing failure must never break the real call.
        log.exception("error parsing spec. skipping trace")

    # skip tracing if we don't have a piece of data we need
    if not dbname or not cmd:
        return self.__wrapped__.command(dbname, spec, *args, **kwargs)

    cmd.db = dbname
    with self.__trace(cmd):
        return self.__wrapped__.command(dbname, spec, *args, **kwargs)

def write_command(self, request_id, msg):
    """ Send the message through the wrapped object, tracing it when the
        message can be parsed into a command.

        The wrapped call's result is returned unchanged. When it is truthy,
        its "n" entry (-1 if absent) is recorded as the rows metric.
    """
    cmd = None
    try:
        cmd = parse_msg(msg)
    except Exception:
        # best effort: a parsing failure must never break the real call.
        log.exception("error parsing msg")

    # if we couldn't parse it, don't try to trace it.
    if not cmd:
        return self.__wrapped__.write_command(request_id, msg)

    with self.__trace(cmd) as s:
        s.resource = _resource_from_cmd(cmd)
        result = self.__wrapped__.write_command(request_id, msg)
        if result:
            s.set_metric(mongox.ROWS, result.get("n", -1))
        return result

def __trace(self, db, cmd):
def __trace(self, cmd):
s = self._tracer.trace(
"pymongo.cmd",
span_type=mongox.TYPE,
service=self._srv)

if db:
s.set_tag(mongox.DB, db)
if cmd.db:
s.set_tag(mongox.DB, cmd.db)
if cmd:
s.set_tag(mongox.COLLECTION, cmd.coll)
s.set_tags(cmd.tags)
Expand All @@ -93,31 +99,35 @@ def __init__(self, tracer, service, topology):
self._srv = service

def send_message_with_response(self, operation, *args, **kwargs):

# if we're processing something unexpected, just skip tracing.
if getattr(operation, 'name', None) != 'find':
cmd = None
# Only try to parse something we think is a query.
if self._is_query(operation):
try:
cmd = parse_query(operation)
except Exception:
log.exception("error parsing query")

# if we couldn't parse or shouldn't trace the message, just go.
if not cmd:
return self.__wrapped__.send_message_with_response(
operation,
*args,
**kwargs)

# trace the given query.
cmd = parse_query(operation)
with self._tracer.trace(
"pymongo.cmd",
span_type=mongox.TYPE,
service=self._srv) as span:

span.resource = _resource_from_cmd(cmd)
span.set_tag(mongox.DB, operation.db)
span.set_tag(mongox.DB, cmd.db)
span.set_tag(mongox.COLLECTION, cmd.coll)
span.set_tags(cmd.tags)

result = self.__wrapped__.send_message_with_response(
operation,
*args,
**kwargs
)
**kwargs)

if result and result.address:
_set_address_tags(span, result.address)
Expand All @@ -131,6 +141,12 @@ def get_socket(self, *args, **kwargs):
else:
yield TracedSocket(self._tracer, self._srv, s)

@staticmethod
def _is_query(op):
# Return True when the given operation exposes a `spec` attribute, which is
# what this check uses to decide that it can be parsed as a query.
# NOTE: _Query should always have a spec field
return hasattr(op, 'spec')


class TracedTopology(ObjectProxy):

_tracer = None
Expand All @@ -155,6 +171,10 @@ class TracedMongoClient(ObjectProxy):
_srv = None

def __init__(self, tracer, service, client):
# NOTE[matt] the TracedMongoClient attempts to trace all of the network
# calls in the trace library. This is good because it measures the
# actual network time. It's bad because it uses a private API which
# could change. We'll see how this goes.
client._topology = TracedTopology(tracer, service, client._topology)
super(TracedMongoClient, self).__init__(client)
self._tracer = tracer
Expand Down Expand Up @@ -189,6 +209,9 @@ def _set_address_tags(span, address):
def _resource_from_cmd(cmd):
if cmd.query is not None:
nq = normalize_filter(cmd.query)
return "%s %s %s" % (cmd.name, cmd.coll, nq)
# needed to dump json so we don't get unicode
# dict keys like {u'foo':'bar'}
q = json.dumps(nq)
return "%s %s %s" % (cmd.name, cmd.coll, q)
else:
return "%s %s" % (cmd.name, cmd.coll)
1 change: 1 addition & 0 deletions ddtrace/ext/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
TARGET_HOST = "out.host"
TARGET_PORT = "out.port"

BYTES_OUT = "net.out.bytes"
6 changes: 3 additions & 3 deletions tests/contrib/mongoengine/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def test_insert_update_delete_query():
spans = tracer.writer.pop()
eq_(len(spans), 1)
span = spans[0]
eq_(span.resource, "query artist {'first_name': '?'}")
eq_(span.resource, 'query artist {"first_name": "?"}')
eq_(span.span_type, 'mongodb')
eq_(span.service, 'my-mongo')
_assert_timing(span, start, end)
Expand All @@ -100,7 +100,7 @@ def test_insert_update_delete_query():
spans = tracer.writer.pop()
eq_(len(spans), 1)
span = spans[0]
eq_(span.resource, "update artist {'_id': '?'}")
eq_(span.resource, 'update artist {"_id": "?"}')
eq_(span.span_type, 'mongodb')
eq_(span.service, 'my-mongo')
_assert_timing(span, start, end)
Expand All @@ -113,7 +113,7 @@ def test_insert_update_delete_query():
spans = tracer.writer.pop()
eq_(len(spans), 1)
span = spans[0]
eq_(span.resource, "delete artist {'_id': '?'}")
eq_(span.resource, 'delete artist {"_id": "?"}')
eq_(span.span_type, 'mongodb')
eq_(span.service, 'my-mongo')
_assert_timing(span, start, end)
Expand Down
Loading

0 comments on commit 0fd7416

Please sign in to comment.