This blog shows how to use Apache APISIX to develop a custom authorization plugin for a Kafka cluster.
Prerequisites
- Have a running OpenShift cluster
- Run a Kafka cluster with the Strimzi Kafka operator
- Install kubectl, the OpenShift CLI, and curl on the host
Expose the Kafka Cluster with KafkaBridge
To simplify the Kafka configuration, I provision the cluster with the strimzi-kafka-operator. To let Kafka expose its interface externally like an ordinary service, I use KafkaBridge to turn it into an HTTP service.
- Create the KafkaBridge
# namespace
KAFKA_NAMESPACE=kafka
# create kafka bridge instance
cat <<EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-kafka-bridge
  namespace: ${KAFKA_NAMESPACE}
spec:
  bootstrapServers: kafka-kafka-bootstrap.${KAFKA_NAMESPACE}.svc:9092
  http:
    port: 8080
  replicas: 1
EOF
- Verification
KAFKA_NAMESPACE=kafka
# forward port 8080 from the bridge pod
kubectl -n ${KAFKA_NAMESPACE} port-forward $(kubectl get pods -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080
# or forward port 8080 via the service
kubectl -n ${KAFKA_NAMESPACE} port-forward svc/$(kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080
# list topics
curl http://localhost:8080/topics
# consume messages with the consumer (created in a later step)
while true; do curl -X GET http://localhost:8080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \
-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done
Running APISIX on OpenShift
- Install APISIX on ROSA
# create the namespace and the service account
oc new-project apisix
oc create sa apisix-sa -n apisix
oc adm policy add-scc-to-user anyuid -z apisix-sa -n apisix
helm repo add apisix https://charts.apiseven.com && helm repo update
helm install apisix apisix/apisix \
--set gateway.type=NodePort \
--set etcd.podSecurityContext.enabled=false \
--set etcd.containerSecurityContext.enabled=false \
--set serviceAccount.name=apisix-sa \
--namespace apisix
- Configure the Kafka Route with the Admin API
# forward the Admin API port 9180 to the local host
kubectl -n apisix port-forward $(kubectl get pods -l app.kubernetes.io/name=apisix -n apisix -o jsonpath="{.items[0].metadata.name}") 9180:9180
# the bridge service name can be obtained with:
# kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n $KAFKA_NAMESPACE -o jsonpath="{.items[0].metadata.name}"
curl "http://127.0.0.1:9180/apisix/admin/routes/1" \
-H "X-API-KEY: edd1c9f034335f136f87ad84b625c8f1" -X PUT -d '
{
  "methods": ["GET", "POST", "DELETE", "PUT"],
  "host": "example.com",
  "uri": "/*",
  "plugins": {
    "ext-plugin-post-resp": {
      "conf": [
        {"name": "my-response-rewrite", "value": "{\"tag\":\"\"}"}
      ]
    }
  },
  "upstream": {
    "type": "roundrobin",
    "nodes": {
      "strimzi-kafka-bridge-bridge-service.kafka.svc:8080": 1
    }
  }
}'
- Request the Kafka Service with the Client API
# forward the APISIX HTTP port 9080 to the local host
kubectl -n apisix port-forward $(kubectl get pods -l app.kubernetes.io/name=apisix -n apisix -o jsonpath="{.items[0].metadata.name}") 9080:9080
# list topics
curl --verbose --header "Host: example.com" http://localhost:9080/topics
# send messages to the topic
curl --header "Host: example.com" --location 'http://localhost:9080/topics/event' -H 'Content-Type: application/vnd.kafka.json.v2+json' --data \
'{
  "records": [
    {
      "key": "event5",
      "value": "hello5"
    },
    {
      "key": "event6",
      "value": "world6"
    }
  ]
}'
# create a Kafka consumer in a new consumer group
curl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group \
-H 'content-type: application/vnd.kafka.v2+json' \
-d '{
"name": "strimzi-kafka-consumer",
"auto.offset.reset": "earliest",
"format": "json",
"enable.auto.commit": true,
"fetch.min.bytes": 512,
"consumer.request.timeout.ms": 30000
}'
# subscribe to the topic
curl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/subscription \
-H 'content-type: application/vnd.kafka.v2+json' \
-d '{
"topics": [
"event"
]
}'
# consume messages with the consumer
while true; do curl --header "Host: example.com" -X GET http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \
-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done
Develop an Authentication Plugin with Golang
Develop a certificate validation plugin
I develop the plugin by leveraging the Go plugin runner. The plugin simply reads the certificate from the request header and validates it. See the apache/apisix-go-plugin-runner project for more details.
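To illustrate the idea, here is a minimal sketch of such a plugin written against the apisix-go-plugin-runner API. Everything specific in it is an assumption for demonstration: the plugin name cert-validator, the CertValidatorConf struct, and the validation logic are hypothetical, and it hooks into the request phase (RequestFilter, invoked via an ext-plugin-pre-req entry) rather than the ext-plugin-post-resp entry shown in the route above. It reads the base64-encoded certificate from the Client-Certificate header that the verification requests below send, parses it, and rejects the request if it is missing, malformed, or expired. Dropping a file like this under cmd/go-runner/plugins/ in the apisix-go-plugin-runner checkout lets make build compile it into the go-runner binary.

// cert_validator.go: an illustrative sketch, not the exact plugin used in this post.
package plugins

import (
	"crypto/x509"
	"encoding/base64"
	"encoding/json"
	"encoding/pem"
	"net/http"
	"time"

	pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
	"github.com/apache/apisix-go-plugin-runner/pkg/log"
	"github.com/apache/apisix-go-plugin-runner/pkg/plugin"
)

func init() {
	// Register the plugin so the go-runner exposes it to APISIX.
	if err := plugin.RegisterPlugin(&CertValidator{}); err != nil {
		log.Fatalf("failed to register plugin cert-validator: %s", err)
	}
}

// CertValidator rejects requests whose Client-Certificate header does not
// carry a currently valid, base64-encoded PEM certificate.
type CertValidator struct {
	plugin.DefaultPlugin
}

// CertValidatorConf mirrors the "value" JSON configured on the route, e.g. {"tag":""}.
type CertValidatorConf struct {
	Tag string `json:"tag"`
}

func (p *CertValidator) Name() string {
	return "cert-validator"
}

func (p *CertValidator) ParseConf(in []byte) (interface{}, error) {
	conf := CertValidatorConf{}
	err := json.Unmarshal(in, &conf)
	return conf, err
}

// RequestFilter runs before the request is proxied to the KafkaBridge upstream.
// Writing a response here short-circuits the request; writing nothing lets it pass.
func (p *CertValidator) RequestFilter(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
	reject := func(msg string) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte(msg))
	}

	raw := r.Header().Get("Client-Certificate")
	if raw == "" {
		reject("missing Client-Certificate header")
		return
	}
	pemBytes, err := base64.StdEncoding.DecodeString(raw)
	if err != nil {
		reject("Client-Certificate is not valid base64")
		return
	}
	block, _ := pem.Decode(pemBytes)
	if block == nil {
		reject("Client-Certificate is not a PEM certificate")
		return
	}
	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		reject("Client-Certificate cannot be parsed")
		return
	}
	if now := time.Now(); now.Before(cert.NotBefore) || now.After(cert.NotAfter) {
		reject("Client-Certificate is expired or not yet valid")
		return
	}
	// Certificate looks valid: do nothing and let APISIX forward the request.
}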
Build the APISIX Image with the above Plugin
git clone git@github.com:apache/apisix-go-plugin-runner.git
# develop the plugin
...
# build binary
make build
# create a Dockerfile that adds the built binary
cat > Dockerfile <<EOF
FROM apache/apisix:3.6.0-debian
COPY ./go-runner /usr/local/apisix/apisix-go-plugin-runner/go-runner
EOF
# build and push the image
docker build -f ./Dockerfile -t quay.io/myan/apisix-360-go:0.1 .
docker push quay.io/myan/apisix-360-go:0.1
Start the Plugin When Running the Server
Modify the config.yaml via the apisix ConfigMap.
etcd:
  host: # it's possible to define multiple etcd hosts addresses of the same etcd cluster.
    - "http://apisix-etcd.apisix.svc.cluster.local:2379"
  prefix: "/apisix" # configuration prefix in etcd
  timeout: 30 # 30 seconds
...
# Nginx will hide all environment variables by default. So you need to declare your variable first in the conf/config.yaml
# https://github.com/apache/apisix/blob/master/docs/en/latest/external-plugin.md
nginx_config:
  envs:
    - APISIX_LISTEN_ADDRESS
    - APISIX_CONF_EXPIRE_TIME
ext-plugin:
  # path_for_test: "/tmp/runner.sock"
  cmd: ["/usr/local/apisix/apisix-go-plugin-runner/go-runner", "run", "-m", "prod"]
- Replace the APISIX Deployment Image
# update the deployment to use the custom image
kubectl -n apisix set image deployment/apisix apisix=quay.io/myan/apisix-360-go:0.1
- Verification
# base64-encode the client certificate into a single line (GNU coreutils base64)
CERT_CONTENT_BASE64=$(base64 -w 0 < rest/client.crt)
# list the topics
curl -i 'http://127.0.0.1:9080/topics' \
-H 'Host: example.com' \
-H 'Content-Type: application/vnd.kafka.json.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64"
# create consumer
curl -X POST 'http://localhost:9080/consumers/strimzi-kafka-consumer-group' \
-H 'Host: example.com' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64" \
-d '{
"name": "strimzi-kafka-consumer",
"auto.offset.reset": "earliest",
"format": "json",
"enable.auto.commit": true,
"fetch.min.bytes": 512,
"consumer.request.timeout.ms": 30000
}'
# subscribe the consumer 'strimzi-kafka-consumer' to the topic 'event'
curl -X POST 'http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/subscription' \
-H 'Host: example.com' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64" \
-d '{
"topics": ["event"]
}'
# consume messages with the consumer
curl -X GET 'http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records' \
-H 'Host: example.com' \
-H 'Accept: application/vnd.kafka.json.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64" \