Clare S. Y. Huang Data Scientist | Atmospheric Dynamicist

I love to share what I've learnt with others. Check out my blog posts and notes about my academic research, as well as technical solutions on software engineering and data science challenges.
Opinions expressed in this blog are solely my own.


READ MORE button via jekyll

Found a workable solution adding “read more” button to jekyll posts from Jonny Langefeld’s blog post. 😄 Thanks for the solution!

Here we go. 😉

More efficient way to do outer join with large dataframes

Today I learned from a colleague the way of doing outer join of large dataframes more efficiently: instead of doing the outer join, you can first union the key column, and then implement left join twice. I have done an experiment myself on the cluster with two dataframes (df1 and df2) - each dataframe has ~10k rows, and there is only ~10% of overlap(i.e. an inner-join would result in ~1k rows).

The usual way of doing outer join would be like:

df3 = df1.join(df2, how='outer', on='id').drop_duplicates()

Here is an equivalent way(I call it union-left here) that takes less time to compute:

df3 = df1.select('id').union(df2.select('id'))
df3 = df3.join(x1_df, how='left', on='id')
df3 = df3.join(x2_df, how='left', on='id').drop_duplicates()

The distribution of IDs in the two dataframes:

  df1 only overlap df2 only
# of ids 8625 914 8623

Here is the distribution of computing times for inner, left, outer, right and union-left(that gives same results as outer) joins(I repeated each join 20 times):

For these sizes of dataframes, the union-left join is on average ~20% faster than the equivalent outer join.

Local wave activity calculation for Southern Hemisphere available in release0.4.0

[hn2016_falwa Release 0.4.0] 😄I am happy to announce that the climate data analysis in Nakamura and Huang(2018, Science) for the southern hemisphere is also available on GitHub now! (Finally, I have time to do a thorough check of the code and make the release…)

Check out the release note for enhanced functionality:
https://github.com/csyhuang/hn2016_falwa/releases/tag/0.4.0

The documentation page has been fixed too:
https://csyhuang.github.io/hn2016_falwa

Jupyter notebook demonstrating the usage of the functions:
https://github.com/csyhuang/hn2016_falwa/blob/master/examples/nh2018_science/demo_script_for_nh2018.ipynb

If you have any questions/issues regarding the usage of the package, feel free to post on the Issue page of the repo! I will help you fix them soon as I can!

Tips for writing more efficient SQL

Learned from colleagues some points to pay attention to when writing SQL queries. (This post will be updated from time to time.)

Partitioned tables

Always specify the partition in the where clause. If you have to retrieve data from several partitions, loop through it one-by-one.

Distinct elements

Note that the two queries

SELECT DISTINCT(item) FROM table_X

and

SELECT item FROM table_X
GROUP BY item

give the same result. However, the second SQL query will be executed faster. There is no difference calling distinct and group by via (py)spark though.

JOIN v.s. WHERE

Always use where to filter the table to be joined to the smallest, e.g.

SELECT c.* FROM credit c
INNER JOIN (
	SELECT date, item FROM purchase
	WHERE date > 20190207
) p
ON c.date = p.date
WHERE c.eligibility = True

Note that the line WHERE c.eligibility = True is executed to filter the table credit before the JOIN. This shrinks the table credit to the smallest before joining.

I started a comic series about bouldering (for fun)

This is not about work but for fun :D

I recently started learning bouldering (2.5 months till now?) and that’s lots of fun! I keep updating a comic series - the Matchman Bouldering Series - on Instagram/Tumblr that records the ideas I learned from friends in every session collaborating with my college friend Martin.

Some previews…

Matchman Bouldering Series

If you are fond of bouldering and have practical/physics ideas to share, ping me and see if we can collaborate! :)

Conversion of pandas dataframe to pyspark dataframe with an older version of pandas

Pandas dataframe can be converted to pyspark dataframe easily in the newest version of pandas after v0.19.2. If you are using an older version of pandas, you have to do a bit more work for such conversion as follows.

First, load the packages and initiate a spark session.

from pyspark.sql import SparkSession, DataFrame, Row
import pandas as pd

spark = SparkSession.builder \
        .master("local") \
        .appName("Pandas to pyspark DF") \
        .getOrCreate()

Here is an example of pandas dataframe to be converted.

