Datahub 安装失败记录

背景介绍

参考链接: https://blog.csdn.net/ddxygq/article/details/123437072

DataHub是由LinkedIn的数据团队开源的一款提供元数据搜索与发现的工具。

提到LinkedIn,不得不想到大名鼎鼎的Kafka,Kafka就是LinkedIn开源的。LinkedIn开源的Kafka直接影响了整个实时计算领域的发展,而LinkedIn的数据团队也一直在探索数据治理的问题,不断努力扩展其基础架构,以满足不断增长的大数据生态系统的需求。随着数据的数量和丰富性的增长,数据科学家和工程师要发现可用的数据资产,了解其出处并根据见解采取适当的行动变得越来越具有挑战性。为了帮助增长的同时继续扩大生产力和数据创新,创建了通用的元数据搜索和发现工具DataHub。

Datahub作为新一代的元数据管理平台,大有取代老牌元数据管理工具Atlas之势。首先,阿里云也有一款名为DataHub的产品,是一个流式处理平台,本文所述DataHub与其无关。

市面上常见的元数据管理系统有如下几个:

笔者之前白嫖了亚马逊的EC2服务器,在链接文章的教程下尝试安装datahub, 系统默认环境python2.7、python3.7;

安装datahub过程

1、笔者尝试安装了 python3.8,本来开始没有安装3.8,但是在安装完datahub的时候,尝试验证版本号时抱如下第一个错误

python3 -m datahub version
DataHub CLI version: 0.10.0.1
Python version: 3.7.16 (default, Dec 15 2022, 23:24:54) 
[GCC 7.3.1 20180712 (Red Hat 7.3.1-15)]
Exception ignored in: <generator object configure_logging at 0x7f8fcca5a050>
Traceback (most recent call last):
  File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/utilities/logging_manager.py", line 187, in configure_logging
  File "/usr/lib64/python3.7/contextlib.py", line 486, in __exit__
AttributeError: 'NoneType' object has no attribute 'exc_info'

2、上边的错误出现后没有找到合适的解决方案,就直接升级python版本到3.8了,以后就有了一系列的坑,如下第二个错误,发生在对python3.8编译的过程中。

    wget https://www.python.org/ftp/python/3.8.3/Python-3.8.3.tgz

    tar zxvf Python-3.8.3.tgz

    ./configure --prefix=/usr/lib/python3.8
    checking build system type... x86_64-pc-linux-gnu
    checking host system type... x86_64-pc-linux-gnu
    checking for python3.8... no
    checking for python3... python3
    checking for --enable-universalsdk... no
    checking for --with-universal-archs... no
    checking MACHDEP... "linux"
    checking for gcc... no
    checking for cc... no
    checking for cl.exe... no
    configure: error: in `/home/ec2-user/Python-3.8.3':
    configure: error: no acceptable C compiler found in $PATH
    See `config.log' for more details

通过安装 gcc 解决了上边的问题; 因为ec2有权限限制,非root用户执行命令时尽可能带上sudo

    sudo yum -y install gcc-c++

    sudo make && sudo make install

3、然后make install的时候遇到问题三,如下提示,通过安装zlib-devel解决

zipimport.ZipImportError: can't decompress data; zlib not available

    sudo yum install zlib-devel

编译完并通过ln -s 命令修改了python3的软链之后,执行pip3的操作开始报如下的错误,通过安装openssl 和 openssl-devel解决,但是还得从新编译python3.8,从.config开始从新执行之前的命令

python3 -m pip install --upgrade pip wheel setuptools

WARNING: pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available.
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError("Can't connect to HTTPS URL because the SSL module is not available.")': /simple/pip/
WARNING: Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError("Can't connect to HTTPS URL because the SSL module is not available.")': /simple/pip/
    sudo yum -y install openssl openssl-devel

    python3 -m pip install --upgrade acryl-datahub

4、之后再次执行datahub的安装,入到如下问题, 通过安装 libffi-devel 解决

 File "/usr/lib/python3.8/lib/python3.8/ctypes/__init__.py", line 7, in 
      from _ctypes import Union, Structure, Array
  ModuleNotFoundError: No module named '_ctypes'
  [end of output]

