TL;DR

With Python dataclasses and the new dataconf library, we can now type-safely parse configuration files into dataclasses. For users coming from Scala, this is an easy transition from case classes and PureConfig's type-safe parsing. For example usage in Python and PySpark, jump down to the Usage section.

Introduction

Ah, configuration files. One must love them, right? Maybe? It depends. For those in Scala and other JVM languages, we can use HOCON (a human-readable superset of JSON) together with Typesafe Config and PureConfig to easily load complex configuration or property files directly into a case class. With that taken care of, life is good. What about our Python users?

// example nested case class for pureconfig parsing
case class PipelineParams (
	numExecutors: Int,
	executorCores: Int,
	executorMem: Int,
	driverMem: Int,
	driverCores: Int
)


case class Pipeline (
	tableName: String,
	filter: String,
	sparkParams: PipelineParams
)

// can load from config, file, string, url, ...
val config: Pipeline = pureconfig.ConfigSource.fromConfig(conf)
	.loadOrThrow[Pipeline]
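
For reference, a HOCON file such a case class could load might look like the following (a sketch with illustrative values, assuming pureconfig's default kebab-case key mapping):

// example pipeline.conf
table-name: "db.table"
filter: "par_day = 20210501"
spark-params {
	num-executors: 4
	executor-cores: 4
	executor-mem: 8
	driver-mem: 8
	driver-cores: 2
}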

The Introduction of dataclasses

In Python 3.7, dataclasses were introduced, with a backport to Python 3.6 available via a simple pip install. With them, Python essentially gained a mutable version of the Scala case class, and dataclasses can be frozen to get the immutable variant. So can our Python users now achieve the same streamlined parsing of config files? Not quite yet. PyHocon has been around for many years, but something was still missing. In May 2020, GitHub user @zifeo published his library dataconf to PyPI, which was the final link in the chain.

# example Python dataclass
from dataclasses import dataclass
from typing import Text


@dataclass
class DatabaseQuery:
	table_name: Text
	filter: Text
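
As noted above, passing frozen=True gives the immutable variant; reassigning a field then raises FrozenInstanceError. A minimal sketch:

# example frozen (immutable) Python dataclass
@dataclass(frozen=True)
class FrozenQuery:
	table_name: Text
	filter: Text

query = FrozenQuery(table_name="db.table", filter="par_day = 20210501")
# query.filter = "other"  # raises dataclasses.FrozenInstanceError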

dataconf Library

dataconf allows users to read configuration files, with defined parameters and types, directly into a Python dataclass. I stumbled across this library around August 2021 while looking for a way to easily pass configuration files into Python. From my experience with Scala and knowledge of Python’s dataclasses, I was thrilled to see this capability. Over the past months, I have added some additional features, released in versions 0.1.5, 0.1.6, and 0.2.0, to further replicate pureconfig. dataconf has been maturing rapidly and is now at version 0.3.0. Because it relies on features not present in Python 3.7.x, dataconf only works on Python >= 3.8.

Usage

Here at True Digital Group, we were developing a custom MLOps pipeline in Python to serve our users and customers in an automated fashion. To orchestrate these pipelines, we wanted to use configuration files, just as we do for the data pipelines we build in Scala, and dataconf was the answer. I added the ability to parse nested configs and mimicked the behavior of Scala sealed traits using an abstract base class over dataclasses in Python. The library may not be perfect, but it is one we can now rely on for our Python pipeline needs.

To use dataconf, the primary user entry points are load and loads. However, if passing around ConfigTree objects, you would need to use __parse. The current plan is to refactor the usage in a future release to something closer to pureconfig, with from_config, from_file, and from_string, or another similar pattern. A demo repo with code can be found here. One last note: versions 0.2.0 and lower cannot parse -, so use version 0.2.1 or greater.
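
As a quick sketch of the two entry points (reusing the DatabaseQuery dataclass from above; the file path is illustrative):

from dataconf import load, loads

str_conf = """
{
	table_name: db.table
	filter: "par_day = 20210501"
}
"""

# parse a HOCON string directly into the dataclass
query = loads(str_conf, DatabaseQuery)

# or parse a config file on disk
# query = load("conf/query.conf", DatabaseQuery)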

Example Usage

Suppose we need to run a pipeline that takes in data or data sources, does some processing, and writes the data out. However, the data could come from many different file formats in Python, or from a table, SQL query, or HDFS path in PySpark. In cases like this, a configuration file, dataclasses, and dataconf can simplify our work and avoid branching. In the examples below, I use an abstract base class (via ABCMeta) to handle the different input types, letting the input_source field of PipeParams or Params determine which dataclass the config matches. Every input class implements load_df, so the same method call loads the dataframe no matter which class is parsed. These examples only scratch the surface of how dataclasses and dataconf can simplify your production code.

from abc import ABCMeta
from dataclasses import dataclass
from dataconf import loads
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from typing import Any, List, Optional, Text, Union


############################
########## Python ##########
############################
class InputData(metaclass=ABCMeta):
	# Replicating Scala sealed trait behavior
	pass


@dataclass
class CSV(InputData):
	# define whatever you need here
	location: Text
	header: Union[Text, int, List[int]] = "infer"
	sep: Optional[Text] = None
	delim: Optional[Text] = None

	def load_df(self) -> pd.DataFrame:
		# note: pandas treats sep and delimiter as aliases, so only set one
		return pd.read_csv(self.location, header=self.header, sep=self.sep, delimiter=self.delim)


@dataclass
class SQL(InputData):
	# define whatever you need here
	sql: Text
	con: Any  # SQLAlchemy connectable, connection string, or sqlite3 connection

	def load_df(self) -> pd.DataFrame:
		return pd.read_sql(self.sql, con=self.con)


@dataclass
class PipeParams:
	write_path: Text
	input_source: InputData


# by changing the config input_source values, we can match
# the other dataclasses without the need for branching.
str_conf = """
{
	write_path: "/path/to/save/data"
	input_source {
		sql: "select * from db.table where something"
		con: "connection string"
	}
	# other parameters needed
}
"""

conf = loads(str_conf, PipeParams)
df = conf.input_source.load_df()
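

# Swapping the input_source fields for ones that match the CSV dataclass
# switches loaders with no code change (a sketch; values are illustrative).
str_conf_csv = """
{
	write_path: "/path/to/save/data"
	input_source {
		location: "/path/to/input.csv"
		sep: ","
	}
}
"""

conf = loads(str_conf_csv, PipeParams)
df = conf.input_source.load_df()  # dispatches to CSV.load_df this time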

#############################
########## PySpark ##########
#############################
spark = SparkSession.builder.getOrCreate()
  

class InputType(metaclass=ABCMeta):
	# Replicating Scala sealed trait behavior
	pass


@dataclass
class TableSource(InputType):
	table_name: Text
	filter: Text

	def load_df(self, spark: SparkSession) -> pyspark.sql.DataFrame:
		return spark.table(self.table_name).filter(self.filter)


@dataclass
class FileSource(InputType):
	file_path: Text
	format: Text
	filter: Text

	def load_df(self, spark: SparkSession) -> pyspark.sql.DataFrame:
		return spark.read.format(self.format).load(self.file_path).filter(self.filter)


@dataclass
class SQLSource(InputType):
	query: Text

	def load_df(self, spark: SparkSession) -> pyspark.sql.DataFrame:
		return spark.sql(self.query)
  
@dataclass
class Params:
	input_source: InputType
	write_path: Text

# by changing the config input_source values, we can match
# the other dataclasses without the need for branching.
str_conf = """
{
	write_path: "/path/to/save/data"
	input_source {
		table_name: schema.table_name
		filter: "par_day < 20210501 and par_hour = 0"
	}
	# other parameters needed
}
"""

conf = loads(str_conf, Params)
df = conf.input_source.load_df(spark)


class Generic:
	def __init__(self, params: Params) -> None:
		self.params = params

	# code to do stuff
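

# Tying it together (a sketch): hand the parsed Params to the pipeline class,
# load from the configured source, and write to the configured path.
params = loads(str_conf, Params)
pipeline = Generic(params)

df = params.input_source.load_df(spark)
df.write.mode("overwrite").parquet(params.write_path)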