Kafka Client Setup
See our Start Streaming GCN Notices quick start guide for a step-by-step walkthrough to begin streaming alerts now!
Python
The Python client is a very lightweight wrapper around confluent-kafka-python.
Open a terminal and run this command to install with pip:pip install gcn-kafka
conda install -c conda-forge gcn-kafka
example.py
:from gcn_kafka import Consumer
# Connect as a consumer
# Warning: don't share the client secret with others.
consumer = Consumer(client_id='fill me in',
client_secret='fill me in')
# List all topics
print(consumer.list_topics().topics)
# Subscribe to topics and receive alerts
consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
'gcn.classic.text.LVC_INITIAL'])
while True:
for message in consumer.consume(timeout=1):
if message.error():
print(message.error())
continue
# Print the topic and message ID
print(f'topic={message.topic()}, offset={message.offset()}')
value = message.value()
print(value)
python example.py
Node.js
The Node.js client is a very lightweight wrapper around Kafka.js. The sample code is slightly different for the two module styles supported by Node.js, ECMAScript (.mjs) or CommonJS (.cjs).
ECMAScript (.mjs)
Open a terminal and run this command to install with npm:npm install gcn-kafka
example.mjs
:import { Kafka } from 'gcn-kafka'
// Create a client.
// Warning: don't share the client secret with others.
const kafka = new Kafka({
client_id: 'fill me in',
client_secret: 'fill me in',
})
// List topics
const admin = kafka.admin()
const topics = await admin.listTopics()
console.log(topics)
// Subscribe to topics and receive alerts
const consumer = kafka.consumer()
try {
await consumer.subscribe({
topics: [
'gcn.classic.text.FERMI_GBM_FIN_POS',
'gcn.classic.text.LVC_INITIAL',
],
})
} catch (error) {
if (error.type === 'TOPIC_AUTHORIZATION_FAILED')
{
console.warn('Not all subscribed topics are available')
} else {
throw error
}
}
await consumer.run({
eachMessage: async (payload) => {
const value = payload.message.value
console.log(`topic=${payload.topic}, offset=${payload.message.offset}`)
console.log(value?.toString())
},
})
node example.mjs
CommonJS (.cjs)
Open a terminal and run this command to install with npm:npm install gcn-kafka
example.cjs
:const { Kafka } = require('gcn-kafka');
(async () => {
// Create a client.
// Warning: don't share the client secret with others.
const kafka = new Kafka({
client_id: 'fill me in',
client_secret: 'fill me in',
})
// List topics
const admin = kafka.admin()
const topics = await admin.listTopics()
console.log(topics)
// Subscribe to topics and receive alerts
const consumer = kafka.consumer()
try {
await consumer.subscribe({
topics: [
'gcn.classic.text.FERMI_GBM_FIN_POS',
'gcn.classic.text.LVC_INITIAL',
],
})
} catch (error) {
if (error.type === 'TOPIC_AUTHORIZATION_FAILED')
{
console.warn('Not all subscribed topics are available')
} else {
throw error
}
}
await consumer.run({
eachMessage: async (payload) => {
const value = payload.message.value
console.log(`topic=${payload.topic}, offset=${payload.message.offset}`)
console.log(value?.toString())
},
})
})()
node example.cjs
C
For C, use librdkafka and the following sample code based on Confluent's guide, Getting Started with Apache Kafka and C/C++.
First, install librdkafka version 2.2.0 or newer. Then, save the C code below to a file calledexample.c
:#include <inttypes.h>
#include <openssl/bio.h>
#include <openssl/evp.h>
#include <openssl/rand.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>
int main(int argc, char **argv)
{
char errstr[512];
int err;
// Generate random group ID
char rand_bytes[256], group_id[2 * sizeof(rand_bytes)] = {'\0'};
RAND_bytes(rand_bytes, sizeof(rand_bytes));
BIO *b64 = BIO_new(BIO_f_base64());
BIO *mem = BIO_new(BIO_s_mem());
BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
BIO_push(b64, mem);
BIO_write(b64, rand_bytes, sizeof(rand_bytes) - 1);
BIO_flush(b64);
BIO_read(mem, group_id, sizeof(group_id) - 1);
BIO_free_all(b64);
char *conf_kv[][2] = {
{"bootstrap.servers", "kafka.gcn.nasa.gov"},
{"group.id", group_id},
{"sasl.mechanisms", "OAUTHBEARER"},
// Warning: don't share the client secret with others.
{"sasl.oauthbearer.client.id", "fill me in"},
{"sasl.oauthbearer.client.secret", "fill me in"},
{"sasl.oauthbearer.method", "oidc"},
{"sasl.oauthbearer.token.endpoint.url", "https://auth.gcn.nasa.gov/oauth2/token"},
{"security.protocol", "sasl_ssl"}
};
static const char *topics[] = {
"gcn.classic.text.FERMI_GBM_FIN_POS",
"gcn.classic.text.LVC_INITIAL"
};
static const int num_conf_kv = sizeof(conf_kv) / sizeof(*conf_kv);
static const int num_topics = sizeof(topics) / sizeof(*topics);
// Assemble configuration
rd_kafka_conf_t *conf = rd_kafka_conf_new();
for (int i = 0; i < num_conf_kv; i ++)
{
if (rd_kafka_conf_set(conf, conf_kv[i][0], conf_kv[i][1],
errstr, sizeof(errstr)))
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
}
// Create consumer
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
// List topics
const rd_kafka_metadata_t *metadata;
err = rd_kafka_metadata(rk, 0, NULL, &metadata, -1);
if (err) {
fprintf(stderr, "%s\n", rd_kafka_err2str(err));
rd_kafka_destroy(rk);
return 1;
}
for (int i = 0; i < metadata->topic_cnt; i ++)
{
printf("%s\n", metadata->topics[i].topic);
}
rd_kafka_metadata_destroy(metadata);
// Subscribe to topics
rd_kafka_topic_partition_list_t *topics_partitions =
rd_kafka_topic_partition_list_new(num_topics);
for (int i = 0; i < num_topics; i ++)
rd_kafka_topic_partition_list_add(topics_partitions, topics[i],
RD_KAFKA_PARTITION_UA);
err = rd_kafka_subscribe(rk, topics_partitions);
rd_kafka_topic_partition_list_destroy(topics_partitions);
if (err)
{
rd_kafka_destroy(rk);
fprintf(stderr, "%s\n", rd_kafka_err2str(err));
return 1;
}
// Consume messages
while (1)
{
rd_kafka_message_t *message = rd_kafka_consumer_poll(rk, -1);
if (!message)
{
continue;
} else if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
// Ignore this error; it just means that we are at the end of the stream
// and need to wait for more data.
} else if (message->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
// Unknown topic or partition; print warning and continue.
fprintf(stderr, "%s\n", rd_kafka_message_errstr(message));
} else if (message->err) {
fprintf(stderr, "%s\n", rd_kafka_message_errstr(message));
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
return 1;
} else {
// We received a message; print its topic and offset.
printf(
"topic=%s, offset=%" PRId64 "\n",
rd_kafka_topic_name(message->rkt),
message->offset
);
// Print the message itself.
printf("%.*s\n", message->len, message->payload);
}
rd_kafka_message_destroy(message);
}
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
return 0;
}
gcc $(pkg-config --cflags libcrypto rdkafka) example.c $(pkg-config --libs libcrypto rdkafka)
./a.out
C#
For C#, use the Confluent.Kafka NuGet package. The code sample was based on Confluent's Kafka .NET Client documentation.
In Visual Studio, create a new C# Console App under File > New > Project. After the project initializes, right click the solution in the Solution Explorer and click Manage NuGet packages for solution. Browse for and install Confluent.Kafka. Copy the following into your Program.cs file. using Confluent.Kafka;
var config = new ConsumerConfig
{
SecurityProtocol = SecurityProtocol.SaslSsl,
BootstrapServers = "kafka.gcn.nasa.gov",
GroupId = Guid.NewGuid().ToString(),
SaslMechanism = SaslMechanism.OAuthBearer,
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
SaslOauthbearerTokenEndpointUrl = "https://auth.gcn.nasa.gov/oauth2/token",
// Warning: don't share the client secret with others
SaslOauthbearerClientId = "fill me in",
SaslOauthbearerClientSecret = "fill me in"
};
// Create a client.
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
// Subscribe to topics
consumer.Subscribe(new List<string> {
"gcn.classic.text.FERMI_GBM_FIN_POS",
"gcn.classic.text.LVC_INITIAL"
});
// List all topics
consumer.Subscription.ForEach(topic => Console.WriteLine(topic));
// Consume messages
while (true)
{
try
{
var consumeResult = consumer.Consume();
Console.WriteLine(string.Format("topic={0}, offset={1}",consumeResult.Topic, consumeResult.Offset));
Console.WriteLine(consumeResult.Message.Value);
}
catch (Exception ex)
{
if (ex.Message.Contains("Subscribed topic not available"))
{
Console.WriteLine(ex.Message);
}
else
{
throw;
}
}
}
}
Known Issues
confluent-kafka-python
If you use confluent-kafka-python v2.1.0 or v2.1.1 with librdkafka v2.1.1 you will encounter a segmentation fault when subscribed to unavailable topics.
Please refer to the confluent-kafka-python github issue for updates on the issue.