Are multiple reduceByKey on the same RDD compiled into a single scan?
Suppose I have an RDD (~50M records/day) that I want to summarize in several different ways. The RDD records are 4-tuples: (keep, foo, bar, baz)

keep - boolean
foo, bar, baz - 0/1 int

I want to count how many of each of foo &c are kept and dropped, i.e., I have the following for foo (and the same for bar and baz):
rdd.filter(lambda rec: rec[1] == 1) \
   .map(lambda rec: (rec[0], 1)) \
   .reduceByKey(operator.add)
# rec is the (keep, foo, bar, baz) tuple
which would return (after collect) a list like [(True, 40000000), (False, 10000000)].
The question is: is there an easy way to avoid scanning rdd 3 times (once for each of foo, bar, baz)?
What I mean is not a way to rewrite the above code to handle all 3 fields, but telling Spark to process the 3 pipelines in a single pass.
It is possible to execute the 3 pipelines in parallel by submitting the jobs from different threads, but each job still passes through the RDD, so this requires up to 3x more resources on the cluster.
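The threaded approach can be illustrated without a cluster. Below is a hedged sketch in plain Python: `count_kept`, the `records` dataset, and the `ThreadPoolExecutor` driver are made-up stand-ins for the Spark pipelines and job submission, not code from the question. Each "job" still makes its own full pass over the data, which is the point being made above.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Tiny stand-in dataset of (keep, foo, bar, baz) records.
records = [
    (True, 1, 0, 1),
    (False, 1, 1, 0),
    (True, 0, 1, 1),
    (True, 1, 0, 0),
]

def count_kept(field_index):
    """One full pass over the data: filter on one field, count by `keep`.

    Mirrors rdd.filter(...).map(...).reduceByKey(operator.add) for a
    single field; Counter plays the role of reduceByKey.
    """
    counts = Counter()
    for record in records:
        if record[field_index] == 1:
            counts[record[0]] += 1  # key by `keep`, add 1
    return dict(counts)

# Submit the three pipelines from different threads: they run
# concurrently, but each still scans the whole dataset.
with ThreadPoolExecutor(max_workers=3) as pool:
    foo, bar, baz = pool.map(count_kept, [1, 2, 3])

print(foo)  # → {True: 2, False: 1}
```

In real Spark the same pattern applies: actions triggered from separate driver threads are scheduled concurrently, but each action scans the RDD independently.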
It is possible to get the job done in 1 pass by rewriting the job to handle all counts at once - the answer regarding the aggregate option is one way. Splitting the data into pairs (keep, foo), (keep, bar), (keep, baz) is another.
It is not possible to get the job done in 1 pass without code changes, as there is no way for Spark to know that the jobs relate to the same dataset. At most, the speed of the jobs after the first one can be improved by caching the initial RDD with rdd.cache before the .filter().map().reduce() steps; this will still pass through the RDD 3 times, but the 2nd and 3rd passes will potentially be a lot faster if all data fits in the memory of the cluster:
rdd.cache
// The first reduceByKey action will trigger the cache; the RDD data will be kept in memory
val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
// Subsequent operations execute faster because the RDD is available in memory
val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)
If I were doing this, I would create pairs of the relevant data and count them in a single pass:
// Split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz")
// and the keep information. dataPairs will contain data like:
// (("bar", true), 1), (("foo", false), 1)
val dataPairs = rdd.flatMap { case (keep, foo, bar, baz) =>
  def condPair(name: String, x: Int): Option[((String, Boolean), Int)] =
    if (x == 1) Some(((name, keep), x)) else None
  Seq(condPair("foo", foo), condPair("bar", bar), condPair("baz", baz)).flatten
}
val totals = dataPairs.reduceByKey(_ + _)
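The single-pass logic can be checked in plain Python. This is a sketch under my own assumptions (a made-up tiny dataset), with `Counter` again standing in for reduceByKey(_ + _): each record emits up to three ((field_name, keep), 1) pairs in one pass over the data.

```python
from collections import Counter

# Tiny stand-in dataset of (keep, foo, bar, baz) records.
records = [
    (True, 1, 0, 1),
    (False, 1, 1, 0),
    (True, 0, 1, 1),
    (True, 1, 0, 0),
]

# One pass: each record emits a ((field_name, keep), 1) pair for every
# field that is set; Counter sums per key, like reduceByKey(_ + _).
totals = Counter()
for keep, foo, bar, baz in records:
    for name, value in (("foo", foo), ("bar", bar), ("baz", baz)):
        if value == 1:
            totals[(name, keep)] += 1

print(totals[("foo", True)])  # → 2 (records with foo == 1 that were kept)
```

The inner loop is the flatMap: one input record fans out into several keyed pairs, and all three per-field summaries come out of a single scan.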
This is easy and passes over the data only once, but it requires rewriting the code, so I'd score myself 66.66% on answering the question.