博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入理解Spark:核心思想与源码分析. 3.9 启动测量系统MetricsSystem
阅读量:5967 次
发布时间:2019-06-19

本文共 3708 字,大约阅读时间需要 12 分钟。

3.9 启动测量系统MetricsSystem

MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:

Instance:指定了谁在使用测量系统;

Source:指定了从哪里收集测量数据;

Sink:指定了往哪里输出测量数据。

Spark按照Instance的不同,区分为Master、Worker、Application、Driver和Executor。

Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。

Spark中使用MetricsServlet作为默认的Sink。

MetricsSystem的启动代码如下。

val metricsSystem = env.metricsSystem

    metricsSystem.start()

MetricsSystem的启动过程包括以下步骤:

1)注册Sources;

2)注册Sinks;

3)给Sinks增加Jetty的ServletContextHandler。

MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attach-Handler将它们绑定到Spark UI上。

metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler (handler)))

3.9.1 注册Sources

registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,它的实现见代码清单3-45。注册Sources的过程分为以下步骤:

1)从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。

2)用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]。

3)将每个source的metricRegistry(也是MetricSet的子类型)注册到Concurrent-Map<String, Metric> metrics。这里的registerSource方法已在3.8.2节讲解过。

代码清单3-45 MetricsSystem注册Sources的实现

private def registerSources() {

    val instConfig = metricsConfig.getInstance(instance)

    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

 

    // Register all the sources related to instance

    sourceConfigs.foreach { kv =>

        val classPath = kv._2.getProperty("class")

        try {

            val source = Class.forName(classPath).newInstance()

            registerSource(source.asInstanceOf[Source])

        } catch {

            case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)

        }

    }

}

3.9.2 注册Sinks

registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,它的实现见代码清单3-46。注册Sinks的步骤如下:

1)从Driver的Properties中用正则匹配以sink.开头的属性,如{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json},将其转换为Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。

2)将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。

代码清单3-46 MetricsSystem注册Sinks的实现

private def registerSinks() {

    val instConfig = metricsConfig.getInstance(instance)

    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>

        val classPath = kv._2.getProperty("class")

        if (null != classPath) {

            try {

                val sink = Class.forName(classPath)

                .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])

                .newInstance(kv._2, registry, securityMgr)

            if (kv._1 == "servlet") {

                metricsServlet = Some(sink.asInstanceOf[MetricsServlet])

            } else {

                sinks += sink.asInstanceOf[Sink]

            }

            } catch {

                case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e)

            }

        }

    }

}

3.9.3 给Sinks增加Jetty的ServletContextHandler

为了能够在SparkUI(网页)访问到测量数据,所以需要给Sinks增加Jetty的Servlet-ContextHandler,这里主要用到MetricsSystem的getServletHandlers方法实现如下。

def getServletHandlers = {

    require(running, "Can only call getServletHandlers on a running MetricsSystem")

    metricsServlet.map(_.getHandlers).getOrElse(Array())

}

可以看到调用了metricsServlet的getHandlers,其实现如下。

def getHandlers = Array[ServletContextHandler](

    createServletHandler(servletPath,

        new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)

)

最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由get-MetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI(creatServlethandler与attachHandler方法在3.4.4节详细讲述过)。最终我们可以使用以下这些地址来访问测量数据。

http://localhost:4040/metrics/applications/json。

http://localhost:4040/metrics/json。

http://localhost:4040/metrics/master/json。

转载地址:http://rdhax.baihongyu.com/

你可能感兴趣的文章
使用dotenv管理环境变量
查看>>
温故js系列(11)-BOM
查看>>
Vuex学习
查看>>
bootstrap - navbar
查看>>
服务器迁移小记
查看>>
FastDFS存储服务器部署
查看>>
Android — 创建和修改 Fragment 的方法及相关注意事项
查看>>
swift基础之_swift调用OC/OC调用swift
查看>>
Devexpress 15.1.8 Breaking Changes
查看>>
ElasticSearch Client详解
查看>>
mybatis update返回值的意义
查看>>
expdp 详解及实例
查看>>
通过IP判断登录地址
查看>>
深入浅出JavaScript (五) 详解Document.write()方法
查看>>
Beta冲刺——day6
查看>>
在一个程序中调用另一个程序并且传输数据到选择屏幕执行这个程序
查看>>
代码生成工具Database2Sharp中增加视图的代码生成以及主从表界面生成功能
查看>>
关于在VS2005中编写DLL遇到 C4251 警告的解决办法
查看>>
提高信息安全意识对网络勒索病毒说不
查看>>
使用Jquery 加载页面时调用JS
查看>>