66import subprocess
77import time
88
9- from collections import Iterable
9+ try :
10+ from collections .abc import Iterable
11+ except ImportError :
12+ from collections import Iterable
13+
1014from shutil import rmtree
1115from six import raise_from , iteritems , text_type
1216from tempfile import mkstemp , mkdtemp
@@ -91,7 +95,6 @@ class ProcessProxy(object):
9195 process: wrapped psutill.Process object
9296 ptype: instance of ProcessType
9397 """
94-
9598 def __init__ (self , process , ptype = None ):
9699 self .process = process
97100 self .ptype = ptype or ProcessType .from_process (process )
@@ -196,7 +199,6 @@ def auxiliary_processes(self):
196199 Returns a list of auxiliary processes.
197200 Each process is represented by :class:`.ProcessProxy` object.
198201 """
199-
200202 def is_aux (process ):
201203 return process .ptype != ProcessType .Unknown
202204
@@ -430,10 +432,9 @@ def init(self, initdb_params=None, **kwargs):
430432 """
431433
432434 # initialize this PostgreSQL node
433- cached_initdb (
434- data_dir = self .data_dir ,
435- logfile = self .utils_log_file ,
436- params = initdb_params )
435+ cached_initdb (data_dir = self .data_dir ,
436+ logfile = self .utils_log_file ,
437+ params = initdb_params )
437438
438439 # initialize default config files
439440 self .default_conf (** kwargs )
@@ -480,8 +481,8 @@ def default_conf(self,
480481 if allow_streaming :
481482 # get auth method for host or local users
482483 def get_auth_method (t ):
483- return next ((s .split ()[- 1 ] for s in lines
484- if s .startswith (t )), 'trust' )
484+ return next ((s .split ()[- 1 ]
485+ for s in lines if s .startswith (t )), 'trust' )
485486
486487 # get auth methods
487488 auth_local = get_auth_method ('local' )
@@ -760,12 +761,11 @@ def promote(self, dbname=None, username=None):
760761 if self ._pg_version < '10' :
761762 check_query = "SELECT pg_is_in_recovery()"
762763
763- self .poll_query_until (
764- query = check_query ,
765- expected = False ,
766- dbname = dbname ,
767- username = username ,
768- max_attempts = 0 ) # infinite
764+ self .poll_query_until (query = check_query ,
765+ expected = False ,
766+ dbname = dbname ,
767+ username = username ,
768+ max_attempts = 0 ) # infinite
769769
770770 # node becomes master itself
771771 self ._master = None
@@ -884,11 +884,10 @@ def psql(self,
884884 psql_params .append (dbname )
885885
886886 # start psql process
887- process = subprocess .Popen (
888- psql_params ,
889- stdin = subprocess .PIPE ,
890- stdout = subprocess .PIPE ,
891- stderr = subprocess .PIPE )
887+ process = subprocess .Popen (psql_params ,
888+ stdin = subprocess .PIPE ,
889+ stdout = subprocess .PIPE ,
890+ stderr = subprocess .PIPE )
892891
893892 # wait until it finishes and get stdout and stderr
894893 out , err = process .communicate (input = input )
@@ -1043,11 +1042,10 @@ def poll_query_until(self,
10431042 attempts = 0
10441043 while max_attempts == 0 or attempts < max_attempts :
10451044 try :
1046- res = self .execute (
1047- dbname = dbname ,
1048- query = query ,
1049- username = username ,
1050- commit = commit )
1045+ res = self .execute (dbname = dbname ,
1046+ query = query ,
1047+ username = username ,
1048+ commit = commit )
10511049
10521050 if expected is None and res is None :
10531051 return # done
@@ -1165,8 +1163,8 @@ def set_synchronous_standbys(self, standbys):
11651163 standbys = First (1 , standbys )
11661164 else :
11671165 if isinstance (standbys , Iterable ):
1168- standbys = u", " .join (
1169- u" \" {} \" " . format ( r . name ) for r in standbys )
1166+ standbys = u", " .join (u" \" {} \" " . format ( r . name )
1167+ for r in standbys )
11701168 else :
11711169 raise TestgresException ("Feature isn't supported in "
11721170 "Postgres 9.5 and below" )
@@ -1195,11 +1193,10 @@ def catchup(self, dbname=None, username=None):
11951193 username = username )[0 ][0 ] # yapf: disable
11961194
11971195 # wait until this LSN reaches replica
1198- self .poll_query_until (
1199- query = wait_lsn .format (lsn ),
1200- dbname = dbname ,
1201- username = username ,
1202- max_attempts = 0 ) # infinite
1196+ self .poll_query_until (query = wait_lsn .format (lsn ),
1197+ dbname = dbname ,
1198+ username = username ,
1199+ max_attempts = 0 ) # infinite
12031200 except Exception as e :
12041201 raise_from (CatchUpException ("Failed to catch up" , poll_lsn ), e )
12051202
@@ -1215,7 +1212,11 @@ def publish(self, name, **kwargs):
12151212 """
12161213 return Publication (name = name , node = self , ** kwargs )
12171214
1218- def subscribe (self , publication , name , dbname = None , username = None ,
1215+ def subscribe (self ,
1216+ publication ,
1217+ name ,
1218+ dbname = None ,
1219+ username = None ,
12191220 ** params ):
12201221 """
12211222 Create subscription for logical replication
0 commit comments