プログラミング

PythonでDynamoDBに複数登録&削除する方法【boto3のソースコードを読もう】

PythonでDynamoDBを操作するにはboto3というライブラリを使用します。

複数の登録または削除のリクエストをするにはbatch_writerを使用します。

アイテムを登録する場合はput_item、削除する場合はdelete_itemを使用します。

batch_writerは25件までしかリクエストをまとめられませんが、boto3はその上限を上手いこと回避する実装をしています。

前半では登録&削除のコードの例、後半はboto3のコードを読んでbatch_writerの25件上限対策について解析します。

 

実装例

条件は以下の通りです。最小要素で進めていきます。

  • テーブル名 : Users

アイテムのKey

  • id(プライマリーキー) : Number
  • name : String

 

DynamoDBにPythonで複数登録

アイテムを100件作成し、DynamoDBに登録します。

import boto3

dynamoDB = boto3.resource("dynamodb")
table = dynamoDB.Table("Users")

items = [{"id": i, "name": "hoge_" + str(i)} for i in range(100)]

with table.batch_writer() as batch:
    for item in items:
        batch.put_item(Item=item)

 

1~4行目でUsersテーブルを操作するインスタンスを作成します。

8行目のwith句でbatch_writerを準備します。

9~10行目でfor文で100件のアイテムを登録します。

batch.put_itemではリクエストをスタックしているだけで、25件貯まるorwith句を抜けるとリクエストを実施します。

DynamoDBにPythonで複数削除

先ほどの作成したアイテム100件をDynamoDBから削除します。

import boto3

dynamoDB = boto3.resource("dynamodb")
table = dynamoDB.Table("Users")

keys = [{"id": i} for i in range(100)]

with table.batch_writer() as batch:
    for key in keys:
        batch.delete_item(Key=key)

 

削除するときは、アイテムのプライマリーキーを指定します。今回はパーテンションキーのみですが、ソートキーも定義している場合は合わせてキーとしてください。

6行目で削除対象のkey(id)の配列を作成します。

あとは登録と同様batch_writerを準備し、for文でdelete_itemにkeyを渡します。

 

boto3のbatch_writerについて解説

これまで使ってきたbatch_writerの正体は以下のコードです。

boto3/dynamodb/table.py (63~167行目)

長いのでかいつまんで解説

160~161行目

    def __enter__(self):
        return self

 

def __enter__ はwith句で呼ばれたときに実行します。

with table.batch_writer() as batch: でbatchにBatchWriterクラスのインスタンスが渡されるということです。

102~112行目

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

 

put_itemまたはdelete_itemでリクエストを_add_request_and_processに送ります。

self._items_bufferにリクエストを貯めます

貯めたリクエストを消化するか判定します。

 

66~69行目

    def __init__(
        self, table_name, client, flush_amount=25, overwrite_by_pkeys=None
    ):

137~139行目

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

 

self._flush_amountには初期値で25が入っています。

25件以上リクエストがあったら、25件をまとめて送る

163~167行目

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()

 

def __exit__ はwith句を抜けるときに実行されます。

貯めていたリクエストを消化します。

こんな感じでリクエストをスタックと実行を管理しているのでboto3のbatch_writerは25件の上限を気にする必要は無いんですね。

-プログラミング
-,

Copyright© はるいちの趣味 , 2024 All Rights Reserved Powered by AFFINGER5.