df = pd.DataFrame({'index':[i for i in range(7)],
                   'alphabet':[i for i in 'pyspark']})
df.head(7)
index alphabet
0 0 p
1 1 y
2 2 s
3 3 p
4 4 a
5 5 r
6 6 k

To convert it to a pyspark dataframe, one has to create a list of Row objects and pass it into createDataFrame:

df_pyspark = spark.createDataFrame([
    Row(index=row[1]['index'], alphabet=row[1]['alphabet'])
    for row in df.iterrows()
])
df_pyspark.show()
+--------+-----+
|alphabet|index|
+--------+-----+
|       p|    0|
|       y|    1|
|       s|    2|
|       p|    3|
|       a|    4|
|       r|    5|
|       k|    6|
+--------+-----+

Common issues in RNN training

Exploding gradients and vanishing gradients are two common issues with the training of RNN.

To avoid exploding gradients, one may use:

  • Truncated Back-propagation through time (BPTT)
  • Clip gradients at threshold
  • RMSprop to adjust learning rate

Vanishing gradients are harder to detect. To avoid it, one may use:

  • Weight initialization
  • ReLu activation functions
  • RMSprop
  • LSTM, GRUs

Generate sequence from an array column of pyspark dataframe

Suppose I have a Hive table that has a column of sequences,

+------------------+
|          sequence|
+------------------+
|      [3, 23, 564]|
+------------------+

how to generate a column that contains permutations of each sequence in multiple rows? The desired output shall look like:

+------------------+------------------+
|          sequence|       permutation|
+------------------+------------------+
|      [3, 23, 564]|      [3, 23, 564]|
|      [3, 23, 564]|      [3, 564, 23]|
|      [3, 23, 564]|      [23, 3, 564]|
|      [3, 23, 564]|      [23, 564, 3]|
|      [3, 23, 564]|      [564, 3, 23]|
|      [3, 23, 564]|      [564, 23, 3]|
+------------------+------------------+

In order to get multiple rows out of each row, we need to use the function explode. First, we write a user-defined function (UDF) to return the list of permutations given a array (sequence):

import itertools
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import IntegerType, ArrayType

@udf_type(ArrayType(ArrayType(IntegerType())))
def permutation(a_list):
    return list(itertools.permutations(a_list, len(a_list)))

The udf_type function is adapted from the blog post by John Paton. The output type is specified to be an array of “array of integers”.

The application of this function with explode will yield the result above:

df = spark_session.createDataFrame([Row(sequence=[3, 23, 564])])

result = df.withColumn('permutation', F.explode(permutation(F.col('sequence'))))

Pyspark error "Could not serialize object"

This post is related to the idea discuss in the post “How to Solve Non-Serializable Errors When Instantiating Objects In Spark UDFs”. Here I discuss the solution in a hypothetical scenario in pyspark.

Suppose I have a class to transform string to sum of numbers like this:

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

class AnimalsToNumbers(object):
    def __init__(self, spark):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        self._spark = spark
        
    def addition(self, animal_str):
        return sum([self._mapping.get(x, 0)
                    for x in animal_str.split('+')])
    
    def add_up_animals(self, df):
        addition_udf = F.udf(self.addition, IntegerType())
        return df.withColumn('sum', addition_udf('animal'))

(In practical cases, self._mapping is a huge object containing the dictionary and other attributes that are derived from methods in the AnimalsToNumbers class.) If I want to transform a dataframe df that looks like this:

+-------------+
|       animal|
+-------------+
|elephant+bear|
|     cat+bear|
+-------------+

The operation

AnimalsToNumbers(spark).add_up_animals(df)

will lead to an error like this:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
~/anaconda3/envs/pyspark_demo/lib/python3.5/site-packages/pyspark/serializers.py in dumps(self, obj)
    589         try:
--> 590             return cloudpickle.dumps(obj, 2)
    591         except pickle.PickleError:
...
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o25.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

The issue is that, as self._mapping appears in the function addition, when applying addition_udf to the pyspark dataframe, the object self (i.e. the AnimalsToNumbers class) has to be serialized but it can’t be.

A (surprisingly simple) way is to create a reference to the dictionary (self._mapping) but not the object:

