The Oracle Cloud Infrastructure Streaming Service is a Kafka-compatible event streaming platform for developers and data scientists.
In this post I will show how to create an OSS Stream and how to use it with the Oracle Cloud Infrastructure (OCI) Command Line Interface (CLI).
Create a Stream
In the cloud console go to Analytics > Streaming (I wonder why Streaming is under Analytics, as its usage goes beyond pure streaming analytics – it would make more sense under Developer Services or in a Messaging category). Anyway, press the Create Stream button!
There are a couple of parameters, including the number of partitions and the retention time, but I leave them all at their defaults.
That’s all – fast and easy.
Publish a message (Console)
The console offers an option to easily publish a message.
In the console we can also view recent messages. So far everything is super easy – let's look at other options to manage the Stream.
Other ways to use the Stream
There are several ways to use OSS:
- Apache Kafka compatibility (Kafka APIs or Kafka Connect)
- SDK for Java
- SDK for Python
- SDK for TypeScript and JavaScript
- SDK for .NET
- SDK for Go
- SDK for Ruby
Command Line Interface (CLI)
The CLI provides the same core capabilities as the Oracle Cloud Infrastructure Console, plus additional commands that extend the Console's functionality. The CLI is convenient for developers or anyone who prefers the command line to a GUI.
Install OCI CLI
There are two options for using the CLI:
- Cloud Shell – The easy approach, straight from the console browser.
- Local CLI Installation
In this case I will use my local CLI installation.
Put Message
The first step is to look at the CLI documentation 🙂 The method I want to try is PUT, which allows writing a message to the Stream.
oci streaming stream message put
--stream-id <stream_id>
--messages <JSON_messages>
--endpoint <messages_endpoint>
- stream_id is the OCID of the stream.
- messages must be in JSON format and can be passed inline or as a reference to a file. The message value must be base64 encoded. Example: {"value":"bXkgZmlyc3QgbWVzc2FnZQ=="}
- endpoint is the stream's messages endpoint, as found in the console.
With all of this put together, I execute the PUT method via the CLI as seen below.
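As a minimal sketch of the whole PUT step: the stream OCID and endpoint below are placeholders you would replace with your own values, and note that the CLI's --messages parameter takes a JSON list of message objects.

```shell
# Base64-encode the payload; Streaming only accepts encoded values
VALUE=$(printf 'my first message' | base64)
echo "$VALUE"   # bXkgZmlyc3QgbWVzc2FnZQ==

# The actual call (commented out here; OCID and endpoint are placeholders):
# oci streaming stream message put \
#   --stream-id ocid1.stream.oc1... \
#   --messages "[{\"value\":\"$VALUE\"}]" \
#   --endpoint https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com
```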
The console offers a view of the messages loaded in the last minute – probably only handy for this kind of test.
Get Message
Getting a message is a two-step task, as we need to set up a cursor first.
Compared with JMS messaging, Kafka-based message reading offers more possibilities thanks to offset-based reads. The cursor is specific to OSS and has even more options:
“Cursors are used to consume a stream, starting from a specific point in the partition and going forward from there. “
- AFTER_OFFSET: The partition position immediately following the offset you specify. (Offsets are assigned when you successfully append a message to a partition in a stream.)
- AT_OFFSET: The exact partition position indicated by the offset you specify.
- AT_TIME: A specific point in time.
- LATEST: The most recent message in the partition that was added after the cursor was created.
- TRIM_HORIZON: The oldest message in the partition that is within the retention period window.
The create-cursor command can be found here.
oci streaming stream cursor create-cursor
--stream-id <stream_id>
--partition <partition>
--type <type>
--endpoint <messages_endpoint>
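A sketch of creating a LATEST cursor on partition 0 and capturing the returned cursor value in one go (the OCID and endpoint are placeholders; the mocked response below only illustrates the expected shape, a JSON object whose data field holds the cursor string):

```shell
# Placeholder call (commented out; fill in your own stream OCID and endpoint).
# --query with JMESPath plus --raw-output extracts the cursor string directly:
# CURSOR=$(oci streaming stream cursor create-cursor \
#   --stream-id ocid1.stream.oc1... \
#   --partition 0 \
#   --type LATEST \
#   --endpoint https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com \
#   --query 'data.value' --raw-output)

# The same extraction, demonstrated on a mocked response of that shape:
RESPONSE='{"data": {"value": "dummy-cursor-token"}}'
CURSOR=$(printf '%s' "$RESPONSE" | python3 -c 'import sys, json; print(json.load(sys.stdin)["data"]["value"])')
echo "$CURSOR"   # dummy-cursor-token
```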
Once we have our cursor value, we can execute the GET message.
Note: Because I use LATEST as the cursor option, I need to PUT messages after the cursor is created; otherwise the GET would return nothing.
oci streaming stream message get
--cursor <cursor_id>
--stream-id <stream_id>
--endpoint <messages_endpoint>
The retrieved message comes in base64 encoding, but that is no surprise 🙂
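Decoding the retrieved value is a one-liner; the encoded string below is the example message from earlier in this post:

```shell
# Decode a retrieved message value back to plain text
printf 'bXkgZmlyc3QgbWVzc2FnZQ==' | base64 --decode
# prints: my first message
```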
As with all of the Cloud Services, setting everything up is really easy and fast!