您好,登录后才能下订单哦!
这篇文章主要介绍“Flume怎么自定义Event Serializer序列化类”,在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flume怎么自定义Event Serializer序列化类”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
把日志从flume打到hbase中,但是我们的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,这时候使用flume自带的SimpleHbaseEventSerializer和RegexHbaseEventSerializer这样的就不行了,于是开始痛苦的看源码,自己写序列化的类(这里需要注意,如果是在flume的hbasesink包下编写的代码,License信息一定要加上。就是最上面那段英文,要不然在运行的时候会报错),比较简单,编写好类之后,编译打包,传到flume的lib目录下,然后在配置agent的时候指定Serializer的类为编写的类即可。下面是代码(类注释没贴出来,见谅哈):
public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table;//hbase表 private byte[] cf;//列簇 private byte[][] payload;//列集合 private byte[][] payloadColumn;//列值 private byte[] incrementColumn; private String rowSuffix;//roykey后缀 private String rowPrefix;//rowkey前缀 private byte[] incrementRow; private KeyType keyType;//rowkey后缀类型 private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class); @Override public void configure(Context context) { // TODO Auto-generated method stub //设置主键后缀类型,这里使用时间戳 keyType = KeyType.TS; if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub } @Override public void initialize(byte[] table, byte[] cf) { // TODO Auto-generated method stub this.table = table; this.cf = cf; } /** * * @Title: setEvent * @Description: 获取日志信息,并解析出HBase的列以及列的value值 * @param event * @throws * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event) */ @Override public void setEvent(Event event) { // TODO Auto-generated method stub //获取日志信息 String log = new String(event.getBody(), StandardCharsets.UTF_8); //headers包含日志中项目编号和host信息 Map<String, String> headers = event.getHeaders(); JsonReader jsonReader = new JsonReader(new StringReader(log)); String name = ""; String value = ""; String path = ""; Map<String, String> kv = new HashMap<String, String>(); try { //解析日志中的键值对缓存到map中 jsonReader.beginObject(); while (jsonReader.hasNext()) { name = jsonReader.nextName(); value = jsonReader.nextString(); if(name.equals("uri")) path = value.split(" ")[1]; kv.put(name, value); } jsonReader.endObject(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //解析headers中的项目id和服务host、路径 if(path.contains("?")){ path = path.substring(0, path.indexOf("?")); } String pcode = headers.get("pcode"); String host = headers.get("host"); //将项目编号和服务器host添加到map中 kv.put("pcode",pcode); kv.put("host", host); //初始化列和value数组 this.payloadColumn = new byte[kv.keySet().size()][]; this.payload = new byte[kv.keySet().size()][]; int i = 0; //给hbase的列和value赋值 for (String key : kv.keySet()) { this.payloadColumn[i] = key.getBytes(); this.payload[i] = kv.get(key).getBytes(); i++; } //设置rowkey的前缀 格式是项目编号+路径 this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time"); } @Override public List<PutRequest> getActions() { // TODO Auto-generated method stub List<PutRequest> actions = new ArrayList<PutRequest>(); if (payloadColumn != null) { byte[] rowKey; try { rowKey = rowSuffix.getBytes(); // for 循环,提交所有列和对于数据的put请求。 for (int i = 0; i < this.payload.length; i++) { PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]); actions.add(putRequest); } } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } @Override public List<AtomicIncrementRequest> getIncrements() { // TODO Auto-generated method stub List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub } }
到此,关于“Flume怎么自定义Event Serializer序列化类”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。