
Kafka Client Setup

See our Start Streaming GCN Notices quick start guide for a step-by-step walkthrough to begin streaming alerts now!

Python

The Python client is a very lightweight wrapper around confluent-kafka-python.

Open a terminal and run this command to install with pip:

pip install gcn-kafka

or this command to install with conda:

conda install -c conda-forge gcn-kafka

Save the Python code below to a file called example.py:

from gcn_kafka import Consumer

# Connect as a consumer
# Warning: don't share the client secret with others.
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in')

# List all topics
print(consumer.list_topics().topics)

# Subscribe to topics and receive alerts
consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
                    'gcn.classic.text.LVC_INITIAL'])
while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        # Print the topic and message offset
        print(f'topic={message.topic()}, offset={message.offset()}')
        value = message.value()
        print(value)

Run the code by typing this command in the terminal:

python example.py
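
Because gcn-kafka is a thin wrapper, you can pass librdkafka configuration options through to the underlying confluent-kafka-python consumer. By default, a random group ID is generated for each session, so messages that arrive while your script is not running are skipped. The sketch below is one way to resume where you left off across restarts; it assumes that Consumer accepts a config dictionary of librdkafka options, and 'my-persistent-group' is a placeholder that you should replace with your own unique name:

from gcn_kafka import Consumer

# A sketch of resumable consumption, assuming gcn_kafka.Consumer forwards
# a `config` dict of librdkafka options to confluent-kafka-python.
# 'my-persistent-group' is a placeholder; choose your own unique name.
consumer = Consumer(config={'group.id': 'my-persistent-group',
                            # Read from the earliest retained message the
                            # first time this group ID is used.
                            'auto.offset.reset': 'earliest'},
                    client_id='fill me in',
                    client_secret='fill me in')

consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS'])
while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())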

Node.js

The Node.js client is a very lightweight wrapper around KafkaJS. The sample code differs slightly between the two module styles supported by Node.js: ECMAScript modules (.mjs) and CommonJS (.cjs).

ECMAScript (.mjs)

Open a terminal and run this command to install with npm:

npm install gcn-kafka

Save the JavaScript code below to a file called example.mjs:

import { Kafka } from 'gcn-kafka'

// Create a client.
// Warning: don't share the client secret with others.
const kafka = new Kafka({
  client_id: 'fill me in',
  client_secret: 'fill me in',
})

// List topics
const admin = kafka.admin()
const topics = await admin.listTopics()
console.log(topics)

// Subscribe to topics and receive alerts
const consumer = kafka.consumer()
try {
  await consumer.subscribe({
    topics: [
        'gcn.classic.text.FERMI_GBM_FIN_POS',
        'gcn.classic.text.LVC_INITIAL',
    ],
  })
} catch (error) {
  if (error.type === 'TOPIC_AUTHORIZATION_FAILED') {
    console.warn('Not all subscribed topics are available')
  } else {
    throw error
  }
}

await consumer.run({
  eachMessage: async (payload) => {
    const value = payload.message.value
    console.log(`topic=${payload.topic}, offset=${payload.message.offset}`)
    console.log(value?.toString())
  },
})

Run the code by typing this command in the terminal:

node example.mjs

CommonJS (.cjs)

Open a terminal and run this command to install with npm:

npm install gcn-kafka

Save the JavaScript code below to a file called example.cjs:

const { Kafka } = require('gcn-kafka');

(async () => {
  // Create a client.
  // Warning: don't share the client secret with others.
  const kafka = new Kafka({
    client_id: 'fill me in',
    client_secret: 'fill me in',
  })

  // List topics
  const admin = kafka.admin()
  const topics = await admin.listTopics()
  console.log(topics)
  
  // Subscribe to topics and receive alerts
  const consumer = kafka.consumer()
  try {
    await consumer.subscribe({
      topics: [
          'gcn.classic.text.FERMI_GBM_FIN_POS',
          'gcn.classic.text.LVC_INITIAL',
      ],
    })
  } catch (error) {
    if (error.type === 'TOPIC_AUTHORIZATION_FAILED') {
      console.warn('Not all subscribed topics are available')
    } else {
      throw error
    }
  }

  await consumer.run({
    eachMessage: async (payload) => {
      const value = payload.message.value
      console.log(`topic=${payload.topic}, offset=${payload.message.offset}`)
      console.log(value?.toString())
    },
  })
})()

Run the code by typing this command in the terminal:

node example.cjs

C

For C, use librdkafka and the following sample code, which is based on Confluent's guide, Getting Started with Apache Kafka and C/C++.

First, install librdkafka version 2.2.0 or newer. The example also requires OpenSSL (libcrypto) and pkg-config, which the compile command below uses to locate both libraries.
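
On many systems, you can install these dependencies with a package manager. The package names below are typical but not guaranteed for your platform, and distribution packages of librdkafka may be older than 2.2.0, in which case build it from source:

# Debian or Ubuntu
sudo apt install librdkafka-dev libssl-dev pkg-config

# macOS with Homebrew
brew install librdkafka openssl pkg-config

Then, save the C code below to a file called example.c: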

#include <inttypes.h>
#include <openssl/bio.h>
#include <openssl/evp.h>
#include <openssl/rand.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>


int main(int argc, char **argv)
{
  char errstr[512];
  int err;

  // Generate random group ID
  char rand_bytes[256], group_id[2 * sizeof(rand_bytes)] = {'\0'};
  RAND_bytes((unsigned char *) rand_bytes, sizeof(rand_bytes));
  BIO *b64 = BIO_new(BIO_f_base64());
  BIO *mem = BIO_new(BIO_s_mem());
  BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
  BIO_push(b64, mem);
  BIO_write(b64, rand_bytes, sizeof(rand_bytes));
  BIO_flush(b64);
  BIO_read(mem, group_id, sizeof(group_id) - 1);
  BIO_free_all(b64);

  char *conf_kv[][2] = {
    {"bootstrap.servers", "kafka.gcn.nasa.gov"},
    {"group.id", group_id},
    {"sasl.mechanisms", "OAUTHBEARER"},
    // Warning: don't share the client secret with others.
    {"sasl.oauthbearer.client.id", "fill me in"},
    {"sasl.oauthbearer.client.secret", "fill me in"},
    {"sasl.oauthbearer.method", "oidc"},
    {"sasl.oauthbearer.token.endpoint.url", "https://auth.gcn.nasa.gov/oauth2/token"},
    {"security.protocol", "sasl_ssl"}
  };

  static const char *topics[] = {
    "gcn.classic.text.FERMI_GBM_FIN_POS",
    "gcn.classic.text.LVC_INITIAL"
  };

  static const int num_conf_kv = sizeof(conf_kv) / sizeof(*conf_kv);
  static const int num_topics = sizeof(topics) / sizeof(*topics);

  // Assemble configuration
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  for (int i = 0; i < num_conf_kv; i ++)
  {
    if (rd_kafka_conf_set(conf, conf_kv[i][0], conf_kv[i][1],
                          errstr, sizeof(errstr)))
    {
      fprintf(stderr, "%s\n", errstr);
      rd_kafka_conf_destroy(conf);
      return 1;
    }
  }

  // Create consumer
  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  if (!rk) {
    fprintf(stderr, "%s\n", errstr);
    return 1;
  }
  
  // List topics
  const rd_kafka_metadata_t *metadata;
  err = rd_kafka_metadata(rk, 0, NULL, &metadata, -1);
  if (err) {
    fprintf(stderr, "%s\n", rd_kafka_err2str(err));
    rd_kafka_destroy(rk);
    return 1;
  }
  for (int i = 0; i < metadata->topic_cnt; i ++)
  {
    printf("%s\n", metadata->topics[i].topic);
  }
  rd_kafka_metadata_destroy(metadata);
  
  // Subscribe to topics
  rd_kafka_topic_partition_list_t *topics_partitions =
    rd_kafka_topic_partition_list_new(num_topics);
  for (int i = 0; i < num_topics; i ++)
    rd_kafka_topic_partition_list_add(topics_partitions, topics[i],
                                      RD_KAFKA_PARTITION_UA);
  err = rd_kafka_subscribe(rk, topics_partitions);
  rd_kafka_topic_partition_list_destroy(topics_partitions);
  if (err)
  {
    rd_kafka_destroy(rk);
    fprintf(stderr, "%s\n", rd_kafka_err2str(err));
    return 1;
  }

  // Consume messages
  while (1)
  {
    rd_kafka_message_t *message = rd_kafka_consumer_poll(rk, -1);

    if (!message)
    {
      continue;
    } else if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
      // Ignore this error; it just means that we are at the end of the stream
      // and need to wait for more data.
    } else if (message->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
      // Unknown topic or partition; print warning and continue.
      fprintf(stderr, "%s\n", rd_kafka_message_errstr(message));
    } else if (message->err) {
      fprintf(stderr, "%s\n", rd_kafka_message_errstr(message));
      rd_kafka_consumer_close(rk);
      rd_kafka_destroy(rk);
      return 1;
    } else {
      // We received a message; print its topic and offset.
      printf(
        "topic=%s, offset=%" PRId64 "\n",
        rd_kafka_topic_name(message->rkt),
        message->offset
      );
      // Print the message itself.
      printf("%.*s\n", message->len, message->payload);
    }

    rd_kafka_message_destroy(message);
  }

  rd_kafka_consumer_close(rk);
  rd_kafka_destroy(rk);

  return 0;
}

Compile the code. The command will vary slightly depending on your operating system and compiler. For GCC on Linux or macOS, run the following command:

gcc $(pkg-config --cflags libcrypto rdkafka) example.c $(pkg-config --libs libcrypto rdkafka)

Run the program. On Linux or macOS, run the following command:

./a.out

C#

For C#, use the Confluent.Kafka NuGet package. The code sample below is based on Confluent's Kafka .NET Client documentation.

In Visual Studio, create a new C# Console App under File > New > Project. After the project initializes, right-click the solution in the Solution Explorer and click Manage NuGet Packages for Solution. Browse for and install Confluent.Kafka. Then copy the following into your Program.cs file:

using Confluent.Kafka;

var config = new ConsumerConfig
{
  SecurityProtocol = SecurityProtocol.SaslSsl,
  BootstrapServers = "kafka.gcn.nasa.gov",
  GroupId = Guid.NewGuid().ToString(),
  SaslMechanism = SaslMechanism.OAuthBearer,
  SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
  SaslOauthbearerTokenEndpointUrl = "https://auth.gcn.nasa.gov/oauth2/token",
  // Warning: don't share the client secret with others.
  SaslOauthbearerClientId = "fill me in",
  SaslOauthbearerClientSecret = "fill me in"
};

// Create a client.
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
  // Subscribe to topics
  consumer.Subscribe(new List<string> {
    "gcn.classic.text.FERMI_GBM_FIN_POS",
    "gcn.classic.text.LVC_INITIAL"
  });

  // Print the subscribed topics
  consumer.Subscription.ForEach(topic => Console.WriteLine(topic));

  // Consume messages
  while (true)
  {
    try
    {
      var consumeResult = consumer.Consume();
      Console.WriteLine(string.Format("topic={0}, offset={1}", consumeResult.Topic, consumeResult.Offset));
      Console.WriteLine(consumeResult.Message.Value);
    }
    catch (Exception ex)
    {
      if (ex.Message.Contains("Subscribed topic not available"))
      {
        Console.WriteLine(ex.Message);
      }
      else
      {
        throw;
      }
    }
  }
}

Build the solution from the Build menu or by pressing Ctrl+Shift+B. The resulting executable can be found in the bin folder within the project.
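
If you prefer the command line to Visual Studio, the same project can be created and run with the .NET CLI. This is a sketch assuming the dotnet SDK is installed; GcnExample is a placeholder project name, and you would replace the generated Program.cs with the code above before running:

dotnet new console -o GcnExample
dotnet add GcnExample package Confluent.Kafka
dotnet run --project GcnExample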

Java

The following instructions are for the official Kafka command line tools, which are written in Java and ship with both Apache Kafka (version 3.4.0 or newer) and Confluent Platform (7.4.x or newer). However, they should work with most Java code that uses the official Apache Kafka client libraries.

Save the configuration below to a file called example.properties:

security.protocol = SASL_SSL
sasl.mechanism = OAUTHBEARER
sasl.login.callback.handler.class = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url = https://auth.gcn.nasa.gov/oauth2/token
sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="fill me in" \
  clientSecret="fill me in";
# Warning: don't share the client secret with others.

Next, open a terminal.

Run the following command to list available Kafka topics.

kafka-topics.sh --bootstrap-server kafka.gcn.nasa.gov:9092 --command-config example.properties --list

Run the following command to consume Kafka records and print them to the console (the --topic option supports only a single topic at a time; see below).

kafka-console-consumer.sh --bootstrap-server kafka.gcn.nasa.gov:9092 --consumer.config example.properties --topic gcn.classic.text.FERMI_GBM_FIN_POS
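
If you need several topics in one process, recent Kafka 3.x tools also provide an --include option that takes a Java regular expression matching multiple topic names; check that your distribution supports it before relying on this sketch:

kafka-console-consumer.sh --bootstrap-server kafka.gcn.nasa.gov:9092 --consumer.config example.properties --include 'gcn\.classic\.text\.(FERMI_GBM_FIN_POS|LVC_INITIAL)'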

Known Issues

confluent-kafka-python

If you use confluent-kafka-python v2.1.0 or v2.1.1 with librdkafka v2.1.1, you will encounter a segmentation fault when subscribed to unavailable topics.

Please refer to the confluent-kafka-python GitHub issue for updates.
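
Until you are running a fixed release, one possible workaround is to pin pip to unaffected versions:

pip install 'confluent-kafka!=2.1.0,!=2.1.1'

You can check which client and librdkafka versions your installation uses with the version() and libversion() functions from confluent-kafka-python:

import confluent_kafka

# Print the (version string, hex int) tuples for the Python client
# and the bundled librdkafka.
print(confluent_kafka.version())
print(confluent_kafka.libversion())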
