Python Client

pegasus python client

Project Repository

https://github.com/apache/incubator-pegasus/tree/master/python-client

Requirements

Python 3.7+

Installation

pip3 install pypegasus3

Usage

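The pegasus python client is built on twisted, so code written against it follows twisted's asynchronous style (the reactor, Deferreds, inlineCallbacks).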

Example

For a complete example, see sample. A simple example follows:

#!/usr/bin/env python
# coding:utf-8

from pypegasus.pgclient import Pegasus

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks


@inlineCallbacks
def basic_test():
    # init
    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602'], 'temp')

    suc = yield c.init()
    if not suc:
        reactor.stop()
        print('ERROR: connect pegasus server failed')
        return

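    # set with ttl=0 and a 500 ms timeout; the result is a (code, ign) tuple
    try:
        ret = yield c.set('hkey1', 'skey1', 'value', 0, 500)
        print('set ret: ', ret)
    except Exception as e:
        print(e)

    # get: the result is a (code, value) tuple, where value is bytes
    code, value = yield c.get('hkey1', 'skey1')
    if value is not None:
        print('get ret: ', code, bytes.decode(value))
    else:
        print('get ret: ', code)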

    reactor.stop()


if __name__ == "__main__":
    reactor.callWhenRunning(basic_test)
    reactor.run()

Logging Configuration File

The pegasus python client uses the standard logging package. The default configuration is:

[loggers]
keys=root
[logger_root]
level=INFO
handlers=hand01
propagate=0
[handlers]
keys=hand01
[handler_hand01]
class=handlers.RotatingFileHandler
formatter=form01
args=('pegasus.log', 'a', 100*1024*1024, 10)
[formatters]
keys=form01
[formatter_form01]
format=%(asctime)s [%(thread)d] [%(levelname)s] %(filename)s:%(lineno)d %(message)s
datefmt=%Y-%m-%d %H:%M:%S

To customize logging, add a configuration file named logger.conf to your own code directory.
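A minimal sketch of a custom logger.conf, assuming the client picks the file up from your code directory as described above. It keeps the layout of the default file but logs at DEBUG level to stderr instead of to a rotating file. The helpers write_logger_conf and check_logger_conf are hypothetical; the second one only loads the file with the standard logging.config.fileConfig to confirm that it parses.

```python
import logging
import logging.config
import os

# Hypothetical custom logger.conf: same layout as the default file above,
# but it logs at DEBUG level to stderr instead of to a rotating file.
CUSTOM_CONF = """\
[loggers]
keys=root
[logger_root]
level=DEBUG
handlers=console
[handlers]
keys=console
[handler_console]
class=StreamHandler
formatter=form01
args=(sys.stderr,)
[formatters]
keys=form01
[formatter_form01]
format=%(asctime)s [%(thread)d] [%(levelname)s] %(filename)s:%(lineno)d %(message)s
datefmt=%Y-%m-%d %H:%M:%S
"""


def write_logger_conf(directory):
    """Write CUSTOM_CONF to <directory>/logger.conf and return the file path."""
    path = os.path.join(directory, "logger.conf")
    with open(path, "w") as f:
        f.write(CUSTOM_CONF)
    return path


def check_logger_conf(path):
    """Load the file with the standard library so syntax errors show up early.

    Note: this configures logging for the current process as a side effect.
    """
    logging.config.fileConfig(path, disable_existing_loggers=False)
```

Call write_logger_conf with your code directory; the file uses the same INI format as the default configuration, so any handler or formatter accepted by logging.config.fileConfig can be used.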

API Reference

Initialization

To initialize the client, first construct a Pegasus object, then call its init function:

class Pegasus(object):
    """
    Pegasus client class.
    """
    
    def __init__(self, meta_addrs=None, table_name='',
                 timeout=DEFAULT_TIMEOUT):
        """
        :param meta_addrs: (list) list of pegasus meta server addresses.
                           example: ['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603']
        :param table_name: (bytes) table name (app name) in pegasus.
        :param timeout: (int) default timeout in milliseconds for communicating with the meta servers and replica servers.
        """
    def init(self):
        """
        Initialize the client. This must be done before the client is used.

        :return: (DeferredList) True if initialization succeeds, any other value if it fails.
        """

ttl

Get the remaining ttl of a key.

def ttl(self, hash_key, sort_key, timeout=0):
    """
    Get the ttl (time to live) of the data.

    :param hash_key: (bytes) hash key used by this operation.
    :param sort_key: (bytes) sort key used by this operation.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, int>) (code, ttl)
             code: error_types.ERR_OK.value if the data exists, error_types.ERR_OBJECT_NOT_FOUND.value if it does not.
             ttl: in seconds; -1 means the data never expires.
    """

exist

Check whether a key exists.

def exist(self, hash_key, sort_key, timeout=0):
    """
    Check whether the value exists.

    :param hash_key: (bytes) hash key used by this operation.
    :param sort_key: (bytes) sort key used by this operation.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value if the data exists, error_types.ERR_OBJECT_NOT_FOUND.value if it does not.
             ign: carries no information and should be ignored.
    """

set

Insert a record (overwriting it if it already exists).

def set(self, hash_key, sort_key, value, ttl=0, timeout=0):
    """
    Store value under <hash_key, sort_key>.

    :param hash_key: (bytes) hash key used by this operation.
    :param sort_key: (bytes) sort key used by this operation.
    :param value: (bytes) value to store under <hash_key, sort_key>.
    :param ttl: (int) ttl (time to live) of the data, in seconds.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value if the data is stored successfully.
             ign: carries no information and should be ignored.
    """

multi_set

Write multiple sort key records under one hash key in a single call.

def multi_set(self, hash_key, sortkey_value_dict, ttl=0, timeout=0):
    """
    Store multiple sort_key-value pairs under hash_key.

    :param hash_key: (bytes) hash key used by this operation.
    :param sortkey_value_dict: (dict) <sort_key, value> pairs.
    :param ttl: (int) ttl (time to live) of the data, in seconds.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, _>) (code, ign)
             code: error_types.ERR_OK.value if the data is stored successfully.
             ign: carries no information and should be ignored.
    """

get

Read a single record.

def get(self, hash_key, sort_key, timeout=0):
    """
    Get the value stored under <hash_key, sort_key>.

    :param hash_key: (bytes) hash key used by this operation.
    :param sort_key: (bytes) sort key used by this operation.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, bytes>) (code, value)
             code: error_types.ERR_OK.value if the data is read successfully, error_types.ERR_OBJECT_NOT_FOUND.value if it does not exist.
             value: data stored under this <hash_key, sort_key>.
    """

multi_get

Read multiple sort key records under one hash key in a single call.

def multi_get(self, hash_key,
              sortkey_set,
              max_kv_count=100,
              max_kv_size=1000000,
              no_value=False,
              timeout=0):
    """
    Get multiple values stored under hash_key for the given sort keys.

    :param hash_key: (bytes) hash key used by this operation.
    :param sortkey_set: (set) sort keys to read.
    :param max_kv_count: (int) maximum number of k-v pairs to fetch; max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) maximum total data size of the k-v pairs to fetch; max_kv_size <= 0 means no limit.
    :param no_value: (bool) if True, fetch only the sort keys, without their values.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, dict>) (code, kvs)
             code: error_types.ERR_OK.value if the data is read successfully.
             kvs: <sort_key, value> pairs in a dict.
    """

multi_get_opt

Read multiple sort key records under one hash key in a single call; which records are read is determined by the mode set in the multi_get_options parameter.

def multi_get_opt(self, hash_key,
                  start_sort_key, stop_sort_key,
                  multi_get_options,
                  max_kv_count=100,
                  max_kv_size=1000000,
                  timeout=0):
    """
    Get multiple values stored under hash_key whose sort keys fall in [start_sort_key, stop_sort_key) by default.

    :param hash_key: (bytes) hash key used by this operation.
    :param start_sort_key: (bytes) sort key at which the returned k-v pairs start.
    :param stop_sort_key: (bytes) sort key at which the returned k-v pairs stop.
    :param multi_get_options: (MultiGetOptions) configurable multi_get options.
    :param max_kv_count: (int) maximum number of k-v pairs to fetch; max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) maximum total data size of the k-v pairs to fetch; max_kv_size <= 0 means no limit.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, dict>) (code, kvs)
             code: error_types.ERR_OK.value if the data is read successfully.
             kvs: <sort_key, value> pairs in a dict.
    """

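MultiGetOptions can specify the sort key range, whether the boundaries are included, substring matching on sort keys, whether values are returned, whether results are returned in reverse order, and so on. It is defined as follows: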

class MultiGetOptions(object):
    """
    configurable options for multi_get.
    """

    def __init__(self):
        self.start_inclusive = True
        self.stop_inclusive = False
        self.sortkey_filter_type = filter_type.FT_NO_FILTER
        self.sortkey_filter_pattern = ""
        self.no_value = False
        self.reverse = False

class filter_type:
    FT_NO_FILTER = 0
    FT_MATCH_ANYWHERE = 1
    FT_MATCH_PREFIX = 2
    FT_MATCH_POSTFIX = 3
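To make the MultiGetOptions fields concrete, the sketch below emulates locally which sort keys a request would keep. It is an illustration only, not pypegasus code: the real filtering happens on the server, and the sketch assumes plain byte-wise substring, prefix, and suffix matching plus byte-wise ordering for the range check. sortkey_matches and in_range are hypothetical names; filter_type is a local copy of the values shown above.

```python
class filter_type:
    # Local copy of the filter_type values documented above.
    FT_NO_FILTER = 0
    FT_MATCH_ANYWHERE = 1
    FT_MATCH_PREFIX = 2
    FT_MATCH_POSTFIX = 3


def sortkey_matches(sort_key, filter_kind, pattern):
    """Emulate sortkey_filter_type / sortkey_filter_pattern for one key."""
    if filter_kind == filter_type.FT_NO_FILTER:
        return True
    if filter_kind == filter_type.FT_MATCH_ANYWHERE:
        return pattern in sort_key
    if filter_kind == filter_type.FT_MATCH_PREFIX:
        return sort_key.startswith(pattern)
    if filter_kind == filter_type.FT_MATCH_POSTFIX:
        return sort_key.endswith(pattern)
    raise ValueError("unknown filter type %r" % (filter_kind,))


def in_range(sort_key, start, stop, start_inclusive=True, stop_inclusive=False):
    """Emulate start_inclusive / stop_inclusive (defaults match MultiGetOptions)."""
    if sort_key < start or (sort_key == start and not start_inclusive):
        return False
    if sort_key > stop or (sort_key == stop and not stop_inclusive):
        return False
    return True
```

With the default options the range is half-open, [start_sort_key, stop_sort_key), which matches the default described in the multi_get_opt docstring.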

remove

Delete a record.

def remove(self, hash_key, sort_key, timeout=0):
    """
    Remove the <hash_key, sort_key> entry and its value from pegasus.

    :param hash_key: (bytes) hash key used by this operation.
    :param sort_key: (bytes) sort key used by this operation.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value if the data is removed successfully.
             ign: carries no information and should be ignored.
    """

multi_del

Delete multiple sort key records under one hash key in a batch.

def multi_del(self, hash_key, sortkey_set, timeout=0):
    """
    Remove multiple <hash_key, sort_key> entries and their values from pegasus.

    :param hash_key: (bytes) hash key used by this operation.
    :param sortkey_set: (set) sort keys to remove.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, int>) (code, count)
             code: error_types.ERR_OK.value if the data is removed successfully.
             count: number of k-v pairs removed.
    """

sort_key_count

Get the number of sort keys under a hash key.

def sort_key_count(self, hash_key, timeout=0):
    """
    Get the total number of sort keys under hash_key.

    :param hash_key: (bytes) hash key used by this operation.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, int>) (code, count)
             code: error_types.ERR_OK.value if the count is read successfully, error_types.ERR_OBJECT_NOT_FOUND.value if the data does not exist.
             count: total number of sort keys under hash_key.
    """

get_sort_keys

Get the sort keys under a hash key.

def get_sort_keys(self, hash_key,
                  max_kv_count=100,
                  max_kv_size=1000000,
                  timeout=0):
    """
    Get multiple sort keys under hash_key.

    :param hash_key: (bytes) hash key used by this operation.
    :param max_kv_count: (int) maximum number of k-v pairs to fetch; max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) maximum total data size of the k-v pairs to fetch; max_kv_size <= 0 means no limit.
    :param timeout: (int) operation timeout in milliseconds.
                    If timeout > 0, it is used for this operation;
                    otherwise the timeout specified when creating the instance is used.
    :return: (tuple<error_types.code.value, set>) (code, ks)
             code: error_types.ERR_OK.value if the data is read successfully.
             ks: <sort_key, ign> pairs in a dict; ign is always empty bytes.
    """

get_scanner

Get a scanner object for scanning data in a specified range. The scan mode can be set through the scan_options parameter.

def get_scanner(self, hash_key,
                start_sort_key, stop_sort_key,
                scan_options):
    """
    Get a scanner for hash_key that starts at start_sort_key and stops at stop_sort_key.
    Whether start_sort_key and stop_sort_key themselves are included is configurable through scan_options.

    :param hash_key: (bytes) hash key used by this operation.
    :param start_sort_key: (bytes) sort key at which the returned scanner starts.
    :param stop_sort_key: (bytes) sort key at which the returned scanner stops.
    :param scan_options: (ScanOptions) configurable scan options.
    :return: (PegasusScanner) scanner, an instance of PegasusScanner.
    """

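ScanOptions can specify whether the boundaries are included, the timeout, the number of sort key-value pairs fetched from the replica server in each batch, and so on. It is defined as follows: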

class ScanOptions(object):
    """
    configurable options for scan.
    """

    def __init__(self):
        self.timeout_millis = 5000
        self.batch_size = 1000
        self.start_inclusive = True
        self.stop_inclusive = False
        self.snapshot = None                   # for future use

get_unordered_scanners

Get multiple scanners at once, for scanning the data of the whole table. The scan mode can be set through the scan_options parameter.

def get_unordered_scanners(self, max_split_count, scan_options):
    """
    Get scanners covering the whole pegasus table.

    :param max_split_count: (int) maximum number of scanners to return.
    :param scan_options: (ScanOptions) configurable scan options.
    :return: (list) list of PegasusScanner instances.
             Each scanner in the list scans a separate part of the whole pegasus table.
    """

Scanner Object

The object used for data scanning, returned by get_scanner and get_unordered_scanners. Call its get_next function to perform the scan.

class PegasusScanner(object):
    """
    Pegasus scanner class, used for scanning data in pegasus table.
    """

get_next

Get the scanned data. Call it repeatedly until it returns None, which ends the scan.

def get_next(self):
    """
    Scan the next k-v pair.
    :return: (tuple<tuple<hash_key, sort_key>, value> or None)
             All sort_keys returned by this API are in ascending order.
    """
Copyright © 2023 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.