note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for avro

  File "/usr/lib/python3.8/lib/python3.8/ctypes/__init__.py", line 7, in 
      from _ctypes import Union, Structure, Array
  ModuleNotFoundError: No module named '_ctypes'
  [end of output]

note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for click-default-group

    sudo yum install libffi-devel

5、继续执行datahub的安装,遇到如下问题, 通过安装bzip2-devel 同时从新编译解决(如果不从新编译还会报错)

    File "/usr/lib/python3.8/lib/python3.8/bz2.py", line 19, in 
    from _bz2 import BZ2Compressor, BZ2Decompressor

ModuleNotFoundError: No module named '_bz2'
    sudo yum install bzip2-devel 

    sudo make && sudo make install

直到此时解决以上的问题,才算是把datahub安装好了。

启动datahub

6、如果docker服务没有启动会可能有如下问题;启动服务失败,报标题中的错误,试试启动服务命令前加上 sudo。

The name org.freedesktop.PolicyKit1 was not provided by any .service files See system logs and 'systemctl status docker.service' for details.

7、docker出现的问题如下

ERROR: Got permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker.sock: Get "http://%2Fvar%2Frun%2Fdocker.sock/v1.24/info": dial unix /var/run/docker.sock: connect: permission denied

通过修改docker 组来处理

Try adding your user to the docker group:

    Run usermod -aG docker "${USER}", then
    either log out and log back in, or run newgrp docker.
    After this you have to restart your docker daemon: sudo service docker restart.

到现在所有的坑基本上踩完了,直接执行如下启动datahub的命令并得到如下提示,GAME OVER!

python3 -m datahub docker quickstart

Total Docker memory configured 0.96GB is below the minimum threshold 3.8GB. You can increase the memory allocated to Docker in the Docker settings.

是的,这是一次全程失败的实验过程,但是笔者认为除了最后设备硬件不够用之外,主要的安装踩坑过程基本上就差不多了。

一致性hash 算法

一、场景描述

假设有三台缓存服务器,用于缓存图片,我们为这三台缓存服务器编号为 01号、02号、03号,现在有3万张图片需要缓存,我们希望这些图片被均匀的缓存到这3台服务器上,以便它们能够分摊缓存的压力。也就是说,我们希望每台服务器能够缓存1万张左右的图片,那么我们应该怎样做呢?

常见的做法是对缓存项的键进行哈希,将hash后的结果对缓存服务器的数量进行取模操作,通过取模后的结果,决定缓存项将会缓存在哪一台服务器上

hash(图片名称)% N

存在的问题:

当服务器数量发生改变时,比如新增加一台缓存服务器,所有缓存在一定时间内是失效的,当应用无法从缓存中获取数据时,则会向后端服务器请求数据;

同理,假设突然有一台缓存服务器出现了故障,那么我们则需要将故障机器移除,那么缓存服务器数量从3台变为2台,同样会导致大量缓存在同一时间失效,造成了缓存的雪崩,后端服务器将会承受巨大的压力,整个系统很有可能被压垮。为了解决这种情况,就有了一致性哈希算法。

二、一致性hash 算法是什么?

一致性哈希算法也是使用取模的方法,但是取模算法是对服务器的数量进行取模,而理论上一致性哈希算法是对 2^32 取模,具体步骤如下:

  • 步骤一:一致性哈希算法将整个哈希值空间按照顺时针方向组织成一个虚拟的圆环,称为 Hash 环;

    将 2^32 想象成一个圆,像钟表一样,钟表的圆可以理解成由60个点组成的圆,而此处我们把这个圆想象成由2^32个点组成的圆    
  • 步骤二:接着将各个服务器使用 Hash 函数进行哈希,具体可以选择服务器的IP或主机名作为关键字进行哈希,从而确定每台机器在哈希环上的位置

    哈希算法:hash(服务器的IP) % 2^32;计算结果一定是 0 到 2^32-1 之间的整数,那么 hash
    环上必定有一个点与这个整数对应,所以我们可以使用这个整数代表服务器,也就是服务器就可以映射到这个环上 
  • 步骤三:最后使用算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针寻找,第一台遇到的服务器就是其应该定位到的服务器

    图片的名称作为 key,所以我们使用下面算法将图片映射在哈希环上:hash(图片名称) % 2^32;
    只要从图片的位置开始,沿顺时针方向遇到的第一个服务器就是图片存放的服务器了
