12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- package net.dnio.codec.avro;
- import org.apache.avro.generic.GenericRecord;
- import org.apache.avro.io.BinaryDecoder;
- import org.apache.avro.io.DatumReader;
- import org.apache.avro.io.Decoder;
- import org.apache.avro.io.DecoderFactory;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.channel.Channel;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
- /**
- * <pre>
- *
- * Created by IntelliJ IDEA.
- * User: zhenqin
- * Date: 13-8-9
- * Time: 上午9:33
- * To change this template use File | Settings | File Templates.
- *
- * </pre>
- *
- * @author zhenqin
- */
- public class AvroDecoder<T> extends OneToOneDecoder {
- /**
- * Avro Data Reader
- */
- private DatumReader<T> datumReader;
- /**
- * Decoder
- */
- private BinaryDecoder reuse;
- /**
- * 日志记录
- */
- private static Log log = LogFactory.getLog(AvroDecoder.class);
- /**
- *
- * @param datumReader Avro Schema Reader
- * @param reuse 可为null
- */
- public AvroDecoder(DatumReader<T> datumReader, BinaryDecoder reuse) {
- this.datumReader = datumReader;
- this.reuse = reuse;
- }
- @Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
- if (!(msg instanceof ChannelBuffer)) {
- return msg;
- }
- ChannelBuffer buf = (ChannelBuffer) msg;
- final byte[] array;
- final int offset;
- final int length = buf.readableBytes();
- if (buf.hasArray()) {
- array = buf.array();
- offset = buf.arrayOffset() + buf.readerIndex();
- } else {
- array = new byte[length];
- buf.getBytes(buf.readerIndex(), array, 0, length);
- offset = 0;
- }
- Decoder decoder = DecoderFactory.get().binaryDecoder(array, offset, length, reuse);
- GenericRecord result = (GenericRecord) datumReader.read(null, decoder);
- return result;
- }
- public DatumReader<T> getDatumReader() {
- return datumReader;
- }
- public BinaryDecoder getReuse() {
- return reuse;
- }
- }
|