How Does Apache Beam's CombineValues Operate Over Elements When Executing Arithmetic Operations?
Solution 1:
The combiner is called twice because of the MapReduce-style phases: values are first combined partially, and the partial results are then merged with the same function. Since the function you are using (halving the mean) is not associative, you need an "advanced combiner" (a CombineFn), as in the Example 8 you mention.
What is happening in your current code is that, starting from (xxx, 1), it calculates the half mean (xxx, 0.5) and then, when merging the values, halves it again, making (xxx, 0.25).
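To make the double halving concrete, here is a minimal plain-Python sketch of the two phases described above. It does not use Beam; it only assumes that the same function is applied once to the raw value and once more to the partial result, which is what the printed output of the original combiner suggests.

```python
def combiner(elements):
    # Same logic as the naive combiner: half of the sum of the values.
    return sum(elements) / 2

# Phase 1: partial combine over the single input value for key 'xxx'.
partial = combiner([1])

# Phase 2: the partial result is merged using the same function again.
final = combiner([partial])

print(partial, final)  # 0.5 0.25
```

Because the halving happens in every phase, the result depends on how many times the function is applied, which is why a plain callable is unsafe for this computation.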
In this answer I explain a similar concept.
For your particular case, as mentioned, you need an "advanced combiner":

import apache_beam as beam


def combiner(elements):
    # Old combiner, kept only to print what it receives
    print(elements)
    return sum(elements) / 2


class HalfMean(beam.CombineFn):
    def create_accumulator(self):
        # Tuple of (sum, count)
        return (0, 0)

    def add_input(self, accumulator, input):
        # Add the current element to sum, add one to count
        new_tuple = (accumulator[0] + input, accumulator[1] + 1)
        return new_tuple

    def merge_accumulators(self, accumulators):
        # Join all accumulators
        partial_sums = [x[0] for x in accumulators]
        partial_counts = [x[1] for x in accumulators]
        merged_tuple = (sum(partial_sums), sum(partial_counts))
        return merged_tuple

    def extract_output(self, sum_count_tuple):
        # Now we can output half of the mean
        mean = sum_count_tuple[0] / sum_count_tuple[1]
        return mean / 2


with beam.Pipeline() as pipeline:
    counts = (
        pipeline
        | 'create' >> beam.Create(['xxx'])
        | 'key it' >> beam.Map(lambda elem: (elem, 1))
        # | 'combine' >> beam.CombinePerKey(combiner)
        | 'advance combine' >> beam.CombinePerKey(HalfMean())
        | 'print' >> beam.Map(print)
    )
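The reason the accumulator approach is safe can be sketched without Beam. The class below is a plain-Python stand-in that mirrors the four CombineFn methods, and `run_in_bundles` is a hypothetical helper (not a Beam API) that imitates a runner splitting the values into bundles; how a real runner splits the work is an assumption here.

```python
class HalfMeanSketch:
    """Plain-Python stand-in mirroring the four CombineFn methods."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        return (accumulator[0] + value, accumulator[1] + 1)

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return (sum(sums), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count / 2


def run_in_bundles(fn, bundles):
    # Hypothetical helper: build one accumulator per bundle,
    # merge them, and only then compute the final output.
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


fn = HalfMeanSketch()
one_bundle = run_in_bundles(fn, [[1, 2, 3]])
two_bundles = run_in_bundles(fn, [[1], [2, 3]])
print(one_bundle, two_bundles)  # 1.0 1.0
```

The halving happens only once, in `extract_output`, so the result is the same no matter how the values are split before merging.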
I'm leaving your old combiner in, with a print, so you can see what's happening.
Anyhow, that is still not a CombineValues but a CombinePerKey. CombineValues takes a key-value pair in which the value is an iterable, and applies the combiner to it. In the following case, the elements it takes are ('a', [1, 2, 3]) and ('b', [10]). Here is the example:
# Runs inside the `with beam.Pipeline() as pipeline:` block
kvs = [
    ('a', 1),
    ('a', 2),
    ('a', 3),
    ('b', 10),
]

combine_values = (
    pipeline
    | 'create_kvs' >> beam.Create(kvs)
    | 'gbk' >> beam.GroupByKey()
    | 'combine values' >> beam.CombineValues(HalfMean())
    | 'print cv' >> beam.Map(print)
)
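As a rough check of what this pipeline should print, the grouping and the half mean can be reproduced in plain Python. This is only a sketch of the arithmetic, not of how Beam executes it, and Beam does not guarantee the order in which the keys are printed.

```python
from collections import defaultdict

kvs = [('a', 1), ('a', 2), ('a', 3), ('b', 10)]

# Emulate GroupByKey: collect all values under their key.
grouped = defaultdict(list)
for key, value in kvs:
    grouped[key].append(value)

# Emulate CombineValues with a half-mean combiner: one output per key.
result = [(key, sum(values) / len(values) / 2) for key, values in grouped.items()]
print(result)  # [('a', 1.0), ('b', 5.0)]
```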