Broadcast

Info

This example is for Challenge #3b: Multi-Node Broadcast

The goal is to make nodes gossip messages with each other so that "eventually" all nodes receive all messages.

Imports
import zio.json.*
import zio.*
import com.bilalfazlani.zioMaelstrom.*

As part of the broadcast challenge, nodes will work with the messages below.

Topology

A topology message contains a map from every node in the cluster to the list of its neighbors.

Broadcast

A broadcast message carries an integer. Not every node receives every broadcast, so the goal is to use gossip to make sure that all nodes eventually learn all the numbers.

Read

For a read message, Maelstrom expects a reply containing all the numbers that the node knows about, whether it learned them via broadcast or via gossip.


Gossip

Our nodes will send gossip messages to each other to gossip the numbers they have received via broadcast.
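
To make "eventually" concrete, here is a minimal, library-free simulation of the idea. It is a sketch under assumptions, not the code the nodes run: node ids are plain strings, the four-node line topology and the starting numbers are made up, and `gossipRound` and `converge` are hypothetical helpers that model every node sending its full set to each neighbor once per round.

```scala
// Pure simulation: no Maelstrom, no ZIO. Node ids are plain strings (assumption).
type Id = String

// Hypothetical line topology: n1 - n2 - n3 - n4
val topology: Map[Id, List[Id]] = Map(
  "n1" -> List("n2"),
  "n2" -> List("n1", "n3"),
  "n3" -> List("n2", "n4"),
  "n4" -> List("n3")
)

// Each node starts out knowing only the numbers that were broadcast to it directly.
val initial: Map[Id, Set[Int]] = Map(
  "n1" -> Set(1),
  "n2" -> Set.empty[Int],
  "n3" -> Set(7),
  "n4" -> Set(42)
)

// One round: every node sends its whole set to each neighbor,
// and every receiver merges what it gets using set union.
def gossipRound(known: Map[Id, Set[Int]]): Map[Id, Set[Int]] =
  known.map { case (node, seen) =>
    val senders = topology.keys.filter(sender => topology(sender).contains(node))
    node -> senders.foldLeft(seen)((acc, sender) => acc ++ known(sender))
  }

// Repeat rounds until nothing changes, counting the rounds that changed something.
def converge(known: Map[Id, Set[Int]], rounds: Int = 0): (Map[Id, Set[Int]], Int) = {
  val next = gossipRound(known)
  if (next == known) (known, rounds) else converge(next, rounds + 1)
}

val result       = converge(initial)
val finalState   = result._1 // every node ends up with Set(1, 7, 42)
val roundsNeeded = result._2 // 3 for this topology
```

In this topology the two ends of the line are three hops apart, so three rounds are needed before every node holds all three numbers. A real cluster has no synchronized rounds, but the same merge-by-union step is what lets the nodes converge.
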

Because there are multiple messages, let's create a sealed trait to represent them. We will use zio-json annotations to make these messages easy to serialize and deserialize.

Message definitions
@jsonDiscriminator("type")                 // (1)!
sealed trait InMessage derives JsonDecoder // (2)!

@jsonHint("topology")
case class Topology(topology: Map[NodeId, List[NodeId]], msg_id: MessageId)
    extends InMessage, NeedsReply // (3)!

@jsonHint("broadcast")
case class Broadcast(message: Int, msg_id: MessageId) extends InMessage, NeedsReply

@jsonHint("read")
case class Read(msg_id: MessageId) extends InMessage, NeedsReply

@jsonHint("gossip")
case class Gossip(iHaveSeen: Set[Int], `type`: String = "gossip")
    extends InMessage, Sendable // (4)!
    derives JsonEncoder         // (5)!

  1. The "type" field is part of the standard Maelstrom message format. It identifies the type of the message.
  2. `derives JsonDecoder` derives decoders for all children of the InMessage trait
  3. All messages that need a reply should extend NeedsReply
  4. All messages that are sent out from nodes should extend Sendable
  5. Since the Gossip message is sent out from nodes, it also needs an encoder

Reply messages
case class BroadcastOk(in_reply_to: MessageId, `type`: String = "broadcast_ok")
    extends Sendable, Reply derives JsonEncoder // (1)!

case class ReadOk(messages: Set[Int], in_reply_to: MessageId, `type`: String = "read_ok") // (2)!
    extends Sendable, Reply derives JsonEncoder

case class TopologyOk(in_reply_to: MessageId, `type`: String = "topology_ok")
    extends Sendable, Reply derives JsonEncoder

  1. All messages that are replies to other messages must extend Sendable and Reply
  2. All Sendable messages require a type field

Node state
case class State(messages: Set[Int] = Set.empty, neighbours: Set[NodeId] = Set.empty) {
  def addBroadcast(message: Int): State             = copy(messages = messages + message)
  def addGossip(gossipMessages: Set[Int]): State    = copy(messages = messages ++ gossipMessages)
  def addNeighbours(neighbours: Set[NodeId]): State = copy(neighbours = neighbours)
}
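
The state above relies on set union, which means that receiving the same number twice, or receiving numbers in a different order, leads to the same result. The sketch below illustrates this with the same `State` class; the only assumption is that `NodeId` is modelled as a `String` so the snippet runs without the library.

```scala
// NodeId is a plain String here (assumption); the library provides its own NodeId type.
type NodeId = String

case class State(messages: Set[Int] = Set.empty, neighbours: Set[NodeId] = Set.empty) {
  def addBroadcast(message: Int): State             = copy(messages = messages + message)
  def addGossip(gossipMessages: Set[Int]): State    = copy(messages = messages ++ gossipMessages)
  def addNeighbours(neighbours: Set[NodeId]): State = copy(neighbours = neighbours)
}

val s1 = State().addNeighbours(Set("n2", "n3")).addBroadcast(5)

// Delivering the same gossip twice changes nothing the second time.
val s2 = s1.addGossip(Set(5, 9))
val s3 = s2.addGossip(Set(5, 9))

// The order in which broadcast and gossip arrive does not matter either.
val a = State().addGossip(Set(1, 2)).addBroadcast(3)
val b = State().addBroadcast(3).addGossip(Set(1, 2))
```

Here `s3 == s2` and `a == b` both hold, which is why the naive "send everything repeatedly" approach is safe, if wasteful. Note that `addNeighbours` replaces the neighbor set rather than merging into it.
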
Node application
object Main extends ZIOAppDefault {

  // some helper functions
  val getState                       = ZIO.serviceWithZIO[Ref[State]](_.get)
  def updateState(f: State => State) = ZIO.serviceWithZIO[Ref[State]](_.update(f))

  def gossip(state: State) = ZIO.foreachPar(state.neighbours)(_ send Gossip(state.messages)) // (1)!

  val startGossip = getState.flatMap(gossip).delay(500.millis).forever // (2)!

  val handleMessages: ZIO[MaelstromRuntime & Ref[State] & Scope, Nothing, Unit] =
    receive[InMessage] {
      case msg @ Broadcast(broadcast, messageId) => updateState(_.addBroadcast(broadcast)) *>
          reply(BroadcastOk(messageId)) // (3)!

      case msg @ Read(messageId) => getState.map(_.messages)
          .flatMap(messages => reply(ReadOk(messages, messageId)))

      case msg @ Topology(topology, messageId) =>
        val neighbours = topology(me).toSet // (4)!
        updateState(_.addNeighbours(neighbours)) // (5)!
        *> reply(TopologyOk(messageId))     // (6)!
        *> startGossip.forkScoped.unit           // (7)!
      // .forkScoped adds a `Scope` requirement in the environment

      case msg @ Gossip(gossipMessages, _) => updateState(_.addGossip(gossipMessages)) // (8)!
    }

  val run = handleMessages
    .provideSome[Scope](MaelstromRuntime.live, ZLayer(Ref.make(State())))
}

  1. This is a naive implementation of a gossip protocol. We are sending all the numbers in a node's state to all its neighbors.
  2. Once started, gossiping runs every 500 milliseconds and keeps going forever
  3. Save the new number and reply OK to the sender
  4. me is the current node's id
  5. Add the node's neighbors to the state
  6. Reply OK to the sender
  7. Start gossiping after the topology message arrives
  8. Add the numbers received via gossip from other nodes to the node's state
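
One detail worth noting about annotation 4: on a standard Scala `Map`, `topology(me)` throws `NoSuchElementException` when the key is absent. Maelstrom's topology is expected to list every node, so this should not happen in practice, but a defensive lookup is cheap. The sketch below uses plain strings for node ids, and `neighboursOf` is a hypothetical helper, not part of the library.

```scala
// NodeId is a plain String here (assumption); the library provides its own NodeId type.
type NodeId = String

// Example topology in the shape the Topology message carries.
val topology: Map[NodeId, List[NodeId]] = Map(
  "n1" -> List("n2", "n3"),
  "n2" -> List("n1"),
  "n3" -> List("n1")
)

// Hypothetical helper: the neighbours of `me`, or an empty set when `me` is not listed.
def neighboursOf(me: NodeId, topology: Map[NodeId, List[NodeId]]): Set[NodeId] =
  topology.getOrElse(me, Nil).toSet

val n1Neighbours      = neighboursOf("n1", topology)
val unknownNeighbours = neighboursOf("n9", topology) // empty instead of an exception
```
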

Tip

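This is a naive implementation of a gossip protocol. Every 500 milliseconds each node sends all the numbers in its state to all its neighbors, so every gossip message grows with the total number of values broadcast so far. This will not scale well. Find a better way to do this.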
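
One possible direction, sketched here under assumptions rather than offered as the intended solution, is to remember which numbers each neighbor is already known to have and to gossip only the difference. `DeltaState` and all of its methods are hypothetical names, node ids are plain strings, and the acknowledgement step assumes the gossip message is changed to expect a reply, which the example above does not do.

```scala
// NodeId is a plain String here (assumption); the library provides its own NodeId type.
type NodeId = String

// Hypothetical state: besides our own numbers, track what each neighbour is known to hold.
case class DeltaState(
    messages: Set[Int] = Set.empty,
    knownBy: Map[NodeId, Set[Int]] = Map.empty
) {
  private def known(node: NodeId): Set[Int] = knownBy.getOrElse(node, Set.empty[Int])

  def addBroadcast(message: Int): DeltaState = copy(messages = messages + message)

  // A neighbour that gossips a number to us evidently has it already.
  def receiveGossip(from: NodeId, numbers: Set[Int]): DeltaState =
    copy(messages = messages ++ numbers, knownBy = knownBy.updated(from, known(from) ++ numbers))

  // Numbers we hold that this neighbour is not yet known to have.
  def pendingFor(neighbour: NodeId): Set[Int] = messages -- known(neighbour)

  // Record numbers only after the neighbour has acknowledged them,
  // so a lost message is simply retried on the next gossip tick.
  def acknowledged(neighbour: NodeId, numbers: Set[Int]): DeltaState =
    copy(knownBy = knownBy.updated(neighbour, known(neighbour) ++ numbers))
}

val s0          = DeltaState().addBroadcast(1).addBroadcast(2)
val firstBatch  = s0.pendingFor("n2") // Set(1, 2): nothing acknowledged yet
val s1          = s0.acknowledged("n2", firstBatch).addBroadcast(3)
val secondBatch = s1.pendingFor("n2") // Set(3): only the new number
val s2          = s1.receiveGossip("n3", Set(3, 4))
val forN3       = s2.pendingFor("n3") // Set(1, 2): n3 already has 3 and 4
```

With this bookkeeping a gossip message carries only what the neighbor may be missing, and an empty `pendingFor` result means there is nothing to send on that tick.
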

Note

Source code for this example can be found on GitHub