graph LR
A(())

三、一致性hash的优缺点

优点:

一致性Hash算法对于节点的增减都只需重定位环空间中的一小部分数据,只有部分缓存会失效,不至于将所有压力都在同一时间集中到后端服务器上,具有较好的容错性和可扩展性。

缺点:

一致性哈希算法在服务节点太少的情况下,容易因为节点分部不均匀而造成数据倾斜问题,也就是被缓存的对象大部分集中缓存在某一台服务器上,从而出现数据分布不均匀的情况,这种情况就称为 hash 环的倾斜。

通常优化方案:

为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点,一个实际物理节点可以对应多个虚拟节点,虚拟节点越多,hash环上的节点就越多,缓存被均匀分布的概率就越大,hash环倾斜所带来的影响就越小,同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射。 具体做法可以在服务器ip或主机名的后面增加编号来实现

四、python实现一致性hash

# -*- coding: utf-8 -*-
import hashlib

class ConsistHash(object):
    def __init__(self, nodes=None, n_number=12) -> None:
        """
        nodes:           所有的节点
        n_number:        一个节点对应多少个虚拟节点
        """
        self._n_number = n_number   #每一个节点对应多少个虚拟节点,这里默认是3个
        self._node_dict = dict()    #用于将虚拟节点的hash值与node的对应关系
        self._sort_list = []        #用于存放所有的虚拟节点的hash值,这里需要保持排序
        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node) ->None:
        """
        添加node,首先要根据虚拟节点的数目,创建所有的虚拟节点,并将其与对应的node对应起来
        当然还需要将虚拟节点的hash值放到排序的里面
        这里在添加了节点之后,需要保持虚拟节点hash值的顺序

        """
        for i in range(self._n_number):
            node_str = "%s#%s" % (node,i)    #虚拟节点=n#真实节点#n
            key = self._gen_key(node_str)     #计算虚拟节点的key
            self._node_dict[key] = node       #key和真实节点的对应关系
            self._sort_list.append(key)
        self._sort_list.sort()

    @staticmethod
    def _gen_key(key_str) ->str:
        """
        通过key,返回当前key的hash值,这里采用md5
        """
        return hashlib.md5(key_str.encode(encoding='UTF-8')).hexdigest()   #16进制

    def get_node(self, key_str):
        """
        返回这个字符串应该对应的node,这里先求出字符串的hash值,然后找到第一个小于等于的虚拟节点,然后返回node
        如果hash值大于所有的节点,那么用第一个虚拟节点
        """
        if self._sort_list:
            key = self._gen_key(key_str)
            for node_key in self._sort_list:
                if key <= node_key:
                    return self._node_dict[node_key]
            return self._node_dict[self._sort_list[0]]
        else:
            return None

    def remove_node(self, node):
        """
        这里一个节点的退出,需要将这个节点的所有的虚拟节点都删除
        """
        for i in range(self._n_number):
            node_str = "%s#%s" % (node, i)
            key = self._gen_key(node_str)
            del self._node_dict[key]
            self._sort_list.remove(key)

if __name__ == "__main__":
    cons = ConsistHash(["192.168.1.1","192.168.1.2","192.168.1.3","192.168.1.4"])
    print(cons.get_node("DV001"))
    print(cons.get_node("DV002"))
    print(cons.get_node("DV003"))
    print(cons.get_node("DV004"))
    print(cons.get_node("DV005"))
    print(cons.get_node("DV006"))
    print(cons.get_node("DV007"))

参考链接: https://blog.csdn.net/a745233700/article/details/120814088

