Test a Distributed Lock in DynamoDB


Here's a way to test a distributed lock backed by a DynamoDB table, using pytest.

Why a distributed lock

If you can avoid locking, avoid it. Locks are hard to get right, and the lock in this example is rudimentary.

If you have a shared resource where exclusive access matters during processing, you might want a lock around that resource so only one operator can touch it at a time. For example, you may want writes to a shared resource to happen one at a time so they don't conflict.

Why DynamoDB

I chose DynamoDB as the place to store the lock for a few reasons. I'm already in AWS, my code runs in AWS Lambda functions, and it was the simplest shared store that occurred to me. It also has a couple of nice properties, including conditional writes and some testing utilities built around it.

The lock table

The lock table is very simple. It has a single attribute, resource_id, which identifies the resource we're locking on. Here's the CloudFormation template used by SAM:

# template.yaml

# allow the Lambda function to read and write the table
MyLambdaFunction:
  # ...
  Properties:
    Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref ResourceLockTable

# the table itself
ResourceLockTable:
  Type: AWS::DynamoDB::Table
  DeletionPolicy: Delete
  Properties:
    TableName: resource_locks
    AttributeDefinitions:
      - AttributeName: resource_id
        AttributeType: N
    KeySchema:
      - AttributeName: resource_id
        KeyType: HASH
    BillingMode: PAY_PER_REQUEST

Acquiring the lock

To acquire the lock, we write an item to the table. The lock is per resource_id. The ConditionExpression on our put_item makes the put operation fail if the condition is not met. We want it to fail if the resource is already locked, which is signified by an item with that resource_id already being in the table. If the lock is acquired, we return True. If the put fails the condition, we return False.

# mutex.py
import boto3


def _create_client():
  return boto3.client("dynamodb")

def _request_lock(resource_id, client=None):
  # Create the client lazily; a default of _create_client() would run at import time.
  if client is None:
    client = _create_client()
  try:
    client.put_item(
      TableName='resource_locks',
      Item={
        "resource_id": {"N": resource_id},
      },
      # The put fails when an item with this resource_id already exists.
      ConditionExpression="resource_id <> :resource_id",
      ExpressionAttributeValues={
        ":resource_id": {"N": resource_id},
      }
    )
    return True
  except client.exceptions.ConditionalCheckFailedException:
    return False
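
The function above only acquires a lock; this article does not show a release. As a minimal sketch, assuming that releasing simply means deleting the same item, the hypothetical helpers `release_lock` and `held_lock` below (neither is part of the original code) pair acquisition with release. The sketch also spells the condition as `attribute_not_exists(resource_id)`, a common alternative way to express "no item with this key yet", and takes the client as an explicit argument so it can be pointed at a mocked client in tests.

```python
from contextlib import contextmanager

TABLE_NAME = "resource_locks"  # same table name used elsewhere in this article


def request_lock(resource_id, client):
  """Return True if the lock item was written, False if it already exists."""
  try:
    client.put_item(
      TableName=TABLE_NAME,
      Item={"resource_id": {"N": resource_id}},
      # Fails when an item with this key is already in the table.
      ConditionExpression="attribute_not_exists(resource_id)",
    )
    return True
  except client.exceptions.ConditionalCheckFailedException:
    return False


def release_lock(resource_id, client):
  """Hypothetical helper: release the lock by deleting its item."""
  client.delete_item(
    TableName=TABLE_NAME,
    Key={"resource_id": {"N": resource_id}},
  )


@contextmanager
def held_lock(resource_id, client):
  """Hypothetical helper: yield whether the lock was acquired, release on exit."""
  acquired = request_lock(resource_id, client)
  try:
    yield acquired
  finally:
    # Only the caller that actually acquired the lock deletes the item.
    if acquired:
      release_lock(resource_id, client)
```

A caller would write `with held_lock("12345", client) as acquired:` and skip the protected work when `acquired` is False. Note that this sketch does not record who owns the lock, so nothing stops a different caller from calling `release_lock` directly.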

Testing unique locks

For testing, we'll use pytest. We'll write a unit test that checks whether our put_item call creates unique locks as designed. We'll use the moto library to mock a DynamoDB table locally. For this setup, install:

pipenv install pytest pytest-describe moto --dev

We'll set up the table as a fixture that we can use in our test. We have to re-specify it separately from our template.yaml. The fixture has a setup and a teardown. Just to be cautious, we'll also set up a fixture that overwrites the environment variables that AWS libraries use for real access:

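# test_mutex.py

import boto3
# Newer moto releases rename this decorator (mock_dynamodb, then mock_aws in moto 5).
from moto import mock_dynamodb2
import os
import pytest

from . import mutex


@pytest.fixture(scope='module')
def aws_credentials():
  # Fake credentials and region so the tests don't pick up real ones from the environment.
  os.environ["AWS_ACCESS_KEY_ID"] = 'testing'
  os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing'
  os.environ["AWS_SECURITY_TOKEN"] = 'testing'
  os.environ["AWS_SESSION_TOKEN"] = 'testing'
  os.environ["AWS_DEFAULT_REGION"] = 'us-east-1'

@pytest.fixture(scope='module')
def dynamodb_client(aws_credentials):
  # Calls made through this client are served by moto's in-memory mock.
  with mock_dynamodb2():
    client = boto3.client('dynamodb')
    yield client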

@pytest.fixture()
def setup_lock_table(dynamodb_client):
  dynamodb_client.create_table(
    TableName='resource_locks',
    KeySchema=[
      {
        'AttributeName': 'resource_id',
        'KeyType': 'HASH'
      },
    ],
    AttributeDefinitions=[
      {
        'AttributeName': 'resource_id',
        'AttributeType': 'N'
      },
    ],
    ProvisionedThroughput={
      'ReadCapacityUnits': 1,
      'WriteCapacityUnits': 1
    }
  )

  yield "setup_lock_table"

  # teardown: drop the table so the next test starts from an empty one
  dynamodb_client.delete_table(
    TableName='resource_locks',
  )

def describe_request_lock():

  def test_when_open(setup_lock_table, dynamodb_client):
    resource_id = "12345"
    assert mutex._request_lock(resource_id, client=dynamodb_client)

    lock_row = dynamodb_client.get_item(TableName="resource_locks", Key={ "resource_id": { "N": resource_id } })["Item"]
    assert lock_row == { "resource_id": { "N": resource_id } }

  def test_when_locked(setup_lock_table, dynamodb_client):
    resource_id = "23456"
    assert mutex._request_lock(resource_id, client=dynamodb_client)
    assert not mutex._request_lock(resource_id, client=dynamodb_client)

There's more

That's all it takes to test the put_item call against a lock table in DynamoDB. But there is more to consider: What should a caller do when the lock is already held? What if the lock owner never releases it? Polling and lock TTLs are potential answers and good subjects for another article.
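
To make the polling idea a little more concrete, here is a minimal sketch, not a complete answer. The helper name `acquire_with_polling` and its parameters are hypothetical and not part of the code above. It retries any zero-argument acquire function a fixed number of times with a fixed delay. The `sleep` parameter exists only so a test can swap in a fake and avoid real waiting.

```python
import time


def acquire_with_polling(try_acquire, attempts=5, delay_seconds=0.2, sleep=time.sleep):
  """Call try_acquire() until it returns True or the attempts run out."""
  for attempt in range(attempts):
    if try_acquire():
      return True
    # Wait before the next try, but not after the final one.
    if attempt < attempts - 1:
      sleep(delay_seconds)
  return False
```

With the lock function from this article, a call might look like `acquire_with_polling(lambda: mutex._request_lock("12345"))`. A fixed delay is the simplest choice; it does nothing about a lock whose owner never releases it, which is where an expiry on the lock item would come in.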

How do you create and test distributed locks in an environment like AWS Lambda or DynamoDB?