Skip to content

Commit 7c552f7

Browse files
committed
Updated unit-tests
1 parent 99b47ad commit 7c552f7

File tree

6 files changed

+166
-61
lines changed

6 files changed

+166
-61
lines changed

tasq/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
from tasq.worker.jobqueue import JobQueue
55
from tasq.worker.executor import ProcessQueueExecutor
66
from tasq.remote.client import Client
7-
from tasq.remote.backend import RedisStoreBackend, RedisBackend
7+
from tasq.remote.backend import (
8+
RedisStoreBackend,
9+
RedisBackend,
10+
RabbitMQBackend,
11+
)
812
from tasq.remote.runner import Runners
913
from tasq.worker.actors import ClientWorker
1014
from tasq.actors.routers import RoundRobinRouter, actor_pool
@@ -61,6 +65,8 @@ def queue(backend="zmq://localhost:9000", store=None, signkey=None):
6165
client = Client(_backend)
6266
elif isinstance(backend, RedisBackend):
6367
client = Client(BackendConnection(backend))
68+
elif isinstance(backend, RabbitMQBackend):
69+
client = Client(BackendConnection(backend))
6470
if store:
6571
urlstore = urlparse(store)
6672
assert urlstore.scheme in {

tasq/remote/backend.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,10 @@ class RabbitMQBackend:
364364

365365
"""Simple Queue with RabbitMQ Backend"""
366366

367-
def __init__(self, host, port, role, name, namespace="queue"):
367+
def __init__(self, amqp_factory, role, name, namespace="queue"):
368368
"""The default connection parameters are: host='localhost', port=5672
369369
"""
370-
self._host, self._port = host, port
370+
self._driver = amqp_factory()
371371
assert role in {"receiver", "sender"}, f"Unknown role {role}"
372372
self._role = role
373373
self._queue_name = f"{namespace}:{name}"
@@ -378,9 +378,7 @@ def __init__(self, host, port, role, name, namespace="queue"):
378378
threading.Thread(target=self._start, daemon=True).start()
379379

380380
def _get_channel(self):
381-
channel = pika.BlockingConnection(
382-
pika.ConnectionParameters(host=self._host, port=self._port)
383-
).channel()
381+
channel = self._driver.channel()
384382
return channel
385383

386384
def _get_job(self, ch, method, _, body):
@@ -408,8 +406,8 @@ def _start(self):
408406

409407
def __repr__(self):
410408
return (
411-
f"RabbitMQBackend(amqp://{self._host}:{self._port}/"
412-
f"?name={self._name}, role=self._role)"
409+
f"RabbitMQBackend({self._driver}?"
410+
f"name={self._name}, role=self._role)"
413411
)
414412

415413
def put_job(self, serialized_job):
@@ -433,7 +431,8 @@ def get_available_result(self, timeout=None):
433431
return None
434432

435433
def stop(self):
436-
pass
434+
ch = self._get_channel()
435+
ch.stop()
437436

438437
def close(self):
439-
pass
438+
self.stop()

tasq/remote/connection.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,9 @@
55
This module contains classes to define connections using zmq sockets.
66
"""
77

8+
import sys
89
import zmq
910
from urllib.parse import urlparse
10-
11-
try:
12-
import redis
13-
except ImportError:
14-
print("You need to install redis python driver to use redis backend")
15-
1611
from .backend import RedisBackend, RabbitMQBackend
1712
from .sockets import CloudPickleContext, BackendSocket
1813
from ..exception import BackendCommunicationErrorException
@@ -167,19 +162,42 @@ def from_url(cls, url, signkey=None):
167162
"port": u.port or 6379,
168163
}
169164
if scheme == "redis":
165+
try:
166+
import redis
167+
except ImportError:
168+
print(
169+
"You need to install redis python driver to use redis backend"
170+
)
171+
sys.exit(1)
170172
conn_args["db"] = int(extraparams.get("db", 0))
171173
backend = RedisBackend(
172174
lambda: redis.StrictRedis(**conn_args), name=name
173175
)
174176
else:
175-
conn_args["role"] = extraparams.get("role", "sender")
176-
backend = RabbitMQBackend(**conn_args)
177+
try:
178+
import pika
179+
except ImportError:
180+
print("You need to install pika to use rabbitmq backend")
181+
sys.exit(1)
182+
role = extraparams.get("role", "sender")
183+
backend = RabbitMQBackend(
184+
lambda: pika.BlockingConnection(
185+
pika.ConnectionParameters(**conn_args)
186+
),
187+
role=role,
188+
name=name,
189+
)
177190
return cls(backend, signkey)
178191

179192

180193
def connect_redis_backend(
181194
host, port, db, name, namespace="queue", signkey=None
182195
):
196+
try:
197+
import redis
198+
except ImportError:
199+
print("You need to install redis python driver to use redis backend")
200+
sys.exit(1)
183201
return BackendConnection(
184202
RedisBackend(
185203
lambda: redis.StrictRedis(host, port, db), name, namespace
@@ -191,6 +209,19 @@ def connect_redis_backend(
191209
def connect_rabbitmq_backend(
192210
host, port, role, name, namespace="queue", signkey=None
193211
):
212+
try:
213+
import pika
214+
except ImportError:
215+
print("You need to install pika to use rabbitmq backend")
216+
sys.exit(1)
194217
return BackendConnection(
195-
RabbitMQBackend(host, port, role, name, namespace), signkey=signkey,
218+
RabbitMQBackend(
219+
lambda: pika.BlockingConnection(
220+
pika.ConnectionParameters(host=host, port=port)
221+
),
222+
role,
223+
name,
224+
namespace,
225+
),
226+
signkey=signkey,
196227
)

tests/backend_test.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import time
2+
import queue
23
import asyncio
34
import unittest
5+
import threading
46
import collections
57
from unittest.mock import patch
68
from tasq.remote.backend import ZMQBackend, RedisBackend, RabbitMQBackend
@@ -70,6 +72,53 @@ def lrange(self, queue_name, start, end):
7072
return self.queues[queue_name][start:end]
7173

7274

75+
class AMQPMethod:
76+
def __init__(self):
77+
self.delivery_tag = None
78+
79+
80+
class FakeAMQPChannel:
81+
def __init__(self):
82+
self.run = threading.Event()
83+
self.queues = collections.defaultdict(queue.Queue)
84+
self.consume_queue = None
85+
self.on_message = None
86+
87+
def basic_publish(self, exchange, queue_name, item):
88+
self.queues[queue_name].put(item)
89+
90+
def basic_consume(self, queue, on_message_callback):
91+
self.on_message = on_message_callback
92+
93+
def basic_ack(self, delivery_tag):
94+
pass
95+
96+
def basic_qos(self, prefetch_count):
97+
pass
98+
99+
def queue_declare(self, queue, durable=True):
100+
self.consume_queue = queue
101+
102+
def start_consuming(self):
103+
while not self.run.is_set():
104+
item = self.queues[self.consume_queue].get()
105+
if not item:
106+
break
107+
self.on_message(self, AMQPMethod(), None, item)
108+
109+
def stop(self):
110+
self.queues[self.consume_queue].put(None)
111+
self.run.set()
112+
113+
114+
_amqp_channel = FakeAMQPChannel()
115+
116+
117+
class FakeAMQPClient:
118+
def channel(self):
119+
return _amqp_channel
120+
121+
73122
class TestZMQBackend(unittest.TestCase):
74123
def test_init_zmqbackend(self):
75124
with patch(
@@ -138,4 +187,10 @@ def test_redis_get_pending_jobs(self):
138187

139188

140189
class TestRabbitMQBackend(unittest.TestCase):
141-
pass
190+
def test_amqp_put_job(self):
191+
backend = RabbitMQBackend(
192+
lambda: FakeAMQPClient(), role="receiver", name="test-queue"
193+
)
194+
backend.put_job("test-job")
195+
self.assertEqual(backend.get_next_job(), "test-job")
196+
backend.stop()

tests/queue_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,4 @@ def test_multiqueue_put_blocking(self):
141141
self.assertEqual(tq.pending_jobs(), [])
142142
self.assertEqual(len(tq.results()), 3)
143143
self.assertTrue(res)
144-
self.assertAlmostEqual(t2 - t1, .1, delta=0.1)
144+
self.assertAlmostEqual(t2 - t1, 0.1, delta=0.1)

tests/runner_test.py

Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import queue
22
import unittest
33
import threading
4+
from unittest.mock import patch
45
from concurrent.futures import Future
56
from tasq.remote.runner import Runner, ZMQRunner
67
from tasq.job import Job
8+
from .backend_test import FakeRedisClient, FakeAMQPClient
79

810

911
class FakeWorker:
@@ -40,43 +42,55 @@ def test_runner_running(self):
4042
def test_runner_factory(self):
4143
from tasq.remote.runner import runner_factory
4244

43-
zmqrunner = runner_factory.create(
44-
"ZMQ_ACTOR_RUNNER", host="localhost", channel=(20000, 20001)
45-
)
46-
zmqqrunner = runner_factory.create(
47-
"ZMQ_QUEUE_RUNNER", host="localhost", channel=(20000, 20001)
48-
)
49-
redis_actor_runner = runner_factory.create(
50-
"REDIS_ACTOR_RUNNER",
51-
host="1.2.3.4",
52-
port=6333,
53-
db=1,
54-
name="test-queue",
55-
)
56-
redis_queue_runner = runner_factory.create(
57-
"REDIS_QUEUE_RUNNER",
58-
host="1.2.3.4",
59-
port=6333,
60-
db=1,
61-
name="test-queue",
62-
)
63-
rabbitmq_queue_runner = runner_factory.create(
64-
"AMQP_QUEUE_RUNNER",
65-
host="1.2.3.4",
66-
port=6333,
67-
role="sender",
68-
name="test-queue",
69-
)
70-
rabbitmq_actor_runner = runner_factory.create(
71-
"AMQP_ACTOR_RUNNER",
72-
host="1.2.3.4",
73-
port=6333,
74-
role="sender",
75-
name="test-queue",
76-
)
77-
self.assertTrue(isinstance(zmqrunner, ZMQRunner))
78-
self.assertTrue(isinstance(zmqqrunner, ZMQRunner))
79-
self.assertTrue(isinstance(redis_actor_runner, Runner))
80-
self.assertTrue(isinstance(redis_queue_runner, Runner))
81-
self.assertTrue(isinstance(rabbitmq_actor_runner, Runner))
82-
self.assertTrue(isinstance(rabbitmq_queue_runner, Runner))
45+
with patch("tasq.remote.runner.ZMQBackend") as zmqmock, patch(
46+
"tasq.remote.runner.asyncio.get_event_loop"
47+
) as loopmock, patch("redis.StrictRedis") as rmock, patch(
48+
"pika.BlockingConnection"
49+
) as pmock, patch(
50+
"tasq.remote.runner.worker.build_jobqueue"
51+
) as jq:
52+
jq.return_value = None
53+
pmock.return_value = FakeAMQPClient()
54+
rmock.return_value = FakeRedisClient()
55+
loopmock.return_value = None
56+
zmqmock.return_value = None
57+
zmqrunner = runner_factory.create(
58+
"ZMQ_ACTOR_RUNNER", host="localhost", channel=(20000, 20001)
59+
)
60+
zmqqrunner = runner_factory.create(
61+
"ZMQ_QUEUE_RUNNER", host="localhost", channel=(20000, 20001)
62+
)
63+
redis_actor_runner = runner_factory.create(
64+
"REDIS_ACTOR_RUNNER",
65+
host="1.2.3.4",
66+
port=6333,
67+
db=1,
68+
name="test-queue",
69+
)
70+
redis_queue_runner = runner_factory.create(
71+
"REDIS_QUEUE_RUNNER",
72+
host="1.2.3.4",
73+
port=6333,
74+
db=1,
75+
name="test-queue",
76+
)
77+
rabbitmq_queue_runner = runner_factory.create(
78+
"AMQP_QUEUE_RUNNER",
79+
host="1.2.3.4",
80+
port=6333,
81+
role="sender",
82+
name="test-queue",
83+
)
84+
rabbitmq_actor_runner = runner_factory.create(
85+
"AMQP_ACTOR_RUNNER",
86+
host="1.2.3.4",
87+
port=6333,
88+
role="sender",
89+
name="test-queue",
90+
)
91+
self.assertTrue(isinstance(zmqrunner, ZMQRunner))
92+
self.assertTrue(isinstance(zmqqrunner, ZMQRunner))
93+
self.assertTrue(isinstance(redis_actor_runner, Runner))
94+
self.assertTrue(isinstance(redis_queue_runner, Runner))
95+
self.assertTrue(isinstance(rabbitmq_actor_runner, Runner))
96+
self.assertTrue(isinstance(rabbitmq_queue_runner, Runner))

0 commit comments

Comments
 (0)