Skip to content

Conversation

@zhuud
Copy link

@zhuud zhuud commented Nov 10, 2025

kq: 支持 kafka.Writer BatchTimeout/BatchBytes 配置

  • 新增 WithBatchTimeout 选项,支持配置批量发送超时时间
  • 新增 WithBatchBytes 选项,支持配置批量发送字节大小
  • 补充完整的单元测试,覆盖单个配置和组合配置场景

涉及文件:

  • kq/pusher.go: 添加两个配置选项和对应的 PushOption 函数
  • kq/pusher_test.go: 添加对应的单元测试用例

Greptile Overview

Updated On: 2025-11-10 06:45:25 UTC

Greptile Summary

This PR enhances the Kafka queue (kq) package by adding support for two critical Kafka writer batch configuration options: BatchTimeout and BatchBytes. The changes introduce WithBatchTimeout() and WithBatchBytes() configuration functions that allow users to fine-tune Kafka message batching behavior for performance optimization.

The implementation follows the existing functional options pattern used throughout the codebase. Two new fields are added to the pushOptions struct (batchTimeout and batchBytes), and the NewPusher() function applies these configurations to the underlying kafka.Writer with proper zero-value validation. This allows users to control both the maximum time to wait before sending a batch (BatchTimeout) and the maximum batch size in bytes (BatchBytes), which are essential parameters for balancing throughput and latency in Kafka message production.

The changes are well-integrated with the existing architecture and maintain backward compatibility by using zero-value checks before applying the new configurations.

Important Files Changed

Filename Score Overview
kq/pusher.go 4/5 Added BatchTimeout and BatchBytes configuration support with proper validation
kq/pusher_test.go 5/5 Added comprehensive unit tests for new batch configuration options

Confidence score: 4/5

  • This PR is safe to merge with low risk of production issues
  • Score reflects solid implementation following established patterns, but minor concerns about validation logic and potential edge cases with zero/negative values
  • Pay close attention to the validation logic in pusher.go to ensure proper handling of edge cases

Sequence Diagram

sequenceDiagram
    participant User
    participant Pusher
    participant KafkaWriter
    participant ChunkExecutor

    User->>+Pusher: NewPusher(addrs, topic, WithBatchTimeout(), WithBatchBytes())
    Pusher->>Pusher: "Create kafka.Writer with addresses and topic"
    Pusher->>Pusher: "Apply batch timeout and batch bytes options"
    Pusher->>+ChunkExecutor: "Create ChunkExecutor (if not sync mode)"
    ChunkExecutor-->>-Pusher: "Return executor"
    Pusher-->>-User: "Return Pusher instance"

    User->>+Pusher: Push(ctx, value)
    Pusher->>Pusher: "Generate timestamp key"
    Pusher->>+Pusher: PushWithKey(ctx, key, value)
    Pusher->>Pusher: "Create kafka.Message with key and value"
    Pusher->>Pusher: "Inject trace context into message"
    alt executor exists (async mode)
        Pusher->>+ChunkExecutor: Add(message, messageSize)
        ChunkExecutor-->>-Pusher: "Return error or nil"
    else sync mode
        Pusher->>+KafkaWriter: WriteMessages(ctx, message)
        KafkaWriter-->>-Pusher: "Return error or nil"
    end
    Pusher-->>-Pusher: "Return result"
    Pusher-->>-User: "Return error or nil"

    Note over ChunkExecutor, KafkaWriter: "ChunkExecutor batches messages and flushes to KafkaWriter"
    ChunkExecutor->>+KafkaWriter: WriteMessages(ctx, batchedMessages)
    KafkaWriter-->>-ChunkExecutor: "Return error or nil"

    User->>+Pusher: Close()
    alt executor exists
        Pusher->>+ChunkExecutor: Flush()
        ChunkExecutor-->>-Pusher: "Complete pending writes"
    end
    Pusher->>+KafkaWriter: Close()
    KafkaWriter-->>-Pusher: "Return error or nil"
    Pusher-->>-User: "Return error or nil"
Loading

zhudi added 2 commits November 7, 2025 16:22
- 新增 WithBatchTimeout 选项,支持配置批量发送超时时间
- 新增 WithBatchSize 选项,支持配置批量发送消息数量
- 新增 WithBatchBytes 选项,支持配置批量发送字节大小
- 补充完整的单元测试,覆盖单个配置和组合配置场景

涉及文件:
- kq/pusher.go: 添加三个配置选项和对应的 PushOption 函数
- kq/pusher_test.go: 添加对应的单元测试用例
- 新增 WithBatchTimeout 选项,支持配置批量发送超时时间
- 新增 WithBatchBytes 选项,支持配置批量发送字节大小
- 补充完整的单元测试,覆盖单个配置和组合配置场景

涉及文件:
- kq/pusher.go: 添加两个配置选项和对应的 PushOption 函数
- kq/pusher_test.go: 添加对应的单元测试用例
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant