package net.dnio.codec.avro;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
/**
*
*
* Created by IntelliJ IDEA.
* User: zhenqin
* Date: 13-8-9
* Time: 上午9:33
* To change this template use File | Settings | File Templates.
*
*
*
* @author zhenqin
*/
public class AvroDecoder extends OneToOneDecoder {
/**
* Avro Data Reader
*/
private DatumReader datumReader;
/**
* Decoder
*/
private BinaryDecoder reuse;
/**
* 日志记录
*/
private static Log log = LogFactory.getLog(AvroDecoder.class);
/**
*
* @param datumReader Avro Schema Reader
* @param reuse 可为null
*/
public AvroDecoder(DatumReader datumReader, BinaryDecoder reuse) {
this.datumReader = datumReader;
this.reuse = reuse;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer buf = (ChannelBuffer) msg;
final byte[] array;
final int offset;
final int length = buf.readableBytes();
if (buf.hasArray()) {
array = buf.array();
offset = buf.arrayOffset() + buf.readerIndex();
} else {
array = new byte[length];
buf.getBytes(buf.readerIndex(), array, 0, length);
offset = 0;
}
Decoder decoder = DecoderFactory.get().binaryDecoder(array, offset, length, reuse);
GenericRecord result = (GenericRecord) datumReader.read(null, decoder);
return result;
}
public DatumReader getDatumReader() {
return datumReader;
}
public BinaryDecoder getReuse() {
return reuse;
}
}