iterator - Apache Spark: iterating through an RDD gives an error using mapPartitionsToPair
I am using the mapPartitionsToPair function on a JavaPairRDD as follows:
JavaPairRDD<MyKeyClass, MyValueClass> myRDD;
JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
        public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> arg0) throws Exception {
            Tuple2<MyKeyClass, MyValueClass> temp = arg0.next(); // The error occurs here...
            TreeMap<Integer, Double> dic = new TreeMap<Integer, Double>();
            do {
                ........ // code to compute newIntegerValue and newDoubleValue from temp ........
                dic.put(newIntegerValue, newDoubleValue);
                temp = arg0.next();
            } while (arg0.hasNext());
        }
    });
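For reference, a guarded sketch of the same transformation that checks hasNext() before every next() call is shown below. It uses the same placeholder classes MyKeyClass/MyValueClass and an assumed dummy value computation, so it is only an illustration of the iteration pattern, not the original job:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

JavaPairRDD<Integer, Double> guardedResult = myRDD.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
        public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> it) throws Exception {
            List<Tuple2<Integer, Double>> out = new ArrayList<Tuple2<Integer, Double>>();
            // Checking hasNext() before each next() means an empty partition
            // simply produces an empty result instead of a NoSuchElementException.
            while (it.hasNext()) {
                Tuple2<MyKeyClass, MyValueClass> temp = it.next();
                Integer newIntegerValue = temp._1().hashCode(); // placeholder computation
                Double newDoubleValue = 0.0;                    // placeholder computation
                out.add(new Tuple2<Integer, Double>(newIntegerValue, newDoubleValue));
            }
            return out;
        }
    });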
I am able to run this on Apache Spark in pseudo-distributed mode, but I am not able to run the above code on a cluster. I am getting the following error:
java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
    at IncrementalGraph$6.call(MySparkJob.java:584)
    at IncrementalGraph$6.call(MySparkJob.java:573)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
I am using Spark 1.2.0 on Hadoop 2.2.0.
Can anyone help me fix this issue?
Update: hasNext() returns true before the first call to next() on the iterator.
I found the answer.
I had set myRDD's storage level to MEMORY_ONLY. Before starting the mapPartitionsToPair transformation, I had the following line in my code:
myRDD.persist(StorageLevel.MEMORY_ONLY());
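For context, this is roughly how that persist call sat in the job before the transformation; the import and the comment are assumptions, not the original source:

import org.apache.spark.storage.StorageLevel;

// Cache the RDD's partitions deserialized in memory only (no disk spill).
myRDD.persist(StorageLevel.MEMORY_ONLY());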
I removed that line, and it fixed the program.
I don't know why this fixed it. If anyone can explain it, I would highly appreciate it.