Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Commit 9481cfb

Browse files
committed
Add opaque reference handling in Configs & Callbacks, produce & Message
1 parent 0ee9076 commit 9481cfb

29 files changed

+710
-43
lines changed

benchmarks/ConsumerBench.php

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use RdKafka\Producer;
99

1010
/**
11-
* @Groups({"Consumer", "ffi", "ext"})
11+
* @Groups({"Consumer"})
1212
* @BeforeClassMethods({"produce10000Messages"})
1313
*/
1414
class ConsumerBench
@@ -31,6 +31,7 @@ public static function produce10000Messages(): void
3131
* @Warmup(1)
3232
* @Revs(100)
3333
* @Iterations(5)
34+
* @Groups({"ffi", "ext"})
3435
*/
3536
public function benchConsume1Message(): void
3637
{
@@ -60,6 +61,7 @@ public function benchConsume1Message(): void
6061
* @Warmup(1)
6162
* @Revs(100)
6263
* @Iterations(5)
64+
* @Groups({"ffi", "ext"})
6365
*/
6466
public function benchConsumeCallback1Message(): void
6567
{
@@ -75,7 +77,7 @@ public function benchConsumeCallback1Message(): void
7577
$callback = new class() {
7678
public int $messages = 0;
7779

78-
public function __invoke(Message $message, ?object $opaque = null): void
80+
public function __invoke(Message $message, $opaque = null): void
7981
{
8082
$this->messages++;
8183
}
@@ -88,10 +90,45 @@ public function __invoke(Message $message, ?object $opaque = null): void
8890
}
8991
}
9092

93+
/**
94+
* @Warmup(10)
95+
* @Revs(100)
96+
* @Iterations(5)
97+
* @Groups({"ffi"})
98+
*/
99+
public function benchConsumeCallback1MessageWithOpaque(): void
100+
{
101+
$counter = new stdClass();
102+
$counter->count = 0;
103+
104+
$conf = new Conf();
105+
$conf->set('metadata.broker.list', 'kafka:9092');
106+
$conf->set('auto.offset.reset', 'earliest');
107+
$conf->set('log_level', (string) 0);
108+
$conf->set('consume.callback.max.messages', (string) 1);
109+
$consumer = new Consumer($conf);
110+
$topic = $consumer->newTopic('benchmarks');
111+
112+
$topic->consumeStart(0, 0);
113+
$callback = new class() {
114+
public function __invoke(Message $message, $opaque = null): void
115+
{
116+
$opaque->count++;
117+
}
118+
};
119+
$topic->consumeCallback(0, 500, $callback, $counter);
120+
$topic->consumeStop(0);
121+
122+
if ($counter->count < 1) {
123+
throw new Exception('failed to consume 1 message');
124+
}
125+
}
126+
91127
/**
92128
* @Warmup(1)
93129
* @Revs(100)
94130
* @Iterations(5)
131+
* @Groups({"ffi", "ext"})
95132
*/
96133
public function benchConsume100Messages(): void
97134
{
@@ -121,6 +158,7 @@ public function benchConsume100Messages(): void
121158
* @Warmup(1)
122159
* @Revs(100)
123160
* @Iterations(5)
161+
* @Groups({"ffi", "ext"})
124162
*/
125163
public function benchConsume100MessagesWithLogCallback(): void
126164
{
@@ -156,6 +194,7 @@ function (Consumer $consumer, int $level, string $facility, string $message): vo
156194
* @Warmup(1)
157195
* @Revs(100)
158196
* @Iterations(5)
197+
* @Groups({"ffi", "ext"})
159198
*/
160199
public function benchConsumeBatch100Messages(): void
161200
{
@@ -179,6 +218,7 @@ public function benchConsumeBatch100Messages(): void
179218
* @Warmup(1)
180219
* @Revs(100)
181220
* @Iterations(5)
221+
* @Groups({"ffi", "ext"})
182222
*/
183223
public function benchConsumeCallback100Message(): void
184224
{
@@ -194,7 +234,7 @@ public function benchConsumeCallback100Message(): void
194234
$callback = new class() {
195235
public int $messages = 0;
196236

197-
public function __invoke(Message $message, ?object $opaque = null): void
237+
public function __invoke(Message $message, $opaque = null): void
198238
{
199239
$this->messages++;
200240
}
@@ -206,4 +246,38 @@ public function __invoke(Message $message, ?object $opaque = null): void
206246
throw new Exception('failed to consume 100 messages');
207247
}
208248
}
249+
250+
/**
251+
* @Warmup(10)
252+
* @Revs(100)
253+
* @Iterations(5)
254+
* @Groups({"ffi"})
255+
*/
256+
public function benchConsumeCallback100MessageWithOpaque(): void
257+
{
258+
$counter = new stdClass();
259+
$counter->count = 0;
260+
261+
$conf = new Conf();
262+
$conf->set('metadata.broker.list', 'kafka:9092');
263+
$conf->set('auto.offset.reset', 'earliest');
264+
$conf->set('log_level', (string) 0);
265+
$conf->set('consume.callback.max.messages', (string) 100);
266+
$consumer = new Consumer($conf);
267+
$topic = $consumer->newTopic('benchmarks');
268+
269+
$topic->consumeStart(0, 0);
270+
$callback = new class() {
271+
public function __invoke(Message $message, $opaque = null): void
272+
{
273+
$opaque->count++;
274+
}
275+
};
276+
$topic->consumeCallback(0, 500, $callback, $counter);
277+
$topic->consumeStop(0);
278+
279+
if ($counter->count < 100) {
280+
throw new Exception('failed to consume 100 messages');
281+
}
282+
}
209283
}

benchmarks/ProducerBench.php

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
use RdKafka\Producer;
88

99
/**
10-
* @Groups({"Producer", "ffi", "ext"})
10+
* @Groups({"Producer"})
1111
*/
1212
class ProducerBench
1313
{
1414
/**
1515
* @Warmup(1)
1616
* @Revs(100)
1717
* @Iterations(5)
18+
* @Groups({"ffi", "ext"})
1819
*/
1920
public function benchProduce1Message(): void
2021
{
@@ -36,6 +37,7 @@ public function benchProduce1Message(): void
3637
* @Warmup(1)
3738
* @Revs(100)
3839
* @Iterations(5)
40+
* @Groups({"ffi", "ext"})
3941
*/
4042
public function benchProduce100Messages(): void
4143
{
@@ -59,6 +61,7 @@ public function benchProduce100Messages(): void
5961
* @Warmup(1)
6062
* @Revs(100)
6163
* @Iterations(5)
64+
* @Groups({"ffi", "ext"})
6265
*/
6366
public function benchProduce100MessagesWithLogAndDrMsgCallbacks(): void
6467
{
@@ -75,7 +78,7 @@ function (Producer $producer, int $level, string $facility, string $message): vo
7578
$deliveryCallback = new class() {
7679
public int $messages = 0;
7780

78-
public function __invoke(Producer $producer, Message $message, ?object $opaque = null): void
81+
public function __invoke(Producer $producer, Message $message, $opaque = null): void
7982
{
8083
$this->messages++;
8184
}
@@ -92,4 +95,45 @@ public function __invoke(Producer $producer, Message $message, ?object $opaque =
9295
$producer->poll(0);
9396
}
9497
}
98+
99+
/**
100+
* @Warmup(10)
101+
* @Revs(1000)
102+
* @Iterations(5)
103+
* @Groups({"ffi"})
104+
*/
105+
public function benchProduce100MessagesWithLogAndDrMsgCallbacksWithOpaque(): void
106+
{
107+
$counter = new stdClass();
108+
$counter->count = 0;
109+
110+
$conf = new Conf();
111+
$conf->set('metadata.broker.list', 'kafka:9092');
112+
$conf->set('batch.num.messages', (string) 100);
113+
$conf->set('debug', 'broker,topic,msg');
114+
$conf->set('log_level', (string) LOG_DEBUG);
115+
$conf->setOpaque($counter);
116+
$conf->setLogCb(
117+
function (Producer $producer, int $level, string $facility, string $message): void {
118+
// echo sprintf('log: %d %s %s', $level, $facility, $message) . PHP_EOL;
119+
}
120+
);
121+
$deliveryCallback = new class() {
122+
public function __invoke(Producer $producer, Message $message, $opaque = null): void
123+
{
124+
$message->_private->count += $opaque->add;
125+
}
126+
};
127+
$conf->setDrMsgCb($deliveryCallback);
128+
$producer = new Producer($conf);
129+
$topic = $producer->newTopic('benchmarks');
130+
131+
for ($i = 0; $i < 100; $i++) {
132+
$topic->produce(0, 0, 'bench', 'mark', 1);
133+
}
134+
135+
while ($counter->count < 100) {
136+
$producer->poll(0);
137+
}
138+
}
95139
}

examples/consumer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ function (Consumer $consumer, int $level, string $facility, string $message): vo
2121

2222
$conf->set('statistics.interval.ms', (string) 1000);
2323
$conf->setStatsCb(
24-
function (Consumer $consumer, string $json, int $jsonLength, ?object $opaque): void {
24+
function (Consumer $consumer, string $json, int $jsonLength, $opaque): void {
2525
echo "stats: ${json}" . PHP_EOL;
2626
}
2727
);

examples/producer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ function (Producer $producer, int $level, string $facility, string $message): vo
3131
);
3232
$conf->set('statistics.interval.ms', (string) 1000);
3333
$conf->setStatsCb(
34-
function ($consumer, $json, $json_len, $opaque): void {
34+
function (Producer $producer, string $json, int $json_len, $opaque): void {
3535
echo "stats: ${json}" . PHP_EOL;
3636
}
3737
);

src/RdKafka.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use RdKafka\Conf;
77
use RdKafka\Exception;
88
use RdKafka\FFI\Library;
9+
use RdKafka\FFI\OpaqueMap;
910
use RdKafka\Metadata;
1011
use RdKafka\Topic;
1112

@@ -159,4 +160,13 @@ public function queryWatermarkOffsets(string $topic, int $partition, int &$low,
159160
$low = (int) $lowResult->cdata;
160161
$high = (int) $highResult->cdata;
161162
}
163+
164+
/**
165+
* @return mixed|null
166+
*/
167+
public function getOpaque()
168+
{
169+
$cOpaque = Library::rd_kafka_opaque($this->kafka);
170+
return OpaqueMap::get($cOpaque);
171+
}
162172
}

src/RdKafka/Conf.php

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use RdKafka\FFI\Library;
1212
use RdKafka\FFI\LogCallbackProxy;
1313
use RdKafka\FFI\OffsetCommitCallbackProxy;
14+
use RdKafka\FFI\OpaqueMap;
1415
use RdKafka\FFI\RebalanceCallbackProxy;
1516
use RdKafka\FFI\StatsCallbackProxy;
1617

@@ -20,6 +21,7 @@
2021
class Conf
2122
{
2223
private CData $conf;
24+
private ?CData $cOpaque;
2325

2426
public function __construct()
2527
{
@@ -29,6 +31,8 @@ public function __construct()
2931
$this->set('client.software.name', 'php-rdkafka-ffi');
3032
$this->set('client.software.version', Library::getClientVersion());
3133
}
34+
35+
$this->cOpaque = null;
3236
}
3337

3438
public function __destruct()
@@ -120,7 +124,7 @@ public function setDefaultTopicConf(TopicConf $topic_conf): void
120124
}
121125

122126
/**
123-
* @param callable $callback function(Producer $producer, Message $message, ?object $opaque = null)
127+
* @param callable $callback function(Producer $producer, Message $message, ?mixed $opaque = null)
124128
* @throws Exception
125129
*/
126130
public function setDrMsgCb(callable $callback): void
@@ -150,7 +154,7 @@ public function setLogCb(?callable $callback): void
150154
}
151155

152156
/**
153-
* @param callable $callback function($consumerOrProducer, int $err, string $reason, ?object $opaque = null)
157+
* @param callable $callback function($consumerOrProducer, int $err, string $reason, ?mixed $opaque = null)
154158
*/
155159
public function setErrorCb(callable $callback): void
156160
{
@@ -161,7 +165,7 @@ public function setErrorCb(callable $callback): void
161165
}
162166

163167
/**
164-
* @param callable $callback function(KafkaConsumer $consumer, int $err, array $topicPartitions, ?object $opaque = null)
168+
* @param callable $callback function(KafkaConsumer $consumer, int $err, array $topicPartitions, ?mixed $opaque = null)
165169
*/
166170
public function setRebalanceCb(callable $callback): void
167171
{
@@ -172,7 +176,7 @@ public function setRebalanceCb(callable $callback): void
172176
}
173177

174178
/**
175-
* @param callable $callback function($consumerOrProducer, string $json, int $jsonLength, ?object $opaque = null)
179+
* @param callable $callback function($consumerOrProducer, string $json, int $jsonLength, ?mixed $opaque = null)
176180
*/
177181
public function setStatsCb(callable $callback): void
178182
{
@@ -183,7 +187,7 @@ public function setStatsCb(callable $callback): void
183187
}
184188

185189
/**
186-
* @param callable $callback function(KafkaConsumer $consumer, int $err, array $topicPartitions, ?object $opaque = null)
190+
* @param callable $callback function(KafkaConsumer $consumer, int $err, array $topicPartitions, ?mixed $opaque = null)
187191
*/
188192
public function setOffsetCommitCb(callable $callback): void
189193
{
@@ -192,4 +196,17 @@ public function setOffsetCommitCb(callable $callback): void
192196
OffsetCommitCallbackProxy::create($callback)
193197
);
194198
}
199+
200+
/**
201+
* @param mixed $opaque
202+
*/
203+
public function setOpaque($opaque): void
204+
{
205+
if ($this->cOpaque !== null) {
206+
OpaqueMap::pull($this->cOpaque);
207+
}
208+
209+
$this->cOpaque = OpaqueMap::push($opaque);
210+
Library::rd_kafka_conf_set_opaque($this->conf, FFI::addr($this->cOpaque));
211+
}
195212
}

0 commit comments

Comments
 (0)