Feat: add watch subcommand to observe specified resource lifecycle
Background
The requirement for kusion watch arises after a user has successfully created, modified, or deleted resources with kusion apply: the user needs to monitor resource updates in real time to confirm that Spec and Live eventually become consistent on the Runtime side. The Watch subcommand therefore operates not on a single resource, but on a single Stack under a given Project.
Kusion currently integrates with Kubernetes and Terraform. For k8s resources, client-go provides a Watch interface through which resource changes can be obtained in real time. The Terraform ecosystem has no so-called Live Manifest; the Terraform side only saves the resource list after an operation has executed successfully, which is equivalent to a cache.
Target
As an alpha feature, the primary purpose is to define the watch workflow, so the TF runtime is not included this time. There are a few preconditions:
- only for k8s
- only for namespace, service and deployment
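The preconditions above amount to a filter over the resources of a Stack. A minimal sketch, assuming a simplified `Resource` type; the type, its fields, and the runtime labels are hypothetical and not Kusion's actual model:

```go
package main

import "fmt"

// Resource is a hypothetical, simplified stand-in for a resource compiled from a Stack.
type Resource struct {
	Runtime string // assumed labels: "Kubernetes" or "Terraform"
	Kind    string
	Name    string
}

// watchableKinds lists the kinds covered by the alpha preconditions.
var watchableKinds = map[string]bool{
	"Namespace":  true,
	"Service":    true,
	"Deployment": true,
}

// Watchable reports whether r satisfies both preconditions.
func Watchable(r Resource) bool {
	return r.Runtime == "Kubernetes" && watchableKinds[r.Kind]
}

// FilterWatchable keeps only watchable resources, preserving their order.
func FilterWatchable(rs []Resource) []Resource {
	var out []Resource
	for _, r := range rs {
		if Watchable(r) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	stack := []Resource{
		{Runtime: "Kubernetes", Kind: "Namespace", Name: "demo"},
		{Runtime: "Kubernetes", Kind: "ConfigMap", Name: "settings"},
		{Runtime: "Terraform", Kind: "Deployment", Name: "ignored"},
		{Runtime: "Kubernetes", Kind: "Deployment", Name: "web"},
	}
	for _, r := range FilterWatchable(stack) {
		fmt.Printf("%s/%s\n", r.Kind, r.Name)
	}
}
```

Keeping the supported kinds in one set would make it straightforward to widen the scope later without touching the watch logic.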
Watch Workflow
The complete workflow can be summarized in 4 stages:
- Compile the Project and Stack to get all resources in the Stack
- Call the IaC Engine interface to start watching
- The IaC Engine asynchronously triggers the Runtime Watch interface for each resource in the list
- The Runtime implements the Watch logic and customizes the prompt message
sequenceDiagram
autonumber
actor User
participant KusionCLI
participant KCLVM
participant IaC Engine
participant Runtime
User ->> KusionCLI: kusion apply --watch
KusionCLI->>KCLVM: compile
activate KCLVM
KCLVM-->>KusionCLI: kcl results
deactivate KCLVM
KusionCLI ->> IaC Engine: watch resources
activate IaC Engine
loop Every Resource
IaC Engine ->> Runtime: watch one
activate Runtime
Runtime -->> IaC Engine: event
deactivate Runtime
IaC Engine ->> IaC Engine: render table row
end
IaC Engine -->> KusionCLI: render table
deactivate IaC Engine
KusionCLI --> User: flush table
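The engine-side fan-out in the diagram can be sketched as one goroutine per resource, all feeding a single event channel. This is only an illustration under stated assumptions: the `Runtime` interface, the `Event` type, `fakeRuntime`, and the resource IDs are hypothetical and do not reflect Kusion's actual interfaces.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// Event is a hypothetical message emitted by a runtime watcher.
type Event struct {
	ResourceID string
	Message    string
}

// Runtime is an assumed interface: Watch streams events for one resource
// and closes the channel when the stream ends or ctx is cancelled.
type Runtime interface {
	Watch(ctx context.Context, resourceID string) <-chan Event
}

// WatchAll starts one watcher per resource and merges all events into one
// channel, which is closed once every watcher has finished.
func WatchAll(ctx context.Context, rt Runtime, ids []string) <-chan Event {
	out := make(chan Event)
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			for e := range rt.Watch(ctx, id) {
				select {
				case out <- e:
				case <-ctx.Done():
					return
				}
			}
		}(id)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fakeRuntime emits two events per resource, then closes the stream.
type fakeRuntime struct{}

func (fakeRuntime) Watch(ctx context.Context, id string) <-chan Event {
	ch := make(chan Event)
	go func() {
		defer close(ch)
		for _, msg := range []string{"created", "ready"} {
			select {
			case ch <- Event{ResourceID: id, Message: msg}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// LatestMessages watches ids and returns the last message seen per resource,
// which corresponds to one table row per resource.
func LatestMessages(ids []string) map[string]string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	latest := make(map[string]string)
	for e := range WatchAll(ctx, fakeRuntime{}, ids) {
		latest[e.ResourceID] = e.Message
	}
	return latest
}

func main() {
	latest := LatestMessages([]string{"namespace/demo", "deployment/web"})
	ids := make([]string, 0, len(latest))
	for id := range latest {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Printf("%s\t%s\n", id, latest[id])
	}
}
```

Events from different resources may interleave in any order, but events for a single resource arrive in the order the runtime emitted them, so keeping only the latest message per resource is enough to render one row each.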
References
kubectl
For k8s, the -w (--watch) flag of the kubectl get command implements the related capability. The k8s Watch on resources does not terminate on its own, and a Watch can target only a single resource or a single resource type. The formatted output for each resource type is defined in pkg/printers/internalversion/printers.go. The key implementation of kubectl get -w is as follows:
// staging/src/k8s.io/kubectl/pkg/cmd/get/get.go:636
func (o *GetOptions) watch(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
// ...
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
intr := interrupt.New(nil, cancel)
intr.Run(func() error {
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
objToPrint := e.Object
if o.OutputWatchEvents {
objToPrint = &metav1.WatchEvent{Type: string(e.Type), Object: runtime.RawExtension{Object: objToPrint}}
}
if err := printer.PrintObj(objToPrint, writer); err != nil {
return false, err
}
writer.Flush()
// after processing at least one event, start outputting objects
*outputObjects = true
return false, nil
})
return err
})
// ....
}
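The callback in the listing above always returns false, so the loop ends only when the context is cancelled or the watch stream closes. A minimal standard-library sketch of that loop shape follows; `Until`, `Event`, `ConditionFunc`, and `ErrWatchClosed` here are simplified, hypothetical stand-ins and not client-go's implementation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a simplified stand-in for a watch event.
type Event struct {
	Type string
	Name string
}

// ConditionFunc returns true to stop watching, or an error to abort.
type ConditionFunc func(e Event) (bool, error)

// ErrWatchClosed is returned when the stream ends before cond is satisfied.
var ErrWatchClosed = errors.New("watch closed before the condition was met")

// Until feeds events to cond until cond returns true or an error, the
// channel closes, or ctx is cancelled. It also reports how many events
// were handled.
func Until(ctx context.Context, events <-chan Event, cond ConditionFunc) (int, error) {
	handled := 0
	for {
		select {
		case e, ok := <-events:
			if !ok {
				return handled, ErrWatchClosed
			}
			handled++
			done, err := cond(e)
			if err != nil {
				return handled, err
			}
			if done {
				return handled, nil
			}
		case <-ctx.Done():
			return handled, ctx.Err()
		}
	}
}

// PrintEvents replays n synthetic events through a print-only callback,
// mirroring a watch whose callback never asks to stop.
func PrintEvents(n int) (int, error) {
	events := make(chan Event, n)
	for i := 0; i < n; i++ {
		events <- Event{Type: "MODIFIED", Name: fmt.Sprintf("pod-%d", i)}
	}
	close(events)
	return Until(context.Background(), events, func(e Event) (bool, error) {
		fmt.Printf("%s\t%s\n", e.Type, e.Name)
		return false, nil // never done: only a closed stream or cancellation ends the loop
	})
}

func main() {
	handled, err := PrintEvents(3)
	fmt.Println(handled, err)
}
```

For kusion watch, the same shape would allow a callback that returns true once every resource in the Stack has reached its expected state, so that the command can exit on its own.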
In the GetOptions.watch listing above, objToPrint is the object returned by the API Server, already converted to the requested output format; it is not YAML rendered from the resource type definition. k8s defines a printing strategy for each resource type; the following shows the table column definitions for Pod:
// pkg/printers/internalversion/printers.go:87
func AddHandlers(h printers.PrintHandler) {
podColumnDefinitions := []metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
{Name: "Ready", Type: "string", Description: "The aggregate readiness state of this pod for accepting traffic."},
{Name: "Status", Type: "string", Description: "The aggregate status of the containers in this pod."},
{Name: "Restarts", Type: "string", Description: "The number of times the containers in this pod have been restarted and when the last container in this pod has restarted."},
{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
{Name: "IP", Type: "string", Priority: 1, Description: apiv1.PodStatus{}.SwaggerDoc()["podIP"]},
{Name: "Node", Type: "string", Priority: 1, Description: apiv1.PodSpec{}.SwaggerDoc()["nodeName"]},
{Name: "Nominated Node", Type: "string", Priority: 1, Description: apiv1.PodStatus{}.SwaggerDoc()["nominatedNodeName"]},
{Name: "Readiness Gates", Type: "string", Priority: 1, Description: apiv1.PodSpec{}.SwaggerDoc()["readinessGates"]},
}
h.TableHandler(podColumnDefinitions, printPodList)
h.TableHandler(podColumnDefinitions, printPod)
// ...
}
k8s officially provides table printing strategies for 46 resource types.
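The handler-registration pattern can be sketched with the standard library alone: each kind registers its column definitions together with a row function, and columns with a priority above 0 appear only in wide output. `Registry`, `Column`, and `RowFunc` below are hypothetical simplifications, not the Kubernetes printers API.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
	"text/tabwriter"
)

// Column is a simplified column definition.
type Column struct {
	Name     string
	Priority int // 0: always shown; above 0: shown only in wide output
}

// RowFunc converts one object into cell values, one per column.
type RowFunc func(obj map[string]string) []string

type handler struct {
	columns []Column
	row     RowFunc
}

// Registry maps a resource kind to its table handler.
type Registry map[string]handler

// TableHandler registers the columns and row function for a kind.
func (r Registry) TableHandler(kind string, cols []Column, fn RowFunc) {
	r[kind] = handler{columns: cols, row: fn}
}

// visible drops the cells of columns hidden in the current mode.
func (h handler) visible(cells []string, wide bool) []string {
	var out []string
	for i, c := range h.columns {
		if i < len(cells) && (c.Priority == 0 || wide) {
			out = append(out, cells[i])
		}
	}
	return out
}

// Render prints a header plus one aligned row per object.
func (r Registry) Render(kind string, objs []map[string]string, wide bool) (string, error) {
	h, ok := r[kind]
	if !ok {
		return "", fmt.Errorf("no table handler registered for kind %q", kind)
	}
	header := make([]string, len(h.columns))
	for i, c := range h.columns {
		header[i] = strings.ToUpper(c.Name)
	}
	var buf bytes.Buffer
	w := tabwriter.NewWriter(&buf, 0, 8, 3, ' ', 0)
	fmt.Fprintln(w, strings.Join(h.visible(header, wide), "\t"))
	for _, obj := range objs {
		fmt.Fprintln(w, strings.Join(h.visible(h.row(obj), wide), "\t"))
	}
	if err := w.Flush(); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// DemoRegistry registers a reduced Pod handler for illustration.
func DemoRegistry() Registry {
	r := Registry{}
	r.TableHandler("Pod",
		[]Column{{Name: "Name"}, {Name: "Status"}, {Name: "Node", Priority: 1}},
		func(obj map[string]string) []string {
			return []string{obj["name"], obj["status"], obj["node"]}
		})
	return r
}

func main() {
	pods := []map[string]string{{"name": "web-0", "status": "Running", "node": "node-a"}}
	out, err := DemoRegistry().Render("Pod", pods, true)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Print(out)
}
```

A per-kind registry of this shape is one way kusion watch could add resource types incrementally: supporting a new kind would mean registering one more handler.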
kubespy
kubespy is a tool developed by Pulumi two years ago to monitor Kubernetes resources in real time. Its trace command currently supports monitoring only two resource types, Deployment and Service. The whole workflow is roughly divided into the following 3 steps:
- Locate the resource and analyze its dependents. For example, monitoring a Deployment also requires monitoring the ReplicaSets and Pods it owns
- Start the Watch channels, receive events indefinitely, and filter event objects according to the event type
- Print the key information of each event
func traceDeployment(namespace, name string) {
//...
for {
select {
case e := <-deploymentEvents:
if e.Type == k8sWatch.Deleted {
o := e.Object.(*unstructured.Unstructured)
delete(o.Object, "spec")
delete(o.Object, "status")
}
table[deployment] = []k8sWatch.Event{e}
case e := <-replicaSetEvents:
o := e.Object.(*unstructured.Unstructured)
if e.Type == k8sWatch.Deleted {
delete(repSets, o.GetName())
} else {
repSets[o.GetName()] = e
}
table[v1ReplicaSet] = []k8sWatch.Event{}
for _, rsEvent := range repSets {
table[v1ReplicaSet] = append(table[v1ReplicaSet], rsEvent)
}
case e := <-podEvents:
o := e.Object.(*unstructured.Unstructured)
if e.Type == k8sWatch.Deleted {
delete(pods, o.GetName())
} else {
pods[o.GetName()] = e
}
table[v1Pod] = []k8sWatch.Event{}
for _, podEvent := range pods {
table[v1Pod] = append(table[v1Pod], podEvent)
}
}
print.DeploymentWatchTable(writer, table)
}
}
DeploymentWatchTable() uses the uilive library, which refreshes the table in place instead of appending new output; the formatted output of the later kusion watch can follow the same approach.
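The combination of per-object state and in-place refresh can be sketched without any third-party library by moving the cursor up with ANSI escape sequences before each redraw. This is an assumption-laden illustration: `LiveTable` and its methods are hypothetical, and the code does not use or reproduce uilive's API.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"sort"
)

// Event is a simplified watch event carrying a display status.
type Event struct {
	Type   string // "ADDED", "MODIFIED" or "DELETED"
	Name   string
	Status string
}

// LiveTable keeps the latest status per object and redraws in place.
type LiveTable struct {
	out   io.Writer
	rows  map[string]string
	lines int // number of lines written by the previous frame
}

func NewLiveTable(out io.Writer) *LiveTable {
	return &LiveTable{out: out, rows: make(map[string]string)}
}

// Apply updates the state from one event and redraws the table.
func (t *LiveTable) Apply(e Event) {
	if e.Type == "DELETED" {
		delete(t.rows, e.Name)
	} else {
		t.rows[e.Name] = e.Status
	}
	t.render()
}

// render erases the previous frame, then writes the current one.
func (t *LiveTable) render() {
	for i := 0; i < t.lines; i++ {
		// ESC[1A moves the cursor up one line, ESC[2K clears that line.
		fmt.Fprint(t.out, "\x1b[1A\x1b[2K")
	}
	names := make([]string, 0, len(t.rows))
	for name := range t.rows {
		names = append(names, name)
	}
	sort.Strings(names) // stable row order between frames
	fmt.Fprintln(t.out, "NAME\tSTATUS")
	for _, name := range names {
		fmt.Fprintf(t.out, "%s\t%s\n", name, t.rows[name])
	}
	t.lines = len(names) + 1
}

// replay feeds a fixed event sequence into a table writing to out.
func replay(out io.Writer) *LiveTable {
	tbl := NewLiveTable(out)
	for _, e := range []Event{
		{Type: "ADDED", Name: "web-1", Status: "Pending"},
		{Type: "ADDED", Name: "web-2", Status: "Pending"},
		{Type: "MODIFIED", Name: "web-1", Status: "Running"},
		{Type: "DELETED", Name: "web-2"},
	} {
		tbl.Apply(e)
	}
	return tbl
}

// DemoLiveTable replays into a buffer and reports the final row count,
// the line count of the last frame, and the total bytes written.
func DemoLiveTable() (rows, lines, written int) {
	var buf bytes.Buffer
	tbl := replay(&buf)
	return len(tbl.rows), tbl.lines, buf.Len()
}

func main() {
	replay(os.Stdout)
}
```

Run in a terminal that honors ANSI escape sequences, only the final frame remains visible; when output is redirected to a file, the escape sequences appear verbatim, so a real implementation would likely fall back to incremental output in that case.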
v1: https://github.com/KusionStack/kusion/pull/144
- [x] ComponentStatus (deprecated in v1.19+)
- [x] ConfigMap
- [x] Endpoints
- [x] Event
- [x] Namespace
- [x] Node
- [x] PersistentVolumeClaim
- [x] PersistentVolume
- [x] Pod
- [x] PodTemplate
- [x] ReplicationController
- [x] ResourceQuota
- [x] Secret
- [x] ServiceAccount
- [x] Service
admissionregistration.k8s.io/v1
- [ ] MutatingWebhookConfiguration
- [ ] ValidatingWebhookConfiguration
apiextensions.k8s.io/v1
- [ ] CustomResourceDefinition
apps/v1
- [x] ControllerRevision
- [x] DaemonSet
- [x] Deployment
- [x] ReplicaSet
- [x] StatefulSet
autoscaling/v2
- [ ] HorizontalPodAutoscaler
batch/v1
- [x] CronJob
- [x] Job
certificates.k8s.io/v1
- [ ] CertificateSigningRequest
coordination.k8s.io/v1
- [ ] Lease
discovery.k8s.io/v1
- [x] EndpointSlice
events.k8s.io/v1
- [ ] Event
flowcontrol.apiserver.k8s.io/v1beta2
- [ ] FlowSchema
- [ ] PriorityLevelConfiguration
networking.k8s.io/v1
- [ ] IngressClass
- [ ] Ingress
- [ ] NetworkPolicy
node.k8s.io/v1
- [ ] RuntimeClass
policy/v1
- [ ] PodDisruptionBudget
policy/v1beta1
- [ ] PodSecurityPolicy
rbac.authorization.k8s.io/v1
- [ ] ClusterRoleBinding
- [ ] ClusterRole
- [ ] RoleBinding
- [ ] Role
scheduling.k8s.io/v1
- [ ] PriorityClass
storage.k8s.io/v1
- [ ] CSIDriver
- [ ] CSINode
- [ ] CSIStorageCapacity
- [ ] StorageClass
- [ ] VolumeAttachment