瀏覽代碼

Netty Avro Encoder/Decoder

ZhenQin 12 年之前
父節點
當前提交
7a250ecb4b
共有 3 個文件被更改,包括 177 次插入1 次删除
  1. 3 1
      README.md
  2. 91 0
      netty-avro-codec/AvroDecoder.java
  3. 83 0
      netty-avro-codec/AvroEncoder.java

+ 3 - 1
README.md

@@ -1 +1,3 @@
-#bakcode
+#个人私有项目代码库
+
+netty-avro-codec:Netty Avro的编解码器

+ 91 - 0
netty-avro-codec/AvroDecoder.java

@@ -0,0 +1,91 @@
+package net.dnio.codec.avro;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 13-8-9
+ * Time: 上午9:33
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class AvroDecoder<T> extends OneToOneDecoder {
+
+
+    /**
+     * Avro Data Reader
+     */
+    private DatumReader<T> datumReader;
+
+
+    /**
+     * Decoder
+     */
+    private BinaryDecoder reuse;
+
+    /**
+     * 日志记录
+     */
+    private static Log log = LogFactory.getLog(AvroDecoder.class);
+
+
+    /**
+     *
+     * @param datumReader Avro Schema Reader
+     * @param reuse 可为null
+     */
+    public AvroDecoder(DatumReader<T> datumReader, BinaryDecoder reuse) {
+        this.datumReader = datumReader;
+        this.reuse = reuse;
+    }
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+        if (!(msg instanceof ChannelBuffer)) {
+            return msg;
+        }
+
+        ChannelBuffer buf = (ChannelBuffer) msg;
+        final byte[] array;
+        final int offset;
+        final int length = buf.readableBytes();
+
+        if (buf.hasArray()) {
+            array = buf.array();
+            offset = buf.arrayOffset() + buf.readerIndex();
+        } else {
+            array = new byte[length];
+            buf.getBytes(buf.readerIndex(), array, 0, length);
+            offset = 0;
+        }
+
+        Decoder decoder = DecoderFactory.get().binaryDecoder(array, offset, length, reuse);
+        GenericRecord result = (GenericRecord) datumReader.read(null, decoder);
+        return result;
+    }
+
+    public DatumReader<T> getDatumReader() {
+        return datumReader;
+    }
+
+    public BinaryDecoder getReuse() {
+        return reuse;
+    }
+}

+ 83 - 0
netty-avro-codec/AvroEncoder.java

@@ -0,0 +1,83 @@
+package net.dnio.codec.avro;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 13-8-9
+ * Time: 上午9:30
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class AvroEncoder<T> extends OneToOneEncoder {
+
+
+    private DatumWriter<T> datumWriter;
+
+
+    private BinaryEncoder reuse;
+
+    /**
+     * 日志记录
+     */
+    private static Log log = LogFactory.getLog(AvroEncoder.class);
+
+    public AvroEncoder(DatumWriter<T> writer, BinaryEncoder reuse) {
+        this.datumWriter = writer;
+        this.reuse = reuse;
+    }
+
+    /**
+     * Transforms the specified message into another message and return the
+     * transformed message.  Note that you can not return {@code null}, unlike
+     * you can in {@link org.jboss.netty.handler.codec.oneone.OneToOneDecoder#decode(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.Channel, Object)};
+     * you must return something, at least {@link org.jboss.netty.buffer.ChannelBuffers#EMPTY_BUFFER}.
+     */
+    @Override
+    public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+        if (msg instanceof GenericRecord) {
+
+            //取得消息正文
+            T datum = (T)msg;
+
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, reuse);
+
+            datumWriter.write(datum, encoder);
+
+            encoder.flush();
+
+            byte[] array = outputStream.toByteArray();
+
+            return ctx.getChannel().getConfig().
+                    getBufferFactory().getBuffer(array, 0, array.length);
+
+        }
+        return msg;
+    }
+
+    public DatumWriter<T> getDatumWriter() {
+        return datumWriter;
+    }
+
+    public BinaryEncoder getReuse() {
+        return reuse;
+    }
+}