# Source code for api4jenkins.system

# encoding: utf-8
import json
from functools import partial

from .item import AsyncItem, Item, snake
from .mix import AsyncRunScriptMixIn, RunScriptMixIn


class System(Item, RunScriptMixIn):
    '''Synchronous wrapper around instance-wide Jenkins operations.

    Offers helpers for the Configuration-as-Code (JCasC) endpoints and for
    decrypting a stored secret through ``run_script``.  In addition,
    ``__init__`` attaches one argument-less callable per lifecycle endpoint
    (``restart``, ``safeRestart``, ``exit``, ``safeExit``, ``quietDown``,
    ``cancelQuietDown``); the attribute name of each is ``snake(entry)``
    (presumably ``safe_restart``, ``quiet_down``, ... - confirm against
    ``snake`` in ``.item``).
    '''

    def __init__(self, jenkins, url):
        '''Initialise the item and attach the lifecycle helpers.

        see: https://support.cloudbees.com/hc/en-us/articles/216118748-How-to-Start-Stop-or-Restart-your-Instance-

        :param jenkins: forwarded unchanged to the parent initialiser
        :param url: forwarded unchanged to the parent initialiser
        '''
        super().__init__(jenkins, url)

        def _post(entry):
            # Every lifecycle endpoint is a plain POST without a payload.
            return self.handle_req('POST', entry)

        lifecycle_entries = ('restart', 'safeRestart', 'exit', 'safeExit',
                             'quietDown', 'cancelQuietDown')
        for entry in lifecycle_entries:
            # partial() freezes the endpoint name, so the attached callable
            # takes no arguments and does not depend on the loop variable.
            setattr(self, snake(entry), partial(_post, entry))

    def reload_jcasc(self):
        '''POST to ``configuration-as-code/reload``.

        :returns: whatever ``handle_req`` returns for that request
        '''
        return self.handle_req('POST', 'configuration-as-code/reload')

    def export_jcasc(self):
        '''POST to ``configuration-as-code/export`` and return the body.

        :returns: the ``text`` attribute of the response
        '''
        resp = self.handle_req('POST', 'configuration-as-code/export')
        return resp.text

    def apply_jcasc(self, content):
        '''Check a new JCasC source and, if accepted, apply it.

        ``content`` is first sent to ``configuration-as-code/checkNewSource``
        as the ``newSource`` parameter.  Only when that response does not
        start with an error ``<div>`` is the same value posted to
        ``configuration-as-code/replace``.

        :param content: value submitted as ``newSource``
        :returns: whatever ``handle_req`` returns for the replace request
        :raises ValueError: when the check response starts with
            ``<div class=error>``; the message is the response text
        '''
        params = {"newSource": content}
        resp = self.handle_req(
            'POST', 'configuration-as-code/checkNewSource', params=params)
        if resp.text.startswith('<div class=error>'):
            raise ValueError(resp.text)
        data = {'json': json.dumps(params),
                'replace': 'Apply new configuration'}
        return self.handle_req('POST', 'configuration-as-code/replace',
                               data=data)

    def decrypt_secret(self, text):
        '''Run ``hudson.util.Secret.decrypt`` on ``text`` via ``run_script``.

        :param text: the value handed to ``Secret.decrypt``; it is
            converted with ``str()`` before being embedded in the script
        :returns: whatever ``run_script`` returns for the generated script
        '''
        # Embed ``text`` as a Groovy string *literal* instead of splicing it
        # in raw: json.dumps() adds the surrounding double quotes and escapes
        # quotes, backslashes and control characters, and ``$`` is escaped
        # afterwards because Groovy interpolates it inside double quotes.
        # Without this, such characters would break the script or be
        # evaluated as code.  Text containing none of them yields exactly
        # the same script as before.
        literal = json.dumps(str(text), ensure_ascii=False).replace(
            '$', '\\$')
        cmd = f'println(hudson.util.Secret.decrypt({literal}))'
        return self.run_script(cmd)
