Sample Code
Here is a collection of functions and example code that may be useful when working with GCN Notices. These include conversions from text and VOEvent alert formats to JSON and XML (for VOEvent alerts), how to save alerts, and some samples from the FAQs section of the gcn-kafka-python repository.
To contribute your own ideas, open a GitHub pull request that adds them to the Markdown source for this document, or contact us.
Parsing
Within your consumer loop, use the following functions to convert the content of message.value() into other data types.
The xmltodict package is not part of the Python standard library. Install it with pip:
pip install xmltodict
import email
import xml.etree.ElementTree as ET

import xmltodict

def parse_text_alert_to_dict(message_value):
    return dict(email.message_from_bytes(message_value))

def parse_voevent_alert_to_xml_root(message_value):
    return ET.fromstring(message_value)

def parse_voevent_alert_to_dict(message_value):
    return xmltodict.parse(message_value)
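As a quick check of what these parsers return, the sketch below runs the same standard-library calls on two small, made-up payloads. The notice fields, the ivorn, and the TrigID value are hypothetical stand-ins for a real message.value(); xmltodict is left out so that the example needs only the standard library.

```python
import email
import xml.etree.ElementTree as ET

# Hypothetical text notice: "KEY: value" lines, like an email header block.
sample_text = (
    b"TITLE:           GCN/EXAMPLE NOTICE\n"
    b"NOTICE_TYPE:     Hypothetical Test\n"
    b"GRB_RA:          123.456d\n"
)
text_alert = dict(email.message_from_bytes(sample_text))
print(text_alert["TITLE"])

# Hypothetical VOEvent: only the root element is namespace-qualified here.
sample_voevent = b"""<voe:VOEvent xmlns:voe="http://www.ivoa.net/xml/VOEvent/v2.0"
    ivorn="ivo://example/hypothetical#1" role="test" version="2.0">
  <What>
    <Param name="TrigID" value="12345"/>
  </What>
</voe:VOEvent>"""
root = ET.fromstring(sample_voevent)

# Collect every <Param> element into a name -> value mapping.
params = {p.attrib["name"]: p.attrib["value"] for p in root.iter("Param")}
print(root.attrib["role"], params)
```

The text parser strips the padding after each colon, so values come back as plain strings; any numeric fields would still need to be converted by hand.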
Saving
Use the following to save the data to a local file:
def save_alert(message):
    with open('path/to/your/file', 'wb') as file:
        file.write(message.value())
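Because the function above always writes to the same path, each new alert overwrites the previous one. One possible variation, sketched here, writes one file per alert and builds the file name from the message's topic, partition, and offset. The function name save_alert_to_directory, the "alerts" default directory, and the ".dat" suffix are illustrative choices, not part of gcn-kafka.

```python
from pathlib import Path

def save_alert_to_directory(message, directory="alerts"):
    """Write one alert per file, named after topic, partition, and offset."""
    out_dir = Path(directory)
    out_dir.mkdir(parents=True, exist_ok=True)
    # topic(), partition(), and offset() together identify a Kafka message,
    # so the resulting file name is unique per alert.
    name = f"{message.topic()}_{message.partition()}_{message.offset()}.dat"
    path = out_dir / name
    path.write_bytes(message.value())
    return path
```

Inside the consumer loop this would be called as save_alert_to_directory(message) after the message.error() check.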
Keep track of the last read message when restarting a client
Kafka can keep track of which messages your client has read, allowing your client to recover missed messages after a restart by beginning at the earliest unread message rather than the next available message from the stream.
To enable this feature, you will need to set a client Group ID using the configuration dictionary argument for the Consumer and change the auto offset reset option to the 'earliest' setting. Once this is done, every new client with the given Group ID will begin reading the specified topic at the earliest unread message.
When doing this, it is recommended to turn off the auto commit feature, because the client can lose track of the last read message if it crashes before the auto commit interval (5 seconds by default) elapses. Manually committing each message (i.e., storing the state of the last read message) once it has been read is the most robust method for tracking the last read message.
from gcn_kafka import Consumer

config = {'group.id': 'my group name',
          'auto.offset.reset': 'earliest',
          'enable.auto.commit': False}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.FERMI_GBM_SUBTHRESH']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())
        consumer.commit(message)
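If reading a message involves real work (saving it, triggering a follow-up), it may be safer to commit only after that work has succeeded, so that a crash mid-processing leaves the message uncommitted and it is read again on restart. The following is a minimal sketch of that pattern, not the library's prescribed approach: consume_and_commit, handle, and max_polls are hypothetical names, and the consumer argument is assumed to be a configured Consumer like the one above.

```python
def consume_and_commit(consumer, handle, max_polls=None):
    """Commit each message only after handle(message) returns without raising."""
    polls = 0
    # max_polls=None loops forever; a finite value is handy for testing.
    while max_polls is None or polls < max_polls:
        polls += 1
        for message in consumer.consume(timeout=1):
            if message.error():
                print(message.error())
                continue
            handle(message)  # if this raises, the offset is not committed
            # asynchronous=False blocks until the commit has completed.
            consumer.commit(message=message, asynchronous=False)
```

A synchronous commit per message trades some throughput for certainty about what has been stored; for high-rate topics, committing once per batch may be a reasonable compromise.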
Read messages beginning at the earliest available messages for a given stream
To read a given topic stream from the earliest message that is present in the stream buffer, set the Group ID to an empty string and apply the 'earliest' setting for the auto offset reset option in the configuration dictionary argument for the Consumer class.
This feature allows the user to scan for older messages for testing purposes or to recover messages that may have been missed due to a crash or network outage. Keep in mind that the stream buffers are finite in size. They currently hold messages from the past few days.
from gcn_kafka import Consumer

config = {'auto.offset.reset': 'earliest'}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.INTEGRAL_SPIACS']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())
Search for messages occurring within a given date range
To search for messages in a given date range, use the offsets_for_times() method from the Consumer class to get the message offsets for the desired date range. Then assign the starting offset to the Consumer and read the desired number of messages. When doing so, keep in mind that the stream buffers are finite in size. It is not possible to recover messages prior to the start of the stream buffer. The GCN stream buffers are currently set to hold messages from the past few days.
import datetime
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition

consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# get messages occurring 3 days ago
timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=3)).timestamp() * 1000)
timestamp2 = timestamp1 + 86400000  # +1 day

topic = 'gcn.classic.voevent.INTEGRAL_SPIACS'
start = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp2)])

consumer.assign(start)
for message in consumer.consume(end[0].offset - start[0].offset, timeout=1):
    if message.error():
        print(message.error())
        continue
    print(message.value())
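The timestamps passed to offsets_for_times() are milliseconds since the Unix epoch. To search a fixed calendar window rather than one relative to the current time, the conversion can be wrapped in small helpers, sketched below. The names to_kafka_ms and day_window_ms are hypothetical, and the date is only an example; requiring a timezone-aware datetime is a design choice made here to avoid ambiguity about local time.

```python
import datetime

def to_kafka_ms(moment):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime, "
                         "e.g. tzinfo=datetime.timezone.utc")
    return int(moment.timestamp() * 1000)

def day_window_ms(start, days=1):
    """Return (begin, end) millisecond timestamps spanning `days` from `start`."""
    begin = to_kafka_ms(start)
    end = to_kafka_ms(start + datetime.timedelta(days=days))
    return begin, end

# Example window: one day starting at midnight UTC on an arbitrary date.
start = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
timestamp1, timestamp2 = day_window_ms(start)
print(timestamp1, timestamp2)
```

The two values can then take the place of timestamp1 and timestamp2 in the example above. Keep in mind that a window reaching past the newest message in the buffer may not yield a usable end offset, so it is worth checking the returned offsets before computing the message count.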