0 votes
2 answers
80 views

kafka-python doesn't see Kafka headers, while kafka golang does. It doesn't see headers on any of the messages. Example code: for message in consumer: # message value and key are raw bytes -- ...
Irina's user avatar
  • 1,417
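For readers landing on the headers question above, here is a minimal sketch of turning record headers into a dictionary once they are present. It assumes each consumed record exposes `headers` as a list of `(key, value)` tuples with `bytes` (or `None`) values, which is how kafka-python's consumer records are commonly described; `decode_headers` and `sample_headers` are hypothetical names used only for illustration. It does not diagnose why headers might be missing in the first place.

```python
from typing import Dict, Iterable, Optional, Tuple


def decode_headers(
    headers: Optional[Iterable[Tuple[str, Optional[bytes]]]],
    encoding: str = "utf-8",
) -> Dict[str, Optional[str]]:
    """Convert (key, bytes) header tuples into a plain dict of strings.

    Assumption: `headers` has the shape of `message.headers` on a consumed
    record. If a key appears more than once, the last value wins.
    """
    decoded: Dict[str, Optional[str]] = {}
    for key, value in headers or []:
        # Header values may legitimately be None (a header with no value).
        decoded[key] = value.decode(encoding) if value is not None else None
    return decoded


# Hypothetical stand-in for `message.headers`; no broker is needed to run this.
sample_headers = [("trace-id", b"abc123"), ("source", b"billing"), ("empty", None)]
print(decode_headers(sample_headers))
```

In a real consumer loop the call would look like `decode_headers(message.headers)`; an empty result there suggests the records carry no headers as seen by the client.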
0 votes
0 answers
60 views

producer log: ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-3, node_id=bootstrap-0 host=redpanda:9093 <handshake> [IPv4 ('172.22.0.2', 9093)]>: Closing connection. ...
Frederick Yip's user avatar
1 vote
0 answers
24 views

I'm working on a Kafka-based pipeline using Python (kafka-python) where I have two separate consumers: consumer.py tracks user health factors from the topic aave-raw → uses group_id="risk-dash-...
Prajwal Khot's user avatar
0 votes
0 answers
42 views

I try to use kafka-python to produce a message. The error kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. occurs. I guess something is wrong with ...
pav's user avatar
  • 139
0 votes
2 answers
167 views

My consumers inherit from BasicKafkaConsumerV2. During deployments, while the pods are rotating, I am missing a few messages, which is visible from the offsets printed after the manual commit(). ...
Dev's user avatar
  • 567
1 vote
1 answer
730 views

I'm trying to connect an AWS Lambda function to an Amazon MSK (Managed Streaming for Apache Kafka) cluster using the kafka-python library with SASL_SSL authentication. I am following the official ...
shady xv's user avatar
0 votes
0 answers
76 views

On a local network, I have a broker at cgw.local and a client at rpi.local, connected to the same switch. No matter what I put into kafka.advertised.listeners: PLAINTEXT://cgw.local:9092 or PLAINTEXT:/...
scriptfoo's user avatar
  • 513
0 votes
1 answer
116 views

I am new to Kafka and currently attempting to send data from Airflow to a Kafka broker. However, I'm encountering an issue where it reports a DNS lookup failure for the broker. broker service in ...
Dhainik Suthar's user avatar
0 votes
1 answer
45 views

I have two Kafka brokers (S1/S2) with the config below, and by default all my topics produce to one partition only. default.replication.factor=2 min.insync.replicas=1 offsets.topic.replication.factor=2 ...
ing's user avatar
  • 1
0 votes
0 answers
306 views

I am trying to create a consumer using AWS Lambda (with kafka-python). I have an event listener which listens to AWS MSK and invokes a Lambda function. The Lambda works properly the first time but ...
Abhiram Ajith's user avatar
0 votes
1 answer
229 views

consumer_SASL = KafkaConsumer(topics, bootstrap_servers=kafkaBrokers, group_id=group, security_protocol='...
Aditya's user avatar
  • 53
0 votes
0 answers
484 views

I am trying to connect to confluent-kafka from Databricks and produce/consume messages. Right now I am just experimenting with the python client and not the spark connectors. Now, if I try to list out ...
Tarique's user avatar
  • 711
0 votes
0 answers
977 views

Goal The goal is to create a Snowflake stored procedure that consumes messages from a Kafka topic once per day, processes these messages, and then loads the processed data into Snowflake tables for ...
Evangelos Malandrakis's user avatar
0 votes
0 answers
16 views

I have Kafka running in Docker. When I create a topic from the CLI and start a producer and consumer on the CLI, Kafka is able to transfer messages. But when I do the same thing with Python, I don't ...
SHIVAM SINGH's user avatar
0 votes
1 answer
136 views

I have a problem while trying to set up a data flow between a class called AD_Drone and another called AD_Engine. Engine has to produce, for every connected drone, its final position; after that, Drone ...
Saúl Campello Mas's user avatar
0 votes
1 answer
424 views

I have an Airflow DAG with a BashOperator that runs a Kafka producer, generating a random count of messages. These messages are consumed by a Kafka consumer, which writes them to a JSON file. However, ...
smert97's user avatar
0 votes
1 answer
549 views

After multiple attempts I am not able to decode the error thrown by the fastavro library when validating data against the schema. Below is what I am getting: File "fastavro\\_validation.pyx", line ...
Deepak_Spark_Beginner's user avatar
0 votes
0 answers
326 views

I am working with an AWS EC2 instance. I am able to produce and consume data normally with KRaft in a terminal from my Ubuntu 22.04 machine using ssh. When I try to produce data using a Python script to the same ...
Archit Mishra's user avatar
-1 votes
1 answer
123 views

How can I run a Python script when the Django server starts? For example, I have a Python Kafka script that should run continuously once the Django server starts. When I give two commands in ...
Sugunanayak Darbe's user avatar
0 votes
1 answer
38 views

In the answer from this post: https://stackoverflow.com/a/55832055/5937760 The output is: {TopicPartition(topic=u'python-test', partition=0): 5, TopicPartition(topic=u'python-test', partition=1): 20, ...
sojim2's user avatar
  • 1,317
0 votes
1 answer
222 views

My task is to count messages in Kafka topics (some with one partition, some with many partitions). I tried two techniques: one with subscribe() and other with assign(). Full code: #!/usr/bin/env ...
Michał Niklas's user avatar
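As an illustration of the offset-based counting that the two questions above touch on, here is a minimal sketch that derives per-partition and per-topic counts from two offset maps. It assumes the maps have the shape `{TopicPartition: offset}`, such as those returned by kafka-python's `KafkaConsumer.beginning_offsets()` and `KafkaConsumer.end_offsets()`; the local `TopicPartition` namedtuple is a stand-in so the snippet runs without a broker, and the function names are hypothetical. The difference of offsets is only an estimate: on compacted topics or topics with transaction markers it can overstate the number of readable messages.

```python
from collections import namedtuple
from typing import Dict

# Stand-in with the same fields as kafka-python's TopicPartition (assumption),
# so the example runs without a broker or the library installed.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def count_per_partition(
    beginning: Dict[TopicPartition, int], end: Dict[TopicPartition, int]
) -> Dict[TopicPartition, int]:
    """Estimate messages per partition as end offset minus beginning offset."""
    return {tp: max(end[tp] - beginning.get(tp, 0), 0) for tp in end}


def count_per_topic(
    beginning: Dict[TopicPartition, int], end: Dict[TopicPartition, int]
) -> Dict[str, int]:
    """Sum the per-partition estimates for each topic."""
    totals: Dict[str, int] = {}
    for tp, count in count_per_partition(beginning, end).items():
        totals[tp.topic] = totals.get(tp.topic, 0) + count
    return totals


# Hypothetical offsets for a two-partition topic.
begin = {TopicPartition("python-test", 0): 0, TopicPartition("python-test", 1): 5}
end = {TopicPartition("python-test", 0): 5, TopicPartition("python-test", 1): 20}
print(count_per_topic(begin, end))  # {'python-test': 20}
```

This approach avoids consuming the messages at all, which sidesteps the differences between `subscribe()` and `assign()` when only a count is needed.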
0 votes
0 answers
188 views

I have a KafkaConsumer that is stuck in a "joining group" loop. It's a simple Kafka Python consumer, here is the python script: from kafka import KafkaConsumer import logging import sys ...
jceddy's user avatar
  • 1,997
0 votes
0 answers
218 views

For test purposes I have to read all messages in some Kafka topics. Before the test I remove all messages using /kafka-delete-records.sh, then I run tests that fill the Kafka topics. After the test I want to ...
Michał Niklas's user avatar
0 votes
1 answer
454 views

I have three consumers and one producer in Kafka. When the producer sends all the messages (there are 100 messages in my simple code), these messages are divided among three consumers, and my main ...
Sardar's user avatar
  • 788
1 vote
1 answer
146 views

I need a Kafka producer and 4 consumers in Python that balance the queue between them. My topic bash code: kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --...
Ali Esmaeili's user avatar
3 votes
1 answer
476 views

I have a custom function forward_to_kafka(list: List) which sends my events to Kafka even if there are problems; its purpose is to deliver all messages in my list. I have already tested it with JB Plugin ...
shameoff's user avatar
0 votes
1 answer
457 views

I want to write my Kafka messages to a jsonl file which should each contain a number of lines (let's say 2). My producer currently writes 3 messages at a time so I should get two jsonl files: one with ...
Omega's user avatar
  • 881
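One way to approach the batching described in the question above is to group messages into fixed-size chunks and write each chunk to its own file. The sketch below is a minimal illustration under stated assumptions: messages are already deserialized into JSON-compatible dicts, and `chunked`, `write_jsonl_batches`, and the `batch_00000.jsonl` naming scheme are all hypothetical. With 3 messages and 2 lines per file it produces two files, the second holding a single line.

```python
import json
import tempfile
from itertools import islice
from pathlib import Path
from typing import Iterable, Iterator, List


def chunked(items: Iterable[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def write_jsonl_batches(
    messages: Iterable[dict], out_dir: str, lines_per_file: int = 2
) -> List[Path]:
    """Write messages to numbered .jsonl files, `lines_per_file` lines each."""
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)
    paths: List[Path] = []
    for index, batch in enumerate(chunked(messages, lines_per_file)):
        path = target / f"batch_{index:05d}.jsonl"  # hypothetical naming scheme
        with path.open("w", encoding="utf-8") as handle:
            for message in batch:
                handle.write(json.dumps(message) + "\n")
        paths.append(path)
    return paths


if __name__ == "__main__":
    # Three hypothetical messages, as in the question: expect two files.
    with tempfile.TemporaryDirectory() as tmp:
        for p in write_jsonl_batches([{"n": 1}, {"n": 2}, {"n": 3}], tmp):
            print(p.name, p.read_text(encoding="utf-8").splitlines())
```

In a live consumer the final partial batch only gets written when the input iterator ends, so a long-running loop would need its own flush condition (for example a timeout), which this sketch leaves out.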
0 votes
0 answers
14 views

I'm searching for a solution to use Kafka to display my API (localhost) to my docker. My producer file works fine and here it is Consumer file on the other hand returns this... I tried creating a new ...
Hawvb Ged's user avatar
1 vote
1 answer
1k views

I am trying to import: from kafka import KafkaConsumer from kafka import KafkaProducer but I am getting this error ImportError: cannot import name 'IncompatibleBrokerVersion' from 'kafka.errors'. I ...
user2540646's user avatar
0 votes
0 answers
41 views

Previously I had an error like this: SyntaxError on "self.async" when running a Python Kafka producer. Based on the answers, the best fix is to switch from the kafka package to the kafka-python package; this is ...
Nabih Bawazir's user avatar
0 votes
1 answer
426 views

I'm using python 3.9.16 and kafka-python version 2.0.2. I'm running on my Macbook Pro IOS 11.6.5. I'm still getting my feet wet with Kafka so it's entirely possible I'm doing things the wrong way. ...
Classified's user avatar
  • 6,100
1 vote
0 answers
63 views

I am unable to connect to kafka running in container. I have .env file KAFKA_BROKER_ID=1 KAFKA_ENABLE_KRAFT=true KAFKA_CFG_PROCESS_ROLES=broker,controller KAFKA_CFG_CONTROLLER_LISTENER_NAMES=...
Panda's user avatar
  • 683
0 votes
1 answer
150 views

How does Kafka store messages on a local server or laptop? I'm new to Kafka and just playing around with the tech for now but I'm curious about the answer because I started by looking at the Kafka ...
Classified's user avatar
  • 6,100
1 vote
1 answer
1k views

I am trying to set up kafka message polling using consumer poll. I can see loads of example code that uses message.error e.g. while True: try: message = consumer.poll(10.0) if ...
mez63's user avatar
  • 166
0 votes
1 answer
3k views

I have downloaded Kafka and installed kafka-python library using "pip install kafka-python" and "conda install -c conda-forge kafka-python". I am able to run "from kafka ...
SHIVANI SINGH's user avatar
0 votes
0 answers
25 views

I am trying to connect to Kafka using Python and I am able to connect and see the topic names however when I try to read messages for any topic nothing happens. It feels like the connection times-out ...
Prabhat_Seth's user avatar
0 votes
0 answers
17 views

I have a kafka consumer in VSCode IDE (MS Windows) that works properly. I take the same python script in docker container, but the kafka broker is not recognized. I put them inside the same network; ...
FSH's user avatar
  • 1
1 vote
2 answers
440 views

I am very new to docker and Kafka, and have a simple kafka python publisher shown below The following are in my dockerfile: FROM python:3.10 WORKDIR /app COPY . /app RUN pip install --user pip==23....
Alejandro's user avatar
  • 979
1 vote
1 answer
1k views

I can't seem to figure out how to mock the instantiation of a class, any pointers would be greatly appreciated. Here is what I am trying to do: I would like to test the method ClassA.some_method() and ...
jimfawkes's user avatar
  • 385
0 votes
2 answers
2k views

I have a method that continuously generates data and prints them on the console. Let's say something simple like generating random number: def number_generator(): while True: random....
Varun Pius Rodrigues's user avatar
4 votes
1 answer
5k views

Introduction I am currently experimenting with Kafka and FastAPI and trying to build a template to enable me to quickly write programs in a microservice pattern. Goal - Vision Building a repository of ...
mm117's user avatar
  • 91
0 votes
1 answer
914 views

I am trying to use confluent-kafka, but can't manage to send a message to a topic. I did the same with kafka-python, and the message is sent. What am I missing? Thank you! I use this code: # using confluent-kafka ...
Vitalik's user avatar
0 votes
1 answer
94 views

Wrote two functions, 1. uploads a gzip file to Artifactory and 2. listens to the Kafka topic. As further steps, wanted to validate whether the upload is successful within 5 minutes by listening to the ...
Goku's user avatar
  • 544
2 votes
2 answers
1k views

I'm using Kafka to send logs to a topic. While sending the messages, I always get this error Message: 'test log' Arguments: () --- Logging error --- Traceback (most recent call last): File "/...
node_man's user avatar
  • 1,469
0 votes
0 answers
16 views

I have a container in the same docker-compose that is trying to connect to my broker. It suddenly stopped connecting and now fails with the error: 2023-02-22 14:24:00 ERROR:kafka.conn:Connect attempt ...
LauraAlice's user avatar
0 votes
2 answers
783 views

My kafka consumer is connected to a cluster of brokers. There is a domain name server in between. Each broker has a domain name associated with an IP address. Problem is, the IPs are not static and ...
sovon's user avatar
  • 927
0 votes
1 answer
928 views

I'm missing events when reading from a Kafka queue because the consumer is updating the offset without an explicit commit even when enable_auto_commit is disabled. from kafka import KafkaClient, ...
Gree Tree Python's user avatar
2 votes
1 answer
2k views

I am trying to find a suitable Python library to do stream processing on Kafka topics, in the style of Kafka Streams. Specifically, I am looking for libraries that support the following operations. KStream-...
Eagle1992's user avatar
0 votes
1 answer
428 views

I have an application with the need to pass messages into multiple layers of processing. I need to do this because all the new messages should be put into the first generic topic so they can be ...
Firouziam's user avatar
  • 805
0 votes
1 answer
966 views

I'm working on a project that uses a Kafka producer and consumer to acquire articles (on specific topics) from a news_api every two hours and then, with a consumer, save them in MongoDB. So I ...
chris's user avatar
  • 1
