
Feat: add watch subcommand to observe specified resource lifecycle

howieyuen opened this issue 3 years ago · 17 comments

Background

The initial requirement for kusion watch comes from the fact that after a user successfully creates, modifies, or deletes resources with kusion apply, they need to monitor real-time updates of those resources, so as to ensure that Spec and Live are eventually consistent on the Runtime side. Therefore, the operation object of the watch subcommand is not a single resource, but a single Stack under a certain Project.

Kusion has been connected to Kubernetes and Terraform. For k8s resources, client-go provides a Watch interface, so resource changes can be obtained in real time. For the Terraform ecosystem, however, there is no so-called Live Manifest: Terraform just saves the resource list after an operation is successfully executed, which is equivalent to a cache.

Target

As an alpha feature, the primary purpose is to define the watch workflow, so the Terraform runtime is not included this time. There are a few preconditions:

  • only for k8s
  • only for Namespace, Service and Deployment

Watch Workflow

The complete workflow can be simplified into 4 stages:

  1. Compile the Project & Stack to get all resources in the Stack
  2. Call the IaC Engine interface to start watching
  3. The IaC Engine asynchronously triggers the Runtime Watch interface for each resource in the list
  4. The Runtime implements the Watch logic and customizes the prompt message
sequenceDiagram
    autonumber
    actor User
    participant KusionCLI
    participant KCLVM
    participant IaC Engine
    participant Runtime
    User ->> KusionCLI: kusion apply --watch
    KusionCLI ->> KCLVM: compile
    activate KCLVM
    KCLVM -->> KusionCLI: kcl results
    deactivate KCLVM
    KusionCLI ->> IaC Engine: watch resources
    activate IaC Engine
    loop Every Resource
    IaC Engine ->> Runtime: watch one
    activate Runtime
    Runtime -->> IaC Engine: event
    deactivate Runtime
    IaC Engine ->> IaC Engine: render table row
    end
    IaC Engine -->> KusionCLI: render table
    deactivate IaC Engine
    KusionCLI -->> User: flush table

References

kubectl

For k8s, the -w/--watch flag of the kubectl get command provides the related capability. The k8s Watch behavior for resources is non-terminating, and the watch target can only be a single resource or a single resource type. Each resource type has its own formatted output, defined in pkg/printers/internalversion/printers.go. The key implementation of kubectl get -w is as follows:

// staging/src/k8s.io/kubectl/pkg/cmd/get/get.go:636
func (o *GetOptions) watch(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
	// ...
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	intr := interrupt.New(nil, cancel)
	intr.Run(func() error {
		_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
			objToPrint := e.Object
			if o.OutputWatchEvents {
				objToPrint = &metav1.WatchEvent{Type: string(e.Type), Object: runtime.RawExtension{Object: objToPrint}}
			}
			if err := printer.PrintObj(objToPrint, writer); err != nil {
				return false, err
			}
			writer.Flush()
			// after processing at least one event, start outputting objects
			*outputObjects = true
			return false, nil
		})
		return err
	})
	// ...
}

The objToPrint variable in the above code listing is the result returned by the API Server converted into the specified format, not YAML output based on the resource type's definition. k8s defines a parsing strategy for each resource type; the following is the table output format for Pod:

