Python - Pickle error on Spark filter


When I filter an RDD using a closure that refers to an object, I get a pickle error.

Without the object:

>>> a
MapPartitionsRDD[369] at mapPartitions at SerDeUtil.scala:143
>>> b = a.filter(lambda row: row.foo == 1)
>>> b
PythonRDD[374] at RDD at PythonRDD.scala:43

With the object:

>>> z.foo 1 >>> b = a.filter(lambda row: row.foo == z.foo) >>> type(b) <class 'pyspark.rdd.pipelinedrdd'> >>> b traceback (most recent call last):   file "<stdin>", line 1, in <module>   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/rdd.py", line 142, in __repr__     return self._jrdd.tostring()   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd     pickled_command = ser.dumps(command)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/serializers.py", line 402, in dumps     return cloudpickle.dumps(obj, 2)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps     cp.dump(obj)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump     return pickle.pickler.dump(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 224, in dump     self.save(obj)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple     save(element)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function     self.save_function_tuple(obj, [themodule])   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple     save((code, closure, base_globals))   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple     save(element)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file 
"/usr/lib64/python2.6/pickle.py", line 600, in save_list     self._batch_appends(iter(obj))   file "/usr/lib64/python2.6/pickle.py", line 633, in _batch_appends     save(x)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function     self.save_function_tuple(obj, [themodule])   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple     save((code, closure, base_globals))   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple     save(element)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/usr/lib64/python2.6/pickle.py", line 600, in save_list     self._batch_appends(iter(obj))   file "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends     save(tmp[0])   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function     self.save_function_tuple(obj, modlist)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple     save(f_globals)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems 
    save(v)   file "/usr/lib64/python2.6/pickle.py", line 331, in save     self.save_reduce(obj=obj, *rv)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 616, in save_reduce     save(cls)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 467, in save_global     d),obj=obj)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 631, in save_reduce     save(args)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple     save(element)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems     save(v)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function     self.save_function_tuple(obj, modlist)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple     save(f_globals)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file 
"/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems     save(v)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst     self.save_inst_logic(obj)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic     save(stuff)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems     save(v)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst     self.save_inst_logic(obj)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic     save(stuff)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems     save(v)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call 
unbound method explicit self   file "/usr/lib64/python2.6/pickle.py", line 600, in save_list     self._batch_appends(iter(obj))   file "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends     save(tmp[0])   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst     self.save_inst_logic(obj)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic     save(stuff)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems     save(v)   file "/usr/lib64/python2.6/pickle.py", line 331, in save     self.save_reduce(obj=obj, *rv)   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce     save(state)   file "/usr/lib64/python2.6/pickle.py", line 286, in save     f(self, obj) # call unbound method explicit self   file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict     pickle.pickler.save_dict(self, obj)   file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict     self._batch_setitems(obj.iteritems())   file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems     save(v)   file "/usr/lib64/python2.6/pickle.py", line 313, in save     (t.__name__, obj)) pickle.picklingerror: can't pickle 'lock' object: <thread.lock object @ 0x7f5f92cc64c8> 

What am I doing wrong?

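What other fields does z contain? When the closure is serialized, the whole object is passed in, not just the field you're accessing. If any of its other fields refer to unserializable entities (e.g., the Spark context object), you'll get a serialization error. A common workaround is to copy the value into a plain variable first (e.g., foo = z.foo) and refer to that variable inside the lambda, so that only the value is pickled.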


Comments

Popular posts from this blog

java - Could not locate OpenAL library -

c++ - Delete matches in OpenCV (Keypoints and descriptors) -

sorting - opencl Bitonic sort with 64 bits keys -