A repository wrapper around Azure Cosmos DB

October 20, 2022    .Net Azure Cosmos DB

A repository wrapper for Azure Cosmos DB queries and commands

I recently replaced the deprecated package Microsoft.Azure.DocumentDB.Core with Microsoft.Azure.Cosmos. We hadn’t gotten around to it, but the deprecation notice and the email telling us to “update to the latest version of Azure Cosmos DB .NET SDKs You’re receiving this email because you’ve used version 1.x of the .NET SDK or .NET Core SDK for Azure Cosmos DB in the past 12 months. We shared the retirement announcement of version 1.x of the Azure Cosmos DB .NET SDK and the .NET Core SDK for SQL API with you in August 2019. The original retirement date was scheduled for 31 August 2020.” helped raise the priority.

Here are the SDK v3 docs. There is also a very helpful migration guide, which links to a performance guide.

Thankfully, we already had this abstracted in a class and I only needed to change that class, the querying/insert code and connecting to the database.

I’d like to share the Repository code we ended up with and how we set it up. Our code currently targets .NET 6, and the example is for an API project.

Startup/Setup

/// <summary>
/// Registers the shared <c>CosmosClient</c> and the collection-specific repositories with the DI container.
/// </summary>
/// <param name="services">The service collection to add the registrations to.</param>
public void ConfigureServices(IServiceCollection services)
{
  // ... your regular setup code for APIs
  var config = configuration.GetSection("Cosmos").Get<CosmosConfiguration>();

  // This needs to be a Singleton per the performance docs.
  // Your model needs a property that serializes to id (camelCase) https://alexdrenea.com/2019/07/29/azure-cosmosdb-dotnet-sdk-v3-one-of-the-specified-inputs-is-invalid/
  // If you don't you may get ["One of the specified inputs is invalid"] 400 errors
  var cosmosOptions = new CosmosClientOptions { ApplicationName = settings.ObserverName, SerializerOptions = new CosmosSerializationOptions { PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase } };

  // Register through a factory so the container creates the client and therefore disposes it on shutdown.
  // An instance handed to AddSingleton(instance) is never disposed by the container.
  services.AddSingleton(_ => new CosmosClient(config.Endpoint.ToString(), config.Key, cosmosOptions));

  // helper function: builds a repository for one collection on top of the shared CosmosClient
  ICosmosRepository GetCosmosRepository(IServiceProvider provider, string collectionId, string partitionKeyPath) =>
      new CosmosRepository(provider.GetRequiredService<CosmosClient>(), config.DB, collectionId, partitionKeyPath);

  services.AddSingleton<IStuffRepository>(provider => new StuffRepository(
      GetCosmosRepository(provider, config.StuffCollection, CosmosPartitionKeys.StuffCollection)));
}
/// <summary>
/// Partition key property names, one constant per collection.
/// The value is written without a leading slash.
/// </summary>
public static class CosmosPartitionKeys
{
    /// <summary>Partition key property name for the stuff collection.</summary>
    public const string StuffCollection = "myStuffPartitionKey";
}

Settings

// appsettings.json
// you should really put this into Azure KeyVault, but sometimes we still use the appsettings.json
{
  // your other settings
  "Cosmos": {
        "Endpoint": "https://mystuff.documents.azure.com:443/",
        "Key": "the key you copy from the portal==",
        "DB": "mystuf-dev",
        "StuffCollection": "stuff",
  }
}

Repository Code

using System.Net;
using System.Text.RegularExpressions;
using Microsoft.Azure.Cosmos.Linq;

namespace AgSync.Drive.Services.AzureCosmosDb;

/// <summary>
/// Query and command operations against a single Cosmos DB collection (container).
/// Every operation takes the partition key value it should run against.
/// </summary>
public interface ICosmosRepository
{
    /// <summary>
    /// Starts a LINQ query for items of type <typeparamref name="T"/> in the given partition.
    /// </summary>
    /// <typeparam name="T">Type the items are read as.</typeparam>
    /// <param name="partitionKey">Partition key value to query.</param>
    /// <returns>A queryable that callers can extend with further LINQ operators.</returns>
    Task<IOrderedQueryable<T>> Query<T>(string partitionKey);

    /// <summary>
    /// Runs a SQL query in the given partition and returns the matching items.
    /// </summary>
    /// <typeparam name="T">Type of class to return.</typeparam>
    /// <param name="sql">The SQL text, with parameters written as @name.</param>
    /// <param name="paramValues">Object whose property names match the @ parameters in <paramref name="sql"/>.</param>
    /// <param name="partitionKey">Partition key value to query.</param>
    /// <returns>The items returned by the query.</returns>
    Task<List<T>> QuerySql<T>(string sql, object paramValues, string partitionKey) where T : class;

