Sep 15, 2019 - 捣鼓Hue

hue提供了一个很友好的界面,让非专业人士也能够接触轻易地通过SQL完成自己的需求。

随之而来的问题是,外行的SQL能力实在是参差不齐,常常会有个人任务占用大量集群资源,严重影响其他任务。理想的情况下,需要进行一系列的培训和测试,才能开放hue平台的权限。但人非圣贤,孰能无过。即便是熟练工,也可能写出不合格的sql丢进去执行。那应该怎么办?

一个优秀的系统,不能指望人不犯错,而需要把自身打造到坚不可摧。按照这个思路,在SQL执行层做些更严格的检测,如果不合格就不执行或者降低优先级就可以了。据说阿里的dataworks已经实现了,但似乎需要把所有的数据都迁移上去才能用。我们自己当然也可以实现的。

首先,需要一个SQL检查器。这个值得单独一篇文章来写了,暂且跳过。知乎这个帖子的答案干货很多有没有好用的开源sql语法分析器?。回到正题,如何给hue注入一个sql检查器?

hue的开发架构

hue后端使用Django开发,前后端通过mako模板(类似jinja2)粘合,同时前端使用knockoutjs(类似Vue.js)来做状态管理。

hack过程

整体思路是,后端接收到请求后,在执行前做检查。如果sql有问题,可以直接返回错误,或者在执行后把更易读的错误信息加到response里去。

后端注入

首先在hue平台执行一条sql,抓包能看到是请求的/notebook/api/execute/hive这个路径。从hue的后台源代码找这个url对应的view即可。熟悉Django的话,大概率能追溯到$HUE_HOME/desktop/libs/notebook/src/notebook/api.pyexecute函数。不过这个函数只是个中转站,它负责调用所有引擎进行计算,包括hive,hbase,spark等等。真正的引擎,都在$HUE_HOME/desktop/libs/notebook/src/notebook/connectors里。其中hiveserver2.py文件的execute函数,是真正执行hivesql的函数。

  @query_error_handler
  def execute(self, notebook, snippet):
    db = self._get_db(snippet, interpreter=self.interpreter)

    statement = self._get_current_statement(notebook, snippet)
    session = self._get_session(notebook, snippet['type'])

    query = self._prepare_hql_query(snippet, statement['statement'], session)
    ###### 开始注入, 伪代码
    # statement['statement'] 是从前端提交的sql
    check_result = check(statement['statement'])
    if check_result['error']:
        raise QueryError('self defined error', handle=statement)
    ###### 结束 ######

    try:
      if statement.get('statement_id') == 0:
        if query.database and not statement['statement'].lower().startswith('set'):
          db.use(query.database)
      handle = db.client.query(query, with_multiple_session=True) # Note: with_multiple_session currently ignored
    except QueryServerException, ex:
      raise QueryError(ex.message, handle=statement)

    # All good
    server_id, server_guid = handle.get()
    response = {
      'secret': server_id,
      'guid': server_guid,
      'operation_type': handle.operation_type,
      'has_result_set': handle.has_result_set,
      'modified_row_count': handle.modified_row_count,
      'log_context': handle.log_context,
      'session_guid': handle.session_guid
    }
    response.update(statement)
    ##### 注入: 把自定义的警告消息也混入response
    response['warnings'] = check_result['warn']
    ##### 注入结束
    return response

以上就是函数的源代码,注入的地方已经添加了注释。如果检查出现严重的错误,就停止执行,返回自定义的错误。会直接在前端展示出来,因为raise QueryError就是执行出错时,把错误抛给前端的方式。

而如果有警告信息,就让它随着接口返回。这部分想要被展示出来,就需要研究下前端是如何展示错误信息的,能否把收到的 warnings 也以error的形式展示。

前端展示

Django是典型的MVC模型,从 url 到 view,比较好追溯。前端相对来说就麻烦很多,主要是不熟悉它的架构。尤其是混用了mako模板和kknockoutjs 这个比较旧的框架。

模板和框架的了解过程就省去不表了,先来看看 knockoutjs 的效果吧。如果有hue平台,打开hive editor,在console里测试下面的代码:

var ace_id = $('.ace_editor').attr('id')
# 获取 knockoutjs 的环境
var vm = ko.dataFor(document.getElementById(ace_id));
# push一个error信息
vm.errors.push({message: 'this is an error', help: null, line: null, col: null})

前端接收到后端的response之后,是如何处理的呢?如何在不影响正常执行的情况下, 把警告(提示)信息也展示出来?

打开hive editor时,查看都加载了哪些javascipt文件,查看哪个文件会请求/notebook/api/execute/hive这个url。同时 观察到response中返回的字段有has_result_set, log_context 等,哪个文件包括有这些关键字。二者结合,可以定位到build/static/notebook/js/notebook.ko.js这个文件,而且该文件明显是使用knockoutjs开发的。

