TL;DR
With Python dataclasses and the new dataconf library, we can now type-safely parse configuration files into dataclasses. For users coming from Scala, this is an easy transition from case classes and PureConfig type-safe parsing. For example usage in Python and PySpark, jump down to Usage.
Introduction
Ah, configuration files. One must love them, right? Maybe? It depends. For those of us in Scala and other JVM languages, we can use HOCON (a human-readable superset of JSON), typesafe config, and pureconfig to easily load complex configuration or property files directly into a case class. With that taken care of, life is good. What about our Python users?
// example nested case class for pureconfig parsing
case class PipelineParams (
  numExecutors: Int,
  executorCores: Int,
  executorMem: Int,
  driverMem: Int,
  driverCores: Int
)

case class Pipeline (
  tableName: String,
  filter: String,
  sparkParams: PipelineParams
)

// can load from config, file, string, url, ...
val config: Pipeline = pureconfig.ConfigSource.fromConfig(conf)
  .loadOrThrow[Pipeline]
The Introduction of dataclasses
Python 3.7 introduced dataclasses, with backwards compatibility to Python 3.6 via a simple pip install. With them, Python essentially gained a mutable version of Scala’s case class; dataclasses can also be frozen to get the immutable variant. So, can our Python users now achieve the same streamlined parsing of config files? Not quite yet. It is true that PyHocon has been around for many years, but something was still missing. In May 2020, GitHub user @zifeo published his library dataconf to PyPI, which was the final link in the chain.
# example Python dataclass
from dataclasses import dataclass
from typing import Text

@dataclass
class DatabaseQuery:
    table_name: Text
    filter: Text
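For the immutable variant mentioned above, the same dataclass can be declared frozen, which makes attribute assignment after construction raise an error. A quick sketch (the FrozenDatabaseQuery name is just for illustration):
# frozen variant of the dataclass above
from dataclasses import dataclass
from typing import Text

@dataclass(frozen=True)
class FrozenDatabaseQuery:
    table_name: Text
    filter: Text

query = FrozenDatabaseQuery(table_name="db.table", filter="par_day = 20210501")
# query.filter = "other"  # raises dataclasses.FrozenInstanceError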
dataconf Library
What dataconf does is let users read configuration files, with defined parameters and types, directly into a Python dataclass. I stumbled across this library around August 2021 while looking for a way to easily pass configuration files into Python. From my experience with Scala and knowledge of Python’s dataclasses, I was thrilled to see this capability. Over the past months, I have added some additional features, released in versions 0.1.5, 0.1.6, and 0.2.0, to further replicate pureconfig. dataconf has been maturing rapidly and is now at version 0.3.0. Because it depends on features not available in Python 3.7.x, dataconf will only work on Python >= 3.8.
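As a minimal sketch of what that looks like, reusing the DatabaseQuery dataclass above (the table name and filter values are just placeholders):
from dataclasses import dataclass
from dataconf import loads
from typing import Text

@dataclass
class DatabaseQuery:
    table_name: Text
    filter: Text

conf_str = """
{
    table_name: "db.table"
    filter: "par_day = 20210501"
}
"""

# parse the HOCON string into a typed dataclass instance
query = loads(conf_str, DatabaseQuery)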
Usage
Here at True Digital Group, we were developing a custom MLOps pipeline in Python to serve our users and customers in an automated fashion. To orchestrate these pipelines, we wanted to use configuration files, just as we do for the data pipelines we build in Scala. dataconf was the answer. I added the ability to parse nested configs and mimicked the behavior of Scala sealed traits using an abstract base class over dataclasses in Python. This library might not be perfect, but it is one we can now rely on for our Python pipeline needs.
To use dataconf, the primary user entry points are load and loads. However, if passing around ConfigTree objects, you would need to use __parse. The current plan is to refactor the API in future releases to something closer to pureconfig, with from_config, from_file, and from_string, or another similar pattern. A demo repo with code can be found here. One last note: version 0.2.0 and lower cannot parse the - character, so use version 0.2.1 or greater.
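For loading from a file on disk rather than a string, here is a sketch reusing the DatabaseQuery dataclass from above (the query.conf path and contents are hypothetical, and I assume load mirrors loads but takes a file path):
from dataconf import load

# query.conf (hypothetical file) would contain the same fields:
# {
#     table_name: "db.table"
#     filter: "par_day = 20210501"
# }
query = load("query.conf", DatabaseQuery)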
Example Usage
Suppose we need to run a pipeline that takes in data or data sources, does some processing, and writes the data out. However, the data could come from many different file formats in Python, or from a table, SQL query, or HDFS path. In cases like this, using a configuration file, dataclasses, and dataconf can simplify our work and avoid branching. In the examples below, I use an abstract metaclass to handle the different input types, so PipeParams or Params determines which dataclass the config matches. Every input class implements load_df, so we can call the same method to load the dataframe no matter which variant is parsed. The examples below just scratch the surface of how dataclasses and dataconf can simplify your production code.
from abc import ABCMeta
from dataclasses import dataclass
from dataconf import loads
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from typing import Any, List, Optional, Text, Union

############################
########## Python ##########
############################

class InputData(metaclass=ABCMeta):
    # Replicating Scala sealed trait behavior
    pass

@dataclass
class CSV(InputData):
    # define whatever you need here
    location: Text
    header: Union[Text, int, List[int]] = "infer"
    sep: Optional[Text] = None
    delim: Optional[Text] = None

    def load_df(self) -> pd.DataFrame:
        return pd.read_csv(self.location, header=self.header, sep=self.sep, delimiter=self.delim)

@dataclass
class SQL(InputData):
    # define whatever you need here
    sql: Text
    con: Any  # SQLAlchemy engine, connection string, sqlite

    def load_df(self) -> pd.DataFrame:
        return pd.read_sql(self.sql, con=self.con)
@dataclass
class PipeParams:
    write_path: Text
    input_source: InputData

# by changing the config input_source values, dataconf resolves the matching
# dataclass for us without the need for branching.
str_conf = """
{
    write_path: "/path/to/save/data"
    input_source {
        sql: "select * from db.table where something"
        con: "connection string"
    }
    # other parameters needed
}
"""

conf = loads(str_conf, PipeParams)
df = conf.input_source.load_df()
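# For illustration only (not in the original example): swapping the
# input_source fields to those of the CSV dataclass selects the CSV variant,
# assuming dataconf matches the InputData subclass by its fields.
csv_conf = """
{
    write_path: "/path/to/save/data"
    input_source {
        location: "/path/to/file.csv"
        sep: ","
    }
}
"""

csv_df = loads(csv_conf, PipeParams).input_source.load_df()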
#############################
########## PySpark ##########
#############################

spark = SparkSession.builder.getOrCreate()

class InputType(metaclass=ABCMeta):
    # Replicating Scala sealed trait behavior
    pass

@dataclass
class TableSource(InputType):
    table_name: Text
    filter: Text

    def load_df(self, spark: SparkSession) -> pyspark.sql.DataFrame:
        return spark.table(self.table_name).filter(self.filter)

@dataclass
class FileSource(InputType):
    file_path: Text
    format: Text
    filter: Text

    def load_df(self, spark: SparkSession) -> pyspark.sql.DataFrame:
        return spark.read.format(self.format).load(self.file_path).filter(self.filter)

@dataclass
class SQLSource(InputType):
    query: Text

    def load_df(self, spark: SparkSession) -> pyspark.sql.DataFrame:
        return spark.sql(self.query)

@dataclass
class Params:
    input_source: InputType
    write_path: Text

# by changing the config input_source values, dataconf resolves the matching
# dataclass for us without the need for branching.
str_conf = """
{
    write_path: "/path/to/save/data"
    input_source {
        table_name: schema.table_name
        filter: "par_day < 20210501 and par_hour = 0"
    }
    # other parameters needed
}
"""

conf = loads(str_conf, Params)
df = conf.input_source.load_df(spark)
class Generic:
    def __init__(self, params: Params) -> None:
        self.params = params
        # code to do stuff
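To close the loop, a sketch of how the parsed config might be wired through the pipeline object (the parquet sink and the Generic wiring here are my illustrative assumptions, not part of the demo repo):
params = loads(str_conf, Params)
pipeline = Generic(params)
df = params.input_source.load_df(spark)
df.write.mode("overwrite").parquet(params.write_path)  # assumed parquet sink for illustration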