Python - pickle error on Spark filter
When I filter an RDD using a closure that refers to an object, I get a pickle error.
without object:
>>> mappartitionsrdd[369] @ mappartitions @ serdeutil.scala:143 >>> b = a.filter(lambda row: row.foo == 1) >>> b pythonrdd[374] @ rdd @ pythonrdd.scala:43
with object:
>>> z.foo 1 >>> b = a.filter(lambda row: row.foo == z.foo) >>> type(b) <class 'pyspark.rdd.pipelinedrdd'> >>> b traceback (most recent call last): file "<stdin>", line 1, in <module> file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/rdd.py", line 142, in __repr__ return self._jrdd.tostring() file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd pickled_command = ser.dumps(command) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/serializers.py", line 402, in dumps return cloudpickle.dumps(obj, 2) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps cp.dump(obj) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump return pickle.pickler.dump(self, obj) file "/usr/lib64/python2.6/pickle.py", line 224, in dump self.save(obj) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple save(element) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function self.save_function_tuple(obj, [themodule]) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple save((code, closure, base_globals)) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple save(element) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 600, in save_list self._batch_appends(iter(obj)) file 
"/usr/lib64/python2.6/pickle.py", line 633, in _batch_appends save(x) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function self.save_function_tuple(obj, [themodule]) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple save((code, closure, base_globals)) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple save(element) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 600, in save_list self._batch_appends(iter(obj)) file "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends save(tmp[0]) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function self.save_function_tuple(obj, modlist) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple save(f_globals) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) file 
"/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 616, in save_reduce save(cls) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 467, in save_global d),obj=obj) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 631, in save_reduce save(args) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple save(element) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function self.save_function_tuple(obj, modlist) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple save(f_globals) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems save(v) file 
"/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst self.save_inst_logic(obj) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic save(stuff) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst self.save_inst_logic(obj) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic save(stuff) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/usr/lib64/python2.6/pickle.py", line 600, in save_list self._batch_appends(iter(obj)) file "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends save(tmp[0]) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # 
call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst self.save_inst_logic(obj) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic save(stuff) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce save(state) file "/usr/lib64/python2.6/pickle.py", line 286, in save f(self, obj) # call unbound method explicit self file "/opt/cloudera/parcels/cdh-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict pickle.pickler.save_dict(self, obj) file "/usr/lib64/python2.6/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) file "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems save(v) file "/usr/lib64/python2.6/pickle.py", line 313, in save (t.__name__, obj)) pickle.picklingerror: can't pickle 'lock' object: <thread.lock object @ 0x7f5f92cc64c8>
What am I doing wrong?
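What other fields does z contain? When the closure is serialized, the whole object is passed in, not just the field you're accessing. If any of its other fields refer to unserializable entities (for example, a SparkContext object), you'll get a serialization error. A common workaround is to copy the value into a local variable first (e.g. `foo = z.foo`) and reference only that variable inside the lambda.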
Comments
Post a Comment