Source code for api4jenkins.queue

# encoding: utf-8
import re
from .item import Item
from .mix import ActionsMixIn


[docs]class Queue(Item):
[docs] def get(self, id): for item in self.api_json(tree='items[id,url]')['items']: if item['id'] == int(id): return QueueItem(self.jenkins, f"{self.jenkins.url}{item['url']}") return None
[docs] def cancel(self, id): self.handle_req('POST', 'cancelItem', params={ 'id': id}, allow_redirects=False)
def __iter__(self): for item in self.api_json(tree='items[url]')['items']: yield QueueItem(self.jenkins, f"{self.jenkins.url}{item['url']}")
# https://javadoc.jenkins.io/hudson/model/Queue.html#buildables # (enter) --> waitingList --+--> blockedProjects # | ^ # | | # | v # +--> buildables ---> pending ---> left # ^ | # | | # +---(rarely)---+
[docs]class QueueItem(Item, ActionsMixIn): def __init__(self, jenkins, url): if not url.startswith('https') and jenkins.url.startswith('https'): url = re.sub(r'^http[s]', 'https', url) super().__init__(jenkins, url) self.id = int(self.url.split('/')[-2]) self._build = None
[docs] def get_job(self): if self._class.endswith('$BuildableItem'): return self.get_build().get_job() task = self.api_json(tree='task[url]')['task'] return self._new_instance_by_item('api4jenkins.job', task)
[docs] def get_build(self): if not self._build: _class = self._class # BlockedItem does not have build if _class.endswith('$LeftItem'): executable = self.api_json('executable[url]')['executable'] self._build = self._new_instance_by_item( 'api4jenkins.build', executable) elif _class.endswith(('$BuildableItem', '$WaitingItem')): for build in self.jenkins.nodes.iter_builds(): # https://javadoc.jenkins.io/hudson/model/Run.html#getQueueId-- # https://javadoc.jenkins.io/hudson/model/Queue.Item.html#getId-- if int(build.queue_id) == self.id: self._build = build break return self._build
[docs] def cancel(self): self.jenkins.queue.cancel(self.id)
# due to item type is dynamic
[docs]class BuildableItem(QueueItem): pass
[docs]class BlockedItem(QueueItem): pass
[docs]class LeftItem(QueueItem): pass
[docs]class WaitingItem(QueueItem): pass