Please take the time to read the full question to understand the exact issue. Thank you.
I have a runner/driver program that listens to a Kafka topic and dispatches tasks using a ThreadPoolExecutor whenever a new message is received on the topic (as shown below):
import json
from concurrent.futures import ThreadPoolExecutor
from kafka import KafkaConsumer

consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                         bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                         enable_auto_commit=False,
                         auto_offset_reset='latest',
                         max_poll_records=1,
                         max_poll_interval_ms=300000)

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    for message in consumer:
        # every consumed message is handed off to a worker thread
        futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))
There is a bunch of code in between, but it is not important here, so I have skipped it.
Now, SOME_FUNCTION is imported from another Python script (in fact, there is a hierarchy of imports that happens in later stages). What is important is that at some point in these scripts I use a multiprocessing Pool, because I need to do parallel processing on data (SIMD: single instruction, multiple data), and I use the apply_async function to do so:
# self.pool is a multiprocessing.Pool; each chunk is matched in a worker process
for loop_message_chunk in loop_message_chunks:
    res_list.append(self.pool.apply_async(self.one_matching.match,
                                          args=(hash_set, loop_message_chunk, fields)))
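For context, this is roughly how the pool and result collection around that loop look; the class name, pool size, and the .get() loop are assumptions for illustration, not my exact code:

from multiprocessing import Pool

class Matcher:
    def __init__(self, one_matching):
        self.one_matching = one_matching
        self.pool = Pool(processes=4)   # assumed pool size

    def match_chunks(self, hash_set, loop_message_chunks, fields):
        res_list = [self.pool.apply_async(self.one_matching.match,
                                          args=(hash_set, chunk, fields))
                    for chunk in loop_message_chunks]
        return [res.get() for res in res_list]   # block until every chunk is matched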
Now, I have two versions of the runner/driver program:

Kafka-based (the one shown above)
- This version spawns threads that start multiprocessing:
  Listen to Kafka -> Start a thread -> Start multiprocessing

REST-based (using Flask to achieve the same task with a REST call)
- This version does not start any threads and calls multiprocessing right away:
  Listen to REST endpoint -> Start multiprocessing

Why two runner/driver scripts, you ask? This microservice will be used by multiple teams: some want a synchronous, REST-based interface, while others want a real-time, asynchronous, Kafka-based system.
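For completeness, the REST version looks roughly like this (the endpoint name and payload fields are made up, since I cannot share the exact names; the point is that SOME_FUNCTION, and therefore the pool, is reached without any intermediate thread):

from flask import Flask, request
from some_module import SOME_FUNCTION   # hypothetical import path

app = Flask(__name__)

@app.route("/match", methods=["POST"])  # hypothetical endpoint
def match():
    payload = request.get_json()
    # multiprocessing is kicked off directly, no thread in between
    result = SOME_FUNCTION(payload["arg1"], payload["arg2"])
    return {"result": result}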
When I log from the parallelized function (self.one_matching.match in the example above), it works when called through the REST version, but not when called through the Kafka version (basically, when multiprocessing is kicked off by a thread, it does not work).
Also notice that only the logging from the parallelized function does not work; the rest of the scripts in the hierarchy, from the runner down to the script that calls apply_async (which includes scripts that are called from within the thread), log successfully.
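To show the failure mode in isolation, here is a minimal, self-contained version of the same pattern (thread -> pool -> logging), with the pool's start method forced to "spawn" so it behaves the same on every platform; spawned workers do not inherit the parent's handler configuration, so the worker-side record never shows up:

import logging
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

def match(x):
    # in a spawned worker this logger has no handlers, so nothing is written
    logging.getLogger("logger1").info("matching chunk %s", x)
    return x

def handle_message(msg):
    with mp.get_context("spawn").Pool(2) as pool:
        res_list = [pool.apply_async(match, args=(i,)) for i in range(4)]
        return [r.get() for r in res_list]

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)               # stands in for the YAML config
    logging.getLogger("logger1").info("runner log works")  # appears
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(handle_message, "msg").result()    # worker logs do not appear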
Other details:
- I configure loggers using a YAML file
- I configure the logger in the runner script itself, for either the Kafka or the REST version
- I do a logging.getLogger in every other script called after the runner script, to get specific loggers that log to different files
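In code, that setup looks roughly like this (the file name logger_config.yaml is a placeholder, not the real one):

# in the runner script (Kafka or REST version)
import logging.config
import yaml

with open("logger_config.yaml") as f:             # placeholder file name
    logging.config.dictConfig(yaml.safe_load(f))

# in every other script in the hierarchy
import logging
logger = logging.getLogger("logger1")             # one of the loggers defined below
logger.debug("goes to console and logfile1.log")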
Logger config (values replaced with generic ones, since I cannot share the exact names):
version: 1
formatters:
  simple:
    format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
  custom1:
    format: '%(asctime)s | %(filename)s :: %(message)s'
  time-message:
    format: '%(asctime)s | %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
  handler1:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile1.log
  handler2:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: custom1
    level: INFO
    filename: logs/logfile2.log
  handler3:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile3.log
  handler4:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile4.log
  handler5:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile5.log
loggers:
  logger1:
    level: DEBUG
    handlers: [console, handler1]
    propagate: no
  logger2:
    level: DEBUG
    handlers: [console, handler5]
    propagate: no
  logger3:
    level: INFO
    handlers: [handler2]
    propagate: no
  logger4:
    level: DEBUG
    handlers: [console, handler3]
    propagate: no
  logger5:
    level: DEBUG
    handlers: [console, handler4]
    propagate: no
  kafka:
    level: WARNING
    handlers: [console]
    propagate: no
root:
  level: INFO
  handlers: [console]
  propagate: no
Comments on the question:
- Even if you leave SOME_FUNCTION the same (creating its own Pool every call rather than calling back to a global ProcessPoolExecutor), it should still work in the same way. I was just thinking it could be less total overhead to not keep creating and destroying separate independent pools.
- Handlers are inherited if the worker processes are forked, but not if they are spawned. QueueHandler may not be enough; you need SocketHandler to be sure. You could read this thread to understand more: stackoverflow.com/questions/64335940/…
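For reference, a minimal sketch of the QueueHandler + QueueListener pattern those comments allude to; this is my assumption about a possible fix, not tested against the exact setup above. Workers forward their records over a queue, and a listener in the parent process emits them through the normally configured handlers:

import logging
import logging.handlers
import multiprocessing as mp

def worker_init(queue):
    # runs once in each worker: route every record to the parent via the queue
    root = logging.getLogger()
    root.handlers = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)

def match(x):
    logging.getLogger("logger1").info("matching chunk %s", x)
    return x

if __name__ == "__main__":
    queue = mp.Manager().Queue(-1)
    # the listener runs in the parent; a StreamHandler stands in for the YAML-configured ones
    listener = logging.handlers.QueueListener(queue, logging.StreamHandler())
    listener.start()
    with mp.Pool(2, initializer=worker_init, initargs=(queue,)) as pool:
        pool.map(match, range(4))
    listener.stop()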