python socket 零拷贝

This module provides access to the BSD socket interface. It is available on all modern Unix systems, Windows, MacOS, and probably additional platforms.

python document

Socket 是任何一种计算机网络通讯中最基础的内容。socket起源于Unix,而Unix/Linux基本哲学之一就是“一切皆文件”,对于文件用【打开】【读写】【关闭】模式来操作。socket就是该模式的一个实现,socket即是一种特殊的文件,一些socket函数就是对其进行的操作(读/写IO、打开、关闭)

python socket 介绍

传统的文件传输里面(read/write方式),在实现上需要经过多次上下文的切换,文件数据实际上是经过了四次copy操作:

硬盘—>内核buf—>用户buf—>socket相关缓冲区—>协议引擎

1. Socket 类型

套接字格式:

socket(family,type[,protocal]) 使用给定的地址族、套接字类型、协议编号(默认为0)来创建套接字。

socket类型 描述
socket.AF_UNIX 只能够用于单一的Unix系统进程间通信
socket.AF_INET 服务器之间网络通信
socket.AF_INET6 IPv6
socket.SOCK_STREAM 流式socket , for TCP
socket.SOCK_DGRAM 数据报式socket , for UDP
socket.SOCK_RAW 原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;
其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。
socket.SOCK_SEQPACKET 可靠的连续数据包服务
创建TCP Socket: s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
创建UDP Socket: s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  1. Socket 函数

    1. TCP发送数据时,已建立好TCP连接,所以不需要指定地址。UDP是面向无连接的,每次发送要指定是发给谁。
    2. 服务端与客户端不能直接发送列表,元组,字典。需要字符串化repr(data)。
服务端socket函数 描述
s.bind(address) 将套接字绑定到地址, 在AF_INET下,以元组(host,port)的形式表示地址.
s.listen(backlog) 开始监听TCP传入连接。backlog指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为1,大部分应用程序设为5就可以了。
s.accept() 接受TCP连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。
客户端socket函数 描述
s.connect(address) 连接到address处的套接字。一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。
s.connect_ex(adddress) 功能与connect(address)相同,但是成功返回0,失败返回errno的值。
公共socket函数 描述
s.recv(bufsize[,flag]) 接受TCP套接字的数据。数据以字符串形式返回,bufsize指定要接收的最大数据量。flag提供有关消息的其他信息,通常可以忽略。
s.send(string[,flag]) 发送TCP数据。将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。
s.sendall(string[,flag]) 完整发送TCP数据。将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常。
s.recvfrom(bufsize[.flag]) 接受UDP套接字的数据。与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。
s.sendto(string[,flag],address) 发送UDP数据。将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。
s.close() 关闭套接字。
s.getpeername() 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。
s.getsockname() 返回套接字自己的地址。通常是一个元组(ipaddr,port)
s.setsockopt(level,optname,value) 设置给定套接字选项的值。
s.getsockopt(level,optname[.buflen]) 返回套接字选项的值。
s.settimeout(timeout) 设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect())
s.gettimeout() 返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None。
s.fileno() 返回套接字的文件描述符。
s.setblocking(flag) 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。
非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。
s.makefile() 创建一个与该套接字相关连的文件
  1. socket编程思路

TCP服务端:

1. 创建套接字,绑定套接字到本地IP与端口 

# socket.socket(socket.AF_INET,socket.SOCK_STREAM) , s.bind()

2. 开始监听连接                   #s.listen()

3. 进入循环,不断接受客户端的连接请求              #s.accept()

4. 然后接收传来的数据,并发送给对方数据         #s.recv() , s.sendall()

5. 传输完毕后,关闭套接字                     #s.close()

TCP客户端:

1. 创建套接字,连接远端地址

# socket.socket(socket.AF_INET,socket.SOCK_STREAM) , s.connect()

2. 连接后发送数据和接收数据          # s.sendall(), s.recv()

3. 传输完毕后,关闭套接字          #s.close()

socket server 编写步骤该要

