Real Time Data Processing Using Kafka Nodejs
» Important Links
» What are we doing?
In this article, we’re going to use Kafka, Node.js, MySQL, and React for real-time data processing.
In the backend, we will configure Kafka producers and consumers to handle train data messages and store them in a MySQL database. We will also set up Docker containers for Zookeeper and Kafka to manage our message streams.
And in the frontend, we will build a React application that fetches train data from our backend API.
But before we dive into the code, you should have a basic understanding of what Kafka is and how its architecture fits together.
» What is Kafka?
Kafka is a distributed streaming platform that allows you to publish, subscribe to, and process streams of records in real-time. It’s widely used for building real-time data pipelines and streaming applications.
A topic is a category or feed name to which records are sent by producers.
Producers are applications that publish or write records to Kafka topics.
Consumers are applications that read records from Kafka topics.
A consumer group is a group of consumers that work together to consume records from a topic.
Offsets are unique identifiers of records within a partition, allowing consumers to keep track of their position.
Brokers are Kafka servers that store and serve records.
Zookeeper manages and coordinates Kafka brokers and maintains metadata.
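To make these terms concrete, here is a tiny sketch using kafkajs (the library we'll use throughout). The broker address, topic name, and group id are just placeholders for illustration: a producer publishes a record to a topic, and a consumer in a consumer group reads it back, logging the partition and offset of each record.
// concepts-demo.js — a minimal sketch of the terms above (placeholder broker, topic, and group)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'demo', brokers: ['<ip-address>:9092'] });

async function demo() {
  // Producer: publishes records to a topic
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({ topic: 'demo_topic', messages: [{ value: 'hello' }] });

  // Consumer: part of a consumer group, reads records and tracks its offset
  const consumer = kafka.consumer({ groupId: 'demo_group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['demo_topic'], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`topic=${topic} partition=${partition} offset=${message.offset} value=${message.value.toString()}`);
    },
  });
}

demo().catch(console.error);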
Alright, now that we’ve covered the basics, let’s get our hands dirty with some code. We’ll start by setting up our backend with Node.js, Kafka, and MySQL.
» Backend Setup
Create the project directory, then create a folder called backend inside it.
In the backend folder we will need five files: admin.js, client.js, producer.js, consumer.js, and a server.js file as the entry point of our server.
mkdir kafka-nodejs
cd kafka-nodejs
mkdir backend
cd backend
backend/
├── admin.js
├── client.js
├── consumer.js
├── producer.js
├── server.js
├── package.json
├── package-lock.json
└── node_modules/
Next, let’s install the necessary dependencies. In the terminal, run the following command:
npm init -y
npm install kafkajs mysql2 express stompit async
Before moving ahead, make sure you have an account on the Network Rail public data feeds (publicdatafeeds.networkrail.co.uk), then grab your username and password and keep them somewhere safe, because you will need them shortly.
Now, let’s set up the Kafka admin and client in admin.js and client.js.
// admin.js
const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<ip-address>:9092"],
});

async function init() {
  const admin = exports.kafka.admin();
  console.log("Admin connecting...");
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topics [train_activation, train_cancellation]");
  await admin.createTopics({
    topics: [
      { topic: "train_activation", numPartitions: 2 },
      { topic: "train_cancellation", numPartitions: 2 },
    ],
  });
  console.log("Topics Created Success [train_activation, train_cancellation]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();
In admin.js, we’ll connect to Kafka, create topics for train activation and cancellation, and then disconnect from Kafka.
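Run it once with node admin.js. If you want to double-check that the topics were actually created, here is a small optional addition you could drop at the end of init(), just before admin.disconnect(); kafkajs exposes admin.listTopics() for exactly this.
// Optional check — add inside init(), before admin.disconnect()
const topicList = await admin.listTopics();
console.log("Existing topics:", topicList); // should include train_activation and train_cancellation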
// client.js
const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<ip-address>:9092"],
});
In client.js, we set up a basic Kafka client configuration that we can reuse in other files; it serves as the shared configuration for connecting to Kafka.
Please replace <ip-address> with your machine’s IP address.
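For example, any other backend file can pull in that shared instance instead of constructing its own (the producer and consumer below define their config inline, but reusing client.js works just as well):
// e.g. at the top of producer.js or consumer.js
const { kafka } = require("./client");
const producer = kafka.producer(); // or kafka.consumer({ groupId: "..." })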
With the admin and client set up, let’s move on to creating our Kafka producer in producer.js. The producer will send train data messages to our Kafka topics.
// producer.js
const { Kafka } = require('kafkajs');
const stompit = require('stompit');
const async = require('async');

// Configuration for the Kafka brokers
const kafkaConfig = {
  brokers: ["<ip-address>:9092"],
};

// Create Kafka producer
const kafkaProducer = new Kafka({
  clientId: "rail_app_producer",
  ...kafkaConfig,
}).producer();

const initKafkaProducer = async () => {
  try {
    await kafkaProducer.connect();
    console.log('Producer connected successfully');
  } catch (error) {
    console.error('Error connecting Kafka producer:', error.message);
    process.exit(1); // Exit the process if unable to connect
  }
};

// Initialize Kafka producer
initKafkaProducer();

// Configuration for the STOMP connection to the Network Rail feed
const connectOptions = {
  host: 'publicdatafeeds.networkrail.co.uk',
  port: 61618,
  connectHeaders: {
    'heart-beat': '15000,15000',
    'client-id': '',
    'host': '/',
    'login': '<Your UserName for the API>',
    'passcode': '<Password for the API>'
  }
};

const reconnectOptions = {
  initialReconnectDelay: 10,
  maxReconnectDelay: 30000,
  useExponentialBackOff: true,
  maxReconnects: 30,
  randomize: false
};

const connectionManager = new stompit.ConnectFailover([connectOptions], reconnectOptions);

connectionManager.connect((error, client, reconnect) => {
  if (error) {
    console.error('Terminal error, gave up reconnecting:', error.message);
    return;
  }

  client.on('error', (error) => {
    console.error('Connection lost. Reconnecting...', error.message);
    reconnect();
  });

  const headers = {
    destination: '/topic/TRAIN_MVT_ALL_TOC',
    'activemq.subscriptionName': 'somename-train_mvt',
    ack: 'client-individual'
  };

  client.subscribe(headers, (error, message) => {
    if (error) {
      console.error('Subscription failed:', error.message);
      return;
    }

    message.readString('utf-8', async (error, body) => {
      if (error) {
        console.error('Failed to read a message', error.message);
        return;
      }

      if (body) {
        try {
          const data = JSON.parse(body);
          async.each(data, async (item) => {
            const timestamp = new Date().toISOString();
            if (item.header) {
              if (item.header.msg_type === '0001') {
                // Train Activation
                const stanox = item.body.tp_origin_stanox || item.body.sched_origin_stanox || 'N/A';
                console.log(timestamp, '- Train', item.body.train_id, 'activated at stanox', stanox);
                // Send the message to Kafka
                await sendToKafka('train_activation', { timestamp, trainId: item.body.train_id, stanox });
              } else if (item.header.msg_type === '0002') {
                // Train Cancellation
                const stanox = item.body.loc_stanox || 'N/A';
                const reasonCode = item.body.canx_reason_code || 'N/A';
                console.log(timestamp, '- Train', item.body.train_id, 'cancelled. Cancellation Reason:', reasonCode, 'at stanox', stanox);
                // Send the message to Kafka
                await sendToKafka('train_cancellation', { timestamp, trainId: item.body.train_id, stanox, reasonCode });
              }
            }
          });
        } catch (e) {
          console.error('Failed to parse JSON', e.message);
        }
      }

      client.ack(message);
    });
  });
});

// Send a message to a Kafka topic and log a confirmation
async function sendToKafka(topic, message) {
  try {
    await kafkaProducer.send({
      topic,
      messages: [
        {
          value: JSON.stringify(message),
        },
      ],
    });
    console.log(`Message sent to Kafka topic "${topic}":`, message); // Log confirmation
  } catch (error) {
    console.error('Error sending message to Kafka:', error.message);
  }
}
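If you don’t have Network Rail credentials yet, or just want to check the Kafka side in isolation, a tiny standalone script like the sketch below can confirm that the broker and topics work. The file name test-producer.js and the sample payload are hypothetical helpers for this test, not part of the app.
// test-producer.js — a minimal connectivity check (placeholder file, sample data)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'rail_app_test', brokers: ['<ip-address>:9092'] });
const producer = kafka.producer();

async function run() {
  await producer.connect();
  // Send one hand-crafted message to the train_activation topic
  await producer.send({
    topic: 'train_activation',
    messages: [{ value: JSON.stringify({ timestamp: new Date().toISOString(), trainId: 'TEST0001', stanox: '00000' }) }],
  });
  console.log('Test message sent');
  await producer.disconnect();
}

run().catch(console.error);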
Next, let’s set up our Kafka consumer in consumer.js. The consumer will read messages from the Kafka topics and save the data to our MySQL database.
// consumer.js
const { Kafka, logLevel } = require('kafkajs');
const mysql = require('mysql2');

// Create MySQL connection pool
const pool = mysql.createPool({
  connectionLimit: 10,
  host: '127.0.0.1',
  user: '<root username>',
  password: '<password>',
  database: 'rail_data',
});

// Configuration for the Kafka brokers
const kafkaConfig = {
  brokers: ["<ip-address>:9092"],
  logLevel: logLevel.DEBUG, // Set log level to DEBUG for detailed logging
};

// Create Kafka consumer
const kafkaConsumer = new Kafka({
  clientId: "rail_app_consumer",
  ...kafkaConfig,
}).consumer({
  groupId: "rail_consumer_group",
});

// Topics produced by the producer
const topics = ["train_activation", "train_cancellation"];

// Connect the Kafka consumer
const initKafkaConsumer = async () => {
  await kafkaConsumer.connect();
  await kafkaConsumer.subscribe({ topics });
  await kafkaConsumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const processedMessage = JSON.parse(message.value.toString('utf-8'));

        // Log the received message
        console.log(processedMessage);

        // Insert data into MySQL database
        if (topic === 'train_activation') {
          insertActiveTrain(processedMessage.trainId, processedMessage.stanox, processedMessage.timestamp);
        } else if (topic === 'train_cancellation') {
          insertCancelledTrain(processedMessage.trainId, processedMessage.stanox, processedMessage.reasonCode, processedMessage.timestamp);
        }

        // Add your processing logic here
        // For now, let's log that the message is being processed
        console.log('Processing message...');
      } catch (error) {
        console.error('Error processing message:', error.message);
      }
    },
  });
};

// Function to format timestamp to MySQL DATETIME format
function formatTimestampToMySQL(timestamp) {
  const date = new Date(timestamp);
  const year = date.getFullYear();
  const month = (`0${date.getMonth() + 1}`).slice(-2);
  const day = (`0${date.getDate()}`).slice(-2);
  const hours = (`0${date.getHours()}`).slice(-2);
  const minutes = (`0${date.getMinutes()}`).slice(-2);
  const seconds = (`0${date.getSeconds()}`).slice(-2);
  return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
}

// Function to insert active train data into the database
function insertActiveTrain(trainId, stanox, timestamp) {
  const formattedTimestamp = formatTimestampToMySQL(timestamp);
  const sql = 'INSERT INTO active_trains (train_id, stanox, timestamp) VALUES (?, ?, ?)';
  const values = [trainId, stanox, formattedTimestamp];
  pool.query(sql, values, (error, results) => {
    if (error) {
      console.error('Error inserting active train data:', error);
    } else {
      console.log('Inserted active train data:', results);
    }
  });
}

// Function to insert cancelled train data into the database
function insertCancelledTrain(trainId, stanox, reasonCode, timestamp) {
  const formattedTimestamp = formatTimestampToMySQL(timestamp);
  const sql = 'INSERT INTO cancelled_trains (train_id, stanox, reason_code, timestamp) VALUES (?, ?, ?, ?)';
  const values = [trainId, stanox, reasonCode, formattedTimestamp];
  pool.query(sql, values, (error, results) => {
    if (error) {
      console.error('Error inserting cancelled train data:', error);
    } else {
      console.log('Inserted cancelled train data:', results);
    }
  });
}

// Initialize Kafka consumer
initKafkaConsumer();

// Handle process termination to close the Kafka consumer gracefully
process.on('SIGTERM', async () => {
  await kafkaConsumer.disconnect();
  process.exit(0);
});

process.on('SIGINT', async () => {
  await kafkaConsumer.disconnect();
  process.exit(0);
});
» Setup MySQL DB
Now let’s set up our MySQL database. In MySQL Workbench, create a database called rail_data and two tables: active_trains and cancelled_trains.
CREATE DATABASE rail_data;
USE rail_data;
CREATE TABLE IF NOT EXISTS active_trains (
id INT AUTO_INCREMENT PRIMARY KEY,
train_id VARCHAR(255) NOT NULL,
stanox VARCHAR(255) NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS cancelled_trains (
id INT AUTO_INCREMENT PRIMARY KEY,
train_id VARCHAR(255) NOT NULL,
reason_code VARCHAR(255) NOT NULL,
stanox VARCHAR(255) NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
To check that everything is set up correctly, run the following commands in Workbench:
DESCRIBE active_trains;
DESCRIBE cancelled_trains;
You should see the column definitions for both tables, which are empty for now.
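One optional aside: consumer.js (and later server.js) connect to MySQL with your root credentials. If you’d rather not use root, you can create a dedicated user instead; the user name rail_app and the privilege list below are just an example, so adjust them to your setup and then put those credentials in consumer.js and server.js.
-- Optional: a dedicated MySQL user for the app (example name and privileges)
CREATE USER 'rail_app'@'localhost' IDENTIFIED BY '<choose a password>';
GRANT SELECT, INSERT ON rail_data.* TO 'rail_app'@'localhost';
FLUSH PRIVILEGES;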
» Test Kafka And DB
Now that we’ve set up our backend, it’s time to test it.
We’ll start the consumer and producer files, run Zookeeper and Kafka using Docker, and verify that our data is being correctly stored in MySQL Workbench. Let’s get started.
First, we’ll start Zookeeper and Kafka using Docker. Open your terminal and run the command:
docker run -p 2181:2181 zookeeper
This will start the Zookeeper container and expose port 2181.
Next, pull the Kafka image and run it (replace 192.168.1.2 with your machine’s IP address):
docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.2:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.2:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
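If you want to confirm the broker is reachable before starting the Node processes, you can list its topics from inside the Kafka container. The container name below is a placeholder; take it from docker ps, and use the same IP you advertised above.
# Find the Kafka container name, then list topics from inside it
docker ps
docker exec -it <kafka-container-name> kafka-topics --bootstrap-server 192.168.1.2:9092 --list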
Now, we have Zookeeper running on port 2181 and Kafka running on port 9092. Next, let’s start our Kafka producer and consumer.
Open the project directory and navigate to the backend directory. Run node admin.js once to create the topics, then start the producer with node producer.js and the consumer with node consumer.js.
# Create the Kafka topics (run once)
node admin.js
# Start the producer
node producer.js
# Start the consumer
node consumer.js
Now that the producer and consumer are running, they will start sending and processing train data messages.
In the terminal you can watch the data that producer.js is logging as it arrives from the feed.
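To double-check that messages are actually landing in Kafka (independently of our consumer), you can tail one of the topics from inside the Kafka container; again, substitute your own container name and advertised IP.
# Tail the train_activation topic from the beginning
docker exec -it <kafka-container-name> kafka-console-consumer --bootstrap-server 192.168.1.2:9092 --topic train_activation --from-beginning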
Now that we know that the producer is working fine, let’s verify that this data is being stored in our MySQL database.
To do that, open MySQL Workbench and connect to your rail_data database with the command:
USE rail_data;
And then run the command:
SELECT * FROM active_trains;
And you should see that the active_trains table contains the data sent by our Kafka producer.
Now, let’s check the cancelled_trains table by running the command:
SELECT * FROM cancelled_trains;
Similarly, the cancelled_trains table contains the data processed by our Kafka consumer.
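Since these tables grow quickly while the feed is live, it can be handy to look at just the most recent rows, for example:
SELECT * FROM active_trains ORDER BY timestamp DESC LIMIT 10;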
This confirms that our backend and database are working as expected.
» Create Server
Now, let’s create our API endpoints in backend/server.js to fetch the data from MySQL.
// backend/server.js
const express = require('express');
const mysql = require('mysql2');
const cors = require('cors'); // Optional: to handle cross-origin requests

const app = express();
const port = 8080; // You can use any port you like

// Middleware
app.use(express.json());
app.use(cors()); // Enable CORS if needed

// Create MySQL connection pool
const pool = mysql.createPool({
  connectionLimit: 10,
  host: '127.0.0.1',
  user: 'root',
  password: 'password',
  database: 'rail_data',
});

app.get('/api/active-trains', (req, res) => {
  const limit = parseInt(req.query.limit, 10) || 10; // Number of items per page
  const offset = parseInt(req.query.offset, 10) || 0; // Offset for pagination
  const sql = 'SELECT * FROM active_trains LIMIT ? OFFSET ?';
  pool.query(sql, [limit, offset], (error, results) => {
    if (error) {
      console.error('Error fetching active trains:', error);
      return res.status(500).json({ error: 'Internal Server Error' });
    }
    res.json(results);
  });
});

app.get('/api/cancelled-trains', (req, res) => {
  const limit = parseInt(req.query.limit, 10) || 10; // Number of items per page
  const offset = parseInt(req.query.offset, 10) || 0; // Offset for pagination
  const sql = 'SELECT * FROM cancelled_trains LIMIT ? OFFSET ?';
  pool.query(sql, [limit, offset], (error, results) => {
    if (error) {
      console.error('Error fetching cancelled trains:', error);
      return res.status(500).json({ error: 'Internal Server Error' });
    }
    res.json(results);
  });
});

// Start the server
app.listen(port, () => {
  console.log(`Server running on http://localhost:${port}`);
});
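You can start the API with node server.js and then sanity-check both endpoints straight from the terminal, for example:
# Fetch the first 5 active trains
curl "http://localhost:8080/api/active-trains?limit=5&offset=0"
# Fetch the first 5 cancelled trains
curl "http://localhost:8080/api/cancelled-trains?limit=5&offset=0"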
And that’s it for the backend setup! We have our Kafka producers and consumers in place, our MySQL database set up, and API endpoints ready to serve data. Now we’ll move on to setting up the frontend with React.
» Frontend Setup
I’ve provided the link to set up the frontend at the top. Please follow that.
Ok. Did it?
Noice. Let’s move ahead…
Now create the three files we will be working with in the src directory: ActiveTrainList.js, which will show the list of active trains, CancelledTrainList.js for the cancelled trains, and TrainData.js to display information about both active and cancelled trains.
TrainData.js file:
// frontend/src/TrainData.js
import React, { useState, useEffect } from 'react';
import axios from 'axios';

const TrainData = () => {
  const [activeTrains, setActiveTrains] = useState([]);
  const [cancelledTrains, setCancelledTrains] = useState([]);

  useEffect(() => {
    const fetchActiveTrains = async () => {
      try {
        const response = await axios.get('http://localhost:8080/api/active-trains');
        setActiveTrains(response.data);
      } catch (error) {
        console.error('Error fetching active trains:', error);
      }
    };

    const fetchCancelledTrains = async () => {
      try {
        const response = await axios.get('http://localhost:8080/api/cancelled-trains');
        setCancelledTrains(response.data);
      } catch (error) {
        console.error('Error fetching cancelled trains:', error);
      }
    };

    fetchActiveTrains();
    fetchCancelledTrains();
  }, []);

  return (
    <div>
      <h1>Train Data</h1>

      <h2>Active Trains</h2>
      <table>
        <thead>
          <tr>
            <th>Train ID</th>
            <th>Stanox</th>
            <th>Timestamp</th>
          </tr>
        </thead>
        <tbody>
          {activeTrains.map((train) => (
            <tr key={train.train_id}>
              <td>{train.train_id}</td>
              <td>{train.stanox}</td>
              <td>{new Date(train.timestamp).toLocaleString()}</td>
            </tr>
          ))}
        </tbody>
      </table>

      <h2>Cancelled Trains</h2>
      <table>
        <thead>
          <tr>
            <th>Train ID</th>
            <th>Stanox</th>
            <th>Reason Code</th>
            <th>Timestamp</th>
          </tr>
        </thead>
        <tbody>
          {cancelledTrains.map((train) => (
            <tr key={train.train_id}>
              <td>{train.train_id}</td>
              <td>{train.stanox}</td>
              <td>{train.reason_code}</td>
              <td>{new Date(train.timestamp).toLocaleString()}</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

export default TrainData;
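Note that TrainData is a simpler, non-paginated view, and the App.js we’ll write later only mounts the paginated lists. If you’d rather render TrainData instead, a minimal App.js could look like this sketch:
// App.js — alternative, using the TrainData component instead of the paginated lists
import React from 'react';
import TrainData from './TrainData';

const App = () => <TrainData />;

export default App;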
ActiveTrainList.js file:
// frontend/src/ActiveTrainList.js
import React, { useState, useEffect } from 'react';
import axios from 'axios';

const ActiveTrainsList = ({ apiUrl, limit }) => {
  const [items, setItems] = useState([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState(null);
  const [hasMore, setHasMore] = useState(true);
  const [offset, setOffset] = useState(0);

  const fetchItems = async () => {
    setLoading(true);
    try {
      const response = await axios.get(`${apiUrl}?limit=${limit}&offset=${offset}`);
      console.log('Fetched active trains:', response.data); // Log the fetched data
      if (response.data.length < limit) {
        setHasMore(false);
      }
      setItems(prevItems => [...prevItems, ...response.data]);
      setOffset(prevOffset => prevOffset + limit);
    } catch (error) {
      setError('Error fetching data');
      console.error('Error fetching active trains:', error);
    }
    setLoading(false);
  };

  useEffect(() => {
    fetchItems();
  }, []);

  return (
    <div>
      {error && <p>{error}</p>}
      {items.length > 0 ? (
        <ul>
          {items.map((item, index) => (
            <li key={index}>
              {/* Adjust these properties based on your data structure */}
              Train ID: {item.train_id},
              Stanox: {item.stanox},
              Timestamp: {item.timestamp}
            </li>
          ))}
        </ul>
      ) : (
        <p>No data available.</p>
      )}
      {loading && <p>Loading...</p>}
      {hasMore && !loading && (
        <button onClick={fetchItems}>Load More</button>
      )}
    </div>
  );
};

export default ActiveTrainsList;
CancelledTrainList.js file:
// frontend/src/CancelledTrainList.js
import React, { useState, useEffect } from 'react';
import axios from 'axios';

const PaginatedList = ({ apiUrl, limit }) => {
  const [items, setItems] = useState([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState(null);
  const [hasMore, setHasMore] = useState(true);
  const [offset, setOffset] = useState(0);

  const fetchItems = async () => {
    setLoading(true);
    try {
      const response = await axios.get(`${apiUrl}?limit=${limit}&offset=${offset}`);
      console.log('Fetched items:', response.data); // Log the fetched data
      if (response.data.length < limit) {
        setHasMore(false);
      }
      setItems(prevItems => [...prevItems, ...response.data]);
      setOffset(prevOffset => prevOffset + limit);
    } catch (error) {
      setError('Error fetching data');
      console.error('Error fetching data:', error);
    }
    setLoading(false);
  };

  useEffect(() => {
    fetchItems();
  }, []);

  return (
    <div>
      {error && <p>{error}</p>}
      {items.length > 0 ? (
        <ul>
          {items.map((item, index) => (
            <li key={index}>
              {/* Adjust these properties based on your data structure */}
              Train ID: {item.train_id},
              Stanox: {item.stanox},
              Timestamp: {item.timestamp}
            </li>
          ))}
        </ul>
      ) : (
        <p>No data available.</p>
      )}
      {loading && <p>Loading...</p>}
      {hasMore && !loading && (
        <button onClick={fetchItems}>Load More</button>
      )}
    </div>
  );
};

export default PaginatedList;
App.js file:
// App.js
import React from 'react';
import PaginatedList from './CancelledTrainList';
import ActiveTrainsList from './ActiveTrainList';

const App = () => {
  return (
    <div>
      <h1>Cancelled Trains</h1>
      <PaginatedList apiUrl="http://localhost:8080/api/cancelled-trains" limit={10} />

      <h1>Active Trains</h1>
      <ActiveTrainsList apiUrl="http://localhost:8080/api/active-trains" limit={10} />
    </div>
  );
};

export default App;
And that’s all we have for our React/frontend code.
So let’s start the React app.
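One small thing first: the components above rely on axios, which isn’t included by default in a new React app, so install it inside the frontend directory:
npm install axios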
Open the terminal and navigate to the frontend directory and run the command:
npm start
Open your browser and navigate to http://localhost:3000. You should see the train data displayed with a ‘Load More’ button to fetch additional entries.
» Outro
And that’s all for this article:
In the backend, we configured Kafka producers and consumers to handle train data messages and store them in a MySQL database. We also set up Docker containers for Zookeeper and Kafka to manage our message streams.
And in the frontend, we built a React application that fetches train data from our backend API. We implemented components to display both active and cancelled trains, along with a ‘Load More’ button to fetch additional entries.
And that’s it! We’ve successfully integrated Kafka with Node.js for real-time data processing, used Docker to manage the Zookeeper and Kafka containers, and stored and retrieved data with MySQL.
And that’s all for this blog. I hope you enjoyed it and found it useful. If you have any doubts, please drop a message on our Discord server; the link can be found below.
👉 And I’ll see you in next one, till then bye-bye and take care.