DBPedias

Your Database Knowledge Community

Raven

  1. RavenDB Auto Sharding Bundle Design–Early Thoughts

    Originally posted at 4/19/2011

    RavenDB Auto Sharding is an implementation of sharding on the server. As the name implies, it aims to remove all sharding concerns from the user. At its core, the basic idea is simple. You have a RavenDB node with the sharding bundle installed. You just work with it normally.

    At some point you realize that the data has grown too large for a single server, so you need to shard the data across multiple servers. You bring up another RavenDB server with the sharding bundle installed. You wait for the data to re-shard (during which time you can still read / write to the servers). You are done.

    At least, that is the goal. In practice, there is one step that you would have to do: tell us how to shard your data. You do that by defining a sharding document, which looks like this:

    { // Raven/Sharding/ByUserName
      "Limits": [3],
      "Replica": 2,
      "Definitions": [
        {
          "EntityName": "Users",
          "Paths": ["Username"]
        },
        {
          "EntityName": "Posts",
          "Paths": ["AuthorName"]
        }
      ]
    }

    There are several things to note here. We define a sharding document that shards on just one key, and the shard key has a length of 3. We also define different ways to retrieve the sharding key from the documents based on the entity name. This is important, since you want to be able to say that posts by the same user would sit on the same shard.

    Based on the shard keys, we generate the sharding metadata:

    { "Id": "chunks/1", "Shards": ["http://shard1:8080", "http://shard1-backup:8080"], "Name": "ByUserName", "Range": ["aaa", "ddd"] }
    { "Id": "chunks/2", "Shards": ["http://shard1:8080", "http://shard2-backup:8080"], "Name": "ByUserName", "Range": ["ddd", "ggg"] }
    { "Id": "chunks/3", "Shards": ["http://shard2:8080", "http://shard3-backup:8080"], "Name": "ByUserName", "Range": ["ggg", "lll"] }
    { "Id": "chunks/4", "Shards": ["http://shard2:8080", "http://shard1-backup:8080"], "Name": "ByUserName", "Range": ["lll", "ppp"] }
    { "Id": "chunks/5", "Shards": ["http://shard3:8080", "http://shard2-backup:8080"], "Name": "ByUserName", "Range": ["ppp", "zzz"] }
    { "Id": "chunks/6", "Shards": ["http://shard3:8080", "http://shard3-backup:8080"], "Name": "ByUserName", "Range": ["000", "999"] }

    This information gives us a way to make queries which are either directed (against a specific node, assuming we include the shard key in the query) or global (against all shards).

    Note that we split the data into chunks, and each chunk is going to sit on two different servers (because of the Replica setting above). We can determine which shard holds which chunk by using the Range data.

    Once a chunk grows too large (25,000 documents, by default), it will split, potentially moving to another server / servers.
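
    The lookup described above can be sketched roughly as follows. This is only an illustration in Python, not the bundle's code: the helper names are hypothetical, and it assumes that keys are lowercased, truncated to the Limits length, and matched against half-open ranges (lower bound inclusive, upper bound exclusive), none of which is spelled out in the design.

```python
# Sharding document and chunk metadata, copied from the examples above.
SHARDING_DOC = {
    "Limits": [3],
    "Replica": 2,
    "Definitions": [
        {"EntityName": "Users", "Paths": ["Username"]},
        {"EntityName": "Posts", "Paths": ["AuthorName"]},
    ],
}

CHUNKS = [
    {"Id": "chunks/1", "Shards": ["http://shard1:8080", "http://shard1-backup:8080"], "Range": ["aaa", "ddd"]},
    {"Id": "chunks/2", "Shards": ["http://shard1:8080", "http://shard2-backup:8080"], "Range": ["ddd", "ggg"]},
    {"Id": "chunks/3", "Shards": ["http://shard2:8080", "http://shard3-backup:8080"], "Range": ["ggg", "lll"]},
    {"Id": "chunks/4", "Shards": ["http://shard2:8080", "http://shard1-backup:8080"], "Range": ["lll", "ppp"]},
    {"Id": "chunks/5", "Shards": ["http://shard3:8080", "http://shard2-backup:8080"], "Range": ["ppp", "zzz"]},
    {"Id": "chunks/6", "Shards": ["http://shard3:8080", "http://shard3-backup:8080"], "Range": ["000", "999"]},
]


def shard_key(entity_name, document):
    """Hypothetical helper: read the shard key from the path defined for this entity."""
    for definition in SHARDING_DOC["Definitions"]:
        if definition["EntityName"] == entity_name:
            path = definition["Paths"][0]
            limit = SHARDING_DOC["Limits"][0]
            # Assumption: keys are compared lowercased and truncated to the limit.
            return str(document[path])[:limit].lower()
    return None


def find_chunk(key):
    """Return the chunk whose range contains the key (assumed half-open), or None."""
    for chunk in CHUNKS:
        low, high = chunk["Range"]
        if low <= key < high:
            return chunk
    return None


def servers_for(entity_name, document=None):
    """Directed query when a shard key is available, global query otherwise."""
    key = shard_key(entity_name, document) if document else None
    chunk = find_chunk(key) if key else None
    if chunk is not None:
        return chunk["Shards"]
    # Global query: every distinct server that holds any chunk.
    return sorted({url for c in CHUNKS for url in c["Shards"]})
```

    With the metadata above, a user named "Ayende" and a post whose AuthorName is "Ayende" both resolve to chunks/1, so they end up on the same pair of servers. A query without a shard key falls back to all six servers.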

    Thoughts?

  2. Designing RavenFS

    What is Raven FS? Raven FS is a distributed file system designed to handle large file replication across WAN networks reliably.

    What does it actually mean? The scenario that we have is actually quite simple. Given that we have a file in location A and we need to have that file in location B (geo distributed), how do we move the file across the WAN? Let me make the problem slightly more interesting:

    • The file is large: we are talking about hundreds of megabytes at the low end and tens of gigabytes at the high end.
    • The two locations might be connected over WAN.
    • The connection is assumed to be flaky.

    Let us consider a few scenarios where this can be useful:

    • I have a set of videos that I would like to be edited in some fashion (say, putting Bang! and Wham! callouts in some places). Since I have zero ability in editing videos, I hire a firm in India to do that for me. The problem is that each video file is large, and just sending the files to India and back is a challenge. (Large file distributed collaboration problem)
    • I have a set of webservers where users can upload images. We need to send those images to background servers for processing, and then they need to be made available to the web servers again. The image sizes are too large to be sent over traditional queuing technologies. (Udi Dahan calls the problem the Data Bus).
    • I have a set of geo-distributed locations where I have a common set of files (think about something like scene information for rendering a game) that needs to be kept in sync. (Distributed file replication).

    I have run into each of those problems (and others that fall into similar categories) several times in recent months. Enough to convince me that:

    • There is a need here that people would be willing to pay for.
    • It is something that we can provide a solution for.
    • There is a host of other considerations related to that set of problems that we can also provide a solution for. A simple example might be simple backup procedures.

    The actual implementation will probably vary, but this is the initial design for the problem.

    A RavenFS node is going to be running as an HTTP web server. That removes a lot of complexity from our life, since we can utilize a lot of pre-existing protocols and behaviors. HTTP already supports the notion of partial downloads / parallel uploads (Range, If-Range, Content-Range), so we can re-use a lot of that.

    From an external implementation perspective, a RavenFS node exposes the following endpoints:

    • GET /static/path/to/file <- get the file contents, optionally just a range
    • PUT /static/path/to/file <- put file contents, optionally just a range
    • DELETE /static/path/to/file  <- delete the file
    • GET /metadata/path/to/file <- get the metadata about a file
    • GET /browse/path/to/directory <- browse the content of a directory
    • GET /stats <- number of files, current replication efforts, statistics on replication, etc
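
    To make the "optionally just a range" part concrete, here is a rough Python sketch of how a client might resume an interrupted download using the standard HTTP Range header. The host name, the file path and the 4 MB step are assumptions made for the example, and the request is only built, never sent.

```python
import urllib.request

STEP = 4 * 1024 * 1024  # bytes requested per call; an arbitrary choice for this sketch


def range_request(url, start, end):
    """Build (but do not send) a GET request for bytes start..end, inclusive."""
    return urllib.request.Request(url, headers={"Range": f"bytes={start}-{end}"})


def resume_ranges(total_length, already_have, step=STEP):
    """Yield inclusive (start, end) byte ranges covering the part still missing."""
    start = already_have
    while start < total_length:
        end = min(start + step, total_length) - 1
        yield start, end
        start = end + 1


# Hypothetical URL following the /static/path/to/file endpoint shape.
url = "http://ravenfs.example:8080/static/videos/raw.avi"
requests = [range_request(url, s, e) for s, e in resume_ranges(10 * 1024 * 1024, STEP)]
```

    If the connection drops after the first 4 MB of a 10 MB file, only the remaining two ranges are requested, instead of starting over from byte zero.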

    A file in RavenFS consists of:

    • The file name
    • The file length
    • A sequence of bytes that makes up the file contents
    • A set of key/value properties that contains file metadata

    Internally, files are stored in a transactional store. Each file is composed of pages. Each page is a maximum of 4 MB in size and is identified by its signature (actually, a pair of hash signatures, probably SHA256 & RIPEMD160, to avoid any potential for collision). The file contents are actually the list of pages that it is composed of.

    The notion of pages is pretty important for several reasons:

    • It provides us with a standard way to identify pieces of the files.
    • Each page may be part of multiple files.
    • Pages are immutable, once they are written to storage, they cannot be modified (but they can be removed if no file is referencing this page).
    • It makes it easier to chunk data to send while replicating.
    • It drastically reduces the size taken by files that share much of the same information.

    Let us try to analyze this further. Let us say that we have a 750 MB video, and we put this video file inside RavenFS. Internally, that file is chunked into 188 pages, each of them about 4 MB in size. Since we have set up replication to the RavenFS node in India, we start replicating each of those pages as soon as we are done saving it to the local RavenFS node. In other words, even while we are uploading the file to the local RavenFS node, it is being replicated to the remote RavenFS nodes, saving us the need to wait until the full file is uploaded for replication to begin. Once the entire file has been replicated to the remote node, the team in India can start editing that file.

    They make changes in three different places, then save the file again to RavenFS. In total, they have modified 24 MB of data, spread across 30 pages. That means that for the purpose of replicating back to the local RavenFS node, we need to send only 120 MB (30 pages of 4 MB each), instead of 750 MB.

    This reduces both the time and the bandwidth required to handle replication. The same applies, by the way, to a set of files that share some common parts: we will not store the information twice. For that matter, the RavenFS client will be able to ask the RavenFS node about pages that are already stored, and so won’t need to even bother uploading pages that are already on the server.
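
    A minimal Python sketch of the page idea, under some stated assumptions: pages are identified by SHA-256 only (the design mentions a pair of hashes), pages are cut at fixed offsets, and the function names are made up for the example.

```python
import hashlib
import math

PAGE_SIZE = 4 * 1024 * 1024  # 4 MB maximum page size, as in the design


def split_into_pages(data, page_size=PAGE_SIZE):
    """Cut the file contents into fixed-size pages (the last one may be shorter)."""
    return [data[i:i + page_size] for i in range(0, len(data), page_size)]


def page_id(page):
    """Identify a page by its content hash (SHA-256 only, an assumption of this sketch)."""
    return hashlib.sha256(page).hexdigest()


def pages_to_send(data, remote_page_ids, page_size=PAGE_SIZE):
    """Return the file's page list and the pages the remote node does not have yet."""
    manifest = []
    missing = {}
    for page in split_into_pages(data, page_size):
        pid = page_id(page)
        manifest.append(pid)
        if pid not in remote_page_ids:
            missing[pid] = page
    return manifest, missing


# The arithmetic from the example above.
print(math.ceil(750 / 4))  # 188 pages for a 750 MB file
print(30 * 4)              # 120 MB to send back for 30 modified pages
```

    The file itself is just the manifest (the ordered list of page ids). Replicating an edited file means sending the new manifest plus whatever is in missing. Note that with fixed offsets, an edit that inserts or removes bytes shifts every page after it, so this sketch only saves bandwidth for in-place modifications.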

    Another important factor in the decision to use pages is that when replicating across an unreliable medium, sending large files around in a single chunk is a bad idea, because it is pretty common for the connection to drop, and if you need a perfect connection for the duration of the transfer of a 1.5 GB file, you are going to be in a pretty bad place very soon.

    Thoughts?

  3. Raven Situational Awareness

    There is a whole set of features that require collaboration from a set of servers. For example, when talking about auto scale scenarios, you really want the servers to figure things out on their own, without needing administrators to hold their hands and murmur sweet nothings at 3 AM.

    We needed this feature in Raven DB, Raven MQ and probably in Raven FS, so I sat down and thought about what is actually needed and whether I could package that in a re-usable form. I have been on a roll for the last few days, and something that I estimated would take a week or two took me about six hours, all told.

    At any rate, I realized that the important parts of this feature set are the ability to detect siblings on the same network, the ability to detect failure of those siblings and the ability to dynamically select the master node. The code is available here: https://github.com/hibernating-rhinos/Raven.SituationalAwareness under the AGPL license. If you want to use this code commercially, please contact me for commercial licensing arrangements.

    Let us see what is actually involved here:

    var presence = new Presence("clusters/commerce", new Dictionary<string, string>
    {
        {"RavenDB-Endpoint", new UriBuilder("http", Environment.MachineName, 8080).Uri.ToString()}
    }, TimeSpan.FromSeconds(3));
    presence.TopologyChanged += (sender, nodeMetadata) =>
    {
        switch (nodeMetadata.ChangeType)
        {
            case TopologyChangeType.MasterSelected:
                Console.WriteLine("Master selected {0}", nodeMetadata.Uri);
                break;
            case TopologyChangeType.Discovered:
                Console.WriteLine("Found {0}", nodeMetadata.Uri);
                break;
            case TopologyChangeType.Gone:
                Console.WriteLine("Oh no, {0} is gone!", nodeMetadata.Uri);
                break;
            default:
                throw new ArgumentOutOfRangeException();
        }
    };
    presence.Start();

    As you can see, we are talking about a single class that is exposed to your code. You need to provide the cluster name, which allows us to run multiple clusters on the same network without conflicts. (For example, in the code above, we have a set of servers for the commerce service, and there could be another for the billing service, etc.) Each node also exposes metadata to the entire cluster. In the code above, we share the URL of our RavenDB endpoint. The TimeSpan argument determines the heartbeat frequency for the cluster (how often it checks for failing nodes).

    We have a single event that we can subscribe to, which lets us know about changes in the system topology. Discovered and Gone are pretty self explanatory, I think. But MasterSelected is more interesting.

    After automatically discovering all the siblings on the network, Raven Situational Awareness will use the Paxos algorithm to decide who should be the master. The MasterSelected event happens when a quorum of the nodes selects a master. You can then proceed with your own logic based on that. If the master fails, the nodes will convene again and the quorum will select a new master.
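
    To give a feel for the moving parts, here is a small Python sketch of heartbeat based failure detection and of the majority rule behind a quorum. It is not the library's implementation and it does not implement Paxos. The class, the method names and the "three missed heartbeats" threshold are all assumptions made for the illustration.

```python
class ClusterView:
    """Tracks when each sibling was last heard from (hypothetical helper)."""

    def __init__(self, heartbeat_seconds, missed_beats_allowed=3):
        # Assumption: a node is considered gone after this many silent heartbeats.
        self.timeout = heartbeat_seconds * missed_beats_allowed
        self.last_seen = {}

    def heartbeat(self, node_uri, now):
        """Record a heartbeat; returns "Discovered" the first time a node is seen."""
        is_new = node_uri not in self.last_seen
        self.last_seen[node_uri] = now
        return "Discovered" if is_new else None

    def sweep(self, now):
        """Remove and return the nodes that have been silent for longer than the timeout."""
        gone = [uri for uri, seen in self.last_seen.items() if now - seen > self.timeout]
        for uri in gone:
            del self.last_seen[uri]
        return gone


def has_quorum(votes_for_candidate, cluster_size):
    """A candidate needs a strict majority of the cluster to be selected."""
    return votes_for_candidate >= cluster_size // 2 + 1


view = ClusterView(heartbeat_seconds=3)
view.heartbeat("http://node-a:8080/", now=0)
view.heartbeat("http://node-b:8080/", now=0)
view.heartbeat("http://node-a:8080/", now=6)
print(view.sweep(now=10))  # only node-b has been silent for more than 9 seconds
```

    The majority rule is what lets the cluster keep a single master across a network split: two sides of a partition can never both hold more than half of the nodes.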

    With the network topology detection and the master selection out of the way, (and all of that with due consideration for failure conditions) the task of actually implementing a distributed server system just became significantly easier.

  4. More on RavenDB structure

    Originally posted at 4/14/2011

    The following diagrams were generated via Visual Studio (Architecture > Generate Dependency Graph).

    I have shown before the assembly dependency graph for RavenDB, it has changed a bit since then (note the Raven.Json addition), but it is still quite a nice graph:

    [image: RavenDB assembly dependency graph]

    The problem was that for backward compatibility reasons, the namespaces weren’t nearly as ordered. Mostly because we moved things around in the assemblies but couldn’t change the associated namespaces.

    We were going to experience a breaking change anyway, because of Raven.Json, so I took the time to clean things up properly:

    [image: RavenDB namespace dependency graph after the cleanup]

    Yes, I admit, I am addicted to (mostly) straight lines and simple directed graphs :-)

  5. RavenDB: Let us write our own JSON Parser, NOT

    One accusation that has been leveled at me often is that I keep writing my own implementation of Xyz (where Xyz is just about anything). The main problem is that I can go overboard with that, but for the most part, I think that I have managed to strike the right balance. Wherever possible, I re-use existing code, but when I run into problems that are easier to solve by creating my own solution, I go with that.

    A case in point is the JSON parser inside RavenDB. From the get go, I used Newtonsoft.Json.dll. There wasn’t much to think of, this is the default implementation from my point of view. And indeed, it has been an extremely fine choice. It is a rich library, it is available for .NET 3.5, 4.0 & Silverlight and it meant that I had opened up a lot of extensibility for RavenDB users.

    Overall, I am very happy. Except… there was just one problem: with large JSON documents, the library showed some performance issues. In particular, a 3 MB JSON file took almost half a second to parse. That was… annoying. Admittedly, most documents tend to be smaller than that, but it also reflected on overall performance when batching, querying, etc. When you are querying, you are also building large JSON documents (a single document that contains a list of results, for example), so that problem was quite pervasive for us.

    I set out to profile things, and discovered that the actual cost wasn’t in the JSON parsing itself, that part was quite efficient. The costly part was actually in building the JSON DOM (JObject, JArray, etc). When people usually think about JSON serialization performance, they generally think about the perf from and to .NET objects. The overriding cost in that sort of serialization is actually how fast you can call the setters on the objects. Indeed, when looking at perf metrics on the subject, most of the comparisons were concentrated on that aspect almost exclusively.

    That makes sense, since for the most part, that is how people use it. But for RavenDB, we are using the JSON DOM for pretty much everything. This is how we are representing a document, after all, and that idea is pretty central to a document database.

    Before setting out to write our own, I looked at other options.

    ServiceStack.Json - that one was problematic for three reasons:

    • It wasn’t really nearly as rich in terms of API and functionality.
    • It was focused purely on reading to and from .NET objects, with no JSON DOM supported.
    • The only input format it had was a string.

    The last one deserves a bit of explanation. We cannot afford to use a JSON implementation that accepts a string as input, because that JSON object we are reading may be arbitrarily large. Using a string means that we have to allocate all of that information up front. Using a stream, however, means that we can allocate far less information and reduce our overall memory consumption.

    System.Json – that one had one major problem:

    • Only available on Silverlight, FAIL!

    I literally didn’t even bother to try anything else with it. The other libraries we looked at had those issues or similar ones as well. Mostly, the problem was that no JSON DOM was available.

    That sucked. I was seriously not looking forward to writing my own JSON parser, especially since I would have to add all the bells & whistles of Newtonsoft.Json. :-(

    Wait, I can hear you say, the project is open source, why not just fix the performance problem? Well, we have looked into that as well.

    The actual problem is pretty much at the core of how the JSON DOM is implemented in the library. All of the JSON DOM types are basically linked lists, and all operations on the DOM are O(N). With large documents, that really starts to hurt. We looked into what it would take to modify that, but it turned out that it would have to be a breaking change (which pretty much killed the notion that it would be accepted by the project) or a very expensive change. That is especially true since the JSON DOM is quite rich in functionality (from dynamic support to INotifyPropertyChanged to serialization to… well, you get the point).

    Then I thought about something else: can we create our own JSON DOM, but rely on Newtonsoft.Json to fill it up for us? As it turned out, we could! So we basically took the existing JSON DOM and stripped it of everything that we weren’t using. Then we changed the linked list support to a List and Dictionary, wrote a few adapters (RavenJTokenReader, etc) and we were off to the races. We were able to utilize quite a large fraction of the things that Newtonsoft.Json already did, we resolved the performance problem and didn’t have to implement nearly as much as I feared we would.

    Phew!

    Now, let us look at the actual performance results. This is using a 3 MB JSON file:

    • Newtonsoft Json.NET - Reading took 413 ms
    • Using Raven.Json - Reading took 140 ms

    That is quite an improvement, even if I say so myself :-)

    The next stage was actually quite interesting, because it was unique to how we are using the JSON DOM in RavenDB. In order to save the parsing cost (which, even when optimized, is still significant), we are caching the parsed DOM in memory. The problem with caching mutable information is that you have to return a clone of the information, and not the actual information (because then it would be mutated by the caller, corrupting the cached copy).

    Newtonsoft.Json supports object cloning, which is excellent. Except for one problem. Cloning is also an O(N) operation. With Raven.Json, the cost is somewhat lower. But the main problem is that we still need to copy the entire large object.

    In order to resolve this exact issue, we introduced a feature called snapshots to the mix. Any object can be turned into a snapshot. A snapshot is basically a read only version of the object, which we then wrap in another object that provides local mutability while preserving the immutable state of the parent object.

    It is much easier to explain in code, actually:

    public void Add(string key, RavenJToken value)
    {
        if (isSnapshot)
            throw new InvalidOperationException("Cannot modify a snapshot, this is probably a bug");
    
        if (ContainsKey(key))
            throw new ArgumentException("An item with the same key has already been added: " + key);
    
        LocalChanges[key] = value; // we can't use Add, because LocalChanges may contain a DeletedMarker
    }
    
    public bool TryGetValue(string key, out RavenJToken value)
    {
        value = null;
        RavenJToken unsafeVal;
        if (LocalChanges != null && LocalChanges.TryGetValue(key, out unsafeVal))
        {
            if (unsafeVal == DeletedMarker)
                return false;
    
            value = unsafeVal;
            return true;
        }
    
        if (parentSnapshot == null || !parentSnapshot.TryGetValue(key, out unsafeVal) || unsafeVal == DeletedMarker)
            return false;
    
        value = unsafeVal;
    
        return true;
    }

    If the value is on the local changes, we use that, otherwise if the value is in the parent snapshot, we use that. We have the notion of local deletes, but that is about it. All changes happen to the LocalChanges.

    What this means, in turn, is that for caching scenarios, we can very easily and effectively create a cheap copy of the item without having to copy all of its contents. Whereas cloning the 3 MB JSON object in Newtonsoft.Json can take over 100 ms, we can create a snapshot (it involves a clone, so the first time it is actually expensive, around the same cost as in Newtonsoft.Json) and from the moment we have a snapshot, we can generate children for the snapshot at virtually no cost.
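
    The same idea can be sketched in a few lines of Python. This is a simplified model of the technique, not a port of Raven.Json: the class and method names (SnapshotDict, ensure_snapshot, create_child) are hypothetical.

```python
_DELETED = object()  # marker for keys deleted locally but still present in the parent


class SnapshotDict:
    """A dictionary that layers local changes over a read only parent snapshot."""

    def __init__(self, parent_snapshot=None):
        self._parent = parent_snapshot
        self._local = {}
        self._is_snapshot = False

    def __setitem__(self, key, value):
        if self._is_snapshot:
            raise RuntimeError("Cannot modify a snapshot, this is probably a bug")
        self._local[key] = value

    def __delitem__(self, key):
        if self._is_snapshot:
            raise RuntimeError("Cannot modify a snapshot, this is probably a bug")
        if key not in self:
            raise KeyError(key)
        self._local[key] = _DELETED  # hide the parent's value without touching it

    def get(self, key, default=None):
        if key in self._local:
            value = self._local[key]
            return default if value is _DELETED else value
        if self._parent is not None:
            return self._parent.get(key, default)
        return default

    def __contains__(self, key):
        sentinel = object()
        return self.get(key, sentinel) is not sentinel

    def ensure_snapshot(self):
        """Freeze this object; from now on it can only be read."""
        self._is_snapshot = True
        return self

    def create_child(self):
        """Cheap "copy": an empty layer of local changes on top of the snapshot."""
        if not self._is_snapshot:
            raise RuntimeError("Children can only be created from a snapshot")
        return SnapshotDict(parent_snapshot=self)


cached = SnapshotDict()
cached["Name"] = "Oren"
cached["Age"] = 30
cached.ensure_snapshot()

copy = cached.create_child()  # no data is copied here
copy["Name"] = "Ayende"       # only visible through the child
del copy["Age"]               # recorded as a local delete
```

    Creating the child costs the same whether the cached document holds two properties or two hundred thousand, which is what makes it suitable for handing out copies from a cache.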

    Overall, I am quite satisfied with it.

    Oh, and in our test runs, for large documents, we got a 100% performance improvement from this single change.

  6. When unit testing a server, why not USE the server?

    One of the interesting aspects of building RavenDB was that it opened up my mind to the way we can use the very nature of the server to open up additional information about the server operations.

    One thing that I noticed recently is that if I need to debug a test for RavenDB, I often need to stop the current thread (the Test thread) and then literally go to the server test instance and look at what is actually going on in there.

    The fun part is that this is really nice, because I can go and inspect the running test instance, see what is going on, modify things to see how they affect the behavior, etc. The only problem is that this is actually quite complex to set up manually (stop on debugger, freeze the appropriate thread, resume run, inspect server – modify stuff there, stop on debugger, thaw thread, continue run, etc).

    What occurred to me, however, is that I can codify this behavior, and end up with this:

    image

    This method will only operate while a debugger is attached, but it is going to save me a lot of time. Once I am done, I need to delete the marker document:

    image

    This is especially important if you are running in an in memory mode, since the moment the test is over, the database is completely wiped out.

  7. Humane dates in RavenDB

    Originally posted at 4/4/2011

    One of the advantages of RavenDB is that the document format is highly human readable. Except for one thing. Can you tell me what date is represented by this value:

    /Date(1224043200000+0300)/
    

    I thought not :-)

    We have recently moved toward this format instead:

    2011-04-04T11:28:46.0404749+03:00

    Which is both machine and human readable.

    This makes figuring out what something is so much easier.
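
    For the curious, the old value can be decoded with a few lines of Python. The sketch assumes the usual reading of that format: the number is milliseconds since the Unix epoch in UTC, and the suffix is the local offset to display it in. The function name is made up for the example.

```python
import re
from datetime import datetime, timedelta, timezone

_MS_DATE = re.compile(r"^/Date\((-?\d+)([+-]\d{4})?\)/$")
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_date_to_iso(value):
    """Convert a /Date(millis+hhmm)/ value into an ISO 8601 string."""
    match = _MS_DATE.match(value)
    if match is None:
        raise ValueError("Not a /Date(...)/ value: " + value)
    millis = int(match.group(1))
    offset = match.group(2) or "+0000"
    sign = -1 if offset[0] == "-" else 1
    local_zone = timezone(sign * timedelta(hours=int(offset[1:3]), minutes=int(offset[3:5])))
    # The milliseconds are counted in UTC; the offset only selects the display zone.
    instant = _EPOCH + timedelta(milliseconds=millis)
    return instant.astimezone(local_zone).isoformat()


print(ms_date_to_iso("/Date(1224043200000+0300)/"))  # 2008-10-15T07:00:00+03:00
```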

  8. Leaving the relational mindset – RavenDB’s trees

    Originally posted at 3/24/2011

    One of the common problems with people coming over to RavenDB is that they still think in relational terms, and implicitly accept relational limitations. The following was recently brought up at a client meeting. The problem was that they got an error when rendering a page similar to this:

    The error is one of RavenDB’s Safe-By-Default checks, and is triggered when you are making too many calls. This is usually something that you want to catch early, and fail fast rather than add additional load to the system. But the problem that the customer was dealing with is that they needed to display different icons for each level of the tree, depending on whether the item was a container or a leaf.

    Inside Raven, categories were modeled as:

    { // categories/1
      "ParentId": null,
      "Name": "Welcome ..."
    }
    
    { // categories/2
       "ParentId": "categories/1",
       "Name": "Chapter 2..."
    }

    They had a few more properties, but none that really interest us for this post. The original code was pretty naïve, and did something like:

    public IEnumerable<TreeNode> GetNodesForLevel(string level)
    {
      var categories = from cat in session.Query<Category>()
                       where cat.ParentId == level
                       select cat;
                       
      foreach(var category in categories)
      {
        var childrenQuery = from cat in session.Query<Category>()
                             where cat.ParentId == category.Id
                             select cat;
                             
         yield return new TreeNode
         {
          Name = category.Name,
          HasChildren = childrenQuery.Count() > 0
         };
      }
    }

    As you can imagine, this has caused some issues, because we have a classic Select N+1 here.

    Now, if we were using SQL, we could have done something like:

    select *, (select count(*) from Categories child where child.ParentId = parent.Id)
    from Categories parent
    where parent.ParentId = @val

    The problem there is that this is a correlated subquery, and that can get expensive quite easily. Other options include denormalizing the count into the Category directly, but we will ignore that.

    What we did in Raven is define a map/reduce index to do all of the work for us. It is elegant, but it requires somewhat of a shift in thinking, so let me introduce that one part at a time:

    from cat in docs.Categories 
    let ids = new [] 
    { 
        new { cat.Id, Count = 0, cat.ParentId }, 
        new { Id = cat.ParentId, Count = 1, ParentId = (string)null } 
    } 
    from id in ids 
    select id 

    We are doing something quite strange, we need to project two items for every category. The syntax for that is awkward, I’ll admit, but it is pretty clear what is going on in here.

    Using the categories shown above, we get the following output:

    { "Id": "categories/1", "Count": 0, "ParentId": null }
    { "Id": null, "Count": 1, "ParentId": null }

    { "Id": "categories/2", "Count": 0, "ParentId": "categories/1" }
    { "Id": "categories/1", "Count": 1, "ParentId": null }

    The reason that we are doing this is that we need to be able to aggregate across all categories, whether they are in a parent child relationship or not. In order to do that, we project one record for ourselves, with Count set to zero (because we don’t know whether we are anyone’s parent) and one for our parent. Note that in the parent case, we don’t know what its parent is, so we set it to null.

    The next step is to write the reduce part, which runs over the results of the map query:

    from result in results
    group result by result.Id into g 
    let parent = g.FirstOrDefault(x=>x.ParentId != null)
    select new 
    { 
         Id = g.Key, 
         Count = g.Sum(x=>x.Count), 
         ParentId = parent == null ? null : parent.ParentId
    }

    Here you can see something quite interesting: we are actually grouping only on the Id of the results. So given our current map results, we will have three groups:

    • Id is null
    • Id is “categories/1”
    • Id  is “categories/2”

    Note that in the projection part, we are trying to find the parent for the current grouping. We do that by looking for the first record with a non-null ParentId, which is the one that was emitted for the category itself. We then use Count to check how many children a category has. Again, because we are emitting a record with Count equal to zero for each category, categories will be included even if they don’t have any children.

    The result of all of that is that we will have the following items indexed:

    { "Id": null, "Count": 1, "ParentId": null }
    { "Id": "categories/1", "Count": 1, "ParentId": null }
    { "Id": "categories/2", "Count": 0, "ParentId": "categories/1" }

    We can now query this index very efficiently to find who are the children of a specific category, and what is the count of their children.
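
    If you want to trace the map and reduce steps by hand, the following Python sketch replays them over the two categories shown above. It only mimics the logic of the index definition in memory. RavenDB runs the real index on the server, and the function names here are invented for the illustration.

```python
from collections import defaultdict

categories = [
    {"Id": "categories/1", "ParentId": None, "Name": "Welcome ..."},
    {"Id": "categories/2", "ParentId": "categories/1", "Name": "Chapter 2..."},
]


def map_category(cat):
    """Emit one record for the category itself and one for its parent."""
    yield {"Id": cat["Id"], "Count": 0, "ParentId": cat["ParentId"]}
    yield {"Id": cat["ParentId"], "Count": 1, "ParentId": None}


def reduce_results(results):
    """Group by Id, sum the counts, and keep the ParentId from the category's own record."""
    groups = defaultdict(list)
    for record in results:
        groups[record["Id"]].append(record)
    reduced = []
    for key, group in groups.items():
        parent = next((r for r in group if r["ParentId"] is not None), None)
        reduced.append({
            "Id": key,
            "Count": sum(r["Count"] for r in group),
            "ParentId": parent["ParentId"] if parent is not None else None,
        })
    return reduced


mapped = [record for cat in categories for record in map_category(cat)]
index = reduce_results(mapped)

# "Query": the children of categories/1 and how many children each one has.
children = [r for r in index if r["ParentId"] == "categories/1"]
```

    Here children contains a single entry, categories/2 with a Count of 0, which is exactly what the tree needs in order to pick a leaf icon.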

    How does this solution compare to writing the correlated sub query in SQL? Well, there are two major advantages:

    • You are querying on top of the pre-computed index, that means that you don’t need to worry about things like row locks, number of queries, etc. Your queries are going to be blazing fast, because there is no computation involved in generating a reply.
    • If you are using the HTTP mode, you get caching by default. Yep, that is right, you don’t need to do anything, and you don’t need to worry about managing the cache, or deciding when to expire things. You can just take advantage of the native RavenDB caching system, which will handle all of that for you.

    Admittedly, this is a fairly simple example, but using similar means, we can create very powerful solutions. It all depends on how we think about our data.

  9. RavenDB on .NET Rocks

    Carl and Richard talk to Oren Eini, aka Ayende Rahien, about RavenDB. RavenDB is a NoSQL JSON document database. Oren explains how he came to the realization that he needed to build his own data store, and the advantages of document databases over relational databases. Is SQL dead? Not hardly, but RavenDB is an interesting addition to your data solution!

    You can listen to it here.
