Reservoir Sampling Sketch Pig UDFs

Instructions

  • get jars
  • save the following script as reservoir_example.pig
  • adjust jar versions and paths as necessary
  • save the below data into a file called data.txt
  • copy data to hdfs: “hadoop fs -copyFromLocal data.txt”
  • run pig script: “pig reservoir_example.pig”

reservoir_example.pig script

register datasketches-memory-2.0.0.jar;
register datasketches-java-3.1.0.jar;
register datasketches-pig-1.1.0.jar;

-- very small sketch just for the purpose of this tiny example
DEFINE ReservoirSampling org.apache.datasketches.pig.sampling.ReservoirSampling('4');
DEFINE ReservoirUnion org.apache.datasketches.pig.sampling.ReservoirUnion('4');

raw_data = LOAD 'data.txt' USING PigStorage('\t') AS
    (scale: double, label: chararray);

-- make a few independent sketches from the input data
sketches = FOREACH
    (GROUP raw_data ALL)
GENERATE
    ReservoirSampling(raw_data) AS sketch0,
    ReservoirSampling(raw_data) AS sketch1,
    ReservoirSampling(raw_data) AS sketch2
    ;

sketchBag = FOREACH
    sketches
GENERATE
    TOBAG(sketch0,
          sketch1,
          sketch2)
    ;

result = FOREACH
    sketchBag
GENERATE
    FLATTEN(ReservoirUnion(*)) AS (n, k, samples:{(scale, label)})
    ;

DUMP result;
DESCRIBE result;

The test data has two fields: scale and label. The first step of the query creates three independent reservoir samples from the input data. The next step collects the sketches into a bag, and the final step unions the independent sketches and dumps the result.
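The three steps above can be mimicked outside Pig with a short Python sketch. This is only an illustration under stated assumptions, not the DataSketches implementation: `random.sample` stands in for the ReservoirSampling UDF, and `merge_reservoirs` is a hypothetical helper that unions two reservoirs by drawing from each in proportion to the number of input items it still represents.

```python
import random

def merge_reservoirs(a, b, k, rng):
    """Union two (n, samples) reservoirs into one sample of at most k items.

    Hypothetical helper for illustration; not the library's algorithm.
    """
    n_a, s_a = a[0], list(a[1])
    n_b, s_b = b[0], list(b[1])
    merged = []
    while len(merged) < k and (s_a or s_b):
        # Choose the source in proportion to the items it still represents.
        if rng.random() * (n_a + n_b) < n_a:
            src = s_a
            n_a -= 1
        else:
            src = s_b
            n_b -= 1
        # Remove a uniformly random sample from the chosen reservoir.
        merged.append(src.pop(rng.randrange(len(src))))
    return (a[0] + b[0], merged)

rows = [(1.0, "a"), (2.0, "b"), (3.0, "c"), (4.0, "d"),
        (5.0, "e"), (6.0, "f"), (7.0, "g"), (30.0, "h")]
k = 4
rng = random.Random(7)

# Step 1: three independent size-k samples of the same input.
sketches = [(len(rows), rng.sample(rows, k)) for _ in range(3)]

# Steps 2 and 3: treat the list as the bag and union its members.
result = sketches[0]
for sketch in sketches[1:]:
    result = merge_reservoirs(result, sketch, k, rng)

n, samples = result
print(n, k, samples)
```

As in the Pig script, n counts every item seen by all three sketches (3 × 8 = 24), while the sample itself never holds more than k = 4 rows.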

Results:

From ‘DUMP result’:

(24,4,{(30.0,h),(7.0,g),(6.0,f),(5.0,e)})

Because the sampling is random, the selected items differ from run to run. Running this script many times, we will see each element appear with equal probability.
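The equal-probability claim can be checked with a small simulation. The sketch below uses the textbook reservoir algorithm (often called Algorithm R) on the eight labels from data.txt; it models a single reservoir of size 4 and is an assumption-laden stand-in, not the code the UDFs run. The function name `reservoir_sample` and the trial count are chosen here purely for illustration.

```python
import random
from collections import Counter

def reservoir_sample(stream, k, rng):
    """Keep a uniform random sample of up to k items from a stream."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k items.
            reservoir.append(item)
        else:
            # Item i replaces a random slot with probability k / (i + 1).
            j = rng.randrange(i + 1)
            if j < k:
                reservoir[j] = item
    return reservoir

labels = list("abcdefgh")
rng = random.Random(42)
trials = 20000

counts = Counter()
for _ in range(trials):
    counts.update(reservoir_sample(labels, 4, rng))

# Fraction of trials in which each label ended up in the sample.
frequencies = {label: counts[label] / trials for label in labels}
for label in labels:
    print(label, round(frequencies[label], 3))
```

With k = 4 and 8 input rows, each label should land in the sample in roughly half of the trials, regardless of its position in the file or its scale value.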

From ‘DESCRIBE result’:

result: {n: long,k: int,samples: {(scale: double,label: chararray)}}

data.txt (tab separated)

1.0	a
2.0	b
3.0	c
4.0	d
5.0	e
6.0	f
7.0	g
30.0	h