Clare S. Y. Huang Data Scientist | Atmospheric Dynamicist

Read libsvm files into PySpark dataframe

I wanted to load the libsvm files provided in tensorflow/ranking into PySpark dataframe, but couldn’t find existing modules for that. Here is a version I wrote to do the job. (Disclaimer: not the most elegant solution, but it works.) First of all, load the pyspark utilities required.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import SparseVector

Initiate a spark session for creation of dataframe.

sc = SparkContext("local", "read_libsvm")
spark_session = SparkSession \
    .builder \
    .appName("read_libsvm") \
    .getOrCreate()

Get the path to the data files.

_TRAIN_DATA_PATH="data/train.txt"
_TEST_DATA_PATH="data/test.txt"

Here is the module I wrote for the purpose:

def read_libsvm(filepath, query_id=True):
    '''
    A utility function that takes in a libsvm file and turn it to a pyspark dataframe.

    Args:
        filepath (str): The file path to the data file.
        query_id (bool): whether 'qid' is present in the file.

    Returns:
        A pyspark dataframe that contains the data loaded.
    '''

    with open(filepath, 'r') as f:
        raw_data = [x.split() for x in f.readlines()]

    train_outcome = [int(x[0]) for x in raw_data]

    if query_id:
        train_qid = [int(x[1].lstrip('qid:')) for x in raw_data]

    index_value_dict = list()
    for row in raw_data:
        index_value_dict.append(dict([(int(x.split(':')[0]), float(x.split(':')[1]))
                                       for x in row[(1 + int(query_id)):]]))

    max_idx = max([max(x.keys()) for x in index_value_dict])
    rows = [
        Row(
            qid=train_qid[i],
            label=train_outcome[i],
            feat_vector=SparseVector(max_idx + 1, index_value_dict[i])
        )
        for i in range(len(index_value_dict))
    ]
    df = spark_session.createDataFrame(rows)
    
    return df

Let’s see how the train and test sets look like in the tf-ranking package:

train_df = read_libsvm(_TRAIN_DATA_PATH)
test_df = read_libsvm(_TEST_DATA_PATH)

train_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[5,13,17,42,...|    0|  1|
|(137,[11,13,18,30...|    2|  1|
|(137,[11,27,29,39...|    2|  1|
|(137,[5,10,26,31,...|    1|  1|
|(137,[13,17,22,24...|    2|  2|
+--------------------+-----+---+
only showing top 5 rows
test_df.show(5)
+--------------------+-----+---+
|         feat_vector|label|qid|
+--------------------+-----+---+
|(137,[2,7,37,40,4...|    1|  1|
|(137,[1,8,12,15,2...|    2|  1|
|(137,[4,11,15,16,...|    0|  1|
|(137,[14,19,20,33...|    0|  1|
|(137,[9,12,19,26,...|    1|  2|
+--------------------+-----+---+
only showing top 5 rows
<< Previous Page