11#!/usr/bin/env python3
22import asyncio
3- import uvloop
3+ # import uvloop
44import aiopg
55import random
66import psycopg2
@@ -50,6 +50,7 @@ def __init__(self, dsns, n_accounts=100000):
5050 self .dsns = dsns
5151 self .aggregates = {}
5252 self .initdb ()
53+ self .running = True
5354
5455 def initdb (self ):
5556 conn = psycopg2 .connect (self .dsns [0 ])
@@ -66,63 +67,67 @@ def initdb(self):
6667 cur .close ()
6768 conn .close ()
6869
69- async def status (self ):
70- while True :
71- msg = await self .child_pipe .coro_recv ()
70+ @asyncio .coroutine
71+ def status (self ):
72+ while self .running :
73+ msg = yield from self .child_pipe .coro_recv ()
7274 if msg == 'status' :
7375 serialized_aggs = {}
7476 for name , aggregate in self .aggregates .items ():
7577 serialized_aggs [name ] = aggregate .as_dict ()
7678 aggregate .clear_values ()
7779 self .child_pipe .send (serialized_aggs )
7880
79- async def exec_tx (self , tx_block , aggname_prefix , conn_i ):
81+
82+ @asyncio .coroutine
83+ def exec_tx (self , tx_block , aggname_prefix , conn_i ):
8084 aggname = "%s_%i" % (aggname_prefix , conn_i )
8185 agg = self .aggregates [aggname ] = MtmTxAggregate (aggname )
82-
83- pool = await aiopg . create_pool ( self . dsns [ conn_i ] )
84- async with pool . acquire () as conn :
85- async with conn . cursor () as cur :
86- while True :
87- agg . start_tx ()
88- try :
89- await tx_block (conn , cur )
90- agg .finish_tx ('commit' )
91- except psycopg2 .Error as e :
92- await cur . execute ( 'rollback' )
93- agg . finish_tx ( e . pgerror )
94-
95- async def transfer_tx (self , conn , cur ):
86+ pool = yield from aiopg . create_pool ( self . dsns [ conn_i ])
87+ conn = yield from pool . acquire ( )
88+ cur = yield from conn . cursor ()
89+ while self . running :
90+ agg . start_tx ()
91+ try :
92+ yield from cur . execute ( 'commit' )
93+ yield from tx_block (conn , cur )
94+ agg .finish_tx ('commit' )
95+ except psycopg2 .Error as e :
96+ agg . finish_tx ( e . pgerror )
97+
98+ @ asyncio . coroutine
99+ def transfer_tx (self , conn , cur ):
96100 amount = 1
97101 # to avoid deadlocks:
98102 from_uid = random .randint (1 , self .n_accounts - 2 )
99103 to_uid = from_uid + 1
100- await cur .execute ('begin' )
101- await cur .execute ('''update bank_test
104+ yield from cur .execute ('begin' )
105+ yield from cur .execute ('''update bank_test
102106 set amount = amount - %s
103107 where uid = %s''' ,
104108 (amount , from_uid ))
105- await cur .execute ('''update bank_test
109+ yield from cur .execute ('''update bank_test
106110 set amount = amount + %s
107111 where uid = %s''' ,
108112 (amount , to_uid ))
109- await cur .execute ('commit' )
113+ yield from cur .execute ('commit' )
110114
111- async def total_tx (self , conn , cur ):
112- await cur .execute ('select sum(amount) from bank_test' )
113- total = await cur .fetchone ()
115+ @asyncio .coroutine
116+ def total_tx (self , conn , cur ):
117+ yield from cur .execute ('select sum(amount) from bank_test' )
118+ total = yield from cur .fetchone ()
114119 if total [0 ] != 0 :
115120 self .isolation_errors += 1
116121
117122 def run (self ):
118- asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
123+ # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
119124 self .loop = asyncio .get_event_loop ()
120125
121126 for i , _ in enumerate (self .dsns ):
122- asyncio .ensure_future (self .exec_tx (self .transfer_tx , 'transfer' , i ))
123- asyncio .ensure_future (self .exec_tx (self .total_tx , 'sumtotal' , i ))
127+ asyncio .async (self .exec_tx (self .transfer_tx , 'transfer' , i ))
128+ asyncio .async (self .exec_tx (self .total_tx , 'sumtotal' , i ))
124129
125- asyncio .ensure_future (self .status ())
130+ asyncio .async (self .status ())
126131
127132 self .loop .run_forever ()
128133
@@ -133,43 +138,46 @@ def bgrun(self):
133138 self .evloop_process .start ()
134139
135140 def get_status (self ):
136- c .parent_pipe .send ('status' )
137- return c .parent_pipe .recv ()
138-
139- def print_aggregates (serialized_agg ):
140- columns = ['running_latency' , 'max_latency' , 'isolation' , 'finish' ]
141-
142- # print table header
143- print ("\t \t " , end = "" )
144- for col in columns :
145- print (col , end = "\t " )
146- print ("\n " , end = "" )
141+ self .parent_pipe .send ('status' )
142+ return self .parent_pipe .recv ()
143+
144+ def stop (self ):
145+ self .running = False
146+ self .evloop_process .terminate ()
147147
148- serialized_agg
148+ @classmethod
149+ def print_aggregates (cls , serialized_agg ):
150+ columns = ['running_latency' , 'max_latency' , 'isolation' , 'finish' ]
149151
150- for aggname in sorted (serialized_agg .keys ()):
151- agg = serialized_agg [aggname ]
152- print ("%s\t " % aggname , end = "" )
152+ # print table header
153+ print ("\t \t " , end = "" )
153154 for col in columns :
154- if col in agg :
155- if isinstance (agg [col ], float ):
156- print ("%.2f\t " % (agg [col ],), end = "\t " )
155+ print (col , end = "\t " )
156+ print ("\n " , end = "" )
157+
158+ serialized_agg
159+
160+ for aggname in sorted (serialized_agg .keys ()):
161+ agg = serialized_agg [aggname ]
162+ print ("%s\t " % aggname , end = "" )
163+ for col in columns :
164+ if col in agg :
165+ if isinstance (agg [col ], float ):
166+ print ("%.2f\t " % (agg [col ],), end = "\t " )
167+ else :
168+ print (agg [col ], end = "\t " )
157169 else :
158- print (agg [col ], end = "\t " )
159- else :
160- print ("-\t " , end = "" )
170+ print ("-\t " , end = "" )
171+ print ("" )
161172 print ("" )
162- print ("" )
163-
164- c = MtmClient (['dbname=postgres user=stas host=127.0.0.1' ,
165- 'dbname=postgres user=stas host=127.0.0.1 port=5433' ,
166- 'dbname=postgres user=stas host=127.0.0.1 port=5434' ], n_accounts = 10000 )
167- c .bgrun ()
168173
169- while True :
170- time .sleep (1 )
171- aggs = c .get_status ()
172- print_aggregates (aggs )
173- # for k, v in aggs.items():
174- # print(k, v.finish)
175174
175+ if __name__ == "__main__" :
176+ c = MtmClient (['dbname=postgres user=postgres host=127.0.0.1' ,
177+ 'dbname=postgres user=postgres host=127.0.0.1 port=5433' ,
178+ 'dbname=postgres user=postgres host=127.0.0.1 port=5434' ], n_accounts = 10000 )
179+ c .bgrun ()
180+ while True :
181+ time .sleep (1 )
182+ aggs = c .get_status ()
183+ MtmClient .print_aggregates (aggs )
0 commit comments