class AnimalsToNumbers(object):
    def __init__(self, spark):
        self._mapping = {'elephant': 1, 'bear': 4, 'cat': 9}
        self._spark = spark
        
    
    def add_up_animals(self, df):
        mapping = self._mapping
        def addition(animal_str):
            return sum([mapping.get(x, 0) for x in animal_str.split('+')])

        addition_udf = F.udf(addition, IntegerType())
        return df.withColumn('sum', addition_udf('animal'))

AnimalsToNumbers(spark).add_up_animals(df).show() would yield:

+-------------+---+
|       animal|sum|
+-------------+---+
|elephant+bear|  5|
|     cat+bear| 13|
+-------------+---+

Yay :)

Handling JSON in PostgreSQL

Found a useful cheatsheet that listed out operations on JSON in PostgreSQL.

If I want to list the rows where column col1 in table table1 contains a JSON object with the key key1, I can use:

select * from table1
where col1->'key1' IS NOT NULL

Local wave activity package updated to version 0.3.7

The python package hn2016_falwa has just been updated with the following changes:

  • Switched the unittest framework from unittest to pytest
  • Improved interface of the QGField object
  • Added a new function hn2016_falwa.download_data.retrieve_erai to download ERA-Interim data (but not connected to the main program yet)

My package uses f2py. When switching from unittest to pytest, there are several changes to make in setup.py and .travis to accommodate such usage:

In setup (numpy.distutils.core.setup), remove the argument

test_suite='tests.my_module_suite'

and add:

    packages=find_packages(),
    setup_requires=['pytest-runner'],
    tests_require=['pytest'],

In .travis, the script section looks like:

script:
  - python setup.py pytest

Visit to AOS at UW-Madison to give a Colloquium

I visited the Department of Atmospheric and Oceanic Sciences at the University of Wisconsin-Madison for two days and had a lot of fun discussing atmospheric (and machine learning) research with the scientists there. Thanks Prof. Jon Martin for inviting me over!

The colloquium I gave on Monday was an overview of the finite-amplitude local Rossby wave activity theory and its application to study blocking. We learned from this framework that atmospheric blocking can be modelled as a traffic jam problem. I also mentioned the follow-up work by Paradise et al. (2019, JAS) that discusses the implication of this notion.

The slides for my colloquium can be found below:

Slides for UW-Madison Colloquium

Papers on architecture of Recurrent Neural Networks (RNN)

Bookmarking some papers mentioned in Andrew Ng’s course Sequence Models:

Gate Recurrent Unit (GRU)

Long short-term memory (LSTM)

(More to update)

Useful Git commands at work

Below are solutions I curated online to solve problems related to Git when collaborating with others and working on several branches together. This post will be updated from time to time.

Copy a file from one branch to another

To copy a file to the current branch from another branch (ref)

git checkout another_branch the_file_you_want.txt

Merge changes from master branch to yours

To merge changes from another branch to yours, you can use merge or rebase depending on the preferred commit order. BitBucket has a nice tutorial discussing the difference btween the two. Usually I’d love to have the changes pulled from another branch as a single commit with git merge.

git checkout master      # the branch with changes
git pull                 # pull the remote changes to local master branch
git checkout mybranch    # go back to mybranch
git merge master         # incorporate the changes into mybranch

How to use git revert

Useful tutorial from BitBucket on git revert.

Compare differences between branches and output the results

If you want to compare the difference between your (more updated) branch and the master branch, use the command

git diff master..your_branch

You can save the comparison results into a text file with colors by

git diff master..your_branch > your_branch_to_master.diff

The color can be viewed when you open the .diff file with Sublime.

Update password for Git on Mac OS X

use the following command

git config --global credential.helper osxkeychain

[to be continued]

Comparison between different statistical language models

When researching on different machine learning models for modeling languages (i.e. sequence model), here are some useful resources I found online:

Articles

Courses

Read libsvm files into PySpark dataframe

I wanted to load the libsvm files provided in tensorflow/ranking into PySpark dataframe, but couldn’t find existing modules for that. Here is a version I wrote to do the job. (Disclaimer: not the most elegant solution, but it works.)

First of all, load the pyspark utilities required.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import SparseVector

Initiate a spark session for creation of dataframe.

sc = SparkContext("local", "read_libsvm")
spark_session = SparkSession \
    .builder \
    .appName("read_libsvm") \
    .getOrCreate()

Get the path to the data files.

