Saturday, March 21, 2015

Using CursorMark for deep paging in Solr

Have you ever had to search through 100 million XML blobs to identify which XML blobs had some specific data (or, in my case, was missing some data)? It takes forever, and it's not very fun. I had a situation at work where it looked like I was going to have to do just that. Fortunately, we have the same data (more or less) in Solr cores. I just needed to do a NOT query on a specific field.

The Solr documentation suggests that you use a CursorMark parameter in your query if you need to page through more than a thousand records. 

The following is an example of using CursorMark using SolrJ. The query will return all documents that do not have data for targetField. It then loops through the matches using the cursorMark parameter as a way to tell solr where to retrieve the next set of matches. One of the conditions of using cursorMark to page through results is that you need to sort on a unique field. The schema we are using has a unique field named "uid". 

Something to note is that I had to explicitly set "timeAllowed" to 0 to say that I didn't want any time restrictions on the query.

Yonik Seeley made an interesting point in his Solr 'n Stuff blog about using cursorMark. You can change the returned fields, the facet fields, and number of rows returned, for a specific cursorMark since the cursorMark contains the state of the current search - the state isn't stored server-side. This makes it very easy for a client to allow a user to vary their experience while they page through results. This feels very similar to how you page through results using MySQL and other databases.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.CursorMarkParams;
import java.io.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class FindMissingData {

  private static final int MAX_ALLOWED_ROWS=1000;
  private static final String URL_PREFIX="http://somesolrserver:8983/solr/thecore";
  private static final String ZIP_ENTRY_NAME="TheData.txt";
  private static final String UNIQUE_ID_FIELD="uid";
  private static final String SOLR_QUERY="NOT targetField:[* TO *]";

  public static void main(String[] args) {

    if (args.length != 2) {
      System.out.println("java FindMissingData <rows per batch - max is 1000> <output zip file>");
    }

    int maxRows = Integer.parseInt(args[0]);
    String outputFile = args[1];

    // Delete zip file if it already exists - going to recreate it anyway
    File f = new File(outputFile);
    if(f.exists() && !f.isDirectory()) {
      f.delete();
    }

    FindMissingData.writeIdsForMissingData(outputFile, maxRows);
  }

  public static void writeIdsForMissingData(String outputFile, int maxRowCount) {

    if (maxRowCount > MAX_ALLOWED_ROWS) 
      maxRowCount = MAX_ALLOWED_ROWS;

    FileOutputStream fos = null;
    ZipOutputStream zos = null;

    try {
      fos = new FileOutputStream(outputFile, true);
      zos = new ZipOutputStream(fos);
      zos.setLevel(9);

      queryForMissingData(maxRowCount, zos);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (zos != null) {
        try {
          zos.flush();
          zos.close();
        } catch (Exception e) {}
      }
      if (fos != null) {
        try {
          fos.flush();
          fos.close();
        } catch (Exception e) {}
      }
    }
  }

  private static void queryForMissingData(int maxRowCount, ZipOutputStream zos) throws IOException {
    ZipEntry zipEntry = new ZipEntry(ZIP_ENTRY_NAME);
    zos.putNextEntry(zipEntry);


    SolrServer server = new HttpSolrServer(URL_PREFIX);

    SolrQuery q = new SolrQuery(SOLR_QUERY);
    q.setFields(UNIQUE_ID_FIELD);
    q.setRows(maxRowCount);
    q.setSort(SolrQuery.SortClause.desc(UNIQUE_ID_FIELD));

    // You can't use "TimeAllowed" with "CursorMark"
    // The documentation says "Values <= 0 mean 
    // no time restriction", so setting to 0.
    q.setTimeAllowed(0);

    String cursorMark = CursorMarkParams.CURSOR_MARK_START;
    boolean done = false;
    QueryResponse rsp = null;
       
    while (!done) {
      q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
      try {
        rsp = server.query(q);
      } catch (SolrServerException e) {
        e.printStackTrace();
        return;
      }

      writeOutUniqueIds(rsp, zos);

      String nextCursorMark = rsp.getNextCursorMark();
      if (cursorMark.equals(nextCursorMark)) {
        done = true;
      } else {
        cursorMark = nextCursorMark;
      }
    }
  }

  private static void writeOutUniqueIds(QueryResponse rsp, ZipOutputStream zos) throws IOException {
    SolrDocumentList docs = rsp.getResults();

    for(SolrDocument doc : docs) {
      zos.write(
        String.format("%s%n",
          doc.get("uid").toString()).getBytes()
      );
    }
  }
}

Tuesday, March 3, 2015

Using Mockito to help test retry code...

There are times when you want to make sure that your code can retry an operation that might occaisionally fail due to environment issues.  For example, while using AWS SDK to get an object from S3 you might sometimes get an AmazonClientException - perhaps because of an endpoint being temporarily unreachable. You can simulate this situation using Mockito for the AmazonS3Client.

Mockito provides a fluent style way of how your mock objects respond.  Here is an example:

    AmazonS3Client mockClient = mock(AmazonS3Client.class);
    S3Object mockObject = mock(S3Object.class);

    when(mockClient.getObject(anyString(), anyString()))
        .thenThrow(new AmazonClientException("Something bad happened."))
        .thenReturn(mockObject);

This will cause the code to throw an AmazonClientException on the first call to getObject(), but then return the mocked S3Object on the second call to getObject().

This means that you can have happy path tests where the call eventually succeeds, and a test that will show how your code handles all retries failing, by just setting up your mock with the desired configuration of "thenThrow" and "thenReturn" entries.