44import time
55import sys
66from event_history import *
7+ import select
8+ import signal
79
810class ClientCollection (object ):
911 def __init__ (self , connstrs ):
@@ -30,17 +32,29 @@ def stop(self):
3032 for client in self ._clients :
3133 client .stop ()
3234
35+ def set_acc_to_tx (self , max_acc ):
36+ for client in self ._clients :
37+ client .set_acc_to_tx (max_acc )
38+
3339
3440class BankClient (object ):
3541
36- def __init__ (self , connstr , node_id ):
42+ def __init__ (self , connstr , node_id , accounts = 10000 ):
3743 self .connstr = connstr
3844 self .node_id = node_id
3945 self .run = Value ('b' , True )
4046 self ._history = EventHistory ()
41- self .accounts = 10000
47+ self .accounts = accounts
48+ self .accounts_to_tx = accounts
4249 self .show_errors = True
4350
51+ #x = self
52+ #def on_sigint(sig, frame):
53+ # x.stop()
54+ #
55+ #signal.signal(signal.SIGINT, on_sigint)
56+
57+
4458 def initialize (self ):
4559 conn = psycopg2 .connect (self .connstr )
4660 cur = conn .cursor ()
@@ -57,6 +71,22 @@ def initialize(self):
5771 cur .close ()
5872 conn .close ()
5973
74+ def aconn (self ):
75+ return psycopg2 .connect (self .connstr , async = 1 )
76+
77+ @classmethod
78+ def wait (cls , conn ):
79+ while 1 :
80+ state = conn .poll ()
81+ if state == psycopg2 .extensions .POLL_OK :
82+ break
83+ elif state == psycopg2 .extensions .POLL_WRITE :
84+ select .select ([], [conn .fileno ()], [])
85+ elif state == psycopg2 .extensions .POLL_READ :
86+ select .select ([conn .fileno ()], [], [])
87+ else :
88+ raise psycopg2 .OperationalError ("poll() returned %s" % state )
89+
6090 @property
6191 def history (self ):
6292 return self ._history
@@ -74,25 +104,16 @@ def exec_tx(self, name, tx_block):
74104
75105 if conn .closed :
76106 self .history .register_finish (event_id , 'ReConnect' )
77- try :
78- conn = psycopg2 .connect (self .connstr )
79- cur = conn .cursor ()
80- except :
81- continue
82- else :
83- continue
107+ conn = psycopg2 .connect (self .connstr )
108+ cur = conn .cursor ()
84109
85110 try :
86- tx_block (conn , cur )
111+ tx_block (conn , cur )
112+ self .history .register_finish (event_id , 'Commit' )
87113 except psycopg2 .InterfaceError :
88114 self .history .register_finish (event_id , 'InterfaceError' )
89115 except psycopg2 .Error :
90116 self .history .register_finish (event_id , 'PsycopgError' )
91- except :
92- print (sys .exc_info ())
93- self .history .register_finish (event_id , 'OtherError' )
94- else :
95- self .history .register_finish (event_id , 'Commit' )
96117
97118 cur .close ()
98119 conn .close ()
@@ -103,17 +124,20 @@ def tx(conn, cur):
103124 cur .execute ('select sum(amount) from bank_test' )
104125 res = cur .fetchone ()
105126 if res [0 ] != 0 :
106- print ("Isolation error, total = %d" % (res [0 ],))
127+ print ("Isolation error, total = %d, node = %d " % (res [0 ],self . node_id ))
107128 raise BaseException
108129
109130 self .exec_tx ('total' , tx )
110131
132+ def set_acc_to_tx (self , max_acc ):
133+ self .accounts_to_tx = max_acc
134+
111135 def transfer_money (self ):
112136
113137 def tx (conn , cur ):
114138 amount = 1
115- from_uid = random .randrange (1 , self .accounts - 10 )
116- to_uid = from_uid + 1 # random.randrange(1, self.accounts + 1)
139+ from_uid = random .randrange (1 , self .accounts_to_tx - 1 )
140+ to_uid = random .randrange (1 , self .accounts_to_tx - 1 )
117141
118142 conn .commit ()
119143 cur .execute ('''update bank_test
@@ -129,7 +153,10 @@ def tx(conn, cur):
129153 self .exec_tx ('transfer' , tx )
130154
131155 def start (self ):
132- self .transfer_process = Process (target = self .transfer_money , args = ())
156+ print ('Starting client' );
157+ self .run .value = True
158+
159+ self .transfer_process = Process (target = self .transfer_money , name = "txor" , args = ())
133160 self .transfer_process .start ()
134161
135162 self .total_process = Process (target = self .check_total , args = ())
@@ -138,7 +165,7 @@ def start(self):
138165 return
139166
140167 def stop (self ):
141- print ('Stopping! ' );
168+ print ('Stopping client ' );
142169 self .run .value = False
143170 self .total_process .terminate ()
144171 self .transfer_process .terminate ()
0 commit comments