Skip to content

Publishing to a topic

pmesh by default uses the ev stream for events to ensure durability by default, meaning any subject name provided will be prefixed with ev. before being emitted to the NATS server.

For more advanced use cases, you can use the raw. prefix to disable this behaviour and publish messages to any NATS subject name as is.

Publishing a message

For convenience, pmesh provides an API to publish messages to the NATS server, available at the /publish/:topic endpoint. The request body and headers are directly forwarded to the NATS server as a message.

Although it will be explored in more detail later, the most simple way of making internal requests to pmesh is via HTTP using the hostname pm3. This hostname is a special alias that resolves to the pmesh instance’s internal IP address, but only authenticates requests from the local network. For this endpoint, the full URL would be http://pm3/publish/topic.

Since we have a project with a my-api service, we can actually start publishing messages and receiving responses right away without defining any runners.

First let’s define a POST route in our project.

app.ts
fastify.post("/greet/:id", function (request, reply) {
for (const key in request.headers) {
console.log(key, request.headers[key]);
}
reply.send({ id: request.params.id });
});

Now we can simply use curl to send a request to the pmesh instance.

Terminal window
$ curl http://pm3/publish/svc.my-api.greet.hello
{"stream":"ev", "seq":5189}

The logs will show the original request headers, as well as the message queue state.

Terminal window
# Initial request headers
6:30AM INF my-api host worker
6:30AM INF my-api user-agent curl/8.4.0
6:30AM INF my-api content-length 0
6:30AM INF my-api accept */*
6:30AM INF my-api x-forwarded-for 127.0.0.1
6:30AM INF my-api x-forwarded-proto http
6:30AM INF my-api x-ray 0055449e85d28dfe-MYH
6:30AM INF my-api p-asn AS0 Local
6:30AM INF my-api p-internal 1
6:30AM INF my-api p-ip 127.0.0.1
6:30AM INF my-api p-ip-geo XX
# The consumer queue ID
6:30AM INF my-api c-id run-ev-svc-my-api-all
# Number of messages pending delivery in the consumer queue
6:30AM INF my-api c-pending 0
# Consumer sequence number
6:30AM INF my-api c-seq 10
# Delivery attempt counter
6:30AM INF my-api c-attempt 0
# Stream timestamp for when the message was received
6:30AM INF my-api s-time 1709789441559
# Stream domain
6:30AM INF my-api s-domain
# Stream ID
6:30AM INF my-api s-id ev
# Stream sequence number
6:30AM INF my-api s-seq 5189

For each service in the manifest, there is also a corresponding implicit runner that subscribes to the svc.<name>.> and raw.svc.<name>.> subjects and forwards the messages to the service’s HTTP implementation, replacing the . delimiter with / to form a path.

Receiving a response

Cool, but where’s our response?

Once the request is finished, if the message originates from a stream, pmesh will store the response in a special key-value store, which is also conveniently exposed as an endpoint at /result/:stream/:seq.

You can think of what we received earlier as a promise.

Terminal window
$ curl http://pm3/result/ev/5189
{"id":"hello"}

We could also use the raw. prefix to publish a regular NATS message, where the response will be directly sent back to us as a reply.

Terminal window
$ curl http://pm3/publish/raw.svc.my-api.greet.hello
{"id":"hello"}

Similarly, we can use the nats CLI to send these requests.

Terminal window
$ nats request svc.my-api.greet.hello ""
07:11:21 Sending request on "svc.my-api.greet.hello"
07:11:21 Received with rtt 1.1784ms
{"id":"hello"}
$ nats request ev.svc.my-api.greet.hello ""
07:12:04 Sending request on "ev.svc.my-api.greet.hello"
07:12:04 Received with rtt 582.3µs
{"stream":"ev", "seq":5191}
$ nats kv get results ev-5191
results > ev-5191 revision: 12 created @ 07 Mar 24 06:12 UTC
{"id":"hello"}