读懂之后,把返回的warnings,插入到self.errors就可以了。这也是前文在console中插入错误消息使用的办法。

  <!-- begin: 以下是需要添加的. -->
  if (data.handle.warnings && data.handle.warnings.length) {
    data.handle.warnings.forEach(err => {
      self.errors.push({message: err, help: null, line: null, col: null});
    });
  }
  <!-- end -->
} else {
  self._ajaxError(data, self.execute);
}

顺便提一下,在最新版的hue中,notebook.ko.js被替代为hue/desktop/core/src/desktop/js/apps/notebook/snippet.js

One More Thing

vm.errors.push 的方式,只能展示纯文本。没办法改变字体的大小,颜色,或者添加超链接等等。

如果想要注入HTML标签,可以修改hue/desktop/libs/notebook/src/notebook/templates/editor_components.mako这个文件:

<!-- 原本的样子: 把text改成html就可以了
        <li data-bind="text: message"></li> 
-->
        <li data-bind="html: message"></li>

然后刷新页面,就可以push一个可点击的链接出来。

vm.errors.push({message: '<a href="#">这是一个超链</a>', help: null, line: null, col: null})`

至于改变字体颜色大小等等,添加style属性去定义就好了。

总结

其实cloudera早在2017年的新版hue中就已经做了这个功能: https://blog.cloudera.com/new-in-cloudera-enterprise-5-12-hue-4-interface-and-query-assistant/, 不过看起来是个收费功能。

了解hue的整体开发框架(Django, mako, knockoutjs), 摸清来龙去脉之后,完全可以自己做。需要添加的代码也就十多行。最核心的,是SQL检查器,错误的自动检查和相应的优化建议。

Dec 12, 2018 - 散列函数与分流算法

散列函数

散列函数(hash function)对一种对任意输入,都返回一个固定长度输出的函数。常被用来检测信息的完整性,常用的函数有MD5,SHA1等。下载软件时,有的网站会提供一个md5值,下载完成后可以计算软件的md5值,对比是否与网站上的一致。如果不一致,可能是没下完整,也可以是被黑客”改造后”的软件,尽量不要安装。

散列函数应该有以下特点:

  • 同样的输入,保证会有同样的输出。

  • 很难找到其他的输入,使得它的输出与指定的输出相等。保证如果输入的信息被篡改,那输出的散列值变化的概率几乎为1。

第二个特点被称做“弱抗碰撞性”。碰撞就是说两条不同的信息,散列值相等。理论上碰撞当然是不可避免的,比如MD5函数固定地返回32位字母和数字的组合。 这个组合有(26 + 10)^32种,但输入的信息是无穷多个可能。所以散列函数无法保证不碰撞,只能尽量让输出保持随机性,降低碰撞的概率。

分流算法

分流算法是公司做AB测试系统时,将不同的用户分配到不同实验时使用的算法。分流算法需要做到的效果是:

  • 随机性,保证每个实验的用户群体结构类似

  • 指定时间内,同一个用户被分配到的实验id不会变

这两个特点刚好是散列函数的特长。只要把时间因素加入散列函数,就可以保证在指定时间内,输出的不变性,同时随机性也完全有保证。

实战

import time
import random
from hashlib import md5
SALT = 'add some random salt in hash function'
EXPID_CONF = {'A': 30, 'B': 20, 'C': 50}

def split_stream(uid, expid_conf=None, unchange_time=7 * 24 * 3600):
    """
    @param uid: 用户id
    @param expid_conf: 实验ID和流量配置,默认使用 EXPID_CONF 的配置
    @param unchange_time: 多长时间内保持分流结果不变,默认7天
    """
    expid_conf = expid_conf or EXPID_CONF
    for val in expid_conf.values():
        assert val >= 0
    # 计算随机的hash值
    time_factor = int(time.time() / unchange_time)
    msg = '{uid}+{salt}+{t}'.format(uid=uid, salt=SALT, t=time_factor)
    hash_bytes = md5(msg.encode()).digest()

    # hash值转为数字, 对总流量取模, 保证  0 <= rand_int <= stream_sum
    stream_sum = sum(expid_conf.values())
    rand_int = int.from_bytes(hash_bytes, byteorder='big') % stream_sum

    # 计算分流结果,判断rand_int的取值落在哪个实验区间即可
    stream_seq = sorted(expid_conf.items(), key=operator.itemgetter(1))
    for expid, stream_count in stream_seq:
        if rand_int < stream_count:
            return expid
        rand_int -= stream_count


if __name__ == '__main__':
    # 随便测试
    from collections import Counter
    res = []
    for i in range(0, 10000):
        uid = random.randint(0, 100000)
        res.append(split_stream(uid))
    print(Counter(res))

Dec 11, 2018 - 性能提升: reids与内存缓存

redis是基于内存的数据库,单次查询的速度很快。但要查询的数据分布在不同的key里,且查询字段较多时,速度依然会被拖慢。简单来说,性能分为网络耗时查询耗时

当数据量比较小,更新不频繁,而且查询逻辑复杂时,可以把数据读到内存中,定时从redis更新。这种方法把网络耗时降到0,查询耗时又不会比redis差。

如果数据量比较大,读进内存这一步就很耗时,只能在redis中查询。可以使用pipiline或者lua脚本来减少网络耗时,尽量在一次网络交互就拿到所有查询结果。

内存缓存

将redis中的数据,读到内存中。使用时,直接对自带的内存做计算,并且可以转换成更方便的数据结构进行查询。适用的场景是数据量小,更新不频繁

关键的问题,在于更新内存中的缓存。什么时候更新,如何更新?

带有效期的字典: expiredict

expiringdict 实现了一个带有效期(和长度限制)的字典,可以设置key在多久之后过期。简单的测试:

import time
from expiringdict import ExpiringDict
cache = ExpiringDict(max_age_seconds=3, max_len=10)
cache['test'] = 123
time.sleep(3)
'test' in cache # False

接下来一个简单的函数,就能保证(1)总能拿到有效的数据 (2)过期后自动获取数据,并返回更新后的数据。

def get_cache_data(key):
  global cache
  if key in cache:
    cacahe[key] = 'run function to get data from redis'
  return cacahe[key]

值得注意的是,run function to get data from redis这一步也可以做适当的优化。 如果在redis存贮的数据,是hash或者set等复杂的数据结构,应该把数据序列化成字符,存贮成字符串对象,读取后再反序列化。因为redis对hash结构HGETALL的操作,比GET操作要耗时的多(GET操作是最快的)。做个简单的测试就可以体会到了:

import time
import json

cli = get_redis_cli()

def read_hash():
  t1 = time.time()
  res = cli.hgetall('hash_data')
  print('read_hash time cost: {0:.2f}ms'.format(1000 * (time.time() - t1)))
  return res

def read_kv():
  t1 = time.time()
  res = json.loads(cli.hget('kv_data'))
  print('read_kv time cost: {0:.2f}ms'.format(1000 * (time.time() - t1)))
  return res

if __name__ == '__main__':
  data = {i: i + 1 for i in rang(10000)}
  cli.hmset('hash_data', data)
  cli.set('kv_data', json.dumps(data))
  read_kv(); read_hash()

data比较小时可能差异不明显,测试中长度达到1w,区别就很显著了。

基于共享内存的更新方法

不论怎样,更新内存时run function to get data from redis这一步总会有多余的耗时。有没有可能把这部分也优化掉?

随手查到mmap,这家伙把内存数据映射到一个文件,其他进程可以对文件进行读写操作,进而共享控制同一块内存。gensim就是用mmap来做并行训练的。

可行的操作是,线下定时更新文件对应的内存数据,线上的N个进程从文件读取更新的数据。这就把内存更新的耗时完全移到线下了,而且多进程共享还能节省内存。唯一的缺点就是,需要再线下维护一个定时任务。

降低网络开销

数据量比较大或者更新频繁时,只能从redis做查询,得到结果。能够优化的,仅仅是减少服务器与redis的通信次数,降低频繁的数据传输的耗时。

两种方案,一是redis内置的pipeline,一次发送多个查询,执行完毕后返回多个查询的结果;二是使用lua脚本,将脚本发送到redis服务器执行。相比pipeline,lua脚本的灵活性更高一些。如果多个查询之间有逻辑依赖(比如如果’test’在某个set里,就直接返回结果,否则再继续查询’test’是否在另一个set里),就是最适合使用lua 的场景了。

pipeline

pipeline使用起来很简单,编写指令,最后execute会把所有指令的结果一起返回。适合多个查询相互独立的场景。

cli = get_strict_redis()
pipe = cli.pipeline()
pipe.sismember('set1', '我在这里吗')
pipe.sismember('set2', '你在哪里啊')
res = pipe.execute()
# [False, False]      # 你们都不在这里

lua脚本

lua是门编程语言,灵活性自然是有保障的。redis提供了对lua环境的支持,可以把lua代码发给redis执行。整体也不算太难,看看语法比葫芦画瓢就行了。

实现一个稍微复杂一点的例子,如果 '我在这里吗' in 'set1'为True,就继续看 '你在哪里啊' in 'set2'是否为True;否则第二个查询就不需要做了,直接返回1。

script = '''local ret = {}
    local exist = redis.call('sismember', 'set1', KEYS[1])
    table.insert(ret, exist)
    if exist1 == 1 then
      table.insert(ret, redis.call('sismember', 'set2', KEYS[2]))
    else
      table.insert(ret, 1)
    end
    return ret
'''
res = rcli.eval(script, 2, '我在这里吗', '你在哪里啊')
# [0, 1]