I love to share what I've learnt with others. Check out my blog posts and notes about my
academic research, as well as technical solutions to software engineering and data
science challenges. Opinions expressed in this blog are solely my own.
I passed the Databricks Certified Associate Developer exam for Apache Spark 3.0 (Python). Here is my certificate!
I registered for the exam when joining the Spark Summit this June, hoping that setting this goal would push me to dive deeper into Spark architecture and performance tuning.
[Experience sharing] On top of coding with pyspark at work (which helped me with most of the syntax questions in the exam), my exam preparation mainly involved reading two books: Spark: The Definitive Guide and Learning Spark 2.0.
It was my very first time taking an online proctored exam at home, and there are two things I wish I had known before the exam:
The Spark documentation (PDF file) provided by Sentinel (i.e., the exam software) is not searchable. You have to scroll through the pages to find what you need.
The proctor would check your workspace configuration during the exam (i.e., not at the beginning). The exam would pause during the check, so you don’t have to worry about losing time.
I believe more exams will be conducted with an online proctor as the pandemic evolves. Perhaps we will get used to setting up an online test workspace at home very soon.
When I was trying to set up a conda environment to run a package…
ERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.
We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.
numpydoc 1.1.0 requires sphinx>=1.6.5, but you'll have sphinx 1.5.3 which is incompatible.
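One possible way to deal with the conflict above (a sketch, assuming the conda environment is already activated) is to upgrade sphinx to the version numpydoc needs, using the new resolver that pip suggests:
pip install --use-feature=2020-resolver "sphinx>=1.6.5"    # satisfy numpydoc's requirement, testing the 2020 resolver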
It’s HacktoberFest again! 👻 Here are some useful commands to merge your forked repo with upstream changes from the original repo (Also see GitHub Docs).
After forking, specify the remote upstream repository:
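A sketch of the usual sequence (assuming the original repository's URL, and that its default branch is called main):
git remote add upstream <original-repo-url>    # register the original repo as "upstream"
git fetch upstream                             # download its branches and commits
git merge upstream/main                        # merge its default branch into your current branch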
To give indices to hierarchical labels, I can use DENSE_RANK() or RANK() depending on the situation.
For example, if a DataFrame has duplicated hierarchical labels, DENSE_RANK() assigns consecutive indices across the distinct labels, while RANK() leaves gaps after ties.
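A small illustration in pyspark (a sketch with a made-up DataFrame, not the original example): F.dense_rank and F.rank correspond to the SQL DENSE_RANK() and RANK().
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Made-up hierarchical labels with a duplicate
df = spark.createDataFrame(
    [("A", "a1"), ("A", "a1"), ("A", "a2"), ("B", "b1")],
    ["level_1", "level_2"],
)

w = Window.orderBy("level_1", "level_2")
df.withColumn("dense_rank", F.dense_rank().over(w)) \
  .withColumn("rank", F.rank().over(w)) \
  .show()
# dense_rank gives 1, 1, 2, 3; rank gives 1, 1, 3, 4 (a gap after the tie).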
How do you construct a custom Transformer that can be fitted into a Pipeline object? I learned how to do that from a colleague today.
Below is an example that includes all key components:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F


class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        super(CustomTransformer, self).__init__()
        self._setDefault(input_col=None, output_col=None)
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame):
        input_col = self.get_input_col()
        output_col = self.get_output_col()
        # The custom action: concatenate the integer form of the doubles from the Vector
        transform_udf = F.udf(lambda x: '/'.join([str(int(y)) for y in x]), StringType())
        return df.withColumn(output_col, transform_udf(input_col))
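A quick usage sketch (with made-up data and column names, assuming a vector column called features) showing that the transformer drops straight into a Pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up input: one vector of doubles per row
df = spark.createDataFrame(
    [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)],
    ["features"],
)

pipeline = Pipeline(stages=[CustomTransformer(input_col="features", output_col="features_str")])
pipeline.fit(df).transform(df).show(truncate=False)
# The features_str column contains e.g. "1/2/3".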
[hn2016_falwa Release 0.4.1] A minor release of my python package hn2016_falwa is published. Thanks to Christopher Polster for submitting a pull request that fixes the interface of BarotropicField. Moreover, I added procedures to process masked arrays in QGField, so it can conveniently be used to process ERA5 data, which is stored as masked arrays in netCDF files.
As a memo to myself - procedures for a release (which I often forget and have to google 😅):
[Updated on 2023/11/5] Update version number in:
setup.py,
readme.md
falwa/__init__.py
docs/source/conf.py
recipe/meta.yaml
Add a (lightweight) tag to the commit: git tag <tagname>.
Push not only the commits but also the tag: git push origin <tagname>.
Update on Aug 15, 2021: To push the commit and corresponding tag simultaneously, use
git push --atomic origin <branch name> <tag>
If I have time, I also update the version on PyPI:
Create the dist/ directory and the installation files: python3 setup.py sdist bdist_wheel
Upload the package onto TestPyPI to test deployment: python3 -m twine upload --repository testpypi dist/*
Deploy the package onto PyPI for real: python3 -m twine upload dist/*
Today I learned from a colleague a more efficient way of doing an outer join of large dataframes: instead of doing the outer join, you can first union the key columns and then implement a left join twice. I ran an experiment myself on the cluster with two dataframes (df1 and df2) - each dataframe has ~10k rows, and there is only ~10% overlap (i.e., an inner join would result in ~1k rows).
I compared the distribution of computing times for inner, left, outer, right, and union-left (which gives the same results as outer) joins, repeating each join 20 times.
For these sizes of dataframes, the union-left join is on average ~20% faster than the equivalent outer join.
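A sketch of the trick (with tiny made-up dataframes and a hypothetical join column called key):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "val1"])
df2 = spark.createDataFrame([(2, "x"), (3, "y")], ["key", "val2"])

# Union of the key columns gives the same set of keys as an outer join would produce
keys = df1.select("key").union(df2.select("key")).distinct()

# Two left joins back onto the full key set reproduce the outer-join result
union_left = keys.join(df1, on="key", how="left").join(df2, on="key", how="left")

# For comparison: the plain outer join
outer = df1.join(df2, on="key", how="outer")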
[hn2016_falwa Release 0.4.0] 😄 I am happy to announce that the climate data analysis in Nakamura and Huang (2018, Science) for the southern hemisphere is also available on GitHub now! (Finally, I had time to do a thorough check of the code and make the release…)
If you have any questions/issues regarding the usage of the package, feel free to post them on the Issues page of the repo! I will help you fix them as soon as I can!
I learned from colleagues some points to pay attention to when writing SQL queries. (This post will be updated from time to time.)
Partitioned tables
Always specify the partition in the where clause. If you have to retrieve data from several partitions, loop through them one by one.
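For instance (a sketch with a hypothetical table my_partitioned_table partitioned by a ds column):
from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

partitions = ["2020-10-01", "2020-10-02", "2020-10-03"]  # hypothetical partition values

# Query one partition at a time, always filtering on the partition column
chunks = [
    spark.sql(f"SELECT item, amount FROM my_partitioned_table WHERE ds = '{ds}'")
    for ds in partitions
]
result = reduce(lambda a, b: a.union(b), chunks)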
Distinct elements
Note that the two queries
SELECT DISTINCT(item) FROM table_X
and
SELECT item FROM table_X GROUP BY item
give the same result. However, the second SQL query will be executed faster. (There is no difference between calling distinct and group by via (py)spark, though.)
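One way to check this in (py)spark (a sketch using a tiny temporary view): both queries compile to the same physical plan, an aggregate on item.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame([("a",), ("a",), ("b",)], ["item"]).createOrReplaceTempView("table_X")

# The two explain() outputs show the same physical plan
spark.sql("SELECT DISTINCT item FROM table_X").explain()
spark.sql("SELECT item FROM table_X GROUP BY item").explain()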
JOIN vs. WHERE
Always use where to filter the table to be joined down to its smallest size, e.g.
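A sketch of the pattern (with a hypothetical loan table, made-up columns, and a hypothetical customer_id key joined to credit):
SELECT l.customer_id,
       l.loan_amount,
       c.credit_score
FROM loan AS l
JOIN (
    SELECT *
    FROM credit AS c
    WHERE c.eligibility = True
) AS c
ON l.customer_id = c.customer_id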
Note that the line WHERE c.eligibility = True is executed to filter the table credit before the JOIN. This shrinks the table credit as much as possible before joining.
I recently started learning bouldering (2.5 months so far?) and it's lots of fun! Together with my college friend Martin, I keep updating a comic series - the Matchman Bouldering Series - on Instagram/Tumblr that records the ideas I learn from friends in every session.
Some previews…
If you are fond of bouldering and have practical/physics ideas to share, ping me and see if we can collaborate! :)
A pandas DataFrame can be converted to a pyspark DataFrame easily in newer versions of pandas (after v0.19.2). If you are using an older version of pandas, you have to do a bit more work for the conversion, as follows.
First, load the packages and initiate a spark session.
from pyspark.sql import SparkSession, DataFrame, Row
import pandas as pd

spark = SparkSession.builder \
    .master("local") \
    .appName("Pandas to pyspark DF") \
    .getOrCreate()
Here is an example of a pandas DataFrame to be converted.
In order to get multiple rows out of each row, we need to use the function explode. First, we write a user-defined function (UDF) to return the list of permutations given an array (sequence):
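A minimal sketch of such a UDF and the explode step (with a made-up seq column holding a small array):
from itertools import permutations

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Made-up input: one array of items per row
df = spark.createDataFrame([(["a", "b", "c"],)], ["seq"])

# UDF that returns the list of permutations of an array
permute_udf = F.udf(
    lambda seq: [list(p) for p in permutations(seq)],
    ArrayType(ArrayType(StringType())),
)

# explode turns each permutation into its own row
df.withColumn("perm", F.explode(permute_udf("seq"))).show(truncate=False)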