package net.dnio.codec.avro; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; /** *
 *
 * Created by IntelliJ IDEA.
 * User: zhenqin
 * Date: 13-8-9
 * Time: 上午9:33
 * To change this template use File | Settings | File Templates.
 *
 * 
* * @author zhenqin */ public class AvroDecoder extends OneToOneDecoder { /** * Avro Data Reader */ private DatumReader datumReader; /** * Decoder */ private BinaryDecoder reuse; /** * 日志记录 */ private static Log log = LogFactory.getLog(AvroDecoder.class); /** * * @param datumReader Avro Schema Reader * @param reuse 可为null */ public AvroDecoder(DatumReader datumReader, BinaryDecoder reuse) { this.datumReader = datumReader; this.reuse = reuse; } @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { if (!(msg instanceof ChannelBuffer)) { return msg; } ChannelBuffer buf = (ChannelBuffer) msg; final byte[] array; final int offset; final int length = buf.readableBytes(); if (buf.hasArray()) { array = buf.array(); offset = buf.arrayOffset() + buf.readerIndex(); } else { array = new byte[length]; buf.getBytes(buf.readerIndex(), array, 0, length); offset = 0; } Decoder decoder = DecoderFactory.get().binaryDecoder(array, offset, length, reuse); GenericRecord result = (GenericRecord) datumReader.read(null, decoder); return result; } public DatumReader getDatumReader() { return datumReader; } public BinaryDecoder getReuse() { return reuse; } }