Source code for api.api

# -*- coding: utf-8 -*-
#
# (C) Copyright 2017 SixSq (http://sixsq.com/).
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
 Python wrapper of the SlipStream API (http://ssapi.sixsq.com).


 Install
 -------

 Latest release from pypi
 ~~~~~~~~~~~~~~~~~~~~~~~~
 .. code-block:: Bash
 
   $ pip install slipstream-api
 
 Most recent version from source code
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 .. code-block:: Bash
 
   $ pip install 'https://github.com/slipstream/SlipStreamPythonAPI/archive/master.zip'

 
 Usage
 -----
 To use this library, import it and instanciate it.
 Then use one of the method to login.::
    from slipstream.api import Api
 
    api = Api()

    api.login_internal('username', 'password')

 
 Examples
 --------

 Login on Nuvla with username and password
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ::

    api.login_internal('username', 'password')


 Login on Nuvla with key and secret
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ::

    api.login_apikey('credential/ce02ef40-1342-4e68-838d-e1b2a75adb1e', 
                     'the-secret-key')


 List applications from the App Store
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ::

    from pprint import pprint as pp
    pp(list(api.list_applications()))


 List base images
 ~~~~~~~~~~~~~~~~
 ::

    from pprint import pprint as pp
    pp(list(api.list_project_content('examples/images')))


 Simple component deployment (WordPress server)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ::
 
    # Deploy the WordPress component
    deployment_id = api.deploy('apps/WordPress/wordpress')
 
 
 Complete application deployment (WordPress server)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ::

    # Cloud
    cloud = 'exoscale-ch-gva'

    # WordPress Title
    wordpress_title = 'WordPress deployed by SlipStream through SlipStreamPythonAPI'
    
    # Create the dict of parameters to (re)define
    parameters = dict(wordpress_title=wordpress_title)
    
    # Deploy the WordPress component
    deployment_id = api.deploy('apps/WordPress/wordpress', cloud, parameters)
    
    # Wait the deployment to be ready
    while api.get_deployment(deployment_id).status != 'ready': time.sleep(10)
    
    # Print the WordPress URL
    print api.get_deployment(deployment_id).service_url

    # Print the WordPress admin username and password
    print api.get_deployment_parameter(deployment_id, 'machine:admin_email')
    print api.get_deployment_parameter(deployment_id, 'machine:admin_password')


 Terminate a deployment
 ~~~~~~~~~~~~~~~~~~~~~~
 ::

    api.terminate(deployment_id)

    
 API documentation
 -----------------

"""

from __future__ import absolute_import

import os
import six
import stat
import uuid
import logging

import requests
from requests.cookies import MockRequest
from requests.exceptions import HTTPError

from six import string_types, integer_types
from six.moves.urllib.parse import urlparse
from six.moves.http_cookiejar import MozillaCookieJar

from . import models

try:
    from xml.etree import cElementTree as etree
except ImportError:
    from xml.etree import ElementTree as etree

logger = logging.getLogger(__name__)

DEFAULT_ENDPOINT = 'https://nuv.la'
DEFAULT_COOKIE_FILE = os.path.expanduser('~/.slipstream/cookies.txt')


def _mod_url(path):
    parts = path.strip('/').split('/')
    if parts[0] == 'module':
        del parts[0]
    return '/module/' + '/'.join(parts)


def _mod(path, with_version=True):
    parts = path.split('/')
    if with_version:
        return '/'.join(parts[1:])
    else:
        return '/'.join(parts[1:-1])


def get_module_type(category):
    mapping = {'image': 'component',
               'deployment': 'application'}
    return mapping.get(category.lower(), category.lower())


def element_tree__iter(root):
    return getattr(root, 'iter',  # Python 2.7 and above
                   root.getiterator)  # Python 2.6 compatibility


class SlipStreamError(Exception):
    def __init__(self, reason, response=None):
        super(SlipStreamError, self).__init__(reason)
        self.reason = reason
        self.response = response


[docs]class SessionStore(requests.Session): """A ``requests.Session`` subclass implementing a file-based session store.""" def __init__(self, endpoint, reauthenticate, cookie_file=None): super(SessionStore, self).__init__() self.session_base_url = '{}/api/session'.format(endpoint) self.reauthenticate = reauthenticate self.login_params = None if cookie_file is None: cookie_file = DEFAULT_COOKIE_FILE cookie_dir = os.path.dirname(cookie_file) self.cookies = MozillaCookieJar(cookie_file) # Create the $HOME/.slipstream dir if it doesn't exist if not os.path.isdir(cookie_dir): os.mkdir(cookie_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) # Load existing cookies if the cookies.txt exists if os.path.isfile(cookie_file): self.cookies.load(ignore_discard=True) self.cookies.clear_expired_cookies() def need_to_login(self, accessed_url, status_code): return self.reauthenticate and status_code in [401, 403] and accessed_url != self.session_base_url def request(self, *args, **kwargs): super_request = super(SessionStore, self).request response = super_request(*args, **kwargs) if not self.verify and response.cookies: self._unsecure_cookie(args[1], response) if 'Set-Cookie' in response.headers: self.cookies.save(ignore_discard=True) url = args[1] if self.need_to_login(url, response.status_code): login_response = self.cimi_login(self.login_params) if login_response is not None and login_response.status_code == 201: # retry the call after reauthentication response = super_request(*args, **kwargs) return response def cimi_login(self, login_params): self.login_params = login_params if self.login_params: return self.request('POST', self.session_base_url, headers={'Content-Type': 'application/json', 'Accept': 'application/json'}, json={'sessionTemplate': login_params}) else: return None def _unsecure_cookie(self, url_str, response): url = urlparse(url_str) if url.scheme == 'http': for cookie in response.cookies: cookie.secure = False self.cookies.set_cookie_if_ok(cookie, MockRequest(response.request))
[docs] def clear(self, domain): """Clear cookies for the specified domain.""" try: self.cookies.clear(domain) self.cookies.save() except KeyError: pass
[docs]class Api(object): """ This class is a Python wrapper&helper of the native SlipStream REST API""" GLOBAL_PARAMETERS = ['bypass-ssh-check', 'refqname', 'keep-running', 'tags', 'mutable', 'type'] KEEP_RUNNING_VALUES = ['always', 'never', 'on-success', 'on-error'] CIMI_PARAMETERS_NAME = ['first', 'last', 'filter', 'select', 'expand', 'orderby', 'aggregation'] def __init__(self, endpoint=DEFAULT_ENDPOINT, cookie_file=None, insecure=False, reauthenticate=False): self.endpoint = endpoint self.session = SessionStore(endpoint, reauthenticate, cookie_file=cookie_file) self.session.verify = (insecure == False) self.session.headers.update({'Accept': 'application/xml'}) if insecure: try: requests.packages.urllib3.disable_warnings( requests.packages.urllib3.exceptions.InsecureRequestWarning) except: import urllib3 urllib3.disable_warnings( urllib3.exceptions.InsecureRequestWarning) self._username = None self._cimi_cloud_entry_point = None
[docs] def login(self, login_params): """Uses given 'login_params' to log into the SlipStream server. The 'login_params' must be a map containing an "href" element giving the id of the sessionTemplate resource and any other attributes required for the login method. E.g.: {"href" : "session-template/internal", "username" : "username", "password" : "password"} or {"href" : "session-template/api-key", "key" : "key", "secret" : "secret"} Returns server response as dict. Successful responses will contain a `status` code of 201 and the `resource-id` of the created session. :param login_params: {"href": "session-template/...", <creds>} :type login_params: dict :return: Server response. :rtype: dict """ return self.session.cimi_login(login_params)
[docs] def login_internal(self, username, password): """Login to the server using username and password. :param username: :param password: :return: see login() """ self._username = username return self.login({'href': 'session-template/internal', 'username': username, 'password': password})
[docs] def login_apikey(self, key, secret): """Login to the server using API key/secret pair. :param key: The Key ID (resource id). (example: credential/ce02ef40-1342-4e68-838d-e1b2a75adb1e) :param secret: The Secret Key corresponding to the Key ID :return: see login() """ return self.login({'href': 'session-template/api-key', 'key': key, 'secret': secret})
[docs] def logout(self): """Logs user out by deleting session. """ session_id = self.current_session() if session_id is not None: self._cimi_delete(session_id) self.session.login_params = None self._username = None
[docs] def current_session(self): """Returns current user session or None. :return: Current user session. :rtype: str """ resource_type = 'sessions' session = self.cimi_search(resource_type) if session and session.count > 0: return session.sessions[0].get('id') else: return None
def is_authenticated(self): return self.current_session() is not None @property def username(self): if not self._username: session_id = self.current_session() if session_id: self._username = self.cimi_get(session_id).json.get('username') return self._username def _text_get(self, url, **params): response = self.session.get('%s%s' % (self.endpoint, url), headers={'Accept': 'text/plain'}, params=params) response.raise_for_status() return response.text.encode('utf-8') def _xml_get(self, url, **params): response = self.session.get('%s%s' % (self.endpoint, url), headers={'Accept': 'application/xml'}, params=params) response.raise_for_status() parser = etree.XMLParser(encoding='utf-8') parser.feed(response.text.encode('utf-8')) return parser.close() def _xml_put(self, url, data): return self.session.put('%s%s' % (self.endpoint, url), headers={'Accept': 'application/xml', 'Content-Type': 'application/xml'}, data=data) def _get_user_xml(self, username): if not username: username = self.username try: return self._xml_get('/user/%s' % username) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for user: {0}.") raise def _list_users_xml(self): try: return self._xml_get('/users') except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for users.") raise @staticmethod def _add_to_dict_if_not_none(d, key, value): if key is not None and value is not None: d[key] = value @staticmethod def _dict_values_to_string(d): return {k: v if isinstance(v, six.string_types) else str(v) for k, v in six.iteritems(d)} @staticmethod def _flatten_cloud_parameters(cloud_parameters): parameters = {} if cloud_parameters is not None: for cloud, params in six.iteritems(cloud_parameters): for name, value in six.iteritems(params): parameters['{}.{}'.format(cloud, name)] = value return parameters @staticmethod def _create_xml_parameter_entry(name, value): category = name.split('.', 1)[0] entry_xml = etree.Element('entry') etree.SubElement(entry_xml, 'string').text = name param_xml = etree.SubElement(entry_xml, 'parameter', name=name, category=category) etree.SubElement(param_xml, 'value').text = value return entry_xml @staticmethod def _check_xml_result(response): if not (200 <= response.status_code < 300): try: reason = etree.fromstring(response.text).get('detail') except: pass else: raise SlipStreamError(reason) response.raise_for_status() def _cimi_get_cloud_entry_point(self): cep_json = self._cimi_get('cloud-entry-point') return models.CloudEntryPoint(cep_json) @property def cimi_cloud_entry_point(self): if self._cimi_cloud_entry_point is None: self._cimi_cloud_entry_point = self._cimi_get_cloud_entry_point() return self._cimi_cloud_entry_point @classmethod def _split_cimi_params(cls, params): cimi_params = {} other_params = {} for key, value in params.items(): if key in cls.CIMI_PARAMETERS_NAME: cimi_params['$' + key] = value else: other_params[key] = value return cimi_params, other_params @staticmethod def _cimi_find_operation_href(cimi_resource, operation): operation_href = cimi_resource.operations_by_name.get(operation, {}).get('href') if not operation_href: raise KeyError("Operation '{}' not found.".format(operation)) return operation_href def _cimi_get_uri(self, resource_id=None, resource_type=None): if resource_id is None and resource_type is None: raise TypeError("You have to specify 'resource_uri' or 'resource_type'.") if resource_id is not None and resource_type is not None: raise TypeError("You can only specify 'resource_uri' or 'resource_type', not both.") if resource_type is not None: resource_id = self.cimi_cloud_entry_point.entry_points.get(resource_type) if resource_id is None: raise KeyError("Resource type '{}' not found.".format(resource_type)) return resource_id def _cimi_request(self, method, uri, params=None, json=None, data=None): response = self.session.request(method, '{}/{}/{}'.format(self.endpoint, 'api', uri), headers={'Accept': 'application/json'}, params=params, json=json, data=data) try: response.raise_for_status() except HTTPError as e: try: json_msg = e.response.json() message = json_msg.get('message') if message is None: error = json_msg.get('error') message = error.get('code') + ' - ' + error.get('reason') except: try: message = e.response.text except: message = str(e) raise SlipStreamError(message, response) return response.json() def _cimi_get(self, resource_id=None, resource_type=None, params=None): uri = self._cimi_get_uri(resource_id, resource_type) return self._cimi_request('GET', uri, params=params) def _cimi_post(self, resource_id=None, resource_type=None, params=None, json=None, data=None): uri = self._cimi_get_uri(resource_id, resource_type) return self._cimi_request('POST', uri, params=params, json=json, data=data) def _cimi_put(self, resource_id=None, resource_type=None, params=None, json=None, data=None): uri = self._cimi_get_uri(resource_id, resource_type) return self._cimi_request('PUT', uri, params=params, json=json, data=data) def _cimi_delete(self, resource_id=None): return self._cimi_request('DELETE', resource_id)
[docs] def cimi_get(self, resource_id): """ Retreive a CIMI resource by it's resource id :param resource_id: The id of the resource to retrieve :type resource_id: str :return: A CimiResource object corresponding to the resource """ resp_json = self._cimi_get(resource_id=resource_id) return models.CimiResource(resp_json)
[docs] def cimi_edit(self, resource_id, data): """ Edit a CIMI resource by it's resource id :param resource_id: The id of the resource to edit :type resource_id: str :param data: The data to serialize into JSON :type data: dict :return: A CimiResponse object which should contain the attributes 'status', 'resource-id' and 'message' :rtype: CimiResponse """ resource = self.cimi_get(resource_id=resource_id) operation_href = self._cimi_find_operation_href(resource, 'edit') return models.CimiResponse(self._cimi_put(resource_id=operation_href, json=data))
[docs] def cimi_delete(self, resource_id): """ Delete a CIMI resource by it's resource id :param resource_id: The id of the resource to delete :type resource_id: str :return: A CimiResponse object which should contain the attributes 'status', 'resource-id' and 'message' :rtype: CimiResponse """ resource = self.cimi_get(resource_id=resource_id) operation_href = self._cimi_find_operation_href(resource, 'delete') return models.CimiResponse(self._cimi_delete(resource_id=operation_href))
[docs] def cimi_add(self, resource_type, data): """ Add a CIMI resource to the specified resource_type (Collection) :param resource_type: Type of the resource (Collection name) :type resource_type: str :param data: The data to serialize into JSON :type data: dict :return: A CimiResponse object which should contain the attributes 'status', 'resource-id' and 'message' :rtype: CimiResponse """ collection = self.cimi_search(resource_type=resource_type, last=0) operation_href = self._cimi_find_operation_href(collection, 'add') return models.CimiResponse(self._cimi_post(resource_id=operation_href, json=data))
[docs] def cimi_operation(self, resource_id, operation, data=None): """ Execute an operation on a CIMI resource :param resource_id: The id of the resource to execute operation on :type resource_id: str :param operation: Operation name (e.g. describe) :type operation: str :param data: The data to serialize into JSON :type data: dict :return: A CimiResponse object which should contain the attributes 'status', 'resource-id' and 'message' :rtype: CimiResponse """ resource = self.cimi_get(resource_id=resource_id) operation_href = self._cimi_find_operation_href(resource, operation) resp_json = self._cimi_post(operation_href, json=data) return models.CimiResource(resp_json)
[docs] def create_user(self, username, password, email, first_name, last_name, organization=None, roles=None, privileged=False, default_cloud=None, default_keep_running='never', ssh_public_keys=None, log_verbosity=1, execution_timeout=30, usage_email='never', cloud_parameters=None): """ Create a new user into SlipStream. :param username: The user's username (need to be unique) :type username: str :param password: The user's password :type password: str :param email: The user's email address :type email: str :param first_name: The user's first name :type first_name: str :param last_name: The user's last name :type last_name: str :param organization: The user's organization/company :type organization: str|None :param roles: The user's roles :type roles: list :param privileged: true to create a privileged user, false otherwise :type privileged: bool :param default_cloud: The user's default Cloud :type default_cloud: str|None :param default_keep_running: The user's default setting for keep-running. :type default_keep_running: 'always' or 'never' or 'on-success' or 'on-error' :param ssh_public_keys: The SSH public keys to inject into deployed instances. One key per line. :type ssh_public_keys: str|None :param log_verbosity: The verbosity level of the logging inside instances. 0: Actions, 1: Steps, 2: Details, 3: Debugging :type log_verbosity: 0 or 1 or 2 or 3 :param execution_timeout: If a deployment stays in a transitionnal state for more than this value (in minutes) it will be forcefully terminated. :type execution_timeout: int :param usage_email: Set it to 'daily' if you want to receive daily email with a summary of your Cloud usage of the previous day. :type usage_email: 'never' or 'daily' :param cloud_parameters: To add Cloud specific parameters (like credentials). A dict with the cloud name as the key and a dict of parameter as the value. :type cloud_parameters: dict|None """ attrib = dict(name=username, password=password, email=email, firstName=first_name, lastName=last_name, issuper=privileged, state='ACTIVE', resourceUri='user/{}'.format(username)) self._add_to_dict_if_not_none(attrib, 'organization', organization) self._add_to_dict_if_not_none(attrib, 'roles', roles) _attrib = self._dict_values_to_string(attrib) parameters = self._flatten_cloud_parameters(cloud_parameters) self._add_to_dict_if_not_none(parameters, 'General.default.cloud.service', default_cloud) self._add_to_dict_if_not_none(parameters, 'General.keep-running', default_keep_running) self._add_to_dict_if_not_none(parameters, 'General.Verbosity Level', log_verbosity) self._add_to_dict_if_not_none(parameters, 'General.Timeout', execution_timeout) self._add_to_dict_if_not_none(parameters, 'General.mail-usage', usage_email) self._add_to_dict_if_not_none(parameters, 'General.ssh.public.key', ssh_public_keys) _parameters = self._dict_values_to_string(parameters) user_xml = etree.Element('user', **_attrib) params_xml = etree.SubElement(user_xml, 'parameters') for name, value in six.iteritems(_parameters): params_xml.append(self._create_xml_parameter_entry(name, value)) response = self._xml_put('/user/{}'.format(username), etree.tostring(user_xml, 'UTF-8')) self._check_xml_result(response) return True
[docs] def update_user(self, username=None, password=None, email=None, first_name=None, last_name=None, organization=None, roles=None, privileged=None, default_cloud=None, default_keep_running=None, ssh_public_keys=None, log_verbosity=None, execution_timeout=None, usage_email=None, cloud_parameters=None): """ Update an existing user in SlipStream. Any parameter provided will be updated, others parameters will not be touched. Parameters are identical to the ones of the method 'create_user' except that they can all be None. Username cannot be updated. This parameter define which user to update. If not provided or None the current user will be used """ root = self._get_user_xml(username) if 'roles' in root.attrib and not self.get_user().privileged: del root.attrib['roles'] attrib = {} self._add_to_dict_if_not_none(attrib, 'email', email) self._add_to_dict_if_not_none(attrib, 'roles', roles) self._add_to_dict_if_not_none(attrib, 'password', password) self._add_to_dict_if_not_none(attrib, 'issuper', privileged) self._add_to_dict_if_not_none(attrib, 'lastName', last_name) self._add_to_dict_if_not_none(attrib, 'firstName', first_name) self._add_to_dict_if_not_none(attrib, 'organization', organization) _attrib = self._dict_values_to_string(attrib) parameters = self._flatten_cloud_parameters(cloud_parameters) self._add_to_dict_if_not_none(parameters, 'General.default.cloud.service', default_cloud) self._add_to_dict_if_not_none(parameters, 'General.keep-running', default_keep_running) self._add_to_dict_if_not_none(parameters, 'General.Verbosity Level', log_verbosity) self._add_to_dict_if_not_none(parameters, 'General.Timeout', execution_timeout) self._add_to_dict_if_not_none(parameters, 'General.mail-usage', usage_email) self._add_to_dict_if_not_none(parameters, 'General.ssh.public.key', ssh_public_keys) _parameters = self._dict_values_to_string(parameters) for key, val in six.iteritems(_attrib): root.set(key, val) for key, val in six.iteritems(_parameters): param_xml = root.find('parameters/entry/parameter[@name="' + key + '"]') if param_xml is None: param_entry_xml = self._create_xml_parameter_entry(key, val) param_xml = param_entry_xml.find('parameter') root.find('parameters').append(param_entry_xml) value_xml = param_xml.find('value') if value_xml is None: value_xml = etree.SubElement(param_xml, 'value') value_xml.text = val parameters_xml = root.find('parameters') for entry in parameters_xml.findall('entry'): param = entry.find('parameter[@name="General.orchestrator.publicsshkey"]') if param: parameters_xml.remove(entry) response = self._xml_put('/user/{}'.format(root.get('name')), etree.tostring(root, 'UTF-8')) self._check_xml_result(response) return True
[docs] def get_user(self, username=None): """ Get informations for a given user, if permitted :param username: The username of the user. Default to the user logged in if not provided or None. """ root = self._get_user_xml(username) general_params = {} with_username = set() with_password = set() for p in root.findall('parameters/entry/parameter'): name = p.get('name', '') value = p.findtext('value', '') category = p.get('category', '') if (name.endswith('.username') or name.endswith('.access.id')) and value: with_username.add(category) elif (name.endswith('.password') or name.endswith('.secret.key')) and value: with_password.add(category) elif category == 'General': general_params[name] = value configured_clouds = with_username & with_password user = models.User( username=root.get('name'), cyclone_login=root.get('cycloneLogin'), github_login=root.get('githubLogin'), email=root.get('email'), first_name=root.get('firstName'), last_name=root.get('lastName'), organization=root.get('organization'), roles=root.get('roles', '').split(','), configured_clouds=configured_clouds, default_cloud=general_params.get('General.default.cloud.service'), ssh_public_keys=general_params.get('General.ssh.public.key', '').splitlines(), keep_running=general_params.get('General.keep-running'), timeout=general_params.get('General.Timeout'), privileged=root.get('issuper', "false").lower() == "true", active_since=root.get('activeSince'), last_online=root.get('lastOnline'), online=root.get('online')) return user
[docs] def list_users(self): """ List users (requires privileged access) """ root = self._list_users_xml() for elem in element_tree__iter(root)('item'): yield models.UserItem(username=elem.get('name'), email=elem.get('email'), first_name=elem.get('firstName'), last_name=elem.get('lastName'), organization=elem.get('organization'), roles=elem.get('roles', '').split(','), privileged=elem.get('issuper', "false").lower() == "true", active_since=elem.get('activeSince'), last_online=elem.get('lastOnline'), online=elem.get('online'))
[docs] def list_applications(self): """ List apps in the appstore """ root = self._xml_get('/appstore') for elem in element_tree__iter(root)('item'): yield models.App(name=elem.get('name'), type=get_module_type(elem.get('category')), version=int(elem.get('version')), path=_mod(elem.get('resourceUri'), with_version=False))
[docs] def get_element(self, path): """ Get details about a project, a component or an application :param path: The path of an element (project/component/application) :type path: str """ url = _mod_url(path) try: root = self._xml_get(url) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) raise ss_module = models.Module(name=root.get('shortName'), type=get_module_type(root.get('category')), created=root.get('creation'), modified=root.get('lastModified'), description=root.get('description'), version=int(root.get('version')), path=_mod('%s/%s' % (root.get('parentUri').strip('/'), root.get('shortName')))) return ss_module
[docs] def update_component(self, path, description=None, module_reference_uri=None, cloud_identifiers=None, keep_ref_uri_and_cloud_ids=False, logo_link=None): """ Update a component, when a parameter is not provided in parameter it is unchanged. :param path: The path of a component :type path: str :param description: Description of the component :type description: str :param module_reference_uri: URI of the parent component :type module_reference_uri: str :param cloud_identifiers: A dict where keys are cloud names and values are identifier of the image in the cloud :type cloud_identifiers: dict :param keep_ref_uri_and_cloud_ids: Don't remove module_reference_uri if any cloud identifier are provided, or don't remove cloud identifiers if a module_reference_uri is provided :type keep_ref_uri_and_cloud_ids: bool :param logo_link: URL to an image that should be used as logo :type logo_link: str """ url = _mod_url(path) try: root = self._xml_get(url) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) raise if str(root.get('category')) != "Image": raise SlipStreamError("Specified path is not a component") if description is not None: root.set('description', description) if logo_link is not None: root.set('logoLink', logo_link) if module_reference_uri is not None: root.set('moduleReferenceUri', module_reference_uri) if not keep_ref_uri_and_cloud_ids: root.set('isBase', 'false') root.find('cloudImageIdentifiers').clear() if cloud_identifiers is not None: cloud_image_identifiers = root.find('cloudImageIdentifiers') for cloud, identifier in cloud_identifiers.items(): node = cloud_image_identifiers.find('cloudImageIdentifier[@cloudServiceName="%s"]' % cloud) if identifier is None or len(identifier) == 0: if node is not None: cloud_image_identifiers.remove(node) else: if node is None: node = etree.Element('cloudImageIdentifier', cloudServiceName=cloud) cloud_image_identifiers.append(node) node.set('cloudImageIdentifier', identifier) if not keep_ref_uri_and_cloud_ids: root.set('moduleReferenceUri', '') root.set('isBase', 'true') try: self._xml_put(url, etree.tostring(root, 'UTF-8')) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) raise
[docs] def get_cloud_image_identifiers(self, path): """ Get all image identifiers associated to a native component :param path: The path of an component :type path: str """ url = _mod_url(path) try: root = self._xml_get(url) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) raise for node in root.findall("cloudImageIdentifiers/cloudImageIdentifier"): yield models.CloudImageIdentifier( cloud=node.get("cloudServiceName"), identifier=node.get("cloudImageIdentifier"), )
[docs] def get_application_nodes(self, path): """ Get nodes of an application :param path: The path of an application :type path: str """ url = _mod_url(path) try: root = self._xml_get(url) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) raise for node in root.findall("nodes/entry/node"): yield models.Node(path=_mod(node.get("imageUri")), name=node.get('name'), cloud=node.get('cloudService'), multiplicity=node.get('multiplicity'), max_provisioning_failures=node.get('maxProvisioningFailures'), network=node.get('network'), cpu=node.get('cpu'), ram=node.get('ram'), disk=node.get('disk'), extra_disk_volatile=node.get('extraDiskVolatile'), )
[docs] def get_parameters(self, path, parameter_name=None, parameter_names=None): """ Get all or a subset of the parameters associated to a project, a component or an application :param path: The path of an element (project/component/application) :type path: str :param parameter_name: The parameter name (eg: 'cpu.nb') :type parameter_name: str|None :param parameter_names: The list of parameter names (eg: ['cpu.nb', 'ram.GB', ]) :type parameter_names: list|None :return: A list containing all the parameters, or at most the one requested :rtype: list """ url = _mod_url(path) try: root = self._xml_get(url) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) raise if parameter_name is not None: query = 'parameters/entry/parameter[@name="' + parameter_name + '"]' else: query = 'parameters/entry/parameter' for node in root.findall(query): value = node.findtext('value', '') defaultValue = node.findtext('defaultValue', '') instructions = node.findtext('instructions', '') name = node.get("name") if parameter_names is None or name in parameter_names: yield models.ModuleParameter( name=name, value=value, defaultValue=defaultValue, category=node.get("category"), description=node.get("description"), isSet=node.get("isSet"), mandatory=node.get("mandatory"), readonly=node.get("readonly"), type=node.get("type"), instructions=instructions, )
[docs] def list_project_content(self, path=None, recurse=False): """ List the content of a project :param path: The path of a project. If None, list the root project. :type path: str :param recurse: Get project content recursively :type recurse: bool """ logger.debug("Starting with path: {0}".format(path)) # Path normalization if not path: url = '/module' else: url = _mod_url(path) logger.debug("Using normalized URL: {0}".format(url)) try: root = self._xml_get(url) except requests.HTTPError as e: if e.response.status_code == 403: logger.debug("Access denied for path: {0}. Skipping.".format(path)) return raise for elem in element_tree__iter(root)('item'): # Compute module path if elem.get('resourceUri'): app_path = elem.get('resourceUri') else: app_path = "%s/%s" % (root.get('parentUri').strip('/'), '/'.join([root.get('shortName'), elem.get('name'), elem.get('version')])) module_type = get_module_type(elem.get('category')) logger.debug("Found '{0}' with path: {1}".format(module_type, app_path)) app = models.App(name=elem.get('name'), type=module_type, version=int(elem.get('version')), path=_mod(app_path, with_version=False)) yield app if app.type == 'project' and recurse: logger.debug("Recursing into path: {0}".format(app_path)) for app in self.list_project_content(app_path, recurse): yield app
[docs] def list_deployments(self, inactive=False, cloud=None, offset=0, limit=20): """ List deployments :param inactive: Include inactive deployments. Default to False :type cloud: bool :param cloud: Retrieve only deployments for the specified Cloud :type cloud: str :param offset: Retrieve deployments starting by the offset<exp>th</exp> one. Default to 0 :type offset: int :param limit: Retrieve at most 'limit' deployments. Default to 20 :type limit: int """ _cloud = '' if cloud is not None: _cloud = cloud root = self._xml_get('/run', activeOnly=(not inactive), offset=offset, limit=limit, cloud=_cloud) for elem in element_tree__iter(root)('item'): yield models.Deployment(id=uuid.UUID(elem.get('uuid')), module=_mod(elem.get('moduleResourceUri')), status=elem.get('status').lower(), started_at=elem.get('startTime'), last_state_change=elem.get('lastStateChangeTime'), clouds=elem.get('cloudServiceNames', '').split(','), username=elem.get('username'), abort=elem.get('abort'), service_url=elem.get('serviceUrl'), scalable=elem.get('mutable'), )
[docs] def get_deployment(self, deployment_id): """ Get a deployment :param deployment_id: The deployment UUID of the deployment to get :type deployment_id: str or UUID """ root = self._xml_get('/run/' + str(deployment_id)) abort = root.findtext('runtimeParameters/entry/runtimeParameter[@key="ss:abort"]') service_url = root.findtext('runtimeParameters/entry/runtimeParameter[@key="ss:url.service"]') return models.Deployment(id=uuid.UUID(root.get('uuid')), module=_mod(root.get('moduleResourceUri')), status=root.get('state').lower(), started_at=root.get('startTime'), last_state_change=root.get('lastStateChangeTime'), clouds=root.get('cloudServiceNames', '').split(','), username=root.get('user'), abort=abort, service_url=service_url, scalable=root.get('mutable'), )
[docs] def get_deployment_parameter(self, deployment_id, parameter_name, ignore_abort=False): """ Get a parameter of a deployment :param deployment_id: The deployment UUID of the deployment to get :type deployment_id: str or UUID :param parameter_name: The parameter name (eg: ss:state) :type parameter_name: str :param ignore_abort: If False, raise an exception if the deployment has failed :type ignore_abort: bool """ ignoreabort = str(ignore_abort).lower() return self._text_get('/run/{}/{}'.format(str(deployment_id), parameter_name), ignoreabort=ignoreabort)
def get_deployment_events(self, deployment_id, types=None): if types is None: types = [] filter = "content/resource/href='run/%s'" % deployment_id if types: filter += " and (%s)" % ' or '.join(map(lambda x: "type='%s'" % x, types)) return self.cimi_search(resource_type='events', filter=filter)
[docs] def list_virtualmachines(self, deployment_id=None, cloud=None, offset=0, limit=20): """ List virtual machines :param deployment_id: Retrieve only virtual machines about the specified run_id. Default to None :type deployment_id: str or UUID :param cloud: Retrieve only virtual machines for the specified Cloud :type cloud: str :param offset: Retrieve virtual machines starting by the offset<exp>th</exp> one. Default to 0 :type offset: int :param limit: Retrieve at most 'limit' virtual machines. Default to 20 :type limit: int """ _deployment_id = '' if deployment_id is not None: _deployment_id = str(deployment_id) _cloud = '' if cloud is not None: _cloud = cloud root = self._xml_get('/vms', offset=offset, limit=limit, runUuid=_deployment_id, cloud=_cloud) for elem in element_tree__iter(root)('vm'): run_id_str = elem.get('runUuid') run_id = uuid.UUID(run_id_str) if run_id_str is not None else None yield models.VirtualMachine(id=elem.get('instanceId'), cloud=elem.get('cloud'), status=elem.get('state').lower(), deployment_id=run_id, deployment_owner=elem.get('runOwner'), node_name=elem.get('nodeName'), node_instance_id=elem.get('nodeInstanceId'), ip=elem.get('ip'), cpu=elem.get('cpu'), ram=elem.get('ram'), disk=elem.get('disk'), instance_type=elem.get('instanceType'), is_usable=elem.get('isUsable'))
[docs] def build_component(self, path, cloud=None): """ :param path: The path to a component :type path: str :param cloud: The Cloud on which to build the component. If None, the user default Cloud will be used. :type cloud: str """ response = self.session.post(self.endpoint + '/run', data={ 'type': 'Machine', 'refqname': path, 'parameter--cloudservice': cloud or 'default', }) response.raise_for_status() run_id = response.headers['location'].split('/')[-1] return uuid.UUID(run_id)
[docs] def deploy(self, path, cloud=None, parameters=None, tags=None, keep_running=None, scalable=False, multiplicity=None, tolerate_failures=None, check_ssh_key=False, raw_params=None): """ Run a component or an application :param path: The path of the component/application to deploy :type path: str :param cloud: A string or a dict to specify on which Cloud(s) to deploy the component/application. To deploy a component simply specify the Cloud name as a string. To deploy a deployment specify a dict with the nodenames as keys and Cloud names as values. If not specified the user default cloud will be used. :type cloud: str or dict :param parameters: A dict of parameters to redefine for this deployment. To redefine a parameter of a node use "<nodename>" as keys and dict of parameters as values. To redefine a parameter of a component or a global parameter use "<parametername>" as the key. :type parameters: dict :param tags: List of tags that can be used to identify or annotate a deployment :type tags: str or list :param keep_running: [Only applies to applications] Define when to terminate or not a deployment when it reach the 'Ready' state. Possibles values: 'always', 'never', 'on-success', 'on-error'. If scalable is set to True, this value is ignored and it will behave as if it was set to 'always'. If not specified the user default will be used. :type keep_running: 'always' or 'never' or 'on-success' or 'on-error' :param scalable: [Only applies to applications] True to start a scalable deployment. Default: False :type scalable: bool :param multiplicity: [Only applies to applications] A dict to specify how many instances to start per node. Nodenames as keys and number of instances to start as values. :type multiplicity: dict :param tolerate_failures: [Only applies to applications] A dict to specify how many failures to tolerate per node. Nodenames as keys and number of failure to tolerate as values. :type tolerate_failures: dict :param check_ssh_key: Set it to True if you want the SlipStream server to check if you have a public ssh key defined in your user profile. Useful if you want to ensure you will have access to VMs. :type check_ssh_key: bool :param raw_params: This allows you to pass parameters directly in the request to the SlipStream server. Keys must be formatted in the format understood by the SlipStream server. :type raw_params: dict :return: The deployment UUID of the newly created deployment :rtype: uuid.UUID """ _raw_params = dict() if raw_params is None else raw_params _raw_params.update(self._convert_parameters_to_raw_params(parameters)) _raw_params.update(self._convert_clouds_to_raw_params(cloud)) _raw_params.update(self._convert_multiplicity_to_raw_params(multiplicity)) _raw_params.update(self._convert_tolerate_failures_to_raw_params(tolerate_failures)) _raw_params['refqname'] = _mod_url(path)[1:] if tags: _raw_params['tags'] = tags if isinstance(tags, six.string_types) else ','.join(tags) if keep_running: if keep_running not in self.KEEP_RUNNING_VALUES: raise ValueError('"keep_running" should be one of {}, not "{}"'.format(self.KEEP_RUNNING_VALUES, keep_running)) _raw_params['keep-running'] = keep_running if scalable: _raw_params['mutable'] = 'on' if not check_ssh_key: _raw_params['bypass-ssh-check'] = 'true' response = self.session.post(self.endpoint + '/run', data=_raw_params) if response.status_code == 409: reason = etree.fromstring(response.text).get('detail') raise SlipStreamError(reason) response.raise_for_status() deployment_id = response.headers['location'].split('/')[-1] return uuid.UUID(deployment_id)
[docs] def terminate(self, deployment_id): """ Terminate a deployment :param deployment_id: The UUID of the deployment to terminate :type deployment_id: str or uuid.UUID """ response = self.session.delete('%s/run/%s' % (self.endpoint, deployment_id)) response.raise_for_status() return True
[docs] def add_node_instances(self, deployment_id, node_name, quantity=None): """ Add new instance(s) of a deployment's node (horizontal scale up). Warning: The targeted deployment has to be "scalable". :param deployment_id: The deployment UUID of the deployment on which to add new instances of a node. :type deployment_id: str|UUID :param node_name: Name of the node where to add instances. :type node_name: str :param quantity: Amount of node instances to add. If not provided it's server dependent (usually add one instance) :type quantity: int :return: The list of new node instance names. :rtype: list """ url = '%s/run/%s/%s' % (self.endpoint, str(deployment_id), str(node_name)) data = {"n": quantity} if quantity else None response = self.session.post(url, data=data) if response.status_code == 409: reason = etree.fromstring(response.text).get('detail') raise SlipStreamError(reason) response.raise_for_status() return response.text.split(",")
[docs] def remove_node_instances(self, deployment_id, node_name, ids): """ Remove a list of node instances from a deployment. Warning: The targeted deployment has to be "scalable". :param deployment_id: The deployment UUID of the deployment on which to remove instances of a node. :type deployment_id: str|UUID :param node_name: Name of the node where to remove instances. :type node_name: str :param ids: List of node instance ids to remove. Ids can also be provided as a CSV list. :type ids: list|str :return: True on success :rtype: bool """ url = '%s/run/%s/%s' % (self.endpoint, str(deployment_id), str(node_name)) response = self.session.delete(url, data={"ids": ",".join(str(id_) for id_ in ids)}) if response.status_code == 409: reason = etree.fromstring(response.text).get('detail') raise SlipStreamError(reason) response.raise_for_status() return response.status_code == 204
[docs] def usage(self): """ Get current usage and quota by cloud service. """ root = self._xml_get('/dashboard') for elem in element_tree__iter(root)('cloudUsage'): yield models.Usage(cloud=elem.get('cloud'), quota=int(elem.get('vmQuota')), run_usage=int(elem.get('userRunUsage')), vm_usage=int(elem.get('userVmUsage')), inactive_vm_usage=int(elem.get('userInactiveVmUsage')), others_vm_usage=int(elem.get('othersVmUsage')), pending_vm_usage=int(elem.get('pendingVmUsage')), unknown_vm_usage=int(elem.get('unknownVmUsage')))
[docs] def publish(self, path): """ Publish a component or an application to the appstore :param path: The path to a component or an application """ response = self.session.put('%s%s/publish' % (self.endpoint, _mod_url(path))) response.raise_for_status() return True
[docs] def unpublish(self, path): """ Unpublish a component or an application from the appstore :param path: The path to a component or an application """ response = self.session.delete('%s%s/publish' % (self.endpoint, _mod_url(path))) response.raise_for_status() return True
[docs] def delete_element(self, path): """ Delete a project, a component or an application :param path: The path to a component or an application """ response = self.session.delete('%s%s' % (self.endpoint, _mod_url(path))) response.raise_for_status() return True
@staticmethod def _check_type(obj_name, obj, allowed_types): if not isinstance(obj, allowed_types): raise ValueError('Invalid type "{}" for "{}"'.format(type(obj), obj_name)) @classmethod def _convert_clouds_to_raw_params(cls, clouds): return cls._convert_per_node_parameter_to_raw_params('cloudservice', clouds, allowed_types=string_types) @classmethod def _convert_multiplicity_to_raw_params(cls, multiplicity): return cls._convert_per_node_parameter_to_raw_params('multiplicity', multiplicity, allowed_types=(integer_types, string_types)) @classmethod def _convert_tolerate_failures_to_raw_params(cls, tolerate_failures): return cls._convert_per_node_parameter_to_raw_params('max-provisioning-failures', tolerate_failures, allowed_types=(integer_types, string_types)) @classmethod def _convert_per_node_parameter_to_raw_params(cls, parameter_name, parameters, allowed_types=(string_types, int), allow_no_node=True): raw_params = dict() if parameters is None: return raw_params if isinstance(parameters, dict): for key, value in parameters.items(): cls._check_type('{}:{}'.format(key, parameter_name), value, allowed_types) raw_params['parameter--node--{}--{}'.format(key, parameter_name)] = value elif allow_no_node: cls._check_type(parameter_name, parameters, allowed_types) raw_params['parameter--{}'.format(parameter_name)] = parameters else: cls._check_type(parameter_name, parameters, dict) return raw_params @classmethod def _convert_parameters_to_raw_params(cls, parameters): raw_params = dict() if parameters is None: return raw_params for key, value in parameters.items(): if isinstance(value, dict): # Redefine node parameters for parameter_name, parameter_value in value.items(): raw_params['parameter--node--{}--{}'.format(key, parameter_name)] = parameter_value else: if key in cls.GLOBAL_PARAMETERS: # Redefine a global parameter raw_params[key] = value else: # Redefine a component parameter raw_params['parameter--{}'.format(key)] = value return raw_params def get_cloud_credentials(self, cimi_filter=''): filter = "type^='cloud-cred'" if cimi_filter: filter += 'and %s' % cimi_filter return self.cimi_search(resource_type='credentials', filter=filter)