55import random
66import psycopg2
77import time
8+ import datetime
9+ import copy
810import aioprocessing
911import multiprocessing
1012
1113class MtmTxAggregate (object ):
1214
1315 def __init__ (self , name ):
1416 self .name = name
17+ self .isolation = 0
1518 self .clear_values ()
1619
1720 def clear_values (self ):
1821 self .max_latency = 0.0
19- self .running_latency = 0.0
2022 self .finish = {}
2123
22- def add_finish (self , name ):
24+ def start_tx (self ):
25+ self .start_time = datetime .datetime .now ()
26+
27+ def finish_tx (self , name ):
28+ latency = (datetime .datetime .now () - self .start_time ).total_seconds ()
29+
30+ if latency > self .max_latency :
31+ self .max_latency = latency
32+
2333 if name not in self .finish :
2434 self .finish [name ] = 1
2535 else :
2636 self .finish [name ] += 1
37+
38+ def as_dict (self ):
39+ return {
40+ 'running_latency' : (datetime .datetime .now () - self .start_time ).total_seconds (),
41+ 'max_latency' : self .max_latency ,
42+ 'isolation' : self .isolation ,
43+ 'finish' : copy .deepcopy (self .finish )
44+ }
2745
2846class MtmClient (object ):
2947
3048 def __init__ (self , dsns , n_accounts = 100000 ):
3149 self .n_accounts = n_accounts
3250 self .dsns = dsns
33-
34- self .aggregates = [MtmTxAggregate ('transfer' ), MtmTxAggregate ('transfer' ), MtmTxAggregate ('transfer' )]
51+ self .aggregates = {}
3552 self .initdb ()
3653
3754 def initdb (self ):
@@ -53,90 +70,106 @@ async def status(self):
5370 while True :
5471 msg = await self .child_pipe .coro_recv ()
5572 if msg == 'status' :
56- self .child_pipe .send (self .aggregates )
57- for aggregate in self .aggregates :
73+ serialized_aggs = {}
74+ for name , aggregate in self .aggregates .items ():
75+ serialized_aggs [name ] = aggregate .as_dict ()
5876 aggregate .clear_values ()
77+ self .child_pipe .send (serialized_aggs )
5978
60- async def transfer (self , i ):
61- pool = await aiopg .create_pool (self .dsns [i ])
62- async with pool .acquire () as conn :
63- async with conn .cursor () as cur :
64- while True :
65- amount = 1
66- from_uid = random .randint (1 , self .n_accounts - 1 )
67- to_uid = random .randint (1 , self .n_accounts - 1 )
68-
69- try :
70- await cur .execute ('begin' )
71- await cur .execute ('''update bank_test
72- set amount = amount - %s
73- where uid = %s''' ,
74- (amount , from_uid ))
75- await cur .execute ('''update bank_test
76- set amount = amount + %s
77- where uid = %s''' ,
78- (amount , to_uid ))
79- await cur .execute ('commit' )
80-
81- self .aggregates [i ].add_finish ('commit' )
82- except psycopg2 .Error as e :
83- await cur .execute ('rollback' )
84- self .aggregates [i ].add_finish (e .pgerror )
79+ async def exec_tx (self , tx_block , aggname_prefix , conn_i ):
80+ aggname = "%s_%i" % (aggname_prefix , conn_i )
81+ agg = self .aggregates [aggname ] = MtmTxAggregate (aggname )
8582
86- async def total (self , i ):
87- pool = await aiopg .create_pool (self .dsns [i ])
83+ pool = await aiopg .create_pool (self .dsns [conn_i ])
8884 async with pool .acquire () as conn :
8985 async with conn .cursor () as cur :
9086 while True :
91-
87+ agg . start_tx ()
9288 try :
93- await cur .execute ('select sum(amount) from bank_test' )
94-
95- if 'commit' not in self .tps_vector [i ]:
96- self .tps_vector [i ]['commit' ] = 1
97- else :
98- self .tps_vector [i ]['commit' ] += 1
89+ await tx_block (conn , cur )
90+ agg .finish_tx ('commit' )
9991 except psycopg2 .Error as e :
10092 await cur .execute ('rollback' )
101- if e .pgerror not in self .tps_vector [i ]:
102- self .tps_vector [i ][e .pgerror ] = 1
103- else :
104- self .tps_vector [i ][e .pgerror ] += 1
93+ agg .finish_tx (e .pgerror )
94+
95+ async def transfer_tx (self , conn , cur ):
96+ amount = 1
97+ # to avoid deadlocks:
98+ from_uid = random .randint (1 , self .n_accounts - 2 )
99+ to_uid = from_uid + 1
100+ await cur .execute ('begin' )
101+ await cur .execute ('''update bank_test
102+ set amount = amount - %s
103+ where uid = %s''' ,
104+ (amount , from_uid ))
105+ await cur .execute ('''update bank_test
106+ set amount = amount + %s
107+ where uid = %s''' ,
108+ (amount , to_uid ))
109+ await cur .execute ('commit' )
110+
111+ async def total_tx (self , conn , cur ):
112+ await cur .execute ('select sum(amount) from bank_test' )
113+ total = await cur .fetchone ()
114+ if total [0 ] != 0 :
115+ self .isolation_errors += 1
105116
106117 def run (self ):
107118 asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
108119 self .loop = asyncio .get_event_loop ()
109120
110121 for i , _ in enumerate (self .dsns ):
111- asyncio .ensure_future (self .transfer ( i ))
112- # asyncio.ensure_future(self.total( i))
122+ asyncio .ensure_future (self .exec_tx ( self . transfer_tx , 'transfer' , i ))
123+ asyncio .ensure_future (self .exec_tx ( self . total_tx , 'sumtotal' , i ))
113124
114125 asyncio .ensure_future (self .status ())
115126
116127 self .loop .run_forever ()
117128
118129 def bgrun (self ):
119130 print ('Starting evloop in different process' );
120-
121131 self .parent_pipe , self .child_pipe = aioprocessing .AioPipe ()
122-
123132 self .evloop_process = multiprocessing .Process (target = self .run , args = ())
124133 self .evloop_process .start ()
125134
126135 def get_status (self ):
127136 c .parent_pipe .send ('status' )
128137 return c .parent_pipe .recv ()
129138
139+ def print_aggregates (serialized_agg ):
140+ columns = ['running_latency' , 'max_latency' , 'isolation' , 'finish' ]
141+
142+ # print table header
143+ print ("\t \t " , end = "" )
144+ for col in columns :
145+ print (col , end = "\t " )
146+ print ("\n " , end = "" )
147+
148+ serialized_agg
149+
150+ for aggname in sorted (serialized_agg .keys ()):
151+ agg = serialized_agg [aggname ]
152+ print ("%s\t " % aggname , end = "" )
153+ for col in columns :
154+ if col in agg :
155+ if isinstance (agg [col ], float ):
156+ print ("%.2f\t " % (agg [col ],), end = "\t " )
157+ else :
158+ print (agg [col ], end = "\t " )
159+ else :
160+ print ("-\t " , end = "" )
161+ print ("" )
162+ print ("" )
130163
131164c = MtmClient (['dbname=postgres user=stas host=127.0.0.1' ,
132165 'dbname=postgres user=stas host=127.0.0.1 port=5433' ,
133- 'dbname=postgres user=stas host=127.0.0.1 port=5434' ], n_accounts = 1000 )
134- # c = MtmClient(['dbname=postgres user=stas host=127.0.0.1'])
166+ 'dbname=postgres user=stas host=127.0.0.1 port=5434' ], n_accounts = 10000 )
135167c .bgrun ()
136168
137169while True :
138170 time .sleep (1 )
139171 aggs = c .get_status ()
140- for agg in aggs :
141- print (agg .finish )
172+ print_aggregates (aggs )
173+ # for k, v in aggs.items():
174+ # print(k, v.finish)
142175
0 commit comments