• 隐藏侧边栏
  • 展开分类目录
  • 关注微信公众号
  • 我的GitHub
  • QQ:1753970025
Chen Jiehua

Dynamodb小记 

目录

最近因为业务需求,接触到 AWS Dynamodb,顺便记录一下一些基本的问题……

简介

Amazon DynamoDB 是一种完全托管的 NoSQL 数据库服务,提供快速而可预测的性能,能够实现无缝扩展。使用 DynamoDB,您可以免除操作和扩展分布式数据库的管理工作负担,因而无需担心硬件预置、设置和配置、复制、软件修补或集群扩展等问题。

表、项目和属性

dynamodb与mongodb比较类似,都是文档型NoSQL数据库。基本 DynamoDB 组件包括:

  • 表 – 类似于其他数据库系统,DynamoDB 将数据存储在表中。表 是数据的集合。例如,请参阅名为 People 的示例表(如下所示),该表可用于存储个人联系信息。
  • 项目 – 每个表包含多个项目。项目 是一组属性,具有不同于所有其他项目的唯一标识。DynamoDB 中的项目在很多方面都类似于其他数据库系统中的行、记录或元组。在示例 People 表中,每个项目表示一位人员。
  • 属性 – 每个项目包含一个或多个属性。属性 是基础的数据元素,无需进一步分解。DynamoDB 中的属性在很多方面都类似于其他数据库系统中的字段或列。例如,示例 People 表中的一个项目包含名为 PersonID、LastName、FirstName 等的属性。

对dynamodb的操作,主要是数据(项目)的写入和查询:

创建数据

  • PutItem – 将单个项目写入到表中。
  • BatchWriteItem – 将最多 25 个项目写入到表中。

读取数据

  • GetItem – 从表中检索单个项目。必须为所需的项目指定主键。
  • BatchGetItem – 从一个或多个表中检索最多 100 个项目。
  • Query – 检索具有特定分区键的所有项目。
  • Scan – 检索指定表或索引中的所有项目。

主键

创建表时,除表名称外,还必须指定表的主键。主键唯一标识表中的每个项目,因此,任意两个项目的主键都不相同。DynamoDB 支持两种不同类型的主键:

  • 分区键 – 简单的主键,由一个称为分区键 的属性组成。
    DynamoDB 使用分区键的值作为内部哈希函数的输入。来自哈希函数的输出决定了项目将存储到的分区(DynamoDB 内部的物理存储)。表中任意两个项目的分区键值都不相同。
  • 分区键和排序键 – 称为复合主键,此类型的键由两个属性组成。第一个属性是分区键,第二个属性是排序键。
    DynamoDB 使用分区键值作为对内部哈希函数的输入。来自哈希函数的输出决定了项目将存储到的分区(DynamoDB 内部的物理存储)。具有相同分区键的所有项目按排序键值的排序顺序存储在一起。两个项目可具有相同的分区键值,但这两个项目必须具有不同的排序键值。

次级索引

DynamoDB 支持两种索引:

  • Global secondary index – 一种带有可能与表中不同的分区键和排序键的索引。
  • Local secondary index – 一种分区键与表中的相同但排序键与表中的不同的索引。

查询和扫描

查询

Query 操作仅使用主键属性值查看表或secondary index中的项目。您必须提供分区键名称和要搜索的确切值。您可以选择提供排序键名称和值,并使用比较运算符来优化搜索结果。

在 Query 操作中,使用 KeyConditionExpression 参数确定要从表或索引中读取的项目。您必须指定分区键名称和值作为等式条件。您可选择为排序键提供另一个条件(如果有)。

单个 Query 请求可以检索最多 1 MB 个数据;DynamoDB 可以向这些数据应用筛选表达式(FilterExpression),从而在将数据返回给用户之前缩小结果范围。

扫描

Scan 操作读取表或secondary index中的每个项目。

单个 Scan 请求可以检索最多 1 MB 个数据;DynamoDB 可以向这些数据应用筛选表达式,从而在将数据返回给用户之前缩小结果范围。

全局二级索引

参考官方文档。

在已有主键(分区键+排序键)的基础上,创建全局二级索引(分区键+排序键)可以更加方便查询数据。

环境配置

在进行业务开发前,我们首先需要在本地搭建一下dynamodb的运行环境,方便调试测试。参考官方文档,下载 dynamodb 并在本地运行:

$ java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb

Boto3

初始化连接

参考boto3官方文档,我们来创建一个带全局二级索引的表。

