Skip to content

Server Configuration

Repository: federator
Description: server configuration


Overview

This document describes the configuration properties for the Federator server, including connection settings, filtering, and common configuration options shared across Federator components.

Configuration Files

  • server.properties: Main server configuration. Location defined by the FEDERATOR_SERVER_PROPERTIES environment variable.
  • common-configuration.properties: Contains properties shared by both server and client. Location defined by the FEDERATOR_COMMON_PROPERTIES environment variable.

Server Properties

Property Description
kafka.bootstrapServers Bootstrap servers for the source Kafka instance
kafka.defaultKeyDeserializerClass Default key deserializer class
kafka.defaultValueDeserializerClass Default value deserializer class
kafka.consumerGroup Consumer group for the server
kafka.pollDuration Duration to poll for messages (ms)
kafka.pollRecords Number of records to poll per request
kafka.additional.* Optional: Additional Kafka consumer properties (prefix removed before passing to Kafka)
shared.headers Definition of Kafka message headers
filter.shareAll If true, all messages are shared (no filtering)
server.port Port the server listens on
server.tlsEnabled If true, enables TLS for gRPC communication
server.keepAliveTime Keep-alive time for the server (ms)
consumer.inactivity.timeout Duration of inactivity (ISO-8601, e.g. PT30S) before the server disconnects the consumer

Note: - The consumer.inactivity.timeout property controls how long the server will wait for messages before disconnecting a consumer due to inactivity. If no messages are received within this duration, the consumer connection is closed automatically. This helps free up resources and ensures efficient operation.

Common Configuration Properties

Common properties are defined in common-configuration.properties and may include:

Property Description
idp.mtls.enabled Enable mutual TLS for Identity Provider (IDP) communication (true/false)
idp.client.secret OAuth2 client secret (used when mTLS is disabled)
idp.jwks.url JWKS URL of the IDP (used for verifying JWT signatures)
idp.token.url Token endpoint URL of the IDP (used to fetch OAuth2 tokens)
idp.token.backoff Backoff time in milliseconds before retrying a failed token request (default: 1000 ms)
idp.client.id OAuth2 client ID (registered with the IDP)
idp.keystore.path Path to client keystore file (PKCS12 or JKS) for mutual TLS
idp.keystore.password Password for the client keystore
idp.truststore.path Path to truststore containing the IDP's root CA or certificate chain
idp.truststore.password Password for the truststore
... Other shared properties as required

All properties can be set directly or via environment variables (see ${...} syntax in the file). These settings ensure secure authentication and authorisation between Federator components and the Identity Provider, supporting both mTLS and OAuth2 flows.

Custom Filter Configuration

Federator supports custom filtering logic for Kafka messages. The producer (server) receives consumer attribute configuration from the Management-Node as JSON, for example:

{
  "clientId": "FEDERATOR_HEG",
  "producers": [
    {
      "products": [
        {
          "name": "BrownfieldLandAvailability",
          "topic": "topic.BrownfieldLandAvailability",
          "consumers": [
            {
              "name": "ENV-CONSUMER-1",
              "idpClientId": "FEDERATOR_ENV",
              "attributes": [
                { "name": "nationality", "value": "GBR", "type": "string" },
                { "name": "clearance", "value": "0", "type": "string" },
                { "name": "organisation_type", "value": "NON-GOV3", "type": "string" }
              ]
            }
          ]
        }
      ],
      "name": "HEG-PRODUCER-1",
      "idpClientId": "FEDERATOR_HEG"
    }
  ]
}

Filtering Logic

  • No attributes set: All data is passed to the client.
  • One attribute set: Only messages with a matching attribute and value are passed.
  • Multiple attributes set: Messages must match all attributes (AND logic).

The filter checks Kafka message headers for the required attributes and values. For example, if a consumer requires nationality=GBR and clearance=0, only messages with both headers will be delivered.

Example Filter Implementation

public class AttributeBasedMessageFilter implements MessageFilter {
    private final Map<String, String> requiredAttributes;
    public AttributeBasedMessageFilter(Map<String, String> requiredAttributes) {
        this.requiredAttributes = requiredAttributes;
    }
    @Override
    public boolean filterOut(KafkaEvent<?, ?> message) {
        if (requiredAttributes == null || requiredAttributes.isEmpty()) {
            return false; // No attributes: pass all messages
        }
        for (Map.Entry<String, String> entry : requiredAttributes.entrySet()) {
            String headerValue = message.getHeader(entry.getKey());
            if (!entry.getValue().equals(headerValue)) {
                return true; // Filter out if any attribute does not match
            }
        }
        return false; // Pass if all attributes match
    }
}

For details on authentication and key generation, see Authentication Configuration.

Interceptors

Federator uses gRPC server interceptors to enforce authentication and authorization for all incoming requests:

AuthServerInterceptor

  • Validates the presence and format of the Authorization header (expects a Bearer token).
  • Verifies the JWT token using the Identity Provider (IDP) public keys or mTLS.
  • Rejects requests with missing, invalid, or expired tokens.
  • Configuration properties used: idp.jwks.url, idp.token.url, idp.mtls.enabled, etc.

ConsumerVerificationServerInterceptor

  • Extracts the consumer ID from the JWT token.
  • Validates the token audience against the configured client ID (idp.client.id).
  • Authorizes the consumer by checking the producer configuration (from Management-Node) to ensure the consumer is permitted for the requested product/topic.
  • Attaches the verified consumer ID to the gRPC context for downstream use.
  • Rejects requests from unauthorized consumers or with invalid audience.

These interceptors are essential for secure, multi-tenant operation, ensuring only authenticated and authorized clients can access federated data. For implementation details, see: - AuthServerInterceptor.java - ConsumerVerificationServerInterceptor.java

gRPC Request Sequence Diagram

sequenceDiagram
    participant Client as gRPC Client
    participant Auth as AuthServerInterceptor
    participant Verify as ConsumerVerificationServerInterceptor
    participant Server as Federator Server
    participant Mgmt as Management-Node
    participant Kafka as Kafka/Redis
    participant Filter as Custom Filter

    Client->>Auth: Send gRPC request with Bearer token
    Auth->>Auth: Validate Authorization header & JWT
    Auth-->>Client: Reject if invalid
    Auth->>Verify: Pass request if valid
    Verify->>Verify: Extract consumer ID from JWT
    Verify->>Verify: Validate audience (idp.client.id)
    Verify->>Mgmt: Fetch producer/consumer config
    Verify->>Verify: Authorize consumer for topic
    Verify-->>Client: Reject if unauthorized
    Verify->>Server: Pass request if authorized
    Server->>Kafka: Read messages from Kafka
    Server->>Filter: Apply custom filter (attributes)
    Filter-->>Server: Return filtered messages
    Server-->>Client: Send federated data

Maintained by the National Digital Twin Programme (NDTP).

© Crown Copyright 2025. This work has been developed by the National Digital Twin Programme and is legally attributed to the Department for Business and Trade (UK) as the governing entity.
Licensed under the Open Government Licence v3.0.
For full licensing terms, see OGL_LICENSE.md.