    /// <summary>
    /// Deletes the item with the given id from the given partition.
    /// </summary>
    /// <typeparam name="T">Type of the item being deleted.</typeparam>
    /// <param name="id">Id of the item to delete.</param>
    /// <param name="partitionKey">Partition key value of the item.</param>
    /// <returns>The status code of the operation plus an error message on failure.</returns>
    Task<CosmosRepositoryResult<T>> Delete<T>(string id, string partitionKey) where T : class;

    /// <summary>
    /// Creates or updates the item in the given partition.
    /// </summary>
    /// <typeparam name="T">Type of the item being written.</typeparam>
    /// <param name="partitionKey">Partition key value of the item.</param>
    /// <param name="model">The item to write.</param>
    /// <returns>The status code of the operation, the stored item on success, and an error message on failure.</returns>
    Task<CosmosRepositoryResult<T>> Upsert<T>(string partitionKey, T model) where T : class;
}

/// <summary>
/// Outcome of a single-item repository command (Upsert or Delete).
/// </summary>
/// <typeparam name="T">Type of the item the command operated on.</typeparam>
/// <param name="StatusCode">HTTP status code describing the outcome.</param>
/// <param name="Model">The item resource from the response; null in the failure results built by CosmosRepository.</param>
/// <param name="ErrorMessage">Error text for a failure; string.Empty in the success results built by CosmosRepository.</param>
public record CosmosRepositoryResult<T>(HttpStatusCode StatusCode, T Model, string ErrorMessage);

/// <summary>
/// A wrapper around the Azure Cosmos Client.
/// There will be one instance per collection (events, automations, etc)
/// </summary>
public class CosmosRepository : ICosmosRepository
{
    private static readonly CosmosLinqSerializerOptions _cosmosLinqSerializerOptions = new() { PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase };

    // Collapses line breaks and runs of whitespace in SQL text into a single space.
    private static readonly Regex _whitespaceRegex = new(@"\s+", RegexOptions.Compiled);

    private readonly CosmosClient _cosmosClient;
    private readonly string _databaseId;
    private readonly string _collectionName;
    private readonly string _partitionKeyPath;
    private DatabaseResponse _cosmosDatabaseCachedResponse;
    private Microsoft.Azure.Cosmos.Container _cosmosContainerCachedResponse;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="cosmosClient">"we recommend that you use a single instance per AppDomain for the lifetime of the application."</param>
    /// <param name="databaseId">Id of the database that holds the collection.</param>
    /// <param name="collectionName">The collection name or container id. ex: events, fieldOperations, automations</param>
    /// <param name="partitionKeyPath">The key of the partition, without the leading slash. ex: events has streamId, most of the others are 'partitionKey'.
    ///     Go to the data explorer, click on items for a collection and look for the second column
    /// </param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public CosmosRepository(CosmosClient cosmosClient, string databaseId, string collectionName, string partitionKeyPath)
    {
        _cosmosClient = cosmosClient ?? throw new ArgumentNullException(nameof(cosmosClient));
        _databaseId = databaseId ?? throw new ArgumentNullException(nameof(databaseId));
        _collectionName = collectionName ?? throw new ArgumentNullException(nameof(collectionName));
        _partitionKeyPath = partitionKeyPath ?? throw new ArgumentNullException(nameof(partitionKeyPath));
    }

    /// <summary>
    /// Create the Queryable, then add to the query with LINQ, then call CosmosRepository.LinqQueryToResults.
    /// Do not call .ToList
    /// </summary>
    /// <example>
    ///    var query = (await _cosmosRepository.Query&lt;FieldOperationViewModel&gt;(organizationId.ToString()))
    ///                .Where(x => x.ReleaseDateTime != null);
    ///    var results = await CosmosRepository.LinqQueryToResults(query);
    /// </example>
    /// <typeparam name="T">Type the items are read as.</typeparam>
    /// <param name="partitionKey">Partition key value the query is restricted to.</param>
    /// <returns>A queryable over the container, using camelCase property names.</returns>
    public async Task<IOrderedQueryable<T>> Query<T>(string partitionKey)
    {
        var container = await GetOrCreateContainer();
        var requestOptions = new QueryRequestOptions { PartitionKey = new PartitionKey(partitionKey) };
        return container.GetItemLinqQueryable<T>(requestOptions: requestOptions, linqSerializerOptions: _cosmosLinqSerializerOptions);
    }

    /// <summary>
    /// Call after Query{T}.
    /// Do not use ToList() on Container.GetItemLinqQueryable{T}() which uses blocking calls to synchronously drain the query. Use ToFeedIterator() to drain the query asynchronously.
    /// See https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/performance-tips-dotnet-sdk-v3?tabs=trace-net-core
    /// </summary>
    /// <typeparam name="T">Type of the items produced by the query.</typeparam>
    /// <param name="queryable">A queryable obtained from Query{T}, optionally extended with LINQ operators.</param>
    /// <returns>All items from every page of the query.</returns>
    public static async Task<List<T>> LinqQueryToResults<T>(IQueryable<T> queryable)
    {
        var results = new List<T>();
        using var iterator = queryable.ToFeedIterator();
        while (iterator.HasMoreResults)
        {
            results.AddRange(await iterator.ReadNextAsync());
        }

        return results;
    }

