Handling Error 429 in DocumentDB

I am working on one of these IoT projects at the moment and one of my requirements is to stream events from devices into DocumentDB.  My simplified architecture looks like this. 

 

Event Hub to Worker Role to DocumentDB

 


 

Each collection in DocumentDB is provisioned with a certain amount of throughput (RU == Request Units).  If you exceed that amount you will receive back a 429 error, “Too Many Requests”. To get around this you can:

 

  1. Move your collection to a more performant level (S1, S2, S3)
  2. Implement throttling retries
  3. Tune the requests to need less RUs
  4. Implement Partitions

#1 makes sense because if you are hitting DocumentDB that hard then you need the right “SKU” to tackle the job (but what if S3 is not enough?)
#2 means capture the fact that you hit this error and retry (I’ll show an example)
#3 in an IoT scenario could be taken to mean “slow down the sensors”, but it also includes indexing strategy (change to lazy indexing and/or exclude paths; see the sketch just below)
#4 is IMHO the best and most logical option (it allows you to scale out.  Brilliant)
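
To make #3 a little more concrete, here is a minimal sketch (mine, not from the original post) of creating a collection with lazy indexing and an excluded path using the .NET SDK. The collection id and the excluded path are made-up names, and the exact path syntax depends on your SDK/service version.

//Hedged sketch for option #3: lazy indexing plus an excluded path
//"events" and "/payload/*" are hypothetical; adjust to your own schema
var collection = new DocumentCollection { Id = "events" };
collection.IndexingPolicy.IndexingMode = IndexingMode.Lazy;                             //index asynchronously, cheaper on write RUs
collection.IndexingPolicy.ExcludedPaths.Add(new ExcludedPath { Path = "/payload/*" });  //skip paths you never query
await client.CreateDocumentCollectionAsync(database.SelfLink, collection);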

 

Scenario

My scenario is that I want to take the cheapest option, so even though I think #4 is the right choice, it would cost me money for each collection.  I want to take a look at #2 instead.  I will show you what I think is the long way around and then the “auto retry” option.

 

The Verbose Route

 

        //requires: using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; using System.Threading.Tasks;
        public async Task SaveToDocDb(string uri, string key, string collectionLink, dynamic jsonDocToSave)
        {
            using (var client = new DocumentClient(new Uri(uri), key))
            {
                var queryDone = false;
                while (!queryDone)
                {
                    try
                    {
                        await client.CreateDocumentAsync(collectionLink, jsonDocToSave);
                        queryDone = true;
                    }
                    catch (DocumentClientException documentClientException)
                    {
                        var statusCode = (int)documentClientException.StatusCode;
                        //add other error codes to trap here e.g. 503 - Service Unavailable
                        if (statusCode == 429)
                            await Task.Delay(documentClientException.RetryAfter);
                        else
                            throw;
                    }
                    catch (AggregateException aggregateException) when (aggregateException.InnerException is DocumentClientException)
                    {
                        var inner = (DocumentClientException)aggregateException.InnerException;
                        var statusCode = (int)inner.StatusCode;
                        //add other error codes to trap here e.g. 503 - Service Unavailable
                        if (statusCode == 429)
                            await Task.Delay(inner.RetryAfter);
                        else
                            throw;
                    }
                }
            }
        }

 

 

The above is a very common pattern: TRY…CATCH, get the error number and take action.  I wanted something less verbose that had sensible automatic retry logic.

 

The Less Verbose Route

 

To do it I had to add another package to my project.  In the NuGet Package Manager Console:

Install-Package Microsoft.Azure.DocumentDB.TransientFaultHandling

 

This is not a DocumentDB team library and it is unclear whether it is still being maintained.  What is clear is that the DocumentDB team will be bringing this retry logic into the SDK natively, which will mean a lighter, more consistent experience and no reliance on an external library.
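
For anyone on a newer SDK: later releases of the DocumentDB .NET client did eventually add built-in throttling retries, configured through ConnectionPolicy.RetryOptions. A rough sketch, not part of the original walkthrough:

//Sketch only: built-in 429 retry settings in later DocumentDB .NET SDK releases
var policy = new ConnectionPolicy();
policy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;   //how many throttled requests to retry
policy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;            //cumulative time to keep retrying
var client = new DocumentClient(new Uri(uri), key, policy);    //uri and key as elsewhere in this post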

Now add some using statements to the project

using Microsoft.Azure.Documents.Client.TransientFaultHandling;
using Microsoft.Azure.Documents.Client.TransientFaultHandling.Strategies;
using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;

And now instead of instantiating a regular DocumentClient in your application you can do this.

 

private IReliableReadWriteDocumentClient CreateClient(string uri, string key)
{
    ConnectionPolicy policy = new ConnectionPolicy()
    {
        ConnectionMode = ConnectionMode.Direct,
        ConnectionProtocol = Protocol.Tcp
    };
    var documentClient = new DocumentClient(new Uri(uri), key, policy);
    var documentRetryStrategy = new DocumentDbRetryStrategy(RetryStrategy.DefaultExponential) { FastFirstRetry = true };
    return documentClient.AsReliable(documentRetryStrategy);
}
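
Once you have the reliable client you call it exactly as you would a normal DocumentClient and the retries happen for you. A quick hypothetical usage (uri, key, collectionLink and jsonDocToSave are placeholders):

//Hypothetical usage of the factory above; 429s are retried using the exponential strategy
var reliableClient = CreateClient(uri, key);
await reliableClient.CreateDocumentAsync(collectionLink, jsonDocToSave);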

 

Summary

You may never hit the upper end of the Request Units on your collection, but in an IoT scenario like this, or when doing large scans over a huge collection of documents, you may hit this error and you need to know how to deal with it.  This article has given you two ways to handle it.  Enjoy.

 

My thanks go to Ryan Crawcour of the DocumentDB team for proof reading and sanity checking.

Deleting Multiple Documents from Azure DocumentDB

I have been using DocumentDB a lot recently and wanted to share with you something that is harder to do than it should be.  When I say harder, I mean you have to type more than you would expect for what seems like a really easy thing to do.  I will also tell you how to do this more efficiently, but because of cost sensitivity I couldn’t do it that way.

Scenario

I have a collection of documents.  These documents are being streamed into DocumentDB from a worker role which is reading from Azure Event Hubs.  As you can imagine, I get a lot of documents in a relatively short space of time.  The size of a DocumentDB collection is 10GB and I only want to use one collection.  My situation is that I really only need to keep two days’ worth of data in the collection at any one time.  My requirements therefore are:

  • Maintain only one collection
  • Retain only two days’ worth of data
  • Remove documents on a schedule.

For the third requirement I am using a .NET Stopwatch object (a sketch of that check follows below) and that is really simple.  Having only one collection is also very simple, so really it comes down to:

How do I delete a bunch of documents from DocumentDB?
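
As an aside, the Stopwatch scheduling mentioned above is nothing fancier than something like this; the interval and the DeleteOldDocumentsAsync wrapper are hypothetical names standing in for the delete code that follows.

//Hedged sketch of the schedule check (System.Diagnostics.Stopwatch); names are made up for the example
var stopwatch = Stopwatch.StartNew();
var purgeInterval = TimeSpan.FromHours(1);
while (true)
{
    if (stopwatch.Elapsed >= purgeInterval)
    {
        await DeleteOldDocumentsAsync();   //hypothetical wrapper around the delete code below
        stopwatch.Restart();
    }
    await Task.Delay(TimeSpan.FromMinutes(1));
}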

First Attempt

//setup
string databaseId = ConfigurationManager.AppSettings["DatabaseId"];
string collectionId = ConfigurationManager.AppSettings["CollectionId"];
string endpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
string authorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
//connection policy
ConnectionPolicy policy = new ConnectionPolicy()
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
//Build our selection criteria: documents whose time value is more than two days (172800 seconds) old
var sqlquery = "SELECT * FROM c WHERE " + ToUnixTime(DateTime.Now).ToString() + " - c.time > 172800";
//Get our client
DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey, policy);
//Database
Database database = client.CreateDatabaseQuery().Where(db => db.Id == databaseId).ToArray().FirstOrDefault();
//Get a reference to the collection
DocumentCollection coll = client.CreateDocumentCollectionQuery(database.SelfLink).Where(c => c.Id == collectionId).ToArray().FirstOrDefault();
//Issue our query against the collection
var results = client.CreateDocumentQuery<Document>(coll.DocumentsLink, sqlquery).AsEnumerable();
Console.WriteLine("Deleting Documents");
//How many documents do we have to delete
Console.WriteLine("Count of docs to delete = {0}", results.Count().ToString());
//Enumerate the collection and delete each document (assumes an async context for await)
foreach (var item in results)
{
    await client.DeleteDocumentAsync(item.SelfLink);
}
//How many documents are still left
var postquery = client.CreateDocumentQuery<Document>(coll.DocumentsLink, sqlquery).AsEnumerable();
Console.WriteLine("Count of docs remaining = {0}", postquery.Count().ToString());
Console.ReadLine();
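
The snippet above assumes a small ToUnixTime helper that isn’t shown in the post; a minimal version might look like this.

//Minimal sketch of a ToUnixTime helper: seconds since the Unix epoch
private static long ToUnixTime(DateTime value)
{
    var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    return (long)(value.ToUniversalTime() - epoch).TotalSeconds;
}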

You may be expecting the result of the second count to be 0.  Unless the first query returned 100 documents or fewer, you are going to be disappointed.  We enumerate through the result of our first query, getting a reference to each document and deleting it.  Seems fine?  The problem is that DocumentDB only returns 100 documents at a time and we didn’t go back and ask for more.  The solution is to execute our query and tell DocumentDB to re-execute it while it has more results.  You can see a visual example of this when you use Query Explorer in the portal.  At the bottom, under your query, after execution you will find something like this.

[Screenshot: the “More” link under the query results in Query Explorer]

The Solution

Here is the code that asks whether there are more results to be had and, if there are, goes and gets the next batch.

//setup
string databaseId = ConfigurationManager.AppSettings["DatabaseId"];
string collectionId = ConfigurationManager.AppSettings["CollectionId"];
string endpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
string authorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
//Connection Policy
ConnectionPolicy policy = new ConnectionPolicy()
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
//build our selection criteria
var sqlquery = "SELECT * FROM c WHERE " + ToUnixTime(DateTime.Now).ToString() + " - c.time > 172800";
//client
DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey, policy);
//database
Database database = client.CreateDatabaseQuery().Where(db => db.Id == databaseId).ToArray().FirstOrDefault();
//Get a reference to the collection
DocumentCollection coll = client.CreateDocumentCollectionQuery(database.SelfLink)
    .Where(c => c.Id == collectionId)
    .ToArray()
    .FirstOrDefault();
//First execution of the query
var results = client.CreateDocumentQuery<Document>(coll.DocumentsLink, sqlquery).AsDocumentQuery();

Console.WriteLine("Deleting Documents");
//While there are more results
while (results.HasMoreResults)
{
    Console.WriteLine("Has more...");
    //enumerate and delete the documents in this batch
    foreach (Document doc in await results.ExecuteNextAsync())
    {
        await client.DeleteDocumentAsync(doc.SelfLink);
    }
}
//second count should now be 0
var postquery = client.CreateDocumentQuery<Document>(coll.DocumentsLink, sqlquery).AsEnumerable();
Console.WriteLine("Count of docs remaining = {0}", postquery.Count().ToString());

The key is this statement

var results = client.CreateDocumentQuery<Document>(coll.DocumentsLink, sqlquery).AsDocumentQuery();

Using AsDocumentQuery gives us the HasMoreResults property, which tells us whether there are more results to fetch.
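
If you need to do this in more than one place, the same pattern generalises. Here is a hedged sketch (mine, not from the post) of a reusable helper that pages through any query and deletes every match:

//Reusable sketch built on AsDocumentQuery: page through a query and delete each matching document
private static async Task<int> DeleteByQueryAsync(DocumentClient client, string collectionLink, string sql)
{
    var deleted = 0;
    var query = client.CreateDocumentQuery<Document>(collectionLink, sql).AsDocumentQuery();
    while (query.HasMoreResults)
    {
        foreach (Document doc in await query.ExecuteNextAsync<Document>())
        {
            await client.DeleteDocumentAsync(doc.SelfLink);
            deleted++;
        }
    }
    return deleted;
}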

The Easy Way

The easy way, and most definitely the proper way to do this, is to use Partitions in DocumentDB.  A Partition here is essentially a collection.  There are different types of partition, but for this example I would have used a range partition over time.  When I wanted to delete documents I would simply have dropped a partition.  My partitions would have been based on dates: I would always have had two full partitions (full meaning closed for data) and one current partition that was filling with data.

[Diagram: three date-based collections, with #1 and #2 closed for data and #3 currently filling]

In the example above, collections #1 and #2 would be closed for data as we are filling collection #3.  Once collection #3 is full, we drop collection #1, add collection #4 and make that the one accepting data.
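
In code, the rotation itself would only be a couple of calls. A hedged sketch, with made-up names for the collection ids:

//Hypothetical rotation: drop the oldest collection and start a new one for the current period
await client.DeleteDocumentCollectionAsync(oldestCollection.SelfLink);
await client.CreateDocumentCollectionAsync(
    database.SelfLink,
    new DocumentCollection { Id = "readings-" + DateTime.UtcNow.ToString("yyyyMMdd") });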

Conclusion

This is simple to do when you know how but it does seem like a long winded approach.  I would like to see something a little less verbose.

Using Visual Studio to Work With Hive Scripts

I was building some HDInsight Azure Data Factory pipelines the other day and the Hive scripts were not doing what I wanted them to do (although they were doing what was being asked).  I needed to isolate the Hive scripts to see what they were doing at each stage.  I didn’t want to have to invoke the whole ADF pipeline just to look at the Hive section.  I was in Visual Studio already, so troubleshooting was easy from there.  Here’s how I solved it.

  • Open up Server Explorer
  • Connect to your Azure Subscription
  • Navigate to the HDInsight node
  • Expand your clusters
  • Right click on the cluster of choice and select “Write a Hive Query”
  • Paste/Write your query

    [Screenshot: the “Write a Hive Query” window]

    Another quirk of my Hive script was that I was passing values from the activity into the Hive script itself. I knew the values I was using but didn’t want to change the script to use hard-coded values instead of the reference to the config. Here’s what I mean:

    [Screenshot: Hive script referencing a configuration value]

    I wanted to pass in my values as parameters to the script in Visual Studio. No problem. Once you have finished writing your query

    Hit Submit | Advanced

    [Screenshot: the Submit | Advanced option]

    Now you can enter your configuration key/value pairs.

    [Screenshot: adding the configuration key/value pairs]

    This for me was a really quick and useful way of troubleshooting my Hive scripts. Hope you find it useful.

    Azure Streaming Analytics Lag() Function with example

    The Lag() function in Azure Streaming Analytics is documented here.  I have been asked a few times now about Lag() and whether it can refer to the projected values of another Lag() function in the previous event.  Let’s see.

     

    The Event

    My events are really simple and their schema is:

     

    {
        type: "string",
        readingtaken: "datetime",
        reading: 0
    }

     

    The Query

     

    My query in ASA is

     

    SELECT
        type,
        readingtaken,
        CASE
            WHEN reading IS NULL THEN LAG(reading, 1, 0) OVER (PARTITION BY type LIMIT DURATION(hh, 24))
            ELSE reading
        END AS reading
    INTO
        dest
    FROM
        readings
    WHERE
        type = 'temperature'

     

    The Data Flow

     

    Here are my events as they flow through the stream (these are the raw events and not what is projected from the ASA query).

     

    [Screenshot: raw input events E1–E3]

     

     

    Here is what you as the consumer will see i.e. the projected events.

     

    [Screenshot: projected output events E1–E3]

     

     

    The Explanation

     

    The event that is of interest is E3.

     

    Here is an explanation of the projected events

     

    E1 has the reading attribute and value, as does E1 (Projected).

    E2 does not have the attribute and value, but E2 (Projected) uses the value from E1.

    E3 again does not have the attribute and value.  E3 looks to E2 and not E2 (Projected).  In E2 the value of reading is NULL and, as per the documentation, that is what is used.

     

     

    Summary

    Using Lag() you do not (and cannot) refer to the projected value of the previous event; you refer to the actual event itself.  In the example above the reading attribute had an explicit value of NULL in E2, so that is what was used for E3.

    Config Files in Azure Data Factory

    As you know, I have started to use Visual Studio to publish my ADF solutions to Azure. I like it for its ease of use and integration with TFS. I came across a piece of functionality that I had not seen before but which is currently only half baked (it works fine but the implementation needs work).
    The feature is configs. On your project, right click and choose Add | New Item.

    [Screenshot: the Add New Item dialog with the config file template]

    [Screenshot: the structure of an empty config file]

    The properties are easy to specify and I would imagine the most often specified property will be the connectionString for a linked service. Like this

    [Screenshot: a config file specifying a connectionString property]

    OK, so that part is easy and makes sense. The next part is where the dots are not joined up yet. As you can see from my project below, I have two config files: one for Dev and one for UAT.

    [Screenshot: Solution Explorer showing the Dev and UAT config files]

    When I build the project, sub folders are created in the Release/Debug output folder for each of the configurations, with the configuration parameters applied to my JSON objects. I can then take the files from the relevant folder and deploy them to the correct ADF using PowerShell or the Portal.

    [Screenshot: the output folders created per configuration]

    This seems very long winded and not really a good experience. Another way to do it is:

  • Right Click on your project | Unload Project
  • Right Click Project | Edit xxxx.dfproj
  • Find the section ADFConfigFileToPublish
  • Insert your config name (i.e. DevConfig.json)
  • Save | reload the project

    In the future I fully expect a better “deployment with config” experience.

    Pig and Hive Scripts in Azure Data Factory

    I work a lot with HDInsight activities in Azure Data Factory, and this involves calling out to Pig and Hive scripts, passing in parameters to be used. It works very well, or at least it did until I started to use Visual Studio for my ADF publishing.

    I built my ADF solution using the Visual Studio templates. I then tried to publish my project to Azure. This is where it went wrong. Below is the error

    [Screenshot: the publish failure error]

    As you can see from my code, I am passing in the location of the .hql file inside my activity. If I were publishing this through PowerShell or the Portal itself then this would be enough, but Visual Studio is different: your ADF project wants a reference to your .hql or .pig files. At publish time Visual Studio will publish the files in your project to the locations specified in your activity. I like it this way as it means I can maintain everything in one place and not have to alter the scripts in some other tool.

    There are actually two ways to reference these files:

  • At the project level: Add | Existing Item
  • In your project, add a reference to a Pig and/or Hive project

    [Screenshot: Pig and Hive script projects referenced from the ADF project]

    This caught me out at first but the more I think about it the more I like it. Hopefully this will save somebody else 20 minutes of head scratching.

    Looking at Azure Data Factory in VS2013

    I have been using Azure Data Factory for quite a while now.  When I first saw it, it was difficult to maintain and code, and the best I could say about it at that point was that it “has promise”.  Well, I am happy to report that using it today is a whole load easier.  This post is a very quick look at how designing your ADF flows has become clean and intuitive.  I am particularly pleased to see the addition of a VS plugin.  In order to use Visual Studio you will currently need VS2013 and the Azure SDK for .NET 2.7.  Once installed you will be able to create ADF solutions.

     

    Creating an empty solution

     

    [Screenshot: creating an empty Data Factory solution]

     

    Now you can go ahead and create your entities.

     

    Linked Services

     

    [Screenshot: Linked Service JSON template]

     

    Tables

     

    [Screenshot: Table JSON template]

     

    Pipelines

     

    [Screenshot: Pipeline JSON template]

     

    Now the thing about the pipeline is that it is by far the most complicated entity in ADF.  What I really like is that the ADF team have given us a head start in getting the syntax right by creating placeholders for us.  Below I have added a Hive pipeline to my project.

     

    [Screenshot: Hive pipeline JSON with placeholders]

     

     

    Summary

     

    Azure Data Factory is a great PaaS offering and now that you can design your solutions in Visual Studio I think the take up of it will increase dramatically.

    Working with Arrays in Azure Streaming Analytics

    When working with Azure Streaming Analytics (ASA) it is extremely common to use JSON for your events.  JSON allows us to express data in a rich way with things like nested objects and arrays.  This, though, can cause some head scratching further along the chain when we need to work with that data.  In this post I want to look at working with arrays in ASA.

     

    Here is my event data

    [Screenshot: sample event JSON containing a samples array]

    As you can see, I have an array of objects which describe samples.  In this format they are not much use to me.  What I need to do is pivot this data as it comes through ASA, ready to be put into my destination.

    Here’s the query that does that

    SELECT
        source___timestamp,
        source.time,
        flattened.arrayindex,
        flattened.arrayvalue.sample,
        flattened.arrayvalue.x,
        flattened.arrayvalue.y,
        flattened.arrayvalue.z
    INTO
        output
    FROM
        source
    CROSS APPLY GetElements(source.samples) AS flattened


    And here is what it looks like on the other side


    [Screenshot: the flattened output rows]



    Perfect. It is now in a format I can use more easily.

    DocumentDB Indexing Policies–Portal Choices

    I have been using Azure DocumentDB for a good while now and think it is a very interesting service.  The fact that all attributes are indexed by default, with no real sign of a performance impact on insertion, speaks volumes for the team in Redmond.
    As I said, by default all paths within a document are indexed.  DocumentDB allows you, when creating a collection in the portal, to specify how you want to index documents for that collection.  In days gone by everything was indexed using a hash index, and whilst this was good for equality comparisons it wouldn’t work with range queries (yes, I know you can specify ScanInQuery to get around it, but this is a performance hole).
    You now get a choice and I for one think this is great.  You can still create your own custom indexing policy, whereby you may exclude certain paths from being indexed or put a range index on a string field, but I think a large number of indexing cases will be satisfied by the options in the portal.  Here are a couple of examples of where range queries help.

     

    “Get me all documents where people working in HR earn more than £50k per year”
    “Show me documents where people who sat the music exam scored between 45 and 65 percent”
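
    If you do go the custom-policy route, here is a hedged sketch of what that can look like in the .NET SDK. The range-over-numbers default and the collection id are assumptions for illustration, not the exact policy behind the portal options.

    //Sketch only: a custom indexing policy with range indexes on numbers,
    //so range predicates like gradepct > 78 don't need a scan
    var rangePolicy = new IndexingPolicy(new RangeIndex(DataType.Number))
    {
        IndexingMode = IndexingMode.Consistent
    };
    var collection = await client.CreateDocumentCollectionAsync(
        database.SelfLink,
        new DocumentCollection { Id = "exams", IndexingPolicy = rangePolicy });   //"exams" is a made-up id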

     

    Example

     

    I have created my database and collection and I also specified the default indexing policy from this screen.

     

    [Screenshot: choosing the indexing policy when creating the collection in the portal]

     

    This is a sample document from my collection

     

    {
        "candidateid": 123456,
        "school": "Castleford Academy",
        "results":
        [
            {
                "exam": "english",
                "gradepct": 76
            },
            {
                "exam": "maths",
                "gradepct": 80
            },
            {
                "exam": "music",
                "gradepct": 55
            }
        ]
    }

     

    And finally, here is my query, with its results, asking for documents where candidates have a gradepct value greater than 78.

     

    SELECT c.candidateid, examresult
    FROM c
    JOIN examresult IN c.results
    WHERE examresult.gradepct > 78

     

    [
        {
            "candidateid": 123456,
            "examresult": {
                "exam": "maths",
                "gradepct": 80
            }
        }
    ]

     

    Conclusion

     

    I really like that DocumentDB has sensible, expected defaults when it comes to indexing your documents, while still letting you be very precise with custom indexing policies.