cloudbeat
add rate limit to asset inventory
Summary of your changes
- adds rate limiting to the asset inventory client. our current usage is only for `ListAssets`
- retries requests made by the `ListAssets` client whenever they fail due to rate limiting
- adds a cache to the policies fetcher (using a new cache utility, as we already have 3 caches in this file). a rough sketch of such a utility is included at the end of this section
for `ListAssets`, we only use the per-project quota, which is 100 per minute per consumer project. we do this because:
- the consumer project (the one consuming the quota) is set by the user when they run `gcloud config set project <project_id>` before deploying the agent (verified with `gcloud config get billing/quota_project`)
- in both our use cases, `single-account` and `organization-account`, we always use a single quota project. we never re-define it for the user
- either way, the org quotas are `800 per minute per org` and `650,000 per day per org`. the per-project quota is more restrictive than both (100 per minute is well below 800 per minute, and adds up to at most 144,000 per day), so we shouldn't exceed those either
- we don't account for multiple cloudbeat instances running together without being synced on the quotas each of them consumes (this is assumed to be an unlikely edge case we accept for now)
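
for context, here is a minimal sketch of the kind of generic get-or-fetch cache utility mentioned above. the package, type, and method names are illustrative only, not necessarily the actual cloudbeat code:

```go
package cache

import "sync"

// Cache memoizes the result of an expensive fetch per key, so repeated
// lookups within a fetch cycle don't trigger extra rate-limited API calls.
type Cache[K comparable, V any] struct {
	mu    sync.Mutex
	items map[K]V
}

func New[K comparable, V any]() *Cache[K, V] {
	return &Cache[K, V]{items: map[K]V{}}
}

// Get returns the cached value for key, computing and storing it via fetch
// on the first call.
func (c *Cache[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.items[key]; ok {
		return v, nil
	}
	v, err := fetch()
	if err != nil {
		var zero V
		return zero, err
	}
	c.items[key] = v
	return v, nil
}
```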
Screenshot/Data
test script
```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	asset "cloud.google.com/go/asset/apiv1"
	"cloud.google.com/go/asset/apiv1/assetpb"
	"github.com/googleapis/gax-go/v2"
	"golang.org/x/time/rate"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)
type Iterator interface {
	Next() (*assetpb.Asset, error)
}

// RetryOnResourceExhausted retries calls that fail due to rate limiting.
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
	return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
		Initial:    1 * time.Second,
		Max:        10 * time.Second,
		Multiplier: 1.2,
	})
})

type AssetsInventoryRateLimiter struct {
	methods map[string]*rate.Limiter
}

const projectQuota = 100

func NewAssetsInventoryRateLimiter() *AssetsInventoryRateLimiter {
	return &AssetsInventoryRateLimiter{
		methods: map[string]*rate.Limiter{
			"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/projectQuota), 1),
		},
	}
}

// GetInterceptorDialOption returns a gRPC dial option that waits on the
// per-method rate limiter before sending each request.
func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
	return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		limiter := rl.methods[method]
		if limiter != nil {
			if err := limiter.Wait(ctx); err != nil {
				return err
			}
		}
		return invoker(ctx, method, req, reply, cc, opts...)
	})
}
func main() {
	ctx := context.Background()
	limiter := NewAssetsInventoryRateLimiter()
	clientA, err := asset.NewClient(ctx, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	var totalGot int
	var totalLost int
	start := time.Now()

	// simulate 10x the project per-minute quota by requesting some assets multiple times
	var assets []*assetpb.Asset
	for i := 0; i < projectQuota*10; i++ {
		log.Printf("Iteration: %d \n", i)
		resp := getAllAssets(clientA.ListAssets(ctx, &assetpb.ListAssetsRequest{
			Parent:      fmt.Sprintf("organizations/%s", "693506308612"),
			AssetTypes:  []string{"logging.googleapis.com/LogBucket"},
			ContentType: assetpb.ContentType_RESOURCE,
		}, RetryOnResourceExhausted))
		if resp == nil {
			totalLost++
		} else {
			totalGot++
			assets = append(assets, resp...)
		}
	}

	end := time.Now()
	log.Println("-----------------------------------------")
	log.Printf("time: %v \n", end.Sub(start))
	log.Printf("assets: %d \n", len(assets))
	log.Printf("requests lost: %d \n", totalLost)
	log.Printf("requests got: %d \n", totalGot)
}
// getAllAssets drains the iterator, returning nil if any page request failed.
func getAllAssets(it Iterator) []*assetpb.Asset {
	results := make([]*assetpb.Asset, 0)
	for {
		response, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil
		}
		results = append(results, response)
	}
	return results
}
```
- this is roughly the same code from the assets inventory provider that calls `ListAssets`
- you can change the `Parent` param to a different organization. make sure to re-run `gcloud auth login` and `gcloud auth application-default login`
- the script limits requests to 100 per minute and retries failed ones. (you can adjust the quota here)
- it simulates consumption of 10x the quota
- eventually (+10mins) it returns `requests got: 1000`, which means it doesn't lose any requests.
- before this PR, the same usage would error after 100 requests and then just log the error and continue. you can remove the `rl.Wait()` and `RetryOnResourceExhausted` to verify
Related Issues
- part of https://github.com/elastic/cloudbeat/issues/2054
- closes https://github.com/elastic/cloudbeat/issues/2073
This pull request does not have a backport label. Could you fix it @orouz? 🙏 To fixup this pull request, you need to add the backport labels for the needed branches, such as:
- `backport-v./d./d./d` is the label to automatically backport to the `8./d` branch. `/d` is the digit

NOTE: `backport-skip` has been added to this pull request.
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
```sh
git fetch upstream
git checkout -b gcp_assets_inventory_rl upstream/gcp_assets_inventory_rl
git merge upstream/main
git push upstream gcp_assets_inventory_rl
```
:bar_chart: Allure Report - :green_heart: No failures were reported.
| Result | Count |
|---|---|
| 🟥 Failed | 0 |
| 🟩 Passed | 359 |
| ⬜ Skipped | 33 |