aliyun-openapi-python-sdk icon indicating copy to clipboard operation
aliyun-openapi-python-sdk copied to clipboard

InvalidNetworkType when executing codes generated from "Preview -> View Open API"

Open qzweng opened this issue 2 years ago • 1 comment

  • Python Version: 3.7.2
  • aliyunsdkcore Version: 2.13.36
  • API: RunInstancesRequest()

Launching works in the web console. However, executing the code generated by "View Open API" in the "Preview" step gives the following error:

  • Fail. Business error. Code: OperationDenied.InvalidNetworkType, Message: The specified network type is not available in the specified region.

I guess the VPC / default network type / default vSwitch is causing the problem. Could you give a hint on how to fill in the NetworkType fields?


The executed code is attached as follows,

#!/usr/bin/env python
# coding=utf-8
import json
import time
import traceback

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest


# Status string that marks an instance as booted when polling its state.
RUNNING_STATUS = 'Running'
# Seconds to sleep between two consecutive status polls.
CHECK_INTERVAL = 3
# Maximum number of seconds to keep polling before giving up.
CHECK_TIMEOUT = 180


class AliyunRunInstancesExample(object):
    """Create ECS instances with RunInstances and wait until they are running.

    All request parameters are plain attributes assigned in ``__init__``;
    edit them (or set them on the instance) before calling :meth:`run`.
    """

    def __init__(self):
        # Credentials are placeholders; replace them before running.
        self.access_id = '<AccessKey>'
        self.access_secret = '<AccessSecret>'

        # Whether to send a pre-check only request. If True, only the
        # pre-check is performed: no instance is created and no fees are
        # generated. If False, a normal request is sent: once the pre-check
        # passes, an instance is created and fees are generated.
        self.dry_run = False
        # The region ID
        self.region_id = 'cn-hangzhou'
        # The instance type
        self.instance_type = 'ecs.t5-lc2m1.nano'
        # The billing method of the instance
        self.instance_charge_type = 'PostPaid'
        # The image ID
        self.image_id = 'ubuntu_22_04_x64_20G_alibase_20221228.vhd'
        # The security group to which the instances belongs
        self.security_group_id = 'sg-bp1gsay3u7********'
        # The period of the subscription
        self.period = 1
        # The unit of the subscription period
        self.period_unit = 'Hourly'
        # The zone ID
        self.zone_id = 'random'
        # The vSwitch ID. None (the default) leaves VSwitchId out of the
        # request, exactly as before.
        # NOTE(review): without a VSwitchId the request presumably asks for
        # the classic network, which is rejected with
        # OperationDenied.InvalidNetworkType in regions that do not offer it.
        # Set this to a vSwitch of the VPC that owns security_group_id; the
        # zone_id may then need to match the vSwitch's zone - confirm against
        # the RunInstances API reference.
        self.vswitch_id = None
        # The billing method of the network bandwidth
        self.internet_charge_type = 'PayByTraffic'
        # The instance name
        self.instance_name = 'worker[0,2]'
        # The password of the instance
        self.password = 'Sensitive information. The actual value has been masked.'
        # The number of instances you want to create
        self.amount = 1
        # The maximum outbound bandwidth to the Internet
        self.internet_max_bandwidth_out = 5
        # The hostname of the instance
        self.host_name = 'worker[0,2]'
        # Whether to add a sequential suffix to the instance name and hostname
        self.unique_suffix = True
        # Whether the instance is I/O-optimized
        self.io_optimized = 'optimized'
        # The bidding policy of the preemptible (spot) instance
        self.spot_strategy = 'SpotAsPriceGo'
        # Whether to enable security hardening
        self.security_enhancement_strategy = 'Active'
        # The size of the system disk
        self.system_disk_size = '20'
        # The type of the system disk
        self.system_disk_category = 'cloud_efficiency'

        self.client = AcsClient(self.access_id, self.access_secret, self.region_id)

    def run(self):
        """Create the instances, then poll until they boot.

        SDK client/server errors and any other exception are reported on
        stdout instead of being propagated.
        """
        try:
            ids = self.run_instances()
            self._check_instances_status(ids)
        except ClientException as e:
            print('Fail. Something with your connection with Aliyun go incorrect.'
                  ' Code: {code}, Message: {msg}'
                  .format(code=e.error_code, msg=e.message))
        except ServerException as e:
            print('Fail. Business error.'
                  ' Code: {code}, Message: {msg}'
                  .format(code=e.error_code, msg=e.message))
        except Exception:
            print('Unhandled error')
            print(traceback.format_exc())

    def run_instances(self):
        """
        Call the instance creation API with the configured parameters.

        :return: instance_ids The IDs of the created instances, to be checked
        """
        request = RunInstancesRequest()

        request.set_DryRun(self.dry_run)

        request.set_InstanceType(self.instance_type)
        request.set_InstanceChargeType(self.instance_charge_type)
        request.set_ImageId(self.image_id)
        request.set_SecurityGroupId(self.security_group_id)
        request.set_Period(self.period)
        request.set_PeriodUnit(self.period_unit)
        request.set_ZoneId(self.zone_id)
        # Only send VSwitchId when configured, so existing users who never
        # set it produce the same request as before.
        if getattr(self, 'vswitch_id', None):
            request.set_VSwitchId(self.vswitch_id)
        request.set_InternetChargeType(self.internet_charge_type)
        request.set_InstanceName(self.instance_name)
        request.set_Password(self.password)
        request.set_Amount(self.amount)
        request.set_InternetMaxBandwidthOut(self.internet_max_bandwidth_out)
        request.set_HostName(self.host_name)
        request.set_UniqueSuffix(self.unique_suffix)
        request.set_IoOptimized(self.io_optimized)
        request.set_SpotStrategy(self.spot_strategy)
        request.set_SecurityEnhancementStrategy(self.security_enhancement_strategy)
        request.set_SystemDiskSize(self.system_disk_size)
        request.set_SystemDiskCategory(self.system_disk_category)

        body = self.client.do_action_with_exception(request)
        data = json.loads(body)
        instance_ids = data['InstanceIdSets']['InstanceIdSet']
        print('Success. Instance creation succeed. InstanceIds: {}'.format(', '.join(instance_ids)))
        return instance_ids

    @staticmethod
    def _split_booted(pending_ids, instances, running_status):
        """
        Separate the pending IDs that are now running from those still waiting.

        :param pending_ids: IDs still being waited for (not modified)
        :param instances: instance dicts with 'InstanceId' and 'Status' keys
        :param running_status: the status value that counts as booted
        :return: (booted, remaining) - booted in response order, remaining in
                 the order of pending_ids
        """
        waiting = set(pending_ids)
        booted = []
        for instance in instances:
            instance_id = instance['InstanceId']
            # Ignore instances we are not waiting for (or already counted)
            # instead of failing on them.
            if instance['Status'] == running_status and instance_id in waiting:
                waiting.discard(instance_id)
                booted.append(instance_id)
        remaining = [instance_id for instance_id in pending_ids if instance_id in waiting]
        return booted, remaining

    def _check_instances_status(self, instance_ids):
        """
        Poll the instance status every CHECK_INTERVAL seconds until every
        instance is running or CHECK_TIMEOUT seconds have elapsed.

        :param instance_ids The IDs of the instances that need to be checked
                            (the list is not modified)
        :return:
        """
        # Work on a copy so the caller's list is left untouched.
        pending = list(instance_ids)
        if not pending:
            # Nothing to wait for; avoid sending an empty InstanceIds filter.
            return

        start = time.time()
        while True:
            request = DescribeInstancesRequest()
            request.set_InstanceIds(json.dumps(pending))
            body = self.client.do_action_with_exception(request)
            data = json.loads(body)
            booted, pending = self._split_booted(
                pending, data['Instances']['Instance'], RUNNING_STATUS)
            for instance_id in booted:
                print('Instance boot successfully: {}'.format(instance_id))

            if not pending:
                print('Instances all boot successfully')
                break

            if time.time() - start > CHECK_TIMEOUT:
                print('Instances boot failed within {timeout}s: {ids}'
                      .format(timeout=CHECK_TIMEOUT, ids=', '.join(pending)))
                break

            time.sleep(CHECK_INTERVAL)


if __name__ == '__main__':
    # Script entry point: build the example with its configured settings
    # and launch the create-then-poll workflow.
    example = AliyunRunInstancesExample()
    example.run()

qzweng avatar Feb 27 '23 10:02 qzweng

Would you mind adding some unit tests for aliyun-python-sdk-ecs, especially on the usage of NetworkInterfaces?

qzweng avatar Mar 01 '23 04:03 qzweng