|
1 | 1 | # -*- coding: utf-8 -*- |
2 | 2 | import psycopg2 |
| 3 | +import psycopg2.pool as pgpool |
3 | 4 | import psycopg2.extras as pg_extras |
4 | 5 | import abc |
5 | 6 | import six |
|
11 | 12 |
|
12 | 13 |
|
13 | 14 | class DatabaseManager(object): |
14 | | - def __init__(self, dsn, pool_size=1): |
| 15 | + def __init__(self, dsn=None, database=None, user=None, password=None, |
| 16 | + host=None, port=None, pool_size=1): |
15 | 17 | self.dsn = dsn |
16 | | - self._conn = None |
17 | | - self.pool_size = pool_size |
| 18 | + |
| 19 | + # Pass connection params as is |
| 20 | + conn_params = dict(dsn=dsn, database=database, user=user, |
| 21 | + password=password, host=host, port=port) |
| 22 | + if pool_size < 1: |
| 23 | + raise ValueError('Wrong pool_size value. Must be >= 1. ' |
| 24 | + 'Current: {}'.format(pool_size)) |
| 25 | + # Init thread-safe connection pool |
| 26 | + self.pool = pgpool.ThreadedConnectionPool( |
| 27 | + minconn=1, maxconn=pool_size, **conn_params) |
18 | 28 |
|
19 | 29 | @property |
20 | 30 | def connection(self): |
21 | 31 | """Lazy connection property |
22 | 32 |
|
23 | 33 | :return: postgresql connection instance |
24 | 34 | """ |
25 | | - if self._conn is None: |
26 | | - self._conn = self._get_connection() |
27 | | - return self._conn |
| 35 | + return self.pool.getconn() |
28 | 36 |
|
| 37 | + @contextmanager |
| 38 | + def _get_cursor(self, cursor_factory=None): |
| 39 | + conn = self.connection |
| 40 | + try: |
| 41 | + yield conn.cursor(cursor_factory=cursor_factory) |
| 42 | + conn.commit() |
| 43 | + except psycopg2.DatabaseError as err: |
| 44 | + conn.rollback() |
| 45 | + raise psycopg2.DatabaseError(err) |
| 46 | + finally: |
| 47 | + self.pool.putconn(conn) |
| 48 | + |
| 49 | + # TODO: rename it |
29 | 50 | @property |
30 | 51 | def cursor(self): |
31 | | - return self.connection.cursor() |
| 52 | + return self._get_cursor() |
32 | 53 |
|
33 | 54 | @property |
34 | 55 | def dict_cursor(self): |
35 | 56 | """Return dict cursor. It enables accessing via column names instead |
36 | 57 | of indexes |
37 | 58 | """ |
38 | | - return self.connection.cursor(cursor_factory=pg_extras.DictCursor) |
39 | | - |
40 | | - def _get_connection(self): |
41 | | - return psycopg2.connect(dsn=self.dsn) |
42 | | - |
43 | | - @contextmanager |
44 | | - def get_transaction_cursor(self): |
45 | | - """Public transaction context manager. Useful for getting transactional |
46 | | - cursor to be able execute several SQL requests in one transaction |
47 | | -
|
48 | | - Example: |
49 | | - with db_manager.get_transaction_cursor() as t_cursor: |
50 | | - t_cursor.execute('SELECT * FROM ...') |
51 | | - t_cursor.execute('INSERT INTO my_table VALUES ...') |
52 | | -
|
53 | | - So, the code above will be executed within one transaction |
54 | | -
|
55 | | - """ |
56 | | - with self.connection as conn: |
57 | | - with conn.cursor() as t_cursor: |
58 | | - yield t_cursor |
| 59 | + return self._get_cursor(cursor_factory=pg_extras.DictCursor) |
59 | 60 |
|
60 | 61 |
|
61 | 62 | @six.add_metaclass(abc.ABCMeta) |
|
0 commit comments