Open Policy Agent (OPA) plugin for Kafka authorization.
- Kafka 3.8.0+ (for older Kafka versions, please check previous release)
- Java 17 or above
- OPA installed and running on the brokers
Download the latest OPA authorizer plugin jar from Releases (or Maven Central) and put the
file (opa-authorizer-{$VERSION}.jar) somewhere Kafka recognizes it - this could be directly in Kafka's libs directory
or in a separate plugin directory pointed out to Kafka at startup, e.g:
CLASSPATH=/usr/local/share/kafka/plugins/*
To activate the opa-kafka-plugin add the authorizer.class.name to server.properties
authorizer.class.name=org.openpolicyagent.kafka.OpaAuthorizer
The plugin supports the following properties:
| Property Key | Example | Default | Description | 
|---|---|---|---|
| opa.authorizer.url | http://opa:8181/v1/data/kafka/authz/allow | Name of the OPA policy to query. [required] | |
| opa.authorizer.allow.on.error | false | false | Fail-closed or fail-open if OPA call fails. | 
| opa.authorizer.cache.initial.capacity | 5000 | 5000 | Initial decision cache size. | 
| opa.authorizer.cache.maximum.size | 50000 | 50000 | Max decision cache size. | 
| opa.authorizer.cache.expire.after.seconds | 3600 | 3600 | Decision cache expiry in seconds. | 
| opa.authorizer.metrics.enabled | true | false | Whether or not expose JMX metrics for monitoring. | 
| super.users | User:alice;User:bob | Super users which are always allowed. | |
| opa.authorizer.truststore.path | /path/to/mytruststore.p12 | Path to the PKCS12 truststore for HTTPS requests to OPA. | |
| opa.authorizer.truststore.password | ichangedit | changeit | Password for the truststore. | 
| opa.authorizer.truststore.type | PKCS12,JKSor whatever your JVM supports | PKCS12 | Type of the truststore. | 
Example structure of input data provided from opa-kafka-plugin to Open Policy Agent.
{
    "action": {
        "logIfAllowed": true,
        "logIfDenied": true,
        "operation": "DESCRIBE",
        "resourcePattern": {
            "name": "alice-topic",
            "patternType": "LITERAL",
            "resourceType": "TOPIC",
            "unknown": false
        },
        "resourceReferenceCount": 1
    },
    "requestContext": {
        "clientAddress": "192.168.64.1",
        "clientInformation": {
            "softwareName": "unknown",
            "softwareVersion": "unknown"
        },
        "connectionId": "192.168.64.4:9092-192.168.64.1:58864-0",
        "header": {
            "data": {
                "clientId": "rdkafka",
                "correlationId": 5,
                "requestApiKey": 3,
                "requestApiVersion": 2
            },
            "headerVersion": 1
        },
        "listenerName": "SASL_PLAINTEXT",
        "principal": {
            "name": "alice-consumer",
            "principalType": "User"
        },
        "securityProtocol": "SASL_PLAINTEXT"
    }
}The following table summarizes the supported resource types and operation names.
| input.action.resourcePattern.resourceType | input.action.operation | 
|---|---|
| CLUSTER | CLUSTER_ACTION | 
| CLUSTER | CREATE | 
| CLUSTER | DESCRIBE | 
| GROUP | READ | 
| GROUP | DESCRIPTION | 
| TOPIC | CREATE | 
| TOPIC | ALTER | 
| TOPIC | DELETE | 
| TOPIC | DESCRIBE | 
| TOPIC | READ | 
| TOPIC | WRITE | 
| TRANSACTIONAL_ID | DESCRIBE | 
| TRANSACTIONAL_ID | WRITE | 
These are handled by the method authorizeAction, and passed to OPA with an action, that identifies the accessed resource and the performed operation. patternType is always LITERAL.
Creation of a topic checks for CLUSTER + CREATE. If this is denied, it will check for TOPIC with its name + CREATE.
When doing idempotent write to a topic, and the first request for operation=IDEMPOTENT_WRITE on the resourceType=CLUSTER is denied, the method authorizeByResourceType to check, if the user has the right to write to any topic. If yes, the idempotent write is granted by Kafka's ACL-implementation. To allow for a similar check, it is mapped to OPA with patternType=PREFIXED, resourceType=TOPIC, and name="".
{
  "action": {
    "logIfAllowed": true,
    "logIfDenied": true,
    "operation": "DESCRIBE",
    "resourcePattern": {
      "name": "",
      "patternType": "PREFIXED",
      "resourceType": "TOPIC",
      "unknown": false
    },
    "resourceReferenceCount": 1
  },
  ...
}It's likely possible to use all different resource types and operations described in the Kafka API docs: https://kafka.apache.org/24/javadoc/org/apache/kafka/common/acl/AclOperation.html https://kafka.apache.org/24/javadoc/org/apache/kafka/common/resource/ResourceType.html
| Protocol | Description | 
|---|---|
| PLAINTEXT | Un-authenticated, non-encrypted channel | 
| SASL_PLAINTEXT | authenticated, non-encrypted channel | 
| SASL | authenticated, SSL channel | 
| SSL | SSL channel | 
More info:
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html
With the sample policy rego you will out of the box get
a structure where an "owner" can one user per type (consumer, producer, mgmt). The owner and user type is separated by -.
- Username structure: <owner>-<type>
- Topic name structure: <owner->.*
Example: 
User alice-consumer will be...
- allowed to consume on topic alice-topic1
- allowed to consume on topic alice-topic-test
- denied to produce on any topic
- denied to consume on topic bob-topic
Using gradle wrapper: ./gradlew clean test shadowJar
The resulting jar (with dependencies embedded) will be named opa-authorizer-{$VERSION}-all.jar and stored in
build/libs.
Set log level log4j.logger.org.openpolicyagent=INFO in config/log4j.properties
Use DEBUG or TRACE for debugging.
In a busy Kafka cluster it might be good to tweak the cache since it may produce a lot of log entries in Open Policy Agent, especially if decision logs are turned on. If the policy isn't dynamically updated very often it's recommended to cache a lot to improve performance and reduce the amount of log entries.
The plugin exposes some metrics that can be useful in operation.
- opa.authorizer:type=authorization-result- authorized-request-count: number of allowed requests
- unauthorized-request-count: number of denied requests
 
- opa.authorizer:type=request-handle- request-to-opa-count: number of HTTP request sent to OPA to get authorization result
- cache-hit-rate: Cache hit rate. Cache miss rate should be- 1 - cache-hit-rate
- cache-usage-percentage: the ratio of cache size over maximum cache capacity
 
For questions, discussions and announcements related to Styra products, services and open source projects, please join the Styra community on Slack!