Source code for api4jenkins.plugin

# encoding: utf-8
import json
import time
import xml.etree.ElementTree as ET

from .item import AsyncItem, Item


[docs] class PluginsManager(Item):
[docs] def get(self, name): for plugin in self.api_json(tree='plugins[shortName]')['plugins']: if plugin['shortName'] == name: return Plugin(self.jenkins, f'{self.url}plugin/{name}/') return None
[docs] def install(self, *names, block=False): plugin_xml = ET.Element('jenkins') for name in names: if '@' not in name: name += '@latest' ET.SubElement(plugin_xml, 'install', {'plugin': name}) self.handle_req('POST', 'installNecessaryPlugins', headers=self.headers, content=ET.tostring(plugin_xml)) while block and not self.installation_done: time.sleep(2)
[docs] def uninstall(self, *names): for name in names: self.handle_req('POST', f'plugin/{name}/doUninstall')
[docs] def set_site(self, url): self.handle_req('POST', 'siteConfigure', params={'site': url}) self.check_updates_server()
[docs] def check_updates_server(self): self.handle_req('POST', 'checkUpdatesServer')
@property def update_center(self): return UpdateCenter(self.jenkins, f'{self.jenkins.url}updateCenter/') @property def site(self): return self.update_center.site @property def restart_required(self): return self.update_center.restart_required @property def installation_done(self): return self.update_center.installation_done
[docs] def set_proxy(self, name, port, *, username='', password='', no_proxy='', test_url=''): data = {'name': name, 'port': port, 'userName': username, 'password': password, 'noProxyHost': no_proxy, 'testUrl': test_url} self.handle_req('POST', 'proxyConfigure', data={ 'json': json.dumps(data)})
def __iter__(self): for plugin in self.api_json(tree='plugins[shortName]')['plugins']: yield Plugin(self.jenkins, f'{self.url}plugin/{plugin["shortName"]}/')
[docs] class Plugin(Item):
[docs] def uninstall(self): self.handle_req('POST', 'doUninstall')
[docs] class UpdateCenter(Item): @property def installation_done(self): resp = self.handle_req('GET', 'installStatus') return all(job['installStatus'] != 'Pending' for job in resp.json()['data']['jobs']) @property def restart_required(self): return self.api_json(tree='restartRequiredForCompletion').get( 'restartRequiredForCompletion') @property def site(self): return self.api_json(tree='sites[url]')['sites'][0].get('url')
# async class
[docs] class AsyncPluginsManager(AsyncItem):
[docs] async def get(self, name): data = await self.api_json(tree='plugins[shortName]') for plugin in data['plugins']: if plugin['shortName'] == name: return AsyncPlugin(self.jenkins, f'{self.url}plugin/{name}/') return None
[docs] async def install(self, *names, block=False): plugin_xml = ET.Element('jenkins') for name in names: if '@' not in name: name += '@latest' ET.SubElement(plugin_xml, 'install', {'plugin': name}) await self.handle_req('POST', 'installNecessaryPlugins', headers=self.headers, content=ET.tostring(plugin_xml)) while block and not await self.installation_done: time.sleep(2)
[docs] async def uninstall(self, *names): for name in names: await self.handle_req('POST', f'plugin/{name}/doUninstall')
[docs] async def set_site(self, url): await self.handle_req('POST', 'siteConfigure', params={'site': url}) await self.check_updates_server()
[docs] async def check_updates_server(self): await self.handle_req('POST', 'checkUpdatesServer')
@property def update_center(self): return AsyncUpdateCenter(self.jenkins, f'{self.jenkins.url}updateCenter/') @property def site(self): return self.update_center.site @property def restart_required(self): return self.update_center.restart_required @property def installation_done(self): return self.update_center.installation_done
[docs] async def set_proxy(self, name, port, *, username='', password='', no_proxy='', test_url=''): data = {'name': name, 'port': port, 'userName': username, 'password': password, 'noProxyHost': no_proxy, 'testUrl': test_url} await self.handle_req('POST', 'proxyConfigure', data={ 'json': json.dumps(data)})
async def __aiter__(self): data = await self.api_json(tree='plugins[shortName]') for plugin in data['plugins']: yield AsyncPlugin(self.jenkins, f'{self.url}plugin/{plugin["shortName"]}/')
[docs] class AsyncPlugin(AsyncItem):
[docs] async def uninstall(self): await self.handle_req('POST', 'doUninstall')
[docs] class AsyncUpdateCenter(AsyncItem): @property async def installation_done(self): resp = await self.handle_req('GET', 'installStatus') return all(job['installStatus'] != 'Pending' for job in resp.json()['data']['jobs']) @property async def restart_required(self): data = await self.api_json(tree='restartRequiredForCompletion') return data.get('restartRequiredForCompletion') @property async def site(self): data = await self.api_json(tree='sites[url]') return data['sites'][0].get('url')