Clare S. Y. Huang Data Scientist | Atmospheric Dynamicist

I love to share what I've learnt with others. Check out my blog posts and notes about my academic research, as well as technical solutions to software engineering and data science challenges.
Opinions expressed in this blog are solely my own.


Pyspark error "Could not serialize object"

This post is related to the idea discussed in the post “How to Solve Non-Serializable Errors When Instantiating Objects In Spark UDFs”. Here I discuss the solution using a hypothetical scenario in pyspark.

Suppose I have a class that transforms a string into a sum of numbers, like this:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

class AnimalsToNumbers(object):
    def __init__(self, spark):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        self._spark = spark
        
    def addition(self, animal_str):
        return sum([self._mapping.get(x, 0)
                    for x in animal_str.split('+')])
    
    def add_up_animals(self, df):
        addition_udf = F.udf(self.addition, IntegerType())
        return df.withColumn('sum', addition_udf('animal'))

(In practical cases, self._mapping is a huge object containing the dictionary and other attributes that are derived from methods of the AnimalsToNumbers class.) Suppose I want to transform a dataframe df that looks like this:

+-------------+
|       animal|
+-------------+
|elephant+bear|
|     cat+bear|
+-------------+

The operation

AnimalsToNumbers(spark).add_up_animals(df)

will lead to an error like this:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
~/anaconda3/envs/pyspark_demo/lib/python3.5/site-packages/pyspark/serializers.py in dumps(self, obj)
    589         try:
--> 590             return cloudpickle.dumps(obj, 2)
    591         except pickle.PickleError:
...
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o25.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

The issue is that self._mapping appears in the function addition, so when addition_udf is applied to the pyspark dataframe, the whole object self (i.e. the AnimalsToNumbers instance) has to be serialized, and it can’t be: the instance holds a reference to the SparkSession (self._spark), which wraps Java objects that cannot be pickled.

A (surprisingly simple) fix is to create a local reference to the dictionary (self._mapping) inside the method, so that the UDF’s closure captures only the dictionary and not the whole object:

class AnimalsToNumbers(object):
    def __init__(self, spark):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        self._spark = spark
        
    
    def add_up_animals(self, df):
        mapping = self._mapping
        def addition(animal_str):
            return sum([mapping.get(x, 0) for x in animal_str.split('+')])

        addition_udf = F.udf(addition, IntegerType())
        return df.withColumn('sum', addition_udf('animal'))

AnimalsToNumbers(spark).add_up_animals(df).show() would yield:

+-------------+---+
|       animal|sum|
+-------------+---+
|elephant+bear|  5|
|     cat+bear| 13|
+-------------+---+

Yay :)
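For a much larger mapping, another option (a sketch, not from the original post; it assumes the spark session and the df with an animal column from above) is to broadcast the dictionary so that each executor receives a single read-only copy:

# Broadcast the lookup table instead of capturing it in the closure.
mapping_bc = spark.sparkContext.broadcast({'elephant': 1, 'bear': 4, 'cat': 9})

def addition(animal_str):
    # Access the broadcast value inside the UDF.
    return sum(mapping_bc.value.get(x, 0) for x in animal_str.split('+'))

addition_udf = F.udf(addition, IntegerType())
df.withColumn('sum', addition_udf('animal')).show()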

Handling JSON in PostgreSQL

I found a useful cheat sheet that lists the operations on JSON in PostgreSQL.

If I want to list the rows where column col1 in table table1 contains a JSON object with the key key1, I can use:

select * from table1
where col1->'key1' IS NOT NULL

Local wave activity package updated to version 0.3.7

The python package hn2016_falwa has just been updated with the following changes:

  • Switched the testing framework from unittest to pytest
  • Improved interface of the QGField object
  • Added a new function hn2016_falwa.download_data.retrieve_erai to download ERA-Interim data (but not connected to the main program yet)

My package uses f2py. When switching from unittest to pytest, there are several changes to make in setup.py and .travis.yml to accommodate this:

In setup (numpy.distutils.core.setup), remove the argument

test_suite='tests.my_module_suite'

and add:

    packages=find_packages(),
    setup_requires=['pytest-runner'],
    tests_require=['pytest'],
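
For reference, here is a minimal sketch of how these pieces might fit together in setup.py (the extension name and Fortran source path are hypothetical placeholders, not the actual ones in hn2016_falwa):

from setuptools import find_packages
from numpy.distutils.core import setup, Extension

# Hypothetical f2py extension; replace the name and source with the real ones.
ext = Extension(name='hn2016_falwa.fortran_module',
                sources=['src/fortran_module.f90'])

setup(
    name='hn2016_falwa',
    version='0.3.7',
    packages=find_packages(),
    ext_modules=[ext],
    setup_requires=['pytest-runner'],
    tests_require=['pytest'],
)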

In .travis.yml, the script section looks like:

script:
  - python setup.py pytest

Visit to AOS at UW-Madison to give a Colloquium

I visited the Department of Atmospheric and Oceanic Sciences at the University of Wisconsin-Madison for two days and had a lot of fun discussing atmospheric (and machine learning) research with the scientists there. Thanks Prof. Jon Martin for inviting me over!

The colloquium I gave on Monday was an overview of the finite-amplitude local Rossby wave activity theory and its application to study blocking. We learned from this framework that atmospheric blocking can be modelled as a traffic jam problem. I also mentioned the follow-up work by Paradise et al. (2019, JAS) that discusses the implication of this notion.

The slides for my colloquium can be found below:

Slides for UW-Madison Colloquium

Papers on architecture of Recurrent Neural Networks (RNN)

Bookmarking some papers mentioned in Andrew Ng’s course Sequence Models:

Gated Recurrent Unit (GRU)

Long short-term memory (LSTM)

(More to update)

Useful Git commands at work

Below are solutions I curated online to solve problems related to Git when collaborating with others and working on several branches together. This post will be updated from time to time.

Copy a file from one branch to another

To copy a file from another branch into the current branch (ref):

git checkout another_branch the_file_you_want.txt

Merge changes from master branch to yours

To merge changes from another branch into yours, you can use merge or rebase, depending on the preferred commit order. Bitbucket has a nice tutorial discussing the difference between the two. Usually I prefer to pull in the changes from the other branch as a single merge commit with git merge.

git checkout master      # the branch with changes
git pull                 # pull the remote changes to local master branch
git checkout mybranch    # go back to mybranch
git merge master         # incorporate the changes into mybranch

How to use git revert

Useful tutorial from Bitbucket on git revert.

Compare differences between branches and output the results

If you want to compare the difference between your (more up-to-date) branch and the master branch, use the command

git diff master..your_branch

You can save the comparison results into a text file by

git diff master..your_branch > your_branch_to_master.diff

The diff is shown with colors when you open the .diff file in Sublime Text, which highlights diff syntax.

Update password for Git on Mac OS X

Use the following command to set the macOS keychain as Git's credential helper:

git config --global credential.helper osxkeychain

[to be continued]

Comparison between different statistical language models

While researching different machine learning models for language modelling (i.e. sequence models), I found some useful resources online:

Articles

Courses

Read libsvm files into PySpark dataframe

I wanted to load the libsvm files provided in tensorflow/ranking into a PySpark dataframe, but couldn’t find an existing module that does it. Here is a version I wrote to do the job. (Disclaimer: not the most elegant solution, but it works.)

First of all, load the pyspark utilities required.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import SparseVector

Initiate a Spark session for creating the dataframe.

sc = SparkContext("local", "read_libsvm")
spark_session = SparkSession \
    .builder \
    .appName("read_libsvm") \
    .getOrCreate()

Get the path to the data files.

_TRAIN_DATA_PATH="data/train.txt"
_TEST_DATA_PATH="data/test.txt"

Here is the module I wrote for the purpose:

def read_libsvm(filepath, query_id=True):
    '''
    A utility function that takes in a libsvm file and turns it into a pyspark dataframe.

    Args:
        filepath (str): The file path to the data file.
        query_id (bool): whether 'qid' is present in the file.

    Returns:
        A pyspark dataframe that contains the data loaded.
    '''

    with open(filepath, 'r') as f:
        raw_data = [x.split() for x in f.readlines()]

    # The first token of each line is the label.
    train_outcome = [int(x[0]) for x in raw_data]

    # The second token is 'qid:<n>' if query ids are present.
    if query_id:
        train_qid = [int(x[1].split(':')[1]) for x in raw_data]

    # The remaining tokens are 'index:value' pairs.
    index_value_dict = list()
    for row in raw_data:
        index_value_dict.append(dict([(int(x.split(':')[0]), float(x.split(':')[1]))
                                       for x in row[(1 + int(query_id)):]]))

    # The sparse vector size is set by the largest feature index seen in the file.
    max_idx = max([max(x.keys()) for x in index_value_dict])
    rows = []
    for i in range(len(index_value_dict)):
        row_fields = {
            'label': train_outcome[i],
            'feat_vector': SparseVector(max_idx + 1, index_value_dict[i]),
        }
        if query_id:
            row_fields['qid'] = train_qid[i]
        rows.append(Row(**row_fields))
    df = spark_session.createDataFrame(rows)

    return df
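
If a file does not contain the qid field, the same function can be called with query_id=False (the path below is a hypothetical example):

df_no_qid = read_libsvm("data/no_qid.txt", query_id=False)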

Let’s see what the train and test sets from the tf-ranking package look like:

train_df = read_libsvm(_TRAIN_DATA_PATH)
test_df = read_libsvm(_TEST_DATA_PATH)

train_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[5,13,17,42,...|    0|  1|
|(137,[11,13,18,30...|    2|  1|
|(137,[11,27,29,39...|    2|  1|
|(137,[5,10,26,31,...|    1|  1|
|(137,[13,17,22,24...|    2|  2|
+--------------------+-----+---+
only showing top 5 rows
test_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[2,7,37,40,4...|    1|  1|
|(137,[1,8,12,15,2...|    2|  1|
|(137,[4,11,15,16,...|    0|  1|
|(137,[14,19,20,33...|    0|  1|
|(137,[9,12,19,26,...|    1|  2|
+--------------------+-----+---+
only showing top 5 rows

Simple pyspark solutions

Here is a curation of some solutions to simple problems encountered when working with pyspark.

How to replace a string in a column?

Source

from pyspark.sql.functions import regexp_replace
new_df = df.withColumn(col_name, regexp_replace(col_name, pattern, replacement))
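
As a quick (hypothetical) usage example, this would replace 'bear' with 'panda' in the animal column of the toy dataframe from the earlier post:

new_df = df.withColumn('animal', regexp_replace('animal', 'bear', 'panda'))
new_df.show()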

How to avoid duplicate columns when joining two dataframes on columns with the same name?

Source

df = left_df.join(right_df, ["name"])
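
For comparison, joining on a column expression instead of a list of column names keeps both copies of the shared column (a small sketch, assuming both dataframes have a name column):

# Join on a list of column names: the result has a single 'name' column.
deduped = left_df.join(right_df, ["name"])

# Join on a column expression: the result keeps 'name' from both sides.
duplicated = left_df.join(right_df, left_df["name"] == right_df["name"])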

Ubuntu on Windows

Found a useful article: How to Access Your Ubuntu Bash Files in Windows (and Your Windows System Drive in Bash).

Resources on Python Packaging

While figuring out how to properly write unit tests for my python package, I came across the useful pages below:

Unit tests are worth the time to write to make sure your package works as expected. I also found some commercial packages using unit tests as sample scripts for users to refer to (e.g. AllenNLP).

Setting up a Dash App on PythonAnywhere

After opening an account on pythonanywhere, go to the Web tab and select Add a new web app.

When prompted to select a Python Web framework, choose Flask.

Choose your python version. Here, I am choosing Python 3.6 (Flask 0.12).

Enter the path to the Python file that will hold the Dash app. I entered:

/home/username/mysite/dashing_demo_app.py

Put the script of your Dash app into dashing_demo_app.py. You can use the sample dashing_demo_app.py provided in the GitHub repo of pythonanywhere’s staff.
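
If you just want a minimal placeholder to start with, a bare-bones Dash app could look like the sketch below (the layout here is arbitrary); the important part is that app is a dash.Dash instance, so app.server exposes the underlying Flask app for the WSGI file later:

import dash
import dash_html_components as html

# Minimal Dash app; replace the layout with your own components.
app = dash.Dash(__name__)
app.layout = html.Div('Hello from PythonAnywhere!')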

Next, set up the virtual environment that the app will run in. I am using the requirements3.6.txt provided in the above GitHub repo.

Go to the Files tab to create requirements3.6.txt in your home directory. Then, go to the Consoles tab to start a new bash session. Create a virtual environment dashappenv with the following commands in the home directory:

mkvirtualenv dashappenv --python=/usr/bin/python3.6
pip install -r requirements3.6.txt

Then, go to the Web tab and enter under Virtualenv the path of your virtual environment:

/home/username/.virtualenvs/dashappenv

Lastly, modify your WSGI file. Instead of the provided line

from dashing_demo_app import app as application

enter

from dashing_demo_app import app
application = app.server

to import your app.

It’s all done. Go to the Web tab to reload your app. You can then click the URL of your webapp and see it running. :) Here is the sample webapp I built based on the example in the Dash tutorial.

Published on Science!

Our paper, Nakamura and Huang (2018), “Atmospheric blocking as a traffic jam in the jet stream”, is now available in Science!

NH18 Science Paper Preview

Here is the press release from UChicago about the publication.

For interested researchers, the sample script to reproduce the results can be found in the directory nh2018_science of my python package’s GitHub repo hn2016_falwa. You can download ERA-Interim reanalysis data with download_example.py and then run the local wave activity and flux analysis in the jupyter notebook demo demo_script_for_nh2018.ipynb.

Have fun and feel free to email me (csyhuang at uchicago.edu) if you are interested in using the code and/or have questions about it.

Installing Stanford Core NLP package on Mac OS X

I am following instructions on the GitHub page of Stanford Core NLP under Build with Ant. To install ant, you can use homebrew:

$ brew install ant

In Step 5, you have to include the .jar files in the directories CoreNLP/lib and CoreNLP/liblocal in your CLASSPATH. To do this, first install coreutils:

brew install coreutils

so that I can use the realpath utility it provides. Then, I include the following in my ~/.bashrc:

for file in `find /Users/clare.huang/CoreNLP/lib/ -name "*.jar"`;
  do export CLASSPATH="$CLASSPATH:`realpath $file`";
done

for file in `find /Users/clare.huang/CoreNLP/liblocal/ -name "*.jar"`;
  do export CLASSPATH="$CLASSPATH:`realpath $file`";
done

(I guess there are better ways to combine the commands above. Let me know if there are.)

To run CoreNLP, I have to download the latest version of it, and place it in the directory CoreNLP/:

wget http://nlp.stanford.edu/software/stanford-corenlp-full-2018-01-31.zip

The latest version is available on their official website. Unzip it, and add all the .jar files in it to the $CLASSPATH.

Afterwards, you shall be able to run CoreNLP with the commands provided in the blogpost of Khalid Alnajjar (under Running Stanford CoreNLP Server). If you have no problem starting the server, you shall be able to see the interface on your browser at http://localhost:9000/:

java -mx4g -cp "*" edu.stanford.nlp.pipeline.StanfordCoreNLPServer -annotators "tokenize,ssplit,pos,lemma,parse,sentiment" -port 9000 -timeout 30000

Yay. Next, I will try setting up the python interface.

Installing java on Mac

The information in this post was learnt from this StackOverflow post and also David Cai’s blog post on how to install multiple Java versions on Mac OS High Sierra.

With brew cask installed on Mac (see homebrew-cask instructions), different versions of java can be installed via the following commands (for example, I want to install java9 here):

brew tap caskroom/versions
brew cask install java9

After installing, the symlink /usr/bin/java still points to the old native Java. You can check where it points with the command ls -la /usr/bin/java. It is probably pointing to the old native java path: /System/Library/Frameworks/JavaVM.framework/Versions/Current/Commands/java.

However, homebrew installed java into the directory /Library/Java/JavaVirtualMachines/jdkx.x.x_xxx.jdk/Contents/Home.

To easily switch between different java environments, you can use jEnv. The installing instructions can be found on jEnv’s official page.

Adding an RSS feed to this site

Here is the link to the RSS feed of this blog.

Thanks to the instructions on Joel Glovier’s blog post.

Python Library and scripts for downloading ERA-Interim Data

Update: ECMWF API Clients on pip and conda

The ECMWF API Python Client is now available on pypi and anaconda.
The Climate Corporation has distributed it on pypi, so it can now be installed via:

pip install ecmwf-api-client

Anaconda users on OS X/Linux systems can install the package via:

conda install -c bioconda ecmwfapi

To use the sample script, you need an API key (.ecmwfapirc) placed in your home directory. You can retrieve the key after logging in at https://api.ecmwf.int/v1/key/. Create a file named “.ecmwfapirc” in your home directory and put in the content shown on that page:

{
    "url"   : "https://api.ecmwf.int/v1",
    "key"   : "(...)",
    "email" : "(...)"
}

After doing that, in the directory with the sample script example.py, you can test the package by running it:

python example.py

If the package has been set up properly, you should see the script successfully retrieve a .grib file.

There are sample scripts available on the ECMWF website (look under “Same request NetCDF format”). Below is an example python script I wrote to retrieve zonal wind, meridional wind and temperature data at all pressure levels during the period 2017-07-01 to 2017-07-31 at 6-hour intervals:

#!/usr/bin/env python
from ecmwfapi import ECMWFDataServer
server = ECMWFDataServer()

param_u, param_v, param_t = "131.128", "132.128", "130.128"

for param_string, param in zip(["_u", "_v", "_t"],
                               [param_u, param_v, param_t]):

    server.retrieve({
        "class": "ei",
        "dataset": "interim",
        "date": "2017-07-01/to/2017-07-31",
        "expver": "1",
        "grid": "1.5/1.5",
        "levelist": "1/2/3/5/7/10/20/30/50/70/100/125/150/175/200/225/250/300/350/400/450/500/550/600/650/700/750/775/800/825/850/875/900/925/950/975/1000",
        "levtype": "pl",
        "param": param,
        "step": "0",
        "stream": "oper",
        "format": "netcdf",
        "time": "00:00:00/06:00:00/12:00:00/18:00:00",
        "type": "an",
        "target": "2017-07-01/to/2017-07-31" + param_string + ".nc",
    })

I learnt the above steps on these pages:

Resources on deep learning

I have been searching for ways to use Recurrent Neural Networks for text classification. Here are some useful resources I’ve found:

Three co-authored papers submitted

The publication page has been updated with 3 submitted manuscripts.

Updates on Feb 9, 2018: The manuscript “Role of Finite-Amplitude Rossby Waves and Nonconservative Processes in Downward Migration of Extratropical Flow Anomalies” has been accepted by Journal of Atmospheric Sciences.

The subroutine wrapper.qgpv_eqlat_lwa_ncforce for computing effective diffusivity, which quantifies the damping on wave transiences by irreversible mixing in the stratosphere during a stratospheric sudden warming event, can be found in my python package.

Setting up algs4 on Linux

I am interested in going through the exercises from Princeton University’s Algorithms course. I found a handy bash script someone wrote to set up the environment on Mac OS/Linux:

https://gist.github.com/JIghtuse/021604bee56bddab6173c919da7dd2ad