    /// <summary>
    /// Query with the given sql string.
    /// </summary>
    /// <typeparam name="T">Type of class to return</typeparam>
    /// <param name="sql">the sql string</param>
    /// <param name="paramValues">the property names must match the @ params in the sql string; may be null when the query has no parameters</param>
    /// <param name="partitionKey">The partition key for the query, which is usually the organization id for Dispatch Pro.</param>
    /// <returns>All items from every page of the query.</returns>
    public async Task<List<T>> QuerySql<T>(string sql, object paramValues, string partitionKey) where T : class
    {
        var container = await GetOrCreateContainer();

        // remove \r\n and whitespace
        var query = new QueryDefinition(_whitespaceRegex.Replace(sql, " ").Trim());
        if (paramValues != null)
        {
            foreach (var property in paramValues.GetType().GetProperties())
            {
                // Pass the value with its own type: numeric parameters (e.g. OFFSET/LIMIT) must stay numbers,
                // so they are not converted to strings. A null value is still sent as an empty string.
                query.WithParameter("@" + property.Name, property.GetValue(paramValues) ?? "");
            }
        }

        var results = new List<T>();
        using FeedIterator<T> resultSetIterator = container.GetItemQueryIterator<T>(
            query,
            requestOptions: new QueryRequestOptions()
            {
                PartitionKey = new PartitionKey(partitionKey)
            });
        while (resultSetIterator.HasMoreResults)
        {
            var response = await resultSetIterator.ReadNextAsync();
            results.AddRange(response.Resource);
        }

        return results;
    }

    /// <summary>
    /// Delete the item with the given id.
    /// </summary>
    /// <typeparam name="T">Type of the item being deleted.</typeparam>
    /// <param name="id">Id of the item to delete.</param>
    /// <param name="partitionKey">Partition key value of the item.</param>
    /// <returns>
    /// The status code Cosmos returned (a successful delete is normally 204 NoContent).
    /// When Cosmos rejects the request, the result carries the status code and message of the CosmosException and a null model.
    /// </returns>
    public async Task<CosmosRepositoryResult<T>> Delete<T>(string id, string partitionKey)
        where T : class
    {
        try
        {
            var container = await GetOrCreateContainer();
            var response = await container.DeleteItemAsync<T>(id, new PartitionKey(partitionKey));
            return new CosmosRepositoryResult<T>(response.StatusCode, response.Resource, string.Empty);
        }
        catch (CosmosException ex)
        {
            // No response object exists when the call throws; the exception carries the real status (404, 429, ...).
            return new CosmosRepositoryResult<T>(ex.StatusCode, null, ex.Message);
        }
    }

    /// <summary>
    /// Create or Update the item.
    /// </summary>
    /// <typeparam name="T">Type of the item being written.</typeparam>
    /// <param name="partitionKey">Partition key value of the item.</param>
    /// <param name="model">The item to write.</param>
    /// <returns>
    /// The status code Cosmos returned together with the stored item.
    /// When Cosmos rejects the request, the result carries the status code and message of the CosmosException and a null model.
    /// </returns>
    public async Task<CosmosRepositoryResult<T>> Upsert<T>(string partitionKey, T model)
        where T : class
    {
        try
        {
            var container = await GetOrCreateContainer();

            // Success is HttpStatusCode.OK (existing item replaced) or HttpStatusCode.Created (new item).
            var response = await container.UpsertItemAsync(partitionKey: new PartitionKey(partitionKey), item: model);
            return new CosmosRepositoryResult<T>(response.StatusCode, response.Resource, string.Empty);
        }
        catch (CosmosException ex)
        {
            // No response object exists when the call throws; the exception carries the real status (400, 413, 429, ...).
            return new CosmosRepositoryResult<T>(ex.StatusCode, null, ex.Message);
        }
    }

    /// <summary>
    /// Returns the container for this repository, creating the database and container on first use
    /// and caching both for later calls.
    /// </summary>
    private async Task<Microsoft.Azure.Cosmos.Container> GetOrCreateContainer()
    {
        if (_cosmosContainerCachedResponse != null)
        {
            return _cosmosContainerCachedResponse;
        }

        _cosmosDatabaseCachedResponse ??= await _cosmosClient.CreateDatabaseIfNotExistsAsync(_databaseId);
        var containerProperties = new ContainerProperties(id: _collectionName, partitionKeyPath: $"/{_partitionKeyPath}");
        _cosmosContainerCachedResponse ??= await _cosmosDatabaseCachedResponse.Database.CreateContainerIfNotExistsAsync(containerProperties: containerProperties);
        return _cosmosContainerCachedResponse;
    }
}