初始化 dynamodb 连接,本地测试环境我们需要指定 endpoint_url(默认就是 http://localhost:8000):

#!/usr/bin/env python 
# -*- coding: utf-8 -*-

dynamodb = None
table = None

def ddb_init(ddb_config):
    global dynamodb, table
    if ddb_config["test"]:
        session = boto3.session.Session()
        dynamodb = session.resource("dynamodb", endpoint_url=ddb_config["test_endpoint_url"])
    else:
        session = boto3.session.Session(
            aws_access_key_id=ddb_config["aws_access_key_id"],
            aws_secret_access_key=ddb_config["aws_secret_access_key"],
            region_name=ddb_config["region_name"])
        dynamodb = session.resource("dynamodb")
    # ddb table
    table = dynamodb.Table(ddb_config["table"])

创建Table

我们将 id(更加离散) 作为分区键,能够提高 dynamodb 的性能,参考 官方最佳实践;另外:

  • 主键:分区键=id,排序键=prefix,构成唯一组合;
  • 二级索引:分区键=prefix,排序键=field,方便查询;
def create_table(table_name):
    table = dynamodb.create_table(
        TableName=table_name,
        AttributeDefinitions=[
            {
                "AttributeName": "prefix",
                "AttributeType": "S",
            },
            {
                "AttributeName": "id",
                "AttributeType": "S",
            },
            {
                "AttributeName": "field",
                "AttributeType": "S",
            }
        ],
        KeySchema=[
            {
                "AttributeName": "id", 
                "KeyType": "HASH",
            },
            {
                "AttributeName": "prefix",
                "KeyType": "RANGE",
            }
        ],
        GlobalSecondaryIndexes=[
            {
                "IndexName": "prefix_field",
                "KeySchema": [
                    {
                        "AttributeName": "prefix",
                        "KeyType": "HASH",
                    },
                    {
                        "AttributeName": "field",
                        "KeyType": "RANGE",
                    }
                ],
                "Projection": {
                    "ProjectionType": "ALL",
                },
                "ProvisionedThroughput": {
                    "ReadCapacityUnits": 200,
                    "WriteCapacityUnits": 200,
                }
            }
        ],
        ProvisionedThroughput={
            "ReadCapacityUnits": 200,
            "WriteCapacityUnits": 200,
        }
    )

    table.meta.client.get_waiter("table_exists").wait(TableName=table_name)
    print table.item_count

写入数据

对于每一个字段的值,dynamodb不允许为空:

def write_ddb(items):
    with table.batch_writer() as batch:
        for item in items:
            if not item.get("v"):
                continue

            print datetime.now(), "put item:", item
            ddb_item = {
                "prefix": item["prefix"],
                "id": item["id"],
                "field": item["f"],
                "fval": item["v"],
                "rawdata": base64.b64encode(item["rawdata"]),
            }
            batch.put_item(Item=ddb_item)

查询数据

With the table full of items, you can then query or scan the items in the table using theDynamoDB.Table.query() or DynamoDB.Table.scan() methods respectively. To add conditions to scanning and querying the table, you will need to import the boto3.dynamodb.conditions.Key andboto3.dynamodb.conditions.Attr classes. The boto3.dynamodb.conditions.Key should be used when the condition is related to the key of the item. The boto3.dynamodb.conditions.Attr should be used when the condition is related to an attribute of the item.

You are also able to chain conditions together using the logical operators: & (and), | (or), and ~ (not).

由于dynamodb对查询的数据大小有限制(1M),因此我们还需要根据返回结果的 LastEvaluatedKey 继续进行查询。

def query(params):
    prefix = params["prefix"]
    field = {
        "name": params["field"],
        "val": params["fval"],
        "type": params["ftype"],   # FilterExpression, eq/in
        "negate": params["negate"],
    }
    start_key = None
    result = {}
    while True:
        response = _query(prefix, field, start_key)
        http_code = response.get("ResponseMetadata").get("HTTPStatusCode")
        if http_code != 200:
            print datetime.now(), "request dynamodb error code: %s" % http_code

        items = response.get("Items")
        for item in items:
            key = prefix + "&f=%s&v=%s" % (item["field"], item["fval"])
            result[key] = item["rawdata"]

        if response.get("LastEvaluatedKey"):
            start_key = response["LastEvaluatedKey"]
        else:
            break

    return result


def _query(prefix, field, start_key=None):
    p = {
        "IndexName": "prefix_field",
        "KeyConditionExpression": Key("prefix").eq(prefix) & Key("field").eq(field["name"]),
    }
    if start_key:
        p["ExclusiveStartKey"] = start_key

    filter_expression = None
    if field["type"] == "eq":
        filter_expression = Attr("fval").eq(field["val"])

    elif field["type"] == "in":
        filter_expression = Attr("fval").is_in(field["val"])

    if filter_expression:
        if field["negate"]:
            filter_expression = ~filter_expression

        p["FilterExpression"] = filter_expression

    response = table.query(**p)
    return response

 

 

参考:

亚马逊Dynamodb官方文档

码字很辛苦,转载请注明来自ChenJiehua《Dynamodb小记》

评论