44import time
55import sys
66from event_history import *
7+ import select
8+ import signal
79
810class ClientCollection (object ):
911 def __init__ (self , connstrs ):
@@ -30,17 +32,29 @@ def stop(self):
3032 for client in self ._clients :
3133 client .stop ()
3234
35+ def set_acc_to_tx (self , max_acc ):
36+ for client in self ._clients :
37+ client .set_acc_to_tx (max_acc )
38+
3339
3440class BankClient (object ):
3541
36- def __init__ (self , connstr , node_id ):
42+ def __init__ (self , connstr , node_id , accounts = 10000 ):
3743 self .connstr = connstr
3844 self .node_id = node_id
3945 self .run = Value ('b' , True )
4046 self ._history = EventHistory ()
41- self .accounts = 10000
47+ self .accounts = accounts
48+ self .accounts_to_tx = accounts
4249 self .show_errors = True
4350
51+ #x = self
52+ #def on_sigint(sig, frame):
53+ # x.stop()
54+ #
55+ #signal.signal(signal.SIGINT, on_sigint)
56+
57+
4458 def initialize (self ):
4559 conn = psycopg2 .connect (self .connstr )
4660 cur = conn .cursor ()
@@ -57,6 +71,22 @@ def initialize(self):
5771 cur .close ()
5872 conn .close ()
5973
74+ def aconn (self ):
75+ return psycopg2 .connect (self .connstr , async = 1 )
76+
77+ @classmethod
78+ def wait (cls , conn ):
79+ while 1 :
80+ state = conn .poll ()
81+ if state == psycopg2 .extensions .POLL_OK :
82+ break
83+ elif state == psycopg2 .extensions .POLL_WRITE :
84+ select .select ([], [conn .fileno ()], [])
85+ elif state == psycopg2 .extensions .POLL_READ :
86+ select .select ([conn .fileno ()], [], [])
87+ else :
88+ raise psycopg2 .OperationalError ("poll() returned %s" % state )
89+
6090 @property
6191 def history (self ):
6292 return self ._history
@@ -74,25 +104,16 @@ def exec_tx(self, name, tx_block):
74104
75105 if conn .closed :
76106 self .history .register_finish (event_id , 'ReConnect' )
77- try :
78- conn = psycopg2 .connect (self .connstr )
79- cur = conn .cursor ()
80- except :
81- continue
82- else :
83- continue
107+ conn = psycopg2 .connect (self .connstr )
108+ cur = conn .cursor ()
84109
85110 try :
86- tx_block (conn , cur )
111+ tx_block (conn , cur )
112+ self .history .register_finish (event_id , 'Commit' )
87113 except psycopg2 .InterfaceError :
88114 self .history .register_finish (event_id , 'InterfaceError' )
89115 except psycopg2 .Error :
90116 self .history .register_finish (event_id , 'PsycopgError' )
91- except :
92- print (sys .exc_info ())
93- self .history .register_finish (event_id , 'OtherError' )
94- else :
95- self .history .register_finish (event_id , 'Commit' )
96117
97118 cur .close ()
98119 conn .close ()
@@ -104,17 +125,20 @@ def tx(conn, cur):
104125 res = cur .fetchone ()
105126 conn .commit ()
106127 if res [0 ] != 0 :
107- print ("Isolation error, total = %d" % (res [0 ],))
128+ print ("Isolation error, total = %d, node = %d " % (res [0 ],self . node_id ))
108129 raise BaseException
109130
110131 self .exec_tx ('total' , tx )
111132
133+ def set_acc_to_tx (self , max_acc ):
134+ self .accounts_to_tx = max_acc
135+
112136 def transfer_money (self ):
113137
114138 def tx (conn , cur ):
115139 amount = 1
116- from_uid = random .randrange (1 , self .accounts - 10 )
117- to_uid = from_uid + 1 # random.randrange(1, self.accounts + 1)
140+ from_uid = random .randrange (1 , self .accounts_to_tx - 1 )
141+ to_uid = random .randrange (1 , self .accounts_to_tx - 1 )
118142
119143 conn .commit ()
120144 cur .execute ('''update bank_test
@@ -130,7 +154,10 @@ def tx(conn, cur):
130154 self .exec_tx ('transfer' , tx )
131155
132156 def start (self ):
133- self .transfer_process = Process (target = self .transfer_money , args = ())
157+ print ('Starting client' );
158+ self .run .value = True
159+
160+ self .transfer_process = Process (target = self .transfer_money , name = "txor" , args = ())
134161 self .transfer_process .start ()
135162
136163 self .total_process = Process (target = self .check_total , args = ())
@@ -139,7 +166,7 @@ def start(self):
139166 return
140167
141168 def stop (self ):
142- print ('Stopping! ' );
169+ print ('Stopping client ' );
143170 self .run .value = False
144171 self .total_process .terminate ()
145172 self .transfer_process .terminate ()
0 commit comments