
Integrating APISIX with Kafka for real-time log monitoring

Sylvia · 5 min read

This article describes how to use the kafka-logger plugin with APISIX. With successive enhancements, the plugin now offers mature and complete functionality.

Apache Kafka is an open-source stream-processing platform managed by Apache, written in Scala and Java. It provides unified, high-throughput, low-latency functionality for real-time data processing.

Apache Kafka's persistence layer is essentially a "massive publish/subscribe message queue following a distributed transaction logging architecture," making it valuable as an enterprise-class infrastructure for processing streaming data. It is used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Implementation: kafka-logger

Apache APISIX has supported Apache Kafka since version 1.2, when the kafka-logger plugin was first released. kafka-logger has been enhanced several times since then and now provides very mature and complete functionality. It supports pushing API request logs, request bodies, and response bodies to a Kafka cluster in JSON format.

When using kafka-logger, users can send a wide range of data and customize the format of the logs being sent. kafka-logger also supports sending logs in batches and retrying failed deliveries automatically.
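Batching and retries are handled by APISIX's batch processor, which kafka-logger builds on. A minimal sketch of the relevant plugin attributes (field names follow the APISIX batch processor; the values below are illustrative, and exact defaults vary by version):

{
    "kafka-logger": {
        "broker_list": { "127.0.0.1": 9092 },
        "kafka_topic": "test",
        "batch_max_size": 1000,
        "buffer_duration": 60,
        "inactive_timeout": 5,
        "max_retry_count": 2,
        "retry_delay": 1
    }
}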

How to use

Step 1: Start Kafka Cluster

This article demonstrates only one way to start the cluster. Please refer to the official documentation for other ways to start one.

# Start a cluster with 1 ZooKeeper node and 3 Kafka nodes using docker-compose.
# An EFAK instance is also started for data monitoring.
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zookeeper

  kafka2:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zookeeper

  efak:
    image: nickzurich/kafka-eagle:2.0.9
    hostname: efak
    ports:
      - "8048:8048"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
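Save the above as docker-compose.yml and bring the cluster up; the EFAK console is then reachable on port 8048:

# Start all containers in the background
docker-compose up -d

# Confirm that the ZooKeeper, Kafka, and EFAK containers are running
docker-compose ps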

Step 2: Create Topic

Next, we create a topic named test to collect logs.

(Figure: test Topic)
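If you prefer the command line to a UI, the test topic can also be created with Kafka's own tooling; a sketch, assuming the docker-compose setup above:

# Create the "test" topic on the cluster (run from the compose project directory)
docker-compose exec kafka1 kafka-topics --create \
  --topic test \
  --bootstrap-server kafka1:19092 \
  --partitions 3 \
  --replication-factor 2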

Step 3: Create Route and Enable Plugin

The following command creates a route and enables the kafka-logger plugin on it.

curl -XPUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
--header 'Content-Type: application/json' \
--data-raw '{
    "uri": "/*",
    "plugins": {
        "kafka-logger": {
            "batch_max_size": 1,
            "broker_list": {
                "127.0.0.1": 9092
            },
            "disable": false,
            "kafka_topic": "test",
            "producer_type": "sync"
        }
    },
    "upstream": {
        "nodes": {
            "httpbin.org:80": 1
        },
        "type": "roundrobin"
    }
}'

The above configuration sets the Kafka broker address, the target topic, a synchronous producer mode, and the maximum number of log entries per batch. Here we start by setting batch_max_size to 1, i.e., one message is written to Kafka for every log entry produced.

With these settings in place, the logs of API requests matching the /* path are sent to Kafka.

Step 4: Send Requests

Next, we send some requests through the API and record the number of requests.

# Send 10 requests to the API
for i in $(seq 10); do
  curl http://127.0.0.1:9080/get
done

As you can see in the figure below, some log messages have been written to the test topic we created. If you click to view the log content, you can see that the logs of the API requests made above have been written.

(Figure: Send Requests-1)

(Figure: Send Requests-2)
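You can also verify the result from the command line with a console consumer; again a sketch, assuming the docker-compose setup above:

# Read everything written to the "test" topic so far (Ctrl+C to stop)
docker-compose exec kafka1 kafka-console-consumer \
  --bootstrap-server kafka1:19092 \
  --topic test \
  --from-beginning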

Customize the logging structure

We can also customize the structure of the log data sent to Kafka through the plugin metadata that the kafka-logger plugin provides. By setting log_format, you can control which fields each log entry contains.

For example, $host and $time_iso8601 in the following data are built-in variables provided by Nginx; APISIX variables such as $route_id and $service_id are also supported.

curl -XPUT 'http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger' \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
--header 'Content-Type: application/json' \
--data-raw '{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr",
        "route_id": "$route_id"
    }
}'
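With this metadata in place, each entry written to Kafka should look roughly like the following (all values here are illustrative):

{
    "host": "127.0.0.1",
    "@timestamp": "2022-01-01T12:00:00+08:00",
    "client_ip": "127.0.0.1",
    "route_id": "r1"
}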

A simple test request shows that the logging structure settings above have taken effect. Apache APISIX currently provides a variety of log format templates, giving you great flexibility in configuration; more details on log formats can be found in the Apache APISIX documentation.

(Figure: customized logging structure)

Turn off the plugin

If you are done using the plugin, simply remove the kafka-logger configuration from the route and save it to disable the plugin on that route. Thanks to Apache APISIX's dynamic nature, plugins can be enabled and disabled without restarting Apache APISIX, which is very convenient.

curl http://127.0.0.1:9080/apisix/admin/routes/r1 \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
-X PUT -d '
{
    "uri": "/*",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "httpbin.org:80": 1
        }
    }
}'

Summary

In this article, we introduced the features and usage steps of the kafka-logger plugin. For more information about the kafka-logger plugin and its full configuration list, refer to the official documentation.

We are also currently working on other logging plugins to integrate with more related services. If you're interested in such integration projects, feel free to start a discussion in GitHub Discussions or communicate via the mailing list.