Broadcast#
Info
his example is for Challenge #3b: Multi-Node Broadcast
The goal is to make nodes gossip messages with each other so that "eventually" all nodes receive all messages.
import zio.json.*
import zio.*
import com.bilalfazlani.zioMaelstrom.*
As part of the broadcast challenge, nodes will receive work with below messages
- Topology
-
topology
message contains the list of all nodes in the cluster mapped to their neighbors. - Broadcast
-
broadcast
message has an Integer number in it. Not all the nodes will receive all the numbers in broadcast. The goal is to make sure that all nodes get all the numbers eventually using gossip. - Read
-
read
Maelstrom will expect a reply to this message with all the number that that node knows about either via broadcast or gossip.
- Gossip
-
Our nodes will send
gossip
messages to each other to gossip the numbers they have received via broadcast.
Because there are multiple messages, lets create a sealed trait to represent them. We will use zio-json
annotations to make it easy to serialize and deserialize these messages.
@jsonDiscriminator("type") // (1)!
sealed trait InMessage derives JsonDecoder // (2)!
@jsonHint("topology")
case class Topology(topology: Map[NodeId, List[NodeId]], msg_id: MessageId)
extends InMessage,
NeedsReply // (3)!
@jsonHint("broadcast")
case class Broadcast(message: Int, msg_id: MessageId) extends InMessage, NeedsReply
@jsonHint("read")
case class Read(msg_id: MessageId) extends InMessage, NeedsReply
@jsonHint("gossip")
case class Gossip(iHaveSeen: Set[Int], `type`: String = "gossip")
extends InMessage,
Sendable // (4)!
derives JsonEncoder // (5)!
- "type" field is part of standard Maelstrom message format. It is used to identify the type of message.
derive JsonDecoder
will derives decoders for all children ofInMessage
trait- All the messages that needs a reply should extend from
NeedsReply
- All messages that need to be sent out from nodes should extend from
Sendable
- Since
Gossip
message is sent out from nodes, it also needs an Encoder
case class BroadcastOk(in_reply_to: MessageId, `type`: String = "broadcast_ok")
extends Sendable,
Reply derives JsonEncoder // (1)!
case class ReadOk(messages: Set[Int], in_reply_to: MessageId, `type`: String = "read_ok") // (2)!
extends Sendable,
Reply derives JsonEncoder
case class TopologyOk(in_reply_to: MessageId, `type`: String = "topology_ok")
extends Sendable,
Reply derives JsonEncoder
- All messages that are replies to some other messages must extend
Sendable
andReply
- All
Sendable
messages requiretype
field
case class State(messages: Set[Int] = Set.empty, neighbours: Set[NodeId] = Set.empty) {
def addBroadcast(message: Int): State = copy(messages = messages + message)
def addGossip(gossipMessages: Set[Int]): State = copy(messages = messages ++ gossipMessages)
def addNeighbours(neighbours: Set[NodeId]): State = copy(neighbours = neighbours)
}
object Main extends MaelstromNode {
// some helper functions
val getState = ZIO.serviceWithZIO[Ref[State]](_.get)
def updateState(f: State => State) = ZIO.serviceWithZIO[Ref[State]](_.update(f))
def gossip(state: State) = ZIO.foreachPar(state.neighbours)(_ send Gossip(state.messages)) // (1)!
val startGossip = getState.flatMap(gossip).delay(500.millis).forever // (2)!
val handleMessages: ZIO[MaelstromRuntime & Ref[State] & Scope, Nothing, Unit] =
receive[InMessage] {
case msg @ Broadcast(broadcast, messageId) =>
updateState(_.addBroadcast(broadcast)) *>
reply(BroadcastOk(messageId)) // (3)!
case msg @ Read(messageId) =>
getState
.map(_.messages)
.flatMap(messages => reply(ReadOk(messages, messageId)))
case msg @ Topology(topology, messageId) =>
val neighbours = topology(me).toSet // (4)!
updateState(_.addNeighbours(neighbours)) // (5)!
*> reply(TopologyOk(messageId)) // (6)!
*> startGossip.forkScoped.unit // (7)!
// .forkScoped adds a `Scope` requirement in the environment
case msg @ Gossip(gossipMessages, _) => updateState(_.addGossip(gossipMessages)) // (8)!
}
val program =
handleMessages.provideSome[MaelstromRuntime & Scope](ZLayer(Ref.make(State()))) // (9)!
}
- This is a naive implementation of gossip protocol. We are sending all the number in a node's state to all its neighbors.
- The gossiping once started, will trigger every 500 milliseconds and keep happening forever
- Save the new number and reply OK to the sender
me
is the current node's id- Add node's neighbors to the state
- Reply OK to the sender
- Start gossiping after arrival of
topology
message - Add gossip received by other nodes to node's state
- Using the
provideSome
method to provide all the layers exceptMaelstromRuntime
&Scope
Tip
This is a naive implementation of gossip protocol. We are sending all the numbers in a node's state to all its neighbors. This will not scale well. Find a better way to do this.
Note
Source code for this example can be found on Github