Creating a Kubernetes autoscaling operator that responds to UPS events



Now that I have a good number of services running on my Talos Kubernetes cluster and backups sorted, I wanted a solution to power failures. For a long time, I’ve used a UPS to give enough time to shutdown servers gracefully. With services, databases and storage all on the same cluster, I wanted to investigate scaling down services and bringing them back up again when power is restored. This should happen in a specified order - deployments & statefulsets first followed by databases and final storage.

The basic idea is straight-forward:

when ups triggers a shutdown event:
  for priority in 1..n:
    scale down workloads with that priority
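
In the operator this loop ends up looking something like the sketch below (the getWorkloadsWithPriority and scaleDownWorkload helpers are illustrative placeholders, not the operator's actual function names):

// Illustrative sketch of the shutdown loop - helper names are placeholders
async function scaleDownCluster(maxPriority) {
  for (let priority = 1; priority <= maxPriority; priority++) {
    // find every workload tagged with this shutdown priority...
    const workloads = await getWorkloadsWithPriority(priority);
    // ...scale them down and wait for completion before the next priority
    await Promise.all(workloads.map((workload) => scaleDownWorkload(workload)));
  }
}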

Building this involved a few different topics, covered in the sections below.

Scaling down Kubernetes workloads

The basic command to scale a Kubernetes Deployment or StatefulSet is kubectl scale --replicas N. In the background, kubectl calls the Kubernetes API and updates the spec.replicas field to the new value. We can also directly patch the workload and, helpfully, the JavaScript Kubernetes API client provides an example:

// client setup: k8sApi is an AppsV1Api client created from the local kubeconfig
import * as k8s from '@kubernetes/client-node';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();
const k8sApi = kc.makeApiClient(k8s.AppsV1Api);

async function scale(namespace, name, replicas) {
    // find the particular deployment
    const deployment = await k8sApi.readNamespacedDeployment({
        name,
        namespace,
    });

    // edit
    const newDeployment = {
        ...deployment,
        spec: {
            ...deployment.spec,
            replicas,
        },
    };

    // replace
    await k8sApi.replaceNamespacedDeployment({
        name,
        namespace,
        body: newDeployment,
    });
}
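
As a usage example, scaling the nginx test deployment used later in this post down to zero replicas would be:

// scale the deployment in the ups-dev namespace down to zero replicas
await scale('ups-dev', 'nginx-deployment-down-priority-1', 0);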

While this works for Deployments and StatefulSets, we need a different approach for CronJobs and CloudNativePG PostgreSQL Clusters.

CronJobs

CronJobs can be suspended by patching the spec.suspend field:

// CronJobs are patched through the operator's generic patchCustomObject helper;
// for CronJobs the API group/version/plural are batch/v1/cronjobs
const group = 'batch';
const version = 'v1';
const plural = 'cronjobs';

async function suspendCronJob(namespace, name) {
  logger.info(`Suspending cronjob/${name} in namespace: ${namespace}`)
  let patch = [{
    op: "replace",
    path: `/spec/suspend`,
    value: true
  }];
  await patchCustomObject(name, namespace, group, version, plural, patch);
  logger.info(`Completed suspending cronjob/${name} in namespace: ${namespace}`)
}
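
The operator's resume path isn't shown here, but un-suspending is just the same patch with value: false - a minimal sketch mirroring the function above:

async function resumeCronJob(namespace, name) {
  logger.info(`Resuming cronjob/${name} in namespace: ${namespace}`)
  // same JSON patch as suspendCronJob, but re-enabling the schedule
  let patch = [{
    op: "replace",
    path: `/spec/suspend`,
    value: false
  }];
  await patchCustomObject(name, namespace, group, version, plural, patch);
  logger.info(`Completed resuming cronjob/${name} in namespace: ${namespace}`)
}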

CloudNativePG PostgreSQL Clusters

A CloudNativePG PostgreSQL cluster can be hibernated by setting the cnpg.io/hibernation=on annotation:

kubectl annotate cluster cloudpg-cluster --overwrite cnpg.io/hibernation=on

A hibernated cluster won’t have any running Pods, while the PVCs are retained so that the cluster can be rehydrated at a later time. Replica PVCs will be kept in addition to the primary’s PVC. The hibernation procedure will delete the primary Pod and then the replica Pods, avoiding switchover, to ensure the replicas are kept in sync. The hibernation status can be monitored by looking for the cnpg.io/hibernation condition:

kubectl get cluster cloudpg-cluster -o "jsonpath={.status.conditions[?(.type==\"cnpg.io/hibernation\")]}" 
{
        "lastTransitionTime":"2023-03-05T20:43:35Z",
        "message":"Cluster has been hibernated",
        "reason":"Hibernated",
        "status":"True",
        "type":"cnpg.io/hibernation"
}

To rehydrate a cluster, set the cnpg.io/hibernation annotation to off:

kubectl annotate cluster cloudpg-cluster --overwrite cnpg.io/hibernation=off
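
Inside the operator this obviously can't be a kubectl command - the annotation has to be applied through the API. A minimal sketch, reusing the patchCustomObject helper from the CronJob example (the operator's actual call may differ):

async function setClusterHibernation(namespace, name, hibernate) {
  // a "/" inside the annotation key is escaped as "~1" in a JSON patch path
  let patch = [{
    op: "add",
    path: "/metadata/annotations/cnpg.io~1hibernation",
    value: hibernate ? "on" : "off"
  }];
  // CNPG Clusters are served from the postgresql.cnpg.io/v1 API group
  await patchCustomObject(name, namespace, "postgresql.cnpg.io", "v1", "clusters", patch);
}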

Defining the order of scaling activities and capturing the cluster “up” specification

To be able to scale cluster workloads without access to storage, the ups-scaler operator needed a way to store a couple of data points: the order in which each workload should be scaled down (and back up), and the original “up” specification so that everything can be restored later.

When a workload is scaled down, its specification is updated to the new values. To be able to restore the workload to its original state, we also need to store that original state somewhere. So, before scaling down any services, the operator gets and saves the current cluster specification.

My original plan was to use two annotations on each workload: one for the shutdown priority and one to hold the saved replica count.

Rook-Ceph requires a different order for scaling a cluster back up than for scaling down - it’s not just the reverse. This meant adding a third label for the scale-up order.

After scaling rook-ceph down on the cluster, I learnt something else: when the rook-ceph operator restarted, custom annotations and labels were removed. A new design was needed. Enter CustomResources.

Refactoring from annotations to Custom Resources

Thanks to the suggestion from @nicr9, I started investigating. CustomResources are extensions of the Kubernetes API and, in this case, allowed the annotations to be replaced with something independent, flexible and scalable. One of the major advantages of using CustomResources is that the workloads no longer need to be annotated individually - the scale down priorities can be held separately.

First we need a CustomResourceDefinition:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: upsscalers.ups-scaler.io
spec:
  group: ups-scaler.io
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                # used to match cluster resource
                type:
                  type: string
                # used to match cluster resource
                namespace:
                  type: string
                # used to match cluster resource
                name:
                  type: string
                # custom resource defines
                shutdownPriority:
                  type: integer
                # custom resource defines
                scaleUpPriority:
                  type: integer
                # cluster -> custom resource
                specReplicas:
                  type: integer
                  default: -1
      subresources:
        status: {}
  scope: Namespaced
  names:
    kind: UpsScaler
    plural: upsscalers
    singular: upsscaler
    shortNames:
      - ups

This is actually quite straight-forward. There are three fields (type, name, namespace) that are used to match the CustomResource to an actual workload and then three fields to capture the information we need (shutdownPriority, scaleUpPriority, specReplicas). The value for specReplicas is added by the operator so the default is -1.

A single CustomResource for each workload then provides the configuration:

apiVersion: ups-scaler.io/v1alpha1
kind: UpsScaler
metadata:
  name: nginx-deployment-down-priority-1
  namespace: ups-dev
spec:
  type: deployment
  namespace: ups-dev
  name: nginx-deployment-down-priority-1
  shutdownPriority: 1
  scaleUpPriority: 0

Having the name and namespace in both the metadata and spec sections is most likely duplicative. One for the cleanup list.
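
The ups shortName defined in the CRD also makes it quick to check the configuration on the cluster:

kubectl get ups -A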

Handling UPS events

My UPS device is an APC Back-UPS BX1600MI. It is directly connected to a Synology NAS, and the UPS status is exposed using nut-exporter and Prometheus. Originally, I planned to get the UPS status from Prometheus - mainly because Prometheus has an easy-to-access API:

async function getPrometheusMetric(queryString) {
  const prometheusURL = 'http://prometheus-kube-prometheus-prometheus.prometheus.svc.cluster.local:9090/api/v1/query';
  try {
    const r = await axios.get(prometheusURL, {
      params: { 'query': queryString }
    });
    return r.data.data.result;
  } catch (error) {
    logger.error(error);
  }
}

Of course, once Prometheus is scaled down, the API goes away and it’s not possible to determine when the power is back on.

As an alternative, the operator now queries nut-exporter directly and converts (some of) the Prometheus metrics into a JSON object.

nut-exporter output:

# TYPE network_ups_tools_battery_runtime gauge
network_ups_tools_battery_runtime 1700

...

# TYPE network_ups_tools_ups_status gauge
network_ups_tools_ups_status{flag="BOOST"} 0
network_ups_tools_ups_status{flag="BYPASS"} 0
network_ups_tools_ups_status{flag="CAL"} 0
network_ups_tools_ups_status{flag="CHRG"} 0
network_ups_tools_ups_status{flag="DISCHRG"} 0
network_ups_tools_ups_status{flag="FSD"} 0
network_ups_tools_ups_status{flag="HB"} 0
network_ups_tools_ups_status{flag="LB"} 0
network_ups_tools_ups_status{flag="OB"} 0
network_ups_tools_ups_status{flag="OFF"} 0
network_ups_tools_ups_status{flag="OL"} 1
network_ups_tools_ups_status{flag="OVER"} 0
network_ups_tools_ups_status{flag="RB"} 0
network_ups_tools_ups_status{flag="SD"} 0
network_ups_tools_ups_status{flag="TRIM"} 0

Becomes a JSON object:

{
  "status": "ok",
  "flags": {
    "BOOST": 0,
    "BYPASS": 0,
    "CAL": 0,
    "CHRG": 0,
    "DISCHRG": 0,
    "FSD": 0,
    "HB": 0,
    "LB": 0,
    "OB": 0,
    "OFF": 0,
    "OL": 1,
    "OVER": 0,
    "RB": 0,
    "SD": 0,
    "TRIM": 0
  },
  "runtime": 1700
}

There are a few values we are interested in - principally the OL (online) and OB (on battery) flags, the SD/FSD (shutdown / forced shutdown) flags, and the battery runtime. Note: The values here are reported by my UPS setup and could be different for your configuration.
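
The conversion itself is just a bit of text parsing of the Prometheus exposition format. The sketch below shows the idea - the function name and exact shape are illustrative rather than the operator's real code (the /ups_metrics path is the one nut-exporter serves, as seen in the tests later):

import axios from 'axios';

// fetch the nut-exporter metrics page and convert the interesting lines
// into the JSON shape shown above
async function getUpsStatus(nutExporterUrl) {
  const r = await axios.get(`${nutExporterUrl}/ups_metrics`);
  const result = { status: 'ok', flags: {}, runtime: 0 };
  for (const line of r.data.split('\n')) {
    // skip comment and blank lines
    if (line.startsWith('#') || line.trim() === '') continue;
    // e.g. network_ups_tools_ups_status{flag="OB"} 0
    const flag = line.match(/^network_ups_tools_ups_status\{flag="([A-Z]+)"\}\s+(\d+)/);
    if (flag) {
      result.flags[flag[1]] = Number(flag[2]);
      continue;
    }
    // e.g. network_ups_tools_battery_runtime 1700
    const runtime = line.match(/^network_ups_tools_battery_runtime\s+(\d+)/);
    if (runtime) result.runtime = Number(runtime[1]);
  }
  return result;
}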

Mocking Kubernetes endpoints with nock

Without doubt this is where I learnt the most during this project - about the behaviour of my orchestrator and about the Kubernetes API.

I’m a fan of the riteway testing framework from Eric Elliott, as it provides a simple way to define tests and compare expected and actual behaviour. Setting up a test is straight-forward as shown by the documentation example:

import { describe, Try } from 'riteway';

// a function to test
const sum = (...args) => {
  if (args.some(v => Number.isNaN(v))) throw new TypeError('NaN');
  return args.reduce((acc, n) => acc + n, 0);
};

describe('sum()', async assert => {

  assert({
    given: 'no arguments',
    should: 'return 0',
    actual: sum(),
    expected: 0
  });

  assert({
    given: 'zero',
    should: 'return the correct sum',
    actual: sum(2, 0),
    expected: 2
  });
});

Tests for this project included tests for each of the Deployment, StatefulSet, CronJob and Cluster workload types, tests for the UPS nut-exporter handling and, perhaps most importantly, end-to-end tests for cluster scale down and scale up events. Below is an extract from the cluster scale down test:

describe('checkAndActionNutStatus() shutdown', async assert => {
  let given = 'A shutdown event';
  let should = 'Shutdown the cluster and respond that the cluster has been scaled down';
  failOnNockNoMatch(assert);
  let shutdownMockData = upsMockData.replace('network_ups_tools_ups_status{flag="FSD"} 0', 'network_ups_tools_ups_status{flag="FSD"} 1');
  let nutExporterMockResult = shutdownMockData;
  nock(`${NUT_EXPORTER}`)
    .get(`/ups_metrics`)
    .reply(200, nutExporterMockResult);

  // Kubernetes endpoint
  let mockDeploymentResult = {};
  let mockClusterResult = {};
  let mockStatefulSetResult = {};
  let mockCronJobSetResult = {};
  let mockUpsCRResult = {};
  mockDeploymentResult.items = await readMockResponseFromFile(`./test-data/deployments.json`);
  mockClusterResult.items = await readMockResponseFromFile(`./test-data/clusters.json`);
  mockStatefulSetResult.items = await readMockResponseFromFile(`./test-data/statefulsets.json`);

  mockCronJobSetResult.items =  await readMockResponseFromFile(`./test-data/cronjobs.json`);
  mockUpsCRResult.items = await readMockResponseFromFile(`./test-data/cr.json`);
  mockStatefulSetResult.items.splice(2, 1);
  // Use a single nock interceptor object
  nock(`${KUBERNETES_API_ENDPOINT}`)
    // GET WORKLOADS
    .get(`/apis/apps/v1/deployments`).reply(200, () => {
      return mockDeploymentResult
    })
    .get(`/apis/postgresql.cnpg.io/v1/clusters`).reply(200, () => {
      return mockClusterResult
    })
    .get(`/apis/apps/v1/statefulsets`).reply(200, () => {
      return mockStatefulSetResult
    })
    .get(`/apis/batch/v1/cronjobs`).reply(200, () => {
      return mockCronJobSetResult
    })
    .get(`/apis/ups-scaler.io/v1alpha1/upsscalers`).reply(200, () => {
      return mockUpsCRResult
    })
    // PRIORITY 1
    .get(`/apis/apps/v1/namespaces/ups/deployments/nginx-deployment-down-priority-1`).reply(200, () => {
      return mockDeploymentResult.items[1];
    })
    .get(`/apis/apps/v1/namespaces/ups/deployments/nginx-deployment-down-priority-1`).reply(200, () => {
      return mockDeploymentResult.items[1];
    })
    .put(`/apis/apps/v1/namespaces/ups/deployments/nginx-deployment-down-priority-1`).reply(200, () => {
      mockDeploymentResult.items[1].spec.replicas = 0;
      return mockDeploymentResult.items[1];
    })
    .get(`/apis/apps/v1/namespaces/ups/deployments/nginx-deployment-down-priority-1`).reply(200, () => {
      delete mockDeploymentResult.items[1].status.readyReplicas;
      return mockDeploymentResult.items[1];
    })
    // PRIORITY 2
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    .put(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      mockStatefulSetResult.items[0].spec.replicas = 0;
      return mockStatefulSetResult.items[0];
    })
    // Two extra gets to simulate a delay
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    // And this one
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      delete mockStatefulSetResult.items[0].status.readyReplicas;
      return mockStatefulSetResult.items[0];
    })

    // PRIORITY 3
    // PRIORITY 4
    // PRIORITY 5
    // etc  

  let expectedResult = {
    ok: true,
    status: 'Cluster scaled down',
    data: {}
  }
  let actualResult = await checkAndActionNutStatus();
  assert({
    given: given,
    should: should,
    actual: actualResult,
    expected: expectedResult
  });
});

The actual test simulates five different workloads on the cluster and scales them down in priority order, making 27 calls to the Kubernetes API in the process - including get, put and patch calls. We only get a successful test if these endpoints are called (and no others) and the checkAndActionNutStatus() function returns ok: true.

Breaking the code down:

failOnNockNoMatch() is a function that causes the test to fail if a nock endpoint is not defined. This ensures there is a mock for every API call.

export function failOnNockNoMatch(assert) {
  nock.emitter.on('no match', (req) => {
    logger.error(`Nock does not have a mock for the path: ${req.path}`)
    assert({
      given: 'an unexpected HTTP request',
      should: 'not have an unmatched request',
      actual: `Nock: No match for request ${req.method} ${req.href}`,
      expected: 'All requests should match nock interceptors',
    });
  });
};
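
One caveat worth noting (this is general nock behaviour rather than anything specific to this test suite): the 'no match' listener and any unconsumed interceptors persist between tests, so it's worth resetting them after each test:

// reset interceptors and the 'no match' listener between tests so that
// one test's mocks cannot leak into the next
nock.cleanAll();
nock.emitter.removeAllListeners('no match');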

The sample dataset and expected results are then loaded from files - as the workload definitions are fairly verbose, I find it cleaner to keep these in separate files.

  // Kubernetes endpoint
  let mockDeploymentResult = {};
  let mockClusterResult = {};
  let mockStatefulSetResult = {};
  let mockCronJobSetResult = {};
  let mockUpsCRResult = {};
  mockDeploymentResult.items = await readMockResponseFromFile(`./test-data/deployments.json`);
  mockClusterResult.items = await readMockResponseFromFile(`./test-data/clusters.json`);
  mockStatefulSetResult.items = await readMockResponseFromFile(`./test-data/statefulsets.json`);
  mockCronJobSetResult.items =  await readMockResponseFromFile(`./test-data/cronjobs.json`);
  mockUpsCRResult.items = await readMockResponseFromFile(`./test-data/cr.json`);

We then set up a mock for the UPS event to trigger a scale down:

  let shutdownMockData = upsMockData.replace('network_ups_tools_ups_status{flag="FSD"} 0', 'network_ups_tools_ups_status{flag="FSD"} 1');
  nock(`${NUT_EXPORTER}`)
    .get(`/ups_metrics`)
    .reply(200, shutdownMockData);

And for the expected API calls. First getting the cluster workloads:

  nock(`${KUBERNETES_API_ENDPOINT}`)
    // GET WORKLOADS
    .get(`/apis/apps/v1/deployments`).reply(200, () => {
      return mockDeploymentResult
    })
    .get(`/apis/postgresql.cnpg.io/v1/clusters`).reply(200, () => {
      return mockClusterResult
    })
    .get(`/apis/apps/v1/statefulsets`).reply(200, () => {
      return mockStatefulSetResult
    })
    .get(`/apis/batch/v1/cronjobs`).reply(200, () => {
      return mockCronJobSetResult
    })
    .get(`/apis/ups-scaler.io/v1alpha1/upsscalers`).reply(200, () => {
      return mockUpsCRResult
    })

And then scaling down an individual workload. MariaDB-Galera runs on the cluster with three replicas and takes a few seconds to scale down. This is simulated in the test by not updating status.readyReplicas immediately (the polling this simulates is sketched after the extract below). This approach ensures that workload dependencies are correctly handled during both scale down and scale up activities.

    // PRIORITY 2
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    .put(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      mockStatefulSetResult.items[0].spec.replicas = 0;
      return mockStatefulSetResult.items[0];
    })
    // Two extra gets to simulate a delay
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    // And this one
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      return mockStatefulSetResult.items[0];
    })
    .get(`/apis/apps/v1/namespaces/ups/statefulsets/ups-mariadb-galera`).reply(200, () => {
      delete mockStatefulSetResult.items[0].status.readyReplicas;
      return mockStatefulSetResult.items[0];
    })
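
The repeated GET interceptors work because the operator polls the workload until status.readyReplicas disappears before moving on to the next priority. Roughly, the polling being simulated looks like the sketch below (names, intervals and timeouts are illustrative, not the operator's actual code):

// readWorkload is a stand-in for the relevant read call,
// e.g. k8sApi.readNamespacedStatefulSet
async function waitForScaleDown(readWorkload, namespace, name, intervalMs = 2000, timeoutMs = 60000) {
  const start = Date.now();
  while (Date.now() - start < timeoutMs) {
    const workload = await readWorkload({ name, namespace });
    // once all Pods are gone, the status no longer reports readyReplicas
    if (!workload.status || !workload.status.readyReplicas) return true;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  return false;
}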

That’s it! These tests saved the day when I switched the code from using annotations to CustomResources. The code changes didn’t take long, but the Kubernetes API calls changed, using some different endpoints and in a different order. Refactoring the tests was a bit of a slog but the first run on the cluster ran perfectly, first time!

Pre-pulling container images

One last problem to solve… Harbor runs locally in the cluster as a registry for my own images and as a proxy cache for images hosted elsewhere. This means that the ups-scaler container image is hosted on the cluster, and if the cluster services are down, the image cannot be pulled from the registry. The container image will already be on (at least) one worker node from before the scale down; however, if the power goes off, Kubernetes might schedule the Pod on a different node when the cluster restarts. There are a few ways I could have solved this: for example, hosting another registry elsewhere on my network or pushing the image to a public repository. In the end, I found a way to pre-pull the container image onto each worker node using a DaemonSet.

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: prepuller
spec:
  selector:
    matchLabels:
      name: prepuller
  template:
    metadata:
      labels:
        name: prepuller
    spec:
      # Configure an init container for each image you want to pull
      initContainers:
        - name: prepuller-1
          # Set the image you want to pull
          image: ORG/IMAGE:TAG
          # Use a known command that will exit successfully immediately
          # Any no-op command will do but YMMV with scratch based containers
          command: ["sh", "-c", "'true'"]

        # - name: prepuller-2
        #   image: ...
        #   command: ["sh", "-c", "'true'"]

        # etc...

      # Use the pause container to ensure the Pod goes into a `Running` phase
      # but doesn't take up resource on the cluster
      containers:
        - name: pause
          image: gcr.io/google_containers/pause:3.2
          resources:
            limits:
              cpu: 1m
              memory: 8Mi
            requests:
              cpu: 1m
              memory: 8Mi

This DaemonSet creates a Pod on each node whose init containers pull the ups-scaler container image; the pause container then keeps the Pod in the Running phase without consuming resources. The actual ups-scaler Deployment manifest can then set imagePullPolicy: Never because the container image will already be on the node. When a new version of ups-scaler is deployed to the cluster, the Deployment Pod will fail to start until the DaemonSet Pod on that node has pulled the new image. Until then, Kubernetes automatically restarts the Deployment Pod, and after a few seconds all is good.

Testing a power outage

I learnt a lot more by testing for real and turning the power off on the live cluster. My UPS reports a runtime of 23 minutes when it is online; in reality, it is more like six minutes. This has implications for the logic triggering the cluster scale down. I had planned on waiting for the SD or FSD events, but after validating the production behaviour, the scale down needs to start as soon as the power goes off and the OB flag is set.
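
Using the JSON status object from earlier, the trigger condition therefore boils down to something like this (a sketch reusing the illustrative helpers from above, not the operator's exact code):

// scale down as soon as the UPS is on battery (OB) or a shutdown /
// forced shutdown (SD / FSD) is signalled
const ups = await getUpsStatus(NUT_EXPORTER);
if (ups.flags.OB === 1 || ups.flags.SD === 1 || ups.flags.FSD === 1) {
  await scaleDownCluster(5); // the test above uses five priority levels
}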

As a result, I also wanted to optimise the scale down activities. By adjusting the workload priorities so that more activities happen in parallel, I was able to reduce the time to scale down from 3 min 30 sec to about a minute.

Thanks for reading!