Flume怎么自定义Event Serializer序列化类

发布时间:2021-12-09 13:38:22 作者:iii
来源:亿速云 阅读:137

这篇文章主要介绍“Flume怎么自定义Event Serializer序列化类”,在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flume怎么自定义Event Serializer序列化类”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

把日志从flume打到hbase中,但是我们的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,这时候使用flume自带的SimpleHbaseEventSerializer和RegexHbaseEventSerializer这样的就不行了,于是开始痛苦的看源码,自己写序列化的类(这里需要注意,如果是在flume的hbasesink包下编写的代码,License信息一定要加上。就是最上面那段英文,要不然在运行的时候会报错),比较简单,编写好类之后,编译打包,传到flume的lib目录下,然后在配置agent的时候指定Serializer的类为编写的类即可。下面是代码(类注释没贴出来,见谅哈):

public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
	private byte[] table;//hbase表
	private byte[] cf;//列簇
	private byte[][] payload;//列集合
	private byte[][] payloadColumn;//列值
	private byte[] incrementColumn;
	private String rowSuffix;//roykey后缀
	private String rowPrefix;//rowkey前缀
	private byte[] incrementRow;
	private KeyType keyType;//rowkey后缀类型 
	private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class);

	@Override
	public void configure(Context context) {
		// TODO Auto-generated method stub
		//设置主键后缀类型,这里使用时间戳
		keyType = KeyType.TS;
		if (iCol != null && !iCol.isEmpty()) {
			incrementColumn = iCol.getBytes(Charsets.UTF_8);
		}
		incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
	}

	@Override
	public void configure(ComponentConfiguration conf) {
		// TODO Auto-generated method stub

	}

	@Override
	public void initialize(byte[] table, byte[] cf) {
		// TODO Auto-generated method stub
		this.table = table;
		this.cf = cf;
	}
	/**
	 * 
	 * @Title: setEvent 
	 * @Description: 获取日志信息,并解析出HBase的列以及列的value值 
	 * @param event   
	 * @throws 
	 * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event)
	 */
	@Override
	public void setEvent(Event event) {
		// TODO Auto-generated method stub
		//获取日志信息
		String log = new String(event.getBody(), StandardCharsets.UTF_8);
		//headers包含日志中项目编号和host信息
		Map<String, String> headers = event.getHeaders();
		JsonReader jsonReader = new JsonReader(new StringReader(log));
		String name = "";
		String value = "";
		String path = "";
		Map<String, String> kv = new HashMap<String, String>();
		try {
			//解析日志中的键值对缓存到map中
			jsonReader.beginObject();
			while (jsonReader.hasNext()) {
				name = jsonReader.nextName();
				value = jsonReader.nextString();
				if(name.equals("uri"))
					path = value.split(" ")[1];
				kv.put(name, value);
			}
			jsonReader.endObject();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//解析headers中的项目id和服务host、路径
		if(path.contains("?")){
			path = path.substring(0, path.indexOf("?"));
		}
		String pcode = headers.get("pcode");
		String host = headers.get("host");
		//将项目编号和服务器host添加到map中
		kv.put("pcode",pcode);
		kv.put("host", host);
		//初始化列和value数组
		this.payloadColumn = new byte[kv.keySet().size()][];
		this.payload = new byte[kv.keySet().size()][];
		int i = 0;
		//给hbase的列和value赋值
		for (String key : kv.keySet()) {
			this.payloadColumn[i] = key.getBytes();
			this.payload[i] = kv.get(key).getBytes();
			i++;
		}
		//设置rowkey的前缀 格式是项目编号+路径
		
		this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time");
	}
	
	@Override
	public List<PutRequest> getActions() {
		// TODO Auto-generated method stub
		List<PutRequest> actions = new ArrayList<PutRequest>();
		if (payloadColumn != null) {
			byte[] rowKey;
			try {
				rowKey = rowSuffix.getBytes();
				// for 循环,提交所有列和对于数据的put请求。
				for (int i = 0; i < this.payload.length; i++) {
					PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);
					actions.add(putRequest);
				}

			} catch (Exception e) {
				throw new FlumeException("Could not get row key!", e);
			}
		}
		return actions;
	}

	@Override
	public List<AtomicIncrementRequest> getIncrements() {
		// TODO Auto-generated method stub
		List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
		if (incrementColumn != null) {
			AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
			actions.add(inc);
		}
		return actions;
	}
	@Override
	public void cleanUp() {
		// TODO Auto-generated method stub
	}

}

到此,关于“Flume怎么自定义Event Serializer序列化类”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. Flume NG 学习笔记(八)Interceptors(拦截器)测试
  2. 通用自定义XML配置类

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flume event serializer

上一篇:hbase client访问的超时时间怎么配置

下一篇:hive连接hbase外部表时insert数据报错怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》