Tests
=====

See [kafka/README](kafka/README)


Getting started
===============

Installing librdkafka
---------------------

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    ./configure --prefix /usr
    make
    sudo make install


Build the Go client
-------------------

From the confluent-kafka-go directory, which should reside
in `$GOPATH/src/github.com/confluentinc/confluent-kafka-go`:

    cd kafka
    go install
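
To verify the build you can compile a small program against the package. This is
only a sketch: the file name is arbitrary, and it assumes the steps above succeeded
so that cgo can find librdkafka; `kafka.LibraryVersion()` reports the librdkafka
version the client was linked against.

    // main.go: sanity check that the client builds and links against librdkafka.
    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        // LibraryVersion reports the librdkafka version the client was built with.
        ver, verStr := kafka.LibraryVersion()
        fmt.Printf("librdkafka version: %s (0x%x)\n", verStr, ver)
    }

If `go run main.go` prints a version string, the client and librdkafka are wired up
correctly.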


High-level consumer
-------------------

* Decide if you want to read messages and events from the `.Events()` channel
  (set `"go.events.channel.enable": true`) or by calling `.Poll()`.

* Create a Consumer with `kafka.NewConsumer()` providing at
  least the `bootstrap.servers` and `group.id` configuration properties.

* Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
  to join the group with the specified subscription set.
  Subscriptions are atomic; calling `.Subscribe*()` again will leave
  the group and rejoin with the new set of topics.

* Start reading events and messages from either the `.Events()` channel
  or by calling `.Poll()`.

* When the group has rebalanced each client member is assigned a
  (sub-)set of topic+partitions.
  By default the consumer will start fetching messages for its assigned
  partitions at this point, but your application may enable rebalance
  events to get an insight into what the assigned partitions were,
  as well as to set the initial offsets. To do this you need to pass
  `"go.application.rebalance.enable": true` to the `NewConsumer()` call
  mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
  with the assigned partition set. You can optionally modify the initial
  offsets (they default to the stored offsets; if there are no previously stored
  offsets they fall back to `"default.topic.config": {"auto.offset.reset": ..}`,
  which defaults to the `latest` message) and then call `.Assign(partitions)`
  to start consuming. If you don't need to modify the initial offsets you do
  not need to call `.Assign()`; the client will do so automatically for you if
  you don't.

* As messages are fetched they will be made available on either the
  `.Events()` channel or by calling `.Poll()`; look for event type `*kafka.Message`.

* Handle messages, events and errors to your liking.

* When you are done consuming call `.Close()` to commit final offsets
  and leave the consumer group.
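
A minimal poll-based sketch tying these steps together (the broker address,
group id and topic name are placeholders, and error handling is abbreviated):

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":               "localhost:9092", // placeholder broker
            "group.id":                        "my-group",       // placeholder group id
            "go.application.rebalance.enable": true,             // receive AssignedPartitions/RevokedPartitions
        })
        if err != nil {
            panic(err)
        }
        defer c.Close() // commits final offsets and leaves the group

        if err = c.Subscribe("my-topic", nil); err != nil { // placeholder topic
            panic(err)
        }

        run := true
        for run {
            switch ev := c.Poll(100).(type) {
            case kafka.AssignedPartitions:
                // Optionally adjust the offsets in ev.Partitions before assigning.
                c.Assign(ev.Partitions)
            case kafka.RevokedPartitions:
                c.Unassign()
            case *kafka.Message:
                fmt.Printf("%s: %s\n", ev.TopicPartition, string(ev.Value))
            case kafka.Error:
                fmt.Println("consumer error:", ev)
                run = false
            case nil:
                // Poll() timed out with no event; keep polling.
            }
        }
    }

If you prefer the channel-based interface, set `"go.events.channel.enable": true`
and `range` over `c.Events()` instead of calling `.Poll()`; the event types you
handle are the same.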


Producer
--------

* Create a Producer with `kafka.NewProducer()` providing at least
  the `bootstrap.servers` configuration property.

* Messages may now be produced either by sending a `*kafka.Message`
  on the `.ProduceChannel()` or by calling `.Produce()`.

* Producing is an asynchronous operation, so the client notifies the application
  of per-message produce success or failure through something called delivery reports.
  Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`
  and you should check `msg.TopicPartition.Error` for `nil` to find out if the message
  was successfully delivered or not.
  It is also possible to direct delivery reports to alternate channels
  by providing a non-nil `chan Event` channel to `.Produce()`.
  If no delivery reports are wanted they can be completely disabled by
  setting the configuration property `"go.delivery.reports": false`.

* When you are done producing messages you will need to make sure all messages
  have indeed been delivered to the broker (or have failed); remember that this is
  an asynchronous client, so some of your messages may be lingering in internal
  channels or transmission queues.
  To do this you can either keep track of the messages you've produced
  and wait for their corresponding delivery reports, or call the convenience
  function `.Flush()` that will block until all message deliveries are done
  or the provided timeout elapses.

* Finally call `.Close()` to decommission the producer.
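
A minimal sketch of the produce and delivery-report cycle (the broker address and
topic name are placeholders):

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        p, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092", // placeholder broker
        })
        if err != nil {
            panic(err)
        }
        defer p.Close()

        // Delivery reports arrive on the .Events() channel as *kafka.Message;
        // a nil TopicPartition.Error means the message was delivered.
        go func() {
            for e := range p.Events() {
                if m, ok := e.(*kafka.Message); ok {
                    if m.TopicPartition.Error != nil {
                        fmt.Println("delivery failed:", m.TopicPartition.Error)
                    } else {
                        fmt.Println("delivered to", m.TopicPartition)
                    }
                }
            }
        }()

        // Produce asynchronously; a nil delivery channel means the report
        // goes to the .Events() channel read above.
        topic := "my-topic" // placeholder topic
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte("hello"),
        }, nil)
        if err != nil {
            panic(err)
        }

        // Block until outstanding messages are delivered or 15s elapse.
        p.Flush(15 * 1000)
    }

Sending `*kafka.Message` values on `.ProduceChannel()` works similarly, with
delivery reports emitted on the `.Events()` channel.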


See the [examples](examples) directory for example implementations of the above.