Source code for api4jenkins.item

# encoding: utf-8

import re
from importlib import import_module

from requests.exceptions import HTTPError

from .exceptions import (AuthenticationError, BadRequestError,
                         ItemNotFoundError, ServerError)


def camel(s):
    '''Convert a snake_case name to camelCase.

    Names that start with an underscore (for example ``_class``) are
    returned unchanged.  An empty string is returned as-is.

    :param s: snake_case name
    :returns: camelCase form of ``s``
    '''
    # ``startswith`` instead of ``s[0]`` so that an empty string does not
    # raise IndexError.
    if s.startswith('_'):
        return s
    first, *other = s.split('_')
    return first.lower() + ''.join(x.title() for x in other)
def _snake():
    '''Build the camelCase to snake_case converter.

    The regular expression is compiled once here and captured by the
    returned function, so it is not rebuilt on every call.
    '''
    # Zero-width match in front of every upper-case letter except one at the
    # very start of the string.
    boundary = re.compile(r'(?<!^)(?=[A-Z])')

    def convert(name):
        '''Return ``name`` with ``_`` inserted before capitals, lower-cased.'''
        underscored = boundary.sub('_', name)
        return underscored.lower()

    return convert


snake = _snake()
def append_slash(url):
    '''Return ``url`` guaranteed to end with a single appended ``/``.

    A url that already ends with ``/`` is returned unchanged.  An empty
    string yields ``'/'`` instead of raising ``IndexError``.

    :param url: url string
    :returns: ``url`` with a trailing slash
    '''
    # ``endswith`` is safe on an empty string, unlike ``url[-1]``.
    return url if url.endswith('/') else url + '/'
def _new_item():
    '''Build the factory that turns a Jenkins JSON item into an object.

    The returned function takes ``(jenkins, module, item)``:

    * ``item['_class']`` is split on ``.`` and ``$``; the last piece is the
      class name to look up.
    * ``module`` is a module name passed to ``import_module``.
    * the class found in that module is called with ``jenkins`` and
      ``item['url']`` and the result is returned.

    ``AttributeError`` is raised when the module has no such class.
    '''
    separator = re.compile(r'[.$]')
    # Sentinel so an attribute whose value is None still counts as present.
    missing = object()

    def build(jenkins, module, item):
        *_, class_name = separator.split(item['_class'])
        loaded = import_module(module)
        item_class = getattr(loaded, class_name, missing)
        if item_class is missing:
            raise AttributeError(
                f'{loaded} has no class {class_name} to describe '
                f'{item["url"]}, patch new class with api4jenkins._patch_to, '
                'see: https://api4jenkins.readthedocs.io/en/latest/user/example.html#patch'
            )
        return item_class(jenkins, item['url'])

    return build


new_item = _new_item()
class Item:
    '''An object addressed by a url, accessed through a ``jenkins`` client.

    All requests go through :meth:`handle_req`, which delegates to
    ``jenkins.send_req`` and maps HTTP error statuses to exceptions.
    Scalar fields of the item's ``api/json`` document are also readable as
    snake_case attributes (see :meth:`dynamic_attrs` and ``__getattr__``).
    '''
    # XML content-type header; not used inside this class (presumably passed
    # by subclasses/callers when posting XML - verify against callers).
    headers = {'Content-Type': 'text/xml; charset=utf-8'}
    # Class-level cache of dynamic attribute names, filled lazily by
    # ``dynamic_attrs``.
    _dynamic_attrs = []

    def __init__(self, jenkins, url):
        '''Store the client and the item url (with a trailing slash).'''
        self.jenkins = jenkins
        self.url = append_slash(url)

    def api_json(self, tree='', depth=0):
        '''GET ``api/json`` for this item and return the decoded JSON.

        :param tree: value of the ``tree`` query parameter; omitted if empty
        :param depth: value of the ``depth`` query parameter
        '''
        params = {'depth': depth}
        if tree:
            params['tree'] = tree
        return self.handle_req('GET', 'api/json', params=params).json()

    def handle_req(self, method, entry, **kwargs):
        '''Send a request to ``self.url + entry`` and return the response.

        :param method: HTTP method name
        :param entry: path appended to the item url
        :param kwargs: forwarded to ``jenkins.send_req``
        :raises ItemNotFoundError: on HTTP 404
        :raises AuthenticationError: on HTTP 401
        :raises PermissionError: on HTTP 403
        :raises BadRequestError: on HTTP 400
        :raises ServerError: on HTTP 500
        :raises HTTPError: re-raised unchanged for any other status
        '''
        self._add_crumb(kwargs)
        try:
            return self.jenkins.send_req(method, self.url + entry, **kwargs)
        except HTTPError as e:
            status = e.response.status_code
            if status == 404:
                raise ItemNotFoundError(
                    f'Not found {entry} for item: {self}') from e
            if status == 401:
                raise AuthenticationError(
                    f'Invalid authorization for {self}') from e
            if status == 403:
                raise PermissionError(
                    f'No permission to {entry} for {self}') from e
            if status == 400:
                # Fall back to the body when the X-Error header is absent so
                # a KeyError does not mask the bad-request error.
                detail = e.response.headers.get('X-Error', e.response.text)
                raise BadRequestError(detail) from e
            if status == 500:
                # The response body is passed on as-is.
                raise ServerError(e.response.text) from e
            raise

    def _add_crumb(self, kwargs):
        '''Add the client's crumb, if any, to ``kwargs['headers']``.

        The headers are merged into a new dict so that a dict supplied by
        the caller (possibly the shared class-level ``headers``) is never
        mutated.
        '''
        crumb = self.jenkins.crumb
        if crumb:
            merged = dict(kwargs.get('headers') or {})
            merged.update(crumb)
            kwargs['headers'] = merged

    def _new_instance_by_item(self, module, item):
        '''Create an object for ``item`` via ``new_item`` using this client.'''
        return new_item(self.jenkins, module, item)

    def exists(self):
        '''Return True if the item can be fetched, False on ItemNotFoundError.'''
        try:
            self.api_json(tree='_class')
            return True
        except ItemNotFoundError:
            return False

    @property
    def dynamic_attrs(self):
        '''snake_case names of the scalar fields in this item's JSON.

        Computed from ``api_json()`` the first time it is needed and cached
        on the class; only ``int``, ``str``, ``bool`` and ``None`` values
        are considered.
        '''
        if not self._dynamic_attrs:
            data = self.api_json()
            self.__class__._dynamic_attrs = \
                [snake(key) for key, val in data.items() if isinstance(
                    val, (int, str, bool, type(None)))]
        return self._dynamic_attrs

    def __eq__(self, other):
        return type(self) is type(other) and self.url == other.url

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same fields __eq__ compares.
        return hash((type(self), self.url))

    def __str__(self):
        return f'<{type(self).__name__}: {self.url}>'

    def __getattr__(self, name):
        '''Resolve ``name`` as a dynamic attribute fetched from the server.

        Only called when normal attribute lookup fails.
        '''
        # Special-method probes (e.g. from copy/pickle) and the two
        # attributes set in __init__ must never trigger a request: with
        # ``jenkins`` missing, the lookup below would recurse without end.
        if name in ('jenkins', 'url') or (
                name.startswith('__') and name.endswith('__')):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'")
        if name in self.dynamic_attrs:
            attr = camel(name)
            return self.api_json(tree=attr)[attr]
        return super().__getattribute__(name)