AvroEncoder.java 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package net.dnio.codec.avro;
  2. import org.apache.avro.generic.GenericRecord;
  3. import org.apache.avro.io.BinaryEncoder;
  4. import org.apache.avro.io.DatumWriter;
  5. import org.apache.avro.io.Encoder;
  6. import org.apache.avro.io.EncoderFactory;
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9. import org.jboss.netty.channel.Channel;
  10. import org.jboss.netty.channel.ChannelHandlerContext;
  11. import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
  12. import java.io.ByteArrayOutputStream;
  13. /**
  14. * <pre>
  15. *
  16. * Created by IntelliJ IDEA.
  17. * User: zhenqin
  18. * Date: 13-8-9
  19. * Time: 上午9:30
  20. * To change this template use File | Settings | File Templates.
  21. *
  22. * </pre>
  23. *
  24. * @author zhenqin
  25. */
  26. public class AvroEncoder<T> extends OneToOneEncoder {
  27. private DatumWriter<T> datumWriter;
  28. private BinaryEncoder reuse;
  29. /**
  30. * 日志记录
  31. */
  32. private static Log log = LogFactory.getLog(AvroEncoder.class);
  33. public AvroEncoder(DatumWriter<T> writer, BinaryEncoder reuse) {
  34. this.datumWriter = writer;
  35. this.reuse = reuse;
  36. }
  37. /**
  38. * Transforms the specified message into another message and return the
  39. * transformed message. Note that you can not return {@code null}, unlike
  40. * you can in {@link org.jboss.netty.handler.codec.oneone.OneToOneDecoder#decode(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.Channel, Object)};
  41. * you must return something, at least {@link org.jboss.netty.buffer.ChannelBuffers#EMPTY_BUFFER}.
  42. */
  43. @Override
  44. public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
  45. if (msg instanceof GenericRecord) {
  46. //取得消息正文
  47. T datum = (T)msg;
  48. ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  49. Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, reuse);
  50. datumWriter.write(datum, encoder);
  51. encoder.flush();
  52. byte[] array = outputStream.toByteArray();
  53. return ctx.getChannel().getConfig().
  54. getBufferFactory().getBuffer(array, 0, array.length);
  55. }
  56. return msg;
  57. }
  58. public DatumWriter<T> getDatumWriter() {
  59. return datumWriter;
  60. }
  61. public BinaryEncoder getReuse() {
  62. return reuse;
  63. }
  64. }