diff --git a/config/propagator.cfg.example b/config/propagator.cfg.example index 3491bbe..e1b588b 100644 --- a/config/propagator.cfg.example +++ b/config/propagator.cfg.example @@ -1,10 +1,12 @@ [general] repobase=/srv/git logs_dir=~/.propagator/logs +max_retries = 5 +retry_interval_step = 10 [smtp] host=localhost port=25 auth=no user=none pass=none diff --git a/propagator/remotes/remotebase.py b/propagator/remotes/dummy.py similarity index 70% copy from propagator/remotes/remotebase.py copy to propagator/remotes/dummy.py index 10e43b6..c5d9d6d 100644 --- a/propagator/remotes/remotebase.py +++ b/propagator/remotes/dummy.py @@ -1,64 +1,59 @@ # This file is part of Propagator, a KDE Sysadmin Project # # Copyright (C) 2015-2016 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import abc -from logbook import Logger +from propagator.remotes.remotebase import RemoteBase -class RemoteBase(abc.ABC): - def __init__(self): - self.logger = Logger(self.plugin_name) +class Remote(RemoteBase): + def __init__(self, opslog): + self.logger = opslog self.plugin_init() - @abc.abstractproperty + @property def plugin_name(self): - pass + return "dummy" - @abc.abstractmethod def plugin_init(self, *args, **kwargs): - pass + self.logger.info("loaded dummy plugin") + print("loaded dummy plugin") - @abc.abstractmethod - def create(self, args): - pass + def create(self, name, desc): + print("create repo - {}, {}".format(name, desc)) - @abc.abstractmethod - def rename(self, args): - pass + def rename(self, name, dest): + print("rename repo - {}, {}".format(name, dest)) - @abc.abstractmethod - def update(self, args): - pass + def update(self, repo, name): + print("update repo - {}".format(name)) + raise Exception - @abc.abstractmethod - def delete(self, args): - pass + def delete(self, name): + print("delete repo - {}".format(name)) - @abc.abstractmethod - def setdesc(self, args): - pass + def setdesc(self, name, desc): + print("set repo desc: {}, {}".format(name, desc)) diff --git a/propagator/remotes/github/__init__.py b/propagator/remotes/github.py similarity index 100% rename from propagator/remotes/github/__init__.py rename to propagator/remotes/github.py diff --git a/propagator/remotes/remotebase.py b/propagator/remotes/remotebase.py index 10e43b6..3c2f27c 100644 --- a/propagator/remotes/remotebase.py +++ b/propagator/remotes/remotebase.py @@ -1,64 +1,64 @@ # This file is part of Propagator, a KDE Sysadmin Project # # Copyright (C) 2015-2016 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import abc from logbook import Logger class RemoteBase(abc.ABC): - def __init__(self): - self.logger = Logger(self.plugin_name) + def __init__(self, opslog): + self.logger = opslog self.plugin_init() @abc.abstractproperty def plugin_name(self): pass @abc.abstractmethod def plugin_init(self, *args, **kwargs): pass @abc.abstractmethod def create(self, args): pass @abc.abstractmethod def rename(self, args): pass @abc.abstractmethod def update(self, args): pass @abc.abstractmethod def delete(self, args): pass @abc.abstractmethod def setdesc(self, args): pass diff --git a/propagator/remoteslave/amqp.py b/propagator/remoteslave/amqp.py index ff71e34..298faf7 100644 --- a/propagator/remoteslave/amqp.py +++ b/propagator/remoteslave/amqp.py @@ -1,76 +1,87 @@ # This file is part of Propagator, a KDE Sysadmin Project # # Copyright 2015 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import pika from propagator.core.config import config_amqp -queue_name_for_slave = lambda slave_name: ".".join(("propagator", "slave", slave_name)) -exchange_name = lambda: "propagator" +queue_name_for_slave = lambda slave_name: "propagator.slave.{}".format(slave_name) +delay_queue_name_for_slave = lambda slave_name: "propagator.slave.{}.delay".format(slave_name) +exchange_name = lambda: "propagator.exchange.master" +delay_exchange_name = lambda: "propagator.exchange.delay" def create_channel(): # get the relevant configuration and set sane defaults amqp_user = config_amqp.get("user", "guest") amqp_pass = config_amqp.get("pass", "guest") amqp_host = config_amqp.get("host", "localhost") amqp_port = config_amqp.get("port", 5672) amqp_vhost = config_amqp.get("vhost", "/") # connect to the amqp server creds = pika.PlainCredentials(amqp_user, amqp_pass) params = pika.ConnectionParameters(amqp_host, amqp_port, amqp_vhost, creds) conn = pika.BlockingConnection(params) channel = conn.channel() # done, return channel return channel def prepare_channel_producer(channel): # just declare the exchange and return the channel channel.exchange_declare(exchange = exchange_name(), exchange_type = "fanout", auto_delete = True) return channel def prepare_channel_consumer(channel, slave_name): # we need the exchange declared too... channel = prepare_channel_producer(channel) # declare the message exchange and a queue for the slave, and bind them queue_name = queue_name_for_slave(slave_name) channel.queue_declare(queue = queue_name, auto_delete = True) channel.queue_bind(queue = queue_name, exchange = exchange_name()) + # ...and the dead-letter exchange + delay_queue_name = delay_queue_name_for_slave(slave_name) + channel.exchange_declare(exchange = delay_exchange_name(), exchange_type = "direct", auto_delete = True) + channel.queue_declare(queue = delay_queue_name, durable = True, arguments = { + "x-dead-letter-exchange": delay_exchange_name(), + "x-dead-letter-routing-key": queue_name + }) + channel.queue_bind(queue = queue_name, exchange = delay_exchange_name(), routing_key = queue_name) + # done, return channel return channel def create_channel_producer(): channel = create_channel() return prepare_channel_producer(channel) def create_channel_consumer(slave_name): channel = create_channel() return prepare_channel_consumer(channel, slave_name) diff --git a/propagator/remoteslave/slavecore.py b/propagator/remoteslave/slavecore.py index 28a39bc..38348dc 100644 --- a/propagator/remoteslave/slavecore.py +++ b/propagator/remoteslave/slavecore.py @@ -1,95 +1,234 @@ # This file is part of Propagator, a KDE Sysadmin Project # # Copyright 2015 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import sys import os +import git +import pika import signal import logbook import importlib +try: + import simplejson as json +except ImportError: + import json + from propagator import VERSION as version from propagator.core.config import config_general from propagator.remoteslave import amqp class SlaveCore(object): def __init__(self, slave_name): # set up the logger as early as possible self.log = logbook.Logger("RemoteSlave-{}".format(str(os.getpid()))) self.log.info("This is KDE Propagator {} - Remote Slave".format(version)) self.log.info(" Starting...") - # create the operations log handler and load in the slave + # create the operations log handler and load in the slave, and other things self.opslog = self.init_slave_logger(slave_name) - self.remote = self.init_slave_module(slave_name) + self.remote = self.init_slave_module(slave_name).Remote(self.opslog) + self.repobase = config_general.get("repobase") + self.max_retries = config_general.get("max_retries", 5) + self.retry_step = config_general.get("retry_interval_step", 300) * 1000 + self.slave_name = slave_name # set up the amqp channel, and bind it to the consumer callback self.channel = amqp.create_channel_consumer(slave_name) self.channel.basic_consume(self.process_single_message, amqp.queue_name_for_slave(slave_name)) def __call__(self): # set up sigterm to also raise KeyboardInterrupt signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) - - # run the main loop self.log.info("listening for new tasks...") try: self.channel.start_consuming() except KeyboardInterrupt: self.channel.stop_consuming() self.log.info("slave is shutting down...") def init_slave_logger(self, slave_name): # get the logs directory and ensure that it exists default_logdir = os.path.expanduser("~/.propagator/logs") logdir = config_general.get("logs_dir", default_logdir) if not os.path.isdir(logdir): os.makedirs(logdir) # fire up a logger with its own handler to redirect to the file logpath = os.path.join(logdir, "remote.{}.log".format(slave_name)) logger = logbook.Logger("slave-{}".format(slave_name)) logger.handlers.append(logbook.FileHandler(logpath)) # done, return logger return logger def init_slave_module(self, slave_name): self.log.info("remote plugin requested: {}".format(slave_name)) plugin_name = "propagator.remotes.{}".format(slave_name) try: - self.remote = importlib.import_module(plugin_name) + remote = importlib.import_module(plugin_name) except ImportError: self.log.critical("remote plugin not found: {}".format(slave_name)) self.log.critical("this slave will now exit.") sys.exit(1) self.log.info("loaded remote plugin: {}".format(slave_name)) + return remote def process_single_message(self, channel, method, properties, body): - print("[*] Body: {}".format(body)) + if type(body) is bytes: + body = body.decode("utf-8") + try: + data = json.loads(body) + except json.JSONDecodeError: + self.log.error("task malformed: {}".format(body)) + channel.basic_ack(method.delivery_tag) + return + + # check the retry count + if not data.get("attempt"): + data["attempt"] = 0; + + # check if message is conditional to only one slave + remote_for = data.get("remote_for") + if remote_for not in (None, self.slave_name): + self.log.debug("skipped conditional task not meant for this slave: {}".format(body)) + channel.basic_ack(method.delivery_tag) + return + + # check for existence and validity of method + valid_ops = ("create", "rename", "update", "delete", "syncdesc") + op = data.get("operation") + if (not op) or (op not in valid_ops): + self.log.error("task malformed: invalid or no operation: {}".format(body)) + channel.basic_ack(method.delivery_tag) + return + + # check for source repo in message + repo = data.get("repository") + if not repo: + self.log.error("task malformed: no repository: {}".format(body)) + channel.basic_ack(method.delivery_tag) + return + + # check if the repo can be handled by this repo + if not self.remote.can_handle_repo(repo): + self.log.debug("repository cannot be handled by this repo, skipping: {}".format(body)) + channel.basic_ack(method.delivery_tag) + return + + # check if the source repo is valid and exists + repo = get_repo(repo) + if not repo and op != "delete": + self.log.error("invalid repository: {}".format(body)) + channel.basic_ack(method.delivery_tag) + return + + ret = getattr(self, "process_op_{}".format(op))(data, repo) + if ret is False: + data["attempt"] = data["attempt"] + 1 + if data["attempt"] > self.max_retries: + self.fail_permanently(data) + else: + message = json.dumps(data) + backoff = str(self.retry_step * data["attempt"]) + self.channel.basic_publish( + exchange = "", + routing_key = amqp.delay_queue_name_for_slave(self.slave_name), + properties = pika.BasicProperties(expiration = backoff), + body = message + ) channel.basic_ack(method.delivery_tag) + + def process_op_create(self, data, repo): + name = data.get("repository") + try: + self.remote.create(name, repo.description) + except Exception: + self.opslog.error("could not create repository: {}".format(name)) + self.opslog.exception() + return False + self.opslog.info("created repository: {}".format(name)) + + def process_op_rename(self, data, repo): + name = data.get("repository") + dest = data.get("destination") + if not dest: + self.log.error("task malformed: no destination: {}".format(body)) + return + try: + self.remote.rename(name, dest) + except Exception: + self.opslog.error("could not create repository: {}".format(name)) + self.opslog.exception() + return False + self.opslog.info("renamed repository: {} -> {}".format(name, dest)) + + def process_op_update(self, data, repo): + name = data.get("repository") + if not repo.branches: + self.opslog.info("skipping update of empty repository: { @abc.abstractmethod}".format(name)) + return + try: + self.remote.update(repo, name) + except Exception: + self.opslog.error("could not update repository: {}".format(name)) + self.opslog.exception() + return False + self.opslog.info("updated repository: {}".format(name)) + + def process_op_delete(self, data, repo): + name = data.get("repository") + try: + self.remote.delete(name) + except Exception: + self.opslog.error("could not delete repository: {}".format(name)) + self.opslog.exception() + return False + self.opslog.info("deleted repository: {}".format(name)) + + def process_op_syncdesc(self, data, repo): + name = data.get("repository") + try: + self.remote.setdesc(name, repo.description) + except Exception: + self.opslog.error("could not sync repository description: {}".format(name)) + self.opslog.exception() + return False + self.opslog.info("synced repository description: {}".format(name)) + + def get_repo(self, repo): + path = os.path.join(self.repobase, repo) + try: + repo = git.Repo(path) + except (git.exc.NoSuchPathError, git.exc.InvalidGitRepositoryError): + return None + return repo + + def fail_permanently(self, data): + pass diff --git a/propagator/utils/__init__.py b/propagator/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/propagator/remotes/remotebase.py b/propagator/utils/mirrorsync.py similarity index 67% copy from propagator/remotes/remotebase.py copy to propagator/utils/mirrorsync.py index 10e43b6..3018963 100644 --- a/propagator/remotes/remotebase.py +++ b/propagator/utils/mirrorsync.py @@ -1,64 +1,46 @@ # This file is part of Propagator, a KDE Sysadmin Project # -# Copyright (C) 2015-2016 Boudhayan Gupta +# Copyright 2015 Boudhayan Gupta # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of KDE e.V. (or its successor approved by the # membership of KDE e.V.) nor the names of its contributors may be used # to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import abc -from logbook import Logger +import sys +import argparse -class RemoteBase(abc.ABC): - def __init__(self): - self.logger = Logger(self.plugin_name) - self.plugin_init() +from propagator.remoteslave import amqp +from propagator.remoteslave.slavecore import SlaveCore - @abc.abstractproperty - def plugin_name(self): - pass +def cmdline_process(): + parser = argparse.ArgumentParser(description = "Sync updates to all repository mirrors through Propagator") + parser.add_argument("reponame", type = str, help = "the name of the repository to update") + parser.add_argument("remotename", type = str, nargs = "*", help = "update only these remotes") + parser.add_argument("-v", "--verbose", type = bool, default = False, help = "give verbose output on the standard output") + args = parser.parse_args() + return args - @abc.abstractmethod - def plugin_init(self, *args, **kwargs): - pass - - @abc.abstractmethod - def create(self, args): - pass - - @abc.abstractmethod - def rename(self, args): - pass - - @abc.abstractmethod - def update(self, args): - pass - - @abc.abstractmethod - def delete(self, args): - pass - - @abc.abstractmethod - def setdesc(self, args): - pass +def main(): + args = cmdline_process() + print(args)