Generic notes
I've experienced the same issue lately with several conditions I had to met:
- solution must be thread safe
- multiple connections to database from the same machine may be active at the same time, kill the exact one connection/query
- application contains connections to many different databases - portable handler for each DB host
We had following class layout (unfortunately I cannot post real sources):
class AbstractModel: pass
class FirstDatabaseModel(AbstractModel): pass # Connection to one DB host
class SecondDatabaseModel(AbstractModel): pass # Connection to one DB host
And created several threads for each model.
Solution Python 3.2
In our application one model = one database. So I've created "service connection" for each model (so we could execute KILL in parallel connection). Therefore if one instance of FirstDatabaseModel was created, 2 database connection were created; if 5 instances were created only 6 connections were used:
class AbstractModel:
_service_connection = None # Formal declaration
def __init__(self):
''' Somehow load config and create connection
'''
self.config = # ...
self.connection = MySQLFromConfig(self.config)
self._init_service_connection()
# Get connection ID (pseudocode)
self.connection_id = self.connection.FetchOneCol('SELECT CONNECTION_ID()')
def _init_service_connection(self):
''' Initialize one singleton connection for model
'''
cls = type(self)
if cls._service_connection is not None:
return
cls._service_connection = MySQLFromConfig(self.config)
Now we need a killer:
def _kill_connection(self):
# Add your own mysql data escaping
sql = 'KILL CONNECTION {}'.format(self.connection_id)
# Do your own connection check and renewal
type(self)._service_connection.execute(sql)
Note: connection.execute = create cursor, execute, close cursor.
And make killer thread safe using threading.Lock:
def _init_service_connection(self):
''' Initialize one singleton connection for model
'''
cls = type(self)
if cls._service_connection is not None:
return
cls._service_connection = MySQLFromConfig(self.config)
cls._service_connection_lock = threading.Lock()
def _kill_connection(self):
# Add your own mysql data escaping
sql = 'KILL CONNECTION {}'.format(self.connection_id)
cls = type(self)
# Do your own connection check and renewal
try:
cls._service_connection_lock.acquire()
cls._service_connection.execute(sql)
finally:
cls._service_connection_lock.release()
And finally add timed execution method using threading.Timer:
def timed_query(self, sql, timeout=5):
kill_query_timer = threading.Timer(timeout, self._kill_connection)
kill_query_timer.start()
try:
self.connection.long_query()
finally:
kill_query_timer.cancel()