_TRAIN_DATA_PATH="data/train.txt"
_TEST_DATA_PATH="data/test.txt"

Here is the module I wrote for the purpose:

def read_libsvm(filepath, query_id=True):
    '''
    A utility function that takes in a libsvm file and turn it to a pyspark dataframe.

    Args:
        filepath (str): The file path to the data file.
        query_id (bool): whether 'qid' is present in the file.

    Returns:
        A pyspark dataframe that contains the data loaded.
    '''

    with open(filepath, 'r') as f:
        raw_data = [x.split() for x in f.readlines()]

    train_outcome = [int(x[0]) for x in raw_data]

    if query_id:
        train_qid = [int(x[1].lstrip('qid:')) for x in raw_data]

    index_value_dict = list()
    for row in raw_data:
        index_value_dict.append(dict([(int(x.split(':')[0]), float(x.split(':')[1]))
                                       for x in row[(1 + int(query_id)):]]))

    max_idx = max([max(x.keys()) for x in index_value_dict])
    rows = [
        Row(
            qid=train_qid[i],
            label=train_outcome[i],
            feat_vector=SparseVector(max_idx + 1, index_value_dict[i])
        )
        for i in range(len(index_value_dict))
    ]
    df = spark_session.createDataFrame(rows)
    
    return df

Let’s see how the train and test sets look like in the tf-ranking package:

train_df = read_libsvm(_TRAIN_DATA_PATH)
test_df = read_libsvm(_TEST_DATA_PATH)

train_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[5,13,17,42,...|    0|  1|
|(137,[11,13,18,30...|    2|  1|
|(137,[11,27,29,39...|    2|  1|
|(137,[5,10,26,31,...|    1|  1|
|(137,[13,17,22,24...|    2|  2|
+--------------------+-----+---+
only showing top 5 rows
test_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[2,7,37,40,4...|    1|  1|
|(137,[1,8,12,15,2...|    2|  1|
|(137,[4,11,15,16,...|    0|  1|
|(137,[14,19,20,33...|    0|  1|
|(137,[9,12,19,26,...|    1|  2|
+--------------------+-----+---+
only showing top 5 rows

Simple pyspark solutions

Here is a curation of some solutions to simple problems encountered when working with pyspark.

How to replace string in a column?

Source

from pyspark.sql.functions import *
new_df = df.withColumn(col_name, regexp_replace(col_name, pattern, replacement))

How to avoid duplicate columns when joining two dataframe on columns with the same name?

Source

df = left_df.join(right_df, ["name"])

Ubuntu on Windows

Found a useful article: How to Access Your Ubuntu Bash Files in Windows (and Your Windows System Drive in Bash).

Resources on Python Packaging

On my way figuring out how to properly write unit test for my python package, I have come across the useful pages below:

Unit tests are worth the time writing to make sure your package works as you expected. I also found some commercial packages using unit tests as sample script for user to refer to (e.g. AllenNLP).

Setting up a Dash App on PythonAnywhere

After opening an account on pythonanywhere, go to the Web tab and select Add a new web app.

When prompted to select a Python Web framework, choose Flask.

Choose your python version. Here, I am choosing Python 3.6 (Flask 0.12).

Enter a path for a Python file I wish to hold my Dash app. I entered:

/home/username/mysite/dashing_demo_app.py

Put the script of your Dash app in dashing_demo_app.py. You can use the script in the sample file dashing_demo_app.py provided on the GitHub repo of pythonanywhere’s staff.

Next I have to set up a virtual environment that the app is running in. I am using the requirements3.6.txt provided in the above GitHub repo.

Go to the Files tab to create requirements3.6.txt in your home directory. Then, go to the Consoles tab to start a new bash session. Create a virtual environment dashappenv with the following command in the home directory:

mkvirtualenv dashappenv --python=/usr/bin/python3.6
pip install -r requirements3.6.txt

Then, go to the Web tab and enter under Virtualenv the path of your virtual environment:

/home/username/.virtualenvs/dashappenv

Lastly, modify your WSGI file. Instead of

from dashing_demo_app import app as application

provided, enter

from dashing_demo_app import app
application = app.server

to import your app.

It’s all done. Go to Web to reload your app. You can then click the URL of your webapp and see it running. :) Here is the sample webapp I built based on the example in Dash tutorial.

my widget for counting (since Dec24, 2016)