# async class
class AsyncSystem(AsyncItem, AsyncRunScriptMixIn):
    '''Asynchronous wrapper around instance-wide Jenkins operations.

    Offers coroutine helpers for the Configuration-as-Code (JCasC) endpoints
    and for decrypting a stored secret through ``run_script``.  In addition,
    ``__init__`` attaches one argument-less coroutine function per lifecycle
    endpoint (``restart``, ``safeRestart``, ``exit``, ``safeExit``,
    ``quietDown``, ``cancelQuietDown``); the attribute name of each is
    ``snake(entry)`` (presumably ``safe_restart``, ``quiet_down``, ... -
    confirm against ``snake`` in ``.item``).
    '''

    def __init__(self, jenkins, url):
        '''Initialise the item and attach the lifecycle helpers.

        see: https://support.cloudbees.com/hc/en-us/articles/216118748-How-to-Start-Stop-or-Restart-your-Instance-

        :param jenkins: forwarded unchanged to the parent initialiser
        :param url: forwarded unchanged to the parent initialiser
        '''
        super().__init__(jenkins, url)

        async def _post(entry):
            # Every lifecycle endpoint is a plain POST without a payload.
            return await self.handle_req('POST', entry)

        lifecycle_entries = ('restart', 'safeRestart', 'exit', 'safeExit',
                             'quietDown', 'cancelQuietDown')
        for entry in lifecycle_entries:
            # partial() freezes the endpoint name; calling the attached
            # attribute returns a coroutine that the caller must await.
            setattr(self, snake(entry), partial(_post, entry))

    async def reload_jcasc(self):
        '''POST to ``configuration-as-code/reload``.

        :returns: whatever awaiting ``handle_req`` yields for that request
        '''
        return await self.handle_req('POST', 'configuration-as-code/reload')

    async def export_jcasc(self):
        '''POST to ``configuration-as-code/export`` and return the body.

        :returns: the ``text`` attribute of the response
        '''
        resp = await self.handle_req('POST', 'configuration-as-code/export')
        return resp.text

    async def apply_jcasc(self, content):
        '''Check a new JCasC source and, if accepted, apply it.

        ``content`` is first sent to ``configuration-as-code/checkNewSource``
        as the ``newSource`` parameter.  Only when that response does not
        start with an error ``<div>`` is the same value posted to
        ``configuration-as-code/replace``.

        :param content: value submitted as ``newSource``
        :returns: whatever awaiting ``handle_req`` yields for the replace
            request
        :raises ValueError: when the check response starts with
            ``<div class=error>``; the message is the response text
        '''
        params = {"newSource": content}
        resp = await self.handle_req(
            'POST', 'configuration-as-code/checkNewSource', params=params)
        if resp.text.startswith('<div class=error>'):
            raise ValueError(resp.text)
        data = {'json': json.dumps(params),
                'replace': 'Apply new configuration'}
        return await self.handle_req(
            'POST', 'configuration-as-code/replace', data=data)

    async def decrypt_secret(self, text):
        '''Run ``hudson.util.Secret.decrypt`` on ``text`` via ``run_script``.

        :param text: the value handed to ``Secret.decrypt``; it is
            converted with ``str()`` before being embedded in the script
        :returns: whatever awaiting ``run_script`` yields for the generated
            script
        '''
        # Embed ``text`` as a Groovy string *literal* instead of splicing it
        # in raw: json.dumps() adds the surrounding double quotes and escapes
        # quotes, backslashes and control characters, and ``$`` is escaped
        # afterwards because Groovy interpolates it inside double quotes.
        # Without this, such characters would break the script or be
        # evaluated as code.  Text containing none of them yields exactly
        # the same script as before.
        literal = json.dumps(str(text), ensure_ascii=False).replace(
            '$', '\\$')
        cmd = f'println(hudson.util.Secret.decrypt({literal}))'
        return await self.run_script(cmd)