The examples described on this page are very simple and their only purpose is to indicate how to use some features of riak-scala-client.

These examples have been bundled as a complete example mini-app, which can be found in the examples folder of the Github project.

A RiakAlbumRepository Actor

For this example, let's shamelessly steal the domain used by the ReactiveMongo project for their BSON Example, i.e music albums and their artists.

Let's start with the domain. For now we will be using the default spray-json serialization support and not worry about any secondary indexes yet. A simplified domain would probably look something like this:

import spray.json.DefaultJsonProtocol._

case class Track (number: Int, title: String)
object Track {
  implicit val jsonFormat = jsonFormat2(Track.apply)
}

case class Album (
  title: String,
  artist: String,
  releasedIn: Int,
  tracks: List[Track]
)

object Album {
  implicit val jsonFormat = jsonFormat4(Album.apply)
}

A simple RiakAlbumRepository actor would need to respond to a number of messages and reply with some other messages. Something like:

object AlbumRepositoryProtocol {
  case class StoreAlbum(album: Album)
  case class FetchAlbumByTitle(title: String)
}

Using the above domain and message protocol, let's write a failing unit test for the RiakAlbumRepository.

  "when receiving a StoreAlbum message, the RiakAlbumRepository" should {
    "store the album in the database" in new AkkaTestkitContext {
      val albumRepository = TestActorRef(Props[RiakAlbumRepository])

      verifyAlbumDoesNotExistInDatabase(album1.title)

      within(timeout) {
        albumRepository ! StoreAlbum(album1)

        val albumFromDb = expectMsgType[Album]

        albumFromDb must beEqualTo(album1)

        verifyAlbumExistsInDatabase(album1)
      }

      removeAlbumFromDatabase(album1)
    }
  }

This is only one of the tests and without any of the test data or helper functions. Have a look at the full code for this unit test to see the rest.

With some failing unit tests in place, we can now implement the repository:

class RiakAlbumRepository extends Actor with ActorLogging {
  import AlbumRepositoryProtocol._
  import context.dispatcher

  private val albums = RiakClient(context.system, "localhost", 8098).bucket("albums")

  def receive = {
    case StoreAlbum(album)        => storeAlbum(album, sender)
    case FetchAlbumByTitle(title) => fetchAlbumByTitle(title, sender)
  }

  private def storeAlbum(album: Album, actor: ActorRef) {
    albums.storeAndFetch(album.title, album)
          .map(value => value.as[Album])
          .onSuccess {
            case storedAlbum => actor ! storedAlbum
          }
  }

  private def fetchAlbumByTitle(title: String, actor: ActorRef) {
    albums.fetch(title)
          .map(valueOption => valueOption.map(_.as[Album]))
          .onSuccess {
            case albumOption => actor ! albumOption
          }
  }
}

In both cases, we convert raw RiakValue instances to instances of Album (or Option[Album]) by mapping over the Future and then we register a callback function to send the result to the actor that sent the request when the future completes successfully.

That's basically all it takes for a ery simple non-blocking Riak repository.

Dealing with vclocks and other meta data

Note that in the above example, the StoreAlbum message takes an instance of Album. This is fine for new albums but what if we wanted to update albums? The current implementation would not send along any vclock information and, if the albums bucket is configured to allow siblings, this will lead to conflicts that could have been prevented.

Depending on your situation, sometimes it is a good idea to keep the values returned from a low-level class like our album repository boxed inside a RiakMeta instance so you can use the meta data at a later time to perform an update. This would look something like:

object AlbumRepositoryProtocol {
  ...
  case class UpdateAlbum(album: RiakMeta[Album])
}

class RiakAlbumRepository extends Actor with ActorLogging {
  ...
  def receive = {
    ...
    case UpdateAlbum(album) => updateAlbum(album, sender)
  }

  ...
  private def updateAlbum(albumMeta: RiakMeta[Album], actor: ActorRef) {
    albums.storeAndFetch(albumMeta.data.title, albumMeta)
          .map(value => value.asMeta[Album])
          .onSuccess {
            case storedAlbumMeta => actor ! storedAlbumMeta
          }
  }

  private def fetchAlbumByTitle(title: String, actor: ActorRef) {
    albums.fetch(title)
          .map(valueOption => valueOption.map(_.asMeta[Album]))
          .onSuccess {
            case albumMetaOption => actor ! albumMetaOption
          }
  }
}

