设为首页 - 加入收藏 PHP编程网 - PHP站长网 (http://www.52php.cn)- 电商,百科,编程,业界,移动互联,5G,云计算,站长网!
热搜: 娱乐 站长之家 服务 百度
当前位置: 首页 > 大数据 > 正文

Zeppelin中Interpreter插件开发

发布时间:2021-01-24 05:13 所属栏目:[大数据] 来源:网络整理
导读:? 项目背景: ? ? (1) 已有监控系统采用的OpenTSDB方案 ? ? (2) ?目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。 ? ? (3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppel

?

项目背景:

? ? (1) 已有监控系统采用的OpenTSDB方案

? ? (2) ?目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。

? ? (3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppelin不支持OpenTSDB后端引擎支持

? ?So,自己开发!

?

一 Interpreter插件流程

? ??? ? 插播: ? ?刚去访问官方发现0.6.0版本发布了!?http://zeppelin.apache.org/docs/0.6.0/

? ? (1) 下载Zeppelin源码

? ? (2) ?创建Zeppelin Maven工程的 Module

????

Zeppelin中Interpreter插件开发

? ? (3) 添加对zeppelin-interpreter插件包的依赖

????????

Zeppelin中Interpreter插件开发

? ? ? ? ?由于Zeppelin运行环境已经有了该依赖包,所以我们再创建自定义Interpreter插件的时候只需要在代码中对其依赖,打包过程中不需要打包该包。所以使用provided依赖方式。

? ? ?(4) 添加对OpenTSDB客户端操作API包的依赖

????

Zeppelin中Interpreter插件开发

? ? ? ?注意:该包为内部开发依赖包

? ? ? (5) 创建实现类继承Zeppelin提供的抽象类org.apache.zeppelin.interpreter.Interpreter;

? ??????????public class TsdInterpreter extends Interpreter

?

? ? ? ?(6) 代码中注册当前插件

? ? ? ? ? 在实现类中添加以下代码实现当前插件的注册

????? ? ? ??static {

? ? ? ? ? ? ? Interpreter.register("tsd","tsd",TsdInterpreter.class.getName());
? ? ? ? ? }

? ? ? ? ? 以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只需要指定后端引擎名称%tsd即可。

? ? ? ?(7) 实现核心抽象方法,即Zeppelin前端提交过来的命令

??????????public InterpreterResult interpret(String cmd,InterpreterContext context)

? ? ? ? ? cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd

? ? ? ? ? context: 当前插件的上下文,主要包含插件的配置信息,例如操作OpenTSDB的时候就需要从上下文中获取OpenTSDB的IP和端口参数。

????????????

????? ? ? 该方法实现的核心思想就是: ? 解析命令=>实例化OpenTSDB操作客户端=>操作OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象。

?

????????? ? 贴核心代码吧:

????????????

? ? ? ? ? Properties intpProperty = getProperty();
? ? ? ? ? for (Object k : intpProperty.keySet()) {
?? ??? ??? ?String key = (String) k;
?? ??? ??? ?String value = (String) intpProperty.get(key);

?? ??? ??? ?if (key.equals("tsd.host") ) {
?? ??? ??? ??? ?host = value;
?? ??? ??? ?} else if (key.equals("tsd.port")) {
?? ??? ??? ??? ?port = value;
?? ??? ??? ?}
?? ??? ?}
? ? ? ? ?propertiesUtil.setOpentsdbIp(host);
? ? ? ? ? propertiesUtil.setPort(Integer.parseInt(port));

? ? ? ? ? ? Scanner scanner = new Scanner(items[1]);
?? ??? ??? ?String start,end,metric,tagsStr;
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?start = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"1!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?end = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"2!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?metric = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"3!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?tagsStr = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"4!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?

?? ??? ??? ?// cpid=tudou,busiid=*,code=1
?? ??? ??? ?String[] tagsStrs = tagsStr.split(",");
?? ??? ??? ?Map<String,String> tags = new HashMap<String,String>();
?? ??? ??? ?for (String s : tagsStrs) {
?? ??? ??? ??? ?int index = s.indexOf('=');
?? ??? ??? ??? ?if (index == -1)
?? ??? ??? ??? ??? ?continue;
?? ??? ??? ??? ?String tagK = s.substring(0,index);
?? ??? ??? ??? ?String tagV = s.substring(index + 1);
?? ??? ??? ??? ?tags.put(tagK,tagV);
?? ??? ??? ?}

?? ??? ??? ?QueryService queryService = new QueryService();
?? ??? ??? ?try {
?? ??? ??? ??? ?List<QueryResponseEntity> responses = queryService
?? ??? ??? ??? ??? ??? ?.queryByMetric(start,tags,null,"sum");

?? ??? ??? ??? ?StringBuffer sb = new StringBuffer();
//?? ??? ??? ??? ?Map<String,String> alldps = new HashMap<String,String>();

?? ??? ??? ??? ?// build header
?? ??? ??? ??? ?Set<String> keys = new HashSet<String>();
?? ??? ??? ??? ?sb.append("time\t");
?? ??? ??? ??? ?for (QueryResponseEntity st : responses) {
?? ??? ??? ??? ??? ?sb.append(st.getTags().toString() + "\t");
?? ??? ??? ??? ??? ?keys.addAll(st.getDps().keySet());
?? ??? ??? ??? ?}
?? ??? ??? ??? ?sb.replace(sb.lastIndexOf("\t"),sb.lastIndexOf("\t") + 1,"\n");

?? ??? ??? ??? ?List<String> keys2 = new ArrayList<String>(keys);
?? ??? ??? ??? ?Collections.sort(keys2);
?? ??? ??? ??? ?// build lines
?? ??? ??? ??? ?Iterator<String> it = keys2.iterator();
?? ??? ??? ??? ?
?? ??? ??? ??? ?long t;
?? ??? ??? ??? ?while (it.hasNext()) {
?? ??? ??? ??? ??? ?String key = it.next(); // 每一行的时间戳
?? ??? ??? ??? ??? ?
?? ??? ??? ??? ??? ?t = Long.parseLong(key);
?? ??? ??? ??? ??? ?sb.append(sdf.format(new Date(t*1000)) + "\t");
?? ??? ??? ??? ??? ?for (QueryResponseEntity st : responses) {
?? ??? ??? ??? ??? ??? ?Map<String,String> dps = st.getDps();
?? ??? ??? ??? ??? ??? ?String value = dps.get(key);
?? ??? ??? ??? ??? ??? ?if (value != null) {
?? ??? ??? ??? ??? ??? ??? ?sb.append(value + "\t");
?? ??? ??? ??? ??? ??? ?} else {
?? ??? ??? ??? ??? ??? ??? ?sb.append(" \t");
?? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?sb.replace(sb.lastIndexOf("\t"),
?? ??? ??? ??? ??? ??? ??? ?"\n");
?? ??? ??? ??? ?}
?? ??? ??? ??? ?// sb.toString()

?? ??? ??? ??? ?return new InterpreterResult(InterpreterResult.Code.SUCCESS,
?? ??? ??? ??? ??? ??? ?InterpreterResult.Type.TABLE,sb.toString());

????二 插件部署

????????? ? (1) ?实现类的配置 ?

????????????????? ? 在ZEPPELIN_HOME/conf/zeppelin-site.xml

? ? ? ? ? ? ? ? ??

Zeppelin中Interpreter插件开发

????? ? ? (2) 拷贝OpenTSDB插件包

? ? ? ? ? ? ? ?在ZEPPELIN_HOME/interpreter

? ? ? ? ? ? ? ? 创建文件夹tsd,将所有依赖包拷贝到该文件夹下

? ? ? ? ? ? ? ? ? ??

Zeppelin中Interpreter插件开发

? ? ? ? ? (3) 重启Zeppelin,在Zeppelin管理界面的 Interpreter中添加 TSD配置

????????????????????

Zeppelin中Interpreter插件开发

?

?

三 ?实现效果

【免责声明】本站内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

推荐文章
热点阅读