Real-Time Data Processing Using Kafka and Node.js

  1. GitHub Repository Link

  2. Install Node.js

  3. Setup Tailwind with Vite and React

  4. Install MySQL

  5. Install MySQL Workbench

  6. Create Your Account for Train Data API

» What are we doing?

In this article, we’re going to use Kafka, Node.js, MySQL, and React for real-time data processing.

In the backend, we will configure Kafka producers and consumers to handle train data messages and store them in a MySQL database. We will also set up Docker containers for Zookeeper and Kafka to manage our message streams.

And in the frontend, we will build a React application that fetches train data from our backend API.

But before we dive into the code, you should have a basic understanding of what Kafka is and how its architecture works.

» What is Kafka?


Kafka is a distributed streaming platform that allows you to publish, subscribe to, and process streams of records in real-time. It’s widely used for building real-time data pipelines and streaming applications.


A topic is a category or feed name to which records are sent by producers.

Producers are applications that publish or write records to Kafka topics.

Consumers are applications that read records from Kafka topics.

A consumer group is a group of consumers that work together to consume records from a topic.

Offsets are unique identifiers of records within a partition, allowing consumers to keep track of their position.

Brokers are Kafka servers that store and serve records.

Zookeeper manages and coordinates Kafka brokers and maintains metadata.
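To make these terms concrete, here is a minimal kafkajs sketch, purely for illustration (it assumes a single broker on localhost:9092 and a hypothetical topic called demo-topic), showing a producer publishing to a topic and a consumer in a consumer group reading it, with the offset of each record available per partition:

// concepts.js - tiny illustration of topics, producers, consumers, groups and offsets
const { Kafka } = require("kafkajs");

const kafka = new Kafka({ clientId: "concepts-demo", brokers: ["localhost:9092"] });

async function demo() {
  // Producer: publishes records to the "demo-topic" topic
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({ topic: "demo-topic", messages: [{ value: "hello kafka" }] });
  await producer.disconnect();

  // Consumer: member of the "demo-group" consumer group, reads records from the topic
  const consumer = kafka.consumer({ groupId: "demo-group" });
  await consumer.connect();
  await consumer.subscribe({ topic: "demo-topic", fromBeginning: true });
  await consumer.run({
    // Each record sits at an offset inside a partition on the broker
    eachMessage: async ({ partition, message }) => {
      console.log(`partition ${partition}, offset ${message.offset}: ${message.value.toString()}`);
    },
  });
}

demo().catch(console.error);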

Alright, now that we’ve covered the basics, let’s get our hands dirty with some code. We’ll start by setting up our backend with Node.js, Kafka, and MySQL.

» Backend Setup

Create the project directory, then create a folder called backend inside it.

Inside the backend folder we will need five files: admin.js, client.js, consumer.js, producer.js, and a server.js file as the entry point of our server.

mkdir kafka-nodejs
cd kafka-nodejs
mkdir backend
cd backend
backend/
├── admin.js
├── client.js
├── consumer.js
├── producer.js
├── server.js
├── package.json
├── package-lock.json
└── node_modules

Next, let’s install the necessary dependencies. In the terminal, run the following command:

npm init -y
npm install kafkajs mysql2 express cors stompit async

Before moving ahead, make sure you have an account on publicdatafeeds (the Network Rail train data API linked at the top), then grab your username and password and keep them somewhere safe, because you will need them shortly.
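As a side note, one way to keep those credentials out of your source files is to load them from environment variables. This is just a sketch, assuming you also run npm install dotenv and create a .env file inside backend/ (NR_USERNAME and NR_PASSWORD are names made up for this example):

// Load credentials from backend/.env instead of hard-coding them
// .env (do not commit this file):
//   NR_USERNAME=your-publicdatafeeds-username
//   NR_PASSWORD=your-publicdatafeeds-password
require('dotenv').config();

const login = process.env.NR_USERNAME;    // use for 'login' in producer.js below
const passcode = process.env.NR_PASSWORD; // use for 'passcode' in producer.js below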

Now, let’s set up Kafka admin and client in admin.js and client.js.

// admin.js
const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<ip-address>:9092"],
});

async function init() {
  const admin = exports.kafka.admin();
  console.log("Admin connecting...");
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topics [train_activation, train_cancellation]");
  await admin.createTopics({
    topics: [
      { topic: "train_activation", numPartitions: 2 },
      { topic: "train_cancellation", numPartitions: 2 },
    ],
  });
  console.log("Topics Created Success [train_activation, train_cancellation]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

In admin.js, we’ll connect to Kafka, create topics for train activation and cancellation, and disconnect from Kafka.
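One practical note: it's a good idea to run this file once (with Kafka already running, which we'll get to in the testing section) so that both topics are created with two partitions each before the producer and consumer start:

node admin.js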

// client.js
const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<ip-address>:9092"],
});

In client.js, we set up a basic Kafka client configuration that we'll reuse in other files; it serves as the shared configuration for connecting to Kafka.

Please replace <ip-address> with your machine's IP address.
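If you're not sure what your machine's IP address is, these commands usually work (interface names may differ on your setup):

# Linux
hostname -I
# macOS (en0 is typically the Wi-Fi interface)
ipconfig getifaddr en0
# Windows
ipconfig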

With the admin and client set up, let’s move on to creating our Kafka producer in producer.js. The producer will send train data messages to our Kafka topics.

// producer.js
const { Kafka } = require('kafkajs');
const stompit = require('stompit');
const async = require('async');

// Configuration for the Kafka brokers
const kafkaConfig = {
    brokers: ["<ip-address>:9092"],
};

// Create Kafka producer
const kafkaProducer = new Kafka({
    clientId: "rail_app_producer",
    ...kafkaConfig,
}).producer();

const initKafkaProducer = async () => {
    try {
        await kafkaProducer.connect();
        console.log('Producer connected successfully');
    } catch (error) {
        console.error('Error connecting Kafka producer:', error.message);
        process.exit(1); // Exit the process if unable to connect
    }
};

// Initialize Kafka producer
initKafkaProducer();

const connectOptions = { // Configuration for the STOMP connection to the Network Rail feed
    host: 'publicdatafeeds.networkrail.co.uk',
    port: 61618,
    connectHeaders: {
        'heart-beat': '15000,15000',
        'client-id': '',
        'host': '/',
        'login': '<Your UserName for the API>',
        'passcode': '<Password for the API>'
    }
};

const reconnectOptions = {
    initialReconnectDelay: 10,
    maxReconnectDelay: 30000,
    useExponentialBackOff: true,
    maxReconnects: 30,
    randomize: false
};

const connectionManager = new stompit.ConnectFailover([connectOptions], reconnectOptions);

connectionManager.connect((error, client, reconnect) => {
    if (error) {
        console.error('Terminal error, gave up reconnecting:', error.message);
        return;
    }

    client.on('error', (error) => {
        console.error('Connection lost. Reconnecting...', error.message);
        reconnect();
    });

    const headers = {
        destination: '/topic/TRAIN_MVT_ALL_TOC',
        'activemq.subscriptionName': 'somename-train_mvt',
        ack: 'client-individual'
    };

    client.subscribe(headers, (error, message) => {
        if (error) {
            console.error('Subscription failed:', error.message);
            return;
        }

        message.readString('utf-8', async (error, body) => {
            if (error) {
                console.error('Failed to read a message', error.message);
                return;
            }

            if (body) {
                try {
                    const data = JSON.parse(body);

                    async.each(data, async (item) => {
                        const timestamp = new Date().toISOString();

                        if (item.header) {
                            if (item.header.msg_type === '0001') {
                                // Train Activation
                                const stanox = item.body.tp_origin_stanox || item.body.sched_origin_stanox || 'N/A';
                                console.log(timestamp, '- Train', item.body.train_id, 'activated at stanox', stanox);

                                // Send the message to Kafka
                                await sendToKafka('train_activation', { timestamp, trainId: item.body.train_id, stanox });
                            } else if (item.header.msg_type === '0002') {
                                // Train Cancellation
                                const stanox = item.body.loc_stanox || 'N/A';
                                const reasonCode = item.body.canx_reason_code || 'N/A';
                                console.log(timestamp, '- Train', item.body.train_id, 'cancelled. Cancellation Reason:', reasonCode, 'at stanox', stanox);

                                // Send the message to Kafka
                                await sendToKafka('train_cancellation', { timestamp, trainId: item.body.train_id, stanox, reasonCode });
                            }
                        }
                    });
                } catch (e) {
                    console.error('Failed to parse JSON', e.message);
                }
            }

            client.ack(message);
        });
    });
});

// sendToKafka publishes a message to the given Kafka topic and logs a confirmation
async function sendToKafka(topic, message) {
    try {
        await kafkaProducer.send({
            topic,
            messages: [
                {
                    value: JSON.stringify(message),
                },
            ],
        });
        console.log(`Message sent to Kafka topic "${topic}":`, message); // Log confirmation
    } catch (error) {
        console.error('Error sending message to Kafka:', error.message);
    }
}

Next, let’s set up our Kafka consumer in consumer.js. The consumer will read messages from Kafka topics and save the data to our MySQL database.

// consumer.js
const { Kafka, logLevel } = require('kafkajs');
const mysql = require('mysql2');

// Create MySQL connection pool
const pool = mysql.createPool({
  connectionLimit: 10,
  host: '127.0.0.1',
  user: '<root username>',
  password: '<password>',
  database: 'rail_data',
});

// Configuration for the Kafka brokers
const kafkaConfig = {
  brokers: ["<ip-address>:9092"],
  logLevel: logLevel.DEBUG, // Set log level to DEBUG for detailed logging
};

// Create Kafka consumer
const kafkaConsumer = new Kafka({
  clientId: "rail_app_consumer",
  ...kafkaConfig,
}).consumer({
  groupId: "rail_consumer_group",
});

// Topics produced by the producer
const topics = ["train_activation", "train_cancellation"];

// Connect the Kafka consumer 
const initKafkaConsumer = async () => {
  await kafkaConsumer.connect();
  await kafkaConsumer.subscribe({ topics });

  await kafkaConsumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const processedMessage = JSON.parse(message.value.toString('utf-8'));

        // Log the received message
        console.log(processedMessage);

        // Insert data into MySQL database
        if (topic === 'train_activation') {
          insertActiveTrain(processedMessage.trainId, processedMessage.stanox, processedMessage.timestamp);
        } else if (topic === 'train_cancellation') {
          insertCancelledTrain(processedMessage.trainId, processedMessage.stanox, processedMessage.reasonCode, processedMessage.timestamp);
        }

        // Add your processing logic here
        // For now, let's log that the message is being processed
        console.log('Processing message...');
      } catch (error) {
        console.error('Error processing message:', error.message);
      }
    },
  });
};

// Function to format timestamp to MySQL DATETIME format
function formatTimestampToMySQL(timestamp) {
  const date = new Date(timestamp);
  const year = date.getFullYear();
  const month = (`0${date.getMonth() + 1}`).slice(-2);
  const day = (`0${date.getDate()}`).slice(-2);
  const hours = (`0${date.getHours()}`).slice(-2);
  const minutes = (`0${date.getMinutes()}`).slice(-2);
  const seconds = (`0${date.getSeconds()}`).slice(-2);
  return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
}

// Function to insert active train data into the database
function insertActiveTrain(trainId, stanox, timestamp) {
  const formattedTimestamp = formatTimestampToMySQL(timestamp);
  const sql = 'INSERT INTO active_trains (train_id, stanox, timestamp) VALUES (?, ?, ?)';
  const values = [trainId, stanox, formattedTimestamp];

  pool.query(sql, values, (error, results) => {
    if (error) {
      console.error('Error inserting active train data:', error);
    } else {
      console.log('Inserted active train data:', results);
    }
  });
}

// Function to insert cancelled train data into the database
function insertCancelledTrain(trainId, stanox, reasonCode, timestamp) {
  const formattedTimestamp = formatTimestampToMySQL(timestamp);
  const sql = 'INSERT INTO cancelled_trains (train_id, stanox, reason_code, timestamp) VALUES (?, ?, ?, ?)';
  const values = [trainId, stanox, reasonCode, formattedTimestamp];

  pool.query(sql, values, (error, results) => {
    if (error) {
      console.error('Error inserting cancelled train data:', error);
    } else {
      console.log('Inserted cancelled train data:', results);
    }
  });
}

// Initialize Kafka consumer
initKafkaConsumer();

// Handle process termination to close the Kafka consumer gracefully
process.on('SIGTERM', async () => {
  await kafkaConsumer.disconnect();
  process.exit(0);
});

process.on('SIGINT', async () => {
  await kafkaConsumer.disconnect();
  process.exit(0);
});

» Setup MySQL DB

Now let’s set up our MySQL database. In MySQL Workbench, create a database called rail_data and two tables: active_trains and cancelled_trains.

CREATE DATABASE rail_data;

USE rail_data;

CREATE TABLE IF NOT EXISTS active_trains (
    id INT AUTO_INCREMENT PRIMARY KEY,
    train_id VARCHAR(255) NOT NULL,
    stanox VARCHAR(255) NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


CREATE TABLE IF NOT EXISTS cancelled_trains (
    id INT AUTO_INCREMENT PRIMARY KEY,
    train_id VARCHAR(255) NOT NULL,
    reason_code VARCHAR(255) NOT NULL,
    stanox VARCHAR(255) NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

To check that everything is set up correctly, run the following commands in Workbench:

DESCRIBE active_trains;

DESCRIBE cancelled_trains;

Both tables should exist and be empty for now.

» Test Kafka And DB

Now that we’ve set up our backend, it’s time to test it.

We’ll start the consumer and producer files, run Zookeeper and Kafka using Docker, and verify that our data is being correctly stored in MySQL Workbench. Let’s get started.

First, we’ll start Zookeeper and Kafka using Docker. Open your terminal and run the command:

docker run -p 2181:2181 zookeeper

This will start the Zookeeper container and expose port 2181.

Next, pull the Kafka image and run it (replacing 192.168.1.2 below with your own IP address, the same one you used for <ip-address>):

docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.2:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.2:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
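If you'd rather start both containers together, here is a rough docker-compose.yml sketch that mirrors the same settings. This is just an alternative, not part of the original setup, and as before you should replace 192.168.1.2 with your own IP address:

version: "3"
services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.2:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.2:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

With that file in place, docker compose up brings both services up in one go.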

Now, we have Zookeeper running on port 2181 and Kafka running on port 9092. Next, let’s start our Kafka producer and consumer.

Open the project, navigate to the backend directory, and start the producer and the consumer in two separate terminals:

# Start the producer
node producer.js
# Start the consumer
node consumer.js

Now that the producer and consumer are running, they will start sending and processing train data messages.

In the terminal, you can watch the messages that producer.js is logging as they arrive.

Now that we know that the producer is working fine, let’s verify that this data is being stored in our MySQL database.

To do that, open MySQL Workbench and switch to the rail_data database with the command:

USE rail_data;

And then run the command:

SELECT * FROM active_trains;

You should see that the active_trains table contains the data sent by our Kafka producer.

Now, let’s check the cancelled_trains table by running the command:

SELECT * FROM cancelled_trains;

Similarly, the cancelled_trains table contains the data processed by our Kafka consumer.

This confirms that our backend and database are working as expected.

» Create Server

Now, let’s create our API endpoints in backend/server.js to fetch the data from MySQL.

// backend/server.js
const express = require('express');
const mysql = require('mysql2');
const cors = require('cors'); // To handle cross-origin requests from the React app
const app = express();
const port = 8080; // You can use any port you like
app.use(cors()); // Enable CORS so the React dev server (on a different port) can call this API

// Create MySQL connection pool
const pool = mysql.createPool({
  connectionLimit: 10,
  host: '127.0.0.1',
  user: '<root username>',
  password: '<password>',
  database: 'rail_data',
});


// Middleware
app.use(express.json());

app.get('/api/active-trains', (req, res) => {
    const limit = parseInt(req.query.limit, 10) || 10; // Number of items per page
    const offset = parseInt(req.query.offset, 10) || 0; // Offset for pagination
  
    const sql = 'SELECT * FROM active_trains LIMIT ? OFFSET ?';
    pool.query(sql, [limit, offset], (error, results) => {
      if (error) {
        console.error('Error fetching active trains:', error);
        return res.status(500).json({ error: 'Internal Server Error' });
      }
      res.json(results);
    });
  });
  

app.get('/api/cancelled-trains', (req, res) => {
    const limit = parseInt(req.query.limit, 10) || 10; // Number of items per page
    const offset = parseInt(req.query.offset, 10) || 0; // Offset for pagination
  
    const sql = 'SELECT * FROM cancelled_trains LIMIT ? OFFSET ?';
    pool.query(sql, [limit, offset], (error, results) => {
      if (error) {
        console.error('Error fetching cancelled trains:', error);
        return res.status(500).json({ error: 'Internal Server Error' });
      }
      res.json(results);
    });
  });
  
  
// Start the server
app.listen(port, () => {
  console.log(`Server running on http://localhost:${port}`);
});
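Once the server is running, you can sanity-check both endpoints from the terminal before touching the frontend, for example:

# Start the API server
node server.js

# In another terminal
curl "http://localhost:8080/api/active-trains?limit=5&offset=0"
curl "http://localhost:8080/api/cancelled-trains?limit=5&offset=0"

Each call should return a JSON array of up to five rows from the corresponding table.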

And that’s it for the backend setup! We have our Kafka producers and consumers in place, our MySQL database set up, and API endpoints ready to serve data. Now we’ll move on to setting up the frontend with React.

» Frontend Setup

I’ve provided the link to set up the frontend (Vite + React + Tailwind) at the top. Please follow that.

Ok. Did it?

Noice. Let’s move ahead…
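One thing before the components: they use axios for the API calls, so install it inside the frontend directory if you haven't already:

npm install axios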

Now create three files inside the src directory: ActiveTrainList.js, which will show us the list of active trains; CancelledTrainList.js, which will show the cancelled trains; and TrainData.js, which displays information about both active and cancelled trains in simple tables.

TrainData.js file:

// frontend/src/TrainData.js
import React, { useState, useEffect } from 'react';
import axios from 'axios';

const TrainData = () => {
  const [activeTrains, setActiveTrains] = useState([]);
  const [cancelledTrains, setCancelledTrains] = useState([]);

  useEffect(() => {
    const fetchActiveTrains = async () => {
      try {
        const response = await axios.get('http://localhost:8080/api/active-trains');
        setActiveTrains(response.data);
      } catch (error) {
        console.error('Error fetching active trains:', error);
      }
    };

    const fetchCancelledTrains = async () => {
      try {
        const response = await axios.get('http://localhost:8080/api/cancelled-trains');
        setCancelledTrains(response.data);
      } catch (error) {
        console.error('Error fetching cancelled trains:', error);
      }
    };

    fetchActiveTrains();
    fetchCancelledTrains();
  }, []);

  return (
    <div>
      <h1>Train Data</h1>
      <h2>Active Trains</h2>
      <table>
        <thead>
          <tr>
            <th>Train ID</th>
            <th>Stanox</th>
            <th>Timestamp</th>
          </tr>
        </thead>
        <tbody>
          {activeTrains.map((train) => (
            <tr key={train.train_id}>
              <td>{train.train_id}</td>
              <td>{train.stanox}</td>
              <td>{new Date(train.timestamp).toLocaleString()}</td>
            </tr>
          ))}
        </tbody>
      </table>

      <h2>Cancelled Trains</h2>
      <table>
        <thead>
          <tr>
            <th>Train ID</th>
            <th>Stanox</th>
            <th>Reason Code</th>
            <th>Timestamp</th>
          </tr>
        </thead>
        <tbody>
          {cancelledTrains.map((train) => (
            <tr key={train.train_id}>
              <td>{train.train_id}</td>
              <td>{train.stanox}</td>
              <td>{train.reason_code}</td>
              <td>{new Date(train.timestamp).toLocaleString()}</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

export default TrainData;
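TrainData is a self-contained view that renders both feeds as tables. The App.js further below wires up the two paginated list components instead, but if you prefer this combined view you could simply import it there (import TrainData from './TrainData';) and render <TrainData /> in App's JSX.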

ActiveTrainList.js file:

// frontend/src/ActiveTrainList.js
import React, { useState, useEffect } from 'react';
import axios from 'axios';

const ActiveTrainsList = ({ apiUrl, limit }) => {
  const [items, setItems] = useState([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState(null);
  const [hasMore, setHasMore] = useState(true);
  const [offset, setOffset] = useState(0);

  const fetchItems = async () => {
    setLoading(true);
    try {
      const response = await axios.get(`${apiUrl}?limit=${limit}&offset=${offset}`);
      console.log('Fetched active trains:', response.data); // Log the fetched data
      if (response.data.length < limit) {
        setHasMore(false);
      }
      setItems(prevItems => [...prevItems, ...response.data]);
      setOffset(prevOffset => prevOffset + limit);
    } catch (error) {
      setError('Error fetching data');
      console.error('Error fetching active trains:', error);
    }
    setLoading(false);
  };

  useEffect(() => {
    fetchItems();
  }, []);

  return (
    <div>
      {error && <p>{error}</p>}
      {items.length > 0 ? (
        <ul>
          {items.map((item, index) => (
            <li key={index}>
              {/* Adjust these properties based on your data structure */}
              Train ID: {item.train_id}, 
              Stanox: {item.stanox}, 
              Timestamp: {item.timestamp}
            </li>
          ))}
        </ul>
      ) : (
        <p>No data available.</p>
      )}
      {loading && <p>Loading...</p>}
      {hasMore && !loading && (
        <button onClick={fetchItems}>Load More</button>
      )}
    </div>
  );
};

export default ActiveTrainsList;

CancelledTrainList.js file:

// frontend/src/CancelledTrainList.js
import React, { useState, useEffect } from 'react';
import axios from 'axios';

const PaginatedList = ({ apiUrl, limit }) => {
  const [items, setItems] = useState([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState(null);
  const [hasMore, setHasMore] = useState(true);
  const [offset, setOffset] = useState(0);

  const fetchItems = async () => {
    setLoading(true);
    try {
      const response = await axios.get(`${apiUrl}?limit=${limit}&offset=${offset}`);
      console.log('Fetched items:', response.data); // Log the fetched data
      if (response.data.length < limit) {
        setHasMore(false);
      }
      setItems(prevItems => [...prevItems, ...response.data]);
      setOffset(prevOffset => prevOffset + limit);
    } catch (error) {
      setError('Error fetching data');
      console.error('Error fetching data:', error);
    }
    setLoading(false);
  };

  useEffect(() => {
    fetchItems();
  }, []);

  return (
    <div>
      {error && <p>{error}</p>}
      {items.length > 0 ? (
        <ul>
          {items.map((item, index) => (
            <li key={index}>
              {/* Adjust these properties based on your data structure */}
              Train ID: {item.train_id}, 
              Stanox: {item.stanox}, 
              Reason Code: {item.reason_code}, 
              Timestamp: {item.timestamp}
            </li>
          ))}
        </ul>
      ) : (
        <p>No data available.</p>
      )}
      {loading && <p>Loading...</p>}
      {hasMore && !loading && (
        <button onClick={fetchItems}>Load More</button>
      )}
    </div>
  );
};

export default PaginatedList;

App.js file:

// App.js
import React from 'react';
import PaginatedList from './CancelledTrainList';
import ActiveTrainsList from './ActiveTrainList';

const App = () => {
  return (
    <div>
      <h1>Cancelled Trains</h1>
      <PaginatedList apiUrl="http://localhost:8080/api/cancelled-trains" limit={10} />

      <h1>Active Trains</h1>
      <ActiveTrainsList apiUrl="http://localhost:8080/api/active-trains" limit={10} />
    </div>
  );
};

export default App;
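If you scaffolded the frontend with Vite as linked at the top, the entry file (usually src/main.jsx) already renders App for you; for reference, a minimal version looks roughly like this:

// frontend/src/main.jsx - typical Vite + React entry point
import React from 'react';
import ReactDOM from 'react-dom/client';
import App from './App';
import './index.css'; // Tailwind styles, if you followed the Tailwind setup

ReactDOM.createRoot(document.getElementById('root')).render(
  <React.StrictMode>
    <App />
  </React.StrictMode>
);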

And that’s all we have for our React/Frontend code.

So let’s start the React app.

Open the terminal, navigate to the frontend directory, and run the dev command (for a Vite project this is typically the following; if you scaffolded with Create React App instead, use npm start):

npm run dev

Open your browser and navigate to the URL printed in the terminal (http://localhost:5173 by default for Vite). You should see the train data displayed with a ‘Load More’ button to fetch additional entries.

» Outro

And that’s all for this article.

In the backend, we configured Kafka producers and consumers to handle train data messages and store them in a MySQL database. We also set up Docker containers for Zookeeper and Kafka to manage our message streams.

And in the frontend, we built a React application that fetches train data from our backend API. We implemented components to display both active and cancelled trains, along with a ‘Load More’ button to fetch additional entries.

And that’s it! We’ve successfully integrated Kafka with Node.js for real-time data processing, used Docker to manage the Zookeeper and Kafka containers, and stored and retrieved data with MySQL.


And that’s all for this blog. I hope you enjoyed it and found it useful. If you have any doubts, please drop a message on our Discord server; the link can be found below.

👉 And I’ll see you in the next one. Till then, bye-bye and take care.