Clare S. Y. Huang Data Scientist | Atmospheric Dynamicist

I love to share what I've learnt with others. Check out my blog posts and notes about my academic research, as well as technical solutions to software engineering and data science challenges.
Opinions expressed in this blog are solely my own.


Databricks Certified Associate Developer for Apache Spark 3.0

I passed the Databricks Certified Associate Developer exam for Apache Spark 3.0 (Python). Here is my certificate!

I registered for the exam when I joined the Spark Summit this June, hoping that setting this goal would push me to dive deeper into Spark architecture and performance tuning.

[Experience sharing] On top of coding with pyspark at work (which helped me with most of the syntax questions in the exam), my exam preparation mainly involved reading two books: Spark: The Definitive Guide and Learning Spark 2.0.

It was my very first time taking an online proctored exam at home, and there are two things I wish I had known before the exam:

  1. The Spark documentation (a PDF file) provided by Sentinel (i.e., the exam software) is not searchable. You have to scroll through the pages to find what you need.
  2. The proctor checks your workspace configuration during the exam (i.e., not at the beginning). The exam pauses during the check, so you don’t have to worry about losing time.

Given how the pandemic is evolving, I believe more exams will be conducted with online proctors. Perhaps we will all get used to setting up an online test workspace at home very soon.

Discussion on Deep Compression

I was leading a discussion on the paper Han, S., Mao, H., & Dally, W. J. (2015). Deep compression: Compressing deep neural networks with pruning, trained quantization and Huffman coding. arXiv preprint arXiv:1510.00149. It introduces methods (pruning, trained quantization and Huffman coding) to reduce the storage requirements of neural networks so that they can be stored on smaller devices.
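
As a quick illustration of the pruning idea, here is a toy numpy sketch of magnitude-based pruning (my own illustration, not the paper's implementation):

import numpy as np

def prune_by_magnitude(weights, sparsity):
    """Zero out the smallest-magnitude entries so that a `sparsity` fraction become zero."""
    threshold = np.quantile(np.abs(weights), sparsity)
    mask = np.abs(weights) >= threshold
    return weights * mask

# Example: prune 90% of a random weight matrix
w = np.random.randn(256, 256)
w_pruned = prune_by_magnitude(w, sparsity=0.9)
print(f"Non-zero fraction: {np.count_nonzero(w_pruned) / w_pruned.size:.2f}")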

The slides I made for the discussion can be found here.

Notes on ModelOps and MLOps talks

I wrote a short article on Medium, ODSC West 2020: Notes on ModelOps and MLOps talks, about what I learned from two talks in ODSC West 2020.

Running a single test case in the unittest suite

If your unit tests are written using the unittest package, the command-line syntax to run a single test case in the test suite is

python setup.py test -m tests.test_module.TestClass.test_case

Stack Overflow reference: Does unittest allow single case/suite testing through “setup.py test”?

If your unit tests are written using pytest, the command used would be

pytest tests/test_module.py::test_case
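
If the test function lives inside a class (as in the unittest example above), pytest uses the same node-ID syntax with the class name appended:

pytest tests/test_module.py::TestClass::test_case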

New pip release and changes in the way it resolves dependency conflicts

When I was trying to set up a conda environment to run a package, I ran into the following messages:

ERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.

We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.

numpydoc 1.1.0 requires sphinx>=1.6.5, but you'll have sphinx 1.5.3 which is incompatible.

After googling, I found this post on Stack Overflow explaining the issue: it is related to the changes in pip's release 20.2.

To test your packages with the new resolver, install them with the command

pip install --use-feature=2020-resolver package

Submitting pull request from forked repo to main repo

It’s Hacktoberfest again! 👻 Here are some useful commands to sync your forked repo with upstream changes from the original repo (also see GitHub Docs).

After forking, add the original repository as the upstream remote:

git remote add upstream https://github.com/ORIGINAL_OWNER/ORIGINAL_REPOSITORY.git

You can then check the upstream locations via the command git remote -v.

Before submitting a pull request, you have to make sure your branch contains all the commits from upstream. You can do so by:

git fetch upstream
git checkout master
git merge upstream/master

🤓 Have fun coding! ⌨️

Split a vector/list in a pyspark DataFrame into columns

Split an array column

To split a column with arrays of strings, e.g. a DataFrame that looks like,

+---------+
|   strCol|
+---------+
|[A, B, C]|
+---------+

into separate columns, the following code works without the use of a UDF.

import pyspark.sql.functions as F

df2 = df.select([F.col("strCol")[i] for i in range(3)])
df2.show()

Output:

+---------+---------+---------+
|strCol[0]|strCol[1]|strCol[2]|
+---------+---------+---------+
|        A|        B|        C|
+---------+---------+---------+
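
If the array length is not fixed at 3, one option (a sketch that scans the column once to find the maximum length; the output column names are my own choice) is:

# Find the maximum array length first, then expand into that many columns.
n = df.select(F.max(F.size("strCol")).alias("n")).first()["n"]
df2 = df.select([F.col("strCol")[i].alias(f"strCol_{i}") for i in range(n)])
df2.show()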

Split a vector column

To split a column with doubles stored in DenseVector format, e.g. a DataFrame that looks like,

+-------------+
|       intCol|
+-------------+
|[2.0,3.0,4.0]|
+-------------+

one has to first construct a UDF that converts the DenseVector to an array (Python list):

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DoubleType

def split_array_to_list(col):
    def to_list(v):
        return v.toArray().tolist()
    return F.udf(to_list, ArrayType(DoubleType()))(col)

df3 = df.select(split_array_to_list(F.col("intCol")).alias("split_int"))\
    .select([F.col("split_int")[i] for i in range(3)])
df3.show()

Output:

+------------+------------+------------+
|split_int[0]|split_int[1]|split_int[2]|
+------------+------------+------------+
|         2.0|         3.0|         4.0|
+------------+------------+------------+
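
If you are on Spark 3.0 or later, the built-in pyspark.ml.functions.vector_to_array should let you skip the UDF altogether; a sketch:

from pyspark.ml.functions import vector_to_array

df3 = df.withColumn("split_int", vector_to_array(F.col("intCol"))) \
    .select([F.col("split_int")[i] for i in range(3)])
df3.show()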

Ranking hierarchical labels with SQL

To give indices to hierarchical labels, I can use DENSE_RANK() or RANK() depending on the situation. For example, if I have a DataFrame that looks like this:

+------+----------+
|Fridge|    Fruits|
+------+----------+
|     A|     apple|
|     B|    orange|
|     C|     apple|
|     D|     pears|
|     C|Watermelon|
+------+----------+

The following SQL code

SELECT
    Fridge,
    Fruits,
    DENSE_RANK() OVER (ORDER BY Fridge, Fruits) AS Loc_id,
    DENSE_RANK() OVER (ORDER BY Fridge) AS Fridge_id_dense,
    RANK() OVER (ORDER BY Fridge) AS Fridge_id,
    DENSE_RANK() OVER (ORDER BY Fruits) AS Fruit_id_dense,
    RANK() OVER (ORDER BY Fruits) AS Fruit_id
FROM fridge_list

would yield the following table

+------+----------+------+---------------+---------+--------------+--------+
|Fridge|    Fruits|Loc_id|Fridge_id_dense|Fridge_id|Fruit_id_dense|Fruit_id|
+------+----------+------+---------------+---------+--------------+--------+
|     A|     apple|     1|              1|        1|             2|       2|
|     B|    orange|     2|              2|        2|             3|       4|
|     C|Watermelon|     3|              3|        3|             1|       1|
|     C|     apple|     4|              3|        3|             2|       2|
|     D|     pears|     5|              4|        5|             4|       5|
+------+----------+------+---------------+---------+--------------+--------+
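
For reference, the same ranking can be expressed with the PySpark DataFrame API via window functions; a sketch of the first few columns, assuming the table is also available as a DataFrame named fridge_df:

from pyspark.sql import Window
import pyspark.sql.functions as F

w_loc = Window.orderBy("Fridge", "Fruits")
w_fridge = Window.orderBy("Fridge")

fridge_df.select(
    "Fridge",
    "Fruits",
    F.dense_rank().over(w_loc).alias("Loc_id"),
    F.dense_rank().over(w_fridge).alias("Fridge_id_dense"),
    F.rank().over(w_fridge).alias("Fridge_id"),
).show()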

Custom Transformer that can be fitted into Pipeline

How do you construct a custom Transformer that can be fitted into a Pipeline object? I learned how to do that from a colleague today.

Below is an example that includes all key components:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
  input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
  output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)
  
  @keyword_only
  def __init__(self, input_col: str = "input", output_col: str = "output"):
    super(CustomTransformer, self).__init__()
    self._setDefault(input_col=None, output_col=None)
    kwargs = self._input_kwargs
    self.set_params(**kwargs)
    
  @keyword_only
  def set_params(self, input_col: str = "input", output_col: str = "output"):
    kwargs = self._input_kwargs
    self._set(**kwargs)
    
  def get_input_col(self):
    return self.getOrDefault(self.input_col)
  
  def get_output_col(self):
    return self.getOrDefault(self.output_col)
  
  def _transform(self, df: DataFrame):
    input_col = self.get_input_col()
    output_col = self.get_output_col()
    # The custom action: concatenate the integer form of the doubles from the Vector
    transform_udf = F.udf(lambda x: '/'.join([str(int(y)) for y in x]), StringType())
    return df.withColumn(output_col, transform_udf(input_col))
  

Let’s test it with a dataframe

df = spark.createDataFrame([(Vectors.dense([2.0, 1.0, 3.0]),), (Vectors.dense([0.4, 0.9, 7.0]),)], ["numbers"])

and a Pipeline like this:

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

elementwise_product = ElementwiseProduct(scalingVec=Vectors.dense([2.0, 3.0, 5.0]), inputCol="numbers", outputCol="product")
custom_transformer = CustomTransformer(input_col="product", output_col="results")
pipeline = Pipeline(stages=[elementwise_product, custom_transformer])
model = pipeline.fit(df)
results = model.transform(df)
results.show()

I managed to get the expected results:

+-------------+--------------+-------+
|      numbers|       product|results|
+-------------+--------------+-------+
|[2.0,1.0,3.0]|[4.0,3.0,15.0]| 4/3/15|
|[0.4,0.9,7.0]|[0.8,2.7,35.0]| 0/2/35|
+-------------+--------------+-------+
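
Since the class mixes in DefaultParamsReadable and DefaultParamsWritable, the fitted pipeline should also be persistable; a sketch (the path is just an example, and the custom transformer class must be importable when loading):

from pyspark.ml import PipelineModel

model.write().overwrite().save("/tmp/custom_pipeline_model")
reloaded = PipelineModel.load("/tmp/custom_pipeline_model")
reloaded.transform(df).show()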

Minor release of my python package + release procedures

[hn2016_falwa Release 0.4.1] A minor release of my Python package hn2016_falwa has been published. Thanks to Christopher Polster for submitting a pull request that fixes the interface of BarotropicField. Moreover, I added procedures to handle masked arrays in QGField so that it can conveniently process ERA5 data, which is stored as masked arrays in netCDF files.

As a memo to myself - procedures for a release (which I often forget and have to google 😅):

  • [Updated on 2023/11/5] Update version number in:
    • setup.py,
    • readme.md
    • falwa/__init__.py
    • docs/source/conf.py
    • recipe/meta.yaml
  • Add a (lightweight) tag to the commit: git tag <tagname>.
  • Push not only the commits but also the tag with git push origin <tagname>.
  • Update on Aug 15, 2021: To push the commit and corresponding tag simultaneously, use
    git push --atomic origin <branch name> <tag>
    

If I have time, I update the version on PyPI too:

  • Create the dist/ directory and the installation files: python3 setup.py sdist bdist_wheel
  • Upload the package onto TestPyPI to test deployment: python3 -m twine upload --repository testpypi dist/*
  • Deploy the package onto PyPI for real: python3 -m twine upload dist/*

Bulk download of ERA5 data from CDSAPI

I wrote a sample script to download ERA5 reanalysis data via CDSAPI month by month. Here is the GitHub repo with instructions on how to use it.
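
The actual script is in the repo; below is a minimal sketch of what a month-by-month request via the cdsapi package looks like (the dataset name, variables and year here are illustrative, not necessarily what my script uses):

import calendar
import cdsapi

c = cdsapi.Client()  # reads the API key from ~/.cdsapirc

year = 2019
for month in range(1, 13):
    n_days = calendar.monthrange(year, month)[1]
    c.retrieve(
        "reanalysis-era5-pressure-levels",
        {
            "product_type": "reanalysis",
            "variable": ["temperature", "u_component_of_wind"],
            "pressure_level": ["500"],
            "year": str(year),
            "month": f"{month:02d}",
            "day": [f"{d:02d}" for d in range(1, n_days + 1)],
            "time": [f"{h:02d}:00" for h in range(24)],
            "format": "netcdf",
        },
        f"era5_{year}-{month:02d}.nc",
    )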

Reading Notes on Spark - The Definitive Guide

I am reading the book Spark: The Definitive Guide by Bill Chambers and Matei Zaharia. Here are my reading notes:

READ MORE button via jekyll

Found a workable solution for adding a "Read more" button to Jekyll posts from Jonny Langefeld's blog post. 😄 Thanks for the solution!

Here we go. 😉

More efficient way to do outer join with large dataframes

Today I learned from a colleague a more efficient way of doing an outer join of large dataframes: instead of doing the outer join directly, you can first union the key columns and then do a left join twice. I ran an experiment myself on the cluster with two dataframes (df1 and df2): each has ~10k rows, and only ~10% of the ids overlap (i.e., an inner join would result in ~1k rows).

The usual way of doing outer join would be like:

df3 = df1.join(df2, how='outer', on='id').drop_duplicates()

Here is an equivalent way (I call it union-left here) that takes less time to compute:

df3 = df1.select('id').union(df2.select('id'))
df3 = df3.join(df1, how='left', on='id')
df3 = df3.join(df2, how='left', on='id').drop_duplicates()

The distribution of IDs in the two dataframes:

+--------+--------+-------+--------+
|        |df1 only|overlap|df2 only|
+--------+--------+-------+--------+
|# of ids|    8625|    914|    8623|
+--------+--------+-------+--------+

Here is the distribution of computing times for inner, left, outer, right and union-left (which gives the same results as outer) joins (I repeated each join 20 times):

For these sizes of dataframes, the union-left join is on average ~20% faster than the equivalent outer join.

Local wave activity calculation for Southern Hemisphere available in release0.4.0

[hn2016_falwa Release 0.4.0] 😄 I am happy to announce that the climate data analysis in Nakamura and Huang (2018, Science) for the Southern Hemisphere is now also available on GitHub! (Finally, I had time to do a thorough check of the code and make the release…)

Check out the release note for enhanced functionality:
https://github.com/csyhuang/hn2016_falwa/releases/tag/0.4.0

The documentation page has been fixed too:
https://csyhuang.github.io/hn2016_falwa

Jupyter notebook demonstrating the usage of the functions:
https://github.com/csyhuang/hn2016_falwa/blob/master/examples/nh2018_science/demo_script_for_nh2018.ipynb

If you have any questions/issues regarding the usage of the package, feel free to post on the Issues page of the repo! I will help you fix them as soon as I can!

Tips for writing more efficient SQL

Learned from colleagues some points to pay attention to when writing SQL queries. (This post will be updated from time to time.)

Partitioned tables

Always specify the partition in the WHERE clause. If you have to retrieve data from several partitions, loop through them one by one (see the sketch below).
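
A sketch of that loop in PySpark (the table name, partition column and values are illustrative):

from functools import reduce

# Query each partition separately and union the results.
partitions = ["20190205", "20190206", "20190207"]
partition_dfs = [
    spark.sql(f"SELECT * FROM purchase WHERE date = {p}")
    for p in partitions
]
result = reduce(lambda left, right: left.union(right), partition_dfs)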

Distinct elements

Note that the two queries

SELECT DISTINCT(item) FROM table_X

and

SELECT item FROM table_X
GROUP BY item

give the same result. However, the second SQL query will be executed faster. There is no performance difference between distinct and group by in (py)spark though (see the sketch below).
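
A quick way to convince yourself in (py)spark is to compare the physical plans; a sketch, assuming table_X is registered as a table:

spark.sql("SELECT DISTINCT(item) FROM table_X").explain()
spark.sql("SELECT item FROM table_X GROUP BY item").explain()
# Both queries compile to the same aggregate-based physical plan,
# so neither form has a performance edge in Spark.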

JOIN v.s. WHERE

Always use WHERE to filter the tables being joined down to the smallest possible size, e.g.

SELECT c.* FROM credit c
INNER JOIN (
	SELECT date, item FROM purchase
	WHERE date > 20190207
) p
ON c.date = p.date
WHERE c.eligibility = True

Note that the condition WHERE c.eligibility = True is used to filter the table credit before the JOIN. This shrinks the table credit as much as possible before joining.

I started a comic series about bouldering (for fun)

This is not about work but for fun :D

I recently started learning bouldering (2.5 months so far?) and it's lots of fun! Together with my college friend Martin, I keep updating a comic series, the Matchman Bouldering Series, on Instagram/Tumblr that records the ideas I learn from friends in every session.

Some previews…

Matchman Bouldering Series

If you are fond of bouldering and have practical/physics ideas to share, ping me and see if we can collaborate! :)

Conversion of pandas dataframe to pyspark dataframe with an older version of pandas

A pandas DataFrame can be converted to a PySpark DataFrame easily with newer versions of pandas (after v0.19.2). If you are using an older version of pandas, you have to do a bit more work for the conversion, as follows.

First, load the packages and initiate a spark session.

from pyspark.sql import SparkSession, DataFrame, Row
import pandas as pd

spark = SparkSession.builder \
        .master("local") \
        .appName("Pandas to pyspark DF") \
        .getOrCreate()

Here is an example of pandas dataframe to be converted.

df = pd.DataFrame({'index':[i for i in range(7)],
                   'alphabet':[i for i in 'pyspark']})
df.head(7)
   index alphabet
0      0        p
1      1        y
2      2        s
3      3        p
4      4        a
5      5        r
6      6        k

To convert it to a pyspark dataframe, one has to create a list of Row objects and pass it into createDataFrame:

df_pyspark = spark.createDataFrame([
    Row(index=row[1]['index'], alphabet=row[1]['alphabet'])
    for row in df.iterrows()
])
df_pyspark.show()
+--------+-----+
|alphabet|index|
+--------+-----+
|       p|    0|
|       y|    1|
|       s|    2|
|       p|    3|
|       a|    4|
|       r|    5|
|       k|    6|
+--------+-----+
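
For reference, with pandas v0.19.2 or newer the whole conversion above should reduce to a single call:

df_pyspark_direct = spark.createDataFrame(df)  # df is the pandas DataFrame from above
df_pyspark_direct.show()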

Common issues in RNN training

Exploding gradients and vanishing gradients are two common issues when training RNNs.

To avoid exploding gradients, one may use:

  • Truncated backpropagation through time (BPTT)
  • Gradient clipping at a threshold (see the sketch after this list)
  • RMSprop to adjust the learning rate
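
A minimal numpy sketch of gradient clipping by global norm (the threshold is illustrative):

import numpy as np

def clip_by_global_norm(grads, max_norm=5.0):
    """Rescale a list of gradient arrays if their global L2 norm exceeds max_norm."""
    global_norm = np.sqrt(sum(np.sum(g ** 2) for g in grads))
    if global_norm > max_norm:
        grads = [g * (max_norm / global_norm) for g in grads]
    return grads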

Vanishing gradients are harder to detect. To avoid them, one may use:

  • Weight initialization
  • ReLU activation functions
  • RMSprop
  • LSTMs, GRUs

Generate sequence from an array column of pyspark dataframe

Suppose I have a Hive table that has a column of sequences,

+------------------+
|          sequence|
+------------------+
|      [3, 23, 564]|
+------------------+

how can one generate a column that contains the permutations of each sequence in multiple rows? The desired output should look like:

+------------------+------------------+
|          sequence|       permutation|
+------------------+------------------+
|      [3, 23, 564]|      [3, 23, 564]|
|      [3, 23, 564]|      [3, 564, 23]|
|      [3, 23, 564]|      [23, 3, 564]|
|      [3, 23, 564]|      [23, 564, 3]|
|      [3, 23, 564]|      [564, 3, 23]|
|      [3, 23, 564]|      [564, 23, 3]|
+------------------+------------------+

In order to get multiple rows out of each row, we need to use the function explode. First, we write a user-defined function (UDF) to return the list of permutations given an array (sequence):

import itertools

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import IntegerType, ArrayType

@udf_type(ArrayType(ArrayType(IntegerType())))
def permutation(a_list):
    return list(itertools.permutations(a_list, len(a_list)))

The udf_type function is adapted from the blog post by John Paton. The output type is specified to be an array of arrays of integers.
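
The decorator itself is essentially a thin wrapper around F.udf with a fixed return type; a minimal sketch (my own reconstruction; see the linked post for the original):

import pyspark.sql.functions as F

def udf_type(return_type):
    """Turn a plain Python function into a Spark UDF with the given return type."""
    def decorator(func):
        return F.udf(func, return_type)
    return decorator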

The application of this function with explode will yield the result above:

df = spark_session.createDataFrame([Row(sequence=[3, 23, 564])])

result = df.withColumn('permutation', F.explode(permutation(F.col('sequence'))))
