Klaster OKD 4.19 (OpenShift) – Projekt monitorujący serwer PVE 9 i klaster OKD morsUsl, aplikacja mors-usl-okd

Wstęp

Poniższy opis jest częścią większej całości https://itadmin.vblog.ovh/klaster-okd-4-19-openshift-monitorowanie-serwera-pve-9-i-klastra-okd-projekt-mors/

Wymaga przygotowania klastra OKD 4.19 oraz serwera baz danych PostgreSQL

Prawidłowa kolejność przygotowania projektu MORS na klastrze OKD 4.19

Opis aplikacji mors-usl-okd

Aplikacja służy do zasilania przez REST API do bazy danych PostgreSQL: dba_mors danymi pochodzącymi klastra OKD. Wykorzystuje ServiceAccount o nazwie mors-usl-okd w namespace morsusl-prod aby przez uprawnienia RBAC na poziomie całego klastra móc pobierać metryki klastra z:

  • GET/LIST nodes
  • GET/LIST metrics.k8s.io nodes.

Struktura aplikacji mors-usl-okd

/repo/pakiety/morsusl/1.0.0/mors-usl-okd
drwxr-xr-x 2 bastuser bastuser  21 Feb 11 19:56 app
-rw-r--r-- 1 bastuser bastuser 324 Feb 11 20:04 cm-mors-usl-okd.yaml
-rw-r--r-- 1 bastuser bastuser 302 Feb 11 20:55 Dockerfile

Przygotowanie Dockerfile na potrzeby zbudowania obrazu aplikacji

FROM python:3.13-slim
# Security / runtime niceties
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1
WORKDIR /app
# Install runtime deps
RUN pip install --no-cache-dir requests==2.32.3
# Create non-root user
RUN useradd -m -u 10001 appuser
USER 10001
COPY app/ /app/
CMD ["python", "-m", "main"]

Kod aplikacji python mors-usl-okd

from __future__ import annotations
import json
import os
import time
import typing as t
from dataclasses import dataclass
from pathlib import Path
import requests
from requests import Response

# ---------------------------
# Konfiguracja / logowanie
# ---------------------------

def env(name: str, default: str | None = None) -> str:
    val = os.getenv(name, default)
    if val is None:
        raise RuntimeError(f"Brak wymaganej zmiennej środowiskowej: {name}")
    return val

def env_bool(name: str, default: str = "0") -> bool:
    v = os.getenv(name, default).strip().lower()
    return v in {"1", "true", "yes", "y", "on"}

def env_int(name: str, default: int) -> int:
    v = os.getenv(name, str(default)).strip()
    try:
        return int(v)
    except ValueError as e:
        raise RuntimeError(f"Zmienna {name} musi być int, a jest: {v!r}") from e

DEBUG = env_bool("DEBUG", "0")

def log(msg: str) -> None:
    print(msg, flush=True)


def dlog(msg: str) -> None:
    if DEBUG:
        log(f"[DEBUG] {msg}")

# ---------------------------
# In-cluster ServiceAccount auth (OpenShift/Kubernetes)
# ---------------------------

SA_TOKEN_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount/token")
SA_CA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")

def in_cluster_available() -> bool:
    return SA_TOKEN_PATH.exists()

def read_sa_token() -> str:
    token = SA_TOKEN_PATH.read_text(encoding="utf-8").strip()
    if not token or len(token) < 20:
        raise RuntimeError("ServiceAccount token wygląda podejrzanie (pusty/za krótki).")
    return token

def build_verify(skip_tls_verify: bool) -> bool | str:
    """
    requests.verify może być:
      - False (gdy skip verify)
      - True (gdy systemowe CA wystarcza)
      - ścieżka do pliku CA (najlepiej w klastrze: SA_CA_PATH)
    """
    if skip_tls_verify:
        return False
    if SA_CA_PATH.exists():
        return str(SA_CA_PATH)
    return True

def maybe_disable_insecure_warning(skip_tls_verify: bool) -> None:
    # Wyłączamy warning tylko wtedy, gdy świadomie robimy verify=False
    if not skip_tls_verify:
        return
    try:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    except Exception:
        # Nie jest krytyczne
        pass

# ---------------------------
# Kubernetes quantity parsing
# ---------------------------

