apache spark - Are multiple reduceByKey on the same RDD compiled into a single scan?


Suppose I have an RDD (50M records/day) that I want to summarize in several different ways. The RDD records are 4-tuples: (keep, foo, bar, baz).

  • keep - boolean
  • foo, bar, baz - 0/1 int

I want to count how many records with each of foo, bar and baz set are kept and how many are dropped, i.e., I have to do the following for foo (and the same for bar and baz):

    import operator

    # each record r is a tuple (keep, foo, bar, baz), so foo is r[1] and keep is r[0]
    (rdd.filter(lambda r: r[1] == 1)
        .map(lambda r: (r[0], 1))
        .reduceByKey(operator.add))

which returns (after collect) a list like [(True, 40000000), (False, 10000000)].

The question is: is there an easy way to avoid scanning the RDD 3 times (once for each of foo, bar, baz)?

What I mean is not a way to rewrite the above code to handle all 3 fields, but a way of telling Spark to process all 3 pipelines in a single pass.

It's possible to execute the 3 pipelines in parallel by submitting each job from a different thread, but this will still pass through the RDD 3 times and can require up to 3x more resources on the cluster.
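As a rough sketch of the threaded submission, the helper below runs a set of zero-argument callables on separate threads. The name `run_in_parallel` and the `count_kept` helper in the usage comment are hypothetical, made up for illustration; the usage comment assumes an existing PySpark `rdd` of (keep, foo, bar, baz) tuples.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_parallel(jobs):
    """Run each zero-argument callable in `jobs` on its own thread.

    `jobs` maps a label to a callable; the result maps each label
    to the value that callable returned.
    """
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        futures = {name: pool.submit(job) for name, job in jobs.items()}
        return {name: future.result() for name, future in futures.items()}


# Hypothetical PySpark usage (assumes `rdd` already exists):
#
#   import operator
#
#   def count_kept(index):
#       # index 1, 2, 3 selects foo, bar, baz; r[0] is keep
#       return lambda: (rdd.filter(lambda r: r[index] == 1)
#                          .map(lambda r: (r[0], 1))
#                          .reduceByKey(operator.add)
#                          .collect())
#
#   results = run_in_parallel(
#       {"foo": count_kept(1), "bar": count_kept(2), "baz": count_kept(3)})
```

Each callable still triggers its own Spark job, so the RDD is scanned once per job; the threads only let those scans overlap.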

It's possible to get the job done in 1 pass by rewriting the job to handle all the counts at once; the answer regarding aggregate is one option. Splitting the data into pairs (keep, foo), (keep, bar), (keep, baz) would be another.
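A minimal sketch of what the aggregate option could look like in Python. The names `seq_op` and `comb_op` are my own, and the two "partitions" are plain lists standing in for an RDD so the logic can be checked locally; with PySpark the same two functions would be passed to `rdd.aggregate({}, seq_op, comb_op)`.

```python
from functools import reduce

FIELDS = ("foo", "bar", "baz")


def seq_op(acc, record):
    """Fold one (keep, foo, bar, baz) record into a partition-local dict."""
    keep, *flags = record
    for name, flag in zip(FIELDS, flags):
        if flag == 1:
            acc[(name, keep)] = acc.get((name, keep), 0) + 1
    return acc


def comb_op(left, right):
    """Merge the dicts produced for two partitions."""
    for key, n in right.items():
        left[key] = left.get(key, 0) + n
    return left


# Local stand-in for an RDD with two partitions (made-up sample data).
partitions = [
    [(True, 1, 0, 1), (False, 1, 1, 0)],
    [(True, 0, 1, 1), (True, 1, 0, 0)],
]
partials = [reduce(seq_op, part, {}) for part in partitions]
totals = reduce(comb_op, partials, {})
```

Every record is visited exactly once, and only the small dicts of counts travel between partitions.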

It's not possible to get the job done in 1 pass without any code changes, as there is no way for Spark to know that these jobs relate to the same dataset. At most, the speed of the subsequent jobs after the first one can be improved by caching the initial RDD with rdd.cache before the .filter().map().reduce() steps. This will still pass through the RDD 3 times, but the 2nd and 3rd passes will potentially be a lot faster if all the data fits in the memory of the cluster:

    rdd.cache
    // reduceByKey itself is lazy; the first action run on one of these results
    // (e.g. collect) triggers the cache, and the rdd data is then kept in memory
    val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
    // subsequent operations will execute faster, as the rdd is now available in memory
    val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
    val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)

If I were doing this, I'd create pairs of the relevant data and count them in a single pass:

    // We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz") and the keep information.
    // dataPairs will contain data like: (("bar",true),1), (("foo",false),1)
    val dataPairs = rdd.flatMap { case (keep, foo, bar, baz) =>
      def condPair(name: String, x: Int): Option[((String, Boolean), Int)] =
        if (x == 1) Some(((name, keep), x)) else None
      Seq(condPair("foo", foo), condPair("bar", bar), condPair("baz", baz)).flatten
    }
    val totals = dataPairs.reduceByKey(_ + _)

This is easy and will pass over the data only once, but it requires rewriting the code. I'd say it scores 66.66% in answering the question.
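Since the question is written in Python, here is a rough Python rendering of the same pair-splitting idea. The names `to_pairs` and `local_totals` are my own; `local_totals` is only a local stand-in so the logic can be checked without a cluster, and with PySpark the equivalent would presumably be `rdd.flatMap(to_pairs).reduceByKey(operator.add)`.

```python
from collections import defaultdict

FIELDS = ("foo", "bar", "baz")


def to_pairs(record):
    """Emit ((field, keep), 1) for every field of the record that equals 1."""
    keep, *flags = record
    return [((name, keep), 1) for name, flag in zip(FIELDS, flags) if flag == 1]


def local_totals(records):
    """Local stand-in for flatMap(to_pairs) followed by reduceByKey(add)."""
    totals = defaultdict(int)
    for record in records:
        for key, n in to_pairs(record):
            totals[key] += n
    return dict(totals)
```

A record with all three fields set produces three pairs, so the shuffle can carry up to 3 pairs per input record, but the input itself is read only once.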

