Advanced Usage Guide

Advanced topics, integrations, and best practices for power users and production deployments.

Custom Policy Creation

Overview

While KafkaGuard provides three built-in policy tiers (baseline-dev, enterprise-default, and finance-iso), you can create custom policies tailored to your organization's specific requirements.

When to Create Custom Policies

Create custom policies when you need to:

  • ✅ Enforce organization-specific compliance requirements
  • ✅ Combine controls from multiple tiers
  • ✅ Add custom controls for internal standards
  • ✅ Adjust control severity levels for your environment
  • ✅ Create environment-specific policies (dev, staging, prod)

Quick Policy Structure Reference

version: "1.0"
name: "My Custom Policy"
description: "Custom policy for our environment"
tier: "custom"

controls:
  - id: KG-XXX
    title: "Control title"
    description: "What this control checks"
    severity: HIGH | MEDIUM | LOW
    category: security | reliability | operational | compliance
    expr: |
      # CEL expression that returns true (pass) or false (fail)
      broker.config["auto.create.topics.enable"] == "false"
    remediation: |
      Step-by-step fix instructions
    compliance:
      pci_dss: ["4.1", "8.2"]
      soc2: ["CC6.1"]
      iso27001: ["A.12.6.1"]

Comprehensive Policy Creation Guide

For complete documentation on creating custom policies, see:

Policy Creation Guide (Story 9.3)

This comprehensive guide covers:

  • Policy file structure and schema
  • CEL expression syntax and examples
  • Available variables (broker, topic, cluster)
  • Control ID conventions (KG-001 to KG-999)
  • Severity and category guidelines
  • Compliance mapping (PCI-DSS, SOC2, ISO 27001)
  • Testing and validation
  • Examples and templates
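The conventions listed above (control IDs from KG-001 to KG-999, the three severity levels, and the four categories) can be checked before a policy ever reaches `validate-policy`. The following is a minimal sketch of such a pre-check, not KafkaGuard's own validator; `check_control` is a hypothetical helper, and it assumes the policy YAML has already been parsed into Python dictionaries.

```python
import re

# Allowed values taken from the quick structure reference above.
SEVERITIES = {"HIGH", "MEDIUM", "LOW"}
CATEGORIES = {"security", "reliability", "operational", "compliance"}
# KG-001 .. KG-999: exactly three digits, excluding KG-000.
CONTROL_ID = re.compile(r"^KG-(?!000)\d{3}$")


def check_control(control: dict) -> list:
    """Return a list of problems found in one control entry (empty if none)."""
    problems = []
    control_id = str(control.get("id", ""))
    if not CONTROL_ID.match(control_id):
        problems.append(f"id {control_id!r} is not in the KG-001..KG-999 range")
    if control.get("severity") not in SEVERITIES:
        problems.append(f"severity must be one of {sorted(SEVERITIES)}")
    if control.get("category") not in CATEGORIES:
        problems.append(f"category must be one of {sorted(CATEGORIES)}")
    if not str(control.get("expr", "")).strip():
        problems.append("expr is empty")
    return problems
```

A check like this only catches structural mistakes; whether the CEL expression itself compiles is still something only `kafkaguard validate-policy` can confirm.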

Policy Validation

Always validate custom policies before deployment:

# Validate policy syntax
kafkaguard validate-policy --policy policies/custom-policy.yaml

# Test policy against dev cluster first
kafkaguard scan \
  --bootstrap kafka-dev:9092 \
  --policy policies/custom-policy.yaml \
  --format json \
  --out ./test-reports

# Review results and iterate
cat test-reports/scan-*.json | jq '.findings[] | select(.status == "FAILED")'

Report Customization

Selecting Report Formats

Generate specific report formats based on your needs:

# Single format
kafkaguard scan --bootstrap kafka:9092 --format json

# Multiple formats
kafkaguard scan --bootstrap kafka:9092 --format json,html,pdf

# All formats
kafkaguard scan --bootstrap kafka:9092 --format json,html,pdf,csv

Output Directory Management

Organize reports by environment, date, or cluster:

# By environment
kafkaguard scan \
  --bootstrap kafka-prod:9095 \
  --out /var/reports/prod

# By date
REPORT_DIR="/var/reports/$(date +%Y-%m-%d)"
mkdir -p "$REPORT_DIR"
kafkaguard scan \
  --bootstrap kafka:9092 \
  --out "$REPORT_DIR"

# By cluster
kafkaguard scan \
  --bootstrap kafka-cluster1:9092 \
  --out /var/reports/cluster1

Report Naming Convention

KafkaGuard uses a consistent naming pattern:

scan-<timestamp>-<scan_id>.<format>

Example:
scan-20251115140530-abc123def456.json
scan-20251115140530-abc123def456.html
scan-20251115140530-abc123def456.pdf
scan-20251115140530-abc123def456.csv

Benefits:

  • Chronological sorting (timestamp first)
  • Easy correlation across formats (same scan_id)
  • Unique identifiers prevent overwrites
  • Glob pattern matching (scan-*.json)
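Because every format of one scan shares the same scan_id, report files can be grouped per scan with a single pattern. The sketch below is illustrative only: the 14-digit timestamp is inferred from the example names above rather than from a published specification, and `group_by_scan` is a hypothetical helper.

```python
import re
from collections import defaultdict

# Pattern inferred from the example names: a 14-digit timestamp, then the scan ID.
REPORT_NAME = re.compile(
    r"^scan-(?P<timestamp>\d{14})-(?P<scan_id>[^.]+)\.(?P<format>\w+)$"
)


def group_by_scan(filenames):
    """Map each scan_id to the sorted list of formats found for it."""
    groups = defaultdict(list)
    for name in filenames:
        match = REPORT_NAME.match(name)
        if match:  # files that do not follow the convention are skipped
            groups[match["scan_id"]].append(match["format"])
    return {scan_id: sorted(formats) for scan_id, formats in groups.items()}
```

Grouping this way makes it easy to spot a scan that produced a JSON report but no PDF, for example.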

Report Archival Strategy

Implement report retention policies:

#!/bin/bash
# archive-reports.sh - Automated report archival

REPORTS_DIR="/var/reports/kafkaguard"
ARCHIVE_DIR="/var/reports/kafkaguard/archive"
RETENTION_DAYS=90

# Create archive structure
mkdir -p "$ARCHIVE_DIR/$(date +%Y-%m)"

# Archive reports older than 30 days
find "$REPORTS_DIR" -maxdepth 1 -name "scan-*" -mtime +30 \
  -exec mv {} "$ARCHIVE_DIR/$(date +%Y-%m)/" \;

# Compress archived reports older than 60 days (all formats, skipping files already gzipped)
find "$ARCHIVE_DIR" -type f -name "scan-*" ! -name "*.gz" -mtime +60 -exec gzip {} \;

# Delete compressed reports older than the retention period
find "$ARCHIVE_DIR" -type f -name "scan-*.gz" -mtime +"$RETENTION_DAYS" -delete

echo "Report archival complete"

Custom Report Templates (Phase 2)

Note: Custom report templates are planned for Phase 2. Currently, report formats use built-in templates.

Planned Features:

  • Custom HTML templates with organization branding
  • Custom PDF layouts and styling
  • Template variables for organization info
  • Export templates for reuse

Performance Tuning

Timeout Configuration

Adjust timeout based on cluster size and network conditions:

# Default timeout: 300 seconds (5 minutes)
kafkaguard scan --bootstrap kafka:9092

# Small clusters (<100 topics): 300 seconds
kafkaguard scan --bootstrap kafka:9092 --timeout 300

# Medium clusters (100-500 topics): 600 seconds (10 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 600

# Large clusters (500-1000 topics): 900 seconds (15 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 900

# Very large clusters (1000+ topics): 1800 seconds (30 minutes)
kafkaguard scan --bootstrap kafka:9092 --timeout 1800
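For wrapper scripts that scan many clusters, the tiers above can be encoded as a small lookup. This is a sketch, not part of KafkaGuard: `suggested_timeout` is a hypothetical helper, and because the published ranges overlap at 500 and 1000 topics, it assumes those boundary values fall into the lower tier.

```python
def suggested_timeout(topic_count: int) -> int:
    """Return a --timeout value in seconds for a cluster with topic_count topics."""
    if topic_count < 100:
        return 300   # small clusters
    if topic_count <= 500:
        return 600   # medium clusters (assumption: exactly 500 counts as medium)
    if topic_count <= 1000:
        return 900   # large clusters (assumption: exactly 1000 counts as large)
    return 1800      # very large clusters
```

The result can be passed straight to the CLI, for example as `--timeout` in a generated command line. Network latency is not considered here, so high-latency links may still need a larger value.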

Parallel Collection Settings

KafkaGuard uses parallel collection by default for faster scans:

# Default: parallel enabled with 6 max collectors
kafkaguard scan --bootstrap kafka:9092

# Increase max collectors for large clusters (10+ brokers)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel true \
  --max-collectors 10

# Reduce max collectors if experiencing memory issues
kafkaguard scan \
  --bootstrap kafka:9092 \
  --max-collectors 3

# Disable parallel collection (not recommended, slower)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --parallel false

Memory Optimization

Monitor and optimize memory usage:

# Monitor memory during scan
top -p "$(pgrep -d, -x kafkaguard)"  # -d, joins multiple PIDs with commas, as top -p expects

# Set resource limits (Linux)
ulimit -v 524288  # Limit to 512MB virtual memory
kafkaguard scan --bootstrap kafka:9092

# Docker: Set memory limits
docker run --rm \
  --memory=256m \
  --cpus=1.0 \
  -v $(pwd)/policies:/policies:ro \
  -v $(pwd)/reports:/reports \
  kafkaguard/kafkaguard:latest scan \
    --bootstrap kafka:9092 \
    --policy /policies/enterprise-default.yaml \
    --out /reports

Expected Memory Usage:

Cluster Size                   Expected Memory
3 brokers, <100 topics         50-100 MB
5 brokers, 100-500 topics      100-150 MB
10 brokers, 500-1000 topics    150-200 MB
20+ brokers, 1000+ topics      200-300 MB

If memory usage exceeds these ranges, please report to the KafkaGuard team.

Network Latency Considerations

Optimize for high-latency networks:

# High latency network (>100ms RTT)
kafkaguard scan \
  --bootstrap kafka:9092 \
  --timeout 900 \
  --max-collectors 3  # Reduce concurrent connections

# Monitor network latency
ping -c 10 kafka.example.com

Running Scans During Off-Peak Hours

Schedule scans during low-traffic periods:

# Cron job for 2 AM daily scan
# /etc/cron.d/kafkaguard-nightly
# The sixth field (kafkaguard) is the user the job runs as.
# cron does not support backslash line continuations, so the entry must stay on one line.
0 2 * * * kafkaguard /usr/local/bin/kafkaguard scan --bootstrap kafka-prod:9095 --policy /opt/kafkaguard/policies/enterprise-default.yaml --out /var/reports/kafkaguard/nightly --timeout 900 2>&1 | logger -t kafkaguard

Monitoring System Integration

Splunk Integration

Parse JSON reports and forward to Splunk for analysis:

#!/bin/bash
# splunk-forward.sh - Forward KafkaGuard reports to Splunk

REPORTS_DIR="/var/reports/kafkaguard"
SPLUNK_HEC_URL="https://splunk.example.com:8088/services/collector"
SPLUNK_HEC_TOKEN="your-hec-token"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract findings and forward to Splunk
# (TLS verification stays on; add -k only for a test instance with a self-signed certificate)
jq -c '.findings[]' "$LATEST_JSON" | while read -r finding; do
  curl -sS "$SPLUNK_HEC_URL" \
    -H "Authorization: Splunk $SPLUNK_HEC_TOKEN" \
    -d "{\"event\": $finding, \"sourcetype\": \"kafkaguard:scan\"}"
done

echo "Forwarded $(jq '.findings | length' "$LATEST_JSON") findings to Splunk"

Splunk Search Queries:

# All KafkaGuard findings
sourcetype=kafkaguard:scan

# Failed controls only
sourcetype=kafkaguard:scan status=FAILED

# HIGH severity failures
sourcetype=kafkaguard:scan status=FAILED severity=HIGH

# Failures by control ID
sourcetype=kafkaguard:scan status=FAILED | stats count by control_id

# Trend over time
sourcetype=kafkaguard:scan | timechart count by status
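The forwarding script sends one HTTP request per finding. The HTTP Event Collector also accepts several event objects stacked in a single request body, so a large report can be sent in one call. The sketch below only builds that body; `build_hec_batch` is a hypothetical helper, and the finding fields shown in the usage note are assumed from the report examples in this guide.

```python
import json


def build_hec_batch(findings, sourcetype="kafkaguard:scan"):
    """Serialize findings as stacked HEC event objects for a single POST body."""
    return "\n".join(
        json.dumps({"event": finding, "sourcetype": sourcetype})
        for finding in findings
    )
```

The returned string would be posted to the same collector URL with the same `Authorization: Splunk <token>` header as in the script, for example after loading `report["findings"]` from the latest JSON report.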

ELK Stack Integration

Index KafkaGuard reports in Elasticsearch for analysis and visualization:

#!/bin/bash
# elk-index.sh - Index KafkaGuard reports in Elasticsearch

REPORTS_DIR="/var/reports/kafkaguard"
ELASTICSEARCH_URL="http://elasticsearch:9200"
INDEX_NAME="kafkaguard-findings"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract metadata and findings
METADATA=$(cat "$LATEST_JSON" | jq -c '.metadata')
FINDINGS=$(cat "$LATEST_JSON" | jq -c '.findings[]')

# Index each finding
echo "$FINDINGS" | while read -r finding; do
  # Combine metadata with finding
  DOCUMENT=$(jq -n \
    --argjson metadata "$METADATA" \
    --argjson finding "$finding" \
    '$metadata + $finding')

  # Index in Elasticsearch
  curl -X POST "$ELASTICSEARCH_URL/$INDEX_NAME/_doc" \
    -H 'Content-Type: application/json' \
    -d "$DOCUMENT"
done

echo "Indexed $(jq '.findings | length' "$LATEST_JSON") findings in Elasticsearch"

Kibana Visualizations:

  • Control Status Pie Chart: status field
  • Severity Distribution Bar Chart: severity field
  • Failed Controls Table: Filter status:FAILED, show control_id, title, severity
  • Score Trend Line Chart: metadata.timestamp vs summary.score
  • Compliance Heatmap: compliance.pci_dss, compliance.soc2, compliance.iso27001
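The indexing script issues one request per finding, which becomes slow for large reports. Elasticsearch's `_bulk` endpoint takes newline-delimited JSON: an action line followed by the document, repeated, with a trailing newline. The sketch below builds such a body under the same merge rule as the script (finding fields override metadata fields); `build_bulk_body` is a hypothetical helper, not part of KafkaGuard.

```python
import json


def build_bulk_body(metadata, findings, index_name="kafkaguard-findings"):
    """Build an NDJSON body for the Elasticsearch _bulk endpoint."""
    lines = []
    for finding in findings:
        lines.append(json.dumps({"index": {"_index": index_name}}))
        # Same merge as the script's '$metadata + $finding': finding keys win.
        lines.append(json.dumps({**metadata, **finding}))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"
```

The body would be sent with `POST $ELASTICSEARCH_URL/_bulk` and the header `Content-Type: application/x-ndjson`.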

Custom Monitoring with JSON Parsing

Extract specific metrics from JSON reports:

#!/bin/bash
# extract-metrics.sh - Extract metrics from KafkaGuard JSON report

LATEST_JSON=$(ls -t reports/scan-*.json | head -1)

# Extract overall score
SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
echo "Overall Score: $SCORE%"

# Extract control counts
TOTAL=$(cat "$LATEST_JSON" | jq -r '.summary.total_controls')
PASSED=$(cat "$LATEST_JSON" | jq -r '.summary.passed')
FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')

echo "Controls: $PASSED passed, $FAILED failed (out of $TOTAL)"

# Extract HIGH severity failures
FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')
echo "HIGH severity failures: $FAILED_HIGH"

# Export as Prometheus metrics (text format)
cat > /var/lib/node_exporter/kafkaguard.prom <<EOF
# HELP kafkaguard_scan_score Overall compliance score (0-100)
# TYPE kafkaguard_scan_score gauge
kafkaguard_scan_score $SCORE

# HELP kafkaguard_controls_total Total number of controls evaluated
# TYPE kafkaguard_controls_total gauge
kafkaguard_controls_total $TOTAL

# HELP kafkaguard_controls_passed Number of passed controls
# TYPE kafkaguard_controls_passed gauge
kafkaguard_controls_passed $PASSED

# HELP kafkaguard_controls_failed Number of failed controls
# TYPE kafkaguard_controls_failed gauge
kafkaguard_controls_failed $FAILED

# HELP kafkaguard_failures_high Number of HIGH severity failures
# TYPE kafkaguard_failures_high gauge
kafkaguard_failures_high $FAILED_HIGH
EOF
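One caveat with writing the `.prom` file in place is that a collector may read it while it is half written. A common remedy is to write to a temporary file in the same directory and rename it over the target. The following sketch shows that approach for the same five gauges; `render_metrics` and `write_atomically` are hypothetical helpers, and the summary field names are the ones the script above reads.

```python
import os
import tempfile


def render_metrics(summary: dict, failed_high: int) -> str:
    """Render the summary fields used above in Prometheus text exposition format."""
    metrics = [
        ("kafkaguard_scan_score", "Overall compliance score (0-100)", summary["score"]),
        ("kafkaguard_controls_total", "Total number of controls evaluated", summary["total_controls"]),
        ("kafkaguard_controls_passed", "Number of passed controls", summary["passed"]),
        ("kafkaguard_controls_failed", "Number of failed controls", summary["failed"]),
        ("kafkaguard_failures_high", "Number of HIGH severity failures", failed_high),
    ]
    lines = []
    for name, help_text, value in metrics:
        lines += [f"# HELP {name} {help_text}", f"# TYPE {name} gauge", f"{name} {value}"]
    return "\n".join(lines) + "\n"


def write_atomically(path: str, content: str) -> None:
    """Write to a temp file in the target directory, then rename over the target."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        handle.write(content)
    os.chmod(tmp_path, 0o644)   # mkstemp creates 0600; the collector must be able to read it
    os.replace(tmp_path, path)  # atomic on POSIX when both paths share a filesystem
```

The `.tmp` suffix keeps the intermediate file from matching a `*.prom` pattern while it is being written.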

Prometheus Metrics Export (Phase 2)

Note: Native Prometheus metrics export is planned for Phase 2.

Planned Metrics:

  • kafkaguard_scan_duration_seconds - Scan duration
  • kafkaguard_scan_score - Overall compliance score
  • kafkaguard_controls_total - Total controls evaluated
  • kafkaguard_controls_passed - Passed controls count
  • kafkaguard_controls_failed - Failed controls count
  • kafkaguard_failures_by_severity - Failures by severity (labels: high, medium, low)
  • kafkaguard_scan_timestamp - Last scan timestamp

Trend Dashboard & Fleet Comparison

Once scans are uploaded to the on-prem platform (--upload http://kafkaguard:3001), use the dashboard to track compliance over time:

  • /dashboard/trends?cluster=<id>&period=30d — Per-cluster compliance timeline with regression highlighting
  • /dashboard/compare — Fleet-grid comparing all clusters: score, delta badge, and sparkline

The dashboard period selector supports 7d, 30d, 90d, and 1y windows. Regressions (score drops ≥ 5 points) are highlighted in red on the chart.

The trend data is also available via API for scripting:

# Fetch trend data for a cluster (last 30 days)
curl -s "http://kafkaguard:3001/api/v1/trends/$CLUSTER_ID?period=30d" \
  -H "x-api-key: $KAFKAGUARD_API_KEY" | jq '.[] | {date: .scanned_at, score: .score}'
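The regression rule mentioned above (a score drop of at least 5 points) can also be applied to the trend data in a script. The sketch below assumes the drop is measured between consecutive scans, which this guide does not state explicitly; `find_regressions` is a hypothetical helper that takes `(scanned_at, score)` pairs such as those produced by the `jq` filter.

```python
def find_regressions(points, threshold=5):
    """Return (previous, current) pairs where the score dropped by >= threshold.

    points: list of (scanned_at, score) tuples, oldest first.
    """
    regressions = []
    for previous, current in zip(points, points[1:]):
        if previous[1] - current[1] >= threshold:
            regressions.append((previous, current))
    return regressions
```

A non-empty result could be used to fail a CI job or trigger an alert before anyone opens the dashboard.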

Ticketing System Integration

Jira Integration

Automatically create Jira tickets for failed HIGH severity controls:

#!/usr/bin/env python3
# jira-create-tickets.py - Create Jira tickets from KafkaGuard findings

import json
import sys
from jira import JIRA

# Jira configuration
JIRA_SERVER = 'https://jira.example.com'
JIRA_USERNAME = 'kafkaguard-bot'
JIRA_API_TOKEN = 'your-api-token'
JIRA_PROJECT = 'SEC'  # Security project

# Connect to Jira
jira = JIRA(server=JIRA_SERVER, basic_auth=(JIRA_USERNAME, JIRA_API_TOKEN))

# Load latest KafkaGuard report
with open(sys.argv[1]) as f:
    report = json.load(f)

# Select failed controls with HIGH severity only
failed_findings = [f for f in report['findings'] if f['status'] == 'FAILED' and f['severity'] == 'HIGH']

for finding in failed_findings:
    # Check if ticket already exists (avoid duplicates)
    jql = f'project = {JIRA_PROJECT} AND summary ~ "{finding["control_id"]}"'
    existing = jira.search_issues(jql)

    if existing:
        print(f"Ticket already exists for {finding['control_id']}: {existing[0].key}")
        continue

    # Create new ticket
    issue_dict = {
        'project': {'key': JIRA_PROJECT},
        'summary': f"{finding['control_id']}: {finding['title']}",
        'description': f"""
Kafka compliance control failed:

*Control ID:* {finding['control_id']}
*Severity:* {finding['severity']}
*Category:* {finding['category']}

*Evidence:*
{finding['evidence']}

*Remediation Steps:*
{finding['remediation']}

*Compliance Impact:*
- PCI-DSS: {', '.join(finding.get('compliance', {}).get('pci_dss', []))}
- SOC2: {', '.join(finding.get('compliance', {}).get('soc2', []))}
- ISO 27001: {', '.join(finding.get('compliance', {}).get('iso27001', []))}

*Scan Details:*
- Cluster: {report['metadata']['cluster_id']}
- Timestamp: {report['metadata']['timestamp']}
- Policy: {report['metadata']['policy']}
        """,
        'issuetype': {'name': 'Bug'},
        'priority': {'name': 'High'},
        'labels': ['kafkaguard', 'compliance', 'security']
    }

    new_issue = jira.create_issue(fields=issue_dict)
    print(f"Created ticket {new_issue.key} for {finding['control_id']}")

print(f"Processed {len(failed_findings)} HIGH severity findings")

Usage:

# Run scan and create tickets
kafkaguard scan \
  --bootstrap kafka-prod:9095 \
  --policy policies/enterprise-default.yaml \
  --format json \
  --out /var/reports

LATEST_JSON=$(ls -t /var/reports/scan-*.json | head -1)
python3 jira-create-tickets.py "$LATEST_JSON"

ServiceNow Integration

Create ServiceNow incidents from KafkaGuard findings:

#!/bin/bash
# servicenow-create-incidents.sh - Create ServiceNow incidents

REPORTS_DIR="/var/reports/kafkaguard"
SNOW_INSTANCE="https://your-instance.service-now.com"
SNOW_USERNAME="kafkaguard-integration"
SNOW_PASSWORD="your-password"

# Find latest report
LATEST_JSON=$(ls -t "$REPORTS_DIR"/scan-*.json | head -1)

# Extract HIGH severity failures
cat "$LATEST_JSON" | jq -c '.findings[] | select(.status == "FAILED" and .severity == "HIGH")' | while read -r finding; do
  CONTROL_ID=$(echo "$finding" | jq -r '.control_id')
  TITLE=$(echo "$finding" | jq -r '.title')
  EVIDENCE=$(echo "$finding" | jq -r '.evidence')
  REMEDIATION=$(echo "$finding" | jq -r '.remediation')

  # Build the payload with jq so quotes and newlines in the evidence are escaped correctly
  PAYLOAD=$(jq -n \
    --arg short "Kafka Compliance: $CONTROL_ID - $TITLE" \
    --arg desc "Evidence: $EVIDENCE"$'\n\n'"Remediation: $REMEDIATION" \
    '{short_description: $short, description: $desc, urgency: "2", impact: "2", priority: "2", category: "Security", subcategory: "Compliance"}')

  # Create ServiceNow incident
  curl -X POST "$SNOW_INSTANCE/api/now/table/incident" \
    -u "$SNOW_USERNAME:$SNOW_PASSWORD" \
    -H "Content-Type: application/json" \
    -d "$PAYLOAD"

  echo "Created incident for $CONTROL_ID"
done

Automation with Scripting

Bash Script Example

Multi-cluster scanning with email notifications:

#!/bin/bash
# automated-compliance-scan.sh - Automated multi-cluster scanning with alerts

set -e

# Configuration
CLUSTERS=("kafka-dev:9092" "kafka-staging:9095" "kafka-prod:9095")
POLICIES=("baseline-dev.yaml" "enterprise-default.yaml" "enterprise-default.yaml")
REPORTS_BASE="/var/reports/kafkaguard"
ALERT_EMAIL="security@example.com"

# Timestamp for this scan run
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
SUMMARY_FILE="/tmp/scan-summary-$TIMESTAMP.txt"

echo "KafkaGuard Automated Compliance Scan - $TIMESTAMP" > "$SUMMARY_FILE"
echo "=================================================" >> "$SUMMARY_FILE"
echo "" >> "$SUMMARY_FILE"

# Scan each cluster
for i in "${!CLUSTERS[@]}"; do
  CLUSTER="${CLUSTERS[$i]}"
  POLICY="${POLICIES[$i]}"
  CLUSTER_NAME=$(echo "$CLUSTER" | cut -d: -f1)

  echo "Scanning $CLUSTER_NAME..." | tee -a "$SUMMARY_FILE"

  REPORT_DIR="$REPORTS_BASE/$CLUSTER_NAME/$TIMESTAMP"
  mkdir -p "$REPORT_DIR"

  # Run scan
  kafkaguard scan \
    --bootstrap "$CLUSTER" \
    --policy "policies/$POLICY" \
    --format json,html \
    --out "$REPORT_DIR" \
    --log-level info || true

  # Parse results
  LATEST_JSON=$(ls -t "$REPORT_DIR"/scan-*.json | head -1)

  if [ -f "$LATEST_JSON" ]; then
    SCORE=$(cat "$LATEST_JSON" | jq -r '.summary.score')
    FAILED=$(cat "$LATEST_JSON" | jq -r '.summary.failed')
    FAILED_HIGH=$(cat "$LATEST_JSON" | jq '[.findings[] | select(.status == "FAILED" and .severity == "HIGH")] | length')

    echo "  Score: $SCORE%" | tee -a "$SUMMARY_FILE"
    echo "  Failed Controls: $FAILED" | tee -a "$SUMMARY_FILE"
    echo "  HIGH Severity Failures: $FAILED_HIGH" | tee -a "$SUMMARY_FILE"

    if [ "$FAILED_HIGH" -gt 0 ]; then
      echo "  ⚠️  ALERT: HIGH severity failures detected!" | tee -a "$SUMMARY_FILE"
    fi
  else
    echo "  ❌ Scan failed!" | tee -a "$SUMMARY_FILE"
  fi

  echo "" | tee -a "$SUMMARY_FILE"
done

# Send email summary
mail -s "KafkaGuard Compliance Scan Summary - $TIMESTAMP" "$ALERT_EMAIL" < "$SUMMARY_FILE"

echo "Scan complete. Summary sent to $ALERT_EMAIL"

Python Script Example

Advanced report aggregation and analysis:

#!/usr/bin/env python3
# aggregate-reports.py - Aggregate and analyze KafkaGuard reports

import json
import glob
from datetime import datetime
from collections import defaultdict

class ReportAggregator:
    def __init__(self, reports_dir):
        self.reports_dir = reports_dir
        self.reports = []

    def load_reports(self):
        """Load all JSON reports"""
        for report_file in glob.glob(f"{self.reports_dir}/**/scan-*.json", recursive=True):
            with open(report_file) as f:
                report = json.load(f)
                report['_file'] = report_file
                self.reports.append(report)
        print(f"Loaded {len(self.reports)} reports")

    def aggregate_scores(self):
        """Calculate aggregate scores"""
        scores = [r['summary']['score'] for r in self.reports]
        return {
            'avg_score': sum(scores) / len(scores),
            'min_score': min(scores),
            'max_score': max(scores),
            'total_reports': len(scores)
        }

    def aggregate_failures_by_control(self):
        """Count failures by control ID"""
        failures = defaultdict(int)

        for report in self.reports:
            for finding in report['findings']:
                if finding['status'] == 'FAILED':
                    failures[finding['control_id']] += 1

        # Sort by frequency
        return sorted(failures.items(), key=lambda x: x[1], reverse=True)

    def aggregate_failures_by_severity(self):
        """Count failures by severity"""
        severity_counts = defaultdict(int)

        for report in self.reports:
            for finding in report['findings']:
                if finding['status'] == 'FAILED':
                    severity_counts[finding['severity']] += 1

        return dict(severity_counts)

    def generate_trend_data(self):
        """Generate score trend over time"""
        trend = []

        for report in sorted(self.reports, key=lambda r: r['metadata']['timestamp']):
            trend.append({
                'timestamp': report['metadata']['timestamp'],
                'score': report['summary']['score'],
                'failed': report['summary']['failed']
            })

        return trend

    def generate_summary(self):
        """Generate comprehensive summary"""
        print("\nKafkaGuard Report Aggregation Summary")
        print("=" * 50)

        # Scores
        scores = self.aggregate_scores()
        print(f"\nOverall Scores:")
        print(f"  Average: {scores['avg_score']:.1f}%")
        print(f"  Min: {scores['min_score']}%")
        print(f"  Max: {scores['max_score']}%")
        print(f"  Total Reports: {scores['total_reports']}")

        # Top failures
        print(f"\nTop 10 Failed Controls:")
        failures = self.aggregate_failures_by_control()
        for control_id, count in failures[:10]:
            print(f"  {control_id}: {count} failures")

        # Severity distribution
        print(f"\nFailures by Severity:")
        severity = self.aggregate_failures_by_severity()
        for sev, count in severity.items():
            print(f"  {sev}: {count}")

        # Trend
        print(f"\nScore Trend (last 5 scans):")
        trend = self.generate_trend_data()
        for entry in trend[-5:]:
            print(f"  {entry['timestamp']}: {entry['score']}% ({entry['failed']} failed)")

if __name__ == '__main__':
    import sys

    reports_dir = sys.argv[1] if len(sys.argv) > 1 else '/var/reports/kafkaguard'

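    aggregator = ReportAggregator(reports_dir)
    aggregator.load_reports()
    # Without this guard, aggregate_scores() divides by zero when no reports are found
    if not aggregator.reports:
        sys.exit(f"No scan-*.json reports found under {reports_dir}")
    aggregator.generate_summary()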

Usage:

python3 aggregate-reports.py /var/reports/kafkaguard

PowerShell Example (Windows)

# kafkaguard-scan.ps1 - Windows PowerShell automation

param(
    [string]$Bootstrap = "kafka.example.com:9092",
    [string]$Policy = "policies/enterprise-default.yaml",
    [string]$OutDir = "C:\Reports\KafkaGuard"
)

Write-Host "KafkaGuard Automated Scan" -ForegroundColor Green
Write-Host "=========================" -ForegroundColor Green

# Create reports directory
New-Item -ItemType Directory -Force -Path $OutDir | Out-Null

# Run scan
Write-Host "Scanning $Bootstrap..."
& kafkaguard scan `
    --bootstrap $Bootstrap `
    --policy $Policy `
    --format json,html `
    --out $OutDir `
    --log-level info

# Parse results
$LatestJson = Get-ChildItem -Path $OutDir -Filter "scan-*.json" | Sort-Object LastWriteTime -Descending | Select-Object -First 1

if ($LatestJson) {
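    # -Raw passes the file as a single string, avoiding line-by-line parsing edge cases
    $Report = Get-Content -Raw $LatestJson.FullName | ConvertFrom-Json
    $Score = $Report.summary.score
    $Failed = $Report.summary.failed
    # @(...) forces an array so .Count is reliable for zero or one match
    $FailedHigh = @($Report.findings | Where-Object { $_.status -eq "FAILED" -and $_.severity -eq "HIGH" }).Count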

    Write-Host ""
    Write-Host "Results:" -ForegroundColor Yellow
    Write-Host "  Score: $Score%"
    Write-Host "  Failed Controls: $Failed"
    Write-Host "  HIGH Severity Failures: $FailedHigh"

    if ($FailedHigh -gt 0) {
        Write-Host "  WARNING: HIGH severity failures detected!" -ForegroundColor Red
    }
}
else {
    Write-Host "ERROR: Scan failed!" -ForegroundColor Red
    exit 1
}

Write-Host ""
Write-Host "Scan complete. Reports saved to $OutDir"

Air-Gapped Deployment Best Practices

Bundle Creation and Verification

Create reproducible, versioned bundles:

#!/bin/bash
# create-airgapped-bundle.sh - Create versioned air-gapped bundle

VERSION="2.3.0"
BUILD_DATE=$(date +%Y%m%d)
BUNDLE_NAME="kafkaguard-airgapped-${VERSION}-${BUILD_DATE}"
BUNDLE_DIR="$BUNDLE_NAME"

echo "Creating KafkaGuard air-gapped bundle: $BUNDLE_NAME"

# Create bundle structure
mkdir -p "$BUNDLE_DIR"/{bin,policies,certs,docs,scripts}

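# Download the release archive and extract it into bin/
# (assumes the archive contains the kafkaguard executable at its top level)
wget -O "/tmp/kafkaguard-${VERSION}.tar.gz" \
  "https://github.com/KafkaGuard/kafkaguard-releases/releases/download/v${VERSION}/kafkaguard_Linux_x86_64.tar.gz"
tar -xzf "/tmp/kafkaguard-${VERSION}.tar.gz" -C "$BUNDLE_DIR/bin"
chmod +x "$BUNDLE_DIR/bin/kafkaguard"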

# Clone repository for policies and docs
git clone --depth 1 --branch v${VERSION} https://github.com/KafkaGuard/kafkaguardmain.git /tmp/kafkaguard-repo
cp -r /tmp/kafkaguard-repo/policies "$BUNDLE_DIR/"
cp -r /tmp/kafkaguard-repo/docs "$BUNDLE_DIR/"
cp /tmp/kafkaguard-repo/README.md "$BUNDLE_DIR/"

# Create installation script
cat > "$BUNDLE_DIR/install.sh" <<'EOF'
#!/bin/bash
# install.sh - Install KafkaGuard in air-gapped environment

set -e

echo "Installing KafkaGuard..."

# Install binary
sudo cp bin/kafkaguard /usr/local/bin/
sudo chmod +x /usr/local/bin/kafkaguard

# Install policies
sudo mkdir -p /opt/kafkaguard
sudo cp -r policies /opt/kafkaguard/
sudo cp -r docs /opt/kafkaguard/

# Verify installation
kafkaguard version

echo "✅ KafkaGuard installed successfully"
echo ""
echo "Next steps:"
echo "1. Copy certificates to /opt/kafkaguard/certs/"
echo "2. Run: kafkaguard scan --bootstrap <kafka-broker> --policy /opt/kafkaguard/policies/enterprise-default.yaml"
EOF

chmod +x "$BUNDLE_DIR/install.sh"

# Create manifest
cat > "$BUNDLE_DIR/MANIFEST.txt" <<EOF
KafkaGuard Air-Gapped Bundle
=============================

Version: $VERSION
Build Date: $BUILD_DATE
Platform: linux-amd64

Contents:
- bin/kafkaguard: KafkaGuard binary
- policies/: Policy files (baseline-dev, enterprise-default)
- docs/: Documentation
- install.sh: Installation script

Installation:
1. Extract bundle: tar -xzf ${BUNDLE_NAME}.tar.gz
2. Run: cd ${BUNDLE_NAME} && sudo ./install.sh
3. Verify: kafkaguard version

For full documentation, see docs/user-guide/index.md
EOF

# Create tarball (the script never changed into $BUNDLE_DIR, so no cd is needed)
tar -czf "${BUNDLE_NAME}.tar.gz" "$BUNDLE_NAME"

# Generate checksums
sha256sum "${BUNDLE_NAME}.tar.gz" > "${BUNDLE_NAME}.tar.gz.sha256"
md5sum "${BUNDLE_NAME}.tar.gz" > "${BUNDLE_NAME}.tar.gz.md5"

# Create verification script
cat > "verify-${BUNDLE_NAME}.sh" <<EOF
#!/bin/bash
# Verify bundle integrity

echo "Verifying bundle integrity..."

if sha256sum -c "${BUNDLE_NAME}.tar.gz.sha256"; then
    echo "✅ SHA256 checksum valid"
else
    echo "❌ SHA256 checksum FAILED"
    exit 1
fi

if md5sum -c "${BUNDLE_NAME}.tar.gz.md5"; then
    echo "✅ MD5 checksum valid"
else
    echo "❌ MD5 checksum FAILED"
    exit 1
fi

echo "✅ Bundle integrity verified"
EOF

chmod +x "verify-${BUNDLE_NAME}.sh"

# Clean up
rm -rf /tmp/kafkaguard-repo

echo "✅ Bundle created: ${BUNDLE_NAME}.tar.gz"
echo "   Size: $(du -h "${BUNDLE_NAME}.tar.gz" | cut -f1)"
echo "   SHA256: $(cat "${BUNDLE_NAME}.tar.gz.sha256" | cut -d' ' -f1)"
echo ""
echo "Verification script: verify-${BUNDLE_NAME}.sh"

Secure Transfer Procedures

Document the secure transfer process:

  1. Generate checksums (SHA256, MD5)
  2. Encrypt bundle (GPG, AES-256)
  3. Transfer via approved method (USB, secure file transfer)
  4. Verify integrity on air-gapped system
  5. Decrypt and extract
  6. Validate checksums again
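Steps 4 and 6 above come down to recomputing the bundle's SHA256 digest and comparing it with the value recorded at build time. On systems where `sha256sum` is unavailable, the same check can be done with the Python standard library. This is a minimal sketch; `sha256_of` and `verify_bundle` are hypothetical helpers, and the checksum file is assumed to be in the `sha256sum` output format (digest first, then the file name).

```python
import hashlib


def sha256_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large bundles do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_bundle(bundle_path: str, checksum_path: str) -> bool:
    """Compare the bundle's hash with the first field of a sha256sum-style file."""
    with open(checksum_path) as handle:
        expected = handle.read().split()[0].lower()
    return sha256_of(bundle_path) == expected
```

A checksum only detects corruption or tampering if the checksum file itself arrives through a trusted path, so it is worth transferring it separately from the bundle or signing it.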

Updating KafkaGuard in Air-Gapped Environments

Establish update procedures:

#!/bin/bash
# update-airgapped.sh - Update KafkaGuard in air-gapped environment

CURRENT_VERSION=$(kafkaguard version --format json | jq -r '.version')
NEW_BUNDLE="kafkaguard-airgapped-1.1.0-20251201.tar.gz"

echo "Current version: $CURRENT_VERSION"
echo "Updating to new bundle: $NEW_BUNDLE"

# Verify bundle
sha256sum -c "$NEW_BUNDLE.sha256" || exit 1

# Extract bundle
tar -xzf "$NEW_BUNDLE"
# Use the exact directory name; a glob would also match the .tar.gz and .sha256 files
cd "${NEW_BUNDLE%.tar.gz}" || exit 1

# Backup current installation
sudo cp /usr/local/bin/kafkaguard /usr/local/bin/kafkaguard.backup
sudo cp -r /opt/kafkaguard /opt/kafkaguard.backup

# Install new version
sudo ./install.sh

# Verify update
NEW_VERSION=$(kafkaguard version --format json | jq -r '.version')
echo "Updated to version: $NEW_VERSION"

# Test scan
kafkaguard scan \
  --bootstrap kafka-internal:9092 \
  --policy /opt/kafkaguard/policies/baseline-dev.yaml \
  --format json \
  --out /tmp/test-reports

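# Capture the exit code once; a second $? would hold the result of the first test, not the scan
SCAN_EXIT=$?
if [ "$SCAN_EXIT" -eq 0 ] || [ "$SCAN_EXIT" -eq 1 ]; then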
    echo "✅ Update successful and verified"
else
    echo "❌ Update verification failed, rolling back..."
    sudo cp /usr/local/bin/kafkaguard.backup /usr/local/bin/kafkaguard
    exit 1
fi

Policy Updates in Air-Gapped Environments

Manage policy updates separately:

# Update only policies (no binary update)
cd /opt/kafkaguard
sudo cp -r policies policies.backup.$(date +%Y%m%d)
sudo cp -r /path/to/new/policies .

# Validate new policies
for policy in policies/*.yaml; do
    kafkaguard validate-policy --policy "$policy"
done

Versioning and Change Management

Maintain a change log for air-gapped deployments:

# Air-Gapped Deployment Change Log

## 2025-11-15: v2.0.0 Initial Deployment
- Installed KafkaGuard v2.0.0
- Deployed policies: baseline-dev, enterprise-default
- Configured for kafka-internal:9092 cluster
- First scan: 95% score

## 2025-12-01: Policy Update
- Updated enterprise-default policy
- Added 2 custom controls (KG-101, KG-102)
- Re-scan: 93% score (new controls)

## 2025-12-15: v2.1.0 Upgrade
- Upgraded KafkaGuard to v2.1.0
- New features: improved report generation
- Verified: All scans working

Production Security Considerations

Least Privilege Access

KafkaGuard only needs read-only access to Kafka clusters:

# Required Kafka ACLs for KafkaGuard
kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --cluster kafka-cluster

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --topic '*'

kafka-acls.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --add --allow-principal User:kafkaguard \
  --operation Describe --group '*'

NOT required:

  • ❌ WRITE permissions
  • ❌ DELETE permissions
  • ❌ ALTER permissions
  • ❌ CREATE permissions

Credential Rotation Procedures

Implement regular credential rotation:

#!/bin/bash
# rotate-credentials.sh - Rotate KafkaGuard credentials

NEW_PASSWORD=$(openssl rand -base64 32)

# Update password on Kafka broker
kafka-configs.sh --bootstrap-server kafka:9095 \
  --command-config admin.properties \
  --alter --add-config "SCRAM-SHA-512=[password=$NEW_PASSWORD]" \
  --entity-type users --entity-name kafkaguard

# Update in Vault
vault kv put secret/kafka/prod password="$NEW_PASSWORD"

# Test new credentials
export KAFKAGUARD_SASL_PASSWORD="$NEW_PASSWORD"
kafkaguard scan \
  --bootstrap kafka:9095 \
  --security-protocol SASL_SSL \
  --sasl-mechanism SCRAM-SHA-512 \
  --sasl-username kafkaguard \
  --policy policies/enterprise-default.yaml \
  --format json \
  --out /tmp/test-reports

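# Capture the exit code once; a second $? would hold the result of the first test, not the scan
SCAN_EXIT=$?
if [ "$SCAN_EXIT" -eq 0 ] || [ "$SCAN_EXIT" -eq 1 ]; then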
    echo "✅ Credential rotation successful"
else
    echo "❌ Credential rotation failed - reverting"
    # Revert to old password
    exit 1
fi

Audit Logging

Enable comprehensive audit logging:

#!/bin/bash
# Log all KafkaGuard executions

LOG_DIR="/var/log/kafkaguard"
mkdir -p "$LOG_DIR"

LOG_FILE="$LOG_DIR/audit-$(date +%Y%m%d).log"

# Log execution details
echo "$(date -Iseconds) | User: $(whoami) | Command: kafkaguard scan $*" >> "$LOG_FILE"

# Run scan with logging
kafkaguard scan "$@" 2>&1 | tee -a "$LOG_FILE"

# $? would be tee's status here; PIPESTATUS[0] is the scan's exit code
EXIT_CODE=${PIPESTATUS[0]}

# Log result
echo "$(date -Iseconds) | Exit Code: $EXIT_CODE" >> "$LOG_FILE"

exit $EXIT_CODE

Log Retention:

  • Development: 30 days
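  • Production: 90+ days immediately available (PCI-DSS requires 12 months of audit log history, with the most recent 3 months readily accessible)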
  • Compliance: Per regulatory requirements (1-7 years)
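Retention periods like these are easier to enforce when the cleanup decision is derived from the log file name rather than from file modification times, which can change when logs are copied or restored. The sketch below selects expired logs by the date in the `audit-YYYYMMDD.log` names written by the wrapper script above; `expired_logs` is a hypothetical helper, and it only reports candidates without deleting anything.

```python
import re
from datetime import date, datetime, timedelta

# Matches the audit-YYYYMMDD.log names produced by the wrapper script above.
AUDIT_LOG = re.compile(r"^audit-(\d{8})\.log$")


def expired_logs(filenames, retention_days, today=None):
    """Return the audit logs whose date is more than retention_days before today."""
    today = today or date.today()
    cutoff = today - timedelta(days=retention_days)
    expired = []
    for name in filenames:
        match = AUDIT_LOG.match(name)
        if match and datetime.strptime(match.group(1), "%Y%m%d").date() < cutoff:
            expired.append(name)
    return expired
```

Passing `today` explicitly makes the function easy to test and lets a dry run show what a future cleanup would remove.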

Network Segmentation

Run KafkaGuard in a management or monitoring network:

  • DO: Run KafkaGuard from dedicated management network
  • DO: Restrict network access to Kafka brokers (firewall rules)
  • DO: Use VPN for remote scanning
  • DON'T: Run from untrusted networks
  • DON'T: Allow KafkaGuard to be accessible from internet

Report Access Control

Protect reports containing sensitive cluster information:

# Set restrictive permissions on reports
chmod 600 reports/*.pdf
chmod 600 reports/*.json

# Store reports in access-controlled directory
REPORTS_DIR="/var/reports/kafkaguard"
sudo chown -R kafkaguard:security "$REPORTS_DIR"
sudo chmod 750 "$REPORTS_DIR"

Report Distribution:

  • ✅ Use secure file transfer (SFTP, S3 with encryption)
  • ✅ Encrypt reports before sharing (GPG, ZIP with password)
  • ✅ Track report access (audit logs)
  • ❌ Don't email reports unencrypted
  • ❌ Don't store reports in public locations

Document Information

  • Last Updated: 2026-04-25
  • Applies to Version: KafkaGuard 2.3.0+
  • Feedback: Open an issue for improvements