Python Client

Pegasus Python client

Project URL

https://github.com/apache/incubator-pegasus/tree/master/python-client

Version requirements

Python 3.7+

Installation

pip3 install pypegasus3

Usage

The Pegasus Python client is built on Twisted, so code written against it follows Twisted idioms.

Example

See sample for a complete example. A simple example follows:

#!/usr/bin/env python
# coding:utf-8

from pypegasus.pgclient import Pegasus

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks


@inlineCallbacks
def basic_test():
    # init
    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602'], 'temp')

    suc = yield c.init()
    if not suc:
        reactor.stop()
        print('ERROR: connect pegasus server failed')
        return

    # set
    try:
        ret = yield c.set('hkey1', 'skey1', 'value', 0, 500)
        print('set ret: ', ret)
    except Exception as e:
        print(e)

    # get: per the get() docstring, the result is a (code, value) tuple
    code, value = yield c.get('hkey1', 'skey1')
    print('get ret: ', code, bytes.decode(value))

    reactor.stop()


if __name__ == "__main__":
    reactor.callWhenRunning(basic_test)
    reactor.run()

Log configuration file

The Pegasus Python client uses the logging package. The default configuration is:

[loggers]
keys=root
[logger_root]
level=INFO
handlers=hand01
propagate=0
[handlers]
keys=hand01
[handler_hand01]
class=handlers.RotatingFileHandler
formatter=form01
args=('pegasus.log', 'a', 100*1024*1024, 10)
[formatters]
keys=form01
[formatter_form01]
format=%(asctime)s [%(thread)d] [%(levelname)s] %(filename)s:%(lineno)d %(message)s
datefmt=%Y-%m-%d %H:%M:%S

To customize logging, add a logger.conf configuration file to your own code directory.
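A custom logger.conf can be checked with the standard library before it is placed next to your code. The sketch below writes a variant of the default configuration to a temporary directory and loads it with logging.config.fileConfig. The DEBUG level and the smaller rotation size are illustrative choices, and how the client itself locates and loads logger.conf is not shown in this document, so this only validates the file format.

```python
import logging
import logging.config
import os
import tempfile

# Variant of the default configuration: DEBUG level, 10 MB per file, 3 backups.
# These values are illustrative only.
CUSTOM_CONF = """\
[loggers]
keys=root
[logger_root]
level=DEBUG
handlers=hand01
[handlers]
keys=hand01
[handler_hand01]
class=handlers.RotatingFileHandler
formatter=form01
args=({log_path!r}, 'a', 10*1024*1024, 3)
[formatters]
keys=form01
[formatter_form01]
format=%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d %(message)s
datefmt=%Y-%m-%d %H:%M:%S
"""


def load_custom_conf(directory):
    """Write logger.conf into `directory` and load it with the stdlib parser."""
    conf_path = os.path.join(directory, "logger.conf")
    log_path = os.path.join(directory, "pegasus.log")
    with open(conf_path, "w") as f:
        f.write(CUSTOM_CONF.format(log_path=log_path))
    logging.config.fileConfig(conf_path, disable_existing_loggers=False)
    return conf_path, log_path


tmp_dir = tempfile.mkdtemp()
conf_path, log_path = load_custom_conf(tmp_dir)
logging.getLogger().debug("logger.conf parsed")
```

If fileConfig raises, the file has a syntax or key error that would also affect any other consumer of the same format.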

API reference

Initialization

First construct a Pegasus object, then call init to complete initialization:

class Pegasus(object):
    """
    Pegasus client class.
    """
    
    def __init__(self, meta_addrs=None, table_name='',
                 timeout=DEFAULT_TIMEOUT):
        """
        :param meta_addrs: (list) pegasus meta servers list.
                           example: ['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603']
        :param table_name: (bytes) table name/app name used in pegasus.
        :param timeout: (int) default timeout in milliseconds when communicating with the meta server and replica servers.
        """
    def init(self):
        """
        Initialize the client before you can use it.

        :return: (DeferredList) True when initialization succeeds, other values on failure.
        """

ttl

Get the remaining TTL (time to live) of a key.

def ttl(self, hash_key, sort_key, timeout=0):
    """
    Get ttl(time to live) of the data.

    :param hash_key: (bytes) which hash key used for this API.
    :param sort_key: (bytes) which sort key used for this API.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, int>) (code, ttl)
             code: error_types.ERR_OK.value when data exist, error_types.ERR_OBJECT_NOT_FOUND.value when data not found.
             ttl: in seconds, -1 means forever.
    """
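As a minimal sketch of how the documented (code, ttl) tuple might be interpreted, the helper below maps it to a readable string. The helper name is hypothetical, and the numeric codes in the usage lines are placeholders: real code would pass error_types.ERR_OK.value and error_types.ERR_OBJECT_NOT_FOUND.value.

```python
def describe_ttl(result, ok_code, not_found_code):
    """Turn the (code, ttl) tuple documented for ttl() into a readable string."""
    code, ttl_seconds = result
    if code == not_found_code:
        return "not found"
    if code != ok_code:
        return "error code {}".format(code)
    if ttl_seconds == -1:
        # Documented above: -1 means the data never expires.
        return "never expires"
    return "expires in {} s".format(ttl_seconds)


# Placeholder codes for illustration only, not the library's real constants.
OK, NOT_FOUND = 0, 1
forever = describe_ttl((OK, -1), OK, NOT_FOUND)
soon = describe_ttl((OK, 42), OK, NOT_FOUND)
absent = describe_ttl((NOT_FOUND, 0), OK, NOT_FOUND)
```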

exist

Check whether a key exists.

def exist(self, hash_key, sort_key, timeout=0):
    """
    Check value exist.

    :param hash_key: (bytes) which hash key used for this API.
    :param sort_key: (bytes) which sort key used for this API.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value when data exist, error_types.ERR_OBJECT_NOT_FOUND.value when data not found.
             ign: useless, should be ignored.
    """
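The timeout rule in the docstring above is repeated for every per-key call in this reference. It can be modelled in a few lines; the function name is hypothetical and the client applies this rule internally, so the sketch is only meant to make the precedence explicit.

```python
def effective_timeout(per_call_ms, instance_default_ms):
    """Per-call timeout wins when positive; otherwise the constructor's value applies."""
    return per_call_ms if per_call_ms > 0 else instance_default_ms


explicit = effective_timeout(500, 2000)   # per-call value is used
fallback = effective_timeout(0, 2000)     # 0 falls back to the instance default
```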

set

Insert a record (an existing record is overwritten).

def set(self, hash_key, sort_key, value, ttl=0, timeout=0):
    """
    Set value to be stored in <hash_key, sort_key>.

    :param hash_key: (bytes) which hash key used for this API.
    :param sort_key: (bytes) which sort key used for this API.
    :param value: (bytes) value to be stored under <hash_key, sort_key>.
    :param ttl: (int) ttl(time to live) in seconds of this data.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value when the data is stored successfully.
             ign: useless, should be ignored.
    """
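Note that set takes ttl in seconds but timeout in milliseconds. One way to avoid mixing the units is to build the positional arguments from timedelta values, as in the sketch below. The helper name and its keyword parameters are hypothetical; only the argument order and units come from the signature above.

```python
from datetime import timedelta


def build_set_args(hash_key, sort_key, value, expire_after=None, op_timeout=None):
    """Return the positional arguments for set(), converting units explicitly."""
    # ttl is expressed in seconds; 0 is the documented default.
    ttl_seconds = int(expire_after.total_seconds()) if expire_after else 0
    # timeout is expressed in milliseconds; 0 defers to the instance timeout.
    timeout_ms = int(op_timeout.total_seconds() * 1000) if op_timeout else 0
    return (hash_key, sort_key, value, ttl_seconds, timeout_ms)


args = build_set_args(b'hkey1', b'skey1', b'value',
                      expire_after=timedelta(hours=1),
                      op_timeout=timedelta(milliseconds=500))
```

With a connected client c, the tuple could then be passed as c.set(*args).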

multi_set

Write multiple sortkey records under a single hashkey in one call.

def multi_set(self, hash_key, sortkey_value_dict, ttl=0, timeout=0):
    """
    Set multiple sort_keys-values under hash_key to be stored.

    :param hash_key: (bytes) which hash key used for this API.
    :param sortkey_value_dict: (dict) <sort_key, value> pairs in dict.
    :param ttl: (int) ttl(time to live) in seconds of these data.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, _>) (code, ign)
             code: error_types.ERR_OK.value when the data is stored successfully.
             ign: useless, should be ignored.
    """

get

Get a single record.

def get(self, hash_key, sort_key, timeout=0):
    """
    Get value stored in <hash_key, sort_key>.

    :param hash_key: (bytes) which hash key used for this API.
    :param sort_key: (bytes) which sort key used for this API.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, bytes>) (code, value).
             code: error_types.ERR_OK.value when the data is fetched successfully, error_types.ERR_OBJECT_NOT_FOUND.value when the data is not found.
             value: data stored in this <hash_key, sort_key>
    """
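The following sketch shows one way to consume the (code, value) tuple inside a Twisted-style generator. fetch_text is a hypothetical helper that would be decorated with inlineCallbacks in real code. To keep the block runnable without a server, StubClient and run_sync are in-memory stand-ins (assumptions, not part of pypegasus), and the numeric codes are placeholders for error_types.ERR_OK.value and error_types.ERR_OBJECT_NOT_FOUND.value.

```python
def fetch_text(client, hash_key, sort_key, ok_code, not_found_code):
    """Generator body; decorate with twisted's @inlineCallbacks in real code."""
    code, value = yield client.get(hash_key, sort_key)
    if code == not_found_code:
        return None  # the key is absent
    if code != ok_code:
        raise RuntimeError('get failed with code {}'.format(code))
    return value.decode('utf-8')


class StubClient(object):
    """Hypothetical stand-in that returns results directly instead of Deferreds."""
    OK, NOT_FOUND = 0, 1  # placeholder codes, not the library's constants

    def __init__(self, data):
        self._data = data

    def get(self, hash_key, sort_key, timeout=0):
        key = (hash_key, sort_key)
        if key in self._data:
            return (self.OK, self._data[key])
        return (self.NOT_FOUND, None)


def run_sync(gen):
    """Stand-in for inlineCallbacks: feed each yielded value straight back in."""
    try:
        value = next(gen)
        while True:
            value = gen.send(value)
    except StopIteration as stop:
        return stop.value


stub = StubClient({(b'hkey1', b'skey1'): b'value'})
found = run_sync(fetch_text(stub, b'hkey1', b'skey1',
                            StubClient.OK, StubClient.NOT_FOUND))
missing = run_sync(fetch_text(stub, b'hkey1', b'nope',
                              StubClient.OK, StubClient.NOT_FOUND))
```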

multi_get

Read multiple sortkey records under a single hashkey in one call.

def multi_get(self, hash_key,
              sortkey_set,
              max_kv_count=100,
              max_kv_size=1000000,
              no_value=False,
              timeout=0):
    """
    Get multiple values stored in <hash_key, sortkey> pairs.

    :param hash_key: (bytes) which hash key used for this API.
    :param sortkey_set: (set) sort keys in set.
    :param max_kv_count: (int) max count of k-v pairs to be fetched. max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) max total data size of k-v pairs to be fetched. max_kv_size <= 0 means no limit.
    :param no_value: (bool) if True, fetch only the sort keys without their values.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, dict>) (code, kvs)
             code: error_types.ERR_OK.value when the data is fetched successfully.
             kvs: <sort_key, value> pairs in dict.
    """
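Because multi_set takes a dict of sort key to value and multi_get returns one, text data usually needs encoding on the way in and decoding on the way out. The helpers below are a hypothetical sketch that assumes keys and values travel as bytes, as the parameter types in the docstrings suggest. The numeric code in the usage lines is a placeholder for error_types.ERR_OK.value.

```python
def encode_kvs(text_pairs, encoding='utf-8'):
    """Build a sortkey_value_dict for multi_set() from a dict of str to str."""
    return {k.encode(encoding): v.encode(encoding) for k, v in text_pairs.items()}


def decode_kvs(result, ok_code, encoding='utf-8'):
    """Decode the (code, kvs) tuple documented for multi_get() into str pairs."""
    code, kvs = result
    if code != ok_code:
        raise RuntimeError('multi_get failed with code {}'.format(code))
    return {k.decode(encoding): v.decode(encoding) for k, v in kvs.items()}


OK = 0  # placeholder code for illustration only
payload = encode_kvs({'skey1': 'v1', 'skey2': 'v2'})
# Simulate a successful multi_get that returns exactly what was written.
decoded = decode_kvs((OK, payload), OK)
```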

multi_get_opt

Read multiple sortkey records under a single hashkey; which records are returned is determined by the mode specified in the multi_get_options parameter.

def multi_get_opt(self, hash_key,
                  start_sort_key, stop_sort_key,
                  multi_get_options,
                  max_kv_count=100,
                  max_kv_size=1000000,
                  timeout=0):
    """
    Get multiple values stored under hash_key whose sort keys fall in the range [start_sort_key, stop_sort_key) by default.

    :param hash_key: (bytes) which hash key used for this API.
    :param start_sort_key: (bytes) returned k-v pairs start from start_sort_key.
    :param stop_sort_key: (bytes) returned k-v pairs stop at stop_sort_key.
    :param multi_get_options: (MultiGetOptions) configurable multi_get options.
    :param max_kv_count: (int) max count of k-v pairs to be fetched. max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) max total data size of k-v pairs to be fetched. max_kv_size <= 0 means no limit.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, dict>) (code, kvs)
             code: error_types.ERR_OK.value when the data is fetched successfully.
             kvs: <sort_key, value> pairs in dict.
    """

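MultiGetOptions can specify the sortkey range, whether the boundaries are included, substring matching, whether values are returned, whether results are in reverse order, and so on. It is defined as follows: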

class MultiGetOptions(object):
    """
    configurable options for multi_get.
    """

    def __init__(self):
        self.start_inclusive = True
        self.stop_inclusive = False
        self.sortkey_filter_type = filter_type.FT_NO_FILTER
        self.sortkey_filter_pattern = ""
        self.no_value = False
        self.reverse = False

class filter_type:
  FT_NO_FILTER = 0
  FT_MATCH_ANYWHERE = 1
  FT_MATCH_PREFIX = 2
  FT_MATCH_POSTFIX = 3
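To make the option semantics concrete, the sketch below models the selection locally over an in-memory list of sort keys. It is not how the client works (the filtering happens on the server), the function name is hypothetical, and treating an empty start or stop key as "unbounded" is an assumption not stated in this document. The filter constants mirror the filter_type values listed above.

```python
FT_NO_FILTER, FT_MATCH_ANYWHERE, FT_MATCH_PREFIX, FT_MATCH_POSTFIX = 0, 1, 2, 3


def select_sort_keys(sort_keys, start, stop,
                     start_inclusive=True, stop_inclusive=False,
                     filter_type=FT_NO_FILTER, pattern=b'', reverse=False):
    """Local model of which sort keys a range query with these options selects."""

    def in_range(key):
        # Assumption: an empty bound means the range is open on that side.
        if start and (key < start or (key == start and not start_inclusive)):
            return False
        if stop and (key > stop or (key == stop and not stop_inclusive)):
            return False
        return True

    def matches(key):
        if filter_type == FT_MATCH_ANYWHERE:
            return pattern in key
        if filter_type == FT_MATCH_PREFIX:
            return key.startswith(pattern)
        if filter_type == FT_MATCH_POSTFIX:
            return key.endswith(pattern)
        return True  # FT_NO_FILTER

    selected = (k for k in sort_keys if in_range(k) and matches(k))
    return sorted(selected, reverse=reverse)


keys = [b'a1', b'a2', b'b1', b'b2', b'c1']
default_range = select_sort_keys(keys, b'a2', b'c1')
closed_range = select_sort_keys(keys, b'a2', b'c1', stop_inclusive=True)
suffix_desc = select_sort_keys(keys, b'', b'', filter_type=FT_MATCH_POSTFIX,
                               pattern=b'1', reverse=True)
```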

remove

Delete a single record.

def remove(self, hash_key, sort_key, timeout=0):
    """
    Remove the entire <hash_key, sort_key>-value in pegasus.

    :param hash_key: (bytes) which hash key used for this API.
    :param sort_key: (bytes) which sort key used for this API.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value when the data is removed successfully.
             ign: useless, should be ignored.
    """

multi_del

Delete multiple sortkey records under a single hashkey in one batch.

def multi_del(self, hash_key, sortkey_set, timeout=0):
    """
    Remove multiple entire <hash_key, sort_key>-values in pegasus.

    :param hash_key: (bytes) which hash key used for this API.
    :param sortkey_set: (set) sort keys in set.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, int>) (code, count).
             code: error_types.ERR_OK.value when the data is removed successfully.
             count: count of deleted k-v pairs.
    """

sort_key_count

Get the number of sortkeys under a hashkey.

def sort_key_count(self, hash_key, timeout=0):
    """
    Get the total sort key count under the hash_key.

    :param hash_key: (bytes) which hash key used for this API.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, int>) (code, count)
             code: error_types.ERR_OK.value when the count is fetched successfully, error_types.ERR_OBJECT_NOT_FOUND.value when the data is not found.
             count: total sort key count under the hash_key.
    """

get_sort_keys

Get the sortkeys under a hashkey.

def get_sort_keys(self, hash_key,
                  max_kv_count=100,
                  max_kv_size=1000000,
                  timeout=0):
    """
    Get multiple sort keys under hash_key.

    :param hash_key: (bytes) which hash key used for this API.
    :param max_kv_count: (int) max count of k-v pairs to be fetched. max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) max total data size of k-v pairs to be fetched. max_kv_size <= 0 means no limit.
    :param timeout: (int) how long will the operation timeout in milliseconds.
                    if timeout > 0, it is a timeout value for current operation,
                    else the timeout value specified to create the instance will be used.
    :return: (tuple<error_types.code.value, set>) (code, ks)
             code: error_types.ERR_OK.value when the data is fetched successfully.
             ks: <sort_key, ign> pairs in dict, ign will always be empty bytes.
    """

get_scanner

Get a scanner object for scanning data in a specified range. The scan mode can be set through the scan_options parameter.

def get_scanner(self, hash_key,
                start_sort_key, stop_sort_key,
                scan_options):
    """
    Get scanner for hash_key, start from start_sort_key, and stop at stop_sort_key.
    Whether the scanner includes start_sort_key and stop_sort_key is configurable through scan_options.

    :param hash_key: (bytes) which hash key used for this API.
    :param start_sort_key: (bytes) the returned scanner starts from start_sort_key.
    :param stop_sort_key: (bytes) the returned scanner stops at stop_sort_key.
    :param scan_options: (ScanOptions) configurable scan options.
    :return: (PegasusScanner) scanner, instance of PegasusScanner.
    """

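ScanOptions can specify whether the boundaries are included, the timeout, the number of sortkey-value pairs fetched from the replica server per batch, and so on. It is defined as follows: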

class ScanOptions(object):
    """
    configurable options for scan.
    """

    def __init__(self):
        self.timeout_millis = 5000
        self.batch_size = 1000
        self.start_inclusive = True
        self.stop_inclusive = False
        self.snapshot = None                   # for future use

get_unordered_scanners

Get multiple scanners at once for scanning an entire table. The scan mode can be set through the scan_options parameter.

def get_unordered_scanners(self, max_split_count, scan_options):
    """
    Get scanners for the whole pegasus table.

    :param max_split_count: (int) max count of scanners to be returned.
    :param scan_options: (ScanOptions) configurable scan options.
    :return: (list) list of PegasusScanner instances.
             each scanner in this list can scan a separate part of the whole pegasus table.
    """

Scanner object

An object used for scanning data, returned by get_scanner and get_unordered_scanners. Call its get_next function to perform the scan.

class PegasusScanner(object):
    """
    Pegasus scanner class, used for scanning data in pegasus table.
    """

get_next

Fetch the scanned data. Call it in a loop until it returns None, which marks the end of the scan.

def get_next(self):
    """
    Scan the next k-v pair for the scanner.
    :return: (tuple<tuple<hash_key, sort_key>, value> or None)
                all the sort_keys returned by this API are in ascending order.
    """
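The loop described above can be sketched as a Twisted-style generator. drain is a hypothetical helper that would be decorated with inlineCallbacks in real code; it assumes get_next is yielded like the other client calls (inlineCallbacks passes plain values through unchanged, so the loop has the same shape either way). StubScanner and run_sync are in-memory stand-ins so the block runs without a server.

```python
def drain(scanner, limit=None):
    """Collect (hash_key, sort_key, value) rows until get_next() yields None."""
    rows = []
    while limit is None or len(rows) < limit:
        item = yield scanner.get_next()
        if item is None:
            break  # end of scan
        (hash_key, sort_key), value = item
        rows.append((hash_key, sort_key, value))
    return rows


class StubScanner(object):
    """Hypothetical stand-in: replays a fixed list, then returns None."""

    def __init__(self, items):
        self._items = list(items)

    def get_next(self):
        return self._items.pop(0) if self._items else None


def run_sync(gen):
    """Stand-in for inlineCallbacks: feed each yielded value straight back in."""
    try:
        value = next(gen)
        while True:
            value = gen.send(value)
    except StopIteration as stop:
        return stop.value


data = [((b'h', b'a'), b'1'), ((b'h', b'b'), b'2')]
rows = run_sync(drain(StubScanner(data)))
first_only = run_sync(drain(StubScanner(data), limit=1))
```

The optional limit parameter is an addition for illustration; it stops the loop early without reading the rest of the scan.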