A real-time data processing application that ingests data from Apache Kafka, processes it with a Spring Boot application, and stores the results in a PostgreSQL database. The system demonstrates a full pipeline from data ingestion to storage and is suited to scalable, high-throughput environments.
- Real-Time Data Ingestion: Utilizes Apache Kafka for high-throughput, low-latency ingestion of data streams.
- Data Processing: Implements a Spring Boot application that consumes Kafka messages, processes them (filtering, transformation, aggregation), and persists the results.
- Database Storage: Stores processed data in PostgreSQL, a reliable and scalable relational database.
- Scalability: Designed to handle large volumes of data with the ability to scale horizontally.
Below is the architecture diagram of the Real-Time Data Processing System:
```
+------------------+
|                  |
|  Data Producer   |
| (Kafka Producer) |
|                  |
+---------+--------+
          |
          | 1. Send messages to Kafka topic
          v
+---------+--------+
|                  |
|   Kafka Broker   |
| (incoming-data)  |
|                  |
+---------+--------+
          |
          | 2. Kafka Consumer subscribes to topic
          v
+---------+--------+
|                  |
|   Spring Boot    |
|   Application    |
| (Kafka Consumer) |
|                  |
+---------+--------+
          |
          | 3. Process data (filter, transform)
          v
+---------+--------+
|                  |
|    PostgreSQL    |
|     Database     |
| (processed_data) |
|                  |
+------------------+
```
- Data Producer (Kafka Producer):
  - Any application or service that generates data to be processed.
  - Sends messages to the Kafka topic `incoming-data`.
- Kafka Broker:
  - Acts as a messaging system that receives and holds messages from producers.
  - Manages the `incoming-data` topic, where messages are stored until consumed.
- Spring Boot Application (Kafka Consumer):
  - Listens to the Kafka topic `incoming-data`.
  - Consumes messages in real-time.
  - Performs data processing tasks such as filtering, transformation, and aggregation.
- PostgreSQL Database:
  - Stores the processed data in the `processed_data` table.
  - Allows for querying and analysis of the stored data.
- Step 1: Data producers send messages to the Kafka broker's `incoming-data` topic.
- Step 2: The Spring Boot application consumes messages from the `incoming-data` topic.
- Step 3: The application processes the data (e.g., filters invalid messages, transforms the data).
- Step 4: Processed data is saved to the `processed_data` table in the PostgreSQL database.
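To make steps 2-4 concrete, here is a minimal sketch of what the consuming side could look like. The names (`DataProcessingService`, `ProcessedDataRepository`) are illustrative assumptions, not the repository's actual code; the filter and the reverse-and-uppercase transform mirror the behavior described in the log walkthrough later in this README, where the `ProcessedData` entity is also sketched.

```java
import java.time.LocalDateTime;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Hypothetical consumer service; names are illustrative, not the repo's actual code.
@Service
public class DataProcessingService {

    private final ProcessedDataRepository repository; // assumed Spring Data JPA repository

    public DataProcessingService(ProcessedDataRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "incoming-data", groupId = "data-processing-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);

        // Filtering: drop null/blank messages instead of persisting them.
        if (message == null || message.isBlank()) {
            System.out.println("Message filtered out: " + message);
            return;
        }

        // Transformation: reverse the message and convert it to uppercase.
        String transformed = new StringBuilder(message).reverse().toString().toUpperCase();

        // Aggregation: compute metadata such as length and processing timestamp.
        ProcessedData data = new ProcessedData(message, transformed,
                LocalDateTime.now(), message.length(), true);

        repository.save(data); // persists a row into the processed_data table
    }
}
```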
Before you begin, ensure you have the following installed on your system:
- Java Development Kit (JDK) 17 or higher
- Apache Kafka (latest version)
- Apache Zookeeper (comes bundled with Kafka)
- PostgreSQL (version 12 or higher)
- Apache Maven
- Git
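To quickly confirm the prerequisites, you can check each tool's version from a terminal:

```bash
java -version    # should report 17 or higher
mvn -version
psql --version   # should report 12 or higher
git --version
```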
```bash
git clone https://github.com/srikanthamsa/realtime-data-processing-system.git
cd realtime-data-processing-system
```
- Create Database: Log in to PostgreSQL and create a new database.

  ```sql
  CREATE DATABASE your_postgres_database;
  ```

- Set Up User (Optional): Ensure you have a user with the necessary privileges.

  ```sql
  CREATE USER your_postgres_username WITH PASSWORD 'your_postgres_password';
  GRANT ALL PRIVILEGES ON DATABASE your_postgres_database TO your_postgres_username;
  ```
- Open `src/main/resources/application.properties` and update the following properties:

  ```properties
  # Database Configuration
  spring.datasource.url=jdbc:postgresql://localhost:5432/your_postgres_database
  spring.datasource.username=your_postgres_username
  spring.datasource.password=your_postgres_password

  # Kafka Configuration
  spring.kafka.bootstrap-servers=localhost:9092
  spring.kafka.consumer.group-id=data-processing-group

  # Hibernate Configuration
  spring.jpa.hibernate.ddl-auto=update
  spring.jpa.show-sql=true
  spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
  ```
Build the project with Maven:

```bash
mvn clean install
```
```bash
# Navigate to Kafka installation directory
cd path/to/kafka

# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
```
Open a new terminal:
```bash
# Navigate to Kafka installation directory
cd path/to/kafka

# Start Kafka Broker
bin/kafka-server-start.sh config/server.properties
```
Open a new terminal:
```bash
# Navigate to Kafka installation directory
cd path/to/kafka

# Create topic 'incoming-data'
bin/kafka-topics.sh --create --topic incoming-data --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
```
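Optionally, verify that the topic exists with the expected partition count before starting the application:

```bash
bin/kafka-topics.sh --describe --topic incoming-data --bootstrap-server localhost:9092
```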
Run the Spring Boot application:

```bash
mvn spring-boot:run
```

The application will start and listen for messages on the `incoming-data` Kafka topic.
Open a new terminal:
```bash
# Navigate to Kafka installation directory
cd path/to/kafka

# Start Kafka Console Producer
bin/kafka-console-producer.sh --topic incoming-data --bootstrap-server localhost:9092
```
Type messages and press Enter after each one:

```
Hello World
This is a test message
Another message for processing
```
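If you would rather send messages from code than from the console producer, a minimal sketch using Spring's `KafkaTemplate` could look like the following. The `MessageProducer` class is an illustrative assumption, not part of this repository:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical programmatic producer; an alternative to the console producer above.
@Component
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // Publish the message to the same topic the consumer listens on.
        kafkaTemplate.send("incoming-data", message);
    }
}
```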
- Application Logs: Check the terminal running the Spring Boot application to see logs indicating message receipt and processing.

- Database Verification: Connect to the PostgreSQL database to verify that data has been stored.

  ```bash
  psql -h localhost -U your_postgres_username -d your_postgres_database
  ```

  ```sql
  SELECT * FROM processed_data;
  ```
The application logs showcase the successful real-time processing of messages received from the Kafka topic `incoming-data` by the Spring Boot application. Here's a breakdown of the key steps illustrated in the logs:
- Message Reception: The application receives various messages sent to the Kafka topic, such as:
  - "Hello World"
  - "This is for test purposes"
  - "Data processing test"
  - "Everything working as expected!"
  - An empty message (noted in the logs)
  - "123456789+-*/!"
- Filtering: The application implements a filtering mechanism to exclude invalid or empty messages. In the logs, we see an empty message being received and filtered out:

  ```
  Received message:
  Message filtered out:
  ```
- Transformation: Valid messages undergo a transformation process: each message is reversed and converted to uppercase. For example:
  - Original: "Hello World"
  - Transformed: "DLROW OLLEH"
  - Original: "This is for test purposes"
  - Transformed: "SESOPRUP TSET ROF SI SIHT"
- Data Aggregation: Additional metadata is computed for each message, such as the length of the original message (`dataLength`) and a timestamp of when the message was processed (`processedAt`).

- Database Insertion: The processed data is saved to the PostgreSQL database. The logs display Hibernate executing SQL `INSERT` statements to store each `ProcessedData` object (a sketch of a matching entity class follows this list):

  ```
  Hibernate: insert into processed_data (data_length, original_data, processed_at, transformed_data, valid) values (?, ?, ?, ?, ?)
  Processed and saved data: ProcessedData{id=1, originalData='Hello World', transformedData='DLROW OLLEH', processedAt=2024-09-29T21:17:39.997929400, dataLength=11, valid=true}
  ```
- Error Handling: The application gracefully handles network interruptions and other transient issues. For instance, the log entry:

  ```
  2024-09-29T21:26:39.688+05:30 INFO 31204 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-data-processing-group-1, groupId=data-processing-group] Node -1 disconnected.
  ```

  indicates a temporary disconnection that does not halt the application's operation.
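Based on the column names in the Hibernate `INSERT` and the `ProcessedData` output above, the entity behind the `processed_data` table plausibly looks something like this minimal sketch. It is a reconstruction inferred from the logs, assuming Spring Boot 3 with `jakarta.persistence`, not the repository's actual source:

```java
import java.time.LocalDateTime;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import org.springframework.data.jpa.repository.JpaRepository;

// Hypothetical reconstruction of the entity, inferred from the column names in the
// Hibernate INSERT and the toString output shown above.
@Entity
@Table(name = "processed_data")
public class ProcessedData {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY) // id is absent from the INSERT, so identity generation fits
    private Long id;

    private String originalData;      // maps to column original_data
    private String transformedData;   // maps to column transformed_data
    private LocalDateTime processedAt;
    private int dataLength;
    private boolean valid;

    protected ProcessedData() { }     // no-arg constructor required by JPA

    public ProcessedData(String originalData, String transformedData,
                         LocalDateTime processedAt, int dataLength, boolean valid) {
        this.originalData = originalData;
        this.transformedData = transformedData;
        this.processedAt = processedAt;
        this.dataLength = dataLength;
        this.valid = valid;
    }

    @Override
    public String toString() {
        return "ProcessedData{id=" + id + ", originalData='" + originalData
                + "', transformedData='" + transformedData + "', processedAt=" + processedAt
                + ", dataLength=" + dataLength + ", valid=" + valid + "}";
    }
}

// In its own file: the Spring Data repository assumed by the consumer sketch earlier.
interface ProcessedDataRepository extends JpaRepository<ProcessedData, Long> { }
```

With `spring.jpa.hibernate.ddl-auto=update` set in `application.properties`, Hibernate creates or updates the `processed_data` table from a mapping like this on startup.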
Summary: The logs confirm that the system effectively ingests messages from Kafka, processes them according to the defined business logic (filtering out invalid entries, transforming valid messages), and persists the results in a PostgreSQL database. This demonstrates the application's capability to handle real-time data processing tasks reliably and efficiently.
Contributions are welcome! Please fork the repository and create a pull request with your changes.
- Fork the Project
- Create your Feature Branch (`git checkout -b feature/YourFeature`)
- Commit your Changes (`git commit -m 'Add Your Feature'`)
- Push to the Branch (`git push origin feature/YourFeature`)
- Open a Pull Request
Distributed under the MIT License. See the `LICENSE` file for more information.
Srikant Hamsa
- Email: [email protected]
- LinkedIn: Srikant Hamsa
- GitHub: srikanthamsa
Feel free to reach out if you have any questions or suggestions!