add rate limit to asset inventory

Open orouz opened this issue 1 year ago • 3 comments

Summary of your changes

  1. adds rate limiting to the asset inventory client. our current usage is only for ListAssets
  2. retries requests made by the ListAssets client whenever they fail due to rate limiting
  3. adds a cache to the policies fetcher (using a new cache utility, as we already have 3 caches in this file)
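As a rough illustration of the third point, here is a minimal sketch of what a shared cache helper could look like. The `Cache` type, `NewCache`, and `Get` are hypothetical names chosen for this example; they are not taken from the PR, and the actual utility may differ (for instance, it may support expiry).

```go
package main

import (
	"fmt"
	"sync"
)

// Cache is a hypothetical memoizing helper, not the utility added by the PR.
type Cache[T any] struct {
	mu      sync.Mutex
	entries map[string]T
}

// NewCache returns an empty cache.
func NewCache[T any]() *Cache[T] {
	return &Cache[T]{entries: make(map[string]T)}
}

// Get returns the cached value for key and calls fetch only on a miss.
// Failed fetches are not stored, so a later call tries again.
// The lock is held during fetch, which serializes concurrent callers.
func (c *Cache[T]) Get(key string, fetch func() (T, error)) (T, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.entries[key]; ok {
		return v, nil
	}
	v, err := fetch()
	if err != nil {
		var zero T
		return zero, err
	}
	c.entries[key] = v
	return v, nil
}

func main() {
	calls := 0
	cache := NewCache[[]string]()
	fetch := func() ([]string, error) {
		calls++ // stands in for an API request that consumes quota
		return []string{"policy-a", "policy-b"}, nil
	}
	for i := 0; i < 3; i++ {
		policies, _ := cache.Get("projects/example", fetch)
		fmt.Println("policies:", len(policies))
	}
	fmt.Println("fetch calls:", calls)
}
```

Repeated lookups for the same key reuse the stored value, so only the first one would count against the quota.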

for ListAssets, we only use the per-project quota, which is 100 per minute per consumer project. we do this because:

  1. the consumer project (the one consuming the quota) is set by the user when they run gcloud config set project <project_id> before deploying the agent. (verified with gcloud config get billing/quota_project).
  2. in both of our use cases (single-account and organization-account), we always use a single quota project. we never redefine it for the user
  3. in any case, the org quotas are 800 per minute per org and 650,000 per day per org. the per-project quota is more restrictive than both of these, so staying within it means we shouldn't exceed those either.
  4. we don't account for multiple cloudbeat instances running together without syncing on the quota each of them consumes (this is assumed to be an unlikely edge case we accept for now)

Screenshot/Data

test script
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	asset "cloud.google.com/go/asset/apiv1"
	"cloud.google.com/go/asset/apiv1/assetpb"
	"github.com/googleapis/gax-go/v2"
	"golang.org/x/time/rate"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

type Iterator interface {
	Next() (*assetpb.Asset, error)
}

var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
	return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
		Initial:    1 * time.Second,
		Max:        10 * time.Second,
		Multiplier: 1.2,
	})
})

type AssetsInventoryRateLimiter struct {
	methods map[string]*rate.Limiter
}

const projectQuota = 100

func NewAssetsInventoryRateLimiter() *AssetsInventoryRateLimiter {
	return &AssetsInventoryRateLimiter{
		methods: map[string]*rate.Limiter{
			"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/projectQuota), 1),
		},
	}
}

func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
	return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
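		limiter := rl.methods[method]
		if limiter != nil {
			// block until the limiter allows the call; stop if ctx is cancelled or expires
			if err := limiter.Wait(ctx); err != nil {
				return err
			}
		}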
		return invoker(ctx, method, req, reply, cc, opts...)
	})
}

func main() {
	ctx := context.Background()
	limiter := NewAssetsInventoryRateLimiter()
	clientA, err := asset.NewClient(ctx, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))

	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	var totalGot int
	var totalLost int
	start := time.Now()
	// simulate 10x the project per-minute quota by requesting some assets multiple times
	var assets []*assetpb.Asset
	for i := 0; i < projectQuota*10; i++ {
		log.Printf("Iteration: %d \n", i)
		resp := getAllAssets(clientA.ListAssets(ctx, &assetpb.ListAssetsRequest{
			Parent:      fmt.Sprintf("organizations/%s", "693506308612"),
			AssetTypes:  []string{"logging.googleapis.com/LogBucket"},
			ContentType: assetpb.ContentType_RESOURCE,
		}, RetryOnResourceExhausted))
		if resp == nil {
			totalLost++
		} else {
			totalGot++
			assets = append(assets, resp...)
		}
	}
	end := time.Now()
	log.Println("-----------------------------------------")
	log.Printf("time: %v \n", end.Sub(start))
	log.Printf("assets: %d \n", len(assets))
	log.Printf("requests lost: %d \n", totalLost)
	log.Printf("requests got: %d \n", totalGot)
}

func getAllAssets(it Iterator) []*assetpb.Asset {
	results := make([]*assetpb.Asset, 0)
	for {
		response, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil
		}
		results = append(results, response)
	}
	return results
}

  1. this is roughly the same code from the assets inventory provider that calls ListAssets
  2. you can change the Parent param to a different organization. make sure to re-run gcloud auth login and gcloud auth application-default login
  3. the script limits requests to 100 per minute and retries failed ones. (you can adjust the quota here)
  4. it simulates consumption of 10x the quota
  5. eventually (after 10+ minutes) it logs requests got: 1000, which means no requests were lost.
  6. before this PR, the same usage would error after 100 requests and then just log the error and continue. you can remove the limiter.Wait(ctx) call and RetryOnResourceExhausted to verify

Related Issues

  • part of https://github.com/elastic/cloudbeat/issues/2054
  • closes https://github.com/elastic/cloudbeat/issues/2073

orouz, Mar 24 '24 13:03

This pull request does not have a backport label. Could you fix it @orouz? 🙏 To fixup this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-v./d./d./d is the label to automatically backport to the 8./d branch. /d is the digit

NOTE: backport-skip has been added to this pull request.

mergify[bot], Mar 24 '24 13:03

This pull request now has conflicts. Could you fix it? 🙏 To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b gcp_assets_inventory_rl upstream/gcp_assets_inventory_rl
git merge upstream/main
git push upstream gcp_assets_inventory_rl

mergify[bot], Mar 24 '24 14:03

📊 Allure Report - 💚 No failures were reported.

Result Count
🟥 Failed 0
🟩 Passed 359
⬜ Skipped 33

github-actions[bot], Mar 31 '24 08:03