_DECIMAL_SI = {
    "n": 1e-9,
    "u": 1e-6,
    "m": 1e-3,
    "": 1.0,
    "k": 1e3,
    "M": 1e6,
    "G": 1e9,
    "T": 1e12,
    "P": 1e15,
    "E": 1e18,
}

_BINARY_SI = {
    "Ki": 1024**1,
    "Mi": 1024**2,
    "Gi": 1024**3,
    "Ti": 1024**4,
    "Pi": 1024**5,
    "Ei": 1024**6,
}

def parse_quantity_cpu_to_cores(q: str) -> float:
    """
    CPU quantity -> cores (float)
    Przykłady:
      - "411m" -> 0.411
      - "2" -> 2.0
      - "250000000n" -> 0.25
    """
    q = q.strip()
    for suffix in sorted(_DECIMAL_SI.keys(), key=len, reverse=True):
        if suffix and q.endswith(suffix):
            num = q[: -len(suffix)]
            return float(num) * _DECIMAL_SI[suffix]
    return float(q)

def parse_quantity_mem_to_bytes(q: str) -> float:
    """
    Memory quantity -> bytes (float)
    Przykłady:
      - "4891Mi" -> 4891 * 2^20
      - "16Gi" -> 16 * 2^30
    """
    q = q.strip()
    for suffix in sorted(_BINARY_SI.keys(), key=len, reverse=True):
        if q.endswith(suffix):
            num = q[: -len(suffix)]
            return float(num) * _BINARY_SI[suffix]
    for suffix in sorted(_DECIMAL_SI.keys(), key=len, reverse=True):
        if suffix and q.endswith(suffix):
            num = q[: -len(suffix)]
            return float(num) * _DECIMAL_SI[suffix]
    return float(q)

# ---------------------------
# OKD / Kubernetes API (in-cluster)
# ---------------------------

@dataclass(frozen=True)
class OkdConfig:
    api_url: str
    skip_tls_verify: bool
    http_timeout: int

def okd_api_get(cfg: OkdConfig, token: str, path: str) -> dict:
    url = cfg.api_url.rstrip("/") + path
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
    }
    verify_val = build_verify(cfg.skip_tls_verify)

    dlog(f"GET {url} verify={verify_val!r}")
    r = requests.get(
        url,
        headers=headers,
        timeout=cfg.http_timeout,
        verify=verify_val,
    )
    if r.status_code != 200:
        raise RuntimeError(f"GET {path} failed: HTTP {r.status_code} -> {r.text[:500]}")
    return t.cast(dict, r.json())

def okd_get_nodes_capacity(cfg: OkdConfig, token: str) -> dict[str, dict[str, float]]:
    """Zwraca capacity CPU (cores) i memory (bytes) per node."""
    doc = okd_api_get(cfg, token, "/api/v1/nodes")
    out: dict[str, dict[str, float]] = {}

    for item in doc.get("items", []):
        name = item.get("metadata", {}).get("name")
        cap = item.get("status", {}).get("capacity", {}) or {}
        if not name:
            continue

        cpu = parse_quantity_cpu_to_cores(str(cap.get("cpu", "0")))
        mem = parse_quantity_mem_to_bytes(str(cap.get("memory", "0")))

        out[name] = {"cpu_cores": cpu, "mem_bytes": mem}

    return out

def okd_get_nodes_usage(cfg: OkdConfig, token: str) -> dict[str, dict[str, float]]:
    """Zwraca usage CPU (cores) i memory (bytes) per node z Metrics API."""
    doc = okd_api_get(cfg, token, "/apis/metrics.k8s.io/v1beta1/nodes")
    out: dict[str, dict[str, float]] = {}

    for item in doc.get("items", []):
        name = item.get("metadata", {}).get("name")
        usage = item.get("usage", {}) or {}
        if not name:
            continue

        cpu = parse_quantity_cpu_to_cores(str(usage.get("cpu", "0")))
        mem = parse_quantity_mem_to_bytes(str(usage.get("memory", "0")))

        out[name] = {"cpu_cores": cpu, "mem_bytes": mem}

    return out

# ---------------------------
# Mapowanie i payload do MORS API
# ---------------------------

