Broadcast#
Info
his example is for Challenge #3b: Multi-Node Broadcast
The goal is to make nodes gossip messages with each other so that "eventually" all nodes receive all messages.
import com.bilalfazlani.zioMaelstrom.*
import zio.*
import zio.json.*
As part of the broadcast challenge, nodes will receive work with below messages
- Topology
-
topology
message contains the list of all nodes in the cluster mapped to their neighbors. - Broadcast
-
broadcast
message has an Integer number in it. Not all the nodes will receive all the numbers in broadcast. The goal is to make sure that all nodes get all the numbers eventually using gossip. - Read
-
read
Maelstrom will expect a reply to this message with all the number that that node knows about either via broadcast or gossip.
- Gossip
-
Our nodes will send
gossip
messages to each other to gossip the numbers they have received via broadcast.
Because there are multiple messages, lets create a sealed trait to represent them. We will use zio-json
annotations to make it easy to serialize and deserialize these messages.
sealed trait InMessage derives JsonDecoder // (1)!
case class Topology(topology: Map[NodeId, List[NodeId]]) extends InMessage
case class Broadcast(message: Int) extends InMessage
case class Read() extends InMessage
case class Gossip(iHaveSeen: Set[Int]) extends InMessage derives JsonEncoder // (2)!
jsonDiscriminator
is required when want to receive multiple types of message. "type" field is part of standard Maelstrom message format so we use the same field to differentiate between different messages.- This derives decoders for all children of
InMessage
trait - Since
Gossip
message is sent out from nodes, it also needs an Encoder
case class BroadcastOk() derives JsonEncoder
case class ReadOk(messages: Set[Int]) derives JsonEncoder
case class TopologyOk() derives JsonEncoder
case class State(messages: Set[Int] = Set.empty, neighbours: Set[NodeId] = Set.empty) {
def addBroadcast(message: Int): State = copy(messages = messages + message)
def addGossip(gossipMessages: Set[Int]): State = copy(messages = messages ++ gossipMessages)
def addNeighbours(neighbours: Set[NodeId]): State = copy(neighbours = neighbours)
}
object Main extends MaelstromNode {
// some helper functions
val getState = ZIO.serviceWithZIO[Ref[State]](_.get)
def updateState(f: State => State) = ZIO.serviceWithZIO[Ref[State]](_.update(f))
def gossip(state: State) = ZIO.foreachPar(state.neighbours)(_ send Gossip(state.messages)) // (1)!
val startGossip = getState.flatMap(gossip).delay(500.millis).forever // (2)!
val program = receive[InMessage] {
case Broadcast(broadcast) =>
updateState(_.addBroadcast(broadcast)) *>
reply(BroadcastOk()) // (3)!
case Read() => getState.map(_.messages).flatMap(x => reply(ReadOk(x)))
case Topology(topology) =>
for {
me <- MaelstromRuntime.me // (4)!
neighbours = topology(me).toSet
_ <- updateState(_.addNeighbours(neighbours)) // (5)!
_ <- reply(TopologyOk()) // (6)!
_ <- startGossip.forkScoped.unit // (7)!
} yield ()
case Gossip(gossipMessages) => updateState(_.addGossip(gossipMessages)) // (8)!
}.provideSome[MaelstromRuntime](ZLayer(Ref.make(State()))) // (9)!
}
- This is a naive implementation of gossip protocol. We are sending all the number in a node's state to all its neighbors.
- The gossiping once started, will trigger every 500 milliseconds and keep happening forever
- Save the new number and reply OK to the sender
me
is the current node's id- Add node's neighbors to the state
- Reply OK to the sender
- Start gossiping after arrival of
topology
message - Add gossip received by other nodes to node's state
- Using the
provideSome
method to provide all the layers exceptMaelstromRuntime
&Scope
Tip
This is a naive implementation of gossip protocol. We are sending all the numbers in a node's state to all its neighbors. This will not scale well. Find a better way to do this.
Note
Source code for this example can be found on Github