Write Data

Use the TypeScript client to write to a Synnax cluster.

The Synnax TypeScript client supports multiple methods for writing data to a cluster. We can write directly to a channel, or we can write to multiple channels in a streaming fashion using a writer.

Writing to a Channel

Writing to a channel requires us to write timestamps to its index channel before we write the data. We’ll create the following channels to use as examples:

import { DataType } from "@synnaxlabs/client";

const timeChannel = await client.channels.create({
    name: "time",
    dataType: DataType.TIMESTAMP,
    isIndex: true,
});

const temperatureChannel = await client.channels.create({
    name: "temperature",
    dataType: DataType.FLOAT32,
    index: timeChannel.key,
});

We can then write data to timeChannel and temperatureChannel. We need to write to the index channel before writing to the regular channel, so we will write to timeChannel first. Writing data via the channel’s write method requires a typed array:

import { TimeStamp, TimeSpan } from "@synnaxlabs/client";

const start = TimeStamp.now();
const timestamps = new BigInt64Array([
    start.valueOf(), // valueOf() converts a TimeStamp to a bigint.
    start.add(TimeSpan.seconds(1)).valueOf(),
    start.add(TimeSpan.seconds(2)).valueOf(),
    start.add(TimeSpan.seconds(3)).valueOf(),
    start.add(TimeSpan.seconds(4)).valueOf(),
]);
const temperatures = new Float32Array([20.0, 20.1, 20.2, 20.3, 20.4]);

// Write the timestamps to the index first
await timeChannel.write(start, timestamps);
// Then write the data
await temperatureChannel.write(start, temperatures);

Notice how we align the two arrays by using the common start timestamp. This tells Synnax that the first sample in the temperatures array is associated with the first timestamp in the timestamps array, and so on.

Synnax will raise a ValidationError if the index does not contain a corresponding timestamp for every sample in the data array. After all, it wouldn’t make sense to have a temperature reading without an associated timestamp.
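The alignment rule above can also be checked on the client before anything is sent. The following is a minimal sketch, not part of the Synnax API: evenlySpacedTimestamps and assertAligned are hypothetical helpers, and the sketch assumes timestamps are nanoseconds since the epoch stored as bigint values, matching the BigInt64Array used in the example above.

```typescript
// Hypothetical helpers for illustration; they do not call the Synnax client.
// Assumption: timestamps are nanoseconds since the epoch, stored as bigint.
const NANOS_PER_SECOND = BigInt(1_000_000_000);

// Build `count` timestamps starting at `startNs`, spaced `stepNs` apart.
function evenlySpacedTimestamps(
    startNs: bigint,
    count: number,
    stepNs: bigint,
): BigInt64Array {
    const out = new BigInt64Array(count);
    for (let i = 0; i < count; i++) out[i] = startNs + BigInt(i) * stepNs;
    return out;
}

// Throw early if the index and data arrays would not line up sample for sample.
function assertAligned(timestamps: BigInt64Array, data: ArrayLike<number>): void {
    if (timestamps.length !== data.length)
        throw new Error(
            `index has ${timestamps.length} timestamps but data has ${data.length} samples`,
        );
}

// Convert the current time from milliseconds to nanoseconds.
const startNs = BigInt(Date.now()) * BigInt(1_000_000);
const timestamps = evenlySpacedTimestamps(startNs, 5, NANOS_PER_SECOND);
const temperatures = new Float32Array([20.0, 20.1, 20.2, 20.3, 20.4]);
assertAligned(timestamps, temperatures);
```

Running a check like this before calling write surfaces a length mismatch locally instead of as an error from the cluster.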

Using a Writer

While the above methods are great for writing static, existing data, it’s common to want to write data in a streaming fashion as it’s acquired. This is useful for control sequences and live data processing. The Writer class is designed to handle this use case (and is actually used under the hood by the above methods).

To keep things intuitive, we’ve designed the writer API around a file-like interface. There are a few key differences, the most important being that writers are governed by a transaction. If you’d like to learn more about transactions, see the concepts page.

We’ll create the following channels to use as examples:

import { DataType } from "@synnaxlabs/client";

const timeChannel = await client.channels.create({
    name: "time",
    dataType: DataType.TIMESTAMP,
    isIndex: true,
});

const temperatureChannel = await client.channels.create({
    name: "temperature",
    dataType: DataType.FLOAT32,
    index: timeChannel.key,
});

To open the writer, we’ll use the openWriter method on the client:

import { TimeStamp, TimeSpan, Series, Frame } from "@synnaxlabs/client";

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"]
});

try {
    for (let i = 0; i < 100; i++) {
        const start = TimeStamp.now();
        // Use j for the sample index so it does not shadow the batch counter i.
        const timeSeries = BigInt64Array.from({ length: 10 }, (_, j) => (
            start.add(TimeSpan.milliseconds(j)).valueOf()
        ));
        const dataSeries = Float32Array.from({ length: 10 }, (_, j) => (
            Math.sin(j / 100)
        ));

        await writer.write(new Frame({
            [timeChannel.key]: new Series(timeSeries),
            [temperatureChannel.key]: new Series(dataSeries)
        }));
        await new Promise(resolve => setTimeout(resolve, 100));
    }
    await writer.commit();
} finally {
    await writer.close();
}

This example will write 100 batches of 10 samples to the temperature channel, each roughly 100ms apart, and will commit all writes when finished.

It’s typical to write and commit millions of samples over the course of hours or days, intermittently calling commit to ensure that the data is safely stored in the cluster.
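One way to commit intermittently is to track how long it has been since the last commit. The following is a minimal sketch under that assumption: CommitScheduler is a hypothetical helper, not part of the Synnax client, and the 5 second interval is only an illustrative value.

```typescript
// Hypothetical helper: decides when enough time has passed to commit again.
class CommitScheduler {
    private readonly intervalMs: number;
    private lastCommitMs: number;

    constructor(intervalMs: number, nowMs: number = Date.now()) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = nowMs;
    }

    // Returns true, and restarts the timer, once at least intervalMs has
    // elapsed since the last time this method returned true.
    due(nowMs: number = Date.now()): boolean {
        if (nowMs - this.lastCommitMs < this.intervalMs) return false;
        this.lastCommitMs = nowMs;
        return true;
    }
}

// Illustrative interval: commit at most once every 5 seconds.
const commitScheduler = new CommitScheduler(5_000);

// Inside a write loop, where `writer` is an open writer:
//     await writer.write(frame);
//     if (commitScheduler.due()) await writer.commit();
```

Keeping the timing decision separate from the writer makes it easy to tune the interval without touching the write loop.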

Closing the Writer

It’s very important to free the writer resources when finished by calling the close method. If close is not called once you are done with the writer, other writers may not be able to write to the same channels. We typically recommend placing the writer operations inside a try-finally block to ensure that the writer is always closed.
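The try-finally pattern can be wrapped in a small reusable function so that close is never forgotten. This is a sketch only: withClosable and the Closable interface are hypothetical names, and the only thing assumed about a writer is that it exposes an asynchronous close method, as described above.

```typescript
// Minimal shape this sketch relies on: anything with an async close method.
interface Closable {
    close(): Promise<unknown>;
}

// Hypothetical helper: opens a resource, runs `use`, and always closes the
// resource afterwards, even if `use` throws.
async function withClosable<T extends Closable, R>(
    open: () => Promise<T>,
    use: (resource: T) => Promise<R>,
): Promise<R> {
    const resource = await open();
    try {
        return await use(resource);
    } finally {
        await resource.close();
    }
}

// Possible usage with a writer:
//     await withClosable(
//         () => client.openWriter({ start: TimeStamp.now(), channels: ["time", "temperature"] }),
//         async (writer) => {
//             await writer.write({ time: TimeStamp.now(), temperature: 20.0 });
//             await writer.commit();
//         },
//     );
```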

Different Ways of Writing Data

There are a number of argument formats that the write method accepts. Use the one that best fits your use case.

// Write a single sample for a channel
await writer.write("temperature", 20.0);

// Write multiple samples for a channel
await writer.write("temperature", [20.0, 20.1, 20.2, 20.3, 20.4]);

// Write a single sample for several channels
await writer.write({
    time: TimeStamp.now(),
    temperature: 20.0,
});

// Write multiple samples for several channels
const start = TimeStamp.now();
await writer.write({
    time: [start, start.add(TimeSpan.seconds(1))],
    temperature: [20.0, 20.1],
});

// Write typed arrays for several channels
await writer.write(
    new Frame({
        [timeChannel.key]: new BigInt64Array([
            start.valueOf(),
            start.add(TimeSpan.seconds(1)).valueOf(),
        ]),
        [temperatureChannel.key]: new Float32Array([20.0, 20.1]),
    }),
);

Auto-Commit

You can configure a writer to automatically commit written data after each write, making it immediately available for read access. To do this, set the enableAutoCommit option to true when opening the writer:

import { TimeStamp, Series, Frame } from "@synnaxlabs/client";

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
    enableAutoCommit: true,
});

try {
    for (let i = 0; i < 100; i++) {
        const start = TimeStamp.now();
        await writer.write(
            new Frame({
                time: start,
                temperature: Math.sin(i / 100),
            }),
        );
        await new Promise((resolve) => setTimeout(resolve, 100));
    }
} finally {
    await writer.close();
}

Write Authorities

Writers support dynamic control handoff. Multiple writers can be opened on a channel at the same time, but only one of them is allowed to write to the channel at any given time. To determine which writer has control, an authority from 0 to 255 is assigned to each writer (or, optionally, each channel in the writer). The writer with the highest authority will be allowed to write. If two writers have the same authority, the writer that opened first will be allowed to write. For more information, see the concepts page on writers.

By default, writers are opened with an authority of ABSOLUTE (i.e., 255). This means that no other writers can write to the channel as long as the writer is open.
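The handoff rule described above can be modeled in a few lines. This sketch only illustrates the rule as stated (highest authority wins, ties go to the writer opened first); it is not how the cluster implements control, and WriterEntry and controllingWriter are hypothetical names.

```typescript
// Hypothetical model of a writer competing for control of one channel.
interface WriterEntry {
    name: string;
    authority: number; // 0 to 255
    openedAt: number; // lower values were opened earlier
}

const ABSOLUTE_AUTHORITY = 255;

// Pick the writer that would be allowed to write: the highest authority wins,
// and on a tie the writer that was opened first keeps control.
function controllingWriter(writers: WriterEntry[]): WriterEntry | undefined {
    let winner: WriterEntry | undefined;
    for (const w of writers) {
        if (w.authority < 0 || w.authority > ABSOLUTE_AUTHORITY)
            throw new RangeError(`authority ${w.authority} is outside 0 to 255`);
        const higher = winner !== undefined && w.authority > winner.authority;
        const earlierTie =
            winner !== undefined &&
            w.authority === winner.authority &&
            w.openedAt < winner.openedAt;
        if (winner === undefined || higher || earlierTie) winner = w;
    }
    return winner;
}

const writers: WriterEntry[] = [
    { name: "logger", authority: 100, openedAt: 1 },
    { name: "sequence", authority: 200, openedAt: 2 },
    { name: "operator", authority: 200, openedAt: 3 },
];
// "sequence" has control: it ties with "operator" on authority but opened first.
const inControl = controllingWriter(writers);
```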

Opening a writer with the same authority on all channels

To open a writer with the same authority on all channels, you can pass the authority argument with an integer.

import { TimeStamp, Series, Frame } from "@synnaxlabs/client";

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
    authority: 100,
});

Opening a writer with different authorities on each channel

To open a writer with different authorities on each channel, you can pass the authority argument with a list of integers. This list must be the same length as the number of channels in the writer.

import { TimeStamp, Series, Frame } from "@synnaxlabs/client";

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
    authority: [100, 200],
});
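Because the authority list has to match the channel list in length and order, it can help to derive it from the channel names rather than maintain two parallel arrays by hand. The helper below is a hypothetical sketch and not part of the Synnax client; the fallback authority of 100 is only an illustrative value.

```typescript
// Hypothetical helper: build one authority per channel, in channel order,
// using `fallback` for any channel without an explicit override.
function authorityList(
    channels: string[],
    overrides: Record<string, number>,
    fallback: number,
): number[] {
    return channels.map((channelName) => overrides[channelName] ?? fallback);
}

const channels = ["time", "temperature"];
const authorities = authorityList(channels, { temperature: 200 }, 100);
// authorities is [100, 200]: one entry per channel, in the same order.
```

The resulting array can then be passed as the authority argument alongside the same channels array.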

Adjusting write authorities after open

To change the authority of a writer during operation, you can use the setAuthority method:

// Set the authority on all channels
await writer.setAuthority(200);
// Set the authority on just a few channels
await writer.setAuthority({
    time: 200,
    temperature: 100,
});

Persistence/Streaming Mode

By default, writers are opened in stream + persist mode. To change the mode of a writer, specify the value of the mode argument when opening the writer. This can be persist, stream, or persistStream.

For example, to open a writer that only persists data:

import { TimeStamp, Series, Frame, WriterMode } from "@synnaxlabs/client";

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
    mode: "persist",
});

Common Pitfalls

There are several common pitfalls to avoid when writing data to a Synnax cluster, as they can lead to performance degradation and/or control issues.

Using Many Individual Write Calls Instead of a Writer

When writing large volumes of data in a streaming fashion (or in batches), it’s important to use a writer instead of making individual write calls to a channel. Calls to write on a channel use an entirely new transaction for each call. Constantly creating, committing, and closing transactions has a dramatic impact on performance. So, don’t do this:

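const time = await client.channels.retrieve("timestamps");
const myTc = await client.channels.retrieve("my_precise_tc");
// This is a very bad idea
for (let i = 0; i < 100; i++) {
    const ts = TimeStamp.now();
    await time.write(ts, new BigInt64Array([ts.valueOf()]));
    // Assumes my_precise_tc is a float32 channel.
    await myTc.write(ts, new Float32Array([i]));
}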

This is also a bad idea:

for (let i = 0; i < 100; i++) {
    const writer = await client.openWriter({
        start: TimeStamp.now(),
        channels: ["time", "temperature"],
        enableAutoCommit: true,
    });
    await writer.write({
        time: TimeStamp.now(),
        temperature: Math.sin(i / 100),
    });
    await writer.close();
}

Instead, repeatedly call write on a single writer:

// This is dramatically more efficient
const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
    enableAutoCommit: true
});
try {
    for (let i = 0; i < 100; i++)
        await writer.write({
            "time": TimeStamp.now(),
            "temperature": Math.sin(i / 100)
        });
} finally {
    await writer.close();
}

Calling Commit on Every Write

If you’re not using auto-commit, it’s important to call commit on the writer periodically to ensure that the data is persisted to the cluster. However, calling commit on every write is a bad idea, because each commit requires a round-trip to the cluster to ensure that the data is persisted. This can be very slow if you’re writing a lot of data. In that case, commit every few seconds or turn on auto-commit.

This is a bad idea:

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
});
try {
    for (let i = 0; i < 100; i++) {
        await writer.write({
            time: TimeStamp.now(),
            temperature: Math.sin(i / 100),
        });
        await writer.commit();
    }
} finally {
    await writer.close();
}

Instead, use auto-commit:

const writer = await client.openWriter({
    start: TimeStamp.now(),
    channels: ["time", "temperature"],
    enableAutoCommit: true,
});
try {
    for (let i = 0; i < 100; i++)
        await writer.write({
            time: TimeStamp.now(),
            temperature: Math.sin(i / 100),
        });
} finally {
    await writer.close();
}