iterator - Apache Spark: iterating through an RDD gives an error when using mapPartitionsToPair


I am using the mapPartitionsToPair function on a JavaPairRDD as follows:

JavaPairRDD<MyKeyClass, MyValueClass> myRDD;

JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
        public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> arg0) throws Exception {
            Tuple2<MyKeyClass, MyValueClass> temp = arg0.next(); // the error occurs here...
            TreeMap<Integer, Double> dic = new TreeMap<Integer, Double>();
            do {
                ........
                // code that computes newIntegerValue and newDoubleValue from temp
                ........
                dic.put(newIntegerValue, newDoubleValue);
                temp = arg0.next();
            } while (arg0.hasNext());
        }
    });
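For reference, here is a minimal sketch (not the original code) of the same transformation written so that next() is only called after hasNext() has returned true, and so that the method actually returns a result. The helpers computeKey and computeValue are hypothetical placeholders for the elided per-element computation in the snippet above:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
        public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> arg0) throws Exception {
            List<Tuple2<Integer, Double>> out = new ArrayList<Tuple2<Integer, Double>>();
            // Guard every next() with hasNext(), so an empty partition simply yields an empty result
            while (arg0.hasNext()) {
                Tuple2<MyKeyClass, MyValueClass> temp = arg0.next();
                int newIntegerValue = computeKey(temp);      // hypothetical placeholder
                double newDoubleValue = computeValue(temp);  // hypothetical placeholder
                out.add(new Tuple2<Integer, Double>(newIntegerValue, newDoubleValue));
            }
            return out;
        }
    });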

I am able to run this in Apache Spark's pseudo-distributed mode, but I am not able to run the above code on a cluster. I am getting the following error:

java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
    at IncrementalGraph$6.call(MySparkJob.java:584)
    at IncrementalGraph$6.call(MySparkJob.java:573)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

I am using Spark 1.2.0 on Hadoop 2.2.0.

Can anyone help me fix this issue?

UPDATE: hasNext() returns true before the call to next() on the iterator.
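One thing worth checking (a suggestion, not from the original post) is whether the cluster run hands some tasks an empty or single-element partition, which the do/while pattern above cannot handle, whereas pseudo-distributed mode may pack the data into fewer, fuller partitions. A hedged diagnostic sketch that counts the elements in each partition:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;

List<Integer> partitionSizes = myRDD.mapPartitions(
    new FlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer>() {
        public Iterable<Integer> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> it) throws Exception {
            int n = 0;
            while (it.hasNext()) { it.next(); n++; }  // count the elements without keeping them
            return Collections.singletonList(n);
        }
    }).collect();
System.out.println("Elements per partition: " + partitionSizes);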

I found the answer.

I had set the storage level of myRDD to MEMORY_ONLY. Before the start of the mapPartitionsToPair transformation, I had the following line in my code:

myRDD.persist(StorageLevel.MEMORY_ONLY());

I removed it, and that fixed the program.

I don't know why that fixed it. If anyone can explain it, it would be highly appreciated.