NODE_KEY_MAP = {
    "compute-1": ("compute_1_cpu_prc", "compute_1_ram_prc"),
    "compute-2": ("compute_2_cpu_prc", "compute_2_ram_prc"),
    "compute-3": ("compute_3_cpu_prc", "compute_3_ram_prc"),
    "control-plane-1": ("control_1_cpu_prc", "control_1_ram_prc"),
    "control-plane-2": ("control_2_cpu_prc", "control_2_ram_prc"),
    "control-plane-3": ("control_3_cpu_prc", "control_3_ram_prc"),
}

def classify_node(name: str) -> str | None:
    # node nazwy w stylu compute-1.testcluster.okdlab.local
    short = name.split(".", 1)[0]
    return short if short in NODE_KEY_MAP else None

def build_payload(
    cap: dict[str, dict[str, float]],
    use: dict[str, dict[str, float]],
) -> dict[str, int]:
    payload: dict[str, int] = {}

    # zawsze wysyłamy komplet kluczy
    for cpu_key, ram_key in NODE_KEY_MAP.values():
        payload[cpu_key] = 0
        payload[ram_key] = 0

    for node_name, u in use.items():
        short = classify_node(node_name)
        if not short:
            continue

        cpu_key, ram_key = NODE_KEY_MAP[short]

        # capacity słownik jest po pełnych nazwach node
        c = cap.get(node_name)
        if not c:
            dlog(f"Brak capacity dla node {node_name}")
            continue

        cpu_prc = int(round((u["cpu_cores"] / max(c["cpu_cores"], 1e-9)) * 100))
        ram_prc = int(round((u["mem_bytes"] / max(c["mem_bytes"], 1.0)) * 100))

        cpu_prc = max(0, min(100, cpu_prc))
        ram_prc = max(0, min(100, ram_prc))

        payload[cpu_key] = cpu_prc
        payload[ram_key] = ram_prc

    return payload

# ---------------------------
# MORS API client
# ---------------------------

@dataclass(frozen=True)
class MorsConfig:
    url: str
    token: str
    tls_verify: bool
    http_timeout: int

def mors_post(cfg: MorsConfig, payload: dict[str, int]) -> Response:
    headers = {
        "Content-Type": "application/json",
        "Authorization": cfg.token,
        "Accept": "application/json",
    }
    dlog(f"POST {cfg.url} payload={payload}")

    r = requests.post(
        cfg.url,
        headers=headers,
        data=json.dumps(payload),
        timeout=cfg.http_timeout,
        verify=cfg.tls_verify,
    )
    return r

# ---------------------------
# Main loop
# ---------------------------

def main() -> int:
    mors_url = env("MORS_API_URL")
    mors_token = env("MORS_API_TOKEN")
    mors_tls_verify = env_bool("MORS_API_TLS_VERIFY", "1")

    okd_url = env("OKD_API_URL", "https://kubernetes.default.svc")
    okd_skip_tls = env_bool("OKD_API_SKIP_TLS_VERIFY", "no")

    interval = env_int("UPDATE_INTERVAL", 5)
    timeout = env_int("HTTP_TIMEOUT", 10)

    maybe_disable_insecure_warning(okd_skip_tls)

    mors_cfg = MorsConfig(
        url=mors_url,
        token=mors_token,
        tls_verify=mors_tls_verify,
        http_timeout=timeout,
    )
    okd_cfg = OkdConfig(
        api_url=okd_url,
        skip_tls_verify=okd_skip_tls,
        http_timeout=timeout,
    )

    log("mors-usl-okd starting...")
    log(f"UPDATE_INTERVAL={interval}s HTTP_TIMEOUT={timeout}s DEBUG={int(DEBUG)}")
    log(f"OKD_API_URL={okd_url} skip_tls_verify={okd_skip_tls}")
    log(f"MORS_API_URL={mors_url} tls_verify={mors_tls_verify}")

    if not in_cluster_available():
        log("[ERROR] Brak ServiceAccount token w podzie. Ten kontener musi działać w Kubernetes/OpenShift.")
        log(f"Sprawdź czy istnieje: {SA_TOKEN_PATH}")
        return 2

    token = read_sa_token()
    dlog("Używam in-cluster ServiceAccount token (bez OAuth user login).")

    while True:
        try:
            cap = okd_get_nodes_capacity(okd_cfg, token)
            use = okd_get_nodes_usage(okd_cfg, token)
            payload = build_payload(cap, use)

            r = mors_post(mors_cfg, payload)

            if r.status_code == 401:
                log(f"[ERROR] MORS API unauthorized (401). Sprawdź MORS_API_TOKEN. Body: {r.text[:300]}")
            elif 200 <= r.status_code < 300:
                log(f"OK -> wysłano metryki. HTTP {r.status_code}")
            else:
                log(f"[WARN] MORS API HTTP {r.status_code}: {r.text[:300]}")

        except requests.exceptions.SSLError as e:
            log(f"[ERROR] TLS/SSL error: {e}")
        except requests.exceptions.Timeout:
            log("[ERROR] Timeout podczas komunikacji HTTP")
        except requests.exceptions.ConnectionError as e:
            log(f"[ERROR] Connection error: {e}")
        except Exception as e:
            msg = str(e)

            # Bardzo częste w tym miejscu: RBAC na nodes / metrics
            if "HTTP 403" in msg or "forbidden" in msg.lower():
                log(f"[ERROR] RBAC/Forbidden: {msg[:300]}")
                log("[HINT] Nadaj ServiceAccount uprawnienia do: get/list nodes i get/list nodes.metrics.k8s.io")
            else:
                log(f"[ERROR] {type(e).__name__}: {msg}")

            if DEBUG:
                raise

        time.sleep(max(1, interval))

