londiste.handlers: renamed handler 'part' to 'shard'
author martinko <gamato@users.sf.net>
Thu, 16 May 2013 14:52:09 +0000 (16:52 +0200)
committer martinko <gamato@users.sf.net>
Thu, 16 May 2013 14:52:09 +0000 (16:52 +0200)
The name 'part' was getting more and more confusing, especially since the 'dispatch' handler mixes both partitioning types (table and database) in one class.

python/londiste/handlers/__init__.py
python/londiste/handlers/dispatch.py
python/londiste/handlers/shard.py [moved from python/londiste/handlers/part.py with 66% similarity]

index 927054627b046e6e23aebac9229069427292c114..b6b39100c92940bdb0c71db53b4a5cefe4acd77b 100644 (file)
@@ -5,7 +5,7 @@ import sys
 DEFAULT_HANDLERS = [
     'londiste.handlers.qtable',
     'londiste.handlers.applyfn',
-    'londiste.handlers.part',
+    'londiste.handlers.shard',
     'londiste.handlers.multimaster',
     'londiste.handlers.vtable',
 
index e5c5ca7a94744d409da1cd59bd92c83f1b10befa..0b02edcd463530a760b805e5175760ef18e93c67 100644 (file)
@@ -170,7 +170,7 @@ from skytools.utf8 import safe_utf8_decode
 
 from londiste.handler import EncodingValidator
 from londiste.handlers import handler_args, update
-from londiste.handlers.part import PartHandler
+from londiste.handlers.shard import ShardHandler
 
 
 __all__ = ['Dispatcher']
@@ -625,7 +625,7 @@ ROW_HANDLERS = {'plain': RowHandler,
 #------------------------------------------------------------------------------
 
 
-class Dispatcher (PartHandler):
+class Dispatcher (ShardHandler):
     """Partitioned loader.
     Splits events into partitions, if requested.
     Then applies them without further processing.
@@ -637,7 +637,7 @@ class Dispatcher (PartHandler):
         # compat for dest-table
         dest_table = args.get('table', dest_table)
 
-        PartHandler.__init__(self, table_name, args, dest_table)
+        ShardHandler.__init__(self, table_name, args, dest_table)
 
         # show args
         self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
@@ -736,14 +736,14 @@ class Dispatcher (PartHandler):
     def reset(self):
         """Called before starting to process a batch.
         Should clean any pending data."""
-        PartHandler.reset(self)
+        ShardHandler.reset(self)
 
     def prepare_batch(self, batch_info, dst_curs):
         """Called on first event for this table in current batch."""
         if self.conf.table_mode != 'ignore':
             self.batch_info = batch_info
             self.dst_curs = dst_curs
-        PartHandler.prepare_batch(self, batch_info, dst_curs)
+        ShardHandler.prepare_batch(self, batch_info, dst_curs)
 
     def filter_data(self, data):
         """Process with fields skip and map"""
@@ -813,7 +813,7 @@ class Dispatcher (PartHandler):
         """Called when batch finishes."""
         if self.conf.table_mode != 'ignore':
             self.row_handler.flush(dst_curs)
-        #PartHandler.finish_batch(self, batch_info, dst_curs)
+        #ShardHandler.finish_batch(self, batch_info, dst_curs)
 
     def get_part_name(self):
         # if custom part name template given, use it
@@ -932,7 +932,7 @@ class Dispatcher (PartHandler):
     def get_copy_condition(self, src_curs, dst_curs):
         """ Prepare where condition for copy and replay filtering.
         """
-        return PartHandler.get_copy_condition(self, src_curs, dst_curs)
+        return ShardHandler.get_copy_condition(self, src_curs, dst_curs)
 
     def real_copy(self, tablename, src_curs, dst_curs, column_list):
         """do actual table copy and return tuple with number of bytes and rows
similarity index 66%
rename from python/londiste/handlers/part.py
rename to python/londiste/handlers/shard.py
index 85d507d3d1be06650c8e082640cff0ea346203ab..950ac7c6d30718a850c98d28c4ab1e12571e91f4 100644 (file)
@@ -24,19 +24,19 @@ Local config:
 import skytools
 from londiste.handler import TableHandler
 
-__all__ = ['PartHandler']
+__all__ = ['ShardHandler', 'PartHandler']
 
-class PartHandler(TableHandler):
+class ShardHandler (TableHandler):
     __doc__ = __doc__
-    handler_name = 'part'
+    handler_name = 'shard'
 
     DEFAULT_HASHFUNC = "partconf.get_hash_raw"
     DEFAULT_HASHEXPR = "%s(%s)"
 
     def __init__(self, table_name, args, dest_table):
         TableHandler.__init__(self, table_name, args, dest_table)
-        self.max_part = None       # max part number
-        self.local_part = None     # part number of local node
+        self.hash_mask = None   # aka max part number (atm)
+        self.shard_nr = None    # part number of local node
 
         # primary key columns
         self.hash_key = args.get('hash_key', args.get('key'))
@@ -51,12 +51,12 @@ class PartHandler(TableHandler):
 
     def _validate_hash_key(self):
         if self.hash_key is None:
-            raise Exception('Specify key field as key argument')
+            raise Exception('Specify hash key field as hash_key argument')
 
     def reset(self):
         """Forget config info."""
-        self.max_part = None
-        self.local_part = None
+        self.hash_mask = None
+        self.shard_nr = None
         TableHandler.reset(self)
 
     def add(self, trigger_arg_list):
@@ -68,41 +68,45 @@ class PartHandler(TableHandler):
     def prepare_batch(self, batch_info, dst_curs):
         """Called on first event for this table in current batch."""
         if self.hash_key is not None:
-            if not self.max_part:
-                self.load_part_info(dst_curs)
+            if not self.hash_mask:
+                self.load_shard_info(dst_curs)
         TableHandler.prepare_batch(self, batch_info, dst_curs)
 
     def process_event(self, ev, sql_queue_func, arg):
-        """Filter event by hash in extra3, apply only local part."""
+        """Filter event by hash in extra3, apply only if for local shard."""
         if ev.extra3 and self.hash_key is not None:
             meta = skytools.db_urldecode(ev.extra3)
-            self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
-                           int(meta['hash']), self.max_part, self.local_part)
-            if (int(meta['hash']) & self.max_part) != self.local_part:
-                self.log.debug('part.process_event: not my event')
+            self.log.debug('shard.process_event: hash=%i, hash_mask=%i, shard_nr=%i',
+                           int(meta['hash']), self.hash_mask, self.shard_nr)
+            if (int(meta['hash']) & self.hash_mask) != self.shard_nr:
+                self.log.debug('shard.process_event: not my event')
                 return
         self._process_event(ev, sql_queue_func, arg)
 
     def _process_event(self, ev, sql_queue_func, arg):
-        self.log.debug('part.process_event: my event, processing')
+        self.log.debug('shard.process_event: my event, processing')
         TableHandler.process_event(self, ev, sql_queue_func, arg)
 
     def get_copy_condition(self, src_curs, dst_curs):
         """Prepare the where condition for copy and replay filtering"""
         if self.hash_key is None:
             return TableHandler.get_copy_condition(self, src_curs, dst_curs)
-        self.load_part_info(dst_curs)
-        w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
-        self.log.debug('part: copy_condition=%s', w)
+        self.load_shard_info(dst_curs)
+        w = "(%s & %d) = %d" % (self.hashexpr, self.hash_mask, self.shard_nr)
+        self.log.debug('shard: copy_condition=%r', w)
         return w
 
-    def load_part_info(self, curs):
-        """Load slot info from database."""
+    def load_shard_info(self, curs):
+        """Load part/slot info from database."""
         q = "select part_nr, max_part from partconf.conf"
         curs.execute(q)
-        self.local_part, self.max_part = curs.fetchone()
-        if self.local_part is None or self.max_part is None:
-            raise Exception('Error loading part info')
+        self.shard_nr, self.hash_mask = curs.fetchone()
+        if self.shard_nr is None or self.hash_mask is None:
+            raise Exception('Error loading shard info')
+
+class PartHandler (ShardHandler):
+    """ Deprecated compat name for shard handler. """
+    handler_name = 'part'
 
 # register handler class
-__londiste_handlers__ = [PartHandler]
+__londiste_handlers__ = [ShardHandler, PartHandler]