I didn’t need a Patch or Replace method, but you can pretty easily add one by looking at the sample code and docs.

Use the code in a wrapping repository

You may want to wrap that in a repository that’s specific to your collection. If you think that’s too much, then just inject ICosmosRepository into where you need it and call it directly in your controller or other places.

/// <summary>
/// Repository for Stuff items, built on top of the generic <see cref="ICosmosRepository"/>.
/// </summary>
public class StuffRepository : IStuffRepository
{
    private readonly ICosmosRepository _cosmosRepository;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="cosmosRepository">The repository for the collection that stores Stuff items.</param>
    public StuffRepository(ICosmosRepository cosmosRepository)
    {
        _cosmosRepository = cosmosRepository;
    }

    /// <summary>
    /// Load a single item by id with a LINQ query.
    /// </summary>
    /// <param name="id">Id of the item.</param>
    /// <param name="partitionKeyValue">Partition key value of the item.</param>
    /// <returns>The matching item, or null when there is none.</returns>
    public async Task<Stuff> Get(Guid id, int partitionKeyValue)
    {
        var query = (await _cosmosRepository.Query<Stuff>(partitionKeyValue.ToString()))
                    .Where(stuff => stuff.Id == id);
        var results = await CosmosRepository.LinqQueryToResults(query);
        return results.FirstOrDefault();
    }

    /// <summary>
    /// Create or update the item.
    /// </summary>
    /// <param name="model">The item to store; its PartitionKey selects the partition.</param>
    /// <returns>Success with the item's id when the status code is OK or Created, otherwise a failure with the status code and error message.</returns>
    public async Task<OperationResult<Guid>> Upsert(Stuff model)
    {
        var response = await _cosmosRepository.Upsert(model.PartitionKey, model);
        if (response.StatusCode == System.Net.HttpStatusCode.OK || response.StatusCode == System.Net.HttpStatusCode.Created)
        {
            return OperationResult.Success(model.Id);
        }
        return OperationResult.Failure($"{response.StatusCode} - {response.ErrorMessage}");
    }

    /// <summary>
    /// Load one page of items with a SQL query, newest first.
    /// </summary>
    /// <param name="partitionKeyValue">Partition key value to query.</param>
    /// <param name="id">Value matched against the stuffId property.</param>
    /// <param name="size">Number of items per page.</param>
    /// <param name="page">Zero-based page index; the query skips page * size items.</param>
    /// <returns>Success with the items of the requested page.</returns>
    public async Task<OperationResult<List<Stuff>>> GetPagingWithSql(string partitionKeyValue, Guid id, int size, int page)
    {
        // NOTE(review): stuffId is camelCase but CreatedDateTime is PascalCase; Cosmos property names are
        // case-sensitive, so confirm CreatedDateTime matches the name stored in the documents.
        var sql = @"SELECT *
                        FROM exe
                        WHERE exe.stuffId = @id
                        ORDER BY exe.CreatedDateTime DESC
                        OFFSET @offset
                        LIMIT @size";
        var parameters = new
        {
            id = id.ToString(),
            size,
            offset = page * size
        };

        var results = await _cosmosRepository.QuerySql<Stuff>(sql, parameters, partitionKeyValue);
        return OperationResult.Success(results);
    }

    /// <summary>
    /// Delete the item with the given id.
    /// </summary>
    /// <param name="id">Id of the item.</param>
    /// <param name="partitionKeyValue">Partition key value of the item.</param>
    /// <returns>A success message when the item was deleted, otherwise a failure.</returns>
    public async Task<OperationResult> Delete(Guid id, int partitionKeyValue)
    {
        var response = await _cosmosRepository.Delete<Stuff>(id.ToString(), partitionKeyValue.ToString());

        // Cosmos answers a successful delete with 204 NoContent; OK is still accepted as before.
        if (response.StatusCode == System.Net.HttpStatusCode.NoContent || response.StatusCode == System.Net.HttpStatusCode.OK)
        {
            return OperationResult.SuccessMessage($"deleted {id} in {partitionKeyValue}");
        }

        return OperationResult.Failure($"delete failed for {id} in {partitionKeyValue}");
    }
}


Watch the Story for Good News
I gladly accept BTC Lightning Network tips at [email protected]

Please consider using Brave and adding me to your BAT payment ledger. Then you won't have to see ads! (when I get to $100 in Google Ads for a payout, I pledge to turn off ads)

Use Brave

Also check out my Resources Page for referrals that would help me.


Swan logo
Use Swan Bitcoin to onramp with low fees and automatic daily cost averaging and get $10 in BTC when you sign up.