if __name__ == "__main__":
    raise SystemExit(main())

Co robi Service (w praktyce)

Service to „stabilny punkt dostępu” do Podów w klastrze. Pody żyją i umierają, mają zmienne IP. Service ma stały DNS i stały virtual IP (ClusterIP). Service kieruje ruch do Podów przez selektory labeli (np. app=mors-gui-api). W projekcie morsgui-prod jest mors-gui-api. Najlepsza praktyka w klastrze:Inne workloady w klastrze (np. mors-usl-proxmox w morsusl-prod) powinny trafiać do API przez Service, nie przez Route.

Czyli zamiast:

https://mors-gui-api.morsgui-prod.apps… (Route / ingress / TLS / zewnętrzny endpoint)
np. https://mors-gui-api.morsgui-prod.apps.testcluster.okdlab.local/rest-api/v1/stats/proxmox/create

powinno się używać:

http://mors-gui-api.morsgui-prod.svc:8080/… (wewnętrzny Service DNS)
np. http://mors-gui-api.morsgui-prod.svc:8080/rest-api/v1/stats/proxmox/create

To ma duże plusy:

omijasz ingress/router, zwykle omijasz TLS (możesz iść po HTTP w sieci klastra, jeśli akceptujesz), unikasz warningów TLS typu InsecureRequestWarning, mniej zależności od certów/route.

Co robi Route

Route wystawia Service „na zewnątrz” klastra przez router/ingress OpenShift. Route mapuje hosta DNS (np. …apps.testcluster…) do Service. Route może robić TLS (edge/passthrough/reencrypt), host-based routing itd.

Route jest potrzebna, jeśli:

Chcesz, żeby API było dostępne z sieci spoza klastra, albo z innych środowisk (np. z laptopa/VM poza OCP).

Kiedy Route nie jest potrzebna.

Jeżeli jedynym klientem API jest inny Pod w klastrze (u Ciebie: mors-usl-proxmox), to w wielu przypadkach Route jest zbędna.

Przygotowanie configmapy dla aplikacji mors-usl-okd

apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-mors-usl-okd
  namespace: morsusl-prod
data:
  MORS_API_URL: "http://mors-gui-api.morsgui-prod.svc:8080/rest-api/v1/stats/okd/create"
  OKD_API_URL: "https://kubernetes.default.svc"
  OKD_API_SKIP_TLS_VERIFY: "no"
  UPDATE_INTERVAL: "5"
  DEBUG: "0"
  HTTP_TIMEOUT: "10"

Przygotowanie secrets dla aplikacji mors-usl-okd

apiVersion: v1
kind: Secret
metadata:
  name: secret-mors-usl-api-token
  namespace: morsusl-prod
