Source code for api4jenkins.node

# encoding: utf-8

import json
import re

from .exceptions import ItemNotFoundError
from .item import Item
from .mix import ConfigurationMixIn, DeletionMixIn, RunScriptMixIn


[docs]class Nodes(Item): ''' classdocs '''
[docs] def create(self, name, **kwargs): node_setting = { 'nodeDescription': '', 'numExecutors': 1, 'remoteFS': '/home/jenkins', 'labelString': '', 'mode': 'NORMAL', 'retentionStrategy': { 'stapler-class': 'hudson.slaves.RetentionStrategy$Always' }, 'nodeProperties': {'stapler-class-bag': 'true'}, 'launcher': {'stapler-class': 'hudson.slaves.JNLPLauncher'} } node_setting.update(kwargs) params = { 'name': name, 'type': 'hudson.slaves.DumbSlave$DescriptorImpl', 'json': json.dumps(node_setting) } self.handle_req('POST', 'doCreateItem', data=params)
[docs] def get(self, name): for item in self.api_json(tree='computer[displayName]')['computer']: if name == item['displayName']: item['url'] = f"{self.url}{item['displayName']}/" return self._new_instance_by_item(__name__, item) return None
[docs] def iter_builds(self): tree = 'computer[oneOffExecutors[currentExecutable[url]]]' for computer in self.api_json(tree, 2)['computer']: for executor in computer.get('oneOffExecutors'): # in case of issue: # https://github.com/joelee2012/api4jenkins/issues/16 if not executor['currentExecutable']: continue yield self._new_instance_by_item('api4jenkins.build', executor['currentExecutable'])
def __iter__(self): for item in self.api_json(tree='computer[displayName]')['computer']: item['url'] = f"{self.url}{item['displayName']}/" yield self._new_instance_by_item(__name__, item)
[docs]class Node(Item, ConfigurationMixIn, DeletionMixIn, RunScriptMixIn):
[docs] def enable(self): if self.offline: self.handle_req('POST', 'toggleOffline', params={'offlineMessage': ''})
[docs] def disable(self, msg=''): if not self.offline: self.handle_req('POST', 'toggleOffline', params={'offlineMessage': msg})
[docs] def iter_builds(self): tree = 'oneOffExecutors[currentExecutable[url]]' for executor in self.api_json(tree, 2)['oneOffExecutors']: yield self._new_instance_by_item('api4jenkins.build', executor['currentExecutable'])
[docs]class MasterComputer(Node): def __init__(self, jenkins, url): super().__init__(jenkins, re.sub(r'/master/$', '/(master)/', url))
[docs]class SlaveComputer(Node): pass
[docs]class KubernetesComputer(Node):
[docs] def exists(self): try: self.handle_req('GET', '') return True except ItemNotFoundError: return False
[docs]class DockerComputer(Node): pass