AvroDecoder.java 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package net.dnio.codec.avro;
  2. import org.apache.avro.generic.GenericRecord;
  3. import org.apache.avro.io.BinaryDecoder;
  4. import org.apache.avro.io.DatumReader;
  5. import org.apache.avro.io.Decoder;
  6. import org.apache.avro.io.DecoderFactory;
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9. import org.jboss.netty.buffer.ChannelBuffer;
  10. import org.jboss.netty.channel.Channel;
  11. import org.jboss.netty.channel.ChannelHandlerContext;
  12. import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
  13. /**
  14. * <pre>
  15. *
  16. * Created by IntelliJ IDEA.
  17. * User: zhenqin
  18. * Date: 13-8-9
  19. * Time: 上午9:33
  20. * To change this template use File | Settings | File Templates.
  21. *
  22. * </pre>
  23. *
  24. * @author zhenqin
  25. */
  26. public class AvroDecoder<T> extends OneToOneDecoder {
  27. /**
  28. * Avro Data Reader
  29. */
  30. private DatumReader<T> datumReader;
  31. /**
  32. * Decoder
  33. */
  34. private BinaryDecoder reuse;
  35. /**
  36. * 日志记录
  37. */
  38. private static Log log = LogFactory.getLog(AvroDecoder.class);
  39. /**
  40. *
  41. * @param datumReader Avro Schema Reader
  42. * @param reuse 可为null
  43. */
  44. public AvroDecoder(DatumReader<T> datumReader, BinaryDecoder reuse) {
  45. this.datumReader = datumReader;
  46. this.reuse = reuse;
  47. }
  48. @Override
  49. protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
  50. if (!(msg instanceof ChannelBuffer)) {
  51. return msg;
  52. }
  53. ChannelBuffer buf = (ChannelBuffer) msg;
  54. final byte[] array;
  55. final int offset;
  56. final int length = buf.readableBytes();
  57. if (buf.hasArray()) {
  58. array = buf.array();
  59. offset = buf.arrayOffset() + buf.readerIndex();
  60. } else {
  61. array = new byte[length];
  62. buf.getBytes(buf.readerIndex(), array, 0, length);
  63. offset = 0;
  64. }
  65. Decoder decoder = DecoderFactory.get().binaryDecoder(array, offset, length, reuse);
  66. GenericRecord result = (GenericRecord) datumReader.read(null, decoder);
  67. return result;
  68. }
  69. public DatumReader<T> getDatumReader() {
  70. return datumReader;
  71. }
  72. public BinaryDecoder getReuse() {
  73. return reuse;
  74. }
  75. }