PythonでDynamoDBを操作するにはboto3というライブラリを使用します。
複数の登録または削除のリクエストをするにはbatch_writerを使用します。
アイテムを登録する場合はput_item、削除する場合はdelete_itemを使用します。
batch_writerは25件までしかリクエストをまとめられませんが、boto3はその上限を上手いこと回避する実装をしています。
前半では登録&削除のコードの例、後半はboto3のコードを読んでbatch_writerの25件上限対策について解析します。
実装例
条件は以下の通りです。最小要素で進めていきます。
- テーブル名 : Users
アイテムのKey
- id(プライマリーキー) : Number
- name : String
DynamoDBにPythonで複数登録
アイテムを100件作成し、DynamoDBに登録します。
import boto3 dynamoDB = boto3.resource("dynamodb") table = dynamoDB.Table("Users") items = [{"id": i, "name": "hoge_" + str(i)} for i in range(100)] with table.batch_writer() as batch: for item in items: batch.put_item(Item=item)
1~4行目でUsersテーブルを操作するインスタンスを作成します。
8行目のwith句でbatch_writerを準備します。
9~10行目でfor文で100件のアイテムを登録します。
batch.put_itemではリクエストをスタックしているだけで、25件貯まるorwith句を抜けるとリクエストを実施します。
DynamoDBにPythonで複数削除
先ほどの作成したアイテム100件をDynamoDBから削除します。
import boto3 dynamoDB = boto3.resource("dynamodb") table = dynamoDB.Table("Users") keys = [{"id": i} for i in range(100)] with table.batch_writer() as batch: for key in keys: batch.delete_item(Key=key)
削除するときは、アイテムのプライマリーキーを指定します。今回はパーテンションキーのみですが、ソートキーも定義している場合は合わせてキーとしてください。
6行目で削除対象のkey(id)の配列を作成します。
あとは登録と同様batch_writerを準備し、for文でdelete_itemにkeyを渡します。
boto3のbatch_writerについて解説
これまで使ってきたbatch_writerの正体は以下のコードです。
boto3/dynamodb/table.py (63~167行目)
長いのでかいつまんで解説
160~161行目
def __enter__(self): return self
def __enter__ はwith句で呼ばれたときに実行します。
with table.batch_writer() as batch: でbatchにBatchWriterクラスのインスタンスが渡されるということです。
102~112行目
def put_item(self, Item): self._add_request_and_process({'PutRequest': {'Item': Item}}) def delete_item(self, Key): self._add_request_and_process({'DeleteRequest': {'Key': Key}}) def _add_request_and_process(self, request): if self._overwrite_by_pkeys: self._remove_dup_pkeys_request_if_any(request) self._items_buffer.append(request) self._flush_if_needed()
put_itemまたはdelete_itemでリクエストを_add_request_and_processに送ります。
self._items_bufferにリクエストを貯めます。
貯めたリクエストを消化するか判定します。
66~69行目
def __init__( self, table_name, client, flush_amount=25, overwrite_by_pkeys=None ):
137~139行目
def _flush_if_needed(self): if len(self._items_buffer) >= self._flush_amount: self._flush()
self._flush_amountには初期値で25が入っています。
25件以上リクエストがあったら、25件をまとめて送る
163~167行目
def __exit__(self, exc_type, exc_value, tb): # When we exit, we need to keep flushing whatever's left # until there's nothing left in our items buffer. while self._items_buffer: self._flush()
def __exit__ はwith句を抜けるときに実行されます。
貯めていたリクエストを消化します。
こんな感じでリクエストをスタックと実行を管理しているのでboto3のbatch_writerは25件の上限を気にする必要は無いんですね。