
Concurrent Theta Sketch

The concurrent Theta sketch is an instance of a generic approach for parallelizing sketches while bounding the error that such parallelism introduces [1].

At its core, a generic concurrent sketch ingests data through multiple sketches that are local to the inserting threads.
The data in these local sketches, which are bounded in size, is merged into a single shared sketch by exploiting the mergeability property of sketches. Queries are served from a snapshot of the shared sketch. This snapshot is taken frequently enough to guarantee the freshness of results, yet rarely enough that taking it does not become the bottleneck of the sketch.

[Figure: Generic concurrent sketch design]

Unlike previous solutions, this design enables simultaneous queries and updates to a sketch from an arbitrary number of threads. The responsibility for merging the thread-local sketches into the shared sketch is divided between two propagation modes:

  1. Eager propagation. While the sketch is small, any delay in merging the local data into the shared sketch, so that it is captured by the snapshot, can incur a large error in the query result. Therefore, data is eagerly propagated to the shared sketch by the inserting thread upon each update.
  2. Lazy propagation. Once the sketch is large enough, the local sketches are used to buffer data that should be added to the shared sketch. A background propagation thread continuously merges full local sketches into the shared sketch. A simplified illustration of this decision logic follows the list.
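
The following is a minimal, hypothetical illustration of how an update path might switch between the two modes. The SharedSide interface and LocalBuffer class are invented here solely for illustration; they are not part of the DataSketches API, which operates on hash values and coordinates with a background propagation thread inside the library.

// Hypothetical illustration only - not the DataSketches implementation.
interface SharedSide {
    boolean isExactMode();                     // shared sketch still small => propagate eagerly
    void propagateEagerly(long item);          // merged immediately by the inserting thread
    void enqueueForPropagation(long[] batch);  // merged later by a background propagation thread
}

final class LocalBuffer {
    private final SharedSide shared;
    private final long[] buffer;
    private int count = 0;

    LocalBuffer(final SharedSide shared, final int bufferCapacity) {
        this.shared = shared;
        this.buffer = new long[bufferCapacity];
    }

    void update(final long item) {
        if (shared.isExactMode()) {
            // Eager: while the estimate is still exact, any buffering delay shows up as error.
            shared.propagateEagerly(item);
        } else {
            // Lazy: buffer locally and hand a full buffer to the propagation thread.
            buffer[count++] = item;
            if (count == buffer.length) {
                shared.enqueueForPropagation(buffer.clone());
                count = 0;
            }
        }
    }
}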

Implementation and User API

Both the local sketch and the shared sketch are descendants of UpdateSketch and therefore support its API. However, the shared sketch should be used only to query the estimate, while updates must go only through the local sketches. The shared sketch can be allocated either off-heap or on-heap, while the local sketch is always allocated on-heap.
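
As a minimal illustration of this division of labor (assuming the shared and local sketches were built with UpdateSketchBuilder as shown in the example below; the class and method names here are purely illustrative):

import org.apache.datasketches.theta.UpdateSketch;

final class ConcurrentRoles {
    // Query path: estimates are read from the shared sketch only.
    static double currentEstimate(final UpdateSketch sharedSketch) {
        return sharedSketch.getEstimate();
    }

    // Update path: each writer thread feeds items through its own local sketch only.
    static void ingest(final UpdateSketch localSketch, final long item) {
        localSketch.update(item);
    }
}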

Like other Theta sketches, an UpdateSketchBuilder is used to build the shared and local sketches. It is imperative that the shared sketch is built first. Then, in the context of each application thread that feeds data, a local sketch is created and connected to the shared sketch. The builder's configuration parameters are:

  1. Buffer size of shared sketch
  2. Buffer size of local sketches
  3. Size of the thread pool that handles propagation for all sketches
  4. Flag to indicate if the propagated data is to be sorted prior to propagation
  5. Max concurrency error; the point the sketch flips from exact to estimate mode is derived from this parameter
  6. Max number of local threads to be used

Code Example for Building a Concurrent Theta Sketch

import org.apache.datasketches.memory.WritableDirectHandle;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.theta.UpdateSketchBuilder;

class ApplicationWithSketches {

    private UpdateSketchBuilder bldr;
    private UpdateSketch sharedSketch;
    private Thread writer;

    private int sharedLgK;
    private int localLgK;
    private boolean ordered;
    private boolean offHeap;
    private int poolThreads;
    private double maxConcurrencyError;
    private int maxNumWriterThreads;
    private WritableDirectHandle wdh;
    private WritableMemory wmem;


    //configures builder for both local and shared
    void buildConcSketch() {
        bldr = new UpdateSketchBuilder();

        // All configuration parameters are optional
        bldr.setLogNominalEntries(sharedLgK);     // default 12 (K=4096)
        bldr.setLocalLogNominalEntries(localLgK); // default 4 (B=16)
        bldr.setNumPoolThreads(poolThreads);      // default 3
        bldr.setPropagateOrderedCompact(ordered); // default true
        bldr.setMaxConcurrencyError(maxConcurrencyError);   // default 0
        bldr.setMaxNumLocalThreads(maxNumWriterThreads);   // default 1
        
        // build shared sketch first
        final int maxSharedUpdateBytes = Sketch.getMaxUpdateSketchBytes(1 << sharedLgK);    
        if (offHeap) {
          wdh = WritableMemory.allocateDirect(maxSharedUpdateBytes);
          wmem = wdh.get();
        } else {
          wmem = null; // on-heap: buildShared(null) allocates the shared sketch on the Java heap
        }
        sharedSketch = bldr.buildShared(wmem);
    }
    
    void mainApplicationMethod() {
        // init attributes, e.g., from a properties file
        ...
        buildConcSketch();
        writer = new WriterThread(bldr, sharedSketch);
        writer.start();
        
        while(#some_application_condition) {
            // get estimate through shared sketch
            doSomethingWithEstimate(sharedSketch.getEstimate());
            try {
                Thread.sleep(100);
            } catch (final InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
    
// Context of writer thread 
class WriterThread extends Thread {

    private UpdateSketch local;

    // build local sketches from bldr and reference to shared sketch
    public WriterThread(final UpdateSketchBuilder bldr, final UpdateSketch shared) {
        local = bldr.buildLocal(shared);
        //init input stream, such as a queue, or a communication buffer, etc.
    }
    
    // update the concurrent sketch through the local sketch - no need for locks or any other synchronization
    public void run() {
        while (true) {
            if (#input_stream_is_not_empty) {
                long data = getDataFromInputStream();
                local.update(data);
            }
        }
    }
}

Serializing a Concurrent Sketch

A concurrent sketch is not a single unit of computation; it is composed of the shared sketch and the local buffers. Only the shared sketch supports serialization, as it captures the most up-to-date content of the sketch. In the current implementation, deserializing a shared sketch yields an UpdateSketch. Therefore, when deserializing a concurrent sketch, both the shared sketch and the local buffers need to be re-created.

Code Example for Serializing and Deserializing a Concurrent Theta Sketch

import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.theta.UpdateSketchBuilder;

public class SerDeTest {
    
    private UpdateSketchBuilder bldr;
    private UpdateSketch sharedSketch;
    private WritableMemory wmem;
    
    void serDeConcurrentQuickSelectSketch() {
        int k = 512;
        
        // build shared sketch and local buffer as in the example above
        bldr = new UpdateSketchBuilder();
        ...
        sharedSketch = bldr.buildShared(wmem);
        UpdateSketch local = bldr.buildLocal(sharedSketch);
        
        int i=0;
        // update sketch through local buffer
        for (; i<10000; i++) {
          local.update(i);
        }
        
        // serialize shared
        byte[] serArr = sharedSketch.toByteArray();
        Memory srcMem = Memory.wrap(serArr);
        Sketch recovered = Sketches.heapifyUpdateSketch(srcMem);

        // reconstruct the shared sketch from the recovered UpdateSketch (here backed by on-heap Memory)
        final int bytes = Sketch.getMaxUpdateSketchBytes(k);
        wmem = WritableMemory.allocate(bytes);
        sharedSketch = bldr.buildSharedFromSketch((UpdateSketch) recovered, wmem);
        UpdateSketch local2 = bldr.buildLocal(sharedSketch);
        
        // check estimate ~10K
        System.out.println("Estimate="+sharedSketch.getEstimate();
        
        // continue updating through new local buffer
        for (; i<20000; i++) {
          local2.update(i);
        }            

        // check estimate ~20K
        System.out.println("Estimate2="+sharedSketch.getEstimate();
    }

}

[1] Arik Rinberg, Alexander Spiegelman, Edward Bortnikov, Eshcar Hillel, Idit Keidar, Hadar Serviansky, Fast Concurrent Data Sketches, https://arxiv.org/abs/1902.10995