Source code for api4jenkins.plugin

# encoding: utf-8
import json
import time

import xml.etree.ElementTree as ET

from .item import Item


[docs]class PluginsManager(Item):
[docs] def get(self, name): for plugin in self.api_json(tree='plugins[shortName]')['plugins']: if plugin['shortName'] == name: return Plugin(self.jenkins, f'{self.url}plugin/{name}/') return None
[docs] def install(self, *names, block=False): plugin_xml = ET.Element('jenkins') for name in names: if '@' not in name: name += '@latest' ET.SubElement(plugin_xml, 'install', {'plugin': name}) self.handle_req('POST', 'installNecessaryPlugins', headers=self.headers, data=ET.tostring(plugin_xml)) while block and not self.installation_done: time.sleep(2)
[docs] def uninstall(self, *names): for name in names: self.handle_req('POST', f'plugin/{name}/doUninstall')
[docs] def set_site(self, url): self.handle_req('POST', 'siteConfigure', params={'site': url}) self.check_updates_server()
[docs] def check_updates_server(self): self.handle_req('POST', 'checkUpdatesServer')
@property def update_center(self): return UpdateCenter(self.jenkins, f'{self.jenkins.url}updateCenter/') @property def site(self): return self.update_center.site @property def restart_required(self): return self.update_center.restart_required @property def installation_done(self): return self.update_center.installation_done
[docs] def set_proxy(self, name, port, *, username='', password='', no_proxy='', test_url=''): data = {'name': name, 'port': port, 'userName': username, 'password': password, 'noProxyHost': no_proxy, 'testUrl': test_url} self.handle_req('POST', 'proxyConfigure', data={ 'json': json.dumps(data)})
def __iter__(self): for plugin in self.api_json(tree='plugins[shortName]')['plugins']: yield Plugin(self.jenkins, f'{self.url}plugin/{plugin["shortName"]}/')
[docs]class Plugin(Item):
[docs] def uninstall(self): self.handle_req('POST', 'doUninstall')
[docs]class UpdateCenter(Item): @property def installation_done(self): resp = self.handle_req('GET', 'installStatus') return all(job['installStatus'] != 'Pending' for job in resp.json()['data']['jobs']) @property def restart_required(self): return self.api_json(tree='restartRequiredForCompletion').get( 'restartRequiredForCompletion') @property def site(self): return self.api_json(tree='sites[url]')['sites'][0].get('url')