Monday, June 18, 2012

Netty Tutorial Part 1.5: On Channel Handlers and Channel Options

Intro:

After some feedback on Part 1, and prompted by some Stack Overflow questions, I want to expand on and clarify some topics, so this is Part 1.5.
  • Channel Handler Sharability & State
  • Channel Options

Channel Handlers

As discussed previously, most types of ChannelHandlers have the job of either encoding objects into byte arrays (or specifically, ChannelBuffers) or decoding ChannelBuffers into objects. In most cases, handlers will be instantiated and placed into a ChannelPipeline by a defined ChannelPipelineFactory in code that looks something like this (borrowed from the ClientBootstrap javadoc):

The reason for the pipeline factory's seeming complexity may not be obvious, but it's actually an elegant structure that flexibly handles some variability in the capabilities of different ChannelHandler implementations. First of all, many ChannelHandlers require some configuration, a detail not represented in the above example. Consider the DelimiterBasedFrameDecoder, which decodes chunks of bytes in accordance with the delimiter specified in its constructor. Instances of this decoder require configuration, so the interface-only ChannelPipelineFactory lets us provide the template that defines how the decoder should be created every time a new pipeline is created. And as a recap, every time a new channel is created, a pipeline is created for it (although in some cases, I suppose, you might end up with an empty pipeline with no channel handlers, which means you are using a snazzy framework for passing around ChannelBuffers). That, combined with the ordering and logical names of handlers in the pipeline, is why one needs some care and attention when creating a pipeline. Here's a better example of some parameterized channel handlers in use (again, pilfered liberally from the StringEncoder javadoc):

Shared and Exclusive Channel Handlers

However, a crucial factor in the use of channel handlers is this: some channel handlers are such good multitaskers that you only ever need one instance, and that single instance can be reused in every pipeline created. That is to say, these types of channel handlers maintain no state, so they exhibit channel safety, a variation on the notion of thread safety. With no state to maintain, there's no reason to create more than one. On the other hand, other channel handlers keep state in instance variables and therefore assume that they are exclusive to one channel and one pipeline. Both the above examples create pipelines with exclusive channel handler instances, inasmuch as they create a new handler instance each time a new pipeline is created. The second example, which creates a pipeline factory called MyStringServerFactory (using actual, real-life channel handlers), contains two channel handlers that could actually be shared, although they have been created in exclusive mode. They are the StringEncoder and StringDecoder (and I will explain the reasoning behind it shortly).
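To make the shared/exclusive distinction concrete without pulling in Netty, here is a minimal plain-Java model. It is a sketch under stated assumptions, not Netty's API: the hypothetical PipelineFactoryModel plays the role of a ChannelPipelineFactory, a LinkedHashMap stands in for a pipeline (insertion order is handler order, keys are the logical names), and both handler classes are hypothetical stand-ins. The factory hands out one shared, stateless encoder while building a fresh, configured decoder for every pipeline.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for Netty handlers; these are NOT Netty classes.

// Stateless: encode() uses only its argument, so one instance can serve every pipeline.
final class SharedStringEncoderModel {
    byte[] encode(String msg) {
        return msg.getBytes(StandardCharsets.UTF_8);
    }
}

// Configured and stateful: needs a fresh instance per pipeline (i.e. per channel).
final class ExclusiveFrameDecoderModel {
    final int maxFrameLength;                               // configuration from the constructor
    final StringBuilder partialFrame = new StringBuilder(); // per-channel state

    ExclusiveFrameDecoderModel(int maxFrameLength) {
        this.maxFrameLength = maxFrameLength;
    }
}

// Plays the role of a ChannelPipelineFactory: a template invoked once per new channel.
final class PipelineFactoryModel {
    // Created once and reused by every pipeline: "shared mode".
    private final SharedStringEncoderModel sharedEncoder = new SharedStringEncoderModel();

    Map<String, Object> getPipeline() {
        // Insertion order models handler order; keys model logical handler names.
        Map<String, Object> pipeline = new LinkedHashMap<>();
        pipeline.put("frameDecoder", new ExclusiveFrameDecoderModel(8192)); // new every time
        pipeline.put("stringEncoder", sharedEncoder);                        // same instance every time
        return pipeline;
    }
}
```

Calling getPipeline() twice returns two pipelines that hold the same encoder instance but different decoder instances, which is exactly why sharing is only safe for the stateless handler.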
Creating and registering a channel handler in shared mode is simply a matter of instantiating one instance and re-using it in all created pipelines. The following code reimplements MyStringServerFactory using a shared StringDecoder and StringEncoder:

So how is one to know if a channel handler class is sharable or exclusive? Netty defines an informational annotation called @Sharable, and sharable channel handler classes are annotated as such. To quote from the @Sharable javadoc:
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.
If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.
This annotation is provided for documentation purpose, just like the JCIP annotations.
It's safe to say that the author summed it up with more brevity than I did, and also informs the reader that the annotation is informational only: there is no built-in use of the annotation within the API implementation itself. However, the @Retention of the annotation is RUNTIME, so it can be detected by your code, and using a bit of Reflection[s], I whipped up this table that categorizes all the channel handlers included in the Netty API as Sharable or Exclusive, and as Upstream, Downstream, or Both. Excluded from this table are abstract and inner classes, as well as SimpleChannelHandler, SimpleChannelUpstreamHandler and SimpleChannelDownstreamHandler. Those last three are concrete channel handlers and are not annotated as @Sharable, but they are what I refer to as logically abstract, or empty, implementations. By themselves, they do nothing and are intended to be extended when implementing your own handler classes.
All packages are in org.jboss.netty.
Direction Sharable Exclusive
Upstream
  • handler.codec.string.StringDecoder
  • handler.codec.protobuf.ProtobufDecoder
  • handler.timeout.IdleStateHandler
  • handler.timeout.ReadTimeoutHandler
  • handler.codec.base64.Base64Decoder
  • handler.codec.http.HttpResponseDecoder
  • handler.codec.rtsp.RtspRequestDecoder
  • handler.codec.protobuf.ProtobufVarint32FrameDecoder
  • handler.codec.frame.LengthFieldBasedFrameDecoder
  • handler.codec.frame.DelimiterBasedFrameDecoder
  • handler.codec.rtsp.RtspResponseDecoder
  • handler.codec.spdy.SpdyHttpDecoder
  • handler.codec.serialization.ObjectDecoder
  • handler.codec.http.websocket.WebSocketFrameDecoder
  • handler.timeout.IdleStateAwareChannelUpstreamHandler
  • handler.codec.http.websocketx.WebSocket08FrameDecoder
  • handler.codec.http.HttpChunkAggregator
  • handler.codec.serialization.CompatibleObjectDecoder
  • handler.codec.compression.ZlibDecoder
  • handler.codec.http.websocketx.WebSocket00FrameDecoder
  • handler.codec.spdy.SpdyFrameDecoder
  • handler.codec.http.HttpRequestDecoder
  • handler.codec.frame.FixedLengthFrameDecoder
  • handler.queue.BlockingReadHandler
  • handler.codec.http.HttpContentDecompressor
  • handler.codec.http.websocketx.WebSocket13FrameDecoder
Downstream
  • handler.codec.base64.Base64Encoder
  • handler.timeout.WriteTimeoutHandler
  • handler.codec.rtsp.RtspResponseEncoder
  • handler.codec.protobuf.ProtobufEncoder
  • handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender
  • handler.codec.http.websocket.WebSocketFrameEncoder
  • handler.codec.http.websocketx.WebSocket00FrameEncoder
  • handler.codec.rtsp.RtspRequestEncoder
  • handler.codec.serialization.ObjectEncoder
  • handler.codec.frame.LengthFieldPrepender
  • handler.codec.string.StringEncoder
  • handler.codec.serialization.CompatibleObjectEncoder
  • handler.codec.http.HttpResponseEncoder
  • handler.codec.compression.ZlibEncoder
  • handler.codec.spdy.SpdyFrameEncoder
  • handler.codec.spdy.SpdyHttpEncoder
  • handler.codec.http.websocketx.WebSocket13FrameEncoder
  • handler.codec.http.websocketx.WebSocket08FrameEncoder
  • handler.codec.http.HttpRequestEncoder
Both
  • handler.logging.LoggingHandler
  • handler.execution.ExecutionHandler
  • handler.timeout.IdleStateAwareChannelHandler
  • handler.queue.BufferedWriteHandler
  • handler.codec.spdy.SpdyFrameCodec
  • handler.codec.spdy.SpdySessionHandler
  • handler.codec.spdy.SpdyHttpCodec
  • handler.ssl.SslHandler
  • handler.stream.ChunkedWriteHandler
  • handler.codec.http.HttpClientCodec
  • handler.codec.http.HttpContentCompressor
  • handler.codec.http.HttpServerCodec
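A table like this can be built with a very small reflective check. The sketch below is a plain-Java illustration under one assumption: it defines its own stand-in @Sharable annotation rather than using Netty's, and the two handler classes are hypothetical. It works only because the annotation is retained at RUNTIME.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for Netty's @Sharable. RUNTIME retention is what makes
// the annotation visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Sharable {}

@Sharable
class StatelessEncoderModel {}      // hypothetical sharable handler

class StatefulDecoderModel {}       // hypothetical exclusive handler

final class SharabilityCheck {
    // Because this stand-in is not marked @Inherited, a subclass of a
    // @Sharable class is reported as Exclusive unless it is annotated too.
    static String classify(Class<?> handlerClass) {
        return handlerClass.isAnnotationPresent(Sharable.class) ? "Sharable" : "Exclusive";
    }
}
```

Run over every concrete handler class, a check like this gives the sharable/exclusive split; the upstream/downstream split comes from checking whether each class implements ChannelUpstreamHandler, ChannelDownstreamHandler, or both.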

Decoders

You might notice a few instances of asymmetry when it comes to sharability. For example, the ObjectEncoder is sharable, but the ObjectDecoder is not. This is revealing about the nature of handlers. An ObjectEncoder's job is to convert an object into a byte array (ok, a ChannelBuffer), and it is passed the complete payload (the object to be encoded), so there is no need to retain any state. In contrast, the ObjectDecoder has to work with a bunch of bytes which may or may not be complete for the purposes of reconstituting them back into an object. Think about a serialized object being sent from a remote client. It has been serialized into a byte stream of possibly several thousand bytes. When the decoder is first called, the byte stream may be incomplete, with some portion of it still in transit.
Don't even think about having the handler wait for the rest of the byte stream to arrive before proceeding. That sort of thing is just not done around these parts, because Netty is an asynchronous API and we're not going to allocate a pool of worker threads (remember, they do all the real work) just to sit around waiting for I/O to complete. Instead, when there are bytes available for processing, a worker thread will be allocated to process them by executing the ObjectDecoder's decode method. This is the signature of that method:

Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
 
When this method is called, if the buffer contains the complete set of bytes required to decode an object, then the decoded object is returned and passed on to the next upstream handler. If not, the handler returns null, in effect signaling a no-op. Either way, once the invocation is complete, the worker thread moves on to doing something else.
 
That's a standard pattern for decoders. Examining ObjectDecoder specifically, we can see that it extends LengthFieldBasedFrameDecoder, and a brief glance at the source of that class shows the following non-final instance fields:
 

private boolean discardingTooLongFrame;
private long tooLongFrameLength;
private long bytesToDiscard;
 
These fields are the state being protected by making this handler exclusive. The bytes being decoded are not kept in these fields; they accumulate in a cumulation buffer maintained by the FrameDecoder base class, and that same buffer is passed to decode on each invocation until the byte stream has been fully decoded. Since the cumulation buffer is itself per-instance state, it is one more reason frame decoders must be exclusive. This is an important conceptual point to understand: a decoder may be invoked more than once in order to decode a byte stream. Completion results in a returned object; incompletion results in a returned null (and an assumption that more invocations will follow).
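The decode contract can be modeled in a few lines of plain Java. This sketch is not Netty's FrameDecoder: CumulatingDecoderModel and FixedLengthDecoderModel are hypothetical names, and a java.nio.ByteBuffer stands in for a ChannelBuffer. It shows the two properties just described: bytes accumulate in per-instance state between calls, and decode returns null until a complete frame is available (here a fixed-length frame, the idea behind FixedLengthFrameDecoder).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the decoder contract; not Netty's FrameDecoder.
abstract class CumulatingDecoderModel {
    // Per-instance state: the bytes received so far but not yet decoded.
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    // Called whenever a chunk arrives; returns every frame that is now complete.
    final List<byte[]> messageReceived(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + chunk.length);
        merged.put(cumulation).put(chunk).flip();
        cumulation = merged;
        List<byte[]> frames = new ArrayList<>();
        byte[] frame;
        // Keep calling decode() until it signals "not enough bytes yet" with null.
        while ((frame = decode(cumulation)) != null) {
            frames.add(frame);
        }
        return frames;
    }

    // Must either consume one whole frame and return it, or consume nothing and return null.
    protected abstract byte[] decode(ByteBuffer buffer);
}

final class FixedLengthDecoderModel extends CumulatingDecoderModel {
    private final int frameLength;

    FixedLengthDecoderModel(int frameLength) {
        this.frameLength = frameLength;
    }

    @Override
    protected byte[] decode(ByteBuffer buffer) {
        if (buffer.remaining() < frameLength) {
            return null;          // incomplete: wait for another invocation
        }
        byte[] frame = new byte[frameLength];
        buffer.get(frame);        // consume exactly one frame
        return frame;
    }
}
```

With a frame length of 4, feeding {1, 2} yields nothing; feeding seven more bytes then yields two frames, and the leftover byte waits in the cumulation for the next call.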

Non-Blocking = Scalability ?

An important takeaway from this is the notion that worker threads only kick in when there is work to do, rather than blocking while waiting for I/O to deliver bytes to process. Consider a simple and contrived scenario where a server listens for clients submitting serialized Java objects. With a blocking I/O reader, the thread spawned to process a given client's object submission might create a java.io.ObjectInputStream to read from the socket's input stream. Bytes are read in and objects are spit out the other side. If the byte stream is constantly full, then we would have efficient use of worker threads, although the model still requires one thread to be dedicated to each connected client. Fair enough, since there is a constant supply of data to process, all things being equal.

However, what happens if the supply of bytes to process is intermittent? Perhaps the clients only supply data intermittently, or perhaps the network has limited bandwidth and/or is congested, delaying the object transmissions. Now the server has allocated threads to read (and block) from the input stream, but there are long periods of time where there is nothing to do but wait for work to arrive.

This is one of the reasons that non-blocking I/O tends to scale better than traditional blocking I/O. Note that it does not necessarily make it faster; rather, it allows the server to accept and process an escalating number of clients with a more controlled and reduced level of resource allocation. For smaller numbers of clients, it is quite likely that blocking I/O will be faster, since dedicated threads require less supervision. Of course, since Netty supports both blocking (OIO) and non-blocking (NIO) channels, a Netty developer can create all sorts of services that can be configured to use either. Which one is appropriate for any given application depends on so many application-specific factors that it is rarely possible to state any hard rules. The Netty javadoc for the org.jboss.netty.channel.socket.oio package attempts to set a rule of thumb regarding the use of OIO:
Old blocking I/O based socket channel API implementation - recommended for a small number of connections (< 1000).
That seems high to me, but hardware, network, traffic types and other factors will certainly influence the magic number.

Framing (buffers, not pictures)

ObjectDecoder extends LengthFieldBasedFrameDecoder because the latter performs a critical function necessary in many decoders. Without a frame decoder, the object decoder would be faced with an entire unsegmented byte stream, not knowing where one serialized object ended and the next one started.

Of course, the CompatibleObjectDecoder can find those boundaries on its own because it expects a standard Java serialization stream, but there are several downsides to this, not the least of which is that the payloads will be significantly larger. To help the ObjectDecoder figure out which bytes belong to which serialized object, the frame decoder part of the class knows how to segment the byte stream into discrete chunks that the ObjectDecoder can assume are the contents of one object.

The frame decoder knows how to segment the byte stream because it works in concert with the sender's ObjectEncoder, which has conveniently placed length indicators into the downstream byte stream specifically so the upstream decoder can use them to parse it. If you want to use this technique with your own custom decoders, the sending side can use the LengthFieldPrepender encoder, paired with a LengthFieldBasedFrameDecoder on the receiving side.
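The sender/receiver cooperation can be sketched in plain Java. The wire format here, a 4-byte big-endian length followed by the payload, is an assumption chosen for illustration: LengthFieldPrepender and LengthFieldBasedFrameDecoder are configurable, and this is not their code. LengthFieldCodecModel is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a length-prepending encoder and its matching frame decoder.
final class LengthFieldCodecModel {
    // Sender side: prepend a 4-byte big-endian length to the payload.
    static byte[] prepend(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Receiver side: per-instance cumulation of bytes not yet framed.
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    List<String> receive(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + chunk.length);
        merged.put(cumulation).put(chunk).flip();
        cumulation = merged;
        List<String> frames = new ArrayList<>();
        while (cumulation.remaining() >= 4) {
            int length = cumulation.getInt(cumulation.position()); // peek without consuming
            if (cumulation.remaining() < 4 + length) {
                break;                                          // frame incomplete: wait for more
            }
            cumulation.position(cumulation.position() + 4);     // skip the length field
            byte[] payload = new byte[length];
            cumulation.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        return frames;
    }
}
```

Even when the concatenated stream arrives 3 bytes at a time, so that length fields and payloads straddle chunk boundaries, the decoder emits exactly the original messages.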

There are two points to note from this:

  • Most non-trivial decoders are either implementing a known protocol specification (like HTTP for example) or are expecting the payload to have been "massaged" by the encoding counterpart (like the ObjectEncoder).
  • The opportunity for optimization with the Netty encoder/decoder pairs is a good (though not the only) reason to use Netty even if you're using the traditional blocking I/O (OIO) model.

In the case of the ObjectDecoder, frame decoding is built in, but many decoder classes have no built-in frame decoding, so framing support needs to be provided. Fortunately, the modular architecture of the ChannelPipeline allows us to insert a frame decoder in front of decoders that need one. A good example of this is the StringDecoder. Its javadoc emphasizes that this decoder should be used in conjunction with a frame decoder:

Please note that this decoder must be used with a proper FrameDecoder such as DelimiterBasedFrameDecoder if you are using a stream-based transport such as TCP/IP.

There are a few ways that transmitted strings might be framed, each with a corresponding frame encoder and decoder:
  1. Fixed Length:   No specific encoder, just use fixed length strings / FixedLengthFrameDecoder
  2. Length Field Prepend: LengthFieldPrepender / LengthFieldBasedFrameDecoder
  3. String delimiter: No specific encoder, but strings should be delimited by the specified delimiters / DelimiterBasedFrameDecoder
The DelimiterBasedFrameDecoder can be configured with more than one delimiter, and the presence of any of them in the stream will trigger a frame segmentation; when more than one matches, the decoder's javadoc states that it chooses the delimiter which produces the shortest frame. Typical string delimiters are end-of-line characters or null characters. Delimiters are specified not as strings or even characters, but as ChannelBuffers, so they are expressed using the same type as the decoder's input and output.

The class Delimiters contains a few pre-defined and commonly used delimiters, and it's simple enough to define your own:

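ChannelBuffer delimiter = ChannelBuffers.wrappedBuffer("THEBIG-D".getBytes(CharsetUtil.US_ASCII));
DelimiterBasedFrameDecoder frameDecoder = new DelimiterBasedFrameDecoder(8192, delimiter); // 8192 = max frame length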
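Here is a plain-Java sketch of delimiter-based framing with several delimiters configured at once. It is not Netty's implementation and DelimiterFramerModel is a hypothetical name; it works on a complete byte array rather than a cumulation, and when several delimiters match it picks the earliest match, which yields the shortest frame (the rule DelimiterBasedFrameDecoder's javadoc describes). Delimiters are stripped from the frames.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of multi-delimiter framing over a complete byte array.
final class DelimiterFramerModel {
    private final List<byte[]> delimiters;

    DelimiterFramerModel(byte[]... delimiters) {
        this.delimiters = Arrays.asList(delimiters);
    }

    List<String> split(byte[] data) {
        List<String> frames = new ArrayList<>();
        int start = 0;
        while (true) {
            // Find the earliest match among all delimiters: that is the shortest frame.
            int bestIndex = -1;
            int bestLength = 0;
            for (byte[] d : delimiters) {
                int idx = indexOf(data, d, start);
                if (idx >= 0 && (bestIndex < 0 || idx < bestIndex)) {
                    bestIndex = idx;
                    bestLength = d.length;
                }
            }
            if (bestIndex < 0) {
                break;   // trailing bytes have no delimiter yet, so they stay unframed
            }
            frames.add(new String(data, start, bestIndex - start, StandardCharsets.UTF_8));
            start = bestIndex + bestLength;   // skip past the delimiter itself
        }
        return frames;
    }

    private static int indexOf(byte[] data, byte[] pattern, int from) {
        outer:
        for (int i = from; i <= data.length - pattern.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }
}
```

With "\r\n", "\n" and the custom "THEBIG-D" delimiter configured, splitting "one\r\ntwo\nthreeTHEBIG-Dtail" yields one, two and three; "tail" has no delimiter yet, so it remains unframed.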


To frame up this discussion on frames: channel handlers are often put to work in conjunction with each other. This can be within one pipeline, such as a frame decoder placed before a string decoder. In addition, a decoder's decoding algorithm may rely on a counterpart encoder's encoding.

ChannelHandler State


Although the most common (and recommended) way of handling state in a ChannelHandler is to use instance fields and then make sure the handler is used exclusively (a new instance for every pipeline it is inserted into), there are a couple of ways of keeping state safely in a shared ChannelHandler.

The ChannelHandlerContext provides the notion of an attachment, which is a store specific to one channel and handler instance. It is accessed using the getter and setter methods:

void channelHandlerContext.setAttachment(Object obj)
Object channelHandlerContext.getAttachment()

Typically, if you only need to keep state within a handler across calls to that handler, the attachment model is suitable. However, if you need to access state specific to a channel outside (and/or inside) the handler, Netty provides an analog to ThreadLocal called a ChannelLocal. Since this state is channel specific but may be accessed by multiple different threads, a ThreadLocal does not serve this purpose. While a ThreadLocal is essentially a bit of data keyed in part by the owning thread, a ChannelLocal is a bit of data keyed by the owning channel. Accordingly, you can define a static ChannelLocal field and, using the channel as the key, access the value from pretty much anywhere in your code. The primary methods are:


void channelLocal.set(Channel channel, T value)
T channelLocal.get(Channel channel)
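The idea behind ChannelLocal can be modeled as a map keyed by the channel instead of the thread. This is a hypothetical plain-Java sketch, not Netty's class: ChannelLocalModel and the use of Object as the channel type are assumptions, and this sketch never evicts entries on its own when a channel closes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a channel-keyed variable. Keys use equals()/hashCode(),
// so the channel type is assumed to use identity equality (as Object does).
final class ChannelLocalModel<T> {
    private final Map<Object, T> values = new ConcurrentHashMap<>();

    void set(Object channel, T value) {
        values.put(channel, value);
    }

    T get(Object channel) {
        return values.get(channel);   // any thread sees the value set for this channel
    }

    T remove(Object channel) {
        return values.remove(channel); // explicit cleanup, e.g. when the channel closes
    }
}
```

A value set for one channel on one thread can be read back on another thread, while a different channel sees null; that is exactly the property a ThreadLocal cannot provide.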


This concludes the discussion specific to handlers. It seems to me that a missing piece here is a walk-through of building a real-world encoder/decoder pair that does something useful, and I will endeavor to do this in a future entry.

Channel Options


Channel options are configuration name/value pairs that are used as a loosely typed way of configuring various bits-and-pieces in Netty. They fall into four categories:


  • General Netty Configuration
  • Client Socket Configuration
  • Server Socket Configuration 
  • Server Child Socket Configuration 

In essence, channel options are placed in a Map and then passed to the Bootstrap; the options provided are applied where applicable, so this is an abstracted way of configuring the stack.
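For server bootstraps there is one more wrinkle: in Netty 3's ServerBootstrap, options whose names start with "child." are applied to the accepted child channels, while unprefixed options apply to the listening server channel (for example, setOption("child.tcpNoDelay", true)). The hypothetical OptionSplitter below is a plain-Java sketch of that split, not Netty's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper modeling how a server option map divides into
// parent (listening socket) and child (accepted socket) options.
final class OptionSplitter {
    static final String CHILD_PREFIX = "child.";

    static Map<String, Object> parentOptions(Map<String, Object> all) {
        Map<String, Object> parent = new HashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (!e.getKey().startsWith(CHILD_PREFIX)) {
                parent.put(e.getKey(), e.getValue());
            }
        }
        return parent;
    }

    static Map<String, Object> childOptions(Map<String, Object> all) {
        Map<String, Object> child = new HashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (e.getKey().startsWith(CHILD_PREFIX)) {
                // Strip the prefix so the key matches the plain option name.
                child.put(e.getKey().substring(CHILD_PREFIX.length()), e.getValue());
            }
        }
        return child;
    }
}
```

A map containing reuseAddress, child.tcpNoDelay and child.receiveBufferSize splits into one option for the server socket and two (tcpNoDelay, receiveBufferSize) for every accepted socket.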

General Netty Channel Options

There are a number of option keys that can be used in place of specific API calls to configure how channels are created. The following is a partial summary of the keys and their value types, just to give you an idea of what is available.

  • bufferFactory: An instance of a ChannelBufferFactory that will be called to create new ChannelBuffers. This might be useful in order to customize the byte order of created buffers, or to create direct (off-heap) buffers.
  • connectTimeoutMillis: The number of milliseconds before a connection request times out. Ignored if the channel factory creates connectionless channels.
  • pipelineFactory: An instance of a ChannelPipelineFactory
  • receiveBufferSizePredictor: Used by NIO channels to decide how large a buffer to allocate for each read. The problem is most acute for UDP: datagrams are received into pre-allocated buffers, which is an unresolvable dilemma in that you don't know how much space to allocate until you have received the datagram, but to receive the datagram, you have to allocate a buffer to receive it. Predictors provide a way to customize the size of those buffers based on some intelligence you might have about the expected size.
  • receiveBufferSizePredictorFactory: A factory for creating buffer size predictors.
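To illustrate what a predictor does, here is a hypothetical adaptive policy in plain Java. The two method names are modeled on Netty 3's ReceiveBufferSizePredictor interface, but the doubling/halving policy is an assumption made for this sketch and is not the algorithm of Netty's AdaptiveReceiveBufferSizePredictor.

```java
// Hypothetical adaptive predictor: grows the next read buffer when the last
// read filled it, shrinks it when the last read used less than half of it,
// and always stays within [min, max].
final class SimpleAdaptivePredictor {
    private final int min;
    private final int max;
    private int next;

    SimpleAdaptivePredictor(int min, int initial, int max) {
        this.min = min;
        this.max = max;
        this.next = initial;
    }

    // Size to allocate for the upcoming read.
    int nextReceiveBufferSize() {
        return next;
    }

    // Feedback after a read: how many bytes were actually received.
    void previousReceiveBufferSize(int actualBytesRead) {
        if (actualBytesRead >= next) {
            next = Math.min(max, next * 2);      // buffer was full: probably too small
        } else if (actualBytesRead < next / 2) {
            next = Math.max(min, next / 2);      // mostly empty: probably too large
        }
        // otherwise keep the current size
    }
}
```

Starting at 1024 bytes, a full 1024-byte read grows the next buffer to 2048, a 100-byte read shrinks it back to 1024, and a 700-byte read leaves it unchanged.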

Socket Channel Options

Most of the supported channel options are for configuring sockets. By and large, Netty does not really expose the Java Socket API that much, which might seem odd for a networking API. However, all implementations of SocketChannel use sockets, and sockets have various low level configuration items which can be set through channel options. In general, these are the types of sockets we are working with:

  • Client Socket: A socket on the client side of a TCP connection to a server.
  • Datagram Socket: A socket on the client and/or server side of a unicast UDP conversation.
  • Multicast Socket: A socket on the client and/or server side of a multicast UDP conversation. 
  • Server Socket: A specialized socket on the server side of a TCP server that brokers connection requests from client sockets and spins up a new Socket dedicated to that connection in an action referred to as an accept.  Aside from the initial connection, a client socket does not communicate with the server socket.
  • Child Socket: The socket created on the server, by the server socket, to communicate with a client socket.

All of these socket types support a subset of the options outlined in java.net.SocketOptions, and many of them can have a significant impact on performance. The options are listed below with the SocketOptions constant name and the Netty channel option equivalent, if any.



  • IP_MULTICAST_IF / interface: The local address (an InetAddress) of the interface on which outgoing multicast packets should be sent. When a host has multiple network interfaces, this tends to be quite important.
  • IP_MULTICAST_IF2 / networkInterface: The same as IP_MULTICAST_IF, but the interface is specified as a NetworkInterface; this variant was added to support IPv6 as well as IPv4.
  • IP_MULTICAST_LOOP / loopbackModeDisabled: Defines whether multicast packets are looped back to the sending host. Note the inverted sense of the Netty name: true disables loopback.
  • IP_TOS / trafficClass: Sets the type of service or traffic class in the headers of packets.
  • SO_BINDADDR / (none): A read-only option representing the local address a socket is bound to.
  • SO_BROADCAST / broadcast: Enables or disables a DatagramSocket's ability to send broadcast messages.
  • SO_KEEPALIVE / keepAlive: A flag indicating that probes should be periodically sent across an idle connection to the opposing socket to check that the connection is still alive.
  • SO_LINGER / soLinger: Allows the close of a socket to be delayed for a period of time to allow completion of I/O.
  • SO_OOBINLINE / (none): Allows TCP urgent (out-of-band) data to be received through the normal data channel rather than discarded.
  • SO_RCVBUF / receiveBufferSize: The size in bytes of the socket's data receive buffer.
  • SO_REUSEADDR / reuseAddress: When set to true, allows multiple DatagramSockets to be bound to the same socket address if the option is enabled prior to binding the socket. For TCP sockets, it allows binding to an address even while a previous connection on it is in the TIME_WAIT state.
  • SO_SNDBUF / sendBufferSize: The size in bytes of the socket's data send buffer.
  • SO_TIMEOUT / (none): Sets a timeout in ms. on blocking socket reads. It is not the same thing as Netty's connectTimeoutMillis, which limits only how long a connect operation may take.
  • TCP_NODELAY / tcpNoDelay: Disables Nagle's algorithm on the socket, which holds back small segments while earlier data is still unacknowledged so they can be coalesced into larger packets.
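Several of these options map directly onto setters of java.net.Socket. The hypothetical SocketOptionApplier below is a plain-Java sketch (not Netty's DefaultSocketChannelConfig) that applies a Netty-style option map to an unconnected Socket; treating a negative soLinger as "disable lingering" is an assumption of this sketch.

```java
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;

// Hypothetical helper: applies a few Netty-style option names to a plain
// java.net.Socket. Unknown names are ignored rather than rejected.
final class SocketOptionApplier {
    static void apply(Socket socket, Map<String, Object> options) throws SocketException {
        for (Map.Entry<String, Object> e : options.entrySet()) {
            Object v = e.getValue();
            switch (e.getKey()) {
                case "tcpNoDelay":        socket.setTcpNoDelay((Boolean) v); break;        // TCP_NODELAY
                case "keepAlive":         socket.setKeepAlive((Boolean) v); break;         // SO_KEEPALIVE
                case "reuseAddress":      socket.setReuseAddress((Boolean) v); break;      // SO_REUSEADDR
                case "receiveBufferSize": socket.setReceiveBufferSize((Integer) v); break; // SO_RCVBUF
                case "sendBufferSize":    socket.setSendBufferSize((Integer) v); break;    // SO_SNDBUF
                case "trafficClass":      socket.setTrafficClass((Integer) v); break;      // IP_TOS
                case "soLinger": {                                                         // SO_LINGER
                    int seconds = (Integer) v;
                    socket.setSoLinger(seconds >= 0, Math.max(seconds, 0));
                    break;
                }
                default:
                    break;
            }
        }
    }
}
```

Options applied this way before connect() carry over to the connection; a real Netty channel does the equivalent on the socket it owns.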

Connect Timeout vs. ChannelFuture Await

The connectTimeoutMillis option is sometimes confused with the timeout that can be set when waiting on a ChannelFuture using await or one of its equivalents. (It is also easy to confuse with the socket's SO_TIMEOUT, but that applies to blocking reads, not to connects.) connectTimeoutMillis is strictly the time limit (in ms.) within which a connect operation must complete. If the connect is not complete within that time, the operation fails, and in the output below the cause is reported as a java.net.ConnectException: connection timed out. On the other hand, the ChannelFuture's await timeout is simply the amount of time the calling thread will wait for the operation to complete. That timeout does not affect the operation itself, and the operation is considered complete when it concludes, whether it succeeded or not.

Having said that, if you want an asynchronous operation on a channel to be cancelled when it does not complete within a specific period of time, a ChannelFuture await timeout is useful: when the timeout elapses and the operation is found to be incomplete, it can be cancelled. This might be applicable, for example, in a mobile device scenario where you want to save battery by ensuring that network operations do not run for an overly long time. Just because your application threads are not running does not mean the device is not consuming power attempting to complete a long-running network operation that the application has long since forgotten about.

The following example will attempt to illustrate the difference and how each can be usefully applied. This code sample is based on a simple testing framework class called SimpleNIOClient. The code for the example is TimeoutTest. The test performed is very simple and just attempts to connect to an internet address using different values for the ChannelFuture await timeout and the client's connectTimeoutMillis.

The salient part of this test is the connect code and the timers that are placed around it.

 public static void timeoutTest(long ioTimeout, long futureTimeout) {
  // Create a map with the channel options
  Map<String, Object> options = new HashMap<String, Object>();
  options.put("connectTimeoutMillis", ioTimeout);
  // Create the client
  // Not providing any handlers since we're not using any for this test
  SimpleNIOClient client = new SimpleNIOClient(options);
  // Issue a connect operation
  ChannelFuture cf = client.connect("heliosapm.com", 80);  
  // Add a completion listener
  cf.addListener(new ChannelFutureListener() {
   public void operationComplete(ChannelFuture future) throws Exception {
    if(future.isSuccess()) {
     clog("F: Connected:" + future.isDone());
    } else {
     if(future.isCancelled()) {
      clog("Request Cancelled");
     } else {
      clog("Connect Exception:Success: " + future.isSuccess() + "  Done: " + future.isDone()  + "  Cause: "+ future.getCause());
     }
    }
   }
  });
  // Wait up to futureTimeout ms for the operation to complete
  cf.awaitUninterruptibly(futureTimeout);
  // If the operation is not complete, cancel it.
  if(!cf.isDone()) {
   clog("Channel Future Still Waiting. Cancelled:" + cf.cancel());
  }
 }

I run the test using arbitrarily low timeouts since I am trying to trigger connect timeouts. The invocation code looks like this:
  // Timeout on I/O  
  timeoutTest(10, 500);
  // Wait a bit so output is clear
  try { Thread.sleep(2000); } catch (Exception e) {}
  System.out.println("===============================");
  // Timeout on future
  timeoutTest(500, 10);
  try { Thread.currentThread().join(5000); } catch (Exception e) {}

In the first case, on line 2, I want the connect timeout to occur, so I invoke with a low I/O timeout and a higher ChannelFuture timeout. On line 7, I reverse the values for a higher connect timeout but a low timeout on the ChannelFuture. The output is as follows:


[Client][Thread[ClientBossPoolThread#1,5,ClientBossPoolThreadGroup]]:Socket Timeout Exception:Success: false  Done: true  Cause: java.net.ConnectException: connection timed out
===============================
[Client][Thread[main,5,main]]:Request Cancelled
[Client][Thread[main,5,main]]:Channel Future Still Waiting. Cancelled:true


In the first case, the connect timeout is so low that the operation completes quite quickly (although it does not succeed), so the future listener reports that the operation:

  1. Was not successful
  2. Is Complete
  3. Resulted in a ConnectException on account of a timed out connection.
In the second call, the higher connect timeout means the future timeout expires first, and when it does, the operation is still running, so the thread cancels it.
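The same distinction can be reproduced without a network, using java.util.concurrent.CompletableFuture as a stand-in for ChannelFuture. In this hypothetical TimeoutModel, the connect attempt never succeeds on its own (like an unreachable host) and is failed by its own timeout, analogous to connectTimeoutMillis; the caller separately waits with its own timeout and cancels if the future is still pending.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model; CompletableFuture stands in for Netty's ChannelFuture.
final class TimeoutModel {
    // Daemon timer thread so a pending "connect" never keeps the JVM alive.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "connect-timer");
                t.setDaemon(true);
                return t;
            });

    // A connect that never succeeds on its own; its own timeout (analogous to
    // connectTimeoutMillis) fails the operation itself.
    static CompletableFuture<Void> connect(long connectTimeoutMillis) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        TIMER.schedule(() -> {
            future.completeExceptionally(new ConnectException("connection timed out"));
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    // Analogous to awaitUninterruptibly(awaitMillis) followed by cancel() if not done.
    static String run(long connectTimeoutMillis, long awaitMillis) {
        CompletableFuture<Void> future = connect(connectTimeoutMillis);
        try {
            future.get(awaitMillis, TimeUnit.MILLISECONDS);
            return "connected";
        } catch (ExecutionException e) {
            // The operation finished (unsuccessfully) while we were waiting.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            // Only the caller stopped waiting; the operation is still pending, so cancel it.
            return "cancelled: " + future.cancel(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

run(10, 2000) reports the operation's own failure, while run(2000, 10) gives up waiting first and cancels the still-pending operation, mirroring the two runs above.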

Socket Buffer Sizes


Another channel option with a critical effect on performance is the socket buffer size setting, which determines the size of the send and receive buffers allocated by the platform's networking kernel to buffer I/O. It is possible to simply set these values arbitrarily high and leave it at that, but in situations where a large number of sockets are in use, this may waste native memory resources.
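Because the buffer size is only a hint to the operating system, it is worth reading the value back after setting it. The hypothetical BufferSizeProbe below is a JDK-only sketch; the value actually granted is platform dependent, so the sketch only prints it. On the server side, the JDK documents ServerSocket.setReceiveBufferSize as the way to set the starting receive buffer size of accepted sockets, with sizes above 64K needing to be set before the server socket is bound.

```java
import java.io.IOException;
import java.net.Socket;

// Hypothetical probe: request a receive buffer size and report what the OS granted.
final class BufferSizeProbe {
    static int requestReceiveBuffer(int requestedBytes) throws IOException {
        try (Socket socket = new Socket()) {          // unconnected; options may be set before connect
            socket.setReceiveBufferSize(requestedBytes);
            return socket.getReceiveBufferSize();     // may be rounded, doubled or clamped by the OS
        }
    }
}
```

Requesting an absurdly small size such as 2 bytes is a quick way to see that the granted value can differ substantially from the requested one.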

The next example code I want to demonstrate uses another test harness called SimpleNIOServer. The test class is called AdjustableSocketBufferSizeServer. It requires a little explanation around some of the customizations, so please bear with me.

ChannelHandlerProviderFactory

This is a channel handler factory that understands whether a channel handler class is sharable or exclusive, so when an instance of a ChannelHandlerProvider is given to the ChannelPipelineFactoryImpl and it creates a new pipeline, it will either create a brand new handler instance (if exclusive) or provide the "singleton" instance already created. This is not critical to the example, but to avoid confusion, I am mentioning their use.

CustomServerBootstrap

This is a customized server bootstrap that allows me to change the channel options on the fly. In the stock version, once a bootstrap has been created, the channel options are committed, and I want to be able to change the socket buffer sizes of created child channels on the fly. (Once a change is made, it affects only new connections from that point on; it does not change existing socket buffer sizes.) I also created a management interface called BootstrapJMXManager that allows me to make these changes through JMX.

Alright, the test class, AdjustableSocketBufferSizeServer, creates a new server using SimpleNIOServer. It also creates a JMXConnectorServer and RMI Registry so that a client can easily connect to the MBeanServer and adjust the child socket receive buffer sizes programmatically, which in turn lets the test case run automatically end to end.

The server's pipeline factory creates pipelines with the following handlers:

  • Upstream: 
    • InstrumentedDelimiterBasedFrameDecoder with a "|" delimiter (exclusive)
    • StringDecoder (shared)
    • StringReporter (shared)
  • Downstream:
    • CompatibleObjectEncoder (shared) 

The test is fairly straightforward:
  1. The client makes a JMX call to the server to specify the socket receive buffer size for the server-allocated child channel.
  2. The client makes a new connection and sends a [longish] string to the server.
  3. The frame decoder splits the incoming bytes at each "|" delimiter and sends the resulting frames upstream to the StringDecoder.
  4. The StringDecoder decodes the byte stream into a String and sends the strings upstream to the StringReporter.
  5. The StringReporter measures the length of the String, creates an int array with the length of the passed string and writes the array downstream back to the client.
  6. On its way downstream, the int[] is encoded using standard Java serialization in the CompatibleObjectEncoder.
  7. The client records the elapsed time to get a response from the server.
  8. The client increases the requested socket receive buffer size for the server-allocated child channel.
  9. Rinse, Repeat, (Go to #1.) for each subsequent defined buffer size
The script looping has an outer loop and an inner loop:

  • Outer Loop: Iterates over each socket receive buffer size defined in an array, sets this size on the server's bootstrap and executes the inner loop. Size: 5
  • Inner Loop: Submits the configured string payload and captures the elapsed time to response. Size: 100


I am going to run the client side of the test from a Groovy script using the raw TCP Socket API, so the server pipeline uses a CompatibleObjectEncoder, since the client cannot decode the optimized ObjectEncoder's payload. (It could, but I am sticking to a generic client.)
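To see why a generic client can cope with standard Java serialization, here is a JDK-only sketch of what serializing an int[] looks like to a plain client. IntArrayWire is a hypothetical name. The raw variant writes 4 big-endian bytes per int with no type information, so the reader must already know how many ints to expect; the extra bytes in the serialized form are the price of that self-description.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

// Hypothetical helper comparing standard serialization with a raw int encoding.
final class IntArrayWire {
    // Standard Java serialization: readable by any vanilla ObjectInputStream.
    static byte[] serialize(int[] values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(values);
        }
        return bytes.toByteArray();
    }

    static int[] deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (int[]) in.readObject();
        }
    }

    // Raw alternative: 4 big-endian bytes per int, no stream header or class descriptor.
    static byte[] raw(int[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 4);
        for (int v : values) {
            buf.putInt(v);
        }
        return buf.array();
    }
}
```

For a small result array, the serialized form round-trips cleanly through ObjectInputStream but is noticeably larger than the raw 4-bytes-per-int encoding.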


InstrumentedDelimiterBasedFrameDecoder


Remember that for each logical request, server side decoders may need to be invoked several times in order to complete the task of reading the full bytestream and decoding it. When it is invoked, if the bytestream is not complete, it returns a null. Eventually, the bytestream will be complete and the decoder can complete the task and return the (non-null) decoded payload. This being the case, my theory was that the smaller the receiving socket's receive buffer size, the more times the decoder would need to be called. Or in contrast, the larger the receiving buffer size, the more likely that the full bytestream would be transmitted in one shot, reducing the number of required frame decoder calls to one.


To test this theory, I made a simple extension of the DelimiterBasedFrameDecoder that counts the number of times it is invoked by incrementing a counter in an instance variable. When a decode completes, the high-water counter value is written to a ChannelLocal and the counter is reset to zero. Upstream, the StringReporter, which determines the length of the passed string and returns it to the client, also reads the frame decoder invocation count from the ChannelLocal and adds it to the int[] payload returned to the client. It also occurred to me that the socket buffer size might not actually be set to what the client requested. (Will it really set the buffer size to 2 bytes ??!!??) So I added a read of that value and appended it to the result sent back to the client. Note that when channel options are read from or written to a live (socket based) channel, they are read from and applied to the actual socket instance. With that, the client can report:

  • The length of the submitted string reported by the server. (Not useful except as a validating assertion)
  • The number of times the frame decoder was called for the most recently submitted string.
  • The elapsed time of the request execution.
  • The actual server socket receive buffer size.
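Reading back the value the OS actually applied can also be done with plain java.net sockets, independent of Netty. In this minimal sketch (the class and method names are hypothetical), a receive buffer size is proposed on a loopback ServerSocket before binding, which is how the JDK lets you propose SO_RCVBUF for accepted sockets; it then connects once and asks the accepted child socket what it really got.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class ReceiveBufferProbe {
    /**
     * Proposes a receive buffer size for accepted sockets, accepts one loopback
     * connection, and returns the SO_RCVBUF value the OS actually gave the child socket.
     */
    static int acceptedReceiveBufferSize(int requested) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket()) {
            // Proposed before bind(), as required for sizes above 64K.
            server.setReceiveBufferSize(requested);
            server.bind(new InetSocketAddress(loopback, 0)); // ephemeral port
            try (Socket client = new Socket(loopback, server.getLocalPort());
                 Socket child = server.accept()) {
                // The kernel may cap (e.g. net.core.rmem_max) or otherwise adjust the request.
                return child.getReceiveBufferSize();
            }
        }
    }
}
```

Calling it with 2 is a quick way to answer the "2 bytes ??!!??" question on your own machine: the returned value is whatever the kernel decided, which need not equal the request.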



The server pipeline uses a CompatibleObjectEncoder since we're sending back an int[] and since the client is a simple socket client, not a Netty based client. It is a bit of overkill though. Whenever one is sending primitives, or arrays of primitives, it is possible to bypass modular encoding and simply write directly to a ChannelBuffer. Here's how we could remove the CompatibleObjectEncoder from the pipeline and rewrite the StringReporter's write of the int[].

   
   // =====================================================================
   //  Uncomment next section to ditch the CompatibleObjectEncoder from the pipeline
   // =====================================================================
   ChannelBuffer intBuffer = ChannelBuffers.buffer(12);
   for(int i = 0; i < 3; i++) {
    intBuffer.writeInt(ret[i]);
   }
   ctx.sendDownstream(new DownstreamMessageEvent(e.getChannel(), Channels.future(e.getChannel()), intBuffer, e.getChannel().getRemoteAddress()));
   // =====================================================================
   
   // =====================================================================
   // Comment the next line to ditch the CompatibleObjectEncoder from the pipeline
   // =====================================================================
//   ctx.sendDownstream(new DownstreamMessageEvent(e.getChannel(), Channels.future(e.getChannel()), ret, e.getChannel().getRemoteAddress()));



Having done that, the Groovy client script (coming up next), which normally uses an ObjectInputStream to read objects from the socket's input stream, would use a DataInputStream instead, and the code would look like this:

                    /*
                     * The prior code
                    if(ois==null) {
                        ois = new ObjectInputStream(socket.getInputStream());
                    }
                    result = ois.readObject();
                     */
                    if(ois==null) {
                        ois = new DataInputStream(socket.getInputStream());
                    }
                    result = new int[3];
                    for(index in 0..2) {
                        result[index] = ois.readInt();
                    }


Groovy Client Script and Network Kernel Tuning 


This is the Groovy test client I used to test the server. I admit that this script is a bit messy, but it does the job. I won't go over it in too much detail, except to draw your attention to the following:


  • The client sends the same string each time, since the varying value being adjusted to measure the effect is the server's socket receive buffer size. 
  • The string content is fairly arbitrary and generated by appending all the JVM's system property key/value pairs a couple of times. Once generated, the same string is used in each call. 
  • The socket receive buffer sizes tested are defined in this int array: [1144, 2288, 4576, 9152, 18304, 36608, 73216, 146432, 292864, 585728] 


Since the script asserts that the socket receive buffer size is actually what the client set it to, I found that when I requested a buffer size of 146,432, I got an assertion failure, with the actual reported buffer size being only 131,071. As it happens, my Linux network kernel configuration limited the maximum size of socket receive buffers to 131,071. To raise this limit, I ran the following in a sudo'ed shell:


  • sysctl -a | grep mem > ~/sysctl-mem.txt (Saved the current settings for rollback purposes) 
  • sysctl -w net.core.rmem_max=8388608 (Set the maximum buffer size to 8 MiB, i.e. 8,388,608 bytes) 
  • sysctl net.core.rmem_max (Verified the value was set. Output was "net.core.rmem_max = 8388608") 
  • sysctl -w net.ipv4.route.flush=1 (Flushed the IPv4 routing cache, a step commonly included in Linux network tuning recipes) 
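Before asserting on a requested size, it can help to check the kernel cap from Java. This sketch assumes Linux, where the value set by sysctl is exposed as /proc/sys/net/core/rmem_max; on other systems the file does not exist and the method returns empty. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.OptionalLong;

class RmemMax {
    /** Linux only: the kernel's cap on socket receive buffers, or empty if unavailable. */
    static OptionalLong read() {
        try {
            byte[] raw = Files.readAllBytes(Paths.get("/proc/sys/net/core/rmem_max"));
            return OptionalLong.of(Long.parseLong(new String(raw, StandardCharsets.US_ASCII).trim()));
        } catch (IOException | NumberFormatException e) {
            return OptionalLong.empty(); // not Linux, no /proc, or unexpected content
        }
    }

    /** True if the requested size is within the cap (or the cap is unknown). */
    static boolean withinCap(int requested) {
        OptionalLong cap = read();
        return !cap.isPresent() || requested <= cap.getAsLong();
    }
}
```

On the machine described above, with the cap at 131,071, withinCap(146432) would have returned false before the sysctl change.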


This is a useful reference for tuning the network kernel in Linux. Here is a guide for TCP tuning on different operating systems. I should also note that the string I generated as the sample test payload was 8,088 bytes before being delimited with a single "|".

Here are the charted results of the test, with the socket receive buffer size on the x-axis and the average elapsed time of each request in ms. on the first y-axis. The second y-axis represents the average number of times the Frame Decoder was called for each request.



The results are pretty much what I expected and I think quite intuitive:

  • The average elapsed time of the request executions decreases as the server child socket's receive buffer size increases, until the buffer size is large enough to contain the whole request payload.
  • The average number of frame decoder invocations per request execution decreases as the server child socket's receive buffer size increases, until the buffer size is large enough to contain the whole request payload.
One might be tempted to simply set the socket buffer size to the max, but keep in mind that on a busy system, many sockets may be allocated and even though the buffer memory does not come out of your Java heap space, it all adds up in your native memory and could be quite wasteful. Furthermore, it is quite obvious from the chart that for a predictable payload size, there is a sweet spot for buffer sizes, so in concert with Netty's ReceiveBufferSizePredictor, it would be interesting to create an auto-tuning socket buffer sizing algorithm, but that's for another day.

The spreadsheet I used for this test is here, and is simple to use. The Groovy test script generates a CSV file with all the tabulated results, so you can just import it into OpenOffice (et al.) and then paste the numbers in.

That's it for entry 1.5. I hope it has been helpful, if not outright entertaining. I am delighted to get any feedback, and genuinely pleased to answer any questions. As promised, in entry 2.0, I will be discussing implementing Ajax push in Netty (with no further decimal releases in the 1 range).

Part 1 of this series:  Netty Tutorial Part 1: Introduction to Netty


Sunday, May 27, 2012

Netty Tutorial Part 1: Introduction to Netty




Update:  Part 1.5 Has Been Published: Netty Tutorial Part 1.5: On Channel Handlers and Channel Options

From the Netty web site:

"Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server."




Netty is a Java library and API primarily aimed at writing highly concurrent networked and networking applications and services. One aspect of Netty you may find different from standard Java APIs is that it is predominantly an asynchronous API. That term implies different things to different people and may overlap with the terms non-blocking and event-driven. Regardless, if you have never used an asynchronous API before, it takes a little bit of a mind shift to use Netty if you are accustomed to writing linear software. Here's how I would boil it down.
You build a Netty stack and start it. Issuing requests is easy and much the same as it is in any Java API. The mind shift comes in processing responses, because there are none. Almost every method invocation of substance is asynchronous, which means that there is no direct return value and the call returns almost immediately. The results (if there are any) will be delivered back in another thread. This is the fundamental difference between a standard API and an asynchronous one. Consider a client API that supplies a method to acquire the number of widgets from the server.

A Standard API

   public int getWidgetCount();
When a thread calls getWidgetCount(), some period of time will elapse and an int will be returned.

Asynchronous API

   public void getWidgetCount(WidgetCountListener listener);

   public WidgetCountListener myListener = new WidgetCountListener() {
        public void onWidgetCount(int widgetCount) {
             // ...... do your thing with the widget count
        }
   };

In my fabricated asynchronous version of the same API, the getWidgetCount call does not return anything and could conceivably execute instantly. However, it accepts a response handler as an argument and that listener will be called back when the widget count has been acquired and the listener can then do whatever is usefully defined with that result.
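The fabricated widget API can be made concrete with nothing but java.util.concurrent. In this minimal sketch, AsyncWidgetClient and its hard-coded count are hypothetical stand-ins for a real remote call: getWidgetCount returns immediately, and the listener is invoked later on a different thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

interface WidgetCountListener {
    void onWidgetCount(int widgetCount);
}

class AsyncWidgetClient {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    /** Returns immediately; the listener is called back later on the I/O thread. */
    void getWidgetCount(WidgetCountListener listener) {
        ioThread.execute(() -> listener.onWidgetCount(fetchCountFromServer()));
    }

    /** Hypothetical stand-in for a slow network round trip. */
    private int fetchCountFromServer() {
        return 42;
    }

    void shutdown() {
        ioThread.shutdown();
    }
}
```

The caller's thread is free as soon as getWidgetCount returns; it only gets involved again if it chooses to block on something like a latch.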
It might seem the additional layer[s] of complexity are unwarranted, but this is a crucial aspect of high performance applications and services written with Netty and the like. Your client need not waste resources having threads stuck in waiting or timed waiting mode, waiting for the server to respond. They can be notified when the result is available. For one thread, issuing one call, this may seem overkill, but consider hundreds of threads executing this method millions of times. Moreover, a fundamental benefit of NIO is that Selectors can be used to delegate the notification of events we are interested in to the underlying operating system and, at the root of it all, to the hardware we're running on. Being informed through callbacks from the OS that 16 bytes have been successfully written out across the network to a server, or that 14 bytes were just read in from the same, is obviously a very low level and granular way to work, but through a chain of abstractions, a developer can use the Java NIO and Netty APIs to handle things at a much less granular, more abstract level.
In this introduction, I want to start with some basic concepts and name some of the core building blocks and then slide into some actual code examples.

How Does All That Work ?


 The basic currency of Netty is a ChannelBuffer. From the Netty JavaDocs:
A random and sequential accessible sequence of zero or more bytes (octets). This interface provides an abstract view for one or more primitive byte arrays (byte[]) and NIO buffers.
 That's how data is passed around in Netty. If your application only deals with byte arrays and byte buffers, you're in luck ! However, if you need to take a higher order data representation, such as a Java object,  and send it to a remote server, and then receive another object back again, those byte collections need to be converted. Or, if you need to issue a request to an HTTP server, your request might start life as a simple string in the form of a URL, but then it needs to be wrapped into a request that the HTTP server will understand and then decomposed into some raw bytes and sent off across the network.  The HTTP server needs to accept the bytes that are sent and compose them back into an HTTP request that it can interpret and fulfil. Once fulfilled, the reverse must occur where the response (perhaps the response is a JPEG or a JavaScript file) must be wrapped in an HTTP response, converted into bytes and sent back to the calling client.
The basic abstraction of the network through which these bytes are transported is the Netty Channel. Once more to the Netty JavaDocs:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
Nexus is an apt name to describe a channel:  (from dictionary.com)
1.  a means of connection; tie; link
2.  a connected series or group
3.  the core or center, as of a matter or situation
 A channel is the externally exposed API through which you interact with Netty. Put more concretely, a Channel is an abstracted representation of a socket. But.... it's not necessarily a socket; it could be a file, or something even more abstract, so Nexus is a good way to describe it. Suffice it to say, a Channel provides the interface to connect and write to the destination represented by the Channel. No read ? you might ask ? Nope. Remember, it's like the asynchronous getWidgetCount method mentioned above. There's no return value.
All methods on a Channel fall into two categories:
  1. A simple attribute method that (synchronously) provides information about the channel itself.
  2. I/O operations like bind, disconnect, or write.
All methods in category #2 are asynchronous* and they all return a ChannelFuture which is a deferred result, meaning that it is a container for a result that is not known yet, but through which the result will be delivered when it is available. It's a bit like our contrived WidgetCountListener, except that you register your listener with the ChannelFuture instead of passing the listener with the original invocation. (Makes for a cleaner API, no ?) The interface that describes the listener you can implement is the ChannelFutureListener which has one method, called when the operation is complete: public void operationComplete(ChannelFuture future). When this method is called, the operation is complete, but it may not have succeeded, so the passed future can be interrogated to determine the outcome of the operation.
* Not completely true. Netty is predominantly used for NIO, but it also has channel implementations for OIO which refers to the Old IO which is completely synchronous. OIO has some benefits and the Netty implementation is consistent with NIO implementation so components can be interchangeable and reusable.
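To make the deferred-result idea concrete, here is a toy future written with plain Java monitors. It is not Netty's ChannelFuture implementation, just a hypothetical MiniFuture that mirrors the contract described above, plus one detail worth knowing from Netty's documentation: a listener added after the operation has already completed is notified immediately. Completion is either a success or a failure with a cause, and a timed await returns false rather than throwing if the operation has not finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

interface MiniFutureListener {
    void operationComplete(MiniFuture future);
}

class MiniFuture {
    private final List<MiniFutureListener> listeners = new ArrayList<>();
    private boolean done;
    private Throwable cause;

    synchronized boolean isDone() { return done; }
    synchronized boolean isSuccess() { return done && cause == null; }
    synchronized Throwable getCause() { return cause; }

    /** Notified on completion, or right away if the operation has already completed. */
    void addListener(MiniFutureListener listener) {
        synchronized (this) {
            if (!done) {
                listeners.add(listener);
                return;
            }
        }
        listener.operationComplete(this);
    }

    /** Waits up to the timeout; returns false (no exception) if still not complete. */
    synchronized boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!done) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    void setSuccess() { complete(null); }
    void setFailure(Throwable t) { complete(t); }

    private void complete(Throwable failure) {
        List<MiniFutureListener> toNotify;
        synchronized (this) {
            if (done) {
                return; // completes at most once
            }
            done = true;
            cause = failure;
            notifyAll(); // wake any awaiting threads
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (MiniFutureListener listener : toNotify) {
            listener.operationComplete(this); // outside the lock
        }
    }
}
```

Notifying listeners outside the lock avoids running arbitrary user code while holding the future's monitor.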

Creating a Connected Channel


 Channels are not created directly. They are created by a ChannelFactory. There are two variations for ChannelFactories, one for client channels and one for server channels. For each of those two variations, there are several implementations to account for the I/O type as well as the transport protocol:
 UDP based channel factories are the same for clients and servers as they are considered connectionless. There are two additional types:
I will be focusing on the TCP NIO channels in this blog, but be aware there are slight differences in creating different types of channel factories.
The constructors for the TCP NIO ChannelFactories have the same signatures and while there are a few overloads, the basics are that the factory needs two thread pools, or Executors for:
  • The Boss Threads: Threads provided by this pool are boss threads. Boss threads create and connect/bind sockets and then pass them off to the worker threads. In the server, there is one boss thread allocated per listening socket. In the client, there is only one boss thread*
  • The Worker Threads: Worker threads perform all the asynchronous I/O. They are not general purpose threads and developers should take precautions not to assign unrelated tasks to threads in this pool which may cause the threads to block, be rendered unable to perform their real work which in turn may cause deadlocks and an untold number of performance issues.
* So if there is only one boss thread in the client, why does the NioClientSocketChannelFactory require an Executor ?
  1. The boss thread can be released when there is no work to do and is created lazily, but it may be more efficient to pool a small number of threads than to create a new one when required and destroy it when idle. 
  2. It is possible that one might want to create several different channel factories and rather than giving each one their own boss pool, they can all share one.
NIO channel factories are the only type that use a boss pool, since they are the only ones that can asynchronously connect to sockets or bind to server sockets. The others either have virtual connections (Local), only synchronously connect (OIO) or are connectionless (UDP). The HttpTunnelingClientSocketChannelFactory is simply a wrapper for another client socket channel factory, so it may or may not be using a boss thread, but it is not configured with one.

Keep this in mind about ChannelFactories:  in the course of conducting business with Netty, the factories will allocate resources, including the thread pools. Once you're done with a ChannelFactory, be sure to call releaseExternalResources() on the factory. This will ensure that all its resources are released.

In a nutshell, to send something to a listening server:
  1. Create a channel
  2. Connect the channel to the remote listening socket
  3. Call write(Object message) on the channel.
 Passing Object to the channel seems fairly flexible, no ?  So what happens if I do this ?

   channel.write(new Date());

Netty will issue this exception:

java.lang.IllegalArgumentException: unsupported message type: class java.util.Date

So what is supported ? ChannelBuffers. That's it. However, Channels have a construct called Pipelines (or more specifically, ChannelPipelines). A pipeline is a stack of interceptors that can manipulate or transform the values that are passed to them. Then, when they're done, they pass the value on to the next interceptor. These interceptors are referred to as ChannelHandlers. The pipeline maintains strict ordering of the ChannelHandler instances that it contains, and typically, the first channel handler will accept a raw ChannelBuffer and the last channel handler (referred to as the Sink) will discard whatever payload has been passed to it. Somewhere in the pipeline, you want to implement a handler that does something useful. The ChannelHandler itself is only a marker interface with no methods, so handlers have a lot of flexibility, but for a handler to do anything useful, it must be able to respond to and/or forward ChannelEvents. (There's a lot of terminology here.....)
What the heck is a ChannelEvent ? For the purposes of these two paragraphs, consider a ChannelEvent to be a package containing a ChannelBuffer, which as we already know, is the singular currency of Channels. Skip the next paragraphs if you're satisfied with this simplification.

Channel Events -For Real ?

Ok, ChannelEvents are not just packets containing ChannelBuffers. Since almost everything in Netty occurs asynchronously, the driving code patterns consist of bits of code that generate events (like Connect, Disconnect and Write) and bits of code that handle those events once they've been executed by one of the ChannelFactory's thread pools. All these events in the Netty API are instances of the interface ChannelEvent. The javadoc for ChannelEvent has a comprehensive list of the different types of events and explanations of each. For a good example of what an event handler looks like, see the javadoc for the class SimpleChannelHandler. This class implements handler methods for almost every type of event. There is even an event called IdleStateEvent which can be fired when ....... nothing. It fires when a channel has been idle, which can be useful.
In order to implement anything other than a simple ChannelBuffer sender or receiver, we need to add ChannelHandlers to the pipeline. Most of the time, ChannelHandlers in use are probably encoders and decoders, meaning:
  •     Encoder: Converts a non-ChannelBuffer object into a ChannelBuffer, suitable for transmission to somewhere else. It might not encode directly to a ChannelBuffer; rather, it might do a partial conversion, implicitly relying on other handler[s] in the pipeline to complete the conversion. One way or the other, if you're not sending straight ChannelBuffers, one or more of the handlers in concert in the pipeline must convert the payload to a ChannelBuffer before it goes out the door. An example of an encoder is ObjectEncoder, which converts regular [serializable] Java objects into ChannelBuffers containing the byte representation of the written object.
  •     Decoder: The reverse of an encoder where a ChannelBuffer's contents are converted into something more useful. The counterpart of the ObjectEncoder mentioned above, is the ObjectDecoder and it does exactly this.
The Netty SDK supplies various codecs (contraction of Coder/Decoder[s]) such as Google ProtoBufs, Compression, HTTP Request/Response and Base64.
Not all ChannelHandlers are Encoders/Decoders (although the majority in the core API are). A ChannelHandler can be put to work doing all sorts of useful things. For example:
  •     ExecutionHandler: Forwards channel events to another thread pool. Intended to push events out of the Worker (I/O processing) thread pool in order to make sure it stays lively.
  •     BlockingReadHandler: Allows your code to treat a Channel as though it was not asynchronous for the purposes of reading incoming data.
  •     LoggingHandler: Simply logs what events are passing through the handler. A highly useful debugging tool......
So how do I get these ChannelHandlers to help me with my java.util.Date problem ? The answer is that you need to add an ObjectEncoder* to the channel's pipeline so it can translate the Date to a ChannelBuffer. Here's a visual, and I'll get to the code in a minute:



ObjectEncoder is actually an optimized Netty special. So long as you have a Netty ObjectDecoder on the other end, you're good to go. Otherwise, you need to use a CompatibleObjectEncoder which uses standard Java serialization.
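The encode-before-the-sink idea can be modeled with a toy pipeline in plain Java. This is a hypothetical sketch, not Netty's ChannelPipeline: handlers are simple UnaryOperator&lt;Object&gt; functions applied in order, the "sink" accepts only byte[] (standing in for ChannelBuffer) and otherwise throws an IllegalArgumentException worded like Netty's, and the encoder uses standard Java serialization in the spirit of CompatibleObjectEncoder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class ToyPipeline {
    private final List<UnaryOperator<Object>> handlers = new ArrayList<>();

    ToyPipeline addLast(UnaryOperator<Object> handler) {
        handlers.add(handler);
        return this;
    }

    /** Passes the message through each handler in order; the "sink" accepts raw bytes only. */
    byte[] write(Object message) {
        Object current = message;
        for (UnaryOperator<Object> handler : handlers) {
            current = handler.apply(current);
        }
        if (!(current instanceof byte[])) {
            throw new IllegalArgumentException("unsupported message type: " + current.getClass());
        }
        return (byte[]) current;
    }

    /** Encoder: standard Java serialization; anything it can't handle is passed on unchanged. */
    static Object serialize(Object message) {
        if (message instanceof byte[] || !(message instanceof Serializable)) {
            return message;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decoder counterpart, as the receiving side would use it. */
    static Object deserialize(byte[] wire) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
            return in.readObject();
        }
    }
}
```

Writing a Date into an empty toy pipeline fails just like channel.write(new Date()) did; adding the serializing handler turns it into bytes, and deserialize plays the role of the decoder on the receiving end.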


Alright, checkpoint: We need a Channel, which we get from a ChannelFactory, but we also need to define a ChannelPipeline. There are a few ways you can do this, but Netty provides a construct called a Bootstrap that wraps all these things together quite nicely, so here's how all this stuff fits together in order to create a Netty client that can send a Date. This example will use the NioClientSocketChannelFactory, which will create an instance of an NioClientSocketChannel. It bears mentioning, though, that for the most part, the Netty public API simply returns a SocketChannel (a parent interface), and you can just consider this to be a plain Channel, as the implementations themselves exhibit identical behavior and exposed functionality. This code sample is a little simpler than what you might see in other examples, but I doeth it for clarity.

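   import java.net.InetSocketAddress;
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;
   import org.jboss.netty.bootstrap.ClientBootstrap;
   import org.jboss.netty.channel.ChannelFactory;
   import org.jboss.netty.channel.ChannelFuture;
   import org.jboss.netty.channel.ChannelPipeline;
   import org.jboss.netty.channel.ChannelPipelineFactory;
   import org.jboss.netty.channel.Channels;
   import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
   import org.jboss.netty.handler.codec.serialization.ObjectEncoder;

   Executor bossPool = Executors.newCachedThreadPool();
   Executor workerPool = Executors.newCachedThreadPool();
   ChannelFactory channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);
   ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
     public ChannelPipeline getPipeline() throws Exception {
       return Channels.pipeline(new ObjectEncoder());
     }
   };
   ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
   bootstrap.setPipelineFactory(pipelineFactory);
   // Phew. Ok. We built all that. Now what ?
   // (remoteHost and remotePort are assumed to be defined elsewhere)
   InetSocketAddress addressToConnectTo = new InetSocketAddress(remoteHost, remotePort);
   ChannelFuture cf = bootstrap.connect(addressToConnectTo);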

That's how you get a Channel.....  wait up !  That's not a Channel. It's a ChannelFuture. What's that ? Remember that [almost] everything is asynchronous in Netty, so when you request a connection, the actual process of connecting is asynchronous. For that reason, the bootstrap returns a ChannelFuture which is a "handle" to a forthcoming completion event. The ChannelFuture provides the state of the requested operation, and assuming the connect completes successfully, will also provide the Channel. There are (as always....) several options for exactly how the calling thread can wait for the connection to complete, and the following outline of these options applies equally to all asynchronous invocations against channels (disconnects, writes etc.) since all these operations return ChannelFutures.

Waiting on Godot (where Godot is a Channel Invocation)


Regardless of the strategy implemented to wait for a Channel operation to complete, completion itself does not necessarily indicate success, simply that the operation concluded. The result of completion could be one of the following:
  • Success !: The operation completed successfully.
  • Failed: The operation failed and the reason (a Throwable) for the failure is available from ChannelFuture.getCause().
  • Timed Out: Also a failure as far as we're concerned.
  • Cancelled: A channel invocation can be cancelled by calling ChannelFuture.cancel().
Having said that, here's how you can wait for completion:
  • Await: Await is a fancy way of saying wait (perhaps used to avoid confusion with Object.wait() ?). The ChannelFuture provides a number of methods that will cause the connecting thread to wait until the connect operation is complete. Note that this does not mean the thread will wait until it connects; rather, it waits until the final conclusion of the call, which might be a successful connect, a failed connect, a connect timeout, or a cancelled connect request. If there are too many options here to keep your attention, skip to the next paragraph, but the asynchronous nature of these operations calls for a few different ways of awaiting, basically stated:
    • Interruptibility: The waiting thread may be interrupted. This is a fact of life with threads, and the core Java methods that make a thread wait (sleep, join etc.) declare a checked exception (InterruptedException). However, the ChannelFuture gives you the option of ignoring interrupts with awaitUninterruptibly, which means you don't need to put a try/catch block around the call. Otherwise, the interruptible option, await, can be used.
    • Timeout: For various reasons, operations may be blocked indefinitely, so it may be wise to apply a timeout to channel operations, which limits the time that a thread will wait. The timed await methods return false if the operation has not completed within the timeout.
  • Don't Wait: The connecting thread can optionally fire-and-forget by registering a listener on the ChannelFuture, which will be fired when the operation completes. This listener is an implementation of a ChannelFutureListener and it receives a callback when the operation completes, passing a reference to the ChannelFuture in question.
In summary, this code outlines examples of the completion waiting options, using a connect operation as the asynchronous event we're waiting for, but applicable for most operations:
   // Waiting on a connect. (Pick one)
   ChannelFuture cf = bootstrap.connect(addressToConnectTo);
   // A. wait interruptibly
   cf.await();
   // B. wait interruptibly with a timeout of 2000 ms.
   cf.await(2000, TimeUnit.MILLISECONDS);
   // C. wait uninterruptibly
   cf.awaitUninterruptibly();
   // D. wait uninterruptibly with a timeout of 2000 ms.
   cf.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS);
   // E. add a ChannelFutureListener that writes the Date when the connect is complete
   cf.addListener(new ChannelFutureListener(){
    public void operationComplete(ChannelFuture future) throws Exception {
     // check to see if we succeeded
     if(future.isSuccess()) {
      Channel channel = future.getChannel();
      channel.write(new Date());
      // remember, the write is asynchronous too !
     }
    }
   });
   // if a wait option was selected, check that the connect succeeded
   // (it may have failed, timed out or been cancelled) before sending the Date.
   if(cf.isSuccess()) {
    Channel channel = cf.getChannel();
    channel.write(new Date());
   }
Typically, in a client side application, the awaitXXX methods would be appropriate, since your client thread may not have much to do while it waits anyway. In a server code stack (perhaps some sort of proxy server), the connect event callback might be a better choice, since there is more likely to be a backlog of work to be done and threads should not be sitting around waiting on I/O events to complete.
The Netty documentation makes this fairly clear, but it is worth emphasising here: you want your Worker threads working, not waiting, so avoid calling any awaits in a worker thread.
One last item on connects, or more to the point, triggering some useful activity when a connect completes. In the example above, one of the options registered a ChannelFutureListener, and the write was executed in the body of the listener when the connection completed successfully. What may not be obvious here is that the operationComplete callback is triggered as a result of a broadcast ChannelStateEvent. This is an instance of a ChannelEvent, which is broadcast when a Channel changes state. The ChannelFuture handled this event by invoking the defined callback, but ChannelEvents are passed to all handlers in the pipeline, so you might implement your post-connection action in one of the handlers. I point this out because much of the Netty sample code uses this slightly more non-linear style. For example, see ObjectEchoClient, where the client class does all the work of establishing a connected channel but never actually writes anything. The write of the Object to be echoed actually occurs in the last handler in the pipeline (the ObjectEchoClientHandler), and it is executed when it receives the ChannelStateEvent indicating the Channel connected.

What about the server ?


Following up on the DateSender, we only got as far as the server socket which is listening for data being sent by client sockets. The server side is much like the reverse of the client side.


The server socket sends ChannelEvents loaded with the byte stream it is receiving from the client into the pipeline by passing them to the sink. The pipeline needs to be configured to do the reverse, so the next ChannelHandler is an ObjectDecoder, which knows how to convert an array of bytes back into a Java Object, or in this case, the Date. Once the Date is decoded, it is handed to the next handler in the pipeline, which is a custom handler I invented called a DateHandler. You may have noticed that although Channels have a write method, they don't have an equivalent read method. That would be more of a synchronous API, where a thread would read from an InputStream and wait until data was available. That's why the diagram above does not have an arrow back to the ServerChannel. Rather, the last handler in the pipeline receives the fully decoded message from the client and does something useful with it.
Think of server channel pipelines as being your actual business service invokers. Our actual business service is the DateReceiver and the pipeline feeds it properly unmarshalled data. Of course, you could create a pipeline that contains just one handler that does everything, but chaining together simple components provides much more flexibility. For example, if I was sending not just a Date, but perhaps an array of 300 Dates, I might want to add Compression/Decompression handlers into the client and server pipelines to shrink the size of the transmitted payload, and I can do this by simply modifying the pipeline configurations, rather than have to add additional code to my rhetorical single-do-everything handler. Or I might want to add some authentication so not just any old client can send Dates to my server..... Or what if I needed to shift the Date in transit to account for Time Zone changes..... Or......  it makes more sense to componentize.
The Server side of the DateSender is fairly simple. We create a channel factory, a pipeline factory and put the ObjectDecoder and our custom DateHandler into the pipeline. We'll use a ServerBootstrap instead of a ClientBootstrap and rather than connecting, as we did in the client, we're going to bind to a server socket so we can listen for requests from clients.
 public static void bootServer() {
  // More terse code to set up the server
  ServerBootstrap bootstrap = new ServerBootstrap(
    new NioServerSocketChannelFactory(
      Executors.newCachedThreadPool(),
      Executors.newCachedThreadPool()));

  // Set up the pipeline factory.
  bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
   public ChannelPipeline getPipeline() throws Exception {
    return Channels.pipeline(
     new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
     new DateHandler()
    );
   };
  });

  // Bind and start to accept incoming connections.
  bootstrap.bind(new InetSocketAddress("0.0.0.0", 8080));
  slog("Listening on 8080");
 }

 static class DateHandler extends SimpleChannelHandler {
  public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception {
   Date date = (Date)e.getMessage();
   // Here's the REALLY important business service at the end of the pipeline
   slog("Hey Guys !  I got a date ! [" + date + "]");
   // Huh ?
   super.messageReceived(ctx, e);
  }  
 }
Starting at the bottom, the DateHandler extends SimpleChannelHandler, which tends to be a good way to go when you're not doing anything out of the ordinary because its callbacks are strongly typed. In other words, you do not have to listen for all ChannelEvents and test the type of each event for the ones you want. In this case, the only callback that is overridden is messageReceived, which means the handler has received some actual payload.
slog and clog are simple wrappers for System.out.println, but the former prefixes output with [Server] and the latter with [Client] so we can differentiate the output.
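For reference, the helpers could be as small as the sketch below. This is a hypothetical reconstruction rather than the repository's code: the "[Server]:" and "[Client]:" prefix format is inferred from the sample output shown further down.

```java
// Hypothetical reconstruction of the slog/clog helpers; the prefix format
// "[Server]:" / "[Client]:" is inferred from the sample output.
final class LogHelpersSketch {

    // Builds the prefixed line, kept separate so it is easy to check.
    static String tag(String side, String message) {
        return "[" + side + "]:" + message;
    }

    // Server-side log line.
    static void slog(String message) {
        System.out.println(tag("Server", message));
    }

    // Client-side log line.
    static void clog(String message) {
        System.out.println(tag("Client", message));
    }

    public static void main(String[] args) {
        slog("Listening on 8080");  // [Server]:Listening on 8080
        clog("DateSender Example"); // [Client]:DateSender Example
    }
}
```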

A Quick Note on messageReceived


As I mentioned, the messageReceived method is a callback when some actual payload is being received. That is, the method handles a special subtype of the ChannelEvent called a MessageEvent. To access the payload in the MessageEvent, we simply call its getMessage() method which returns a java.lang.Object which can then be cast to the type expected. Recap: If there are no prior handlers before this handler to convert the payload, the type of the message will be..... a ChannelBuffer. In this case, however, the ObjectDecoder already converted the bytes in the incoming ChannelBuffer to a Date, so the type is java.util.Date. As for the other guy, the ChannelHandlerContext, we'll get to that shortly.
Once we issue the critical business event of logging the received date, the handler is technically done, so what's the score with the super.messageReceived(ctx, e) call at the end (the one commented "Huh ?")? Since there might be additional handlers further up the pipeline, it's a good idea to always send the payload on its way once it has been handled, and calling the superclass implementation does exactly that. If there are no additional handlers, the pipeline will discard the message.
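The forwarding behaviour, and the typed dispatch that SimpleChannelHandler provides, can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Netty code: Event, MessageEvent, ConnectedEvent, SimpleHandler and RecordingHandler are stand-ins, a handler's next field plays the role of the ChannelHandlerContext, and running off the end of the chain models the pipeline discarding the message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of the idea behind SimpleChannelHandler:
// one generic entry point that dispatches to a strongly typed callback,
// plus a default callback that forwards the event to the next handler.
final class TypedDispatchSketch {

    interface Event {}

    // Stand-in for a MessageEvent: an event that carries a payload.
    static final class MessageEvent implements Event {
        final Object message;
        MessageEvent(Object message) { this.message = message; }
    }

    // Stand-in for some other event type the handler does not care about.
    static final class ConnectedEvent implements Event {}

    static class SimpleHandler {
        SimpleHandler next; // the next handler "up" the pipeline, or null

        // Generic entry point: the event type is tested once, here.
        final void handleUpstream(Event e) {
            if (e instanceof MessageEvent) {
                messageReceived((MessageEvent) e);
            } else {
                forward(e);
            }
        }

        // Typed callback; the default just passes the event along.
        void messageReceived(MessageEvent e) { forward(e); }

        final void forward(Event e) {
            if (next != null) {
                next.handleUpstream(e);
            } // else: end of the pipeline, the event is simply dropped
        }
    }

    // A handler that overrides only the typed callback, like DateHandler.
    static final class RecordingHandler extends SimpleHandler {
        final List<Object> received = new ArrayList<>();

        @Override
        void messageReceived(MessageEvent e) {
            received.add(e.message);
            super.messageReceived(e); // same role as super.messageReceived(ctx, e)
        }
    }

    public static void main(String[] args) {
        SimpleHandler first = new SimpleHandler();        // default: forwards everything
        RecordingHandler last = new RecordingHandler();   // overrides messageReceived only
        first.next = last;
        first.handleUpstream(new ConnectedEvent());       // not a message: forwarded, then dropped
        first.handleUpstream(new MessageEvent("a date")); // forwarded to 'last' and recorded
        System.out.println(last.received);                // [a date]
    }
}
```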
The source code for DateSender is in the GitHub repository if you want to view the whole thing, and here's what the output looks like:

[Server]:Listening on 8080
[Client]:DateSender Example
[Client]:Issuing Channel Connect...
[Client]:Waiting for Channel Connect...
[Client]:Connected. Sending Date
[Server]:Hey Guys !  I got a date ! [Sat May 19 14:00:58 EDT 2012]

Note that this example is purely one way. We only send a Date up to the server and nothing is sent back. If the server were to respond, the pipelines on both sides would need their counterparts, which I will discuss next.

The Return of the King Date


Both clients and servers will read and write in a bidirectional scenario. Consider the DateSender example: what if the server were to increment the date by some arbitrary value and return the modified date? There is another example with almost the same code as DateSender, called DateModifier, whose server modifies the Date and returns it to the client. In order to do this, these modifications are required:
  1. The Server will return a Date back to the client, so its pipeline will need an ObjectEncoder.
  2. The Client will receive a Date back from the Server so its pipeline will need an ObjectDecoder.
  3. The Client needs an extra handler to do something with the returned Date from the Server.
 Here's the code that creates the new Client pipeline:
   ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
     public ChannelPipeline getPipeline() throws Exception {
       return Channels.pipeline(
         new ObjectEncoder(),
         // Next 2 Lines are new
         new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
         new ClientDateHandler()
       );
     }
   };
The code for the ClientDateHandler is very simple; it is invoked when the client receives a Date back from the server and the Date has been decoded:
   static class ClientDateHandler extends SimpleChannelHandler {
    public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception {
     Date date = (Date)e.getMessage();
     clog("Hey Guys !  I got back a modified date ! [" + date + "]");
    }
   }
The server pipeline is almost the mirror of the client now:
   bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
     return Channels.pipeline(
      new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
      // Next 2 Lines are new
      new ObjectEncoder(),
      new ServerDateHandler()
     );
    };
   });
This is the code for the new Server DateHandler which has a few new concepts.
   static class ServerDateHandler extends SimpleChannelHandler {
    Random random = new Random(System.nanoTime());
    public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception {
     Date date = (Date)e.getMessage();
     // Here's the REALLY important business service at the end of the pipeline
     long newTime = (date.getTime() + random.nextInt());
     Date newDate = new Date(newTime);
     slog("Hey Guys !  I got a date ! [" + date + "] and I modified it to [" + newDate + "]");
     // Send back the response
     Channel channel = e.getChannel();
     ChannelFuture channelFuture = Channels.future(e.getChannel());
     ChannelEvent responseEvent = new DownstreamMessageEvent(channel, channelFuture, newDate, channel.getRemoteAddress());
     ctx.sendDownstream(responseEvent);
     // But still send it upstream because there might be another handler
     super.messageReceived(ctx, e);
    }  
   }
There's some new stuff near the end of messageReceived, dedicated specifically to returning the modified Date to the calling client, and it introduces a subtle concept.
  • First, we get a reference to the Channel from the MessageEvent.
  • The Date is going to be written back to the client, which (at the risk of repeating myself) will be an asynchronous call, so there's always a ChannelFuture involved. In this case, we create one with Channels.future(...), Channels being a class stuffed full of useful static methods like future.
  • Next, a new MessageEvent is created by constructing a DownstreamMessageEvent. Basically, the return value is being packaged up to be sent back down to the client.
  • Finally, the new MessageEvent is sent back down to the client by calling sendDownstream on the ChannelHandlerContext.
Say what ?  Ok, the ChannelHandlerContext is basically a functional reference to the pipeline. It provides access to all the handlers in the pipeline as well as to the pipeline itself, and most importantly, in this case, it provides a means of sending a message back through the "same" path that the incoming message arrived on. The important points here are:
  1. Keeping in mind that there may be additional handlers in the pipeline beyond the ServerDateHandler, I want the handler to respond to the client by sending the modified Date back and then forward the current payload to the next handler in the pipeline.
  2. If we wrote to the Channel directly, the write would start from the "top" of the pipeline (its last handler) and might be pushed through handlers that are not intended to be called with the returned Date.
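To see why the starting point matters, here is a hypothetical model (plain Java, no Netty) that lists which downstream handlers a write would pass through. It assumes, as in Netty 3, that downstream events travel from the last handler towards the first. AuditEncoder is an invented handler placed after ServerDateHandler purely to show the difference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model: which downstream handlers does a write pass through?
// Downstream events travel from the tail (last handler) towards the head.
final class DownstreamPathSketch {

    // Handlers visited by a write that starts just below index 'from'
    // (exclusive) and walks towards the head, skipping upstream-only handlers.
    static List<String> pathFrom(List<String> pipeline, Set<String> downstreamHandlers, int from) {
        List<String> path = new ArrayList<>();
        for (int i = from - 1; i >= 0; i--) {
            String name = pipeline.get(i);
            if (downstreamHandlers.contains(name)) {
                path.add(name);
            }
        }
        return path;
    }

    // Like writing to the Channel: start at the tail, so every downstream handler is visited.
    static List<String> channelWrite(List<String> pipeline, Set<String> downstreamHandlers) {
        return pathFrom(pipeline, downstreamHandlers, pipeline.size());
    }

    // Like ctx.sendDownstream(...) from the handler named 'current':
    // only handlers closer to the head than 'current' are visited.
    static List<String> contextWrite(List<String> pipeline, Set<String> downstreamHandlers, String current) {
        return pathFrom(pipeline, downstreamHandlers, pipeline.indexOf(current));
    }

    public static void main(String[] args) {
        List<String> pipeline = List.of("ObjectDecoder", "ObjectEncoder", "ServerDateHandler", "AuditEncoder");
        Set<String> downstream = Set.of("ObjectEncoder", "AuditEncoder");
        System.out.println("channel.write      -> " + channelWrite(pipeline, downstream));
        System.out.println("ctx.sendDownstream -> " + contextWrite(pipeline, downstream, "ServerDateHandler"));
    }
}
```

Writing to the channel visits AuditEncoder and then ObjectEncoder, while sending downstream from ServerDateHandler's context visits only ObjectEncoder.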
All this calls for some additional detail on how exactly handlers work inside a pipeline, and an explanation of Upstream and Downstream.

Upstream and Downstream


You might wonder why things don't get confused when both the ObjectEncoder and the ObjectDecoder sit in one pipeline. After all, they're both in the pipeline, so wouldn't they both get called? They don't, because handlers declare themselves as Upstream handlers, Downstream handlers, or both. Basically, Downstream is when a pipeline is sending something to a remote. Upstream is when it is reading something from a remote. If the terminology is not immediately intuitive to you, think of this mnemonic sentence:
If you want to learn something, you might read up on it and then write down some notes.
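... anyways, for handlers to participate in a pipeline, they will directly or indirectly implement one or both of these interfaces:
  • org.jboss.netty.channel.ChannelUpstreamHandler
  • org.jboss.netty.channel.ChannelDownstreamHandler
Therefore, on a downstream write, the pipeline passes the payload only to the channel handlers that implement ChannelDownstreamHandler, and on an upstream read only to those that implement ChannelUpstreamHandler. The direction also determines the order: upstream events travel from the first handler in the pipeline to the last, while downstream events travel from the last handler back to the first. Keep in mind that a channel handler can implement both interfaces and participate in both upstream and downstream events.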
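The selection rule can be sketched in plain Java. The interfaces below are hypothetical stand-ins for ChannelUpstreamHandler and ChannelDownstreamHandler, and the two walk methods model the direction of travel; this illustrates the idea and is not how DefaultChannelPipeline is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of Upstream/Downstream handler selection.
final class DirectionSketch {

    interface UpstreamHandler { String name(); }   // stands in for ChannelUpstreamHandler
    interface DownstreamHandler { String name(); } // stands in for ChannelDownstreamHandler

    static final class Decoder implements UpstreamHandler {
        public String name() { return "decoder"; }
    }

    static final class Encoder implements DownstreamHandler {
        public String name() { return "encoder"; }
    }

    // A handler may implement both interfaces and see traffic in both directions.
    static final class Logger implements UpstreamHandler, DownstreamHandler {
        public String name() { return "logger"; }
    }

    // Reads: walk first -> last, visiting only upstream handlers.
    static List<String> upstreamOrder(List<Object> pipeline) {
        List<String> visited = new ArrayList<>();
        for (Object h : pipeline) {
            if (h instanceof UpstreamHandler) visited.add(((UpstreamHandler) h).name());
        }
        return visited;
    }

    // Writes: walk last -> first, visiting only downstream handlers.
    static List<String> downstreamOrder(List<Object> pipeline) {
        List<String> visited = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Object h = pipeline.get(i);
            if (h instanceof DownstreamHandler) visited.add(((DownstreamHandler) h).name());
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Object> pipeline = List.of(new Decoder(), new Encoder(), new Logger());
        System.out.println("read:  " + upstreamOrder(pipeline));   // [decoder, logger]
        System.out.println("write: " + downstreamOrder(pipeline)); // [logger, encoder]
    }
}
```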



Notice that the notion of Upstream vs. Downstream is relative to each side of the connection: a client writing a request and a server writing its response are both sending downstream. For a conversation between a client and a server, both sides will need Encoder/Decoder pairs defined in the correct order. The next diagram illustrates this in the form of a simplified HTTP server serving a file to a requesting client.




Sequence of ChannelHandlers in the Pipeline

In order to achieve the proper sequence of modifications of the payload, the pipeline keeps strict ordering of the handlers in the pipeline. Consider a pipeline that is supposed to look like this:
  1. JSON Encoder
    • Receives: A widget object
    • Produces: A JSON string representing the widget object.
  2. String Encoder:
    • Receives: A string
    • Produces: A ChannelBuffer with the encoded string
If the order of these encoders were accidentally reversed, the String Encoder would receive a widget object when it expects a string, and a ClassCastException would occur.
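A minimal sketch of the failure, assuming each encoder can be modeled as a function from message to message. Widget, the hand-rolled JSON (no escaping) and both "encoders" are hypothetical stand-ins, and a byte array plays the role of the ChannelBuffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of why encoder order matters. Nothing here is a Netty class.
final class EncoderOrderSketch {

    static final class Widget {
        final String name;
        final int size;
        Widget(String name, int size) { this.name = name; this.size = size; }
    }

    // "JSON Encoder": receives a Widget, produces a JSON string (no escaping, sketch only).
    static final Function<Object, Object> JSON_ENCODER = msg -> {
        Widget w = (Widget) msg; // ClassCastException if handed anything else
        return "{\"name\":\"" + w.name + "\",\"size\":" + w.size + "}";
    };

    // "String Encoder": receives a String, produces bytes (standing in for a ChannelBuffer).
    static final Function<Object, Object> STRING_ENCODER = msg ->
            ((String) msg).getBytes(StandardCharsets.UTF_8);

    // Apply the stages in the given order, feeding each output to the next stage.
    static Object run(List<Function<Object, Object>> stages, Object msg) {
        for (Function<Object, Object> stage : stages) {
            msg = stage.apply(msg);
        }
        return msg;
    }

    public static void main(String[] args) {
        Widget widget = new Widget("sprocket", 3);
        byte[] ok = (byte[]) run(List.of(JSON_ENCODER, STRING_ENCODER), widget);
        System.out.println(new String(ok, StandardCharsets.UTF_8));
        try {
            run(List.of(STRING_ENCODER, JSON_ENCODER), widget); // reversed order
        } catch (ClassCastException expected) {
            System.out.println("Reversed order: " + expected.getClass().getSimpleName());
        }
    }
}
```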
There are a few constructs that allow for specifying the order of handlers in the pipeline. The simplest is probably the static pipeline method in the org.jboss.netty.channel.Channels class, which creates a pipeline containing the passed handlers in the same order they are specified in the array. The signature is:

static ChannelPipeline Channels.pipeline(ChannelHandler... handlers)

The ChannelPipeline interface itself provides a number of location-specifying methods for adding handlers, and with these we can also assign each handler a logical name that uniquely identifies it in the pipeline. The name itself is purely arbitrary, but as we will see, it is sometimes necessary to reference a handler directly by its name, or to determine its position via the assigned name. Handler names are actually mandatory, and are assigned automatically if you do not specify them yourself (as with Channels.pipeline). The pipeline methods are as follows:
  • addFirst(String name, ChannelHandler handler)
  • addLast(String name, ChannelHandler handler)
  • addBefore(String baseName, String name, ChannelHandler handler)
  • addAfter(String baseName, String name, ChannelHandler handler)
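To illustrate the positional semantics of named handlers, here is a toy, JDK-only model whose method names mirror ChannelPipeline's addFirst, addLast, addBefore and addAfter. NamedPipelineSketch is hypothetical: it stores plain Objects rather than ChannelHandlers and, unlike Netty's pipeline, it is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, JDK-only model of a pipeline whose entries are identified by name.
final class NamedPipelineSketch {

    private final List<String> names = new ArrayList<>();
    private final List<Object> handlers = new ArrayList<>();

    void addFirst(String name, Object handler) { insert(0, name, handler); }

    void addLast(String name, Object handler) { insert(names.size(), name, handler); }

    void addBefore(String baseName, String name, Object handler) { insert(indexOf(baseName), name, handler); }

    void addAfter(String baseName, String name, Object handler) { insert(indexOf(baseName) + 1, name, handler); }

    // Removes the named handler and returns it, so it could be re-added later.
    Object remove(String name) {
        int i = indexOf(name);
        names.remove(i);
        return handlers.remove(i);
    }

    List<String> names() { return List.copyOf(names); }

    private int indexOf(String name) {
        int i = names.indexOf(name);
        if (i < 0) throw new NoSuchElementException(name);
        return i;
    }

    // Names must be unique, since they are how handlers are looked up.
    private void insert(int index, String name, Object handler) {
        if (names.contains(name)) throw new IllegalArgumentException("Duplicate handler name: " + name);
        names.add(index, name);
        handlers.add(index, handler);
    }

    public static void main(String[] args) {
        NamedPipelineSketch p = new NamedPipelineSketch();
        p.addLast("encoder", "StringEncoder");
        p.addFirst("decoder", "StringDecoder");
        p.addAfter("encoder", "handler", "MyBusinessHandler");
        p.addBefore("encoder", "json", "JsonEncoder");
        System.out.println(p.names()); // [decoder, json, encoder, handler]
    }
}
```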

Dynamically Modifying ChannelHandlers in the Pipeline

Another aspect of pipelines is that they are not immutable*, so it is possible to add and remove handlers at runtime. (There will be a very salient example of this in Part 2.) Netty advertises the pipeline as thread-safe for this exact purpose: the four methods above, along with the remove methods, can be called at runtime in order to "fine-tune" the pipeline for a specific call or to switch state.

*Actually there is an immutable implementation called a StaticChannelPipeline. Since it is immutable once created, it cannot be modified and may provide better performance since the pipeline execution does not have to be guarded against runtime changes of the handlers.

Why might we want to suddenly change the handlers in a pipeline that we went to all the bother of creating just so in the first place? Here's a contrived example: say you have a bunch of encoders in a client pipeline, with the last encoder being a ZlibEncoder which compresses the encoded payload before sending it out the door. However, let's say you have determined that if the payload is less than 1024 bytes, the compression step is actually a hindrance to performance, not a benefit. This means that you want to conditionally route the payload through the compression handler. Hmmmm.... since you won't actually know the size of the payload until it has passed through the preceding handlers, you don't even have the option of choosing between a compression-enabled pipeline and a compressionless one, because your payload is most of the way down the pipeline before you know which one you want. To address this, we can create a content-interrogating handler that examines the size of the payload to be compressed (or not) and adds or removes the compression handler accordingly. We'll position this handler so that a payload reaches it after all the other encoders, but just before the compression handler.

At this point, you might be thinking there are several alternatives here, but I did say it was contrived, so bear with me.
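Stripped of the pipeline mechanics, the decision itself is just a size check. The sketch below is a JDK-only illustration that assumes java.util.zip.Deflater as a stand-in for ZlibEncoder; SizeThresholdSketch and maybeCompress are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical sketch of the decision the ConditionalCompressionHandler makes:
// compress a payload only when it is at least 'threshold' bytes long.
// java.util.zip.Deflater stands in for Netty's ZlibEncoder here.
final class SizeThresholdSketch {

    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            while (!deflater.finished()) {
                int n = deflater.deflate(buf);
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native zlib resources
        }
    }

    // Returns the payload untouched when it is below the threshold, deflated otherwise.
    static byte[] maybeCompress(byte[] payload, int threshold) {
        return payload.length < threshold ? payload : deflate(payload);
    }

    public static void main(String[] args) {
        byte[] small = "tiny".getBytes(StandardCharsets.UTF_8);
        byte[] large = "x".repeat(5000).getBytes(StandardCharsets.UTF_8);
        System.out.println("small: " + small.length + " -> " + maybeCompress(small, 1024).length);
        System.out.println("large: " + large.length + " -> " + maybeCompress(large, 1024).length);
        System.out.println("tiny payload deflated anyway: " + deflate(small).length + " bytes");
    }
}
```

The last line shows why tiny payloads aren't worth compressing: zlib adds a header and a checksum, so deflating a 4-byte payload produces more bytes than it started with. A real protocol would also have to tell the receiver whether a given payload was compressed; the sketch ignores that.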

First, we will create the pipeline with the compression handler enabled, then we'll look at the ConditionalCompressionHandler. Here's what the pipeline might look like:
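 public ChannelPipeline getPipeline() throws Exception {
  ChannelPipeline pipeline = Channels.pipeline();
  // Downstream events (writes) travel from the LAST handler in the pipeline
  // towards the FIRST, so the compressor goes nearest the head and the
  // decider sits right above it: a write reaches the decider first.
  pipeline.addLast("MyCompressionHandler", new ZlibEncoder());
  pipeline.addLast("IAmTheDecider", new ConditionalCompressionHandler(1024, "MyCompressionHandler"));
  // Then add all the pre-amble encoders; on a write they run before the decider.
  //pipeline.addLast("Foo", new SomeHandler());
  //pipeline.addLast("Bar", new SomeOtherHandler());
  // etc.
  return pipeline;
 }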


The ConditionalCompressionHandler must examine the size of the ChannelBuffer it is passed and decide if the compression handler should be called or not. However, once removed, if the next payload that comes along requires compression, we need to add it back in again.

  
  public class ConditionalCompressionHandler extends SimpleChannelDownstreamHandler {
   /** The minimum size of a payload to be compressed */
   protected final int sizeThreshold;
   /** The name of the handler to remove if the payload is smaller than specified sizeThreshold */
   protected final String nameOfCompressionHandler;
   /** The compression handler */
   protected volatile ChannelHandler compressionHandler = null;
   
   /**
    * Creates a new ConditionalCompressionHandler
    * @param sizeThreshold The minimum size of a payload to be compressed 
    * @param nameOfCompressionHandler The name of the handler to remove if the payload is smaller than specified sizeThreshold
    */
   public ConditionalCompressionHandler(int sizeThreshold, String nameOfCompressionHandler) {
    this.sizeThreshold = sizeThreshold;
    this.nameOfCompressionHandler = nameOfCompressionHandler;
   }
   
   /**
    * see org.jboss.netty.channel.SimpleChannelDownstreamHandler
    */
   public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
    // If the message is not a ChannelBuffer, hello ClassCastException !
    ChannelBuffer cb = (ChannelBuffer)e.getMessage();
    // Check to see if we already removed the handler
    boolean pipelineContainsCompressor = ctx.getPipeline().getContext(nameOfCompressionHandler)!=null;
    if(cb.readableBytes() < sizeThreshold) {  
     if(pipelineContainsCompressor) {
      // The payload is too small to be compressed but the pipeline contains the compression handler
      // so we need to remove it (keeping a reference so we can put it back later).
      compressionHandler = ctx.getPipeline().remove(nameOfCompressionHandler);
     }
    } else {
     // We want to compress the payload, let's make sure the compressor is there
     if(!pipelineContainsCompressor) {
      // Oops, it's not there, so let's put it back in. Writes flow towards the head
      // of the pipeline, so it must go *before* this handler to be called after it.
      ctx.getPipeline().addBefore(ctx.getName(), nameOfCompressionHandler, compressionHandler);
     }
    }
    // Overriding writeRequested means we must pass the write along ourselves,
    // otherwise it would stop here and never reach the socket.
    ctx.sendDownstream(e);
   }
   
  }


 ChannelHandlerContext

Note that in pretty much every handler event, the handler is passed a ChannelHandlerContext. This object is an adapter between a ChannelHandler and the ChannelPipeline that contains it, allowing a handler to interact with the pipeline, a crucial aspect of the preceding code sample. In addition, it allows a handler to inquire about other handlers and even interact with other handlers (though more indirectly).

  • To determine whether the compression handler is installed, the handler requests the pipeline from its ChannelHandlerContext and then asks the pipeline for the ChannelHandlerContext of the compression handler, by name. If the compression handler is absent, the call returns null.
  • To remove the compression handler, the handler requests the pipeline and removes it by name, keeping the returned instance so it can be reinstalled later.
  • To reinstall it, the handler requests the pipeline and adds the compression handler back in. It uses the getName() method of its own ChannelHandlerContext to get the name of the current handler, so the compressor can be positioned relative to it.


That's it for this entry. Part 2 will demonstrate some more specific (and useful) examples by way of an Ajax server built with Netty that implements pushed events from the server to a browser. Please feel free to send any feedback or questions through the comments.  Thanks !

Update:  Part 1.5 Has Been Published: Netty Tutorial Part 1.5: On Channel Handlers and Channel Options