from londiste.handler import EncodingValidator
from londiste.handlers import handler_args, update
-from londiste.handlers.part import PartHandler
+from londiste.handlers.shard import ShardHandler
__all__ = ['Dispatcher']
#------------------------------------------------------------------------------
-class Dispatcher (PartHandler):
+class Dispatcher (ShardHandler):
"""Partitioned loader.
Splits events into partitions, if requested.
Then applies them without further processing.
# compat for dest-table
dest_table = args.get('table', dest_table)
- PartHandler.__init__(self, table_name, args, dest_table)
+ ShardHandler.__init__(self, table_name, args, dest_table)
# show args
self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
def reset(self):
"""Called before starting to process a batch.
Should clean any pending data."""
- PartHandler.reset(self)
+ ShardHandler.reset(self)
def prepare_batch(self, batch_info, dst_curs):
"""Called on first event for this table in current batch."""
if self.conf.table_mode != 'ignore':
self.batch_info = batch_info
self.dst_curs = dst_curs
- PartHandler.prepare_batch(self, batch_info, dst_curs)
+ ShardHandler.prepare_batch(self, batch_info, dst_curs)
def filter_data(self, data):
"""Process with fields skip and map"""
"""Called when batch finishes."""
if self.conf.table_mode != 'ignore':
self.row_handler.flush(dst_curs)
- #PartHandler.finish_batch(self, batch_info, dst_curs)
+ #ShardHandler.finish_batch(self, batch_info, dst_curs)
def get_part_name(self):
# if custom part name template given, use it
def get_copy_condition(self, src_curs, dst_curs):
""" Prepare where condition for copy and replay filtering.
"""
- return PartHandler.get_copy_condition(self, src_curs, dst_curs)
+ return ShardHandler.get_copy_condition(self, src_curs, dst_curs)
def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
import skytools
from londiste.handler import TableHandler
-__all__ = ['PartHandler']
+__all__ = ['ShardHandler', 'PartHandler']
-class PartHandler(TableHandler):
+class ShardHandler (TableHandler):
__doc__ = __doc__
- handler_name = 'part'
+ handler_name = 'shard'
DEFAULT_HASHFUNC = "partconf.get_hash_raw"
DEFAULT_HASHEXPR = "%s(%s)"
def __init__(self, table_name, args, dest_table):
TableHandler.__init__(self, table_name, args, dest_table)
- self.max_part = None # max part number
- self.local_part = None # part number of local node
+ self.hash_mask = None # aka max part number (atm)
+ self.shard_nr = None # part number of local node
# primary key columns
self.hash_key = args.get('hash_key', args.get('key'))
def _validate_hash_key(self):
if self.hash_key is None:
- raise Exception('Specify key field as key argument')
+ raise Exception('Specify hash key field as hash_key argument')
def reset(self):
"""Forget config info."""
- self.max_part = None
- self.local_part = None
+ self.hash_mask = None
+ self.shard_nr = None
TableHandler.reset(self)
def add(self, trigger_arg_list):
def prepare_batch(self, batch_info, dst_curs):
"""Called on first event for this table in current batch."""
if self.hash_key is not None:
- if not self.max_part:
- self.load_part_info(dst_curs)
+ if not self.hash_mask:
+ self.load_shard_info(dst_curs)
TableHandler.prepare_batch(self, batch_info, dst_curs)
def process_event(self, ev, sql_queue_func, arg):
- """Filter event by hash in extra3, apply only local part."""
+ """Filter event by hash in extra3, apply only if for local shard."""
if ev.extra3 and self.hash_key is not None:
meta = skytools.db_urldecode(ev.extra3)
- self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
- int(meta['hash']), self.max_part, self.local_part)
- if (int(meta['hash']) & self.max_part) != self.local_part:
- self.log.debug('part.process_event: not my event')
+ self.log.debug('shard.process_event: hash=%i, hash_mask=%i, shard_nr=%i',
+ int(meta['hash']), self.hash_mask, self.shard_nr)
+ if (int(meta['hash']) & self.hash_mask) != self.shard_nr:
+ self.log.debug('shard.process_event: not my event')
return
self._process_event(ev, sql_queue_func, arg)
def _process_event(self, ev, sql_queue_func, arg):
- self.log.debug('part.process_event: my event, processing')
+ self.log.debug('shard.process_event: my event, processing')
TableHandler.process_event(self, ev, sql_queue_func, arg)
def get_copy_condition(self, src_curs, dst_curs):
"""Prepare the where condition for copy and replay filtering"""
if self.hash_key is None:
return TableHandler.get_copy_condition(self, src_curs, dst_curs)
- self.load_part_info(dst_curs)
- w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
- self.log.debug('part: copy_condition=%s', w)
+ self.load_shard_info(dst_curs)
+ w = "(%s & %d) = %d" % (self.hashexpr, self.hash_mask, self.shard_nr)
+ self.log.debug('shard: copy_condition=%r', w)
return w
- def load_part_info(self, curs):
- """Load slot info from database."""
+ def load_shard_info(self, curs):
+ """Load part/slot info from database."""
q = "select part_nr, max_part from partconf.conf"
curs.execute(q)
- self.local_part, self.max_part = curs.fetchone()
- if self.local_part is None or self.max_part is None:
- raise Exception('Error loading part info')
+ self.shard_nr, self.hash_mask = curs.fetchone()
+ if self.shard_nr is None or self.hash_mask is None:
+ raise Exception('Error loading shard info')
+
+class PartHandler (ShardHandler):
+ """ Deprecated compat name for shard handler. """
+ handler_name = 'part'
# register handler class
-__londiste_handlers__ = [PartHandler]
+__londiste_handlers__ = [ShardHandler, PartHandler]