type: Opaque
stringData:
  API_TOKEN: "cxbn762xamnMYt2d"

Uruchomienie aplikacji mors-uls-okd w projekcie morsusl-prod

# jesli projekt jeszcze nie istnieje
oc new-project morsusl-prod

# wrzuć CM i Secret jako manifesty:
oc apply -f /repo/pakiety/morsusl/1.0.0/mors-usl-okd/cm-mors-usl-okd.yaml
oc apply -f /repo/pakiety/morsusl/1.0.0/mors-usl-okd/secret-mors-usl-okd.yaml

# build binarny z Dockerfile
oc new-build --strategy=docker --binary --name=mors-usl-okd-build

cd /repo/pakiety/morsusl/1.0.0/mors-usl-okd/
oc start-build mors-usl-okd-build --from-dir=. --follow

# app z image stream builda
oc new-app mors-usl-okd-build:latest --name=mors-usl-okd

# ENV z configmapy
oc set env deployment/mors-usl-okd --from=configmap/cm-mors-usl-okd

# ENV z secreta (TO JEST TEN BRAKUJĄCY KROK)
oc set env deployment/mors-usl-okd --from=secret/secret-mors-usl-okd

OpenShift automatycznie montuje do poda:

token serviceaccount:

/var/run/secrets/kubernetes.io/serviceaccount/token

CA klastra:

/var/run/secrets/kubernetes.io/serviceaccount/ca.crt

Czyli aplikacja może:

użyć in-cluster auth (SA token),

pytać API pod adresem: https://kubernetes.default.svc

Nasz aplikacja działa w klastrze OpenShift i ma czytać:

listę node’ów (/api/v1/nodes) → to zasób “cluster-scoped”

metryki node’ów (/apis/metrics.k8s.io/v1beta1/nodes) → też “cluster-scoped”

To są endpointy, do których:

zwykły pod “z automatu” nie ma dostępu

domyślny ServiceAccount w namespace (default) też najczęściej nie ma dostępu

Dlatego bez RBAC dostajesz 403 “forbidden”.

potrzebujesz RBAC dla serviceaccount

Domyślny SA w namespace zwykle nie ma prawa do:

GET/LIST nodes

GET/LIST metrics.k8s.io nodes

Więc musisz dodać uprawnienia.

# Tworzy obiekt Kubernetes: ServiceAccount o nazwie mors-usl-okd w namespace morsusl-prod
# ServiceAccount to “tożsamość” dla poda w klastrze.
oc create sa mors-usl-okd -n morsusl-prod

# Nadaje uprawnienia RBAC na poziomie całego klastra: bierze gotową rolę wbudowaną: ClusterRole cluster-reader, przypisuje ją do ServiceAccount mors-usl-okd w namespace morsusl-prod
# -z mors-usl-okd → “serviceaccount” (zamiast usera)
oc adm policy add-cluster-role-to-user cluster-reader -z mors-usl-okd -n morsusl-prod

# Modyfikuje Deployment mors-usl-okd w namespace morsusl-prod,  Pod musi mieć tę tożsamość, której dałeś uprawnienia.
oc set serviceaccount deployment/mors-usl-okd mors-usl-okd -n morsusl-prod

# Wymusza restart rollout’u Deploymentu.
oc rollout restart deployment/mors-usl-okd

# logi
oc logs -f deployment/mors-usl-okd

Lepiej (docelowo) — minimalny ClusterRole pod metrics i nodes “least privilege”, należy utworzyć własny nowy  ClusterRole z uprawnieniami:

  • nodes (get/list)
  • nodes.metrics.k8s.io (get/list)

Weryfikacja działania aplikacji mors-usl-okd

[bastuser@bastion mors-usl-proxmox]$ oc logs -f mors-usl-okd-7dd877976d-tsvsj
mors-usl-okd starting...
UPDATE_INTERVAL=10s HTTP_TIMEOUT=10s DEBUG=0
OKD_API_URL=https://kubernetes.default.svc skip_tls_verify=False
MORS_API_URL=http://mors-gui-api.morsgui-prod.svc:8080/rest-api/v1/stats/okd/create tls_verify=True
OK -> wysłano metryki. HTTP 200
OK -> wysłano metryki. HTTP 200