Clare S. Y. Huang
Data Scientist | Atmospheric Dynamicist

I love to share what I've learnt with others. Check out my blog posts and notes about my academic research, as well as technical solutions to software engineering and data science challenges.


Read libsvm files into PySpark dataframe

I wanted to load the libsvm files provided in tensorflow/ranking into a PySpark dataframe, but couldn’t find an existing module for that. Here is a version I wrote to do the job. (Disclaimer: not the most elegant solution, but it works.)
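For context, each line of such a libsvm ranking file holds a relevance label, a query id, and a sparse list of index:value feature pairs, roughly like this (the indices and values below are made up for illustration):

0 qid:1 5:0.5 13:1.0 17:0.2
2 qid:1 11:0.3 13:0.4 18:0.1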

First of all, load the required PySpark utilities.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import SparseVector

Initiate a Spark session for creating the dataframe.

sc = SparkContext("local", "read_libsvm")
spark_session = SparkSession \
    .builder \
    .appName("read_libsvm") \
    .getOrCreate()

Set the paths to the data files.

_TRAIN_DATA_PATH = "data/train.txt"
_TEST_DATA_PATH = "data/test.txt"

Here is the function I wrote for the purpose:

def read_libsvm(filepath, query_id=True):
    '''
    A utility function that takes in a libsvm file and turns it into a
    pyspark dataframe.

    Args:
        filepath (str): The file path to the data file.
        query_id (bool): whether 'qid' is present in the file.

    Returns:
        A pyspark dataframe that contains the data loaded.
    '''

    with open(filepath, 'r') as f:
        raw_data = [x.split() for x in f.readlines()]

    # The first token of each line is the label.
    train_outcome = [int(x[0]) for x in raw_data]

    # The second token is 'qid:<id>' when query ids are present.
    if query_id:
        train_qid = [int(x[1].split(':')[1]) for x in raw_data]

    # Parse the remaining 'index:value' pairs of each line into a dict.
    index_value_dict = list()
    for row in raw_data:
        index_value_dict.append(
            dict((int(pair.split(':')[0]), float(pair.split(':')[1]))
                 for pair in row[(1 + int(query_id)):]))

    # Size the sparse vectors by the largest feature index in the file.
    max_idx = max([max(x.keys()) for x in index_value_dict])

    rows = []
    for i in range(len(index_value_dict)):
        fields = {
            'label': train_outcome[i],
            'feat_vector': SparseVector(max_idx + 1, index_value_dict[i]),
        }
        if query_id:
            fields['qid'] = train_qid[i]
        rows.append(Row(**fields))

    df = spark_session.createDataFrame(rows)

    return df

Let’s see what the train and test sets from the tf-ranking package look like:

train_df = read_libsvm(_TRAIN_DATA_PATH)
test_df = read_libsvm(_TEST_DATA_PATH)

train_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[5,13,17,42,...|    0|  1|
|(137,[11,13,18,30...|    2|  1|
|(137,[11,27,29,39...|    2|  1|
|(137,[5,10,26,31,...|    1|  1|
|(137,[13,17,22,24...|    2|  2|
+--------------------+-----+---+
only showing top 5 rows

test_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[2,7,37,40,4...|    1|  1|
|(137,[1,8,12,15,2...|    2|  1|
|(137,[4,11,15,16,...|    0|  1|
|(137,[14,19,20,33...|    0|  1|
|(137,[9,12,19,26,...|    1|  2|
+--------------------+-----+---+
only showing top 5 rows

Curated PySpark solutions

Here is a curation of some solutions I found while working with PySpark.

How to replace a string in a column?

Source

from pyspark.sql.functions import regexp_replace
new_df = df.withColumn(col_name, regexp_replace(col_name, pattern, replacement))
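As a minimal usage sketch (the toy dataframe and pattern here are made up for illustration):

from pyspark.sql.functions import regexp_replace

df = spark_session.createDataFrame([("coffe",), ("coffee",)], ["drink"])
new_df = df.withColumn("drink", regexp_replace("drink", "^coffe$", "coffee"))
new_df.show()  # both rows now read 'coffee'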

How to avoid duplicate columns when joining two dataframes on columns with the same name?

Source

df = left_df.join(right_df, ["name"])
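To see the difference (toy dataframes made up for illustration): joining on an expression keeps both copies of the column, while joining on a list of column names deduplicates it.

left_df = spark_session.createDataFrame([("alice", 1)], ["name", "age"])
right_df = spark_session.createDataFrame([("alice", "NY")], ["name", "city"])

# Joining on an expression keeps both 'name' columns:
left_df.join(right_df, left_df.name == right_df.name).columns
# ['name', 'age', 'name', 'city']

# Joining on the column name keeps only one:
left_df.join(right_df, ["name"]).columns
# ['name', 'age', 'city']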

Ubuntu on Windows

Found a useful article: How to Access Your Ubuntu Bash Files in Windows (and Your Windows System Drive in Bash).

Useful Git commands at work

Below are solutions I curated online to solve problems related to Git when collaborating with others and working across several branches. This post will be updated from time to time.

To copy a file to the current branch from another branch (ref)

git checkout another_branch the_file_you_want.txt

To merge changes from another branch into yours, you can use merge or rebase depending on the preferred commit order. BitBucket has a nice tutorial discussing the difference between the two. Usually I’d love to have the changes pulled from another branch as a single commit with git merge.

git checkout master      # the branch with changes
git pull                 # pull the remote changes to local master branch
git checkout mybranch    # go back to mybranch
git merge master         # incorporate the changes into mybranch
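
If you would rather have your own commits replayed on top of the pulled changes instead, the rebase version of the same workflow (same branch names as above) is:

git checkout master      # the branch with changes
git pull                 # pull the remote changes to local master branch
git checkout mybranch    # go back to mybranch
git rebase master        # replay mybranch's commits on top of master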

Useful tutorial from BitBucket on git revert.

[to be continued]

Resources on Python Packaging

While figuring out how to properly write unit tests for my Python package, I came across the useful pages below:

Unit tests are worth the time to write to make sure your package works as you expect. I also found some packages using their unit tests as sample scripts for users to refer to (e.g. AllenNLP).
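As a minimal sketch of what such a test could look like with pytest (the module and package names here are hypothetical):

# test_read_libsvm.py (hypothetical test module)
from mypackage import read_libsvm  # hypothetical package layout

def test_read_libsvm_row_count(tmp_path):
    # Write a tiny libsvm file and check that each line becomes a row.
    data_file = tmp_path / "train.txt"
    data_file.write_text("0 qid:1 1:0.5 3:1.0\n2 qid:1 2:0.3\n")
    df = read_libsvm(str(data_file))
    assert df.count() == 2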
