Publish Messages to Streams
This page explains how to publish messages to a stream in Macrometa.
- Python SDK
- JavaScript SDK
- REST API - Python
- REST API - JavaScript
- CLI
You must Install the Python SDK before you can run this code.
from operator import concat
from c8 import C8Client
# Connect to GDN.
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "xxxxxx" # Change this to your API key
is_local = False # For a global stream pass True and False for local stream
demo_stream = "streamQuickstart"
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)
# Create the producer and publish messages.
def sendData():
    """ This function sends data through a stream """
    producer = client.create_stream_producer(demo_stream, local=is_local)
    while True:
        user_input = input("Enter your message to publish: ")
        if user_input == '0':
            break
        producer.send(user_input)
sendData()
You must Install the JavaScript SDK before you can run this code.
// Connect to GDN.
const jsc8 = require("jsc8");
const client = new jsc8({url: "https://play.paas.macrometa.io", apiKey: "XXXXX", fabricName: "_system"});
console.log("Authentication done!!...");
const stream = "streamQuickstart";
async function sendData () {
  console.log("\n ------- Publish Messages  ------");
  const producer = await client.createStreamProducer(stream);
  producer.on("open", () => {
    for (let i = 0; i < 10; i++) {
      const msg1 = `Persistent hello from (${JSON.stringify(i)})`;
      const data = {
        payload: Buffer.from(msg1).toString("base64")
      };
      console.log(`Stream: ${msg1}`);
      producer.send(JSON.stringify(data));
    }
  });
  producer.onclose = function (e) {
    console.log("Closed WebSocket:Producer connection for " + streamName);
  };
}
sendData()
Use our interactive API reference with code generation in 18 programming languages to publish a message.
import requests
# Constants
URL = "api-play.paas.macrometa.io"
HTTP_URL = f"https://{URL}"
FABRIC = "_system"
STREAM_NAME = "streamQuickstart"
API_KEY = "XXXXX" # Use your API key here
AUTH_TOKEN = f"apikey {API_KEY}" # Append the key word for the API key
session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})
# Publish messages
# Send message in body
producerurl = f"wss://{URL}/_ws/ws/v2/producer/persistent/{TENANT_NAME}/{stream_type}.{FABRIC}/{stream_type}s.{STREAM_NAME}"
# Enter your message here
msg = "Hello World"
def create_producer():
    ws = create_connection(producerurl, header=[f"Authorization: {AUTH_TOKEN}"])
    payload = {
        "payload": base64.b64encode(
            six.b(msg)
        ).decode("utf-8")
    }
    ws.send(json.dumps(payload))
    print(f"Message sent: {msg}")
    time.sleep(3)
    response = json.loads(ws.recv())
    if response['result'] == 'ok':
        print("Received acknowledgement that message was delivered successfully")
    else:
        print(f"Failed to publish message: {response}")
    ws.close()
# Or
# Use publish message api to publish message
#url = f"{HTTP_URL}/_fabric/{FABRIC}/_api/streams/{stream_type}s.{STREAM_NAME}/publish?global={IS_GLOBAL}"
#resp = session.post(url, data="Hello")
#print("\nMessage Posted: ", resp.text)
Use our interactive API reference with code generation in 18 programming languages to publish a message.
class APIRequest {
  _headers = {
    Accept: "application/json",
    "Content-Type": "application/json"
  };
  constructor (url, apiKey) {
    this._url = url;
    this._headers.authorization = `apikey ${apiKey}`; // Append the key word for the API key
  }
  _handleResponse (response, resolve, reject) {
    if (response.ok) {
      resolve(response.json());
    } else {
      reject(response);
    }
  }
  req (endpoint, { body, ...options } = {}) {
    const self = this;
    return new Promise(function (resolve, reject) {
      fetch(self._url + endpoint, {
        headers: self._headers,
        body: body ? JSON.stringify(body) : undefined,
        ...options
      }).then((response) => self._handleResponse(response, resolve, reject));
    });
  }
}
const apiKey = "XXXXX"; // Use your apikey here
const federationName = "api-play.paas.macrometa.io";
const federationUrl = `https://${federationName}`;
const stream = "streamQuickstart";
const isGlobal = true;
const run = async function () {
  const connection = new APIRequest(federationUrl, apiKey);
  /* ----------------- Publish and subscribe message to stream ---------------- */
    const region = isGlobal ? "c8global" : "c8local";
    const streamName = `${region}s.${stream}`;
    // Fetching local URL in case the stream is local
    const localDcDetails = await connection.req(`/datacenter/local`, {
      method: "GET"
    });
    const dcUrl = localDcDetails.tags.url;
    url = isGlobal
      ? url
      : `api-${dcUrl}`;
    const otpConsumer = await connection.req(`/apid/otp`, {
      method: "POST"
    });
    const otpProducer = await connection.req(`/apid/otp`, {
      method: "POST"
    });
    const consumerUrl = `wss://${url}/_ws/ws/v2/consumer/persistent/${tenant}/${region}._system/${streamName}/${consumerName}?otp=${otpConsumer.otp}`;
    const producerUrl = `wss://${url}/_ws/ws/v2/producer/persistent/${tenant}/${region}._system/${streamName}?otp=${otpProducer.otp}`;
    let consumer;
    let producer;
    let producerInterval;
run();
Use the gdnsl streams publish CLI command to create a document collection.