# encoding: utf-8
import json
from functools import partial
from .item import AsyncItem, Item, snake
from .mix import AsyncRunScriptMixIn, RunScriptMixIn
class System(Item, RunScriptMixIn):
    """Administrative operations on a Jenkins instance.

    Groups the lifecycle endpoints (attached per instance in ``__init__``),
    the ``configuration-as-code`` endpoints and secret decryption through a
    Groovy script.
    """

    def __init__(self, jenkins, url):
        """Initialise the item and attach one callable per lifecycle endpoint.

        see: https://support.cloudbees.com/hc/en-us/articles/216118748-How-to-Start-Stop-or-Restart-your-Instance-

        :param jenkins: forwarded unchanged to the parent initialiser.
        :param url: forwarded unchanged to the parent initialiser.
        """
        super().__init__(jenkins, url)

        def _post(entry):
            return self.handle_req('POST', entry)

        # partial() binds the endpoint eagerly, so each generated attribute
        # keeps its own entry rather than the last value of the loop.
        # NOTE(review): snake() presumably maps camelCase to snake_case
        # (``safeRestart`` -> ``safe_restart``) -- confirm in .item.
        for entry in ['restart', 'safeRestart', 'exit',
                      'safeExit', 'quietDown', 'cancelQuietDown']:
            setattr(self, snake(entry), partial(_post, entry))

    def reload_jcasc(self):
        """POST to ``configuration-as-code/reload`` and return the response."""
        return self.handle_req('POST', 'configuration-as-code/reload')

    def export_jcasc(self):
        """POST to ``configuration-as-code/export`` and return the body text."""
        return self.handle_req('POST', 'configuration-as-code/export').text

    def apply_jcasc(self, content):
        """Check *content* as a new configuration source, then apply it.

        :param content: value sent as ``newSource``.
        :returns: the response of the ``configuration-as-code/replace`` request.
        :raises ValueError: when the ``checkNewSource`` response text starts
            with ``<div class=error>``; the message is that response text.
        """
        params = {"newSource": content}
        resp = self.handle_req(
            'POST', 'configuration-as-code/checkNewSource', params=params)
        # The check endpoint reports problems as an HTML error fragment in
        # the body, so the body has to be inspected before applying.
        if resp.text.startswith('<div class=error>'):
            raise ValueError(resp.text)
        data = {'json': json.dumps(params),
                'replace': 'Apply new configuration'}
        return self.handle_req(
            'POST', 'configuration-as-code/replace', data=data)

    def decrypt_secret(self, text):
        """Run a Groovy snippet printing ``hudson.util.Secret.decrypt(text)``.

        :param text: value handed to ``Secret.decrypt``.
        :returns: whatever ``run_script`` returns for the generated script.
        """
        # Emit a single-quoted Groovy literal: it performs no ``$``
        # interpolation, and escaping backslash, quote and line breaks keeps
        # arbitrary input from terminating the literal or injecting code.
        literal = (f'{text}'
                   .replace('\\', '\\\\')
                   .replace("'", "\\'")
                   .replace('\n', '\\n')
                   .replace('\r', '\\r'))
        cmd = f"println(hudson.util.Secret.decrypt('{literal}'))"
        return self.run_script(cmd)
# async class
class AsyncSystem(AsyncItem, AsyncRunScriptMixIn):
    """Coroutine-based administrative operations on a Jenkins instance.

    Groups the lifecycle endpoints (attached per instance in ``__init__``),
    the ``configuration-as-code`` endpoints and secret decryption through a
    Groovy script. Every operation has to be awaited.
    """

    def __init__(self, jenkins, url):
        """Initialise the item and attach one coroutine factory per endpoint.

        see: https://support.cloudbees.com/hc/en-us/articles/216118748-How-to-Start-Stop-or-Restart-your-Instance-

        :param jenkins: forwarded unchanged to the parent initialiser.
        :param url: forwarded unchanged to the parent initialiser.
        """
        super().__init__(jenkins, url)

        async def _post(entry):
            return await self.handle_req('POST', entry)

        # partial() binds the endpoint eagerly, so each generated attribute
        # keeps its own entry; calling the attribute yields a coroutine.
        # NOTE(review): snake() presumably maps camelCase to snake_case
        # (``safeRestart`` -> ``safe_restart``) -- confirm in .item.
        for entry in ['restart', 'safeRestart', 'exit',
                      'safeExit', 'quietDown', 'cancelQuietDown']:
            setattr(self, snake(entry), partial(_post, entry))

    async def reload_jcasc(self):
        """POST to ``configuration-as-code/reload`` and return the response."""
        return await self.handle_req('POST', 'configuration-as-code/reload')

    async def export_jcasc(self):
        """POST to ``configuration-as-code/export`` and return the body text."""
        data = await self.handle_req('POST', 'configuration-as-code/export')
        return data.text

    async def apply_jcasc(self, content):
        """Check *content* as a new configuration source, then apply it.

        :param content: value sent as ``newSource``.
        :returns: the response of the ``configuration-as-code/replace`` request.
        :raises ValueError: when the ``checkNewSource`` response text starts
            with ``<div class=error>``; the message is that response text.
        """
        params = {"newSource": content}
        resp = await self.handle_req(
            'POST', 'configuration-as-code/checkNewSource', params=params)
        # The check endpoint reports problems as an HTML error fragment in
        # the body, so the body has to be inspected before applying.
        if resp.text.startswith('<div class=error>'):
            raise ValueError(resp.text)
        data = {'json': json.dumps(params),
                'replace': 'Apply new configuration'}
        return await self.handle_req(
            'POST', 'configuration-as-code/replace', data=data)

    async def decrypt_secret(self, text):
        """Run a Groovy snippet printing ``hudson.util.Secret.decrypt(text)``.

        :param text: value handed to ``Secret.decrypt``.
        :returns: whatever ``run_script`` returns for the generated script.
        """
        # Emit a single-quoted Groovy literal: it performs no ``$``
        # interpolation, and escaping backslash, quote and line breaks keeps
        # arbitrary input from terminating the literal or injecting code.
        literal = (f'{text}'
                   .replace('\\', '\\\\')
                   .replace("'", "\\'")
                   .replace('\n', '\\n')
                   .replace('\r', '\\r'))
        cmd = f"println(hudson.util.Secret.decrypt('{literal}'))"
        return await self.run_script(cmd)