Getting started with Netty: Building a WebSocket broadcast server

Netty is a fantastic Java framework for building event-driven networked applications. It powers a ton of popular open-source projects such as Vert.x, Akka and Play. While a web or application developer may never need to work with Netty directly, it is well worth peeling back the layers to understand how the lower levels of the stack work.

Why use an asynchronous I/O framework?

Asynchronous, event-driven I/O is not a new idea, but it has taken on increased significance in the last few years. In modern microservices-based architectures, a request can traverse multiple services before a response is returned. Traditional blocking-I/O models use a thread per connection, which means that if a backend service starts misbehaving, concurrent thread and connection counts shoot up. This causes operational headaches, and even short network blips in a high-throughput application can saturate the thread pool.

In contrast, event-driven frameworks use a small number of threads to accept and respond to incoming requests. The caveat is that you can't call long-running, blocking methods on these threads, so the approach is best suited to I/O-heavy workloads like web servers and proxies. For example, Netflix rebuilt Zuul, their API gateway, on top of Netty.

Netty basics

The three main concepts in Netty are channels, handlers and pipelines. A channel is simply an abstraction over the underlying transport, e.g. a TCP connection. A pipeline is composed of a series of handlers. As data flows through each stage of the pipeline, handlers can transform it and pass it on to the next stage, much like good ol' Unix pipes.

Depending on whether a handler operates on incoming data received over the network or on outgoing data to be transmitted over the network, it is classified as a ChannelInboundHandler or a ChannelOutboundHandler. A handler can be both an inbound and an outbound handler if it implements both interfaces.

Getting our hands dirty

To develop Netty applications, you mainly need to focus on implementing handlers and building a pipeline. Luckily, Netty provides built-in handlers for most common protocols such as HTTP, WebSocket, Redis, etc. This frees you up to focus on your application logic and gets you up and running with very little code.

Let’s begin by writing a handler that receives WebSocket frames and logs them. Since we are receiving data, we need to implement a ChannelInboundHandler. Luckily, we don’t have to implement all the methods; Netty provides a convenient base class SimpleChannelInboundHandler that we can extend.
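
A minimal sketch of such a handler might look like this (the class name and log wording are illustrative, not prescribed by Netty):

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

// Logs every text frame received over the WebSocket connection.
public class WebSocketFrameLogger extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        // SimpleChannelInboundHandler releases the frame for us after this method returns.
        System.out.println("Received text frame " + frame.text());
    }
}
```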

Next, recall that handlers need to be part of a pipeline to be useful, so let's create one. Since we want to receive messages over a WebSocket connection, we'll put the protocol-specific handlers that Netty provides at the beginning of the pipeline, and our custom handler goes at the end.
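
Here is one way to wire the pipeline together in a ChannelInitializer; the "/ws" path, the aggregator size and the class names are assumptions made for this sketch:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Netty's protocol-specific handlers come first...
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new WebSocketServerCompressionHandler());
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true));
        // ...and our custom handler goes at the end.
        pipeline.addLast(new WebSocketFrameLogger());
    }
}
```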

- HttpServerCodec is both an inbound and an outbound handler that decodes/encodes HTTP messages from/to the network. It is required for the initial WebSocket upgrade request, which arrives over HTTP, and it is automatically taken out of the pipeline once the WebSocket handshake is complete.
- HttpObjectAggregator aggregates the incoming request and its content chunks into a single FullHttpRequest, which is what the WebSocketServer* handlers operate on.
- WebSocketServerCompressionHandler handles the WebSocket compression extensions.
- WebSocketServerProtocolHandler is the primary handler for the WebSocket protocol logic.

Bootstrapping the server

Finally, we need to write some boilerplate to get our server up and running. The server requires two groups of threads running event loops: the boss group, which accepts incoming connections, and the worker group, which handles all subsequent I/O operations. We also register a LoggingHandler that logs all channel events, which is handy for debugging.
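
A bootstrap along these lines should do the trick (port 8080 and the class names are assumptions carried over from the earlier sketches):

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public final class WebSocketServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts incoming connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles subsequent I/O
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class)
                     .handler(new LoggingHandler(LogLevel.INFO))
                     .childHandler(new WebSocketServerInitializer());
            // Bind to the port and block until the server channel is closed.
            bootstrap.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```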

Notice that we only need a single thread for the boss group (unless you want to share the boss group with other server bootstraps), and we leave the number of threads in the worker group at the default, which is 2 * the number of available cores.

Once you run the main method and the server starts up, you can do a basic test by firing up the Chrome DevTools console and running a couple of lines of JavaScript.
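
Something along these lines should work; the ws://localhost:8080/ws URL is an assumption, so adjust it to wherever your server is actually listening:

```js
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onopen = () => ws.send("Hello websocket!");
```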

You should now see “Received text frame Hello websocket!” in your logs. Congrats, you built your first Netty server!

Publishing to other clients

So far, we have only been concerned with accepting a WebSocket connection and receiving messages over it. Let's take it a step further and broadcast a message received on one channel to all other connected channels.

First, we’ll need to keep track of all our connected clients. Again, Netty has our back with the concept of a ChannelGroup. A channel group is thread-safe and handles removing dead connections for you. Let’s create a channel group and add a new connection to it. We’ll also broadcast any incoming message to all channels other than the one it was received on:
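
(A sketch of one way this handler could look, replacing the logging handler at the end of our pipeline; the class name is again an assumption, and this deliberately shows our first, naive attempt.)

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

public class WebSocketBroadcastHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // Thread-safe set of all connected channels; closed channels are removed automatically.
    private static final ChannelGroup channels =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        channels.add(ctx.channel());
        ctx.fireChannelActive();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        System.out.println("Received text frame " + frame.text());
        // Broadcast to every channel except the one the frame arrived on.
        channels.stream()
                .filter(channel -> channel != ctx.channel())
                .forEach(channel -> channel.writeAndFlush(frame));
    }
}
```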

Great! Now let's open two browser tabs; in one, we'll listen for messages on the WebSocket:
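
(Assuming the same URL as before.)

```js
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = (event) => console.log("Received: " + event.data);
```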

In the other, we’ll send a message:
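
(Again, use whatever URL your server is bound to.)

```js
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onopen = () => ws.send("Hello from the other tab!");
```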

If you actually try this, you’ll see in the logs that the message is received successfully, but it is never sent out to the other client! There is also no indication in the logs as to what the problem is.

Registering future listeners

To debug this, we need to figure out why the writeAndFlush() call is not doing what it’s supposed to. Recall that all I/O operations in Netty are asynchronous, hence writeAndFlush() will return right away with a ChannelFuture that will fire when the operation is complete.

Currently, we’re throwing away the returned future and so we don’t have a way of knowing what happened. Let’s register a listener that will log if the operation was not successful:
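
(A possible shape of the updated broadcast loop inside channelRead0; ChannelFutureListener comes from io.netty.channel.)

```java
channels.stream()
        .filter(channel -> channel != ctx.channel())
        .forEach(channel -> channel.writeAndFlush(frame)
                .addListener((ChannelFutureListener) future -> {
                    if (!future.isSuccess()) {
                        // Log the cause of any failed write.
                        System.err.println("Failed to write frame: " + future.cause());
                    }
                }));
```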

Ah, now we see the exception in the logs:
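
(With the listener above, the failure surfaces as an io.netty.util.IllegalReferenceCountException; the exact log line will look something like the following.)

```
Failed to write frame: io.netty.util.IllegalReferenceCountException: refCnt: 0
```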

ByteBuf and Reference Counting

ByteBuf is Netty's abstraction over raw byte data. It is reference-counted so that buffers can be pooled, which makes memory allocation more efficient and reduces GC pressure. The flip side is that the application needs to manage reference counts explicitly so as not to cause memory leaks. In general, this means that for an inbound message, the final ChannelInboundHandler that handles the message must release it. Similarly, for an outbound message, the final ChannelOutboundHandler must release it.

What the above exception is telling us is that the frame we're trying to send out has a reference count of zero, and so has already been returned to the buffer pool. Wait, but where did we explicitly decrement the reference count of the received frame? It turns out our base class, SimpleChannelInboundHandler, does this by default for convenience. So, by the time the message is being written out, it has already been released by the inbound handler.

We can fix this by making a copy of the frame each time we send it out:
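
(Continuing the sketch: WebSocketFrame.copy() gives each write its own independently reference-counted copy of the content.)

```java
channels.stream()
        .filter(channel -> channel != ctx.channel())
        // copy() produces an independent frame with its own reference count,
        // so the outbound pipeline can release it without touching the original.
        .forEach(channel -> channel.writeAndFlush(frame.copy()));
```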

It works reliably now!

Can we do better?

Making a copy of the byte buffer works but is obviously not the most efficient solution. We should be able to use a single copy of the buffer since we’re sending the same content to all clients. It’s a matter of managing reference counts properly.

Another thing to keep in mind is that ByteBufs have indexes that indicate where in the buffer we can read and write data. For our use case, we need to make sure that each channel has an independent view of the read pointer (readerIndex) so that it doesn't read incomplete or invalid data. The ByteBuf.duplicate() method creates what is called a derived buffer, i.e. a buffer that shares the same content but has its own reader and writer indexes. However, duplicate() does not increment the reference count, which a derived buffer shares with its original; we'll do that ourselves by calling ByteBuf.retain().

Since we’ve decided to take reference-counting into our own hands, let’s also disable the automatic release behavior in SimpleChannelInboundHandler; this can be done with a constructor parameter.
The publishing code now looks like:
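
(One possible shape, keeping the hypothetical names from earlier.)

```java
public class WebSocketBroadcastHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // channels field and channelActive() unchanged from before

    public WebSocketBroadcastHandler() {
        super(false); // disable automatic release; we manage the frame's reference count ourselves
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        channels.stream()
                .filter(channel -> channel != ctx.channel())
                // duplicate() gives each channel an independent readerIndex over the same content;
                // retain() bumps the shared reference count once per write.
                .forEach(channel -> channel.writeAndFlush(frame.duplicate().retain()));
    }
}
```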

Edit: @bkowalczyyk pointed out on Twitter that even if you don't disable the automatic release behavior in SimpleChannelInboundHandler, the above code still works correctly.

Cleaning up properly

There is still a small problem with the above code. After we have written the duplicate()d frames to all channels, the reference we hold on the original frame is never released, which means there is a memory leak!

When can we safely release the original frame? A naive approach would be to release it right after the forEach() statement. That would be wrong, because writeAndFlush() is asynchronous: we could inadvertently release the frame before its contents have been written out. This means we can only safely release once all the writeAndFlush() operations are complete.

Edit: As the edit above mentions, you can actually release it at this point, because we increment the reference count for each channel write; the last write to complete will clean it up. Read on, though, if you are interested in a more convoluted way of doing this 🙂

So we need a way to signal that all the futures returned by the write operations have completed. Netty provides a handy class for exactly this, albeit with a rather awkward interface: PromiseCombiner. It requires us to register all the futures we want to wait for, and finally an "aggregate" promise that fires once all the earlier ones have finished. Once the aggregate promise fires, we'll release the frame:
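
(A sketch of the final channelRead0; PromiseCombiner and Promise live in io.netty.util.concurrent, and the executor-taking constructor is available in recent Netty 4.1 releases.)

```java
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
    // PromiseCombiner is not thread-safe; use it only from this channel's event loop.
    PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
    channels.stream()
            .filter(channel -> channel != ctx.channel())
            .forEach(channel -> combiner.add(channel.writeAndFlush(frame.duplicate().retain())));

    // The aggregate promise fires once every write registered above has completed.
    Promise<Void> aggregate = ctx.executor().newPromise();
    aggregate.addListener(future -> {
        frame.release(); // all writes are done; drop our reference to the original frame
        System.out.println("WebSocket frame successfully deallocated");
    });
    combiner.finish(aggregate);
}
```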

That’s it! Now you can see “WebSocket frame successfully deallocated” in the logs after a broadcast is complete.

Summary

Netty is a powerful framework for network programming, but like all sharp tools, you need to be aware of its edges. We built a small WebSocket server that demonstrates the basics of Netty. You can find the full example here.
