How Does Apache Beam's CombineValues Operate Over Elements When Executing Arithmetic Operations

This is a bit of a contrived example, but I have been exploring the docs for CombineValues and wish to understand what I'm seeing. If I combine values and perform some arithmetic oper

Solution 1:

The combiner is called twice because of the MapReduce-style phases of the combine: it runs once on the raw values and once more on the partial results. That is only safe for an associative function, and the function you are using (halving the mean) is not associative, so you need an "advanced combiner" (a CombineFn), as in example 8 that you mention.

In your current code, the combiner first turns (xxx, 1) into the half mean (xxx, 0.5); then, when the partial values are merged, it halves that again, giving (xxx, 0.25).
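The double halving can be reproduced without Beam. The sketch below is plain Python, not Beam code: `lifted_combine` is a hypothetical helper, introduced only for illustration, that mimics a runner applying the same callable once to each bundle of raw values and once more to the partial results.

```python
def combiner(elements):
    # Same arithmetic as the combiner in the question: half of the sum.
    return sum(elements) / 2


def lifted_combine(fn, bundles):
    # Hypothetical helper (not a Beam API): apply fn to each bundle,
    # then apply fn again to the list of partial results.
    partials = [fn(bundle) for bundle in bundles]
    return fn(partials)


single_pass = combiner([1])                 # one application of the combiner
two_pass = lifted_combine(combiner, [[1]])  # combiner applied in two phases
print(single_pass, two_pass)
```

The two results differ, which is the sign that the function cannot safely be applied to its own partial results.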

In this answer I explain a similar concept.

For your particular case, as mentioned, you need an "advanced combiner":

import apache_beam as beam

with beam.Pipeline() as pipeline:

    def combiner(elements):
        # Original combiner from the question, kept for comparison
        print(elements)
        return sum(elements) / 2

    class HalfMean(beam.CombineFn):
        def create_accumulator(self):
            # Tuple of (sum, count)
            return (0, 0)

        def add_input(self, accumulator, input):
            # Add the current element to sum, add one to count
            new_tuple = (accumulator[0] + input, accumulator[1] + 1)
            return new_tuple

        def merge_accumulators(self, accumulators):
            # Join all accumulators
            partial_sums = [x[0] for x in accumulators]
            partial_counts = [x[1] for x in accumulators]
            merged_tuple = (sum(partial_sums), sum(partial_counts))
            return merged_tuple

        def extract_output(self, sum_count_tuple):
            # Now we can output half of the mean
            mean = sum_count_tuple[0] / sum_count_tuple[1]
            return mean / 2

    counts = (pipeline
              | 'create' >> beam.Create(['xxx'])
              | 'key it' >> beam.Map(lambda elem: (elem, 1))
              # | 'combine' >> beam.CombinePerKey(combiner)
              | 'advance combine' >> beam.CombinePerKey(HalfMean())
              | 'print' >> beam.Map(print)
             )

I'm leaving your old combiner in, with a print, so you can see what's happening if you swap the commented-out 'combine' step back in.
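To see why the accumulator approach avoids the double halving, here is a minimal plain-Python sketch. It assumes nothing about how a real runner splits the data: `HalfMeanSketch` is a stand-in with the same four methods as the CombineFn above, and `run_in_bundles` is a hypothetical driver, not part of Beam.

```python
class HalfMeanSketch:
    # Plain-Python stand-in (no Beam dependency) for the four CombineFn methods.
    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count / 2  # halving happens exactly once, at the end


def run_in_bundles(fn, bundles):
    # Hypothetical driver: build one accumulator per bundle, merge them,
    # and only then extract the final output.
    accumulators = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        accumulators.append(acc)
    return fn.extract_output(fn.merge_accumulators(accumulators))


one_bundle = run_in_bundles(HalfMeanSketch(), [[1, 2, 3]])
two_bundles = run_in_bundles(HalfMeanSketch(), [[1, 2], [3]])
print(one_bundle, two_bundles)
```

Because only sums and counts are merged, the result does not depend on how the values were split into bundles.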

Anyhow, that is still not a CombineValues but a CombinePerKey. CombineValues takes key-value pairs in which the value is an iterable, and applies the combiner to that iterable. In the following case, the elements it receives are ('a', [1, 2, 3]) and ('b', [10]). Here is the example:

    kvs = [('a', 1),
           ('a', 2),
           ('a', 3),
           ('b', 10),
          ]

    combine_values = (pipeline
                      | 'create_kvs' >> beam.Create(kvs)
                      | 'gbk' >> beam.GroupByKey()
                      | 'combine values' >> beam.CombineValues(HalfMean())
                      | 'print cv' >> beam.Map(print)
                     )
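As a rough check of what that pipeline should produce, the grouping and combining steps can be imitated in plain Python. This is only a sketch of the arithmetic, not of how Beam executes it; `half_mean` is a hypothetical helper standing in for HalfMean, and a real pipeline does not guarantee the order of the printed elements.

```python
from collections import defaultdict

kvs = [('a', 1), ('a', 2), ('a', 3), ('b', 10)]

# Imitate the GroupByKey step: collect all values under their key.
grouped = defaultdict(list)
for key, value in kvs:
    grouped[key].append(value)


def half_mean(values):
    # Hypothetical stand-in for HalfMean: half of the mean of the values.
    values = list(values)
    return sum(values) / len(values) / 2


# Imitate the CombineValues step: one combined value per key.
combined = {key: half_mean(values) for key, values in grouped.items()}
print(combined)
```

For 'a' the mean of 1, 2 and 3 is 2, so the half mean is 1.0; for 'b' it is 5.0.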
