456 questions
0
votes
2
answers
80
views
Python Kafka doesn't see headers/metadata while Kafka UI and Kafka Golang do
kafka-python doesn't see Kafka headers, while the Golang Kafka client does.
It doesn't see headers in any of the messages.
Example of code:
for message in consumer:
# message value and key are raw bytes -- ...
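For reference, a minimal sketch of how record headers are typically read. It assumes the installed kafka-python version exposes `message.headers` as a list of `(key, bytes)` tuples and that header values are UTF-8 text; `decode_headers` and `sample_headers` are hypothetical names used only for illustration, and the sketch does not explain why headers might be missing.

```python
def decode_headers(headers):
    """Turn a list of (key, bytes) header tuples into a dict of strings."""
    decoded = {}
    for key, value in headers or []:
        # Header values are raw bytes (or None); UTF-8 is an assumption here.
        decoded[key] = value.decode("utf-8") if value is not None else None
    return decoded


# Stand-in for `message.headers` on a consumed record (hypothetical data).
sample_headers = [("trace-id", b"abc123"), ("source", b"go-producer")]
print(decode_headers(sample_headers))
```

Inside the consumer loop this would be called as `decode_headers(message.headers)`.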
0
votes
0
answers
60
views
Redpanda container with SSL configured failed to communicate with producer client at python-slim container
producer log:
ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-3, node_id=bootstrap-0 host=redpanda:9093 <handshake> [IPv4 ('172.22.0.2', 9093)]>: Closing connection. ...
1
vote
0
answers
24
views
Kafka Consumer Rebalancing Despite Different Group IDs
I'm working on a Kafka-based pipeline using Python (kafka-python) where I have two separate consumers:
consumer.py tracks user health factors from the topic aave-raw
→ uses group_id="risk-dash-...
0
votes
0
answers
42
views
How do I produce a message with kafka-python?
I try to use kafka-python to produce a message.
The error kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. occurs.
I guess something is wrong with ...
0
votes
2
answers
167
views
Kafka consumer is missing messages during deployment
My consumers inherit from BasicKafkaConsumerV2. During deployments, when the pods are rotating, I am missing a few messages, which is visible from the offsets printed after the manual commit().
...
1
vote
1
answer
730
views
Error Connecting to MSK Cluster with Kafka-Python Using SASL_SSL
I'm trying to connect an AWS Lambda function to an Amazon MSK (Managed Streaming for Apache Kafka) cluster using the kafka-python library with SASL_SSL authentication. I am following the official ...
0
votes
0
answers
76
views
Kafka external client receives other name than in ADVERTISED_LISTENERS
On a local network, I have a broker at cgw.local and a client at rpi.local, connected to the same switch. No matter what I put into kafka.advertised.listeners: PLAINTEXT://cgw.local:9092 or PLAINTEXT:/...
0
votes
1
answer
116
views
DNS lookup failed for broker
I am new to Kafka and currently attempting to send data from Airflow to a Kafka broker. However, I'm encountering an issue where it reports a DNS lookup failure for the broker.
broker service in ...
0
votes
1
answer
45
views
Kafka failed to produce to a topic and consume values on certain consumer groups when one of the brokers is down
I have two Kafka brokers (S1/S2) with the config below, and by default all my topics are produced to one partition only.
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
...
0
votes
0
answers
306
views
Kafka Consumer using kafka-python is only reading the first message. It's not reading the remaining messages
I am trying to create a consumer using AWS Lambda (with kafka-python). I have an event listener which listens to AWS MSK and invokes a Lambda function.
The lambda works properly the first time but ...
0
votes
1
answer
229
views
Not able to consume messages using Kafka-python consumer SSL and SASL
consumer_SASL = KafkaConsumer(topics,
bootstrap_servers=kafkaBrokers,
group_id=group,
security_protocol='...
0
votes
0
answers
484
views
Confluent Kafka connection from Databricks
I am trying to connect to confluent-kafka from Databricks and produce/consume messages. Right now I am just experimenting with the python client and not the spark connectors.
Now, if I try to list out ...
0
votes
0
answers
977
views
Encountering "[Errno 16] Device or Resource Busy" Error with Kafka Consumer in Snowflake Stored Procedure
Goal
The goal is to create a Snowflake stored procedure that consumes messages from a Kafka topic once per day, processes these messages, and then loads the processed data into Snowflake tables for ...
0
votes
0
answers
16
views
kafka-python not receiving message on consumer side [duplicate]
I have Kafka running in Docker.
When I create a topic from the CLI and start a producer and consumer on the CLI, Kafka is able to transfer messages. But when I do the same thing with Python, I don't ...
0
votes
1
answer
136
views
Kafka consumer not receiving messages after first consumption
I have a problem while trying to make a data flux between a class called AD_Drone and another called AD_Engine. Engine has to produce for every drone connected their final position, after that, Drone ...
0
votes
1
answer
424
views
How to gracefully stop Kafka consumer after processing a specific number of messages in Python?
I have an Airflow DAG with a BashOperator that runs a Kafka producer, generating a random count of messages. These messages are consumed by a Kafka consumer, which writes them to a JSON file. However, ...
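One common way to stop after a fixed number of messages is to bound the iteration instead of breaking out of a loop by hand. The sketch below works on any iterable; `take_messages` is a hypothetical helper, and using a `KafkaConsumer` as `source` is an assumption rather than something taken from the question.

```python
from itertools import islice


def take_messages(source, limit):
    """Return at most `limit` items from `source`, then stop iterating."""
    # islice stops pulling from the iterator once `limit` items were read.
    return list(islice(source, limit))


# Stand-in for a consumer: any iterable of messages (hypothetical data).
fake_stream = iter(["m1", "m2", "m3", "m4", "m5"])
print(take_messages(fake_stream, 3))
```

With a real consumer, closing it after the call (for example in a `try`/`finally`) lets the group coordinator see a clean shutdown.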
0
votes
1
answer
549
views
ValidationError while validating data against schema FastAvro
After multiple attempts I am not able to decode the error thrown by the fastavro library when validating data against the schema. Below is what I am getting:
File "fastavro\\_validation.pyx", line ...
0
votes
0
answers
326
views
Kafka producer producing message but consumer not consuming using Python
I am working with an AWS EC2 instance. I am able to produce and consume data normally with KRaft in a terminal from my Ubuntu 22.04 machine over SSH. When I try to produce data using a Python script to the same ...
-1
votes
1
answer
123
views
How to run a Python script when the Django server starts
How do I run a Python script when the Django server starts? For example, I have a Python Kafka script that should run continuously once the Django server starts. When I give two commands in ...
0
votes
1
answer
38
views
How to format unconventional output into simple json dump
In the answer from this post:
https://stackoverflow.com/a/55832055/5937760
The output is:
{TopicPartition(topic=u'python-test', partition=0): 5,
TopicPartition(topic=u'python-test', partition=1): 20,
...
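A minimal sketch of one way to make such a mapping JSON-friendly: regroup it by topic and use the partition number as a string key. The local `TopicPartition` namedtuple is a stand-in with the same two fields so the example runs without kafka-python installed; `offsets_to_json` is a hypothetical helper.

```python
import json
from collections import namedtuple

# Stand-in with the same fields as the TopicPartition shown above.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def offsets_to_json(offsets):
    """Convert {TopicPartition: offset} into a nested JSON string."""
    nested = {}
    for tp, offset in offsets.items():
        # JSON object keys must be strings, so the partition is stringified.
        nested.setdefault(tp.topic, {})[str(tp.partition)] = offset
    return json.dumps(nested, sort_keys=True)


offsets = {
    TopicPartition("python-test", 0): 5,
    TopicPartition("python-test", 1): 20,
}
print(offsets_to_json(offsets))  # {"python-test": {"0": 5, "1": 20}}
```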
0
votes
1
answer
222
views
Strange difference between reading from kafka topic using subscribe() and assign()
My task is to count messages in Kafka topics (some with one partition, some with many partitions). I tried two techniques: one with subscribe() and other with assign().
Full code:
#!/usr/bin/env ...
0
votes
0
answers
188
views
Why is my KafkaConsumer stuck in a Joining Group Loop?
I have a KafkaConsumer that is stuck in a "joining group" loop.
It's a simple Kafka Python consumer, here is the python script:
from kafka import KafkaConsumer
import logging
import sys
...
0
votes
0
answers
218
views
Why does Python KafkaConsumer not read all messages from a topic?
For test purposes I have to read all messages in some kafka topics. Before test I remove all messages using /kafka-delete-records.sh then I run tests that fill kafka topics. After test I want to ...
0
votes
1
answer
454
views
How to send a message to every consumer who is idle in Kafka?
I have three consumers and one producer in Kafka.
When the producer sends all the messages (there are 100 messages in my simple code), these messages are divided among three consumers, and my main ...
1
vote
1
answer
146
views
Need to balance Kafka consumer tasks
I need to have a Kafka producer and 4 consumers in Python that balance the queue.
My Topic bash code:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --...
3
votes
1
answer
476
views
Integration tests with Kafka fail in first read, but then work correctly
I have a custom function forward_to_kafka(list: List) which sends my events to Kafka even if there are problems; its purpose is to deliver all messages in my list. I have already tested it with JB Plugin ...
0
votes
1
answer
457
views
Batch process Kafka messages
I want to write my Kafka messages to a jsonl file which should each contain a number of lines (let's say 2). My producer currently writes 3 messages at a time so I should get two jsonl files: one with ...
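The grouping step can be sketched independently of Kafka: split any stream of messages into fixed-size batches and serialise each batch as JSON Lines. `chunked` and `to_jsonl` are hypothetical helper names, and the sample messages are made up for illustration.

```python
import json
from itertools import islice


def chunked(messages, size):
    """Yield lists of at most `size` items; the last batch may be shorter."""
    iterator = iter(messages)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def to_jsonl(batch):
    """Serialise one batch as JSON Lines text (one JSON object per line)."""
    return "".join(json.dumps(item) + "\n" for item in batch)


# Three messages with a batch size of 2 give one full and one partial batch.
batches = list(chunked([{"id": 1}, {"id": 2}, {"id": 3}], 2))
for batch in batches:
    print(repr(to_jsonl(batch)))
```

Each string returned by `to_jsonl` could then be written to its own file.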
0
votes
0
answers
14
views
No data from the Kafka Consumer Python - Consumer listens but nothing is consumed [duplicate]
I'm searching for a solution to use Kafka to display my API (localhost) to my docker.
My producer file works fine and here it is
Consumer file on the other hand returns this... I tried creating a new ...
1
vote
1
answer
1k
views
ImportError: cannot import name 'IncompatibleBrokerVersion' from 'kafka.errors'
I am trying to import:
from kafka import KafkaConsumer
from kafka import KafkaProducer
but I am getting this error
ImportError: cannot import name 'IncompatibleBrokerVersion' from
'kafka.errors'.
I ...
0
votes
0
answers
41
views
Is switching from kafka to kafka-python in a Docker container done by changing the docker-compose configuration?
Previously I had an error like this: SyntaxError on "self.async" when running python kafka producer
Based on the answers, the best fix is to switch from the kafka package to the kafka-python package; this is ...
0
votes
1
answer
426
views
Why do I get a ValueError in my Kafka Consumer if I seek to another position? [duplicate]
I'm using python 3.9.16 and kafka-python version 2.0.2. I'm running on my Macbook Pro IOS 11.6.5.
I'm still getting my feet wet with Kafka so it's entirely possible I'm doing things the wrong way.
...
1
vote
0
answers
63
views
Unable to connect to kafka running in docker container [duplicate]
I am unable to connect to kafka running in container.
I have .env file
KAFKA_BROKER_ID=1
KAFKA_ENABLE_KRAFT=true
KAFKA_CFG_PROCESS_ROLES=broker,controller
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=...
0
votes
1
answer
150
views
How does Kafka store messages offsets on a local computer?
How does Kafka store messages on a local server or laptop?
I'm new to Kafka and just playing around with the tech for now, but I'm curious about the answer because I started by looking at the Kafka ...
1
vote
1
answer
1k
views
python kafka consumer.poll msg.error dict object has no attribute error
I am trying to set up kafka message polling using consumer poll.
I can see loads of example code that uses message.error
e.g.
while True:
try:
message = consumer.poll(10.0)
if ...
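The error in the title is consistent with a difference between the two Python clients: in kafka-python, `KafkaConsumer.poll()` returns a dict mapping each partition to a list of records, while the `message.error()` pattern belongs to confluent-kafka, whose `poll()` returns a single message. A minimal sketch of walking the dict shape follows; the namedtuples are local stand-ins so it runs without a broker, and `iter_records` is a hypothetical helper.

```python
from collections import namedtuple

# Local stand-ins for the partition key and record types (assumed shapes).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def iter_records(poll_result):
    """Flatten a {partition: [records]} poll result into (partition, record) pairs."""
    for tp, records in poll_result.items():
        for record in records:
            yield tp, record


# Hypothetical poll result with two records on one partition.
fake_result = {
    TopicPartition("demo", 0): [Record(0, b"first"), Record(1, b"second")],
}
for tp, record in iter_records(fake_result):
    print(tp.topic, tp.partition, record.offset, record.value)
```

An empty dict means the poll returned no records within the timeout.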
0
votes
1
answer
3k
views
ModuleNotFoundError: No module named 'kafka'
I have downloaded Kafka and installed kafka-python library using "pip install kafka-python" and "conda install -c conda-forge kafka-python".
I am able to run "from kafka ...
0
votes
0
answers
25
views
How to setup Python connection to Kafka instance running in Kubernetes? [duplicate]
I am trying to connect to Kafka using Python and I am able to connect and see the topic names however when I try to read messages for any topic nothing happens. It feels like the connection times-out ...
0
votes
0
answers
17
views
Kafka Docker Python API does not recognize the broker [duplicate]
I have a kafka consumer in VSCode IDE (MS Windows) that works properly. I take the same python script in docker container, but the kafka broker is not recognized. I put them inside the same network; ...
1
vote
2
answers
440
views
dockerized Kafka service stuck at producer.send [duplicate]
I am very new to docker and Kafka, and have a simple kafka python publisher shown below
The following are in my dockerfile:
FROM python:3.10
WORKDIR /app
COPY . /app
RUN pip install --user pip==23....
1
vote
1
answer
1k
views
How can I mock the instantiation of a Class
I can't seem to figure out how to mock the instantiation of a class, any pointers would be greatly appreciated. Here is what I am trying to do:
I would like to test the method ClassA.some_method() and ...
0
votes
2
answers
2k
views
Python How to pass continuous generated data stream between methods
I have a method that continuously generates data and prints them on the console. Let's say something simple like generating random number:
def number_generator():
while True:
random....
4
votes
1
answer
5k
views
Kafka + FastAPI + Docker template
Introduction
I am currently experimenting with Kafka and FastAPI and trying to build a template to enable me to quickly write programs in a microservice pattern.
Goal - Vision
Building a repository of ...
0
votes
1
answer
914
views
Can produce message with kafka-python, but can't with confluent-kafka
I am trying to use confluent-kafka, but can't manage to send a message to the topic.
I did the same with kafka-python, and the message is sent.
What am I missing?
Thank you!
I use this code:
# using confluent-kafka
...
0
votes
1
answer
94
views
Python - Optimal way to check for a condition with a timeout?
Wrote two functions, 1. uploads a gzip file to Artifactory and 2. listens to the Kafka topic.
As further steps, wanted to validate whether the upload is successful within 5 minutes by listening to the ...
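A generic way to express "check a condition until a deadline" is a small polling helper built on a monotonic clock. In the sketch below, `wait_for` is a hypothetical name, and the `condition` callable stands in for whatever check confirms the upload (for example, a flag set by the topic listener).

```python
import time


def wait_for(condition, timeout, interval=1.0):
    """Poll `condition` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))


# A condition that becomes true on the third check (illustrative only).
calls = {"count": 0}


def ready():
    calls["count"] += 1
    return calls["count"] >= 3


print(wait_for(ready, timeout=1.0, interval=0.01))
```

For the five-minute window in the question, the call would use `timeout=300`.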
2
votes
2
answers
1k
views
Python KafkaTimeoutError: Timeout waiting for future
I'm using Kafka to send logs to a topic. While sending the messages, I always get this error
Message: 'test log'
Arguments: ()
--- Logging error ---
Traceback (most recent call last):
File "/...
0
votes
0
answers
16
views
Cannot connect to Kafka broker, BrokerConnection error 111 [duplicate]
I have a container in the same docker-compose that is trying to connect to my broker. It suddenly stopped connecting and is resulting with error:
2023-02-22 14:24:00 ERROR:kafka.conn:Connect attempt ...
0
votes
2
answers
783
views
When kafka consumer connected to a broker by domain name, if the ip changed after broker crash, will the kafka consumer reconnect? [closed]
My kafka consumer is connected to a cluster of brokers. There is a domain name server in between. Each broker has a domain name associated with an IP address.
Problem is, the IPs are not static and ...
0
votes
1
answer
928
views
Kafka Consumer Committing With Auto-Commit Disabled
I'm missing events when reading from a Kafka queue because the consumer is updating the offset without an explicit commit even when enable_auto_commit is disabled.
from kafka import KafkaClient, ...
2
votes
1
answer
2k
views
Is there a suitable python library for doing stream processing with Kafka topics? [duplicate]
I am trying to find a suitable Python library for stream processing over Kafka topics, similar to Kafka Streams. Specifically, I am looking for libraries that support the following operations.
KStream-...
0
votes
1
answer
428
views
kafka produce messages in consumer
I have an application with the need to pass messages into multiple layers of processing.
I need to do this because all the new messages should be put into the first generic topic so they can be ...
0
votes
1
answer
966
views
Kafka producer and consumer not working properly on python with docker
I'm working on a project that uses a Kafka producer and consumer to acquire articles (with specific topics) from a news_api every two hours and then, with a consumer, save them in MongoDB.
So I ...