API Reference
Tip
All the functionality of zio-maelstrom is available via the following import statement:
import com.bilalfazlani.zioMaelstrom.*
Primitive Types
NodeId
NodeId represents a unique identifier for a node. It is a wrapper (opaque type) around String and can be created using the NodeId(String) method.
MessageId
MessageId represents a unique identifier for a message. It is a wrapper (opaque type) around Int and can be created using the MessageId(Int) method.
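Both wrappers follow Scala 3's opaque type pattern. The sketch below is not the library's source. It is a minimal illustration, with a hypothetical value accessor, of why an opaque NodeId cannot be mixed up with a plain String or a MessageId at compile time while costing nothing at runtime.

```scala
object Ids:
  // At runtime a NodeId is just a String; at compile time it is a distinct type
  opaque type NodeId = String
  object NodeId:
    def apply(id: String): NodeId = id
    // Hypothetical accessor; defining it in the companion puts it in NodeId's implicit scope
    extension (n: NodeId) def value: String = n

  // Same pattern for message ids, wrapping an Int
  opaque type MessageId = Int
  object MessageId:
    def apply(id: Int): MessageId = id
    extension (m: MessageId) def value: Int = m

  // Outside of Ids, neither of these would compile, because the aliases are opaque there:
  //   val n: NodeId = "n1"
  //   val s: String = NodeId("n1")
```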
Protocol
To send messages to and receive messages from a node, you create case classes that contain the fields you need to send or receive.
The case classes don't need to contain the standard fields defined in the Maelstrom protocol, such as src, dest, type, msg_id and in_reply_to. These are added and parsed automatically by the runtime.
case class Echo(text: String) derives JsonCodec
{
"src" : "c1", // (1)!
"dest" : "n1", // (2)!
"body" : {
"type": "echo", // (3)!
"msg_id": 10, // (4)!
"in_reply_to": 5, // (5)!
"text": "Hello" // (6)!
}
}
- src is the NodeId of the node that sent the message
- dest is the NodeId of the node that received the message
- type is the type of the message (the snake case version of the case class name)
- msg_id is added automatically when the ask api is used
- in_reply_to is added automatically when the reply api is used
- These are the fields of the case class (in this case, text). A case class can have zero or more fields, and fields can be nested.
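The derivation of the type field can be approximated as follows. This is a hypothetical stdlib-only sketch of the naming rule described above (MessageType.of is not part of the library, and the real conversion may treat acronyms or digits differently). It shows, for example, that a GossipOk reply would travel with type gossip_ok.

```scala
import java.util.Locale

object MessageType:
  // Approximates the documented rule: a message's "type" is the snake case form of its case class name.
  // An "_" is inserted between a lowercase letter or digit and the uppercase letter that follows it,
  // then everything is lowercased (Locale.ROOT avoids locale-specific casing surprises).
  def of(caseClassName: String): String =
    caseClassName
      .replaceAll("([a-z0-9])([A-Z])", "$1_$2")
      .toLowerCase(Locale.ROOT)
```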
val echo = Echo("Hello")
val nodeId = NodeId("n1")
nodeId.send(echo)
{
"src" : "c1",
"dest" : "n1",
"body" : {
"type": "echo",
"text": "Hello"
}
}
val echo = Echo("Hello")
val nodeId = NodeId("n1")
nodeId.ask[Echo](echo, 5.seconds)
{
"src" : "c1",
"dest" : "n1",
"body" : {
"type": "echo",
"text": "Hello",
"msg_id": 10
}
}
val echo = Echo("Hello")
reply(echo)
{
"src" : "n1",
"dest" : "c1",
"body" : {
"type": "echo",
"text": "Hello",
"in_reply_to": 10
}
}
Note
Keep in mind that if you use the ask api, the framework adds a msg_id to the message. If you use the reply api, the framework adds an in_reply_to to the message.
Caution
If you use the reply api on a message that does not have a msg_id (i.e. one sent using the send api), it will throw an error at runtime.
Caution
If you use the ask api, the caller waits for the reply until the timeout expires. If the reply is not received within the timeout, it fails with a Timeout error.
The idea behind the framework's design is that, when writing solutions to problems, you should not have to think about msg_id and similar details, much like when you write an HTTP/gRPC client or server.
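To make this bookkeeping concrete, here is a minimal pure model of what the runtime does with msg_id and in_reply_to. Everything in it (Correlation, Body, Asker) is hypothetical and leaves out timeouts and I/O. It only captures the three rules above: ask stamps a fresh msg_id, reply copies it into in_reply_to, and replying to a message without a msg_id is an error.

```scala
object Correlation:
  // Simplified message body: only the runtime-managed fields plus the message type
  final case class Body(tpe: String, msgId: Option[Int] = None, inReplyTo: Option[Int] = None)

  // Models ask: every outgoing request gets a fresh, increasing msg_id
  final class Asker:
    private var nextId = 0
    def ask(tpe: String): Body =
      nextId += 1
      Body(tpe, msgId = Some(nextId))

  // Models reply: the request's msg_id is copied into in_reply_to.
  // A request without a msg_id (one sent with send) cannot be replied to.
  def reply(request: Body, tpe: String): Either[String, Body] =
    request.msgId match
      case Some(id) => Right(Body(tpe, inReplyTo = Some(id)))
      case None     => Left(s"cannot reply to '${request.tpe}': it has no msg_id")
```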
I/O APIs
1. receive
The receive api takes a handler function I => ZIO[MaelstromRuntime & R, Nothing, Unit]
Note
I needs to have a zio.json.JsonDecoder instance. R can be anything; you will need to provide R & MaelstromRuntime when you run the ZIO effect.
Here's an example
case class Gossip(numbers: Seq[Int]) derives JsonCodec
val messageHandler =
receive[Gossip] (msg =>
for {
src <- MaelstromRuntime.src
me <- MaelstromRuntime.me
others <- MaelstromRuntime.others
_ <- ZIO.logDebug(s"received $msg from $src") //(1)!
_ <- ZIO.logDebug(s"my node id is $me") //(2)!
_ <- ZIO.logDebug(s"other node ids are $others") //(3)!
} yield ()
)
- src is the NodeId of the node that sent the message
- me is the NodeId of the node that received the message
- others is the set of NodeIds received in the init message at the start of the node
2. send
You can send a message to any NodeId using the NodeId.send() API. It takes an instance of a case class that has a zio.json.JsonEncoder.
case class Gossip(numbers: Seq[Int]) derives JsonCodec
val messageHandler =
receive[Gossip] (_ =>
for
others <- MaelstromRuntime.others
_ <- ZIO.foreach(others)(_.send(Gossip(Seq(1,2)))).unit //(1)!
yield ()
)
val result = NodeId("n5") send Gossip(Seq(1,2))
- These messages are sent to all other nodes in the cluster
3. ask
The ask api is a combination of send and receive. It sends a message to a remote node and waits for a reply. It expects a zio.json.JsonDecoder instance for the reply and a zio.json.JsonEncoder instance for the request message.
You can either use the default timeout (configured in Settings) or provide a custom timeout for the operation.
Uses the default timeout configured in Settings (100ms by default)
case class Ping(text: String) derives JsonCodec
case class Pong(text: String) derives JsonCodec
// Uses default timeout configured in Settings (100ms by default)
val pingResult: ZIO[MaelstromRuntime, AskError, Pong] =
NodeId("n2").ask[Pong](Ping("Hello"))
Override the default timeout for this specific operation
case class Ping(text: String) derives JsonCodec
case class Pong(text: String) derives JsonCodec
// Custom timeout overrides the default timeout
val pingResult: ZIO[MaelstromRuntime, AskError, Pong] =
NodeId("n2").ask[Pong](Ping("Hello"), 5.seconds)
The ask api can return either a successful response or an AskError:
type AskError = Error | DecodingFailure | Timeout
An AskError can be one of the following:
- Timeout if the reply was not received within the given duration
- DecodingFailure if the reply could not be decoded into the given type
- Error if the remote node sends an error message instead of the reply message
case class Query(id: Int) derives JsonCodec
case class Answer(text: String) derives JsonCodec
val askResponse: ZIO[MaelstromRuntime, AskError, Unit] = for
answer <- NodeId("g4").ask[Answer](Query(1), 5.seconds)
_ <- ZIO.logInfo(s"answer: $answer")
yield ()
val program = askResponse
.catchAll {
case t: Timeout => ZIO.logError(s"timeout: ${t.timeout}")
case d: DecodingFailure => ZIO.logError(s"decoding failure: ${d.error}")
case e: Error =>
val code: ErrorCode = e.code
val text: String = e.text
ZIO.logError(s"error code: $code, error text: $text")
}
The remote node can send an error message if it encounters an error while processing the request message or when the request is invalid. You can read more about this in the error messages and error handling sections.
4. reply
You can call the reply api to send a reply message to the source of the current message (if the message was sent using the ask api).
case class Gossip(numbers: Seq[Int]) derives JsonCodec
case class GossipOk(myNumbers: Seq[Int]) derives JsonCodec
val program = receive[Gossip] (_ => reply(GossipOk(Seq(1,2))))
The reply api takes an instance of a case class that has a zio.json.JsonEncoder.
5. replyError
You can call the replyError api to send an error message to the source of the current message (if the message was sent using the ask api).
case class Gossip(numbers: Seq[Int]) derives JsonCodec
val program = receive[Gossip] (_ => replyError(ErrorCode.PreconditionFailed, "some text message"))
See error handling for more details.
Context APIs
MaelstromRuntime.me
Returns the NodeId of the current node. Signature: ZIO[MaelstromRuntime, Nothing, NodeId]
MaelstromRuntime.others
Returns the NodeIds of all other nodes in the cluster. Signature: ZIO[MaelstromRuntime, Nothing, Set[NodeId]]
MaelstromRuntime.src
Returns the NodeId of the source of the current message. Signature: ZIO[MessageContext, Nothing, NodeId]
Error messages
zio-maelstrom has a built-in data type for error messages called Error:
case class Error(
code: ErrorCode,
text: String
) derives JsonCodec
It supports all the standard Maelstrom error codes as well as the ability to send custom error codes.
View all error codes
sealed trait ErrorCode(private val code1: Int, val definite: Boolean) {
def code: Int = code1
override def toString: String = s"error: ${this.getClass.getSimpleName.replace("$","")}, code: $code"
}
object ErrorCode:
/**
* Indicates that the requested operation could not be completed within a timeout.
*/
object Timeout extends ErrorCode(0, false)
/**
* Thrown when a client sends an RPC request to a node which does not exist.
*/
object NodeNotFound extends ErrorCode(1, true)
/**
* Use this error to indicate that a requested operation is not supported by the current implementation. Helpful for stubbing out APIs during development.
*/
object NotSupported extends ErrorCode(10, true)
/**
* Indicates that the operation definitely cannot be performed at this time--perhaps because the server is in a read-only state, has not yet been initialized, believes its peers to be down, and so on. Do not use this error for indeterminate cases, when the operation may actually have taken place.
*/
object TemporarilyUnavailable extends ErrorCode(11, true)
/**
* The client's request did not conform to the server's expectations, and could not possibly have been processed.
*/
object MalformedRequest extends ErrorCode(12, true)
/**
* Indicates that some kind of general, indefinite error occurred. Use this as a catch-all for errors you can't otherwise categorize, or as a starting point for your error handler: it's safe to return internal-error for every problem by default, then add special cases for more specific errors later.
*/
object Crash extends ErrorCode(13, false)
/**
* Indicates that some kind of general, definite error occurred. Use this as a catch-all for errors you can't otherwise categorize, when you specifically know that the requested operation has not taken place. For instance, you might encounter an indefinite failure during the prepare phase of a transaction: since you haven't started the commit process yet, the transaction can't have taken place. It's therefore safe to return a definite abort to the client.
*/
object Abort extends ErrorCode(14, true)
/**
* The client requested an operation on a key which does not exist (assuming the operation should not automatically create missing keys).
*/
object KeyDoesNotExist extends ErrorCode(20, true)
/**
* The client requested the creation of a key which already exists, and the server will not overwrite it.
*/
object KeyAlreadyExists extends ErrorCode(21, true)
/**
* The requested operation expected some conditions to hold, and those conditions were not met. For instance, a compare-and-set operation might assert that the value of a key is currently 5; if the value is 3, the server would return precondition-failed.
*/
object PreconditionFailed extends ErrorCode(22, true)
/**
* The requested transaction has been aborted because of a conflict with another transaction. Servers need not return this error on every conflict: they may choose to retry automatically instead.
*/
object TxnConflict extends ErrorCode(30, true)
/**
* Custom error code
*
* @param code the error code
*/
case class Custom(override val code: Int) extends ErrorCode(code, false)
You can send an error message as a reply to another message. Here's an example:
case class InMessage() derives JsonCodec
val program = receive[InMessage](_ =>
replyError(ErrorCode.PreconditionFailed, "some text message") // (1)!
)
- You can set any text in the text field
To send a custom error code, use ErrorCode.Custom:
case class InMessage() derives JsonCodec
val program = receive[InMessage](_ => replyError(ErrorCode.Custom(1005), "some text message"))
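On the wire, an error reply is an ordinary Maelstrom message whose body has type error, the numeric code and the text. The sketch below renders such a body by hand, assuming the standard Maelstrom error body layout. ErrorWire is a hypothetical helper; in a real node the runtime does this encoding for you. With ErrorCode.PreconditionFailed the code would be 22, and with ErrorCode.Custom(1005) it would be 1005.

```scala
object ErrorWire:
  // Renders the body of a Maelstrom error reply as compact JSON.
  // Hand-rolled for illustration only; it escapes just backslashes and double quotes.
  def body(inReplyTo: Int, code: Int, text: String): String =
    val escaped = text.replace("\\", "\\\\").replace("\"", "\\\"")
    s"""{"type":"error","in_reply_to":$inReplyTo,"code":$code,"text":"$escaped"}"""
```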
Error handling
When you use the ask api, it can return an AskError, which can be one of the following:
- Timeout if the reply was not received within the given duration
- DecodingFailure if the reply could not be decoded into the given type
- Error if the remote node sends an error message instead of the reply message
The error is returned via a failed ZIO effect. You can handle it using ZIO's fold or catchAll operators, or any other error handling method supported by ZIO.
When handling an AskError, you can either
- recover from the error by retrying the ask operation
- ignore the error and continue the handler execution
- log the error
- convert it to an Error and return it to the sender
or all of the above.
Note that the framework already does default logging, so you don't need to log the error again unless you want custom logging.
Returning an error to the sender
There is an api called replyError that can be used to return an instance of Error to the sender.
case class Query(id: Int) derives JsonCodec
val program = receive[Query](_ => replyError(ErrorCode.PreconditionFailed, "some text message"))
Alternatively, you can fail the ZIO effect with an Error type and the framework will automatically return that error to the sender if there is a msg_id in the request message.
Info
One key difference between replyError and ZIO.fail is that replyError lets the handler continue executing after the error has been returned, whereas ZIO.fail immediately stops handler execution for the current message.
case class Query(id: Int) derives JsonCodec
val program = receive[Query](_ => ZIO.fail(Error(ErrorCode.PreconditionFailed, "some text message")))
There is an easy way to convert any AskError into an Error response for the sender:
case class Query(id: Int) derives JsonCodec
case class Answer(text: String) derives JsonCodec
private def askResponse(q: Query) = NodeId("g4").ask[Answer](q, 5.seconds)
val program = receive[Query] { q =>
for
answer <- askResponse(q).defaultAskHandler
_ <- reply(answer)
yield ()
}
defaultAskHandler is an extension method that converts ZIO[R, AskError, A] to ZIO[R, Error, A].
Important
It does not retain the original error code and message: regardless of the original error, it always returns the Crash error code.
The above code will not compile if you remove the defaultAskHandler call. This is to force you to handle the error explicitly.
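The conversion can be pictured with plain Either values. In this sketch the AskError cases and Error are hypothetical stand-ins for the library's types, and the text of the Crash error is a placeholder because the real text is not documented here. The point is that every failure, whatever its original code, becomes code 13 (Crash).

```scala
object AskHandling:
  // Hypothetical stand-ins for the library's AskError cases
  sealed trait AskError
  final case class Timeout(millis: Long) extends AskError
  final case class DecodingFailure(error: String) extends AskError
  final case class RemoteError(code: Int, text: String) extends AskError

  // Stand-in for the Error reply sent back to the original sender
  final case class Error(code: Int, text: String)

  val CrashCode = 13 // ErrorCode.Crash in the listing above

  // Every failure collapses to Crash; the original code and text are dropped
  def defaultAskHandler[A](result: Either[AskError, A]): Either[Error, A] =
    result.left.map(_ => Error(CrashCode, "remote call failed")) // placeholder text
```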
Maelstrom services
Maelstrom starts some services at the beginning of every simulation by default.
These are their node ids:
- lin-kv
- lww-kv
- seq-kv
- lin-tso
You can read more about these services in the Maelstrom docs.
ZIO-Maelstrom provides LinKv, LwwKv, SeqKv & LinTso clients to interact with these services. SeqKv, LwwKv & LinKv are all key value stores. They have the same api but different consistency guarantees.
Native KV APIs
Native apis are provided directly by the Maelstrom services.
All KV operations support timeout configuration. You can either use the default timeout (configured in Settings) or provide a custom timeout for specific operations.
read
Takes a key and returns its value. If the key does not exist, it returns the KeyDoesNotExist error code.
Uses the default timeout configured in Settings (100ms by default)
KV read with default timeout
val counterValue: ZIO[SeqKv, AskError, Int] = SeqKv.read("counter")
Override the default timeout for this specific read operation
KV read with custom timeout
// Custom timeout overrides the default timeout
val counterValueCustom: ZIO[SeqKv, AskError, Int] = SeqKv.read("counter", 30.millis)
write
Takes a key and a value and writes the value against the key. If a value already exists against the key, it is overwritten.
KV write with default timeout
val _: ZIO[LwwKv, AskError, Unit] = LwwKv.write("counter", 1)
KV write with custom timeout
val _: ZIO[SeqKv, AskError, Unit] = SeqKv.write("counter", 1, 50.millis)
cas
CAS stands for compare-and-swap. It takes a key, an expected value (from) and a new value (to). It writes the new value against the key only if the current value of the key matches the expected value. If the current value is different, it returns the PreconditionFailed error code. If the key does not exist, it returns the KeyDoesNotExist error code; if you set createIfNotExists to true, it creates the key instead.
KV CAS with default timeout
val _: ZIO[LinKv, AskError, Unit] = LinKv.cas(key = "counter", from = 1, to = 3, createIfNotExists = false)
KV CAS with custom timeout
val _: ZIO[SeqKv, AskError, Unit] = SeqKv.cas("counter", 1, 3, createIfNotExists = true, 75.millis)
The above examples write 3 to counter only if the current value of counter is 1. If the current value is different, they return the PreconditionFailed error code.
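The rules for read, write and cas can be modelled with an in-memory map. This is a hypothetical single-node sketch, not a client for the Maelstrom services (which are remote and may be eventually consistent). It assumes that with createIfNotExists = true a missing key is created holding the to value.

```scala
object KvModel:
  // The two error codes used for these cases (20 and 22 in the ErrorCode listing)
  enum KvError:
    case KeyDoesNotExist, PreconditionFailed

  // In-memory stand-in for a KV service; single-threaded and not the library's client
  final class Store[K, V]:
    private val data = scala.collection.mutable.Map.empty[K, V]

    def read(key: K): Either[KvError, V] = data.get(key).toRight(KvError.KeyDoesNotExist)

    // Overwrites any existing value
    def write(key: K, value: V): Unit = data(key) = value

    // Assumption: with createIfNotExists = true, a missing key is created holding `to`
    def cas(key: K, from: V, to: V, createIfNotExists: Boolean = false): Either[KvError, Unit] =
      data.get(key) match
        case Some(current) if current == from => data(key) = to; Right(())
        case Some(_)                          => Left(KvError.PreconditionFailed)
        case None if createIfNotExists        => data(key) = to; Right(())
        case None                             => Left(KvError.KeyDoesNotExist)
```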
High level KV APIs
High level apis are built on top of native apis by combining multiple native apis and/or adding additional logic.
readOption
Takes a key and returns an Option of its value. If the key does not exist, it returns None instead of the KeyDoesNotExist error code.
val counterMaybe: ZIO[SeqKv, AskError, Option[Int]] = SeqKv.readOption("counter")
writeIfNotExists
Takes a key and a value and writes the value against the key only if the key does not exist. If the key already exists, it returns the PreconditionFailed error code.
val _: ZIO[LwwKv, AskError, Unit] = LwwKv.writeIfNotExists("counter", 1)
update
This high level api takes a key and a function from the current value (an Option, since the key may not exist) to a new value. It reads the current value of the key, applies the function and writes the new value against the key. If the value has changed in the meantime, it applies the function again and keeps retrying until the write succeeds without interference. This is useful for implementing atomic operations like incrementing a value.
val increasedValue: ZIO[SeqKv, AskError, Int] = SeqKv.update("counter") {
  case Some(oldValue) => oldValue + 1
  case None => 1
}
The timeout does not apply to the entire operation but to each individual read, cas and write call, so the total time taken by the operation can exceed the timeout. Retries happen only when the value has changed in the meantime; any other error is returned immediately. This also applies to the updateZIO api.
updateZIO
Very similar to update, but the function returns a ZIO that produces the new value, so it can perform asynchronous operations. It reads the current value of the key, runs the ZIO and writes the new value against the key. If the value has changed in the meantime, it runs the function again and keeps retrying.
def getNewNumber(oldValue: Option[Int]): ZIO[Any, Nothing, Int] = ???
val increasedValueZIO: ZIO[SeqKv, AskError, Int] = SeqKv.updateZIO("counter")(getNewNumber)
Danger
When retries happen, the ZIO is retried as well, so side effects should be avoided in this function.
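The retry loop behind update can be sketched against a similar in-memory store. UpdateModel is hypothetical and simplified: errors other than a conflict are not modelled. demoConcurrentWriter simulates another node writing between the read and the CAS. It deliberately counts calls to show that the function runs more than once, which is why it should be free of side effects in real code.

```scala
object UpdateModel:
  import scala.collection.mutable

  // In-memory stand-in for a KV service (hypothetical, single-threaded)
  final class Store:
    val data: mutable.Map[String, Int] = mutable.Map.empty
    def readOption(key: String): Option[Int] = data.get(key)
    // CAS against an optional expected value; None means "the key must still be absent"
    def cas(key: String, from: Option[Int], to: Int): Boolean =
      if data.get(key) == from then
        data(key) = to
        true
      else false

  // read -> apply f -> CAS; on a conflict, re-read and re-apply f until the CAS goes through
  @scala.annotation.tailrec
  def update(store: Store, key: String)(f: Option[Int] => Int): Int =
    val current = store.readOption(key)
    val next = f(current)
    if store.cas(key, current, next) then next
    else update(store, key)(f)

  // On the first attempt another "node" writes 10 between our read and our CAS.
  // Returns (final value, number of times f ran).
  def demoConcurrentWriter(): (Int, Int) =
    val store = Store()
    var calls = 0
    val result = update(store, "counter") { old =>
      calls += 1
      if calls == 1 then store.data("counter") = 10
      old.getOrElse(0) + 1
    }
    (result, calls)
```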
Important
- Because all these apis are built on top of the ask api, they can return an AskError, which you may need to handle. According to the Maelstrom documentation, they can return the KeyDoesNotExist or PreconditionFailed error codes.
- In case of a network partition or delay, all of the above apis can fail with a Timeout error.
- When incorrect types are used to decode the response, they can fail with a DecodingFailure error.
Tip
Keys and values of the key value stores can be of any type that has a zio.json.JsonCodec instance.
TSO APIs
LinTso is a linearizable timestamp oracle. It supports timeout configuration like the other Maelstrom services.
ts
Returns a unique, monotonically increasing timestamp from the linearizable timestamp oracle.
Uses the default timeout configured in Settings (100ms by default)
TSO with default timeout
val timestamp: ZIO[LinTso, AskError, Int] = LinTso.ts
Override the default timeout for this specific timestamp operation
TSO with custom timeout
// Custom timeout overrides the default timeout
val timestamp: ZIO[LinTso, AskError, Int] = LinTso.ts(2.seconds)
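The guarantee ts provides can be modelled locally with a single atomic counter. TsoModel is a hypothetical stand-in for the lin-tso service, useful only for reasoning about what callers may rely on: every call, from any thread, gets a distinct value that is larger than any value handed out before it.

```scala
object TsoModel:
  import scala.jdk.CollectionConverters.*

  // Stand-in for lin-tso: one shared atomic counter, so each call returns a strictly larger Int
  private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
  def ts(): Int = counter.incrementAndGet()

  // Calls ts() from several threads at once and collects every timestamp handed out
  def concurrentStamps(threads: Int, perThread: Int): Vector[Int] =
    val results = new java.util.concurrent.ConcurrentLinkedQueue[Int]()
    val workers = (1 to threads).map(_ => new Thread(() => (1 to perThread).foreach(_ => results.add(ts()))))
    workers.foreach(_.start())
    workers.foreach(_.join())
    results.asScala.toVector
```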
Timeout Behavior
All operations that communicate with remote nodes (ask, KV operations, and TSO operations) support timeout configuration:
- Default behavior: Uses the timeout configured in Settings (100ms by default)
- Custom timeout: You can override the default timeout for specific operations
- Error handling: When a timeout occurs, operations fail with a Timeout error that you can handle using ZIO error management
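The precedence between these timeouts can be summarized in a few lines. This is a hypothetical sketch, not library code: a per-call timeout wins over a node-wide setting (such as withAskTimeout in the Settings example), which in turn wins over the built-in 100ms default.

```scala
import scala.concurrent.duration.*

object TimeoutModel:
  // Documented built-in default for ask, KV and TSO operations
  val BuiltInDefault: FiniteDuration = 100.millis

  // A per-call timeout wins over the node-wide setting, which wins over the built-in default
  def effective(perCall: Option[FiniteDuration], nodeWide: Option[FiniteDuration] = None): FiniteDuration =
    perCall.orElse(nodeWide).getOrElse(BuiltInDefault)
```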
Settings
Below are the settings that can be configured for a node:
- Log Level: The default log level is LogLevel.Info. If you want more detailed logs, you can set it to LogLevel.Debug. If you want to disable logs, you can set it to LogLevel.None.
- Log Format: Log format can be either Plain or Colored. Default is Colored.
- Concurrency: This is the concurrency level for processing messages. Default is 1024. This means 1024 request messages (receive api) + 1024 response messages (ask api) = 2048 messages can be processed in parallel.
- Default Timeout: The default timeout for ask operations and all KV store operations. Default is 100 milliseconds. This timeout is used when no explicit timeout is provided to ask() or to KV operations like read(), write() and cas(). You can override it globally for all operations, or provide operation-specific timeouts when needed.
object MainApp extends MaelstromNode {
val program = ???
}
object MainApp extends MaelstromNode {
override val configure = NodeConfig
.withConcurrency(100)
.withLogLevelDebug
.withPlaintextLog
.withAskTimeout(1.second)
val program = ZIO.logDebug("Starting node...")
}
Logging
You can log at different levels using ZIO's logging APIs: ZIO.logDebug, ZIO.logInfo, etc.
All these APIs log to STDERR because STDOUT is used for sending messages.
You can configure the log level using the settings API.
By default, log statements are colored. You can change them to plain using the settings API.
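Why the split matters: Maelstrom reads a node's messages from its STDOUT, one JSON document per line, so a stray log line there would corrupt the message stream. The stdlib-only sketch below captures both streams to show that messages and logs stay separate. Streams is hypothetical and is not how the library's logger is implemented.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

object Streams:
  // Protocol messages: STDOUT, one JSON document per line (this is what Maelstrom reads)
  def emit(message: String): Unit = Console.out.println(message)

  // Diagnostics: STDERR, so they never mix with the message stream
  def log(line: String): Unit = Console.err.println(line)

  // Runs `body` and returns what it wrote to (stdout, stderr)
  def capture(body: => Unit): (String, String) =
    val out = new ByteArrayOutputStream()
    val err = new ByteArrayOutputStream()
    Console.withOut(new PrintStream(out, true)) {
      Console.withErr(new PrintStream(err, true))(body)
    }
    (out.toString.trim, err.toString.trim)
```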
object MainApplication extends MaelstromNode {
override val configure = NodeConfig.withLogLevelDebug
def program = for
_ <- ZIO.logDebug("Starting node")
_ <- ZIO.logInfo("Received message")
_ <- ZIO.logWarning("Something is wrong")
_ <- ZIO.logError("Something is really wrong")
yield ()
}
When initialized, the above program outputs the following:

Testing
Using static inline messages:
When developing a solution, you sometimes want to test it without Maelstrom. While you can type the input into stdin, you can also hardcode the input messages in the program itself.
object InlineInput extends MaelstromNode {
case class Ping() derives JsonCodec // (1)!
case class Pong() derives JsonEncoder
val program = receive[Ping](_ => reply(Pong()))
override val configure: NodeConfig =
NodeConfig
.withStaticInput(
NodeId("A"), // (2)!
Set(NodeId("B"), NodeId("C")), // (3)!
InlineMessage(NodeId("B"), Body(MsgName[Ping], Ping(), Some(MessageId(1)), None)), // (4)!
InlineMessage(NodeId("C"), Body(MsgName[Ping], Ping(), Some(MessageId(2)), None))
)
}
- When using inline input, you need JsonEncoder and JsonDecoder instances for input messages
- Node's own id
- Other nodes in the cluster
- Messages to be sent to the node. These are varargs and you can send any number of messages
Tip
When debugging an issue, use static input, set log level to debug and set concurrency to 1. This might help you isolate the issue.
Using static input files:
You can configure the runtime to read the input from a file.
object Main extends MaelstromNode {
case class Ping() derives JsonDecoder
case class Pong() derives JsonEncoder
val program = receive[Ping](_ => reply(Pong()))
override val configure: NodeConfig =
NodeConfig
.withStaticInput(
NodeId("A"), // (1)!
Set(NodeId("B"), NodeId("C")), // (2)!
"examples" / "echo" / "fileinput.txt" // (3)!
)
.withLogLevelDebug
}
- Node's own id
- Other nodes in the cluster
- Path to the file containing the input messages
{"src": "c1", "dest": "n1", "body": { "type" : "ping", "msg_id": 1 }}
sleep 2s
{"src": "c1", "dest": "n1", "body": { "type" : "ping", "msg_id": 2 }}
This runs the entire program with the input from the file. File input also lets you simulate delays between inputs using sleep statements, as shown above.
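The file format above can be described by a small parser. This is a hypothetical model: InputFile is not part of the library, and only the sleep <n>s form shown above is recognized. It turns each non-blank line into either a JSON message or a delay.

```scala
import scala.concurrent.duration.*

object InputFile:
  // One parsed line: either a message to feed to the node or a pause
  enum Line:
    case Message(json: String)
    case Sleep(duration: FiniteDuration)

  // Only the "sleep <n>s" form from the example above is recognized
  private val SleepPattern = """sleep\s+(\d+)s""".r

  // Skips blank lines; every other line that is not a sleep directive is treated as a JSON message
  def parse(text: String): List[Line] =
    text.linesIterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map {
        case SleepPattern(n) => Line.Sleep(n.toInt.seconds)
        case json            => Line.Message(json)
      }
      .toList
```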
