Clare S. Y. Huang Data Scientist | Atmospheric Dynamicist

Pyspark error "Could not serialize object"

This post is related to the idea discussed in the post “How to Solve Non-Serializable Errors When Instantiating Objects In Spark UDFs”. Here I discuss the solution in a hypothetical PySpark scenario. Suppose I have a class that transforms a string into a sum of numbers, like this:

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

class AnimalsToNumbers(object):
    def __init__(self, spark):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        self._spark = spark
        
    def addition(self, animal_str):
        return sum([self._mapping.get(x, 0)
                    for x in animal_str.split('+')])
    
    def add_up_animals(self, df):
        addition_udf = F.udf(self.addition, IntegerType())
        return df.withColumn('sum', addition_udf('animal'))

(In practical cases, self._mapping is a huge object containing the dictionary and other attributes derived from methods in the AnimalsToNumbers class.) Suppose I want to transform a dataframe df that looks like this:

+-------------+
|       animal|
+-------------+
|elephant+bear|
|     cat+bear|
+-------------+

The operation

AnimalsToNumbers(spark).add_up_animals(df)

will lead to an error like this:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
~/anaconda3/envs/pyspark_demo/lib/python3.5/site-packages/pyspark/serializers.py in dumps(self, obj)
    589         try:
--> 590             return cloudpickle.dumps(obj, 2)
    591         except pickle.PickleError:
...
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o25.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

The issue is that, since self._mapping appears inside the function addition, applying addition_udf to the pyspark dataframe requires the whole object self (i.e. the AnimalsToNumbers instance) to be serialized — and it can’t be, because self._spark holds a reference to the SparkSession, which wraps a Py4J gateway object that cannot be pickled (hence the __getstate__ error above).
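The mechanism can be seen without Spark at all. Below is a minimal pure-Python sketch (the class name mirrors the one above, but a threading.Lock stands in for the unpicklable SparkSession): pickling a bound method also pickles the instance it is bound to, and fails as soon as that instance holds something unpicklable.

```python
import pickle
import threading

class AnimalsToNumbers:
    def __init__(self):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        # Stand-in for the SparkSession: a lock cannot be pickled,
        # just as the Py4J gateway behind a SparkSession cannot.
        self._spark = threading.Lock()

    def addition(self, animal_str):
        return sum(self._mapping.get(x, 0)
                   for x in animal_str.split('+'))

obj = AnimalsToNumbers()

# Pickling the bound method drags the whole instance with it...
try:
    pickle.dumps(obj.addition)
    serialized = True
except (TypeError, pickle.PicklingError):
    # ...and fails on the unpicklable attribute.
    serialized = False

print('serialized:', serialized)
```

(Spark actually uses cloudpickle rather than the stdlib pickle, but cloudpickle hits the same wall on the gateway object.)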

A (surprisingly simple) fix is to bind the dictionary (self._mapping) to a local variable inside the method, so that the UDF’s closure captures only the dictionary and not the whole object:

class AnimalsToNumbers(object):
    def __init__(self, spark):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        self._spark = spark

    def add_up_animals(self, df):
        mapping = self._mapping
        def addition(animal_str):
            return sum([mapping.get(x, 0) for x in animal_str.split('+')])

        addition_udf = F.udf(addition, IntegerType())
        return df.withColumn('sum', addition_udf('animal'))
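Because addition now closes over only the plain dictionary, cloudpickle can ship it to the workers without touching the SparkSession. The UDF’s logic itself can be sanity-checked without a Spark session — a quick pure-Python sketch of the captured closure (names mirror the snippet above):

```python
mapping = {'elephant': 1, 'bear': 4, 'cat': 9}

def addition(animal_str):
    # Animals missing from the mapping count as 0,
    # matching mapping.get(x, 0) in the UDF above.
    return sum(mapping.get(x, 0) for x in animal_str.split('+'))

print(addition('elephant+bear'))  # 5
print(addition('cat+bear'))       # 13
```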

AnimalsToNumbers(spark).add_up_animals(df).show() would yield:

+-------------+---+
|       animal|sum|
+-------------+---+
|elephant+bear|  5|
|     cat+bear| 13|
+-------------+---+

Yay :)
