Client Configuration

See our quickstart for a step-by-step walkthrough to begin streaming alerts now!

Python

The Python client is a very lightweight wrapper around confluent-kafka-python.
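
To illustrate what such a wrapper takes care of, here is a minimal sketch of the configuration dictionary one might otherwise assemble by hand for confluent-kafka-python. The property names and values are copied from the C sample on this page; `build_config` is a hypothetical helper, and whether gcn-kafka sets exactly these properties internally is an assumption.

```python
import uuid


def build_config(client_id, client_secret, group_id=None):
    """Build a librdkafka-style configuration dictionary.

    Hypothetical helper: the properties mirror the C sample on this page.
    Whether gcn-kafka applies exactly these settings is an assumption.
    """
    return {
        'bootstrap.servers': 'kafka.gcn.nasa.gov',
        # Use a random group ID unless one is given, as the C and C# samples do.
        'group.id': group_id or str(uuid.uuid4()),
        'sasl.mechanisms': 'OAUTHBEARER',
        # Warning: don't share the client secret with others.
        'sasl.oauthbearer.client.id': client_id,
        'sasl.oauthbearer.client.secret': client_secret,
        'sasl.oauthbearer.method': 'oidc',
        'sasl.oauthbearer.token.endpoint.url':
            'https://auth.gcn.nasa.gov/oauth2/token',
        'security.protocol': 'sasl_ssl',
    }


config = build_config('fill me in', 'fill me in')
for key in sorted(config):
    print(key)
```

A dictionary like this could be passed to `confluent_kafka.Consumer(config)` directly; with gcn-kafka, supplying the client ID and secret is enough.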

Open a terminal and run this command to install with pip:
pip install gcn-kafka
or this command to install with conda:
conda install -c conda-forge gcn-kafka
Save the Python code below to a file called example.py:
from gcn_kafka import Consumer

# Connect as a consumer.
# Warning: don't share the client secret with others.
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in')

# List all topics
print(consumer.list_topics().topics)

# Subscribe to topics and receive alerts
consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
                    'gcn.classic.text.LVC_INITIAL'])
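while True:
    for message in consumer.consume(timeout=1):
        # Skip messages that carry an error instead of a payload.
        if message.error():
            print(message.error())
            continue
        value = message.value()
        print(value)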
Run the code by typing this command in the terminal:
python example.py
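
The warning in the sample code about not sharing the client secret suggests keeping the secret out of files that may be shared or committed. One way to do that is to read the credentials from environment variables. The sketch below assumes two hypothetical variable names, `GCN_CLIENT_ID` and `GCN_CLIENT_SECRET`; gcn-kafka does not define these names, and `load_credentials` is an illustrative helper, not part of the library.

```python
import os


def load_credentials(environ=os.environ):
    """Return (client_id, client_secret) read from environment variables.

    GCN_CLIENT_ID and GCN_CLIENT_SECRET are hypothetical names chosen for
    this sketch; gcn-kafka does not define them.
    """
    try:
        return environ['GCN_CLIENT_ID'], environ['GCN_CLIENT_SECRET']
    except KeyError as error:
        # Report which variable is missing without echoing any secret value.
        raise RuntimeError(
            f'missing environment variable: {error.args[0]}') from None
```

In example.py, the returned pair could replace the hard-coded strings: call `client_id, client_secret = load_credentials()` and pass both values to `Consumer(client_id=client_id, client_secret=client_secret)`.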

Node.js

The Node.js client is a very lightweight wrapper around Kafka.js. The sample code differs slightly between the two module styles supported by Node.js: ECMAScript (.mjs) and CommonJS (.cjs).

ECMAScript (.mjs)

Open a terminal and run this command to install with npm:
npm install gcn-kafka
Save the JavaScript code below to a file called example.mjs:
import { Kafka } from 'gcn-kafka'

// Create a client.
// Warning: don't share the client secret with others.
const kafka = new Kafka({
  client_id: 'fill me in',
  client_secret: 'fill me in',
})

// List topics
const admin = kafka.admin()
const topics = await admin.listTopics()
console.log(topics)

// Subscribe to topics and receive alerts
const consumer = kafka.consumer()
try {
  await consumer.subscribe({
    topics: [
        'gcn.classic.text.FERMI_GBM_FIN_POS',
        'gcn.classic.text.LVC_INITIAL',
    ],
  })
} catch (error) {
  if (error.type === 'TOPIC_AUTHORIZATION_FAILED')
  {
    console.warn('Not all subscribed topics are available')
  } else {
    throw error
  }
}

await consumer.run({
  eachMessage: async (payload) => {
    const value = payload.message.value
    console.log(value?.toString())
  },
})
Run the code by typing this command in the terminal:
node example.mjs

CommonJS (.cjs)

Open a terminal and run this command to install with npm:
npm install gcn-kafka
Save the JavaScript code below to a file called example.cjs:
const { Kafka } = require('gcn-kafka');

(async () => {
  // Create a client.
  // Warning: don't share the client secret with others.
  const kafka = new Kafka({
    client_id: 'fill me in',
    client_secret: 'fill me in',
  })

  // List topics
  const admin = kafka.admin()
  const topics = await admin.listTopics()
  console.log(topics)
  
  // Subscribe to topics and receive alerts
  const consumer = kafka.consumer()
  try {
    await consumer.subscribe({
      topics: [
          'gcn.classic.text.FERMI_GBM_FIN_POS',
          'gcn.classic.text.LVC_INITIAL',
      ],
    })
  } catch (error) {
    if (error.type === 'TOPIC_AUTHORIZATION_FAILED')
    {
      console.warn('Not all subscribed topics are available')
    } else {
      throw error
    }
  }

  await consumer.run({
    eachMessage: async (payload) => {
      const value = payload.message.value
      console.log(value?.toString())
    },
  })
})()
Run the code by typing this command in the terminal:
node example.cjs

C

For C, use librdkafka and the following sample code based on Confluent's guide, Getting Started with Apache Kafka and C/C++.

First, install librdkafka version 1.9.2 or newer. Then, save the C code below to a file called example.c:
#include <openssl/bio.h>
#include <openssl/evp.h>
#include <openssl/rand.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>


int main(int argc, char **argv)
{
  char errstr[512];
  int err;

  // Generate random group ID (RAND_bytes expects an unsigned char buffer)
  unsigned char rand_bytes[256];
  char group_id[2 * sizeof(rand_bytes)] = {'\0'};
  RAND_bytes(rand_bytes, sizeof(rand_bytes));
  BIO *b64 = BIO_new(BIO_f_base64());
  BIO *mem = BIO_new(BIO_s_mem());
  BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
  BIO_push(b64, mem);
  BIO_write(b64, rand_bytes, sizeof(rand_bytes) - 1);
  BIO_flush(b64);
  BIO_read(mem, group_id, sizeof(group_id) - 1);
  BIO_free_all(b64);

  char *conf_kv[][2] = {
    {"bootstrap.servers", "kafka.gcn.nasa.gov"},
    {"group.id", group_id},
    {"sasl.mechanisms", "OAUTHBEARER"},
    // Warning: don't share the client secret with others.
    {"sasl.oauthbearer.client.id", "fill me in"},
    {"sasl.oauthbearer.client.secret", "fill me in"},
    {"sasl.oauthbearer.method", "oidc"},
    {"sasl.oauthbearer.token.endpoint.url", "https://auth.gcn.nasa.gov/oauth2/token"},
    {"security.protocol", "sasl_ssl"}
  };

  static const char *topics[] = {
    "gcn.classic.text.FERMI_GBM_FIN_POS",
    "gcn.classic.text.LVC_INITIAL"
  };

  static const int num_conf_kv = sizeof(conf_kv) / sizeof(*conf_kv);
  static const int num_topics = sizeof(topics) / sizeof(*topics);

  // Assemble configuration
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  for (int i = 0; i < num_conf_kv; i ++)
  {
    if (rd_kafka_conf_set(conf, conf_kv[i][0], conf_kv[i][1],
                          errstr, sizeof(errstr)))
    {
      fprintf(stderr, "%s\n", errstr);
      rd_kafka_conf_destroy(conf);
      return 1;
    }
  }

  // Create consumer
  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  if (!rk) {
    fprintf(stderr, "%s\n", errstr);
    // On failure, the configuration object is still owned by the caller.
    rd_kafka_conf_destroy(conf);
    return 1;
  }
  
  // List topics
  const rd_kafka_metadata_t *metadata;
  err = rd_kafka_metadata(rk, 0, NULL, &metadata, -1);
  if (err) {
    fprintf(stderr, "%s\n", rd_kafka_err2str(err));
    rd_kafka_destroy(rk);
    return 1;
  }
  for (int i = 0; i < metadata->topic_cnt; i ++)
  {
    printf("%s\n", metadata->topics[i].topic);
  }
  rd_kafka_metadata_destroy(metadata);
  
  // Subscribe to topics
  rd_kafka_topic_partition_list_t *topics_partitions =
    rd_kafka_topic_partition_list_new(num_topics);
  for (int i = 0; i < num_topics; i ++)
    rd_kafka_topic_partition_list_add(topics_partitions, topics[i],
                                      RD_KAFKA_PARTITION_UA);
  err = rd_kafka_subscribe(rk, topics_partitions);
  rd_kafka_topic_partition_list_destroy(topics_partitions);
  if (err)
  {
    rd_kafka_destroy(rk);
    fprintf(stderr, "%s\n", rd_kafka_err2str(err));
    return 1;
  }

  // Consume messages
  while (1)
  {
    rd_kafka_message_t *message = rd_kafka_consumer_poll(rk, -1);

    if (!message)
    {
      continue;
    } else if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
      // Ignore this error; it just means that we are at the end of the stream
      // and need to wait for more data.
    } else if (message->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
      // Unknown topic or partition; print warning and continue.
      fprintf(stderr, "%s\n", rd_kafka_message_errstr(message));
    } else if (message->err) {
      fprintf(stderr, "%d: %s\n", (int)message->err, rd_kafka_message_errstr(message));
      rd_kafka_consumer_close(rk);
      rd_kafka_destroy(rk);
      return 1;
    } else {
      // The precision for %.*s must be an int, and the payload is a void pointer.
      printf("%.*s\n", (int)message->len, (const char *)message->payload);
    }

    rd_kafka_message_destroy(message);
  }

  rd_kafka_consumer_close(rk);
  rd_kafka_destroy(rk);

  return 0;
}
Compile the code. The command will vary slightly depending on your operating system and compiler. For GCC on Linux or macOS, run the following command:
gcc $(pkg-config --cflags libcrypto) example.c $(pkg-config --libs libcrypto) -lrdkafka
Run the program. On Linux or macOS, run the following command:
./a.out

C#

For C#, use the Confluent.Kafka NuGet package. The code sample is based on Confluent's Kafka .NET Client documentation.

In Visual Studio, create a new C# Console App under File > New > Project. After the project initializes, right-click the solution in the Solution Explorer and click Manage NuGet Packages for Solution. Browse for and install Confluent.Kafka. Copy the following into your Program.cs file:
  using Confluent.Kafka;


  var config = new ConsumerConfig
  {
    SecurityProtocol = SecurityProtocol.SaslSsl,
    BootstrapServers = "kafka.gcn.nasa.gov",
    GroupId = Guid.NewGuid().ToString(),
    SaslMechanism = SaslMechanism.OAuthBearer,
    SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
    SaslOauthbearerTokenEndpointUrl = "https://auth.gcn.nasa.gov/oauth2/token",
    // Warning: don't share the client secret with others
    SaslOauthbearerClientId = "fill me in",
    SaslOauthbearerClientSecret = "fill me in"
  };

  // Create a client.
  using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
  {
    // Subscribe to topics
    consumer.Subscribe(new List<string> {
      "gcn.classic.text.FERMI_GBM_FIN_POS",
      "gcn.classic.text.LVC_INITIAL"
    });

    // List the subscribed topics
    consumer.Subscription.ForEach(topic => Console.WriteLine(topic));
    
    // Consume messages
    while (true)
    {
      try
      {
        var consumeResult = consumer.Consume();
        Console.WriteLine(consumeResult.Message.Value);
      }
      catch (Exception ex)
      {
        if (!ex.Message.Contains("Subscribed topic not available"))
        {
          throw;
        }
      }
    }
  }
Build the solution from the Build menu, or press Ctrl + Shift + B. The resulting executable can be found in the bin folder within the project.