// pkg/printers/internalversion/printers.go:87
func AddHandlers(h printers.PrintHandler) {
	podColumnDefinitions := []metav1.TableColumnDefinition{
		{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
		{Name: "Ready", Type: "string", Description: "The aggregate readiness state of this pod for accepting traffic."},
		{Name: "Status", Type: "string", Description: "The aggregate status of the containers in this pod."},
		{Name: "Restarts", Type: "string", Description: "The number of times the containers in this pod have been restarted and when the last container in this pod has restarted."},
		{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
		{Name: "IP", Type: "string", Priority: 1, Description: apiv1.PodStatus{}.SwaggerDoc()["podIP"]},
		{Name: "Node", Type: "string", Priority: 1, Description: apiv1.PodSpec{}.SwaggerDoc()["nodeName"]},
		{Name: "Nominated Node", Type: "string", Priority: 1, Description: apiv1.PodStatus{}.SwaggerDoc()["nominatedNodeName"]},
		{Name: "Readiness Gates", Type: "string", Priority: 1, Description: apiv1.PodSpec{}.SwaggerDoc()["readinessGates"]},
	}
	h.TableHandler(podColumnDefinitions, printPodList)
	h.TableHandler(podColumnDefinitions, printPod)
    // ...
}

k8s officially provides parsing strategies for 46 resource types.

kubespy

kubespy is a tool developed by Pulumi two years ago to monitor Kubernetes resources in real time. Its trace command currently only supports monitoring two resource types, Deployment and Service. The whole process is roughly divided into the following 3 steps:

  1. Lock onto the target resource and analyze its dependents. For example, monitoring a Deployment also requires monitoring the ReplicaSets and Pods it owns
  2. Start the watch channel, receive events continuously, and filter event objects by event type
  3. Print the key information of each event
func traceDeployment(namespace, name string) {
	// ...
	for {
		select {
		case e := <-deploymentEvents:
			if e.Type == k8sWatch.Deleted {
				o := e.Object.(*unstructured.Unstructured)
				delete(o.Object, "spec")
				delete(o.Object, "status")
			}
			table[deployment] = []k8sWatch.Event{e}
		case e := <-replicaSetEvents:
			o := e.Object.(*unstructured.Unstructured)
			if e.Type == k8sWatch.Deleted {
				delete(repSets, o.GetName())
			} else {
				repSets[o.GetName()] = e
			}
			table[v1ReplicaSet] = []k8sWatch.Event{}
			for _, rsEvent := range repSets {
				table[v1ReplicaSet] = append(table[v1ReplicaSet], rsEvent)
			}
		case e := <-podEvents:
			o := e.Object.(*unstructured.Unstructured)
			if e.Type == k8sWatch.Deleted {
				delete(pods, o.GetName())
			} else {
				pods[o.GetName()] = e
			}

			table[v1Pod] = []k8sWatch.Event{}
			for _, podEvent := range pods {
				table[v1Pod] = append(table[v1Pod], podEvent)
			}
		}
		print.DeploymentWatchTable(writer, table)
	}
}

DeploymentWatchTable() uses the uilive framework, which refreshes the whole table dynamically rather than appending output incrementally; the formatted output of the future kusion watch can refer to it.

howieyuen · May 12 '22 07:05

v1: https://github.com/KusionStack/kusion/pull/144

  • [x] ComponentStatus (deprecated in v1.19+)
  • [x] ConfigMap
  • [x] Endpoints
  • [x] Event
  • [x] Namespace
  • [x] Node
  • [x] PersistentVolumeClaim
  • [x] PersistentVolume
  • [x] Pod
  • [x] PodTemplate
  • [x] ReplicationController
  • [x] ResourceQuota
  • [x] Secret
  • [x] ServiceAccount
  • [x] Service

howieyuen · Oct 10 '22 06:10

admissionregistration.k8s.io/v1

  • [ ] MutatingWebhookConfiguration
  • [ ] ValidatingWebhookConfiguration

howieyuen · Oct 10 '22 07:10

apiextensions.k8s.io/v1

  • [ ] CustomResourceDefinition

howieyuen · Oct 10 '22 07:10

apps/v1

  • [x] ControllerRevision
  • [x] DaemonSet
  • [x] Deployment
  • [x] ReplicaSet
  • [x] StatefulSet

howieyuen · Oct 10 '22 07:10

autoscaling/v2

  • [ ] HorizontalPodAutoscaler

howieyuen · Oct 10 '22 07:10

batch/v1

  • [x] CronJob
  • [x] Job

howieyuen · Oct 10 '22 07:10

certificates.k8s.io/v1

  • [ ] CertificateSigningRequest

howieyuen · Oct 10 '22 07:10

coordination.k8s.io/v1

  • [ ] Lease

howieyuen · Oct 10 '22 07:10

discovery.k8s.io/v1

  • [x] EndpointSlice

howieyuen · Oct 10 '22 07:10

events.k8s.io/v1

  • [ ] Event

howieyuen · Oct 10 '22 07:10

flowcontrol.apiserver.k8s.io/v1beta2

  • [ ] FlowSchema
  • [ ] PriorityLevelConfiguration

howieyuen · Oct 10 '22 07:10

networking.k8s.io/v1

  • [ ] IngressClass
  • [ ] Ingress
  • [ ] NetworkPolicy

howieyuen · Oct 10 '22 07:10

node.k8s.io/v1

  • [ ] RuntimeClass

howieyuen · Oct 10 '22 07:10

policy/v1

  • [ ] PodDisruptionBudget

policy/v1beta1

  • [ ] PodSecurityPolicy

howieyuen · Oct 10 '22 07:10

rbac.authorization.k8s.io/v1

  • [ ] ClusterRoleBinding
  • [ ] ClusterRole
  • [ ] RoleBinding
  • [ ] Role

howieyuen · Oct 10 '22 07:10

scheduling.k8s.io/v1

  • [ ] PriorityClass

howieyuen · Oct 10 '22 07:10

storage.k8s.io/v1

  • [ ] CSIDriver
  • [ ] CSINode
  • [ ] CSIStorageCapacity
  • [ ] StorageClass
  • [ ] VolumeAttachment

howieyuen · Oct 10 '22 07:10