The publisher socket can only publish messages into the queue.
A publisher is created from a Context:
import cats.effect.{Resource, IO}
import io.fmq.Context
import io.fmq.socket.pubsub.Publisher
val publisherResource: Resource[IO, Publisher[IO]] =
  for {
    context   <- Context.create[IO](1)
    publisher <- Resource.eval(context.createPublisher)
  } yield publisher
The Publisher[F] is a valid socket instance, but it is not connected to the network yet.
The publisher can either bind to a specific port or allocate a random one:
import io.fmq.socket.pubsub.Publisher
import io.fmq.syntax.literals._
val specificPort: Resource[IO, Publisher.Socket[IO]] =
  for {
    publisher <- publisherResource
    connected <- publisher.bind(tcp"://localhost:31234")
  } yield connected

val randomPort: Resource[IO, Publisher.Socket[IO]] =
  for {
    publisher <- publisherResource
    connected <- publisher.bindToRandomPort(tcp_i"://localhost")
  } yield connected
Since publisher.bind and publisher.bindToRandomPort return Resource[F, Publisher.Socket[F]], the connection is released automatically when the resource is finalized.
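To illustrate the automatic release, here is a minimal sketch that binds a publisher, sends one message inside `use`, and leaves the cleanup to `Resource`. The names `boundPublisher` and `publishOnce` are hypothetical; the library calls are the ones already shown in this document.

```scala
import cats.effect.{IO, Resource}
import io.fmq.Context
import io.fmq.socket.pubsub.Publisher
import io.fmq.syntax.literals._

// Hypothetical name: a resource that owns both the context and the bound socket.
val boundPublisher: Resource[IO, Publisher.Socket[IO]] =
  for {
    context   <- Context.create[IO](1)
    publisher <- Resource.eval(context.createPublisher)
    socket    <- publisher.bind(tcp"://localhost:31234")
  } yield socket

// The socket is only available inside `use`; once the body completes or fails,
// the resources are released in reverse order of acquisition.
val publishOnce: IO[Unit] =
  boundPublisher.use(socket => socket.send("my-message"))
```

Nothing happens until `publishOnce` is actually run, because both `Resource` and `IO` are lazy descriptions.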
The settings can be changed until the socket is connected:
import io.fmq.options.{SendTimeout, Linger}
import io.fmq.socket.pubsub.Publisher
def configurePublisher(publisher: Publisher[IO]): IO[Unit] =
  for {
    _ <- publisher.setSendTimeout(SendTimeout.Immediately)
    _ <- publisher.setLinger(Linger.Immediately)
  } yield ()
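Because the options have to be applied before the socket is connected, one possible arrangement is to set them in the same for-comprehension that binds the socket. The sketch below assumes this ordering is what the library expects; `configuredSocket` is a hypothetical name, and the port is the one used in the earlier example.

```scala
import cats.effect.{IO, Resource}
import io.fmq.Context
import io.fmq.options.{Linger, SendTimeout}
import io.fmq.socket.pubsub.Publisher
import io.fmq.syntax.literals._

// Hypothetical name: configures the publisher first, then binds it.
val configuredSocket: Resource[IO, Publisher.Socket[IO]] =
  for {
    context   <- Context.create[IO](1)
    publisher <- Resource.eval(context.createPublisher)
    // Options are set while the publisher is still unconnected.
    _         <- Resource.eval(publisher.setSendTimeout(SendTimeout.Immediately))
    _         <- Resource.eval(publisher.setLinger(Linger.Immediately))
    socket    <- publisher.bind(tcp"://localhost:31234")
  } yield socket
```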
Publish messages
Only a connected publisher can send messages:
import io.fmq.frame.Frame
def sendSingleMessage(publisher: Publisher.Socket[IO]): IO[Unit] =
  publisher.send("my-message")

def sendMultipartMessage(publisher: Publisher.Socket[IO]): IO[Unit] =
  publisher.sendMultipart(Frame.Multipart("filter", "my-message"))

def sendMultipartManually(publisher: Publisher.Socket[IO]): IO[Unit] =
  publisher.sendMore("filter") >> publisher.send("my-message")
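The manual multipart approach extends naturally to several messages under one topic. The sketch below is only an illustration: `publishAll` is a hypothetical helper, and `traverse_` comes from cats, not from the library.

```scala
import cats.effect.IO
import cats.syntax.all._
import io.fmq.socket.pubsub.Publisher

// Hypothetical helper: sends every payload as a two-frame message,
// the topic frame first and the payload frame second.
def publishAll(
    publisher: Publisher.Socket[IO],
    topic: String,
    payloads: List[String]
): IO[Unit] =
  payloads.traverse_(payload => publisher.sendMore(topic) >> publisher.send(payload))
```

For example, `publishAll(socket, "filter", List("first", "second"))` would describe two consecutive multipart messages, sent in list order.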