5
5
import os
6
6
import os .path as op
7
7
import psycopg2
8
+ from psycopg2 .pool import PoolError
8
9
9
10
sys .path .append (
10
11
op .abspath (op .dirname (__file__ )) + '/../'
@@ -24,12 +25,14 @@ class PostgresClientSystemTest(unittest.TestCase):
24
25
DB_NAME = 'test'
25
26
DB_PORT = os .environ .get ('POSTGRES_PORT' , 5432 )
26
27
TABLE_NAME = 'users'
28
+ POOL_SIZE = 3
27
29
28
30
def setUp (self ):
29
- dsn = 'user={} password={} dbname={} host=localhost port={}' . format (
30
- self .DB_USER , self .DB_PASSWORD , self .DB_NAME , self .DB_PORT )
31
+ self . dsn = 'user={} password={} dbname={} host=localhost port={}' \
32
+ . format ( self .DB_USER , self .DB_PASSWORD , self .DB_NAME , self .DB_PORT )
31
33
try :
32
- self .pg_client = PostgresClient (dsn = dsn , pool_size = 10 )
34
+ self .pg_client = PostgresClient (dsn = self .dsn ,
35
+ pool_size = self .POOL_SIZE )
33
36
except psycopg2 .OperationalError as err :
34
37
print ('Check that postgres docker container is started. '
35
38
'Check README for more information' )
@@ -65,6 +68,12 @@ def _drop_table(self):
65
68
def tearDown (self ):
66
69
self ._drop_table ()
67
70
71
+ def test_create_with_wrong_pool_value (self ):
72
+ with self .assertRaises (ValueError ) as err :
73
+ pg_client = PostgresClient (dsn = self .dsn , pool_size = 0 )
74
+ self .assertIsNone (pg_client )
75
+ self .assertIn ('Wrong pool_size value' , err )
76
+
68
77
def test_cursor (self ):
69
78
with self .pg_client .cursor as cursor :
70
79
cursor .execute ('SELECT * FROM users' )
@@ -98,10 +107,28 @@ def test_success_transaction(self):
98
107
self .assertEqual (len (result_set ), 101 )
99
108
100
109
def test_rollback_transaction (self ):
101
- # Insert null username must cause an error
102
- with self .pg_client . cursor as transaction :
103
- with self .assertRaises ( psycopg2 . DatabaseError ) as err :
110
+ # Inserting null username value must raise an error
111
+ with self .assertRaises ( psycopg2 . DatabaseError ) as err :
112
+ with self .pg_client . cursor as transaction :
104
113
transaction .execute (
105
- "INSERT INTO {} (username) VALUES (%s)" .format (self .TABLE_NAME ),
114
+ "INSERT INTO {} (username) VALUES (%s)" .format (
115
+ self .TABLE_NAME ),
106
116
(None , ))
107
- self .assertIn ('null value in column' , err .exception .message )
117
+ print ('abc' )
118
+ self .assertIn ('null value in column' , err .exception .message )
119
+ print ('transaction finished' )
120
+
121
+ def test_connection_pool_overflow (self ):
122
+ # Consume all connection to check overflow case
123
+ connections = []
124
+ for i in range (self .POOL_SIZE ):
125
+ connections .append (self .pg_client .acquire_conn ())
126
+
127
+ with self .assertRaises (PoolError ) as err :
128
+ with self .pg_client .cursor as cursor :
129
+ self .assertIsNone (cursor )
130
+ self .assertIn ('connection pool exhausted' , err )
131
+
132
+ # Release all connections back to pool
133
+ for conn in connections :
134
+ self .pg_client .release_conn (conn )
0 commit comments