1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package net.dnio.codec.avro;
- import org.apache.avro.generic.GenericRecord;
- import org.apache.avro.io.BinaryEncoder;
- import org.apache.avro.io.DatumWriter;
- import org.apache.avro.io.Encoder;
- import org.apache.avro.io.EncoderFactory;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.jboss.netty.channel.Channel;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
- import java.io.ByteArrayOutputStream;
- /**
- * <pre>
- *
- * Created by IntelliJ IDEA.
- * User: zhenqin
- * Date: 13-8-9
- * Time: 上午9:30
- * To change this template use File | Settings | File Templates.
- *
- * </pre>
- *
- * @author zhenqin
- */
- public class AvroEncoder<T> extends OneToOneEncoder {
- private DatumWriter<T> datumWriter;
- private BinaryEncoder reuse;
- /**
- * 日志记录
- */
- private static Log log = LogFactory.getLog(AvroEncoder.class);
- public AvroEncoder(DatumWriter<T> writer, BinaryEncoder reuse) {
- this.datumWriter = writer;
- this.reuse = reuse;
- }
- /**
- * Transforms the specified message into another message and return the
- * transformed message. Note that you can not return {@code null}, unlike
- * you can in {@link org.jboss.netty.handler.codec.oneone.OneToOneDecoder#decode(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.Channel, Object)};
- * you must return something, at least {@link org.jboss.netty.buffer.ChannelBuffers#EMPTY_BUFFER}.
- */
- @Override
- public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
- if (msg instanceof GenericRecord) {
- //取得消息正文
- T datum = (T)msg;
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, reuse);
- datumWriter.write(datum, encoder);
- encoder.flush();
- byte[] array = outputStream.toByteArray();
- return ctx.getChannel().getConfig().
- getBufferFactory().getBuffer(array, 0, array.length);
- }
- return msg;
- }
- public DatumWriter<T> getDatumWriter() {
- return datumWriter;
- }
- public BinaryEncoder getReuse() {
- return reuse;
- }
- }
|