To reply to a message from another node, the reply data class needs to extend the Reply trait:

    trait Reply:
      val in_reply_to: MessageId

in_reply_to is the MessageId of the message being replied to. Most of the time, reply messages will also have to extend Sendable, because replies need to be sent out of a node.
Besides the traits, any message that needs to be sent as a request to another node or as a response for another message, should have a zio.json.JsonEncoder instance. This is required to encode the message into a JSON string which is then sent to the node. Likewise, any message that needs to be received as a request from another node or as a response for another message, should have a zio.json.JsonDecoder instance. This is required to decode the JSON string into the message.
In the unique-ids example, we have defined the following messages:
Here, the Generate message extends NeedsReply because it expects a reply message. Generate is sent by Maelstrom server nodes, not by application nodes. Application nodes only receive it, so it does not need to extend Sendable. GenerateOk is the response to Generate, and because the application node needs to send it, it extends both Sendable and Reply.
If a message needs to be sent as well as received, it needs an instance of zio.json.JsonCodec which is a combination of zio.json.JsonEncoder and zio.json.JsonDecoder.
Usually a node wants to handle more than one type of message. For that, we discriminate messages by their type field using the jsonDiscriminator annotation of zio-json. Here's an example
Outgoing messages can also extend the input parent trait if they also need to be received by the node. In that case, they just need to derive JsonDecoder as well.
receive api takes a handler function I => ZIO[MaelstromRuntime & R, Nothing, Unit]
Note
I needs to have a zio.json.JsonDecoder instance
R can be anything. You will need to provide R & MaelstromRuntime when you run the ZIO effect
Here's an example
Receive
    case class Gossip(msg_id: MessageId, numbers: Seq[Int], `type`: String = "gossip") derives JsonCodec

    val messageHandler: ZIO[MaelstromRuntime, Nothing, Unit] = receive[Gossip] {
      case msg: Gossip =>
        ZIO.logDebug(s"received $msg from $src") *>
          ZIO.logDebug(s"my node id is $me") *>
          ZIO.logDebug(s"other node ids are $others")
    }

receive is a context function: it makes some variables available in the context of the handler function, namely me, others and src.
The ask api is a combination of send and receive. It sends a message to a remote node and waits for a reply. It takes a Sendable & NeedsReply message and returns a Reply message. It also takes a timeout argument, which is the maximum time to wait for a reply. It expects a zio.json.JsonEncoder instance for the request message and a zio.json.JsonDecoder instance for the reply. The ask api can be called from inside or outside the receive function.
Use MessageId.next to generate a new message id. It is a sequential id generator
Important
Make sure to use different message ids for different messages. If you use the same message id for different messages, the receiver will not be able to map the response to the request
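To make the warning concrete, here is a minimal sketch in plain Scala 3 of why reused ids break request and reply matching. It is not the library's implementation: IdGenerator and PendingRequests are hypothetical stand-ins, and plain Int ids are assumed in place of MessageId.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for a sequential id generator such as MessageId.next.
final class IdGenerator:
  private val counter = new AtomicInteger(0)
  def next(): Int = counter.incrementAndGet()

// Hypothetical table of requests awaiting a reply, keyed by message id.
// A reply is matched to its request through its in_reply_to value.
final class PendingRequests:
  private val pending = mutable.Map.empty[Int, String]

  // Registering a second request under the same id silently replaces the first.
  def register(msgId: Int, description: String): Unit =
    pending.update(msgId, description)

  // Look up and remove the request that a reply refers to.
  def resolve(inReplyTo: Int): Option[String] =
    pending.remove(inReplyTo)
```

With ids taken from the generator, each reply resolves to its own request. If two requests are registered under the same id, only the later one can be resolved and the earlier one is lost.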
The ask api can return either a successful response or an AskError
AskError
    type AskError = ErrorMessage | DecodingFailure | Timeout
Ask error can be one of the following:
Timeout if the reply was not received within given duration
DecodingFailure if the reply could not be decoded into the given type
ErrorMessage if the sender sends an error message instead of the reply message.
Sender can send an error message if it encounters an error while processing the request message or when request is invalid. You can read more about error messages in the error messages section
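Because AskError is a union type, each case can be handled with an ordinary pattern match. The sketch below is only an illustration in plain Scala 3: the three case classes are simplified stand-ins defined here so the snippet compiles on its own, their fields are assumptions and do not mirror the library's real types, and describe is a hypothetical helper.

```scala
// Stand-in types for illustration only; the library's real ErrorMessage,
// DecodingFailure and Timeout are assumed to carry different fields.
final case class ErrorMessage(code: Int, text: String)
final case class DecodingFailure(reason: String)
final case class Timeout(millis: Long)

// Same shape as the alias shown above: a Scala 3 union type.
type AskError = ErrorMessage | DecodingFailure | Timeout

// Hypothetical helper that turns any AskError into a log-friendly string.
// The compiler checks that all three alternatives are covered.
def describe(error: AskError): String = error match
  case ErrorMessage(code, text) => s"remote node replied with error $code: $text"
  case DecodingFailure(reason)  => s"reply could not be decoded: $reason"
  case Timeout(millis)          => s"no reply within $millis ms"
```

In a handler, the same match could decide whether to retry (for example on Timeout) or to give up (for example on DecodingFailure).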
    sealed trait ErrorCode(private val code1: Int, val definite: Boolean) {
      def code: Int = code1
      override def toString: String =
        s"error: ${this.getClass.getSimpleName.replace("$", "")}, code: $code"
    }

    object ErrorCode:
      /** Indicates that the requested operation could not be completed within a timeout. */
      object Timeout extends ErrorCode(0, false)

      /** Thrown when a client sends an RPC request to a node which does not exist. */
      object NodeNotFound extends ErrorCode(1, true)

      /** Use this error to indicate that a requested operation is not supported by the current
        * implementation. Helpful for stubbing out APIs during development.
        */
      object NotSupported extends ErrorCode(10, true)

      /** Indicates that the operation definitely cannot be performed at this time--perhaps because
        * the server is in a read-only state, has not yet been initialized, believes its peers to be
        * down, and so on. Do not use this error for indeterminate cases, when the operation may
        * actually have taken place.
        */
      object TemporarilyUnavailable extends ErrorCode(11, true)

      /** The client's request did not conform to the server's expectations, and could not possibly
        * have been processed.
        */
      object MalformedRequest extends ErrorCode(12, true)

      /** Indicates that some kind of general, indefinite error occurred. Use this as a catch-all
        * for errors you can't otherwise categorize, or as a starting point for your error handler:
        * it's safe to return internal-error for every problem by default, then add special cases
        * for more specific errors later.
        */
      object Crash extends ErrorCode(13, false)

      /** Indicates that some kind of general, definite error occurred. Use this as a catch-all for
        * errors you can't otherwise categorize, when you specifically know that the requested
        * operation has not taken place. For instance, you might encounter an indefinite failure
        * during the prepare phase of a transaction: since you haven't started the commit process
        * yet, the transaction can't have taken place. It's therefore safe to return a definite
        * abort to the client.
        */
      object Abort extends ErrorCode(14, true)

      /** The client requested an operation on a key which does not exist (assuming the operation
        * should not automatically create missing keys).
        */
      object KeyDoesNotExist extends ErrorCode(20, true)

      /** The client requested the creation of a key which already exists, and the server will not
        * overwrite it.
        */
      object KeyAlreadyExists extends ErrorCode(21, true)

      /** The requested operation expected some conditions to hold, and those conditions were not
        * met. For instance, a compare-and-set operation might assert that the value of a key is
        * currently 5; if the value is 3, the server would return precondition-failed.
        */
      object PreconditionFailed extends ErrorCode(22, true)

      /** The requested transaction has been aborted because of a conflict with another
        * transaction. Servers need not return this error on every conflict: they may choose to
        * retry automatically instead.
        */
      object TxnConflict extends ErrorCode(30, true)

      /** Custom error code
        *
        * @param code the error code
        */
      case class Custom(override val code: Int) extends ErrorCode(code, false)
You can send an error message to any node id as a reply to another message. Here's an example
Send standard error
    case class InMessage(msg_id: MessageId) extends NeedsReply derives JsonCodec

    val handler = receive[InMessage] {
      case msg: InMessage =>
        reply(ErrorMessage(msg.msg_id, ErrorCode.PreconditionFailed, "some text message"))
    }
Send custom error
    case class InMessage(msg_id: MessageId) extends NeedsReply derives JsonCodec

    val handler = receive[InMessage] {
      case msg: InMessage =>
        reply(ErrorMessage(msg.msg_id, ErrorCode.Custom(1005), "some text message"))
    }

ZIO-Maelstrom provides LinKv, LwwKv, SeqKv & LinTso clients to interact with these services. SeqKv, LwwKv & LinKv are all key value stores. They have the same api but different consistency guarantees.
CAS stands for compare-and-swap. It takes a key, a value and an expected value. It writes the value against the key only if the expected value matches the current value of the key. If the value is different, then it returns PreconditionFailed error code. If the key does not exist, it returns KeyDoesNotExist error code. If you set createIfNotExists to true, it will create the key if it does not exist.
Above example will write 3 to counter only if the current value of counter is 1. If the current value is different, it will return PreconditionFailed error code.
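The rules described for cas can be modelled with a small in-memory store. This is a sketch in plain Scala 3 under stated assumptions, not the library's client: InMemoryKv, KvError and the parameter names expected and newValue are hypothetical, and the real clients talk to a Maelstrom service and report ErrorCode values through the ask api.

```scala
import scala.collection.mutable

// Hypothetical error type for this sketch; it mirrors the two error codes named above.
enum KvError:
  case KeyDoesNotExist, PreconditionFailed

// In-memory stand-in for a key-value service, used only to illustrate the cas rules.
final class InMemoryKv[K, V]:
  private val data = mutable.Map.empty[K, V]

  def read(key: K): Either[KvError, V] =
    data.get(key).toRight(KvError.KeyDoesNotExist)

  def write(key: K, value: V): Unit =
    data.update(key, value)

  // Writes newValue only if the current value equals expected.
  def cas(key: K, expected: V, newValue: V, createIfNotExists: Boolean = false): Either[KvError, Unit] =
    data.get(key) match
      case Some(current) if current == expected =>
        data.update(key, newValue)
        Right(())
      case Some(_) =>
        Left(KvError.PreconditionFailed) // key exists but holds a different value
      case None if createIfNotExists =>
        data.update(key, newValue)       // missing key is created with the new value
        Right(())
      case None =>
        Left(KvError.KeyDoesNotExist)
```

For instance, after write("counter", 1), the call cas("counter", 1, 3) succeeds, while a second cas("counter", 1, 5) fails with PreconditionFailed because the stored value is now 3.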
This api takes a key and a value, and writes the value against the key only if the key does not exist. If the key already exists, it returns PreconditionFailed error code.
This is a high level api built on top of other apis. It takes a key and a function that takes the current value and returns a new value. It reads the current value of the key, applies the function and writes the new value against the key. If the value has changed in the meantime, it applies the function again and keeps trying until the value does not change. This is useful for implementing atomic operations like incrementing a value.
The timeout value does not apply to the entire operation but to each individual read, cas and write operation, so the total time taken by the operation can be more than the timeout value. Retries are only done when the value has changed in the meantime. Any other error is returned immediately. This also applies to the updateZIO api.
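The read, apply, cas and retry cycle can be sketched in plain Scala 3 as follows. This is a simplified model under stated assumptions, not the library's implementation: TinyStore is a hypothetical in-memory store, values are fixed to Int, the function is assumed to receive an Option of the current value, and timeouts and other errors are left out.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical in-memory store for this sketch only. cas returns false when the
// current value is not the expected one (the PreconditionFailed case).
final class TinyStore:
  private val data = mutable.Map.empty[String, Int]
  def read(key: String): Option[Int] = data.get(key)
  def write(key: String, value: Int): Unit = data.update(key, value)
  def cas(key: String, expected: Int, newValue: Int): Boolean =
    if data.get(key).contains(expected) then
      data.update(key, newValue)
      true
    else false

// Read the current value, apply f, then try to swap it in.
// The loop retries only when the value changed between the read and the cas.
@tailrec
def update(store: TinyStore, key: String)(f: Option[Int] => Int): Int =
  val current = store.read(key)
  val next = f(current)
  val written = current match
    case Some(old) => store.cas(key, old, next)
    case None =>
      store.write(key, next) // nothing to compare against yet
      true
  if written then next else update(store, key)(f)
```

Calling update(store, "counter")(_.getOrElse(0) + 1) increments the counter. Note that f runs again on every retry, which is why side effects inside it are best avoided.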
updateZIO
This is a high level api built on top of other apis. It takes a key, a function that takes the current value and returns a ZIO that returns a new value. It reads the current value of the key, applies the ZIO and writes the new value against the key. If the value has changed in the meantime, it applies the function again and keeps trying until the value does not change. This is very similar to update but the function can be a ZIO which can do some async operations. When retries happen, the ZIO is retried as well, so side effects should be avoided in this function.
Because all these apis are built on top of ask api, they can return AskError which you may need to handle. According to maelstrom documentation, they can return KeyDoesNotExist or PreconditionFailed error codes.
In case of network partition or delay, all of the above apis can return Timeout error code.
When incorrect types are used to decode the response, they can return a DecodingFailure error.
Tip
key and value of the key value store can be any type that has a zio.json.JsonCodec instance
Below are the settings that can be configured for a node
Log Level
The default log level is LogLevel.Info.
If you want more detailed logs, you can set it to LogLevel.Debug.
If you want to disable logs, you can set it to LogLevel.None
Log Format
Log format can be either Plain or Colored. Default is colored.
Concurrency
This is the concurrency level for processing messages. Default is 1024.
This means that up to 1024 request messages (receive api) and 1024 response messages (ask api), 2048 messages in total, can be processed in parallel.
You can log at different levels using ZIO's logging APIs - ZIO.logDebug, ZIO.logInfo, etc.
All these APIs log to STDERR because STDOUT is used for sending messages.
You can configure the log level using settings API.
By default, log statements are colored. You can change it to plain using settings API
Logging
    object MainApplication extends MaelstromNode {
      override val configure = NodeConfig.withLogLevelDebug

      def program = for
        _ <- ZIO.logDebug("Starting node")
        _ <- ZIO.logInfo("Received message")
        _ <- ZIO.logWarning("Something is wrong")
        _ <- ZIO.logError("Something is really wrong")
      yield ()
    }
Above program, when initialized, will output the following:
When developing a solution, you sometimes want to test it without maelstrom. While you can use stdIn to enter the input, you can also hardcode the input messages in the program itself.
This will run the entire program with the input from the file. With file input you also get to simulate delay in inputs using sleep statements as shown above.