And the unit tests would look something like this:

  "when receiving an UpdateAlbum message, the RiakAlbumRepository" should {
    "store the album in the database" in new AkkaTestkitContext {
      val albumRepository = TestActorRef(Props[RiakAlbumRepository])
      val updatedAlbum = album1.copy(tracks = Track(13, "I Heard It Through the Grapevine") +: album1.tracks)

      storeAlbumInDatabase(album1)

      within(timeout) {
        albumRepository ! FetchAlbumByTitle(album1.title)

        val albumFromDb = expectMsgType[Option[RiakMeta[Album]]]

        albumFromDb must beSome[RiakMeta[Album]]
        albumFromDb.get.data must beEqualTo(album1)

        albumRepository ! UpdateAlbum(albumFromDb.get.map(_ => updatedAlbum))

        val updatedAlbumFromDb = expectMsgType[RiakMeta[Album]]

        updatedAlbumFromDb.data must beEqualTo(updatedAlbum)

        verifyAlbumExistsInDatabase(updatedAlbum)
      }

      removeAlbumFromDatabase(updatedAlbum)
    }
  }

Custom Serialization

So what if we want to customize the serialization format for some reason. Let's say you are writing an enterprise app and some guy in the standards department absolutely insists on you storing all data as XML (because it is a standard!)? After grumbling to your team mates and making fun of the guy, you might come up with something like this:

object Album {
  implicit object customXmlSerialization extends RiakSerializer[Album] with RiakDeserializer[Album] {
    def serialize(album: Album): (String, ContentType) = {
      val xml = <album>
                  <title>{album.title}</title>
                  <artist>{album.artist}</artist>
                  <releasedin>{album.releasedIn}</releasedin>
                  <tracks>
                  {album.tracks.map(track => <track><number>{track.number}</number><title>{track.title}</title></track>)}
                  </tracks>
                </album>.toString

       (xml, ContentType(`text/xml`))
    }

    def deserialize(data: String, contentType: ContentType): Album = {
      import scala.xml._
      def stringToXmlElem = XML.load(new java.io.StringReader(data))
      def xmlElemToTrack(elem: Elem) = Track((elem \ "number").text.toInt, (elem \ "title").text)
      def xmlElemToAlbum(elem: Elem) = Album(
        (elem \ "title").text,
        (elem \ "artist").text,
        (elem \ "releasedin").text.toInt,
        (elem \\ "track").toList.map(node => xmlElemToTrack(node.asInstanceOf[Elem]))
      )

      contentType match {
        case ContentType(`text/xml`, _) => xmlElemToAlbum(stringToXmlElem)
        case _ => throw RiakUnsupportedContentType(ContentType(`text/xml`), contentType)
      }
    }
  }
}

This is definitely not the best (de)serialization code ever written (e.g. absolutely no error handling whatsoever) but hopefully it gives a good impression of what it would take to write custom (de)serialization code.

Custom Indexes

Riak does not exactly offer the most powerfull query API and you probably knew that when you selected Riak for your project. But sometimes a little is more than enough and, in the case of our album repository, let's say all you need is some way to get a list of all albums for a certain artist and all albums released in a certain period. Secondary indexes to the rescue!

Implementing some indexes for our Album class is very easy. Just add a RiakIndexer[T} to the companion object (or anywhere else and import it where you need it). Something like:

object Album {
  ...

  implicit object indexer extends RiakIndexer[Album] {
    def index(album: Album) = Set(RiakIndex("artist", album.artist),
                                  RiakIndex("releasedIn", album.releasedIn))
  }
}

This custom indexer will be automatically used whenever an Album (or a RiakMeta[Album]) gets stored using one of the store methods on RiakBucket.

Implementing index queries in our repository is now pretty simple:

object AlbumRepositoryProtocol {
  ...
  case class FetchAlbumsByArtist(artist: String)
  case class FetchAlbumsReleasedBetween(startYear: Int, endYear: Int)
}

class RiakAlbumRepository extends Actor with ActorLogging {
  ...

  def receive = {
    ...
    case FetchAlbumsByArtist(artist) => fetchAlbumsByArtist(artist, sender)
    case FetchAlbumsReleasedBetween(startYear, endYear) =>
      fetchAlbumsReleasedBetween(startYear, endYear, sender)
  }

  ...

  private def fetchAlbumsByArtist(artist: String, actor: ActorRef) {
    albums.fetch("artists", artist)
          .map(values => values.map(_.as[Album]))
          .onSuccess {
            case albums => actor ! albums
          }
  }

  private def fetchAlbumsReleasedBetween(startYear: Int, endYear: Int, actor: ActorRef) {
    albums.fetch("releasedIn", startYear, endYear)
          .map(values => values.map(_.as[Album]))
          .onSuccess {
            case albums => actor ! albums
          }
  }
}

And that's it for our Riak albums repository.