This project implements a data pipeline using PySpark for data ingestion and processing. It provides functionality to read data from various sources including PostgreSQL databases and CSV files, process the data using PySpark, and store the results.
- Data ingestion from multiple sources:
- PostgreSQL database using JDBC
- PostgreSQL database using pandas
- Direct SQL queries to Spark
- Configurable logging system
- Modular pipeline architecture
- Support for both batch and streaming data processing
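The batch and streaming modes build on standard PySpark readers. The snippet below is only a generic illustration of that distinction, not the project's own streaming entry point; the input paths and schema are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("Pipeline").getOrCreate()

# Batch: read a CSV file once into a DataFrame (placeholder path)
batch_df = spark.read.option("header", True).csv("/tmp/courses.csv")

# Streaming: continuously pick up new CSV files from a directory (placeholder path)
schema = StructType([StructField("course_name", StringType(), True)])
stream_df = spark.readStream.schema(schema).csv("/tmp/incoming/")
query = stream_df.writeStream.format("console").start()  # keeps running until stopped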
pipeline/
├── ingest.py # Data ingestion module
├── transform.py # Data transformation module
├── persist.py # Data persistence module
└── resources/
├── configs/
│ ├── logging.conf # Logging configuration
│ └── pipeline.ini # Pipeline configuration
└── postgresql-42.2.18.jar # PostgreSQL JDBC driver
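The modules are meant to be chained: ingest produces a Spark DataFrame, transform reshapes it, and persist writes the result. The sketch below assumes Transform and Persist classes analogous to the Ingest class shown later; their actual class and method names in this repository may differ:

from pyspark.sql import SparkSession
from pipeline.ingest import Ingest
from pipeline.transform import Transform  # class name assumed for illustration
from pipeline.persist import Persist      # class name assumed for illustration

spark = SparkSession.builder.appName("Pipeline").getOrCreate()

# Ingest -> transform -> persist, passing a DataFrame between stages
df = Ingest(spark).ingest_data()
transformed_df = Transform(spark).transform_data(df)  # method name assumed
Persist(spark).persist_data(transformed_df)           # method name assumed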
- Python 3.8 or higher
- Apache Spark 3.5.0
- PostgreSQL 12 or higher
- Java 8 or higher (required for Spark)
- Clone the repository:
git clone https://github.com/imratnesh/pyspark-pipeline.git
cd pyspark-pipeline
- Create and activate a virtual environment (recommended):
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
- Install the required dependencies:
pip install -r requirements.txt
- PostgreSQL Setup:
- Ensure PostgreSQL is running on localhost:5432
- Default credentials (modify as needed):
- Username: postgres
- Password: xxxx
- Database: postgres
- Create required schemas and tables:
- Use the provided SQL script to create schemas and tables:
psql -U postgres -d postgres -f CREATE_TABLES.sql
- This will create:
- futurexschema.futurex_course_catalog
- fxxcoursedb.fx_course_table
- Logging Configuration:
- Logging settings are in pipeline/resources/configs/logging.conf
- Adjust log levels and output paths as needed
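The modules can load this file with Python's standard logging machinery before creating their loggers. A minimal sketch, assuming logging.conf follows the fileConfig format:

import logging
import logging.config

# Load handler, formatter and level settings from the repository's config file
logging.config.fileConfig("pipeline/resources/configs/logging.conf")
logger = logging.getLogger(__name__)
logger.info("Logging configured from logging.conf")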
The pipeline provides several methods for data ingestion:
- Direct SQL query to Spark:
from pipeline.ingest import Ingest
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Pipeline").getOrCreate()
ingest = Ingest(spark)
df = ingest.ingest_data()
- PostgreSQL ingestion using pandas:
ingest.read_from_pg()
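Internally, the pandas-based path typically reads the table over a plain PostgreSQL connection and then converts the result to a Spark DataFrame. A rough sketch of that approach (not necessarily the project's exact implementation; credentials and table name are the defaults from the setup section):

import pandas as pd
import psycopg2

# Plain PostgreSQL connection using the default credentials shown above
conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="xxxx")
pdf = pd.read_sql("SELECT * FROM futurexschema.futurex_course_catalog", conn)
conn.close()

# Convert the pandas DataFrame into a Spark DataFrame ('spark' is the session created above)
df = spark.createDataFrame(pdf)
df.show()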
- PostgreSQL ingestion using JDBC:
ingest.read_from_pg_using_jdbc_driver()
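Under the hood, JDBC ingestion uses Spark's built-in jdbc data source together with the bundled postgresql-42.2.18.jar driver. A sketch of the typical call (connection details are the defaults from the setup section; the exact options used by read_from_pg_using_jdbc_driver may differ):

# Reuses the SparkSession created above; the JDBC jar must be on Spark's classpath,
# e.g. spark-submit --jars pipeline/resources/postgresql-42.2.18.jar your_script.py
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://localhost:5432/postgres")
           .option("dbtable", "futurexschema.futurex_course_catalog")
           .option("user", "postgres")
           .option("password", "xxxx")
           .option("driver", "org.postgresql.Driver")
           .load())
jdbc_df.show()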
After installation and configuration, verify your setup:
- Check Python dependencies:
pip list | grep -E "pyspark|psycopg2|pandas"
- Check Spark and Java installation:
spark-submit --version
java -version
- Check PostgreSQL connection and tables:
psql -U postgres -d postgres -c "\dt futurexschema.*"
psql -U postgres -d postgres -c "\dt fxxcoursedb.*"
- Run a sample pipeline ingestion (from Python shell):
from pipeline.ingest import Ingest
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Pipeline").getOrCreate()
ingest = Ingest(spark)
df = ingest.ingest_data()
df.show()
If you see a DataFrame output, your setup is correct!
- Follow PEP 8 style guide for Python code
- Ensure proper logging is implemented for all operations
- Test database connections before running the pipeline
- Use type hints for better code maintainability
- Write unit tests for new features
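For the unit-testing guideline above, a local SparkSession is usually enough; no cluster or database is needed. A minimal pytest sketch (illustrative only, it does not exercise the real pipeline code):

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Lightweight local session shared across tests
    return SparkSession.builder.master("local[1]").appName("tests").getOrCreate()

def test_dataframe_roundtrip(spark):
    df = spark.createDataFrame([(1, "Hadoop"), (2, "Spark")], ["course_id", "course_name"])
    assert df.count() == 2
    assert "course_name" in df.columns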
- Fork the repository
- Create a feature branch
- Commit your changes
- Push to the branch
- Create a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- LinkedIn: Ratnesh Kushwaha
- YouTube: India Analytica
For support, please open an issue in the GitHub repository or contact the maintainers.