python - How does Apache-Spark work with methods inside a class?
I am learning Apache Spark now. After reading the Spark tutorials, I understand how to pass a Python function to Apache Spark to deal with an RDD dataset. But I still have no idea how Apache Spark works with methods inside a class. For example, I have the code below:
    import numpy as np
    import copy
    from pyspark import SparkConf, SparkContext

    class A():
        def __init__(self, n):
            self.num = n

    class B(A):
        ### Copy the item of class B.
        def __init__(self, a):
            self.num = copy.deepcopy(a.num)
        ### Print out the item of B.
        def display(self, s):
            print s.num
            return s

    def main():
        ### Locally run an application "test" using Spark.
        conf = SparkConf().setAppName("test").setMaster("local[2]")
        ### Setup the Spark configuration.
        sc = SparkContext(conf = conf)
        ### "data" is a list storing instances of class A.
        data = []
        for i in np.arange(5):
            x = A(i)
            data.append(x)
        ### "lines" distributes "data" in Spark.
        lines = sc.parallelize(data)
        ### Parallelly create a list of instances of class B using
        ### Spark "map".
        temp = lines.map(B)
        ### Got the error when running the following code:
        ### NameError: global name 'display' is not defined.
        temp1 = temp.map(display)

    if __name__ == "__main__":
        main()
In fact, I used the above code to generate a list of instances of class B in parallel, using temp = lines.map(B). After that, I did temp1 = temp.map(display), because I wanted to print out each of the items in the list of instances of class B in parallel. But this error shows up: NameError: global name 'display' is not defined.

I am wondering how I can fix this error while still using Apache Spark for parallel computing. I would appreciate it if anyone could help me.
The NameError occurs because display is not a global function but a method of B: you have to call it through an instance (for example via a lambda), and the classes have to live in an importable module that is shipped to the workers rather than in the driver script's __main__. Structure:

    .
    ├── ab.py
    └── main.py
main.py
    import numpy as np
    from pyspark import SparkConf, SparkContext
    import os
    from ab import A, B

    def main():
        ### Locally run an application "test" using Spark.
        conf = SparkConf().setAppName("test").setMaster("local[2]")
        ### Setup the Spark configuration and ship ab.py to the workers.
        sc = SparkContext(
            conf=conf,
            pyFiles=[os.path.join(
                os.path.abspath(os.path.dirname(__file__)), 'ab.py')]
        )

        data = []
        for i in np.arange(5):
            x = A(i)
            data.append(x)

        lines = sc.parallelize(data)
        temp = lines.map(B)
        temp.foreach(lambda x: x.display())

    if __name__ == "__main__":
        main()
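If listing the dependency at construction time is inconvenient, an equivalent option (a sketch, not part of the original answer) is SparkContext.addPyFile, which ships the module after the context exists:

    ### Alternative sketch: distribute ab.py after creating the context.
    sc = SparkContext(conf=conf)
    sc.addPyFile(os.path.join(
        os.path.abspath(os.path.dirname(__file__)), 'ab.py'))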
ab.py
    import copy

    class A():
        def __init__(self, n):
            self.num = n

    class B(A):
        ### Copy the item of class B.
        def __init__(self, a):
            self.num = copy.deepcopy(a.num)

        ### Print out the item of B.
        def display(self):
            print self.num
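The point of the separate module is that A and B are importable by name on the workers instead of being defined only in the driver's __main__. As a quick sanity check outside Spark, the classes can be exercised directly (a minimal sketch):

    from ab import A, B

    a = A(3)
    b = B(a)     # deep-copies a.num into the new instance
    b.display()  # prints 3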
Comments:
- Once again - printing is a bad idea. Ignoring the Spark architecture, there is a good chance it will be a bottleneck in your program.
- If you need diagnostic output, consider logging, or collect a sample and inspect it locally (a logging variant is sketched after this list):

      for x in rdd.sample(False, 0.001).collect():
          x.display()
- For side effects, use foreach instead of map (the difference is illustrated in the second sketch below).
- I modified the display method. I wasn't sure what s should be in this context (a map-friendly variant is sketched below).
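For the logging route mentioned above, a minimal driver-side sketch (assuming rdd holds the B instances, as in main.py):

    import logging

    ### A sketch: inspect a small sample on the driver via logging
    ### instead of printing on the executors.
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("test")

    for x in rdd.sample(False, 0.001).collect():
        log.info("B.num = %s", x.num)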
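To make the foreach-versus-map distinction concrete (using temp from main.py):

    ### foreach is an action run for its side effect; it returns nothing.
    temp.foreach(lambda x: x.display())

    ### map is a lazy transformation; nothing executes until an action
    ### such as collect() forces it, and it should return a value.
    nums = temp.map(lambda x: x.num).collect()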
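If the original s parameter was meant to keep display usable inside map (which expects a return value), one hedged variant is to return self:

    ### Sketch of a map-friendly variant: returning self lets
    ### temp.map(lambda x: x.display()) yield an RDD of the same items,
    ### though printing only happens once an action forces evaluation.
    def display(self):
        print self.num
        return self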