1. 第一步是创建socket对象。
    调用socket构造函数。如:
    socket = socket.socket( family, type )
    family参数代表地址家族,可为AF_INET或AF_UNIX。AF_INET家族包括Internet地址,AF_UNIX家族用于同一台机器上的进程间通信。
    type参数代表套接字类型,可为SOCK_STREAM(流套接字)和SOCK_DGRAM(数据报套接字)。
2. 第二步是将socket绑定到指定地址。
    这是通过socket对象的bind方法来实现的:
    socket.bind( address )
    由AF_INET所创建的套接字,address地址必须是一个双元素元组,格式是(host,port)。host代表主机,port代表端口号。
    如果端口号正在使用、主机名不正确或端口已被保留,bind方法将引发socket.error异常。
3. 第三步是使用socket套接字的listen方法接收连接请求。
    socket.listen( backlog )
    backlog指定最多允许多少个客户连接到服务器。它的值至少为1。收到连接请求后,这些请求需要排队,如果队列满,就拒绝请求。
4. 第四步是服务器套接字通过socket的accept方法等待客户请求一个连接。
    connection, address = socket.accept()
    调用accept方法时,socket会时入“waiting”状态。客户请求连接时,方法建立连接并返回服务器。
    accept方法返回一个含有两个元素的元组(connection,address)。
    第一个元素connection是新的socket对象,服务器必须通过它与客户通信;
    第二个元素 address是客户的Internet地址。
5. 第五步是处理阶段。
    服务器和客户端通过send和recv方法通信(传输 数据)。
    服务器调用send,并采用字符串形式向客户发送信息。send方法返回已发送的字符个数。
    服务器使用recv方法从客户接收信息。调用recv 时,服务器必须指定一个整数,它对应于可通过本次方法调用来接收的最大数据量。
    recv方法在接收数据时会进入“blocked”状态,最后返回一个字符串,用它表示收到的数据。
    如果发送的数据量超过了recv所允许的,数据会被截短。多余的数据将缓冲于接收端。以后调用recv时,
    多余的数据会从缓冲区 删除(以及自上次调用recv以来,客户可能发送的其它任何数据)。
6. 传输结束,服务器调用socket的close方法关闭连接。

scoket client 编写步骤该要

1. 创建一个socket以连接服务器:socket = socket.socket( family, type )
2. 使用socket的connect方法连接服务器。对于AF_INET家族,连接格式如下:
    socket.connect( (host,port) )
    host代表服务器主机名或IP,port代表服务器进程所绑定的端口号。
    如连接成功,客户就可通过套接字与服务器通信,如果连接失败,会引发socket.error异常。
3. 处理阶段,客户和服务器将通过send方法和recv方法通信。
4. 传输结束,客户通过调用socket的close方法关闭连接。

python scoket 零拷贝代码实例

接受服务端基本代码实现

    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_port =5678
    server_addr = ("0.0.0.0", server_port)
    print(f"Start server on port{server_port}")
    sock.bind(server_addr)

    sock.listen(1)

    while True:
        print("Waiting for connection")
        connection, client_addr = sock.accept()
        size =0
        try:
            i =0
            while  True:
                data = connection.recv(65536)
                i +=1
                if data:
                    size += len(data)
                else:
                    print("Done receiving data")
                    break
            print(f"Total size:{size}")
        except Exception as e:
            print(e)
        finally:
            connection.close()

传输客户端实现, 零拷贝的主要实现代码是通过 os.sendfile来达到目的

    import socket
    import time, os

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    server_port =5678
    server_address = ('127.0.0.1', server_port)

    sock.connect(server_address)
    start = time.time()

    try:
        with open(r'/tmp/吞噬星空.txt','rb') as f:
            # 普通方式传输代码实现
            # message = f.read()
            # sock.sendall(message)
            ret =0
            offset =0
            while True:
                ret = os.sendfile(sock.fileno(), f.fileno(), offset,65536)
                offset += ret
                if ret ==0:
                    break
    except Exception as e:
        print(e)
    finally:
        sock.close()
    end = time.time()
    print("Total time: ", end - start)