tgoop.com/opensource_findings_python/154
Create:
Last Update:
Last Update:
🚀 New issue to ag2ai/faststream by @melenudo
📝 Bug: auto_commit is always True in Confluent Kafka (#2610)
Describe the bug
Even if you use the deprecated autocommit subscriber parameter, or if you use the ack_policy parameter with a value other than AckPolicy.ACK_FIRST, the consumer will set enable.auto.commit to True.
How to reproduce
from asyncio import sleep
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.confluent import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data", group_id="my-group", auto_commit=False)
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
await sleep(20)
return DataBasic(data=msg.data + 1.0)
You can run this snippet (using the deprecated
auto_commit; the same behavior can be observed if you use ack_policy=AckPolicy.ACK).In Kafka, you will notice that the message is automatically committed before
on_input_data finishes.You can also debug the code and observe that in the consumer:
https://github.com/ag2ai/faststream/blob/8a4c60bdae02c7632c15ff1a1d15b268da6e095d/faststream/confluent/helpers/client.py#L236
self.config always has the property enable.auto.commit to TrueExpected behavior
When use
@subscriber(...,ack_policy=AckPolicy.ACK) the autocommit must be disabled (same behavior for a policy different than AckPolicy.ACK_FIRST)Observed behavior
enable.auto.commit is always True ignoring subscriber parameters.Environment
Running FastStream 0.6.2 with CPython 3.12.9 on Darwin
#goodfirstissue #bug
sent via relator
BY Находки в опенсорсе: Python
Share with your friend now:
tgoop.com/opensource_findings_python/154
