API Reference#
Tip
All the functionality of zio-maelstrom is available via following import statement
import com.bilalfazlani.zioMaelstrom.*
Primitive Types#
NodeId
#
NodeId represents a unique identifier for a node. It is a wrapper (opaque type) around String
and can be created using NodeId(String)
method.
MessageId
#
MessageId represents a unique identifier for a message. It is a wrapper (opaque type) around Int
and can be created using MessageId(Int)
method.
Protocol#
In order to send and receive messages from a node, we need to create data types that extend from one or more of the following traits:
1. Sendable
#
If a message needs to be "sent" out of a node, it needs to extend from Sendable
trait Sendable:
val `type`: String
type
identifies the type of message which helps in decoding and handling of the message.
2. Reply
#
If we want to "reply" against another message to a node, the reply data class needs to extend from Reply
trait
trait Reply:
val in_reply_to: MessageId
in_reply_to
is the MessageId
of the message that we are replying against. Most of the time, you will have to extend reply messages from Sendable
as well because replies need to be "sent" out of a node.
3. NeedsReply
#
If we want to "receive" a reply against a message, the message data class needs to extend from NeedsReply
trait
trait NeedsReply:
val msg_id: MessageId
This is required to map response messages to request message using the msg_id
field.
Json SerDe#
Besides the traits, any message that needs to be sent as a request to another node or as a response for another message, should have a zio.json.JsonEncoder
instance. This is required to encode the message into a JSON string which is then sent to the node. Likewise, any message that needs to be received as a request from another node or as a response for another message, should have a zio.json.JsonDecoder
instance. This is required to decode the JSON string into the message.
In the unique-ids example, we have defined the following messages:
case class Generate(msg_id: MessageId) extends NeedsReply derives JsonDecoder
case class GenerateOk(id: String, in_reply_to: MessageId, `type`: String = "generate_ok")
extends Sendable,
Reply derives JsonEncoder
Here, Generate
message extends from NeedsReply
because it expects a reply message. Generate
message is sent by maelstrom server nodes and not the application nodes. Application nodes just receive the message. Hence it does not need to extend from Sendable
. GenerateOk
message is the response for Generate
and because application node needs to send it, it needs to extend from both Sendable
and Reply
.
If a message needs to be sent as well as received, it needs an instance of zio.json.JsonCodec
which is a combination of zio.json.JsonEncoder
and zio.json.JsonDecoder
.
Usually a node wants to handle more than one type of message. For that, we discriminate messages using the type
field using jsonDiscriminator
annotation of zio-json. Here's an example
@jsonDiscriminator("type")
sealed trait CalculatorMessage extends NeedsReply derives JsonDecoder
@jsonHint("add") case class Add(a: Int, b: Int, msg_id: MessageId) extends CalculatorMessage
@jsonHint("subtract") case class Subtract(a: Int, b: Int, msg_id: MessageId) extends CalculatorMessage
@jsonHint("multiply") case class Multiply(a: Int, b: Int, msg_id: MessageId) extends CalculatorMessage
@jsonHint("divide") case class Divide(a: Int, b: Int, msg_id: MessageId) extends CalculatorMessage
Since, the parent trait is deriving a JsonDecoder
, we don't need to derive it for individual messages.
However, we need to derive JsonEncoder
for each outgoing message because there is usually no parent type for outgoing messages.
case class AddOk(result: Int, in_reply_to: MessageId, `type`: String = "add_ok")
extends Sendable, Reply derives JsonEncoder
case class SubtractOk(result: Int, in_reply_to: MessageId, `type`: String = "subtract_ok")
extends Sendable, Reply derives JsonEncoder
case class MultiplyOk(result: Int, in_reply_to: MessageId, `type`: String = "multiply_ok")
extends Sendable, Reply derives JsonEncoder
case class DivideOk(result: Int, in_reply_to: MessageId, `type`: String = "divide_ok")
extends Sendable, Reply derives JsonEncoder
Note
Outgoing message can also extend for input parent trait if they also need to be received by the node. They just need to derive JsonDecoder
additionally in that case.
I/O APIs#
1. receive
#
receive
api takes a handler function I => ZIO[MaelstromRuntime & R, Nothing, Unit]
Note
I
needs have azio.json.JsonDecoder
instanceR
can be anything. You will need to provideR
&MaelstromRuntime
when you run the ZIO effect
Here's an example
case class Gossip(msg_id: MessageId, numbers: Seq[Int], `type`: String = "gossip")
derives JsonCodec
val messageHandler: ZIO[MaelstromRuntime, Nothing, Unit] =
receive[Gossip] {
case msg: Gossip =>
ZIO.logDebug(s"received $msg from $src") //(1)!
*> ZIO.logDebug(s"my node id is $me") //(2)!
*> ZIO.logDebug(s"other node ids are $others") //(3)!
}
src
is theNodeId
of the node that sent the messageme
is theNodeId
of the node that received the messageothers
is a list ofNodeId
received in the init message at the start of node
receive
is a context function and it it gives some variables in the context of the handler function. i.e. me
, others
and src
2. send
#
You can send a message to any NodeId
using NodeId.send()
API. It takes a Sendable
message which has a zio.json.JsonEncoder
instance.
case class Gossip(numbers: Seq[Int], `type`: String = "gossip")
extends Sendable derives JsonCodec
val messageHandler =
receive[Gossip] {
case msg: Gossip =>
ZIO.foreach(others)(_.send(Gossip(Seq(1,2)))).unit //(1)!
}
val result = NodeId("n5") send Gossip(Seq(1,2))
- these will be sent to all nodes in cluster
3. ask
#
ask
api is a combination of send
and receive
. It sends a message to a remote node and waits for a reply. It takes a Sendable
& Receive
message and returns a Reply
message. It also takes a timeout argument which is the maximum time to wait for a reply. It expects a zio.json.JsonDecoder
instance for the reply & a zio.json.JsonEncoder
instance for the request message. ask
api can be called from within and outside of receive
function.
case class Gossip(msg_id: MessageId, numbers: Seq[Int], `type`: String = "gossip")
extends NeedsReply, Sendable derives JsonCodec
case class GossipOk(in_reply_to: MessageId, myNumbers: Seq[Int], `type`: String = "gossip_ok")
extends Reply derives JsonCodec
val gossipResult: ZIO[MaelstromRuntime, AskError, GossipOk] =
MessageId.next.flatMap( msgId => //(1)!
NodeId("n2").ask[GossipOk](Gossip(msgId, Seq(1,2)), 5.seconds)
)
MessageId.next
gives next sequential message id
Tip
Use MessageId.next
to generate a new message id. It is a sequential id generator
Important
Make sure to use different message ids for different messages. If you use the same message id for different messages, the receiver will not be able to map the response to the request
The ask
api can return either a successful response or an AskError
type AskError = ErrorMessage | DecodingFailure | Timeout
Ask error can be one of the following:
Timeout
if the reply was not received within given durationDecodingFailure
if the reply could not be decoded into the given typeErrorMessage
if the sender sends an error message instead of the reply message.
case class Query(id: Int, msg_id: MessageId, `type`: String = "query")
extends NeedsReply,
Sendable derives JsonCodec
case class Answer(in_reply_to: MessageId, text: String) extends Reply derives JsonCodec
val askResponse: ZIO[MaelstromRuntime, AskError, Unit] = for
msgId <- MessageId.next
answer <- NodeId("g4").ask[Answer](Query(1, msgId), 5.seconds)
_ <- logInfo(s"answer: $answer")
yield ()
askResponse
.catchAll {
case t: Timeout => logError(s"timeout: ${t.timeout}")
case d: DecodingFailure => logError(s"decoding failure: ${d.error}")
case e: ErrorMessage =>
val code: ErrorCode = e.code
val text: String = e.text
logError(s"error code: $code, error text: $text")
}
Sender can send an error message if it encounters an error while processing the request message or when request is invalid. You can read more about error messages in the error messages section
4. reply
#
From within receive
function, you can call reply
api to send a reply message to the source of the current message.
case class Gossip(msg_id: MessageId, numbers: Seq[Int], `type`: String = "gossip")
extends NeedsReply derives JsonCodec
case class GossipOk(in_reply_to: MessageId, myNumbers: Seq[Int], `type`: String = "gossip_ok")
extends Reply, Sendable derives JsonCodec
val messageHandler =
receive[Gossip] {
case msg: Gossip => reply(GossipOk(msg.msg_id, Seq(1,2)))
}
reply
api takes an instance of Sendable
& Reply
message which has a zio.json.JsonEncoder
instance.
Tip
reply
can be called only inside of receive function. Outside of the receive
function, you can use send
api which takes a remote NodeId
argument.
Error messages#
zio-maelstrom has a built in data type for error messages called ErrorMessage
case class ErrorMessage(
in_reply_to: MessageId,
code: ErrorCode,
text: String,
`type`: String = "error"
) extends Sendable
with Reply
derives JsonCodec
It supports all the standard maelstrom error codes as well as ability to send custom error codes
View all error codes
sealed trait ErrorCode(private val code1: Int, val definite: Boolean) {
def code: Int = code1
override def toString: String = s"error: ${this.getClass.getSimpleName.replace("$","")}, code: $code"
}
object ErrorCode:
/**
* Indicates that the requested operation could not be completed within a timeout.
*/
object Timeout extends ErrorCode(0, false)
/**
* Thrown when a client sends an RPC request to a node which does not exist.
*/
object NodeNotFound extends ErrorCode(1, true)
/**
* Use this error to indicate that a requested operation is not supported by the current implementation. Helpful for stubbing out APIs during development.
*/
object NotSupported extends ErrorCode(10, true)
/**
* Indicates that the operation definitely cannot be performed at this time--perhaps because the server is in a read-only state, has not yet been initialized, believes its peers to be down, and so on. Do not use this error for indeterminate cases, when the operation may actually have taken place.
*/
object TemporarilyUnavailable extends ErrorCode(11, true)
/**
* The client's request did not conform to the server's expectations, and could not possibly have been processed.
*/
object MalformedRequest extends ErrorCode(12, true)
/**
* Indicates that some kind of general, indefinite error occurred. Use this as a catch-all for errors you can't otherwise categorize, or as a starting point for your error handler: it's safe to return internal-error for every problem by default, then add special cases for more specific errors later.
*/
object Crash extends ErrorCode(13, false)
/**
* Indicates that some kind of general, definite error occurred. Use this as a catch-all for errors you can't otherwise categorize, when you specifically know that the requested operation has not taken place. For instance, you might encounter an indefinite failure during the prepare phase of a transaction: since you haven't started the commit process yet, the transaction can't have taken place. It's therefore safe to return a definite abort to the client.
*/
object Abort extends ErrorCode(14, true)
/**
* The client requested an operation on a key which does not exist (assuming the operation should not automatically create missing keys).
*/
object KeyDoesNotExist extends ErrorCode(20, true)
/**
* The client requested the creation of a key which already exists, and the server will not overwrite it.
*/
object KeyAlreadyExists extends ErrorCode(21, true)
/**
* The requested operation expected some conditions to hold, and those conditions were not met. For instance, a compare-and-set operation might assert that the value of a key is currently 5; if the value is 3, the server would return precondition-failed.
*/
object PreconditionFailed extends ErrorCode(22, true)
/**
* The requested transaction has been aborted because of a conflict with another transaction. Servers need not return this error on every conflict: they may choose to retry automatically instead.
*/
object TxnConflict extends ErrorCode(30, true)
/**
* Custom error code
*
* @param code the error code
*/
case class Custom(override val code: Int) extends ErrorCode(code, false)
You can send an error message to any node id as a reply to another message. Here's an example
case class InMessage(msg_id: MessageId) extends NeedsReply derives JsonCodec
val handler = receive[InMessage] { case msg: InMessage =>
reply(ErrorMessage(msg.msg_id, ErrorCode.PreconditionFailed, "some text message")) // (1)!
}
- You can set any text in
text
field
case class InMessage(msg_id: MessageId) extends NeedsReply derives JsonCodec
val handler = receive[InMessage] { case msg: InMessage =>
reply(ErrorMessage(msg.msg_id, ErrorCode.Custom(1005), "some text message"))
}
Maelstrom services#
Maelstrom starts some services at the beginning of every simulation by default
These are their node ids:
lin-kv
lww-kv
seq-kv
lin-tso
You can read more these services on the maelstrom docs
ZIO-Maelstrom provides LinkKv
, LwwKv
, SeqKv
& LinTso
clients to interact with these services. SeqKv
, LwwKv
& LinKv
are all key value stores. They have the same api but different consistency guarantees.
Native KV APIs#
Native apis are provided by the maelstrom services
read
-
Takes a key and returns the value of the key. If the value does not exist, it returns
KeyDoesNotExist
error code.val counterValue: ZIO[SeqKv, AskError, Int] = SeqKv.read("counter", 5.seconds)
write
-
Takes a key and a value and writes the value against the key. If a value already exists against the key, it is overwritten.
val _: ZIO[LwwKv, AskError, Unit] = LwwKv.write("counter", 1, 5.seconds)
cas
-
CAS stands for
compare-and-swap
. It takes a key, a value and an expected value. It writes the value against the key only if the expected value matches the current value of the key. If the value is different, then it returnsPreconditionFailed
error code. If the key does not exist, it returnsKeyDoesNotExist
error code. If you setcreateIfNotExists
to true, it will create the key if it does not exist.val _: ZIO[LinKv, AskError, Unit] = LinKv.cas(key = "counter", from = 1, to = 3, createIfNotExists = false, timeout = 5.seconds)
Above example will write
3
tocounter
only if the current value ofcounter
is1
. If the current value is different, it will returnPreconditionFailed
error code.
High level KV APIs#
High level apis are built on top of native apis by combining multiple native apis and/or adding additional logic
readOption
-
Takes a key and returns an
Option
of the value of the key. If the value does not exist, it returnsNone
. Does not returnKeyDoesNotExist
error code.val counterMaybe: ZIO[SeqKv, AskError, Option[Int]] = SeqKv.readOption("counter", 5.seconds)
writeIfNotExists
-
Takes a key and a value and writes the value against the key only if the key does not exist. If the key already exists, it returns
PreconditionFailed
error code.val _: ZIO[LwwKv, AskError, Unit] = LwwKv.writeIfNotExists("counter", 1, 5.seconds)
update
-
This is a high level api built on top of other apis. It takes a key, a function that takes the current value and returns a new value. It reads the current value of the key, applies the function and writes the new value against the key. If the value has changed in the meantime, it applies the function again and keeps trying until the value does not change. This is useful for implementing atomic operations like incrementing a value.
val increasedValue: ZIO[SeqKv, AskError, Int] = SeqKv.update("counter", 5.seconds) { case Some(oldValue) => oldValue + 1 case None => 1 }
The timeout value does not apply to entire operation but to each individual read, cas and write operation. So the total time taken by the operation can be more than the timeout value. Retries are only done when the value has changed in the meantime. And other error is returned immediately. This also applies to
updateZIO
api. updateZIO
-
This is a high level api built on top of other apis. It takes a key, a function that takes the current value and returns a
ZIO
that returns a new value. It reads the current value of the key, applies theZIO
and writes the new value against the key. If the value has changed in the meantime, it applies the function again and keeps trying until the value does not change. This is very similar toupdate
but the function can be aZIO
which can do some async operations. When retries happen, theZIO
is retried as well, so side effects should be avoided in this function.def getNewNumber(oldValue: Option[Int]): ZIO[Any, Nothing, Int] = ??? val increasedValueZIO: ZIO[SeqKv, AskError, Int] = SeqKv.updateZIO("counter", 5.seconds)(getNewNumber)
Important
-
Because all these apis are built on top of
ask
api, they can returnAskError
which you may need to handle. According to maelstrom documentation, they can returnKeyDoesNotExist
orPreconditionFailed
error codes. -
In case of network partition or delay, all of the above apis can return
Timeout
error code. -
When incorrect types are used to decode the response, they can return
DecodingFailure
error code.
Tip
key and value of the key value store can be any type that has a zio.json.JsonCodec
instance
TSO APIs#
LinTso
is a linearizable timestamp oracle. It has the following api
val timestamp: ZIO[LinTso, AskError, Int] = LinTso.ts(5.seconds)
Settings#
Below are the settings that can be configured for a node
-
Log Level
The default log level is
LogLevel.Info
. If you want more detailed logs, you can set it toLogLevel.Debug
. If you want to disable logs, you can set it toLogLevel.None
-
Log Format
Log format can be either Plain or Colored. Default is colored.
-
Concurrency
This is the concurrency level for processing messages. Default is 1024. This means 1024 request messages(receive api) + 1024 response messages (ask api) = 2048 messages can be processed in parallel.
object MainApp extends MaelstromNode {
val program = ???
}
object MainApp extends MaelstromNode {
override val configure = NodeConfig
.withConcurrency(100)
.withLogLevelDebug
.withPlaintextLog
val program = ZIO.logDebug("Starting node...")
}
Logging#
You can log at different levels using ZIO's logging APIs - ZIO.logDebug
, ZIO.logInfo
, etc.
All these APIs log to STDERR because STDOUT is used for sending messages.
You can configure the log level using settings API.
By default, log statements are colored. You can change it to plain using settings API
object MainApplication extends MaelstromNode {
override val configure = NodeConfig.withLogLevelDebug
def program = for
_ <- ZIO.logDebug("Starting node")
_ <- ZIO.logInfo("Received message")
_ <- ZIO.logWarning("Something is wrong")
_ <- ZIO.logError("Something is really wrong")
yield ()
}
Above program, when initialized, will output the following:
Testing#
Using static inline messages:
When developing a solution, you sometimes want to test it without maelstrom. While you can use stdIn
to enter the input, you can also hardcode the input messages in the program itself.
object InlineInput extends MaelstromNode {
case class Ping(msg_id: MessageId) extends NeedsReply derives JsonCodec // (1)!
case class Pong(in_reply_to: MessageId, `type`: String = "pong") extends Sendable, Reply
derives JsonEncoder
val program = receive[Ping](ping => reply(Pong(ping.msg_id)))
override val configure: NodeConfig =
NodeConfig
.withStaticInput(
NodeId("A"), // (2)!
Set(NodeId("B"), NodeId("C")), // (3)!
InlineMessage(NodeId("B"), Ping(MessageId(1))), // (4)!
InlineMessage(NodeId("C"), Ping(MessageId(2)))
)
}
- When using inline input, you need JsonEncoder and JsonDecoder instances for input messages
- Node's own id
- Other nodes in the cluster
- Messages to be sent to the node. These are varargs and you can send any number of messages
Tip
When debugging an issue, use static input, set log level to debug and set concurrency to 1. This might help you isolate the issue.
Using static input files:
You can configure the runtime to read the input from a file.
object Main extends MaelstromNode {
case class Ping(msg_id: MessageId) extends NeedsReply derives JsonDecoder
case class Pong(in_reply_to: MessageId, `type`: String = "pong") extends Sendable, Reply
derives JsonEncoder
val program = receive[Ping](ping => reply(Pong(ping.msg_id)))
override val configure: NodeConfig =
NodeConfig
.withStaticInput(
NodeId("A"), // (1)!
Set(NodeId("B"), NodeId("C")), // (2)!
"examples" / "echo" / "fileinput.txt" // (3)!
)
.withLogLevelDebug
}
- Node's own id
- Other nodes in the cluster
- Path to the file containing the input messages
{"src": "c1", "dest": "n1", "body": { "type" : "ping", "msg_id": 1 }}
sleep 2s
{"src": "c1", "dest": "n1", "body": { "type" : "ping", "msg_id": 2 }}
This will run the entire program with the input from the file. With file input you also get to simulate delay in inputs using sleep statements as shown above.