Storing data in Riak

broach edited this page Apr 25, 2012 · 58 revisions

First, a word or two about Riak, CAP Theorem and eventual consistency.

Unless you're already familiar with CAP Theorem and eventual consistency, taking the time to read through at least The Riak Fast Track would be well worth your while.

It's ok, we'll wait.

Ok! Now that you've read through that and understand that Riak is a system that favors AP with eventual C, this might make some sense to you.

Storing data in Riak with the Java client.

In Riak data is stored in buckets. Those buckets have a number of options and tunable parameters, one of which is whether or not to allow sibling records. By default, a bucket does not allow sibling creation. The Riak Java client is somewhat built around this in that at the most basic level, you can simply say "store this data using this key" and anything that is currently in Riak referenced by that key will be overwritten. There are, however, some issues with attempting to do that.

If you have any type of contention/concurrency where multiple threads or processes are doing read/modify/write operations on those key/values, you are likely to lose writes if the operations interleave. One will overwrite the other. At that point you need to enable siblings and deal with conflict resolution.
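The lost write described above can be reproduced without Riak at all. The following is a minimal sketch, assuming a plain in-memory map as a stand-in for a bucket that does not allow siblings; the class and method names are hypothetical and nothing here uses the Riak client.

```java
import java.util.HashMap;
import java.util.Map;

class LostUpdateSketch
{
    // Simulates two clients incrementing the same value with interleaved
    // read/modify/write cycles, and returns what is left in the store
    static int interleavedIncrements()
    {
        // Stand-in for a bucket without siblings: the last write wins
        Map<String, Integer> store = new HashMap<String, Integer>();
        store.put("counter", 10);

        int readByA = store.get("counter");   // client A reads 10
        int readByB = store.get("counter");   // client B reads 10 before A writes

        store.put("counter", readByA + 1);    // A writes 11
        store.put("counter", readByB + 1);    // B also writes 11, overwriting A's write

        return store.get("counter");
    }

    public static void main(String[] args)
    {
        // Two increments were attempted, but only one is reflected
        System.out.println("Expected 12, stored " + interleavedIncrements());
    }
}
```

Both clients succeed from their own point of view, yet one increment disappears. This is the situation that enabling siblings and supplying a conflict resolver is meant to handle.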

With that in mind, the following basic examples illustrate using Riak with the default bucket options and just storing some data.

Basic Example #1: Store a String
Basic Example #2: Store a POJO
Basic Example #3: Changing query parameters

For a more detailed example of how you would store data in Riak in an environment with concurrency, jump down to the Advanced Examples section.

## Basic Store, data is a String

Using the Bucket.store(String, String) method, your String is stored in Riak as bytes representing UTF-8 text

import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;

public class App
{
    public static void main(String[] args) throws RiakException
    {
        String myData = "This is my data";
        IRiakClient riakClient = RiakFactory.httpClient();
        Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
        // Store the String under the key "TestKey"
        myBucket.store("TestKey", myData).execute();
        riakClient.shutdown();
    }
}
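To make "bytes representing UTF-8 text" concrete, here is a small standalone sketch using only the JDK. It does not show the client's internals; the class and method names are hypothetical and it only illustrates the String-to-bytes round trip that such storage implies.

```java
import java.nio.charset.StandardCharsets;

class Utf8Sketch
{
    // Encode a String as UTF-8 bytes
    static byte[] encode(String s)
    {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decode UTF-8 bytes back into a String
    static String decode(byte[] bytes)
    {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args)
    {
        // ASCII characters take one byte each in UTF-8
        System.out.println(encode("This is my data").length);  // 15

        // Non-ASCII characters take more: U+00E9 is two bytes
        System.out.println(encode("caf\u00e9").length);         // 5

        // The round trip gives back the original text
        System.out.println(decode(encode("caf\u00e9")).equals("caf\u00e9")); // true
    }
}
```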
## Store POJO (serialized to JSON)

By passing a POJO to the Bucket.store(String, T) method, your POJO is serialized to JSON using the Jackson library and stored in Riak as UTF-8 text.

```java
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;

public class App
{
    // Declared static so it can be instantiated from the static main() method
    static class Pojo
    {
        public String foo;
        public String bar;
        public int foobar;
    }

    public static void main(String[] args) throws RiakException
    {
        Pojo myPojo = new Pojo();
        myPojo.foo = "My foo data";
        myPojo.bar = "My Bar data";
        myPojo.foobar = 5;

        IRiakClient riakClient = RiakFactory.httpClient();
        Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
        // The POJO is serialized to JSON before being stored
        myBucket.store("TestKey", myPojo).execute();

        riakClient.shutdown();
    }
}
```

<a name="basicexample3"/>
## Store data, changing query parameters for just this request
To override the default parameters in the Bucket, you can specify them prior to calling the execute() method. 
```java
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.cap.Quora;
import com.basho.riak.client.operations.StoreObject;

public class App
{
    public static void main(String[] args) throws RiakException
    {
        IRiakClient riakClient = RiakFactory.httpClient();
        Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
        StoreObject<IRiakObject> storeObject = myBucket.store("TestKey", "TestData");
        // Override the bucket's w and pw values for this request only
        storeObject.w(Quora.ONE).pw(Quora.ONE).execute();
        riakClient.shutdown();
    }
}
```

# The Hard Way

## Eventual Consistency; Resolvers, Mutators, and Converters

In most environments, you're going to configure your buckets to allow siblings and write the code that deals with them. There are three interfaces you're going to be using:

  • ConflictResolver<T>
    This interface is used to resolve sibling records returned by Riak
  • Mutation<T>
    This interface is used to modify an object in Riak
  • Converter<T>
    This interface is used to serialize/deserialize data to/from Riak

One thing worth noting is that the current IRiakClient and its various interfaces probably aren't what you're used to if you've used other datastores' APIs. When using the above interfaces, the current Java client design expects your entire read/modify/write cycle to be encapsulated within the store operation. For more details, see the "Houston, we have a problem" section on the "Fetching data from Riak" page. If you do not wish to encapsulate the read/modify/write cycle inside the store operation, see "Storing previously fetched and modified data" below. This limitation will be addressed in a future version of the Java client.

The following diagram outlines the anatomy of a read/modify/write cycle using the Bucket interface, your own domain object (T), and the StoreObject it returns:

### Figure 1

![StoreObject anatomy](http://dl.dropbox.com/u/74693818/RJC-store-v3.png)
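The order of operations in that cycle (fetch, resolve siblings, mutate, store) can be sketched in plain Java. This is only an illustration: `Resolver`, `Mutator`, and `storeCycle` are hypothetical stand-ins for the roles played by ConflictResolver<T>, Mutation<T>, and the store operation, a list stands in for the siblings held in Riak, and the conversion step is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class StoreCycleSketch
{
    // Hypothetical stand-ins for the roles of ConflictResolver<T> and
    // Mutation<T>; these are NOT the client's own interfaces
    interface Resolver<T> { T resolve(Collection<T> siblings); }
    interface Mutator<T>  { T apply(T original); }

    // One read/modify/write cycle: resolve the fetched siblings to a single
    // value, mutate it, then "store" it by replacing the siblings
    static <T> T storeCycle(List<T> stored, Resolver<T> resolver, Mutator<T> mutator)
    {
        T resolved = resolver.resolve(new ArrayList<T>(stored)); // fetch + resolve
        T mutated = mutator.apply(resolved);                      // modify
        stored.clear();                                           // write back
        stored.add(mutated);
        return mutated;
    }

    static int demo()
    {
        // Three sibling values for the same key
        List<Integer> stored = new ArrayList<Integer>(Arrays.asList(3, 7, 5));
        Resolver<Integer> keepHighest = siblings -> Collections.max(siblings);
        Mutator<Integer> addOne = value -> value + 1;
        return storeCycle(stored, keepHighest, addOne);
    }

    public static void main(String[] args)
    {
        System.out.println(demo()); // highest sibling is 7, mutated to 8
    }
}
```

The point of the sketch is that the caller never touches the fetched value directly; it only supplies the resolve and mutate steps, and the cycle runs them in order.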

There are four versions of the store() method available in the Bucket interface:

  • StoreObject<IRiakObject> store(String key, byte[] value)
  • StoreObject<IRiakObject> store(String key, String value)
  • <T> StoreObject<T> store(String key, T o)
  • <T> StoreObject<T> store(T o)

The first two are only useful if you want to overwrite anything currently in Riak associated with the key you're passing in. Be aware, however, that there is a caveat. An anonymous Mutation instance is created and used. It replaces the data portion of whatever is currently stored in Riak, but not links, secondary indexes, or user metadata. If you want to overwrite everything you will need to supply a custom Converter that does so.

The last two are what you will most likely use if you are performing a read/modify/write cycle. As noted in Figure 1 above, the interface is slightly clunky in that the object being passed in is going to be discarded when you supply the Mutation; it's only used to infer the type. The fourth version will first extract the key from the object being passed in by referencing a String field annotated with @RiakKey.

The following example is a "leader board" for various games. Imagine you were providing a service where game developers would have their games send you a score every time a player completed a game. You are required to store the top 5 scores for each game. We're going to rely on the default JSONConverter to serialize/deserialize our POJO to/from Riak. If you're interested in seeing how you would implement a converter to use a different serialization library, check out Using a custom converter for an example.

App.java

import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import java.util.Random;

public class App 
{
    public static void main( String[] args ) throws RiakException, InterruptedException
    {
        // We need some data, of course
        String playerNames[] = {"Steve","Brian","Bob" };
        Random generator = new Random();
        GameLeaderboard gl = new GameLeaderboard("SuperCoolGame");

        for (int i = 0; i < 5; i++)
        {
            NameScorePair nsp = new NameScorePair(playerNames[(i+3)%3], generator.nextInt(100));
            gl.addScore(nsp);
        }		

        // Store our initial leaderboard in Riak
        IRiakClient myDefaultHttpClient = RiakFactory.httpClient();
        Bucket b = myDefaultHttpClient.createBucket("demo_bucket").allowSiblings(true).execute();
        b.store(gl).withResolver(new GameLeaderboardResolver()).execute();

        gl = b.fetch("SuperCoolGame", GameLeaderboard.class)
                 .withResolver(new GameLeaderboardResolver())
                 .execute();

        // Output the results!
        for ( NameScorePair n : gl.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }
        System.out.println();
        
        /* 
         * Now that we have a leaderboard in Riak, let's modify it! 
         * This simulates a new name/score pair coming in, and we're going
         * to modify the leaderboard in Riak using the GameLeaderboardMutation.
         * We know our sample data only has scores below 100, so using 1000 ensures
         * we'll modify the object
         */
        NameScorePair nsp = new NameScorePair("John", 1000);
        GameLeaderboardMutation glbm = new GameLeaderboardMutation(nsp);
        /* Note that as mentioned in the cookbook, the GameLeaderboard object 
         * passed to Bucket.store() is discarded after the type is inferred 
         * and the key extracted - all modification is done by your Mutation
         * 
         * Note also that we're calling returnbody(true) in order to get
         * the current data back
         */
        gl = b.store(new GameLeaderboard("SuperCoolGame"))
            .withMutator(glbm)
            .withResolver(new GameLeaderboardResolver())
            .returnbody(true)
            .execute();

        // Output the results!
        for ( NameScorePair n : gl.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }
    }
}

GameLeaderboardResolver.java

import com.basho.riak.client.cap.ConflictResolver;
import java.util.Collection;
import java.util.Iterator;

public class GameLeaderboardResolver implements ConflictResolver<GameLeaderboard>
{

    /*  
     * Riak hands us a list of GameLeaderboard objects. Our job is to reconcile
     * those objects and return a single, resolved GameLeaderboard
     *   
     * In this example, the logic is pretty straightforward. In our GameLeaderboard
     * class we created an addScores(Collection<NameScorePair>) method that will do the 
     * heavy lifting for us. By adding all the lists into one GameLeaderboard
     * via that method, we end up with the top 5 scores from all the siblings
     *   
     * Worth noting is that your ConflictResolver is *always* called, even if  
     * there are no siblings, or even if there is no object in Riak
     */  
        
    public GameLeaderboard resolve(Collection<GameLeaderboard> siblings)
    {   
        if (siblings.size() > 1)
        {       
            // We have siblings, need to resolve them
            Iterator<GameLeaderboard> i = siblings.iterator();

            GameLeaderboard resolvedLeaderboard = new GameLeaderboard(i.next());
                        
            while (i.hasNext())
            {           
                resolvedLeaderboard.addScores(i.next().getScoreList());
            }           
                        
            return resolvedLeaderboard;
        }       
        else if (siblings.size() == 1)
        {       
            // Only one object - just return it
            return siblings.iterator().next();
        }       
        else    
        {       
            // No object returned - return null 
            return null;
        }       
    }   
}
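The merge strategy used by this resolver (union all sibling lists, then keep the five highest) can be tried in isolation. The sketch below uses only the JDK and plain integer scores instead of NameScorePair, so identical scores collapse into one entry; `SiblingMergeSketch` and `mergeTopScores` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

class SiblingMergeSketch
{
    static final int MAX_SCORES = 5;

    // Union every sibling's scores, then drop the lowest until five remain
    static List<Integer> mergeTopScores(Collection<List<Integer>> siblings)
    {
        TreeSet<Integer> merged = new TreeSet<Integer>();
        for (List<Integer> sibling : siblings)
        {
            merged.addAll(sibling);
        }
        while (merged.size() > MAX_SCORES)
        {
            merged.pollFirst(); // removes the current lowest score
        }
        // Highest score first
        return new ArrayList<Integer>(merged.descendingSet());
    }

    public static void main(String[] args)
    {
        List<Integer> siblingA = Arrays.asList(90, 70, 50, 30, 10);
        List<Integer> siblingB = Arrays.asList(95, 70, 60, 40, 20);

        // prints [95, 90, 70, 60, 50]
        System.out.println(mergeTopScores(Arrays.asList(siblingA, siblingB)));
    }
}
```

A useful property of this kind of merge is that the result does not depend on the order in which the siblings are processed, so every client that resolves the same set of siblings arrives at the same leaderboard.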

GameLeaderboardMutation.java

import com.basho.riak.client.cap.Mutation;

public class GameLeaderboardMutation implements Mutation<GameLeaderboard>
{
    private NameScorePair nsp;
        
    public GameLeaderboardMutation(NameScorePair nsp)
    {   
        this.nsp = nsp;
    }   

    /*
     * And at the heart of things is this method. After the data in Riak has
     * been converted to GameLeaderboard Objects and any siblings resolved, 
     * Mutation.apply() is called and it is where you will do any and all modifications
     *
     * Here we add the NameScorePair we passed to the constructor to the 
     * GameLeaderboard object. After this our modified data will be stored back
     * to Riak
     */
    public GameLeaderboard apply(GameLeaderboard original)
    {   
        original.addScore(nsp);
        return original;
    }   
}

GameLeaderboard.java

import com.basho.riak.client.convert.RiakKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeSet;

public final class GameLeaderboard
{
    /*
     * The @RiakKey annotation allows the StoreObject to extract the key you wish to use
     * from your POJO. If you're using the default JSONConverter, this is excluded
     * from serialization
     */
    @RiakKey private String gameName;
    private TreeSet<NameScorePair> scoreList = new TreeSet<NameScorePair>();
        
    // required by Jackson for JSON serialization
    public GameLeaderboard() {}
        
    public GameLeaderboard(String gameName)
    {   
        this.gameName = gameName;
    }   
        
    public GameLeaderboard(GameLeaderboard other)
    {   
        this.gameName = other.getGameName();
        this.addScores(other.getScoreList());
    }   
        
    public void addScore(NameScorePair s)
    {   
        scoreList.add(s);
        if (scoreList.size() > 5)
            scoreList.pollFirst();
    }   
        
    public void addScores(Collection<NameScorePair> scores)
    {   
        scoreList.addAll(scores);
        while (scoreList.size() > 5)
            scoreList.pollFirst();
                
    }   
        
    public String getGameName()
    {   
        return gameName;
    }   
        
    public ArrayList<NameScorePair> getScoreList()
    {   
        return new ArrayList<NameScorePair>(scoreList.descendingSet());
    }   
}

NameScorePair.java

public class NameScorePair implements Comparable<NameScorePair>
{
   private String name;
   private int score;

   // Required by Jackson for JSON serialization
   public NameScorePair() {}
   
   public NameScorePair(String name, int score)
   {    
      this.name = name;
      this.score = score;
   }    
        
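   public int compareTo(NameScorePair t)
   {    
      if (this.getScore() < t.getScore())
         return -1;     
      else if (this.getScore() > t.getScore())
         return 1;      
      else      
         // Equal scores: order by name, ignoring case, so that different
         // players with the same score are both kept in the TreeSet
         return this.getName().compareToIgnoreCase(t.getName());
   }    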

   @Override
   public int hashCode()
   {    
      int hash = 3; 
      hash = 47 * hash + (this.name != null ? this.name.hashCode() : 0);
      hash = 47 * hash + this.score;
      return hash;
   }    

   @Override
   public boolean equals(Object o)
   {    
      if (o == null)
      {         
         return false;  
      }         
      else if (o instanceof NameScorePair)
      {         
         return ((name.equalsIgnoreCase(((NameScorePair)o).getName())) &&
            (score == ((NameScorePair)o).getScore()));
      }         
      else      
         return false;  
   }    
        
   public int getScore()
   {    
      return score;
   }    
   
   public String getName()
   {
      return name;
   }
}
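Because GameLeaderboard keeps its entries in a TreeSet, the result of compareTo() decides which entries count as duplicates: any two entries that compare as 0 are stored only once. The following standalone sketch shows the effect of breaking score ties by name. It uses a hypothetical `Entry` class rather than NameScorePair, and only the JDK.

```java
import java.util.TreeSet;

class TieBreakSketch
{
    // Hypothetical simplified name/score pair
    static final class Entry implements Comparable<Entry>
    {
        final String name;
        final int score;

        Entry(String name, int score)
        {
            this.name = name;
            this.score = score;
        }

        // Order by score first, then by name (ignoring case) to break ties.
        // Returns 0 only when both the score and the name match.
        public int compareTo(Entry other)
        {
            if (score != other.score)
            {
                return score < other.score ? -1 : 1;
            }
            return name.compareToIgnoreCase(other.name);
        }
    }

    // Counts how many of the given entries a TreeSet treats as distinct
    static int distinctEntries(Entry... entries)
    {
        TreeSet<Entry> set = new TreeSet<Entry>();
        for (Entry e : entries)
        {
            set.add(e);
        }
        return set.size();
    }

    public static void main(String[] args)
    {
        // Steve and Brian share a score but are both kept; "brian" is
        // treated as a duplicate of "Brian"
        System.out.println(distinctEntries(
            new Entry("Steve", 50), new Entry("Brian", 50), new Entry("brian", 50))); // 2
    }
}
```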
## Storing previously fetched and modified data

As noted in the "Houston, we have a problem" section on the "Fetching data from Riak" page, if you want to do a read/modify/write cycle outside of the store operation, you need to deal with IRiakObjects directly. That page shows how to fetch data with the intention of modifying and then storing it; the example below extends it by showing how you would modify the data and store it back to Riak. As noted in the fetch example, both the GameLeaderboardResolver and the GameLeaderboard class itself needed to be modified.

App.java

import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.builders.RiakObjectBuilder;
import com.basho.riak.client.http.util.Constants;
import java.io.IOException;
import org.codehaus.jackson.map.ObjectMapper;

public class App
{
    public static void main( String[] args ) throws RiakException, IOException
    {
        IRiakClient myDefaultHttpClient = RiakFactory.httpClient();
        Bucket b = myDefaultHttpClient.createBucket("demo_bucket").allowSiblings(true).execute();
                
        // Fetch data from Riak as an IRiakObject
        IRiakObject myObject = b.fetch("SuperCoolGame")
                                   .withResolver(new GameLeaderboardResolver())
                                   .execute(); 
                
        /* If there was data in Riak associated with the specified key, 
         * you now have an IRiakObject. Your data is contained within it and
         * is accessible via the IRiakObject.getValue() and 
         * IRiakObject.getValueAsString() methods. 
         *       
         * You can modify this data, then
         * put it back in the IRiakObject via the IRiakObject.setValue() methods.
         *       
         * Storing the IRiakObject back to Riak is now possible via the Bucket.store()
         * methods.
         */      
        if (myObject != null)
            System.out.println(myObject.getValueAsString());
        else    
            myObject = RiakObjectBuilder.newBuilder("demo_bucket", "SuperCoolGame")
                            .withContentType(Constants.CTYPE_JSON_UTF8)
                            .withValue("{}")            
                            .build();                   
                
        // Modify data, store back to Riak 
                
        /* We're going to use Jackson to convert the payload to/from 
         * our GameLeaderboard object. You could use a different JSON library 
         * here, such as Gson, or a different serialization library entirely
         */      
                
        ObjectMapper objectMapper = new ObjectMapper();
        GameLeaderboard glb = objectMapper.readValue(myObject.getValueAsString(), GameLeaderboard.class);
                
        NameScorePair nsp = new NameScorePair("Brian",1000000);
        glb.addScore(nsp);
                
        myObject.setValue(objectMapper.writeValueAsBytes(glb));
                
        // Store the IRiakObject back to Riak 
        b.store("SuperCoolGame", myObject).withResolver(new GameLeaderboardResolver()).execute();
                
                
        // Refetch and display, showing changes 
        myObject = b.fetch("SuperCoolGame")
                        .withResolver(new GameLeaderboardResolver())
                        .execute();             
                
        System.out.println(myObject.getValueAsString());
                
                
                
    }
}

GameLeaderboardResolver.java

import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.cap.ConflictResolver;
import com.basho.riak.client.cap.VClock;
import com.basho.riak.client.convert.JSONConverter;
import java.util.Collection;
import java.util.Iterator;

public class GameLeaderboardResolver implements ConflictResolver<IRiakObject>
{

    /*
     * Riak hands us a collection of IRiakObjects. Our job is to reconcile
     * those objects and return a single, resolved IRiakObject
     *
     * In this example, the logic is pretty straightforward. In our GameLeaderboard
     * class we created an addScores(Collection<NameScorePair>) method that will do the
     * heavy lifting for us. By adding all the lists into one GameLeaderboard
     * via that method, we end up with the top 5 scores from all the siblings
     *
     * Worth noting is that your ConflictResolver is *always* called, even if
     * there are no siblings, or even if there is no object in Riak
     */

    public IRiakObject resolve(Collection<IRiakObject> siblings)
    {
        if (siblings.size() > 1)
        {
            // We have siblings, need to resolve them
            Iterator<IRiakObject> i = siblings.iterator();

            /*
             * Convert the JSON contained in the IRiakObjects to
             * GameLeaderboard using the JSONConverter class.
             *
             * The constructor is expecting a bucket and key so it can use them
             * to construct a new IRiakObject at the end. Note we have to grab
             * the VClock from one of the siblings to preserve it
             *
             * You could use a different JSON serialization library here, such
             * as Gson, or another serialization library entirely
             */

            IRiakObject first = i.next();
            VClock vclock = first.getVClock();

            JSONConverter<GameLeaderboard> converter =
                new JSONConverter<GameLeaderboard>(GameLeaderboard.class,
                                                    first.getBucket(),
                                                    first.getKey());


            GameLeaderboard resolvedLeaderboard = new GameLeaderboard(converter.toDomain(first));

            while (i.hasNext())
            {
                resolvedLeaderboard.addScores(converter.toDomain(i.next()).getScoreList());
            }

            // Here we again use the JSONConverter which will return an IRiakObject
            // with our vector clock
            return converter.fromDomain(resolvedLeaderboard, vclock);

        }
        else if (siblings.size() == 1)
        {
            // Only one object - just return it
            return siblings.iterator().next();
        }
        else
        {
            // No object returned - return null
            return null;
        }

    }
}

GameLeaderboard.java

import com.basho.riak.client.convert.RiakKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeSet;

public final class GameLeaderboard
{
    private TreeSet<NameScorePair> scoreList = new TreeSet<NameScorePair>();
        
    // required by Jackson for JSON serialization
    public GameLeaderboard() {}
        
    public GameLeaderboard(GameLeaderboard other)
    {   
        this.addScores(other.getScoreList());
    }   
        
    public void addScore(NameScorePair s)
    {   
        scoreList.add(s);
        if (scoreList.size() > 5)
            scoreList.pollFirst();
    }   
        
    public void addScores(Collection<NameScorePair> scores)
    {   
        scoreList.addAll(scores);
        while (scoreList.size() > 5)
            scoreList.pollFirst();
                
    }   
        
    public ArrayList<NameScorePair> getScoreList()
    {   
        return new ArrayList<NameScorePair>(scoreList.descendingSet());
    }   
        
}

NameScorePair.java

public class NameScorePair implements Comparable<NameScorePair>
{
    private String name;
    private int score;

    // Required by Jackson for JSON serialization
    public NameScorePair() {}
        
        
    public NameScorePair(String name, int score)
    {   
        this.name = name; 
        this.score = score;
    }   
        
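    public int compareTo(NameScorePair t)
    {   
        if (this.getScore() < t.getScore())
            return -1;  
        else if (this.getScore() > t.getScore())
            return 1;   
        else    
            // Equal scores: order by name, ignoring case, so that different
            // players with the same score are both kept in the TreeSet
            return this.getName().compareToIgnoreCase(t.getName());
    }   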

    @Override
    public int hashCode()
    {   
        int hash = 3;
        hash = 47 * hash + (this.name != null ? this.name.hashCode() : 0);
        hash = 47 * hash + this.score;
        return hash;
    }   

    @Override
    public boolean equals(Object o)
    {   
        if (o == null)
        {       
            return false;
        }       
        else if (o instanceof NameScorePair)
        {       
            return ((name.equalsIgnoreCase(((NameScorePair)o).getName())) &&
                (score == ((NameScorePair)o).getScore()));
        }       
        else    
            return false;
    }   
        
    public int getScore()
    {   
        return score;
    }   
        
    public String getName()
    {   
        return name;
    }   
}