diff --git a/.gitignore b/.gitignore index 6d59a5c..c60af00 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +py3env/* *.pyc supervisord.pid diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..28543c3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +Logbook==0.12.5 +redis==2.10.5 +tornado==4.3 diff --git a/server/CommandProtocol.py b/server/CommandProtocol.py index 3673d51..21ce41e 100644 --- a/server/CommandProtocol.py +++ b/server/CommandProtocol.py @@ -1,211 +1,129 @@ # This file is part of Propagator, a KDE Sysadmin Project # # Copyright (C) 2015-2016 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # Protocol Description: # # CREATE reponame - create reponame.git on server and mirrors # RENAME oldrepo newrepo - move/rename oldrepo.git to newrepo.git # UPDATE reponame - sync reponame.git with its mirrors # DELETE reponame - delete reponame.git # FLUSH - try to commit all pending updates import os import shlex -import celery from datetime import datetime from collections import namedtuple - -import CeleryWorkers from ServerConfig import ServerConfig +# task dispatcher +from RemoteControl import RemotePlugins +from RemoteDispatcher import RemoteDispatcher +dispatcher = RemoteDispatcher(RemotePlugins, "redis-experiment") + # protocol exceptions class PropagatorProtocolException(Exception): def logline(self): time = datetime.now().strftime("%Y-%m-%d %k:%M:%S") return "{0} | {1}\n".format(time, str(self)) class InvalidCommandException(PropagatorProtocolException): def __init__(self, desc, command): self.description = desc self.command = command super(InvalidCommandException, self).__init__("{0}: {1}".format(desc, command)) class InvalidActionException(PropagatorProtocolException): def __init__(self, action): self.action = action super(InvalidActionException, self).__init__("Invalid command: {0}".format(action)) -# helper functions - -def isExcluded(repo, config): - - for pattern in config: - p = re.compile(pattern) - if p.match(repo): - return True - return False - -def CreateRepo(repo, upSpec): - - # find our repository and read in the description - repoRoot = ServerConfig.get("RepoRoot") - repoPath = os.path.join(repoRoot, repo) - if not os.path.exists(repoPath): - return - - repoDesc = "This repository has no description" - repoDescFile = os.path.join(repoPath, "description") - if os.path.exists(repoDescFile): - with open(repoDescFile) as f: - repoDesc = f.read().strip() - - # spawn the create tasks - if ServerConfig.get("GithubEnabled") and (not isExcluded(repo, ServerConfig.get("GithubExcepts"))): - CeleryWorkers.CreateRepoGithub.delay(repo, repoDesc) - - if ServerConfig.get("AnongitEnabled") and (not isExcluded(repo, ServerConfig.get("AnongitExcepts"))): - for server in ServerConfig.get("AnongitServers"): - CeleryWorkers.CreateRepoAnongit.delay(repo, server, repoDesc) - -def RenameRepo(srcRepo, destRepo, upSpec): - - if ServerConfig.get("GithubEnabled") and (not isExcluded(repo, ServerConfig.get("GithubExcepts"))): - CeleryWorkers.MoveRepoGithub.delay(srcRepo, destRepo) - - if ServerConfig.get("AnongitEnabled") and (not isExcluded(repo, ServerConfig.get("AnongitExcepts"))): - for server in ServerConfig.get("AnongitServers"): - CeleryWorkers.CreateRepoAnongit.delay(srcRepo, destRepo, server) - -def UpdateRepo(repo, upSpec): - - # find our repository - repoRoot = ServerConfig.get("RepoRoot") - repoPath = os.path.join(repoRoot, repo) - if not os.path.exists(repoPath): - return - - # lift the repo description as we might need to create the repo first - repoDesc = "This repository has no description" - repoDescFile = os.path.join(repoPath, "description") - if os.path.exists(repoDescFile): - with open(repoDescFile) as f: - repoDesc = f.read().strip() - - # spawn push to github task first - if ServerConfig.get("GithubEnabled") and (not isExcluded(repo, ServerConfig.get("GithubExcepts"))): - githubPrefix = ServerConfig.get("GithubPrefix") - githubUser = ServerConfig.get("GithubUser") - githubRemote = "%s@github.com:%s/%s" % (githubUser, githubPrefix, repo) - - createTask = CeleryWorkers.CreateRepoGithub.si(repo, repoDesc) - syncTask = CeleryWorkers.SyncRepo.si(repoPath, githubRemote, True) - celery.chain(createTask, syncTask)() - - # now spawn all push to anongit tasks - if ServerConfig.get("AnongitEnabled") and (not isExcluded(repo, ServerConfig.get("AnongitExcepts"))): - anonUser = ServerConfig.get("AnongitUser") - anonPrefix = ServerConfig.get("AnongitPrefix") - for server in ServerConfig.get("AnongitServers"): - anonRemote = "%s@%s:%s/%s" % (anonUser, server, anonPrefix, repo) - - createTask = CeleryWorkers.CreateRepoAnongit.si(repo, server, repoDesc) - syncTask = CeleryWorkers.SyncRepo.si(repoPath, anonRemote, False) - celery.chain(createTask, syncTask)() - -def DeleteRepo(repo, upSpec): - - if ServerConfig.get("GithubEnabled") and (not isExcluded(repo, ServerConfig.get("GithubExcepts"))): - CeleryWorkers.DeleteRepoGithub.delay(repo) - - if ServerConfig.get("AnongitEnabled") and (not isExcluded(repo, ServerConfig.get("AnongitExcepts"))): - for server in ServerConfig.get("AnongitServers"): - CeleryWorkers.DeleteRepoAnongit.delay(repo, server) +# protocol parse helpers def ParseCommand(cmdString): components = shlex.split(cmdString) action = components[0].lower() if not action in ("create", "rename", "delete", "update"): raise InvalidActionException(action) ActionCommand = namedtuple("ActionCommand", ["action", "arguments", "upstream"]) if action == "create": try: args = { "srcRepo": components[1] } except IndexError: raise InvalidCommandException("create command does not contain source repository details", cmdString) try: upstream = components[2] except IndexError: upstream = None elif action == "update": try: args = { "srcRepo": components[1] } except IndexError: raise InvalidCommandException("update command does not contain source repository details", cmdString) try: upstream = components[2] except IndexError: upstream = None elif action == "delete": try: args = { "srcRepo": components[1] } except IndexError: raise InvalidCommandException("delete command does not contain source repository details", cmdString) try: upstream = components[2] except IndexError: upstream = None elif action == "rename": try: args = { "srcRepo": components[1], "destRepo": components[2] } except IndexError: raise InvalidCommandException("rename command does not contain source and/or destination repository details", cmdString) try: upstream = components[3] except IndexError: upstream = None return ActionCommand(action, args, upstream) def ExecuteCommand(context): if context.action == "create": - CreateRepo(context.arguments.get("srcRepo"), context.upstream) + dispatcher.createRepo(context.arguments.get("srcRepo"), ident = context.upstream) elif context.action == "update": - UpdateRepo(context.arguments.get("srcRepo"), context.upstream) + dispatcher.updateRepo(context.arguments.get("srcRepo"), ident = context.upstream) elif context.action == "delete": - DeleteRepo(context.arguments.get("srcRepo"), context.upstream) + dispatcher.deleteRepo(context.arguments.get("srcRepo"), ident = context.upstream) elif context.action == "rename": - RenameRepo(context.arguments.get("srcRepo"), context.arguments.get("destRepo"), context.upstream) + dispatcher.moveRepo(context.arguments.get("srcRepo"), context.arguments.get("destRepo"), ident = context.upstream) diff --git a/server/Server.py b/server/GatorServer.py similarity index 94% rename from server/Server.py rename to server/GatorServer.py index 5b3a350..7604fff 100644 --- a/server/Server.py +++ b/server/GatorServer.py @@ -1,39 +1,44 @@ #!/usr/bin/python3 # This file is part of Propagator, a KDE Sysadmin Project # # Copyright 2015-2016 (C) Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import sys import tornado.ioloop -import CommandServer + +# set up logging +from logbook import StreamHandler +StreamHandler(sys.stdout).push_application() # start the command server +import CommandServer cmdServer = CommandServer.CommandServer() cmdServer.listen(58192, "::1") # start the ioloop tornado.ioloop.IOLoop.current().start() diff --git a/server/GithubRemote.py b/server/GithubRemote.py index 10b65ea..60cf616 100644 --- a/server/GithubRemote.py +++ b/server/GithubRemote.py @@ -1,118 +1,118 @@ # This file is part of Propagator, a KDE Sysadmin Project # -# Copyright 2015 Boudhayan Gupta +# Copyright (C) 2015-2016 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os import sys import requests try: import simplejson as json except ImportError: import json class GithubRemote(object): def __init__(self, name, desc = "This repository has no description"): self.REPO_NAME = name self.REPO_DESC = desc cfgPath = os.environ.get("GATOR_CONFIG_FILE") cfgData = {} with open(cfgPath) as f: cfgData = json.load(f) self.ORGANISATION = cfgData.get("GithubOrganization") self.ACCESS_TOKEN = None with open(os.path.expanduser(cfgData.get("GithubAPIKeyFile"))) as f: self.ACCESS_TOKEN = f.read().strip() self.SESSION = requests.Session() self.SESSION.headers.update({"Accept": "application/vnd.github.v3+json"}) self.SESSION.headers.update({"Authorization": " ".join(("token", ACCESS_TOKEN))}) if self.REPO_NAME.endswith(".git"): self.REPO_NAME = self.REPO_NAME.rstrip(".git") def __repr__(self): return ("" % self.REPO_NAME) def setRepoDescription(self, desc): self.REPO_DESC = desc if self.repoExists(): payload = {"description": desc} url = "/".join(("https://api.github.com/repos", self.ORGANISATION, self.REPO_NAME)) r = self.SESSION.patch(url, data = json.dumps(payload)) return ((r.status_code == 201) and ("id" in r.json.keys())) return True def repoExists(self): url = "/".join(("https://api.github.com/repos", self.ORGANISATION, self.REPO_NAME)) r = self.SESSION.get(url) return ((r.ok) and ("id" in r.json.keys())) def createRepo(self): payload = { "name": self.REPO_NAME, "description": self.REPO_DESC, "private": False, "has_issues": False, "has_wiki": False, "has_downloads": False, "auto_init": False, } url = "/".join(("https://api.github.com/orgs", self.ORGANISATION, "repos")) r = self.SESSION.post(url, data = json.dumps(payload)) return ((r.status_code == 201) and ("id" in r.json.keys())) def deleteRepo(self): url = "/".join(("https://api.github.com/repos", self.ORGANISATION, self.REPO_NAME)) r = self.SESSION.delete(url) return (r.status_code == 204) def moveRepo(self, newname): if newname.endswith(".git"): newname = newname.rstrip(".git") payload = {"name": newname} url = "/".join(("https://api.github.com/repos", self.ORGANISATION, self.REPO_NAME)) r = self.SESSION.patch(url, data = json.dumps(payload)) if (r.status_code == 201) and ("id" in r.json.keys()): self.REPO_NAME = newname return True return False diff --git a/server/RemoteControl.py b/server/RemoteControl.py new file mode 100644 index 0000000..eae06b8 --- /dev/null +++ b/server/RemoteControl.py @@ -0,0 +1,92 @@ +# This file is part of Propagator, a KDE Sysadmin Project +# +# Copyright (C) 2015-2016 Boudhayan Gupta +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of KDE e.V. (or its successor approved by the +# membership of KDE e.V.) nor the names of its contributors may be used +# to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os + +from importlib.machinery import SourceFileLoader +from logbook import Logger + +try: + import simplejson as json +except ImportError: + import json + +from ServerConfig import ServerConfig + +class RemoteLoader(object): + + def __init__(self): + self.mLogger = Logger("RemoteLoader") + self.mLogger.info("loading remote management plugins...") + + defaultSearchPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "remotes") + self.mPluginsSearchPaths = [ defaultSearchPath ] + self.mPluginsSearchPaths.extend(ServerConfig.get("RemotePluginsDir", [])) + + self.mLoadedPlugins = {} + self.mTaskMap = {} + + for searchPath in self.mPluginsSearchPaths: + self.mLogger.info(" loading plugins from directory: {0}", searchPath) + for pluginDir in os.listdir(searchPath): + pluginPath = os.path.join(searchPath, pluginDir) + pluginName, pluginEntry = self.loadPlugin(pluginPath) + if pluginName: + self.mLoadedPlugins[pluginName] = pluginEntry + self.mTaskMap[pluginName] = pluginEntry.get("instance").getTaskMap() + self.mLogger.info(" loaded plugin: {0}".format(pluginName)) + self.mLogger.info("done loading remote management plugins") + + def loadPlugin(self, path): + metaFile = os.path.join(path, "metadata.json") + if not os.path.isfile(metaFile): + return (None, None) + + codeFile = os.path.join(path, "EntryPoint.py") + if not os.path.isfile(codeFile): + return (None, None) + + plugin = {} + with open(metaFile) as f: + plugin["meta"] = json.load(f) + pluginName = plugin.get("meta").get("name") + plugin["instance"] = SourceFileLoader("{0}.EntryPoint".format(pluginName), codeFile).load_module() + return (pluginName, plugin) + + def listLoadedPlugins(self): + return self.mLoadedPlugins.keys() + + def taskFunction(self, plugin, taskid): + if not plugin in self.mLoadedPlugins.keys(): + raise NotImplementedError("plugin {0} is not available".format(plugin)) + if not taskid in self.mTaskMap.get(plugin).keys(): + raise NotImplementedError("plugin {0} does not implement task {1}".format(plugin, taskid)) + return self.mTaskMap.get(plugin).get(taskid) + +RemotePlugins = RemoteLoader() diff --git a/server/RemoteDispatcher.py b/server/RemoteDispatcher.py new file mode 100644 index 0000000..6cebcb1 --- /dev/null +++ b/server/RemoteDispatcher.py @@ -0,0 +1,54 @@ +from logbook import Logger +from redis import Redis +from uuid import uuid4 + +try: + import simplejson as json +except ImportError: + import json + +class RemoteDispatcher(object): + + def __init__(self, loader, qkey, host = "localhost", port = 6379, db = 0, password = None): + self.mLogger = Logger("RemoteDispatcher") + self.mLogger.info("connecting to the redis task queue...") + self.mRedisConn = Redis(host = host, port = port, db = db, password = password) + self.mQueueKey = "{}-IncomingTasks".format(qkey) + self.mLogger.info("connected") + self.mRemoteLoader = loader + + def createJob(self, plugin, jobclass, argsdict, depends = None): + jobid = "{0}-{1}".format(plugin, str(uuid4())) + payload = { + "jobclass": "{0}:{1}".format(plugin, jobclass), + "jobid": jobid, + "arguments": argsdict, + "depends": depends + } + self.mRedisConn.rpush(self.mQueueKey, json.dumps(payload)) + return jobid + + def createRepo(self, repo, desc = None, ifexists = False, ident = None): + for plugin in self.mRemoteLoader.listLoadedPlugins(): + createArgs = { "repo": repo, "desc": desc, "ifexists": ifexists } + self.createJob(plugin, "createrepo", createArgs) + + def setRepoDescription(self, repo, desc = None, ident = None): + for plugin in self.mRemoteLoader.listLoadedPlugins(): + descArgs = { "repo": repo, "desc": desc } + self.createJob(plugin, "setdesc", descArgs) + + def moveRepo(self, repo, dest, ident = None): + for plugin in self.mRemoteLoader.listLoadedPlugins(): + moveArgs = { "repo": repo, "dest": dest } + self.createJob(plugin, "moverepo", moveArgs) + + def updateRepo(self, repo, ident = None): + for plugin in self.mRemoteLoader.listLoadedPlugins(): + upArgs = { "repo": repo } + self.createjob(plugin, "updaterepo", upArgs) + + def deleteRepo(self, repo, ident = None): + for plugin in self.mRemoteLoader.listLoadedPlugins(): + delArgs = { "repo": repo } + self.createJob(plugin, "deleterepo", createArgs) diff --git a/server/TaskServer.py b/server/TaskServer.py new file mode 100644 index 0000000..e3a44ec --- /dev/null +++ b/server/TaskServer.py @@ -0,0 +1,100 @@ +#!/usr/bin/python3 +# This file is part of Propagator, a KDE Sysadmin Project +# +# Copyright 2015-2016 (C) Boudhayan Gupta +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of KDE e.V. (or its successor approved by the +# membership of KDE e.V.) nor the names of its contributors may be used +# to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import sys +import os +import argparse +import traceback + +from redis import Redis +from logbook import Logger + +from RemoteControl import RemotePlugins + +try: + import simplejson as json +except ImportError: + import json + +class RedisConsumer(object): + + def __init__(self, qkey, host = "localhost", port = 6379, db = 0, password = None): + self.mLogger = Logger("RedisConsumer") + self.mRedisConn = Redis(host = host, port = port, db = db, password = password) + self.mTaskQueueKey = "{}-IncomingTasks".format(qkey) + self.mFailedQueueKey = "{}-FailedTasks".format(qkey) + self.mDoneQueueKey = "{}-DoneTasks".format(qkey) + + def runSingleTask(self): + queueKey, taskJson = (i.decode() for i in self.mRedisConn.blpop(self.mTaskQueueKey)) + task = json.loads(taskJson) + + plugin, taskid = task.get("jobclass").split(":") + try: + func = RemotePlugins.taskFunction(plugin, taskid) + ret = func(tasks.get("arguments")) + except Exception: + task["return"] = None + task["success"] = False + task["traceback"] = traceback.format_exc() + self.mRedisConn.rpush(self.mFailedQueueKey, json.dumps(task)) + else: + task["return"] = ret + task["success"] = True + self.mRedisConn.rpush(self.mDoneQueueKey, json.dumps(task)) + + def runProcessLoop(self): + self.mLogger.info("consumer is now listening for tasks") + while True: self.runSingleTask() + +def CmdlineParse(): + + parser = argparse.ArgumentParser(prog = "TaskServer.py", description = "Server to process tasks created by the propagator daemon") + parser.add_argument("-e", "--eid", dest = "eid", action = "store", default = os.getpid(), help = "set an identifier for the server, for logging purposes (default is pid)") + + return parser.parse_args() + +if __name__ == "__main__": + + # parse command line arguments + info = CmdlineParse() + + # set up logging + from logbook import StreamHandler, Logger + StreamHandler(sys.stdout).push_application() + logger = Logger("TaskServer-{}".format(info.eid)) + + # start up + logger.info("starting...") + from RemoteControl import RemotePlugins + print(RemotePlugins.listLoadedPlugins()) + + consumer = RedisConsumer("redis-experiment") + consumer.runProcessLoop() diff --git a/server/remotes/anongit/EntryPoint.py b/server/remotes/anongit/EntryPoint.py new file mode 100644 index 0000000..784b198 --- /dev/null +++ b/server/remotes/anongit/EntryPoint.py @@ -0,0 +1,2 @@ +def getTaskMap(): + return {} diff --git a/server/remotes/anongit/metadata.json b/server/remotes/anongit/metadata.json new file mode 100644 index 0000000..c9f5030 --- /dev/null +++ b/server/remotes/anongit/metadata.json @@ -0,0 +1,11 @@ +{ + "name": "anongit", + "description": "Remote management plugin for unmanaged anonymous read-only git servers", + "version": "1.0.0", + "license": "BSD", + "author": "Boudhayan Gupta ", + + "pushtype": "full", + "identspec": "anongit", + "configvar": "GATOR_PCFG_ANONGIT" +} diff --git a/server/remotes/github/EntryPoint.py b/server/remotes/github/EntryPoint.py new file mode 100644 index 0000000..784b198 --- /dev/null +++ b/server/remotes/github/EntryPoint.py @@ -0,0 +1,2 @@ +def getTaskMap(): + return {} diff --git a/server/remotes/github/metadata.json b/server/remotes/github/metadata.json new file mode 100644 index 0000000..28f5cd0 --- /dev/null +++ b/server/remotes/github/metadata.json @@ -0,0 +1,11 @@ +{ + "name": "github", + "description": "Remote management plugin for GitHub Organizations", + "version": "1.0.0", + "license": "BSD", + "author": "Boudhayan Gupta ", + + "pushtype": "restricted", + "identspec": "gh", + "configvar": "GATOR_PCFG_GITHUB" +}