#!/usr/bin/python
"""
amqpclt
"""
import argparse
import datetime
import os
import pprint
import re
import signal
import socket
from subprocess import Popen, PIPE
import ssl
import sys
import syslog
import time
import traceback
import urllib
import uuid
    
try:
    import simplejson as json
except ImportError:
    import json
    
try:
    import hashlib
    md5_hash = hashlib.md5
except ImportError:
    import md5
    md5_hash = md5.md5

from warnings import simplefilter
simplefilter("ignore")

import auth.credential as credential
import messaging.queue as queue
from messaging.message import Message

#pika.log.setup(pika.log.INFO, color=True)
# Shared pretty-printer configured with a 2-space indent.
pp = pprint.PrettyPrinter(indent=2)
# Module-level state. The code that reads and updates these globals is not
# part of this section of the file; the notes below are inferred from the
# names only -- TODO confirm against the rest of the module.
running = False     # presumably the "main loop is active" flag
CALLBACK = None     # presumably the user supplied callback object
CONFIG = dict()     # presumably the merged configuration
TO_SYSLOG = False   # presumably set when logging must go to syslog

# Program metadata, used by the help, rst and pod generators below.
PROG = "amqpclt"
AUTHOR = "Massimo Paladin <massimo.paladin@gmail.com>"
COPYRIGHT = "Copyright (C) 2012 CERN"
VERSION = "0.4"
DATE = "26 September 2012"
SHORT_DESCRIPTION = "versatile AMQP client"
DESCRIPTION = """
**amqpclt** is a versatile tool to interact with messaging brokers speaking 
AMQP and/or message queues (see :py:mod:`messaging.queue`) on disk.

It receives messages (see :py:mod:`messaging.message`) from an incoming
module, optionally massaging them (i.e. filtering and/or modifying), and 
sends them to an outgoing module. Depending on which modules are used, 
the tool can perform different operations.

Here are the supported incoming modules:
<{LIST_BEGIN}>
- broker: connect to a messaging broker using AMQP, subscribe to one 
  or more queues and receive the messages sent by the broker

- queue: read messages from a message queue on disk
  (see :py:mod:`messaging.queue`)
<{LIST_END}>
Here are the supported outgoing modules:
<{LIST_BEGIN}>
- broker: connect to a messaging broker using AMQP and send the messages

- queue: store the messages in a message queue on disk
  (see :py:mod:`messaging.queue`)
<{LIST_END}>
Here are some frequently used combinations:
<{LIST_BEGIN}>
- incoming broker + outgoing queue: drain some destinations, storing 
  the messages on disk

- incoming queue + outgoing broker: (re-)send messages that have been
  previously stored on disk, optionally with modifications (such as
  altering the destination)

- incoming broker + outgoing broker: shovel messages from one broker
  to another
<{LIST_END}>
See the "EXAMPLES" sections for concrete examples.
"""
CONFIGURATION_FILE = """
**amqpclt** can read its options from a configuration file. For this,
the Perl Config::General module is used and the option names are the
same as on the command line. For instance::

    daemon = true
    pidfile = /var/run/amqpclt.pid
    incoming-queue = path=/var/spool/amqpclt
    outgoing-broker-uri = amqp://broker.acme.com:5672/virtual_host
    outgoing-broker-auth = "plain name=guest pass=guest"

Alternatively, options can be nested::

    <outgoing-broker>
        uri = amqp://broker.acme.com:5672/virtual_host
        auth = "plain name=guest pass=guest"
    </outgoing-broker>

Or even::

    <outgoing>
        <broker>
            uri = amqp://broker.acme.com:5672/virtual_host
            <auth>
                scheme = plain
                name = guest
                pass = guest
            </auth>
        </broker>
    </outgoing>

The options specified on the command line have precedence over the
ones found in the configuration file.
"""
CALLBACK_GUIDE = """
**amqpclt** can be given python code to execute on all processed messages.
This can be used for different purposes:
<{LIST_BEGIN}>
- massaging: the code can change any part of the message, including setting
  or removing header fields
  
- filtering: the code can decide if the message must be given to the
  outgoing module or not
  
- displaying: the code can print any part of the message

- copying: the code can store a copy of the message into files or
  message queues
<{LIST_END}>
To use callbacks, the --callback-path or --callback-code option must be used.
The python code must provide functions with the following signature:
<{LIST_BEGIN}>
- start(self, DATA)
  (optional) this will be called when the program starts, with the supplied
  data (see the --callback-data option) as a list reference

- check(self, MESSAGE)
  (mandatory) this will be called when the program has one message to process;
  it will be given the message (see messaging.message.Message) and must return
  either a message (it could be the same one or a new one) or a string
  describing why the message has been dropped

- idle(self)
  (optional) this will be called when the program has no message to process

- stop(self)
  (optional) this will be called when the program stops
<{LIST_END}>
The code can be put in a file, on the command line or in the **amqpclt**
configuration file, using the "here document" syntax.

Here is an example (to be put in the **amqpclt** configuration file) that
prints on stdout a JSON array of messages::

    callback-code = <<EOF
    def start (self):
        self.count = 0
    def check(self, msg):
        if self.count:
            sys.stdout.write(", ")
        else:
            sys.stdout.write("[")
        self.count += 1
        sys.stdout.write(msg.serialize())
        return msg
    def stop(self):
        if self.count:
            sys.stdout.write("]\\n")
        else:
            sys.stdout.write("[]\\n")
    EOF

For simple callback code that only needs the check subroutine, it is enough
to supply the "inside code". If the function definition is missing,
the supplied code will be wrapped with::

    def check(self, msg):
        hdr = msg.header
        ... your code goes here ...
        return msg

This allows for instance to remove the message-id header with something like::

  $ amqpclt ... --callback-code 'del(hdr["foo"])'
"""
EXAMPLES = {"SENDING" : """
Here is an example of a configuration file for a message sender 
daemon (from queue to broker), forcing the persistent header to true
(something which is highly recommended for reliable messaging) and
setting the destination::

    # define the source message queue
    <incoming-queue>
     path = /var/spool/sender
    </incoming-queue>
    # modify the message header on the fly
    callback-code = <<EOF
        hdr["destination"] = "/queue/app1.data"
        hdr["persistent"] = "true"
    EOF
    # define the destination broker
    <outgoing-broker>
        uri = "amqp://broker.acme.com:5672/virtual_host"
    </outgoing-broker>
    # miscellaneous options
    reliable = true
    pidfile = /var/run/sender.pid
    daemon = true
    loop = true
    remove = true
""", 
"RECEIVING" : """
Here is an example of a configuration file for a message receiver
(from broker to queue)::

    # define the source broker
    <incoming-broker>
        uri = "amqp://broker.acme.com:5672/virtual_host"
        <auth>
            scheme = plain
            name = receiver
            pass = secret
        </auth>
    </incoming-broker>
    # define the subscriptions
    <subscribe>
        destination = /queue/app1.data
    </subscribe>
    <subscribe>
        destination = /queue/app2.data
    </subscribe>
    # define the destination message queue
    <outgoing-queue>
        path = /var/spool/receiver
    </outgoing-queue>
    # miscellaneous options
    pidfile = /var/run/receiver.pid
    
To run it as a daemon::

    $ amqpclt --config test.conf --daemon

To use the configuration file above with some options
on the command line to drain the queues::

    $ amqpclt --config test.conf --timeout-inactivity 10
""",
"SHOVELING" : """
Here is an example of a configuration file for a message shoveler
(from broker to broker), clearing some headers on the fly so that messages
can be replayed safely::

    # define the source broker
    <incoming-broker>
        uri = "amqp://broker.acme.com:5672/virtual_host"
    </incoming-broker>
    # define the subscriptions
    <subscribe>
        destination = /queue/app1.data
    </subscribe>
    <subscribe>
        destination = /queue/app2.data
    </subscribe>
    # define the destination broker
    <outgoing-broker>
        uri = "amqp://dev-broker.acme.com:5672/virtual_host"
    </outgoing-broker>
    # modify the message destination
    callback-code = <<EOF
        hdr["destination"] = "/queue/dest_to_be_replayed"
    EOF
""",
"TAPPING" : """
Callback code can also be used to tap messages, i.e. get a copy of all
messages processed by **amqpclt**. Here is some callback code for this purpose
that could for instance be merged with the shoveling code above.
It also shows how to use the --callback-data option::

    callback-code = <<EOF
        def start(self, path, qtype="DQS"):
            self.tap_queue = queue.new({"path" : path, "type" : qtype})
        
        def check(self, msg):
            self.tap_queue.add_message(msg)
            return msg
    EOF

Callback data must be given to specify which message queue to use::

    $ amqpclt --config tap.conf --callback-data "/tmp/tap,DQS"
"""
}

# Command line option table.
#
# Each key is the option name (also used as the argparse "dest"); each value
# is a dict holding the optional "short" flag, the "long" flag and any other
# keyword argument accepted by argparse's add_argument() ("action", "type",
# "help", "metavar", ...). The same table feeds get_parser(), print_rst()
# and print_pod().
#
# Help texts are built from adjacent string literals: every fragment except
# the last one must end with a space, otherwise the words get glued together.
ARGUMENTS = {
    "callback-code" : {"long" : "--callback-code",
              "help" : "execute the Python code on each message, "
                    "see the \"CALLBACK\" section for more information",
              "metavar" : "CODE",
            },
    "callback-data" : {"long" : "--callback-data",
              "help" : "pass this data to the user supplied callback code, "
                    "see the \"CALLBACK\" section for more information",
              "metavar" : "VALUE,...",
            },
    "callback-path" : {"long" : "--callback-path",
              "help" : "execute the Python code in the given file on each "
                    "message, see the \"CALLBACK\" section for more "
                    "information",
              "metavar" : "PATH",
            },
    "conf" : {"long" : "--conf",
              "help" : "use the given configuration file, see the "
                       "\"CONFIGURATION FILE\" section for more information",
              "metavar" : "PATH",
            },
    "count" : {"long" : "--count",
               "short" : "-c",
               "type" : int,
               "help" : "process at most the given number of messages; "
                        "note: when using an incoming broker, to avoid "
                        "consuming more messages, it is recommended to "
                        "enable the --reliable option",
               "metavar" : "INTEGER",
            },
    "daemon" : {"long" : "--daemon",
              "action" : "store_true",
              "help" : "detach **amqpclt** so that it becomes a daemon "
                    "running in the background; debug, warning and error "
                    "messages will get sent to syslog",
            },
    "debug" : {"long" : "--debug",
              "type" : int,
              "help" : "show debugging information, get an integer value "
                        "as debug value",
            },
    "duration" : {"long" : "--duration",
              "type" : int,
              "help" : "process messages during at most the given number of "
                       "seconds and then stop",
              "metavar" : "SECONDS",
            },
#    "heartbeat" : {"long" : "--heartbeat",
#              "action" : "store_true",
#              "help" : "enable AMQP heart-beats between **amqpclt** and "
#                       "the broker(s)",
#            },
    "help" : {"short" : "-h",
              "long" : "--help",
              "action" : "help",
              "help" : "print the help page"},
    "incoming-broker-auth" : {"long" : "--incoming-broker-auth",
              "help" : "use this authentication string "
                    "(see :py:mod:`auth.credential`) "
                    "to authenticate to the incoming broker",
              "metavar" : "STRING",
            },
    "incoming-broker-module" : {"long" : "--incoming-broker-module",
              "help" : "module to use (pika|kombu)",
              "metavar" : "STRING",
            },
    "incoming-broker-type" : {"long" : "--incoming-broker-type",
              "help" : "set the incoming broker type; this can be useful "
                       "when using features which are broker specific",
              "metavar" : "STRING",
            },
    "incoming-broker-uri" : {"long" : "--incoming-broker-uri",
              "help" : "use this authentication URI "
                       "to connect to the incoming broker",
              "metavar" : "URI",
            },
    "incoming-queue" : {"long" : "--incoming-queue",
              "help" : "read incoming messages from the given message queue "
                    "(see :py:mod:`messaging.queue`)",
              "metavar" : "KEY=VALUE...",
            },
    "lazy" : {"long" : "--lazy",
              "action" : "store_true",
              "help" : "initialize the outgoing module only after having "
                       "received the first message",
            },
    "loop" : {"long" : "--loop",
              "action" : "store_true",
              "help" : "when using an incoming message queue, loop over it",
            },
    "outgoing-broker-auth" : {"long" : "--outgoing-broker-auth",
              "help" : "use this authentication string "
                       "(see :py:mod:`auth.credential`) "
                       "to authenticate to the outgoing broker",
              "metavar" : "STRING",
            },
    "outgoing-broker-module" : {"long" : "--outgoing-broker-module",
              "help" : "module to use (pika|kombu)",
              "metavar" : "STRING",
            },
    "outgoing-broker-type" : {"long" : "--outgoing-broker-type",
              "help" : "set the outgoing broker type; this can be useful "
                       "when using features which are broker specific",
              "metavar" : "STRING",
            },
    "outgoing-broker-uri" : {"long" : "--outgoing-broker-uri",
              "help" : "use this authentication URI to connect to the "
                       "outgoing broker",
              "metavar" : "URI",
            },
    "outgoing-queue" : {"long" : "--outgoing-queue",
              "help" : "store outgoing messages into the given message queue "
                       "(see :py:mod:`messaging.queue`)",
              "metavar" : "KEY=VALUE...",
            },
    "pidfile" : {"long" : "--pidfile",
                 "help" : "use this pid file",
                 "metavar" : "PATH",
            },
    "pod" : {"long" : "--pod",
             "action" : "store_true",
             "help" : "print the pod guide"},
    "prefetch" : {"long" : "--prefetch",
              "type" : int,
              "help" : "set the prefetch value (i.e. the maximum number of "
                    "messages to receive without acknowledging them) on the "
                    "incoming broker",
              "metavar" : "INTEGER",
            },
    "quit" : {"long" : "--quit",
              "action" : "store_true",
              "help" : "tell another instance of **amqpclt** (identified by "
                    "its pid file, as specified by the --pidfile option) "
                    "to quit",
            },
    "reliable" : {"long" : "--reliable",
              "action" : "store_true",
              "help" : "use AMQP features for more reliable messaging "
                    "(i.e. client side acknowledgments) at the "
                    "cost of less performance",
            },
    "remove" : {"long" : "--remove",
              "action" : "store_true",
              "help" : "when using an incoming message queue, remove the "
                       "processed messages",
            },
    "rst" : {"long" : "--rst",
             "action" : "store_true",
             "help" : "print the rst guide"},
    "statistics" : {"long" : "--statistics",
              "action" : "store_true",
              "help" : "report statistics at the end of the execution",
            },
    "status" : {"long" : "--status",
              "action" : "store_true",
              "help" : "get the status of another instance of **amqpclt** "
                    "(identified by its pid file, as specified by the "
                    "--pidfile option); the exit code will be zero if the "
                    "instance is alive and non-zero otherwise",
            },
    "subscribe" : {"long" : "--subscribe",
              "action" : "append",
              "help" : "use these options in the AMQP subscription used "
                    "with the incoming broker; this option can be given "
                    "multiple times",
              "metavar" : "KEY=VALUE...",
            },
    "timeout-connect" : {"long" : "--timeout-connect",
              "type" : float,
              "help" : "use this timeout when connecting to the broker; "
                       "can be fractional",
              "metavar" : "SECONDS",
            },
    "timeout-inactivity" : {"long" : "--timeout-inactivity",
              "type" : float,
              "help" : "use this timeout in the incoming module to stop "
                    "**amqpclt** when no new messages have been received "
                    "(aka drain mode); can be fractional",
              "metavar" : "SECONDS",
            },
    "timeout-linger" : {"long" : "--timeout-linger",
              "type" : float,
              "help" : "when stopping **amqpclt**, use this timeout to finish "
                       "interacting with the broker; can be fractional",
              "metavar" : "SECONDS",
            },
    "version" : {"long" : "--version",
                 "action" : "version",
                 "version" : "%s %s" % (PROG, VERSION),
                 "help" : "print the program version"},
    "window" : {"long" : "--window",
                "type" : int,
                "help" : "keep at most the given number of "
                         "not-yet-acknowledged messages in memory",
                "metavar" : "INTEGER",
            },
}

# Default values for the flag-like options and for the debug level.
# NOTE(review): the code applying these defaults is not in this part of the
# file; presumably they are merged under the configuration file and command
# line values -- confirm against the caller.
DEFAULT_OPTIONS = {
                  "daemon"      : False,
                  "debug"       : 0,
#                  "heartbeat"   : False,
                  "lazy"        : False,
                  "loop"        : False,
                  "remove"      : False,
                  "reliable"    : False,
                  "statistics"  : False,
                  }

###################################### BEGIN COMMON CODE
#### Daemon helper
def daemonize():
    """ Detach the current process using the UNIX double fork mechanism.

    Both intermediate parents terminate with exit code 0. Between the two
    forks the process changes directory to "/", starts a new session and
    clears its umask. The surviving child finally redirects stdin, stdout
    and stderr to os.devnull.

    If a fork fails, the reason is written to stderr and the process
    exits with code 1.
    """
    def _fork_or_exit(failure_format):
        """ Fork once: the parent exits, only the child returns. """
        try:
            if os.fork() > 0:
                # we are the parent of this fork: nothing left to do
                sys.exit(0)
        except OSError:
            failure = sys.exc_info()[1]
            sys.stderr.write(failure_format
                             % (failure.errno, failure.strerror))
            sys.exit(1)

    _fork_or_exit("fork #1 failed: %d (%s)\n")
    # decouple from the parent environment
    os.chdir('/')
    os.setsid()
    os.umask(0)
    _fork_or_exit("fork #2 failed: %d (%s)\n")
    # push out anything still buffered before the descriptors are replaced
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
    null_in = open(os.devnull, 'r')
    null_out = open(os.devnull, 'a+')
    null_err = open(os.devnull, 'a+')
    redirections = ((null_in, sys.stdin),
                    (null_out, sys.stdout),
                    (null_err, sys.stderr))
    for replacement, standard in redirections:
        os.dup2(replacement.fileno(), standard.fileno())

#### PID helpers
class PIDError(Exception):
    """ PID related errors.

    Raised by the pid helpers of this module, e.g. when the pid file is
    lost, corrupted or owned by another process, when a process cannot be
    killed or when the pid file cannot be removed.
    """
    
def pid_read(path, action=False):
    """ Return the content of the pid file.

    The first line of the file holds the pid, the optional second line
    holds an action (such as "quit").

    :param path: path of the pid file
    :param action: if true, return a ``(pid, action)`` tuple instead of
                   the pid alone
    :return: the pid as an integer; ``""`` if the file does not exist;
             ``None`` if the file exists but is empty (for instance
             because it has been created but not written yet). With
             ``action`` set, the tuples are respectively
             ``(pid, action or None)``, ``("", None)`` and ``(None, None)``.
    :raise IOError: if the file cannot be read
    :raise ValueError: if the first line is not an integer
    """
    if not os.path.exists(path):
        if action:
            return ("", None)
        return ""
    try:
        pid_file = open(path, "r")
        try:
            lines = pid_file.readlines()
        finally:
            # always release the handle, even if reading fails
            pid_file.close()
    except IOError:
        error = sys.exc_info()[1]
        raise IOError("cannot read pidfile %s: %s" % (path, error))
    if not lines:
        # empty file: report "no pid" so that callers can simply skip it
        content = (None, None)
    elif len(lines) == 1:
        content = (int(lines[0]), None)
    else:
        content = (int(lines[0]), lines[1].strip())
    if action:
        return content
    return content[0]

def pid_touch(path):
    """ Update the access and modification times of the pid file.

    :param path: path of the pid file
    :return: True on success
    :raise OSError: if the times of the file cannot be updated
    """
    try:
        os.utime(path, None)
    except OSError:
        raise OSError("cannot utime pidfile %s" % path)
    return True

def pid_write(path, pid, action=None, excl=False):
    """ Write the pid, and optionally an action, to the pid file.

    The file is opened without truncation and written from its start:
    the first line is the pid and the second one, if any, the action.

    :param path: path of the pid file
    :param pid: pid to store
    :param action: optional action (e.g. "quit") stored on the second line
    :param excl: if true, fail when the file already exists
    :return: the given pid
    :raise IOError: if the file cannot be opened or written
    """
    flags = os.O_WRONLY | os.O_CREAT
    if excl:
        flags |= os.O_EXCL
    content = "%s\n" % pid
    if action is not None:
        content += "%s\n" % action
    # os.write() wants bytes; this is a no-op for Python 2 byte strings
    if not isinstance(content, bytes):
        content = content.encode("utf-8")
    try:
        pid_fd = os.open(path, flags)
    except (IOError, OSError):
        error = sys.exc_info()[1]
        raise IOError("cannot open pidfile %s: %s" %
                      (path, error.strerror))
    try:
        try:
            os.write(pid_fd, content)
        except (IOError, OSError):
            error = sys.exc_info()[1]
            raise IOError("cannot write to %s: %s" % (path, error.strerror))
    finally:
        # never leak the descriptor, whatever happened while writing
        os.close(pid_fd)
    return pid

def pid_check(path):
    """ Verify that the pid file still belongs to this process.

    :param path: path of the pid file
    :return: the action stored in the pid file, "" when there is none,
             or None when pid_read() reports None as pid
    :raise PIDError: if the pid is missing/empty or is not the pid of the
                     current process
    """
    owner, pending = pid_read(path, True)
    if owner is None:
        return
    if not owner:
        raise PIDError("lost or corrupted pid file: %s\n" % path)
    if owner != os.getpid():
        raise PIDError(
            "pidfile has been taken by another pid: %s\n" % owner)
    if pending is not None:
        return pending
    return ""
    
def pid_quit(path, program=""):
    """ Ask the process owning the pid file to quit, killing it if needed.

    The "quit" action is written to the pid file, then the process is
    polled for up to 5 seconds. If it is still there, SIGTERM, SIGINT,
    SIGQUIT and SIGKILL are sent in turn, one second apart. Finally the
    pid file is removed if it still exists.

    :param path: path of the pid file
    :param program: program name used in the progress messages printed on
                    stdout; nothing is printed when it is empty
    :return: the pid read from the pid file (None when pid_read() gives
             None or when pid_write() returns None)
    :raise PIDError: if the process survives all the signals or if the
                     pid file cannot be removed
    """
    pid = pid_read(path)
    if pid is None:
        return
    if pid:
        if program:
            print("%s (pid %d) is being told to quit..." % 
                  (program, pid))
        # leave the "quit" action in the pid file for the other process
        if pid_write(path, pid, "quit") is None:
            return
        # grace period: kill(pid, 0) only probes for existence and raises
        # OSError once the process cannot be signalled anymore
        for _ in range(5):
            try:
                os.kill(pid, 0)
                time.sleep(1)
            except OSError:
                break
        try:
            # this probe jumps to the outer "except OSError" when the
            # process is already gone
            os.kill(pid, 0)
            if program:
                print("%s (pid %d) is still running, killing it now...\n" %
                      (program, pid))
            # escalate from the most polite signal to the most brutal one
            signals = [signal.SIGTERM, signal.SIGINT,
                       signal.SIGQUIT, signal.SIGKILL]
            for sig in signals:
                try:
                    os.kill(pid, sig)
                except OSError:
                    log_warning("cannot kill(%d, %d)" % (pid, sig))
                try:
                    os.kill(pid, 0)
                except OSError:
                    # the process is gone, no need for further signals
                    break
                time.sleep(1)
            try:
                os.kill(pid, 0)
                # still alive after all signals; PIDError is not an
                # OSError so it propagates through both handlers
                raise PIDError("could not kill %d" % pid)
            except OSError:
                if program:
                    print("%s (pid %d) has been successfully killed\n" % 
                          (program, pid))
        except OSError:
            if program:
                print("%s (pid %d) does not seem to be running anymore" %
                      (program, pid))
    elif program:
        print ("%s does not seem to be running" % (program, ))
    # clean up whatever pid file is left behind
    if os.path.isfile(path):
        try:
            log_warning("removing pid file %s\n" % path)
            os.remove(path)
        except OSError:
            raise PIDError("failed to remove pid file: %s" % path)
    return pid

def pid_status(path, maxage=None):
    """ Report the status of the process owning the pid file.

    :param path: path of the pid file
    :param maxage: optional maximum age, in seconds, of the pid file
                   modification time
    :return: a ``(code, message)`` tuple: code 0 when the process can be
             signalled (and, with maxage, the pid file is recent enough),
             1 when the pid file is older than maxage, 3 when the process
             or its pid file is missing; ``(None, None)`` when pid_read()
             reports None
    """
    owner = pid_read(path)
    if owner is None:
        return (None, None)
    if not owner:
        return (3, "does not seem to be running")
    # signal 0 only checks that the process can be signalled
    try:
        os.kill(owner, 0)
    except OSError:
        return (3, "pid %s does not seem to be running anymore" % owner)
    if maxage is None:
        return (0, "seems to be running")
    try:
        modified = os.stat(path).st_mtime
    except OSError:
        return (3, "(pid %d) does not have its pidfile anymore" % owner)
    stamp = datetime.datetime.fromtimestamp(modified)
    if time.time() - modified > maxage:
        return (1, "(pid %d) is not running since %s" % (owner, stamp))
    return (0, "(pid %d) was active on %s" % (owner, stamp))
    
def pid_remove(path):
    """ Remove the pid file, but only if it belongs to this process.

    :param path: path of the pid file
    :return: the pid when the file has been removed, None when the pid
             file does not hold the pid of the current process
    :raise OSError: if the file cannot be removed
    """
    owner = pid_read(path)
    if owner is None or owner != os.getpid():
        # not ours: leave it alone
        return
    try:
        os.remove(path)
    except OSError:
        failure = sys.exc_info()[1]
        raise OSError("cannot remove pidfile %s: %s" % (path, failure))
    return owner
    
def log_exceptions(re_raise=True):
    """
    Log exceptions to configured log and re raise the exception or exit.

    This is a decorator factory: ``@log_exceptions()`` wraps a function so
    that any exception it raises is logged (traceback with log_debug, the
    error itself with log_error).

    :param re_raise: if true the exception is re-raised after being
                     logged, otherwise the program exits with code 1
    :return: a decorator; the decorated function returns whatever the
             wrapped function returns
    """
    def decorator(in_function):
        """ Wrap function. """
        def wrapper(*args, **kwargs):
            """ Wrapping and catching exceptions. """
            try:
                return in_function(*args, **kwargs)
            except SystemExit:
                # explicit exits are not errors: let them through untouched
                raise
            except Exception:
                (_, error, error_tb) = sys.exc_info()
                log_debug("%s" % (" ".join(traceback.format_tb(error_tb)),))
                log_error("%s" % (error,))
                if re_raise:
                    # bare raise keeps the original traceback
                    raise
                sys.exit(1)
        # make the wrapper look like the wrapped function
        for attribute in ("__module__", "__name__", "__doc__"):
            try:
                setattr(wrapper, attribute, getattr(in_function, attribute))
            except AttributeError:
                # some callables do not carry all of these attributes
                pass
        return wrapper
    return decorator
###################################### END COMMON CODE

def print_rst():
    """ Print the rst for the web page.

    The document is assembled from the module constants (PROG, VERSION,
    SHORT_DESCRIPTION, DESCRIPTION, ARGUMENTS, CONFIGURATION_FILE,
    CALLBACK_GUIDE, EXAMPLES, AUTHOR and COPYRIGHT) and printed on stdout.
    Options are listed in alphabetical order of their name.
    """
    chunks = [
        "%s\n" % PROG,
        "===================\n\n",
        "%s %s - %s\n\n" % (PROG, VERSION, SHORT_DESCRIPTION,),
        "SYNOPSIS\n",
        "--------\n\n",
        "**%s** *[OPTIONS]*\n\n" % PROG,
        "DESCRIPTION\n",
        "-----------\n\n",
        "%s\n\n" % DESCRIPTION,
        "OPTIONS\n",
        "-------\n\n",
    ]
    for name in sorted(ARGUMENTS):
        elopts = ARGUMENTS[name]
        entry = "**"
        if "short" in elopts:
            entry += "%s, " % elopts["short"]
        entry += "%s**" % elopts["long"]
        if "metavar" in elopts:
            # an explicit metavar means the option takes a value, whatever
            # its action is (e.g. --subscribe uses "append")
            entry += " *%s*" % elopts["metavar"]
        elif elopts.get("action") is None:
            # plain "store" option without metavar: derive a placeholder
            entry += " *%s*" % elopts["long"].replace("-", "").upper()
        entry += "\n\t%s\n\n" % elopts.get("help", "")
        chunks.append(entry)
    for title, text in [("CONFIGURATION FILE", CONFIGURATION_FILE),
                        ("CALLBACK", CALLBACK_GUIDE),
                        ("EXAMPLES", EXAMPLES),
                        ("AUTHOR", "%s - %s" % (AUTHOR, COPYRIGHT))]:
        if title == "EXAMPLES":
            # EXAMPLES is a dict: one sub-section per example
            chunks.append("%s\n%s\n\n" % (title, len(title) * "-"))
            for stitle, example in text.items():
                chunks.append("%s\n%s\n\n%s\n\n" %
                              (stitle, len(stitle) * ".", example))
        else:
            chunks.append("%s\n%s\n\n%s\n\n" %
                          (title, len(title) * "-", text))
    out = "".join(chunks)
    # the list markers are only meaningful for the pod output
    out = out.replace("<{LIST_BEGIN}>", "")\
             .replace("<{LIST_END}>", "")
    print(out)

def print_pod():
    """ Print the pod for the man page.

    The document is assembled from the module constants (PROG, VERSION,
    SHORT_DESCRIPTION, DESCRIPTION, ARGUMENTS, CONFIGURATION_FILE,
    CALLBACK_GUIDE, EXAMPLES, AUTHOR and COPYRIGHT) and printed on stdout.
    Options are listed in alphabetical order of their name. The rst
    flavoured markup found in the texts is converted to pod at the end.
    """
    chunks = [
        "=head1 NAME\n\n",
        "%s %s - %s\n\n" % (PROG, VERSION, SHORT_DESCRIPTION,),
        "=head1 SYNOPSIS\n\n",
        "B<%s> I<[OPTIONS]>\n\n" % PROG,
        "=head1 DESCRIPTION\n\n",
        "%s\n\n" % DESCRIPTION,
        "=head1 OPTIONS\n\n=over\n\n",
    ]
    for name in sorted(ARGUMENTS):
        elopts = ARGUMENTS[name]
        entry = "=item B<"
        if "short" in elopts:
            entry += "%s, " % elopts["short"]
        entry += "%s>" % elopts["long"]
        if "metavar" in elopts:
            # an explicit metavar means the option takes a value, whatever
            # its action is (e.g. --subscribe uses "append")
            entry += " I<%s>" % elopts["metavar"]
        elif elopts.get("action") is None:
            # plain "store" option without metavar: derive a placeholder
            entry += " I<%s>" % elopts["long"].replace("-", "").upper()
        entry += "\n\n%s\n\n" % elopts.get("help", "")
        chunks.append(entry)
    chunks.append("=back\n\n")
    for title, text in [("CONFIGURATION FILE", CONFIGURATION_FILE),
                        ("CALLBACK", CALLBACK_GUIDE),
                        ("EXAMPLES", EXAMPLES),
                        ("AUTHOR", "%s - %s" % (AUTHOR, COPYRIGHT))]:
        if title == "EXAMPLES":
            # EXAMPLES is a dict: one sub-section per example
            chunks.append("=head1 %s\n\n" % (title, ))
            for stitle, example in text.items():
                chunks.append("=head2 %s\n\n%s\n\n" % (stitle, example))
        else:
            chunks.append("=head1 %s\n\n%s\n\n" % (title, text))
    out = "".join(chunks)
    # translate the rst markup and the list markers to pod
    out = out.replace("::", ":")\
             .replace("**%s**" % PROG, "B<%s>" % PROG)\
             .replace("<{LIST_BEGIN}>", "\n=over\n")\
             .replace("<{LIST_END}>", "\n=back\n")\
             .replace("\n- ", "\n=item *\n\n")
    print(out)

### Configuration helpers
def _normalize_bool(tree):
    """ Normalize the dict.

    Replace, in place, every string value equal to "true" or "false"
    (whatever the case) by the matching boolean. Nested dictionaries are
    processed recursively; values of any other type, lists included, are
    left untouched.

    :param tree: dictionary to normalize in place
    """
    try:
        string_types = (str, unicode)
    except NameError:
        # Python 3: there is no separate unicode type
        string_types = (str,)
    # only existing keys are re-assigned, so iterating items() is safe
    for key, value in tree.items():
        if isinstance(value, dict):
            _normalize_bool(value)
        elif isinstance(value, string_types):
            lowered = value.lower()
            if lowered == "true":
                tree[key] = True
            elif lowered == "false":
                tree[key] = False

def read_apache_config(path):
    """ Read Apache style config files.

    The parsing is delegated to Perl: the Config::General module reads the
    file and the JSON module serializes the result, which is then loaded
    here and passed through _normalize_bool().

    :param path: path of the configuration file
    :return: the configuration as a dictionary
    :raise ValueError: if perl cannot be started or if it writes anything
                       on its standard error
    """
    script = ("use Config::General qw(ParseConfig);"
              "use JSON qw(to_json);"
              "print(to_json({ParseConfig(-ConfigFile => $ARGV[0])}))")
    # No shell involved: the path is handed over verbatim as $ARGV[0], so
    # spaces, quotes or shell metacharacters in it are harmless. The "--"
    # stops perl from treating a path starting with "-" as a switch.
    try:
        proc = Popen(["perl", "-e", script, "--", path],
                     stdout=PIPE, stderr=PIPE)
    except OSError:
        error = sys.exc_info()[1]
        raise ValueError("cannot execute perl: %s" % (error, ))
    out, err = proc.communicate()
    if err:
        if not isinstance(err, str):
            # Python 3 gives bytes: decode them for a readable message
            err = err.decode("utf-8", "replace")
        raise ValueError(str(err).strip())
    data = json.loads(out)
    _normalize_bool(data)
    return data

# "head-tail": "head" is the part before the first dash (word characters
# only), "tail" is everything after it.
_EXPLOSION_RE = re.compile(r"^(\w+)-(.+)$")
def _explode_dict(given):
    """ Explode all the keys having a "-".

    The dictionary is modified in place: an entry such as
    ``{"a-b-c": 1}`` becomes ``{"a": {"b": {"c": 1}}}``. Entries sharing
    the same prefix end up in the same nested dictionary.

    :param given: dictionary to explode in place
    """
    # iterate over a snapshot: entries are popped and added in the loop
    for key in list(given.keys()):
        match = _EXPLOSION_RE.match(key)
        if match is None:
            continue
        head, tail = match.group(1), match.group(2)
        if head in given:
            given[head].update({tail : given.pop(key)})
        else:
            given[head] = {tail : given.pop(key)}
    # the tails may contain dashes too: explode the nested dictionaries
    for item in given.values():
        if isinstance(item, dict):
            _explode_dict(item)
            
def _treeval(tree, name, default=None):
    """ Return the value of the given option in an exploded dict.

    The name is first looked up as is; if it is not found and contains a
    dash, the part before the first dash selects a sub-tree in which the
    remainder is looked up recursively.

    :param tree: (nested) dictionary of options
    :param name: option name, e.g. "outgoing-broker-uri"
    :param default: value returned when the option cannot be found
    :return: the option value or the default
    """
    if not isinstance(tree, dict):
        # a scalar cannot hold sub-options
        return default
    if name in tree:
        return tree[name]
    match = _EXPLOSION_RE.match(name)
    if match is None or match.group(1) not in tree:
        return default
    return _treeval(tree[match.group(1)], match.group(2), default)

def _treeset(tree, name, value):
    """ Set the value in the tree.

    A name containing dashes is exploded: the part before the first dash
    selects (and creates if needed) a sub-tree in which the remainder is
    set recursively.

    :param tree: (nested) dictionary of options, modified in place
    :param name: option name, e.g. "outgoing-broker-uri"
    :param value: value to store
    :raise AmqpcltError: if the place where the value must be stored is
                         not a dictionary
    """
    if not isinstance(tree, dict):
        raise AmqpcltError("cannot set %s in %s" % (name, tree))
    match = _EXPLOSION_RE.match(name)
    if match is None:
        tree[name] = value
        return
    branch = tree.setdefault(match.group(1), dict())
    _treeset(branch, match.group(2), value)

def _merge(first, second):
    """ Return a new dictionary holding the entries of both arguments.

    The result starts as a shallow copy of ``first``; entries of
    ``second`` win when a key is present in both. Neither argument is
    modified.
    """
    combined = first.copy()
    combined.update(second)
    return combined

def mutex(tree, *options):
    """ Check that at most one of the given options is set.

    :param tree: (nested) dictionary of options
    :param options: names of the mutually exclusive options
    :raise AmqpcltError: as soon as a second option is found to be set
    """
    seen = None
    for candidate in options:
        if _treeval(tree, candidate) is not None:
            if seen is not None:
                raise AmqpcltError(
                    "options %s and %s are mutually exclusive" %
                    (seen, candidate))
            seen = candidate

def reqall(tree, first, *options):
    """ Require all the given options when the first one is set.

    When ``first`` is None the options are required unconditionally.

    :param tree: (nested) dictionary of options
    :param first: name of the triggering option, or None
    :param options: names of the required options
    :raise AmqpcltError: for the first required option which is not set
    """
    unconditional = first is None
    if not unconditional and _treeval(tree, first) is None:
        # the triggering option is absent: nothing is required
        return
    for needed in options:
        if _treeval(tree, needed) is None:
            if unconditional:
                raise AmqpcltError("options %s is required" % (needed,))
            raise AmqpcltError("option %s requires option %s" %
                               (first, needed))
    
def reqany(tree, first, *options):
    """
    Require at least one of the given options, optionally only when
    *first* is set.

    With *first* set to None the check is unconditional. Otherwise nothing
    is checked unless *first* has a value in the tree. AmqpcltError is
    raised when none of the options has a value.
    """
    unconditional = first is None
    if not unconditional and _treeval(tree, first) is None:
        return
    if any(_treeval(tree, candidate) is not None for candidate in options):
        return
    listing = ", ".join(options)
    if unconditional:
        raise AmqpcltError("one of this option is required: %s" % listing)
    raise AmqpcltError("option %s requires one of: %s" % (first, listing))

def get_parser():
    """
    Build the command line parser from the ARGUMENTS table.

    Every entry except "help" becomes one argparse argument: its "short"
    and "long" items (when present) are the flags, the remaining items are
    passed as keyword arguments and the entry name is used as destination.
    """
    def _formatter(prog):
        """ Raw description formatter with a wider help column. """
        return argparse.RawDescriptionHelpFormatter(
            prog, max_help_position=33)

    parser = argparse.ArgumentParser(
        prog=PROG,
        epilog="AUTHOR\n\n%s - %s" % (AUTHOR, COPYRIGHT),
        argument_default=argparse.SUPPRESS,
        formatter_class=_formatter)
    for dest, spec in ARGUMENTS.items():
        if dest != "help":
            # work on a copy: the flags are removed from the keyword set
            keywords = spec.copy()
            flags = [keywords.pop(kind)
                     for kind in ("short", "long") if kind in keywords]
            keywords["dest"] = dest
            parser.add_argument(*flags, **keywords)
    return parser
    
def read_args():
    """
    Parse the command line and return the options as a dictionary.

    When the "pod" or "rst" option is set the matching documentation is
    printed and the program exits with status 0 instead of returning.
    """
    options = get_parser().parse_args()
    if getattr(options, "pod", False):
        printer = print_pod
    elif getattr(options, "rst", False):
        printer = print_rst
    else:
        return vars(options)
    printer()
    sys.exit(0)
    
######### Parse helpers
_SEP_CHARS = re.compile("[, ]")
_SENDER_DEST_RECURSIVE = re.compile("^(exchange|arguments)$")
_STOMP_DEST = re.compile("(/queue/|/topic/)(.*)")
def _parse_sender_destination(entity):
    """ Parse sender destination. """
    entity = entity.encode()
    destination = dict()
    if entity.startswith("/queue/"):
        destination["routing_key"] = entity[7:]
        destination["exchange"] = {"name" : ""}
        destination["queue"] = {"name" : entity[7:],
                         "durable" : True,
                         "exclusive": False,
                         "auto_delete": False}
    elif entity.startswith("/topic/"):
        destination["routing_key"] = entity[7:]
        destination["exchange"] = {"name" : "amq.topic"}
        destination["queue"] = {"durable" : False,
                                "auto_delete" : True,
                                "exclusive" : True}
    else:
        for element in _SEP_CHARS.split(entity):
            if not element:
                continue
            (key, val) = element.split("=")
            if _SENDER_DEST_RECURSIVE.match(key):
                val = _parse_sender_destination(urllib.unquote(val))
            elif val == "false":
                val = False
            elif val == "true":
                val = True
            destination[key] = val
    return destination

_SUBSCRIBE_DEST_RECURSIVE = re.compile("^(exchange|queue|arguments)$")
def _parse_subscribe_destination(entity):
    """ Parse subscribe destination. """
    entity = entity.encode()
    destination = dict()
    if entity.startswith("/queue/"):
        destination["exchange"] = {"name" : ""}
        destination["queue"] = {"name" : entity[7:],
                         "durable" : True,
                         "exclusive": False,
                         "auto_delete": False}
    elif entity.startswith("/topic/"):
        destination["routing_key"] = entity[7:]
        destination["exchange"] = {"name" : "amq.topic"}
        destination["queue"] = {"durable" : False,
                         "auto_delete" : True,
                         "exclusive" : True}
    else:
        for element in _SEP_CHARS.split(entity):
            if not element:
                continue
            (key, val) = element.split("=")
            if _SUBSCRIBE_DEST_RECURSIVE.match(key):
                val = _parse_sender_destination(urllib.unquote(val))
            elif val == "false":
                val = False
            elif val == "true":
                val = True
            destination[key] = val
    return destination

_AMQP_RE = re.compile(
    r'^amqp://'\
    '((?P<host>[^/:]*)([:](?P<port>[^/]*))?)' \
    '/(?P<virtual_host>.*)/?' \
    )
def _parse_amqp_uri(uri):
    """ Parse an AMQP uri and return a dict with its key value pairs. """
    uri_match = _AMQP_RE.match(uri)
    if uri_match:
        return uri_match.groupdict()
    else:
        raise ValueError("invalid uri, required format is: "
                         "amqp://host:port/virtual_host, given uri: %s" %
                         (uri, ))
##################
def get_uuid():
    """ Generate a fresh uuid (``uuid.uuid1``) and return it as a string. """
    fresh = uuid.uuid1()
    return str(fresh)
### amqpclt error
class AmqpcltError(Exception):
    """
    AMQP client error.

    Raised by amqpclt itself for the problems it detects, for instance
    conflicting or missing options, an unsupported authentication setup
    or a message without destination.
    """
### DEBUG stuff
def log_debug(message, severity=1):
    """
    Emit a debug message unless its severity exceeds the configured
    debug level (``CONFIG["debug"]``).

    The message goes to syslog when TO_SYSLOG is set, otherwise it is
    written to stdout, followed by a newline, and flushed.
    """
    if CONFIG["debug"] < severity:
        return
    if not TO_SYSLOG:
        sys.stdout.write("%s\n" % message)
        sys.stdout.flush()
        return
    syslog.syslog(message)
        
def log_error(message):
    """
    Report an error: to syslog with the LOG_ERR priority when TO_SYSLOG
    is set, otherwise on stderr (newline terminated and flushed).
    """
    if not TO_SYSLOG:
        sys.stderr.write("%s\n" % message)
        sys.stderr.flush()
        return
    syslog.syslog(syslog.LOG_ERR, message)
        
def log_warning(message):
    """
    Report a warning: to syslog with the LOG_WARNING priority when
    TO_SYSLOG is set, otherwise on stdout (newline terminated and
    flushed).
    """
    if not TO_SYSLOG:
        sys.stdout.write("%s\n" % message)
        sys.stdout.flush()
        return
    syslog.syslog(syslog.LOG_WARNING, message)

### Callback helpers
def _callback_module(code):
    """
    Build the callback class from the given source code.

    The returned class provides do-nothing ``start``, ``idle`` and ``stop``
    methods. The given code is then executed inside the class body, so
    whatever it defines ends up in the class and may replace these
    defaults. No default ``check`` is provided here: it is left to the
    given code.

    :param code: Python source executed in the class body
    :return: the resulting Callback class (not an instance)
    """
    class Callback(object):
        """ Callback class, completed by the user supplied code. """
        def start(self, *args):
            """ Called when callback start (default: do nothing). """
        #def check(self, message):
        #    """ Called when one message is received from incoming module. """
        def idle(self):
            """ Called when in idle and nothing to do (default: nothing). """
        def stop(self):
            """ Called when callback is stopped (default: do nothing). """
        # SECURITY: this runs arbitrary code with the privileges of the
        # process; the code must only come from a trusted source.
        # It must stay after the defaults above so that it can override them.
        exec code
        #start = classmethod(start)
        #check = classmethod(check)
        #idle = classmethod(idle)
        #stop = classmethod(stop)
    return Callback
##################
class PikaAdapter(object):
    """
    Pika library adapter.

    Base class with what the pika based modules share: the connection, its
    channel and the record of the exchanges, queues and bindings already
    declared through this object. Subclasses set ``direction``, which
    selects the ``CONFIG[direction]["broker"]`` settings used to connect.
    """
    direction = "unknown"

    @staticmethod
    def _version_tuple(version):
        """
        Return the leading numeric components (at most three) of a version
        string as a tuple of ints, e.g. "0.9.13" gives (0, 9, 13), so that
        versions compare numerically rather than as text.
        """
        return tuple(int(part) for part in re.findall(r"\d+", version)[:3])

    def __init__(self):
        """ Initialize Pika adapter, importing pika on first use. """
        if "pika" not in globals():
            global pika
            import pika
            if re.match(r"^0\.9\.6", pika.__version__):
                # pika 0.9.6 only: install our own CallbackManager.sanitize
                def sanitize(self, key):
                    """ Return the name under which key is registered. """
                    if hasattr(key, 'method') and hasattr(key.method, 'NAME'):
                        return key.method.NAME
                    if hasattr(key, 'NAME'):
                        return key.NAME
                    if hasattr(key, '__dict__') and 'NAME' in key.__dict__:
                        return key.__dict__['NAME']
                    return str(key)
                pika.callback.CallbackManager.sanitize = sanitize
        self._connection = None
        self._channel = None
        # names (or binding ids) already declared through this object
        self._exchange = dict()
        self._queue = dict()
        self._bind = dict()

    def _maybe_declare_exchange(self, exch_props):
        """
        Declare the exchange unless it has no name, its name starts with
        "amq." or it has already been declared by this object.

        :param exch_props: dict of exchange properties ("name", "durable",
                           "type", "auto_delete", "arguments")
        :return: the exchange name ("" when none is given)
        """
        exch_name = exch_props.get("name", "")
        if (exch_name and
            not exch_name.startswith("amq.") and
            exch_name not in self._exchange):
            log_debug("declaring %s exchange: %s" % (self.direction,
                                                     exch_name), 3)
            self._exchange[exch_name] = 1
            self._channel.exchange_declare(
                         exchange=exch_name,
                         durable=exch_props.get("durable", False),
                         type=exch_props.get("type", "direct"),
                         auto_delete=exch_props.get("auto_delete", False),
                         arguments=exch_props.get("arguments", dict())
            )
        return exch_name

    def _maybe_declare_queue(self, queue_props):
        """
        Declare the queue unless a queue with this name has already been
        declared by this object.

        :param queue_props: dict of queue properties ("name", "durable",
                            "exclusive", "auto_delete", "arguments")
        :return: the queue name; when no name is given, the one found in
                 the reply to the declaration
        """
        queue_name = queue_props.get("name", "")
        if queue_name and queue_name in self._queue:
            return queue_name
        result = self._channel.queue_declare(
                        queue=queue_name,
                        durable=queue_props.get("durable", False),
                        exclusive=queue_props.get("exclusive", False),
                        auto_delete=queue_props.get("auto_delete", False),
                        arguments=queue_props.get("arguments", dict()))
        if not queue_name:
            # amq generated queue
            queue_name = result.method.queue
        log_debug("incoming queue declared: %s" % queue_name, 3)
        self._queue[queue_name] = 1
        return queue_name

    def _maybe_bind(self, queue_name, exchange, routing_key=""):
        """
        Bind the queue to the exchange with the given routing key, unless
        this object has already made that binding.
        """
        bind_id = md5_hash(queue_name + exchange + routing_key).hexdigest()
        if bind_id in self._bind:
            return
        log_debug(("binding incoming queue: queue=%s, exchange=%s," + \
                   " routing_key=%s") %
                   (queue_name, exchange, routing_key), 3)
        self._channel.queue_bind(queue=queue_name,
                                 exchange=exchange,
                                 routing_key=routing_key)
        self._bind[bind_id] = 1

    def connect(self):
        """
        Create a pika AMQP connection and channel.

        The broker uri and the optional credential are read from
        ``CONFIG[direction]["broker"]``. The broker type is stored in the
        configuration when it is not already set there.

        :return: True
        :raises AmqpcltError: if the uri has no usable port, or if x509
                              authentication is requested with a pika
                              older than 0.9.6
        """
        direction = self.direction
        config = CONFIG[direction]["broker"]
        params = _parse_amqp_uri(config["uri"])
        try:
            # the uri syntax makes the port optional but we need one
            port = int(params['port'])
        except (TypeError, ValueError):
            raise AmqpcltError("invalid or missing port in %s broker uri: %s"
                               % (direction, config["uri"]))
        cred = config.get("auth")
        if cred is None:
            cred = credential.new(scheme="none")
        if cred.scheme == "x509":
            # compare numerically: as strings "0.9.13" sorts before "0.9.6"
            if self._version_tuple(pika.__version__) < (0, 9, 6):
                raise AmqpcltError(
                            "x509 authentication not supported in pika %s" %
                            pika.__version__)
            ssl_options = dict()
            for key, keyval in {"cert" : "certfile",
                               "key" : "keyfile", "ca" : "ca_certs"}.items():
                if key in cred:
                    ssl_options[keyval] = cred[key]
            ssl_options["cert_reqs"] = ssl.CERT_REQUIRED
            # NOTE(review): SSLv3 is obsolete and missing from some ssl
            # builds - consider a newer protocol after checking the brokers
            ssl_options["ssl_version"] = ssl.PROTOCOL_SSLv3
            extra = {"ssl" : True,
                     "ssl_options" : ssl_options,
                     "credentials" : pika.credentials.ExternalCredentials() }
        elif cred.scheme == "plain":
            extra = {
                "credentials" : pika.credentials.PlainCredentials(
                                        cred['name'], cred['pass']), }
        else:
            # none
            extra = dict()
        #if CONFIG.get("heartbeat") is not None:
        #    extra["heartbeat"] = CONFIG["heartbeat"]
        parameters = pika.connection.ConnectionParameters(
            params['host'].encode(),
            port,
            params.get('virtual_host', "rabbitmq").encode(), **extra)
        self._connection = connection = pika.BlockingConnection(parameters)
        self._channel = connection.channel()
        log_debug("%s broker %s:%s: %s %s" %
                  (direction, params['host'], params['port'],
                  self.amqp_type(),
                  connection.server_properties.get("version", "UNKNOWN"),), 1)
        if _treeval(CONFIG, "%s-broker-type" % direction) is None:
            _treeset(CONFIG,
                     "%s-broker-type" % direction, self.amqp_type())
        return True

    def amqp_type(self):
        """
        Return the broker type: the "product" server property ("UNKNOWN"
        when absent), or None when not connected.
        """
        if self._connection is None:
            return None
        return self._connection.server_properties.get("product", "UNKNOWN")
    
class KombuAdapter(object):
    """
    Kombu library adapter.

    Base class with what the kombu based modules share: the connection,
    its channel and the record of the exchanges, queues and bindings
    already declared through this object. Subclasses set ``direction``,
    which selects the ``CONFIG[direction]["broker"]`` settings used to
    connect.
    """
    direction = "unknown"

    def __init__(self):
        """ Initialize Kombu adapter, importing kombu on first use. """
        if "kombu" not in globals():
            global kombu
            import kombu
        self._connection = None
        self._channel = None
        # names (or binding ids) already declared through this object
        self._exchange = dict()
        self._queue = dict()
        self._bind = dict()

    def _maybe_declare_exchange(self, exch_props):
        """
        Declare the exchange unless it has no name, its name starts with
        "amq." or it has already been declared by this object.

        :param exch_props: dict of exchange properties ("name", "durable",
                           "type", "auto_delete", "arguments")
        :return: the exchange name ("" when none is given)
        """
        exch_name = exch_props.get("name", "")
        if (exch_name and
            not exch_name.startswith("amq.") and
            exch_name not in self._exchange):
            log_debug("declaring %s exchange: %s" % (self.direction,
                                                     exch_name), 3)
            self._exchange[exch_name] = 1
            self._channel.exchange_declare(
                         exchange=exch_name,
                         durable=exch_props.get("durable", False),
                         type=exch_props.get("type", "direct"),
                         auto_delete=exch_props.get("auto_delete", False),
                         arguments=exch_props.get("arguments", dict())
            )
        return exch_name

    def _maybe_declare_queue(self, queue_props):
        """
        Declare the queue unless a queue with this name has already been
        declared by this object.

        :param queue_props: dict of queue properties ("name", "durable",
                            "exclusive", "auto_delete", "arguments")
        :return: the queue name; a uuid is generated locally when no name
                 is given
        """
        queue_name = queue_props.get("name", "")
        if queue_name and queue_name in self._queue:
            return queue_name
        if not queue_name:
            # the name is chosen here rather than by the broker
            queue_name = get_uuid()
        self._channel.queue_declare(
                        queue=queue_name,
                        durable=queue_props.get("durable", False),
                        exclusive=queue_props.get("exclusive", False),
                        auto_delete=queue_props.get("auto_delete", False),
                        arguments=queue_props.get("arguments", dict()))
        log_debug("incoming queue declared: %s" % queue_name, 3)
        self._queue[queue_name] = 1
        return queue_name

    def _maybe_bind(self, queue_name, exchange, routing_key=""):
        """
        Bind the queue to the exchange with the given routing key, unless
        this object has already made that binding.
        """
        bind_id = md5_hash(queue_name + exchange + routing_key).hexdigest()
        if bind_id in self._bind:
            return
        log_debug(("binding incoming queue: queue=%s, exchange=%s," + \
                   " routing_key=%s") %
                   (queue_name, exchange, routing_key), 3)
        self._channel.queue_bind(queue=queue_name,
                                 exchange=exchange,
                                 routing_key=routing_key)
        self._bind[bind_id] = 1

    def connect(self):
        """
        Create a kombu AMQP connection and channel.

        The broker uri and the optional credential are read from
        ``CONFIG[direction]["broker"]`` and the "timeout-connect" option,
        when set, is used as connection timeout. The broker type is stored
        in the configuration when it is not already set there.

        :return: True
        :raises AmqpcltError: if the uri has no usable port
        """
        direction = self.direction
        config = CONFIG[direction]["broker"]
        params = _parse_amqp_uri(config["uri"])
        try:
            # the uri syntax makes the port optional but we need one
            port = int(params["port"])
        except (TypeError, ValueError):
            raise AmqpcltError("invalid or missing port in %s broker uri: %s"
                               % (direction, config["uri"]))
        cred = config.get("auth")
        if cred is None:
            cred = credential.new(scheme="none")
        if cred.scheme == "x509":
            ssl_options = dict()
            cred_dict = cred.dict()
            for key, keyval in {"cert" : "certfile",
                               "key" : "keyfile", "ca" : "ca_certs"}.items():
                if key in cred_dict:
                    ssl_options[keyval] = cred_dict[key]
            ssl_options["cert_reqs"] = ssl.CERT_REQUIRED
            # NOTE(review): SSLv3 is obsolete and missing from some ssl
            # builds - consider a newer protocol after checking the brokers
            ssl_options["ssl_version"] = ssl.PROTOCOL_SSLv3
            extra = {"ssl" : ssl_options,
                     "transport_options" : {"login_method": "EXTERNAL"}}
        elif cred.scheme == "plain":
            extra = {
                "userid" : cred['name'],
                "password" : cred['pass'], }
        else:
            # none
            extra = dict()
        #if CONFIG.get("heartbeat") is not None:
        #    extra["heartbeat"] = CONFIG["heartbeat"]
        parameters = {
            "hostname" : params["host"],
            "port" : port,
            "transport" : "amqplib",
            "virtual_host" : params.get("virtual_host", "rabbitmq")
        }
        timeout_connect = _treeval(CONFIG, "timeout-connect")
        if timeout_connect is not None:
            parameters["connect_timeout"] = timeout_connect
        parameters.update(extra)
        self._connection = connection = kombu.BrokerConnection(**parameters)
        self._channel = connection.channel()
        log_debug("%s broker %s:%s: %s" %
                  (direction, params['host'], params['port'],
                  self.amqp_type(),), 1)
        if _treeval(CONFIG, "%s-broker-type" % direction) is None:
            _treeset(CONFIG,
                     "%s-broker-type" % direction, self.amqp_type())
        return True

    def amqp_type(self):
        """
        Return the broker type: always "RabbitMQ" once connected, None
        before that.
        """
        if self._connection is None:
            return None
        return "RabbitMQ"
    
class PikaIncomingBroker(PikaAdapter):
    """
    Pika incoming broker object.

    Subscribes to the configured destinations and hands over the messages
    delivered by the broker one at a time through :py:meth:`get`.
    """
    direction = "incoming"

    def __init__(self):
        """ Initialize incoming broker module. """
        super(PikaIncomingBroker, self).__init__()
        self._msgbuf = list()    # deliveries not yet returned by get()
        self._pending = list()   # delivery tags returned but not yet acked
        self._consume = dict()   # queue name -> consumer tag

    def _maybe_subscribe(self, subscription):
        """
        Declare what the subscription needs (queue, exchange, binding) and
        start consuming from its queue unless this is already the case.

        :param subscription: parsed destination, it must have a "queue"
        :raises AmqpcltError: if the subscription has no queue
        """
        if "queue" in subscription:
            queue_name = self._maybe_declare_queue(subscription["queue"])
        else:
            raise AmqpcltError("subscription must contain a queue")
        exchange_name = None
        if "exchange" in subscription:
            exchange_name = self._maybe_declare_exchange(
                                        subscription["exchange"])
        if exchange_name:
            self._maybe_bind(queue_name,
                             exchange_name,
                             subscription.get("routing_key", ""))
        if queue_name not in self._consume:
            log_debug("incoming consume from queue: %s" % queue_name, 3)
            tag = get_uuid()
            params = {"consumer_callback" : self._handle_delivery,
                      "queue" : queue_name,
                      "consumer_tag" : tag}
            if not CONFIG["reliable"]:
                # no acknowledgment wanted when not reliable
                params["no_ack"] = True
            self._channel.basic_consume(**params)
            self._consume[queue_name] = tag

    def _handle_delivery(self, channel, method, header, body):
        """ Consumer callback: buffer the delivery until get() is called. """
        self._msgbuf.append((method, header, body))

    def start(self):
        """
        Start the incoming broker module: connect, apply the "prefetch"
        option when it is set and not negative, then subscribe to every
        entry of the "subscribe" option.
        """
        self.connect()
        prefetch = CONFIG.get("prefetch")
        if prefetch is not None and prefetch >= 0:
            self._channel.basic_qos(prefetch_count=int(prefetch))
        subscribe = CONFIG.get("subscribe", [])
        for sub in subscribe:
            self._maybe_subscribe(sub)

    def get(self):
        """
        Get a message.

        :return: ``(message, delivery_tag)`` in reliable mode,
                 ``(message, None)`` otherwise, or
                 ``("no messages received", None)`` when nothing is
                 available
        """
        if len(self._msgbuf) == 0:
            self._channel.transport.connection.process_data_events()
        if len(self._msgbuf) == 0:
            return ("no messages received", None)
        (method, header, body) = self._msgbuf.pop(0)
        if header.content_type is not None and \
            (header.content_type.startswith("text/") or \
             "charset=" in header.content_type):
            body = body.decode("utf-8")
        headers = header.headers
        if headers is None:
            # message published without any application header
            headers = dict()
        for header_name, header_value in headers.items():
            try:
                headers[header_name] = header_value.encode("utf-8")
            except UnicodeDecodeError:
                headers[header_name] = header_value.decode("utf-8")
        msg = Message(header=headers, body=body)
        if CONFIG["reliable"]:
            self._pending.append(method.delivery_tag)
            return (msg, method.delivery_tag)
        else:
            return (msg, None)

    def ack(self, delivery_tag):
        """ Acknowledge the message with the given delivery tag. """
        log_debug("acking incoming message: %d" % delivery_tag, 3)
        self._channel.basic_ack(delivery_tag=delivery_tag)
        self._pending.remove(delivery_tag)

    def idle(self):
        """ Idle: let the connection process the pending data events. """
        self._channel.transport.connection.process_data_events()

    def stop(self):
        """ Stop consuming and close the connection. """
        self._channel.stop_consuming()
        self._connection.close()
        self._connection = None
        
class KombuIncomingBroker(KombuAdapter):
    """
    Kombu incoming broker object.

    Subscribes to the configured destinations and hands over the messages
    delivered by the broker one at a time through :py:meth:`get`.
    """
    direction = "incoming"

    def __init__(self):
        """ Initialize kombu incoming broker module. """
        super(KombuIncomingBroker, self).__init__()
        self._msgbuf = list()    # deliveries not yet returned by get()
        self._pending = list()   # delivery tags returned but not yet acked
        self._consume = dict()   # queue name -> consumer tag

    def _maybe_subscribe(self, subscription):
        """
        Declare what the subscription needs (queue, exchange, binding) and
        start consuming from its queue unless this is already the case.

        :param subscription: parsed destination, it must have a "queue"
        :raises AmqpcltError: if the subscription has no queue
        """
        if "queue" in subscription:
            queue_name = self._maybe_declare_queue(subscription["queue"])
        else:
            raise AmqpcltError("subscription must contain a queue")
        exchange_name = None
        if "exchange" in subscription:
            exchange_name = self._maybe_declare_exchange(
                                        subscription["exchange"])
        if exchange_name:
            self._maybe_bind(queue_name,
                             exchange_name,
                             subscription.get("routing_key", ""))
        if queue_name not in self._consume:
            log_debug("incoming consume from queue: %s" % queue_name, 3)
            tag = get_uuid()
            params = {"callback" : self._handle_message,
                      # acknowledgments are only used in reliable mode
                      "no_ack" : not CONFIG["reliable"],
                      "queue" : queue_name,
                      "consumer_tag" : tag}
            self._channel.basic_consume(**params)
            self._consume[queue_name] = tag

    def _handle_message(self, msg):
        """ Consumer callback: buffer the delivery until get() is called. """
        self._msgbuf.append((msg.delivery_info,
                             msg.properties, msg.body))

    def _drain_events(self, timeout=0.1):
        """
        Let the connection process incoming events for at most *timeout*
        seconds. Socket errors do not propagate: a timeout only means that
        nothing arrived, other errors are reported at debug level 3.
        """
        try:
            self._connection.drain_events(timeout=timeout)
        except socket.timeout:
            pass
        except socket.error:
            # best effort, but leave a trace instead of hiding the problem
            log_debug("incoming broker socket error: %s" %
                      (sys.exc_info()[1], ), 3)

    def start(self):
        """
        Start the incoming broker module: connect, apply the "prefetch"
        option when it is set and not negative, then subscribe to every
        entry of the "subscribe" option.
        """
        self.connect()
        prefetch = CONFIG.get("prefetch")
        if prefetch is not None and prefetch >= 0:
            self._channel.basic_qos(prefetch_count=int(prefetch),
                                    prefetch_size=0,
                                    a_global=False)
        subscribe = CONFIG.get("subscribe", [])
        for sub in subscribe:
            self._maybe_subscribe(sub)

    def get(self):
        """
        Get a message.

        :return: ``(message, delivery_tag)`` in reliable mode,
                 ``(message, None)`` otherwise, or
                 ``("no messages received", None)`` when nothing is
                 available
        """
        if len(self._msgbuf) == 0:
            self._drain_events()
        if len(self._msgbuf) == 0:
            return ("no messages received", None)
        (info, header, body) = self._msgbuf.pop(0)
        if header.get("content_type") is not None and \
            (header["content_type"].startswith("text/") or \
             "charset=" in header["content_type"]):
            body = body.decode("utf-8")
        # absent when the message was published without any header
        headers = header.get("application_headers")
        if headers is None:
            headers = dict()
        for header_name, header_value in headers.items():
            try:
                headers[header_name] = header_value.encode("utf-8")
            except UnicodeDecodeError:
                headers[header_name] = header_value.decode("utf-8")
        msg = Message(header=headers, body=body)
        if CONFIG["reliable"]:
            self._pending.append(info.get("delivery_tag"))
            return (msg, info.get("delivery_tag"))
        else:
            return (msg, None)

    def ack(self, delivery_tag):
        """ Acknowledge the message with the given delivery tag. """
        log_debug("acking incoming message: %d" % delivery_tag, 3)
        self._channel.basic_ack(delivery_tag=delivery_tag)
        self._pending.remove(delivery_tag)

    def idle(self):
        """ Idle: process the incoming events, if any. """
        self._drain_events()

    def stop(self):
        """ Close the channel and the connection. """
        self._channel.close()
        self._connection.close()
        self._connection = None
        
class IncomingQueue(object):
    """
    Incoming queue object.

    Reads the messages from the message queue described by the
    "incoming-queue" option, one element per call to :py:meth:`get`.
    """

    def __init__(self):
        """ Initialize incoming queue module. """
        self._queue = None
        self._path = None
        self._eoq = None            # True when the next get() must restart
        self._lock_failures = None  # lock failures in the current pass
        self._purge_time = 0        # no purge before this time

    def _purge(self):
        """ Purge at most every 5 minutes. """
        if self._purge_time is None or time.time() < self._purge_time:
            return
        self._queue.purge()
        log_debug("incoming queue has been purged", 2)
        self._purge_time = time.time() + 300

    def start(self):
        """
        Start the incoming queue module: open the queue described by the
        "incoming-queue" option, its type defaulting to "DQS".
        """
        if _treeval(CONFIG, "incoming-queue-type") is None:
            _treeset(CONFIG, "incoming-queue-type", "DQS")
        self._queue = queue.new(_treeval(CONFIG, "incoming-queue").copy())
        self._path = getattr(self._queue, "path", "")
        self._eoq = True
        log_debug("incoming queue %s %s" %
                  (_treeval(CONFIG, "incoming-queue-type"), self._path), 1)

    def get(self):
        """
        Get a message.

        :return: ``(message, element)`` in reliable mode and
                 ``(message, None)`` otherwise; when no message can be
                 returned the first item is a string instead:
                 ``""`` at the end of the queue when not looping,
                 ``"end of queue"`` when looping and ``"failed to lock"``
                 when the element could not be locked
        """
        elt = None
        msg = None
        if self._eoq:
            # (re)start from beginning
            self._purge()
            log_debug("incoming queue has %d messages" %
                      (self._queue.count()), 2)
            self._eoq = False
            self._lock_failures = 0
            elt = self._queue.first()
            log_debug("first %s" % elt)
        else:
            # progress from where we were
            elt = self._queue.next()
            log_debug("next %s" % elt)
        if not elt:
            # reached the end
            if not CONFIG["loop"]:
                return ("", None)
            self._eoq = True
            if self._lock_failures == self._queue.count():
                # nothing could be locked during this pass: slow down
                time.sleep(1)
            return ("end of queue", None)
        if not self._queue.lock(elt):
            # cannot lock this one this time...
            self._lock_failures += 1
            return ("failed to lock", None)
        log_debug("incoming message get %s/%s" % (self._path, elt), 3)
        msg = self._queue.get_message(elt)
        if CONFIG["reliable"]:
            # the element stays locked until ack() is called
            return (msg, elt)
        if CONFIG["remove"]:
            self._queue.remove(elt)
        else:
            self._queue.unlock(elt)
        return (msg, None)

    def ack(self, msg_id):
        """
        Ack a message: remove the element when the "remove" option is
        true, unlock it otherwise.
        """
        # same test as in get(): a false (not only None) value means keep
        if CONFIG["remove"]:
            log_debug("incoming message remove %s/%s" %
                      (self._path, msg_id), 3)
            self._queue.remove(msg_id)
        else:
            log_debug("incoming message unlock %s/%s" %
                      (self._path, msg_id), 3)
            self._queue.unlock(msg_id)

    def idle(self):
        """ Idle: purge the queue if it is time to do so. """
        self._purge()

    def stop(self):
        """ Stop: forget the queue. """
        self._queue = None
        
class PikaOutgoingBroker(PikaAdapter):
    """
    Pika outgoing broker object.

    Publishes each message to the destination found in its "destination"
    header.
    """
    direction = "outgoing"

    def __init__(self):
        """ Initialize the outgoing broker module. """
        super(PikaOutgoingBroker, self).__init__()

    def start(self):
        """ Start the outgoing broker module. """
        self.connect()

    def put(self, msg, msg_id=None):
        """
        Put a message.

        The message header is replaced by a copy where unicode keys and
        values are UTF-8 encoded. The queue and exchange named by the
        destination are declared, and bound together, when needed.

        :param msg: the message to send, with a "destination" header
        :param msg_id: identifier of the message in the incoming module
        :return: the list of identifiers that can be acknowledged:
                 ``[msg_id]``, or an empty list when msg_id is None
        :raises AmqpcltError: if the content-type header contradicts the
                              kind of body (text or binary) or if the
                              message has no destination
        """
        delivery_mode = 1
        if msg.header.get("persistent", "false") == "true":
            delivery_mode = 2
        header = dict()
        for key, val in msg.header.items():
            if type(key) == unicode:
                key = key.encode("utf-8")
            if type(val) == unicode:
                val = val.encode("utf-8")
            header[key] = val
        msg.header = header
        properties = pika.BasicProperties(
                            timestamp=time.time(),
                            headers=msg.header,
                            delivery_mode=delivery_mode)
        if "content-type" in msg.header:
            content_type = msg.header["content-type"]
            if content_type.startswith("text/") or \
                "charset=" in content_type:
                if not msg.is_text():
                    raise AmqpcltError("unexpected text content-type "
                                   "for binary message: %s" % content_type)
            else:
                if msg.is_text():
                    raise AmqpcltError("unexpected binary content-type for "
                                       "text message: %s" % content_type)
            properties.content_type = content_type
        elif msg.is_text():
            properties.content_type = "text/unknown"
        # Send the message
        if "destination" not in msg.header:
            raise AmqpcltError("message doesn't have a destination: %s" % msg)
        destination = _parse_sender_destination(msg.header["destination"])
        if "queue" in destination:
            queue_name = self._maybe_declare_queue(destination["queue"])
        exch_name = self._maybe_declare_exchange(
                                        destination.get("exchange", dict()))
        if exch_name and "queue" in destination:
            # bind the queue just declared to the destination exchange
            self._maybe_bind(queue_name,
                             exch_name,
                             destination.get("routing_key", ""))
        if type(msg.body) == unicode:
            body = msg.body.encode("utf-8")
        else:
            body = msg.body
        self._channel.basic_publish(
                            exchange=exch_name,
                            routing_key=destination.get("routing_key", ""),
                            body=body,
                            properties=properties)
        if msg_id is None:
            return list()
        else:
            return [msg_id, ]

    def idle(self):
        """ Idle: nothing to do, so nothing to acknowledge. """
        return list()

    def stop(self):
        """ Close the connection. """
        self._connection.close()
        self._channel = None
        self._connection = None
        
class KombuOutgoingBroker(KombuAdapter):
    """
    Kombu outgoing broker object.

    Publishes each message to the destination found in its "destination"
    header.
    """
    direction = "outgoing"

    def __init__(self):
        """ Initialize the kombu outgoing broker module. """
        super(KombuOutgoingBroker, self).__init__()
        self._producer = None

    def start(self):
        """ Start the kombu outgoing broker module. """
        self.connect()
        self._producer = kombu.Producer(self._channel)

    def put(self, msg, msg_id=None):
        """
        Put a message.

        The queue and exchange named by the destination are declared, and
        bound together, when needed.

        :param msg: the message to send, with a "destination" header
        :param msg_id: identifier of the message in the incoming module
        :return: the list of identifiers that can be acknowledged:
                 ``[msg_id]``, or an empty list when msg_id is None
        :raises AmqpcltError: if the message has no destination or if the
                              content-type header contradicts the kind of
                              body (text or binary)
        """
        delivery_mode = 1
        if msg.header.get("persistent", "false") == "true":
            delivery_mode = 2
        # Send the message
        if "destination" not in msg.header:
            raise AmqpcltError("message doesn't have a destination: %s" % msg)
        destination = _parse_sender_destination(msg.header["destination"])
        if "queue" in destination:
            queue_name = self._maybe_declare_queue(destination["queue"])
        exch_name = self._maybe_declare_exchange(
                                    destination.get("exchange", dict()))
        if exch_name and "queue" in destination:
            # bind the queue just declared to the destination exchange
            self._maybe_bind(queue_name,
                             exch_name,
                             destination.get("routing_key", ""))
        extra = dict()
        if "content-type" in msg.header:
            content_type = msg.header["content-type"]
            if content_type.startswith("text/") or \
                "charset=" in content_type:
                if not msg.is_text():
                    raise AmqpcltError("unexpected text content-type "
                                   "for binary message: %s" % content_type)
            else:
                if msg.is_text():
                    raise AmqpcltError("unexpected binary content-type for "
                                       "text message: %s" % content_type)
            extra["content_type"] = content_type
        elif msg.is_text():
            extra["content_type"] = "text/unknown"
        self._producer.publish(
                    exchange=exch_name,
                    routing_key=destination.get("routing_key", ""),
                    delivery_mode=delivery_mode,
                    serializer=None,
                    compression=None,
                    headers=msg.header,
                    body=msg.body, **extra)
        if msg_id is None:
            return list()
        else:
            return [msg_id, ]

    def idle(self):
        """ Idle: nothing to do, so nothing to acknowledge. """
        return list()

    def stop(self):
        """ Close the connection. """
        self._connection.close()
        self._channel = None
        self._connection = None
        
class OutgoingQueue(object):
    """ Outgoing module that adds the messages to a message queue. """

    def __init__(self):
        """ Create the module; the queue itself is opened by start(). """
        self._path = None
        self._queue = None

    def start(self):
        """ Open the configured outgoing queue (type defaults to DQS). """
        configured_type = _treeval(CONFIG, "outgoing-queue-type")
        if configured_type is None:
            _treeset(CONFIG, "outgoing-queue-type", "DQS")
        # work on a copy of the queue options taken from the configuration
        options = _treeval(CONFIG, "outgoing-queue").copy()
        self._queue = queue.new(options)
        # not every queue object exposes a path, fall back to ""
        self._path = getattr(self._queue, "path", "")
        log_debug("outgoing queue %s %s" %
                  (_treeval(CONFIG, "outgoing-queue-type"), self._path), 1)

    def put(self, msg, element_id=None):
        """
        Add a message to the queue.

        Return a one-element list holding element_id, or an empty list
        when no element_id was given.
        """
        added = self._queue.add_message(msg)
        log_debug("outgoing message added %s/%s" % (self._path, added), 3)
        return list() if element_id is None else [element_id, ]

    def idle(self):
        """ Nothing happens while idle; always hand back an empty list. """
        return []

    def stop(self):
        """ Forget the queue object. """
        self._queue = None

_KEYVALUE_RE = re.compile("[, ]")
def _get_kv_from_string(given):
    """ Explode a string in key=value format. """
    result = dict()
    kvs = _KEYVALUE_RE.split(given)
    for kv_item in kvs:
        try:
            key, value = kv_item.split("=")
            result[key] = value
        except ValueError:
            raise ValueError("%s value is invalid: %s" % (given, kv_item))
    return result
        
def _clean_configuration():
    """
    Clean configuration.

    Normalize, in place, the values held by the global CONFIG:

    - options declared in ARGUMENTS with an int or float type are
      converted to that type;
    - "incoming-queue", "outgoing-queue" and "subscribe" given as
      key=value strings (or lists containing such strings) are exploded
      into dicts;
    - "incoming-broker-auth" and "outgoing-broker-auth" are turned into
      checked credential objects.

    Raise AmqpcltError when a numeric option cannot be converted.
    """
    # numeric options
    for arg, arg_props in ARGUMENTS.items():
        arg_type = arg_props.get("type", str)
        val = _treeval(CONFIG, arg)
        if val is None or arg_type not in [int, float]:
            continue
        try:
            val = arg_type(val)
        except ValueError:
            # both the option name and the expected type are reported
            raise AmqpcltError("%s must be of %s type" %
                               (arg, arg_type.__name__))
        _treeset(CONFIG, arg, val)

    # options given in key=value format
    for arg in ["incoming-queue", "outgoing-queue", "subscribe"]:
        val = _treeval(CONFIG, arg)
        if val is None:
            continue
        if type(val) in [str, unicode]:
            # explode it
            newval = _get_kv_from_string(val)
            _treeset(CONFIG, arg, newval)
        elif type(val) == list:
            newval = list()
            for value in val:
                # explode each string item, the test is on the item
                # and not on the enclosing list
                if type(value) in [str, unicode]:
                    new_value = _get_kv_from_string(value)
                else:
                    new_value = value
                newval.append(new_value)
            _treeset(CONFIG, arg, newval)

    # authentication credentials
    for auth in ["incoming-broker-auth", "outgoing-broker-auth"]:
        val = _treeval(CONFIG, auth)
        if val is None:
            continue
        elif type(val) == str:
            cred = credential.parse(val)
        elif type(val) == unicode:
            cred = credential.parse(val.encode())
        else:
            # anything else is handled as a mapping of credential
            # attributes whose values get encoded
            for key, value in val.items():
                val[key] = value.encode()
            cred = credential.new(**val)
        cred.check()
        _treeset(CONFIG, auth, cred)
        
def option_parse():
    """
    Build the global CONFIG and act on the --quit and --status requests.

    The command line arguments are merged on top of the optional
    configuration file, cleaned and checked for consistency, defaults are
    applied and the subscriptions are parsed. When "quit" or "status" is
    requested the matching pid file operation is performed and the
    program exits.
    """
    global CONFIG

    def _is_running(process_id):
        """ Tell whether signal 0 can be delivered to the process. """
        try:
            os.kill(process_id, 0)
        except OSError:
            return False
        return True

    cli_args = read_args()
    file_path = cli_args.get("conf", None)
    if file_path is None:
        from_file = dict()
    else:
        from_file = read_apache_config(os.path.abspath(file_path))
    combined = _merge(from_file, cli_args)
    _explode_dict(combined)
    CONFIG = combined
    _clean_configuration()

    # pid file related options
    mutex(CONFIG, "quit", "status")
    reqall(CONFIG, "quit", "pidfile")
    reqall(CONFIG, "status", "pidfile")

    if "quit" in CONFIG:
        pid = pid_read(CONFIG["pidfile"])
        linger = _treeval(CONFIG, "timeout-linger")
        if pid and linger is not None:
            print("%s (pid %d) is being told to quit..." % (PROG, pid))
            pid_write(CONFIG["pidfile"], pid, "quit")
            # poll once per second until the process is gone or the
            # linger timeout is exhausted
            while linger >= 0 and _is_running(pid):
                time.sleep(1)
                linger -= 1
            if not _is_running(pid):
                print("%s (pid %d) does not seem to be running anymore" %
                      (PROG, pid))
                pid_quit(CONFIG["pidfile"])
                sys.exit(0)
        pid_quit(CONFIG["pidfile"], PROG)
        sys.exit(0)

    if "status" in CONFIG:
        status, message = pid_status(CONFIG["pidfile"], 60)
        print("%s %s" % (PROG, message))
        sys.exit(status)

    # consistency of the remaining options
    reqany(CONFIG, None, "incoming-broker", "incoming-queue")
    reqany(CONFIG, None, "outgoing-broker", "outgoing-queue")
    reqall(CONFIG, "loop", "incoming-queue")
    reqall(CONFIG, "remove", "incoming-queue")
    reqall(CONFIG, "prefetch", "incoming-broker")
    reqall(CONFIG, "subscribe", "incoming-broker")
    reqall(CONFIG, "incoming-broker", "subscribe")
    mutex(CONFIG, "incoming-broker", "incoming-queue")
    mutex(CONFIG, "outgoing-broker", "outgoing-queue")
    mutex(CONFIG, "callback-code", "callback-path")
    mutex(CONFIG, "daemon", "statistics")
    reqany(CONFIG, "callback-data", "callback-code", "callback-path")
    reqany(CONFIG, "heart-beat", "incoming-broker", "outgoing-broker")

    # defaults never override what has been given
    for name, default in DEFAULT_OPTIONS.items():
        CONFIG.setdefault(name, default)

    # subscriptions: always end up with a list of parsed destinations
    wanted = CONFIG.get("subscribe", [])
    if type(wanted) != list:
        wanted = [wanted, ]
    parsed = list()
    for entry in wanted:
        if type(entry) == str:
            entry = dict([tuple(entry.split("=")), ])
        target = entry.get("destination", None)
        if target is None:
            raise AmqpcltError("subscribe destination must be provided")
        parsed.append(_parse_subscribe_destination(target))
    CONFIG["subscribe"] = parsed

    # in reliable mode with a message count, cap the prefetch at 100
    if "prefetch" not in CONFIG:
        if CONFIG["reliable"] and CONFIG.get("count"):
            CONFIG["prefetch"] = min(CONFIG.get("count", 100), 100)

    if CONFIG["debug"] > 3:
        pp.pprint(CONFIG)
    
def init_debug():
    """ Set TO_SYSLOG from the daemon option and open syslog if set. """
    global TO_SYSLOG
    TO_SYSLOG = (CONFIG.get("daemon") == True)
    if not TO_SYSLOG:
        return
    syslog.openlog(PROG, syslog.LOG_PID, syslog.LOG_USER)

def init_callback():
    """
    Initialize callback.

    Nothing is done when the configuration holds no "callback" entry.
    Otherwise the callback source is taken either from the inline
    "callback-code" option or from the file named by "callback-path",
    turned into an object stored in the global CALLBACK, and the
    "callback-data" option is converted into a list.

    Inline code that does not define any function is wrapped into a
    ``check(self, msg)`` function which exposes the message header as
    ``hdr`` and returns the message.

    Raise AmqpcltError when the callback file cannot be read, when neither
    code nor path is given, when the code has a syntax error or when the
    resulting callback has no ``check`` attribute.
    """
    global CALLBACK
    if CONFIG.get("callback", None) is None:
        return
    callback_code = _treeval(CONFIG, "callback-code")
    callback_path = _treeval(CONFIG, "callback-path")
    if callback_code is not None:
        log_debug("callback inline", 1)
        if re.search(r"^\s*def ", callback_code, re.MULTILINE):
            # the code already defines its own function(s)
            code = callback_code
        else:
            # bare statements: indent them and wrap them into check()
            body = "\n    " + "\n    ".join(callback_code.splitlines())
            code = ("\ndef check(self, msg):\n    hdr = msg.header\n"
                    + body + "\n    return msg")
    elif callback_path is not None:
        log_debug("callback file %s" % (callback_path,))
        try:
            # the file gets closed even when reading it fails
            with open(callback_path, "r") as call_file:
                code = call_file.read()
        except IOError:
            error = sys.exc_info()[1]
            raise AmqpcltError("invalid callback file: %s" % error)
    else:
        raise AmqpcltError("callback parameters not complete")
    try:
        CALLBACK = _callback_module(code)()
    except SyntaxError:
        error = sys.exc_info()[1]
        raise AmqpcltError("syntax error in the callback: %s" % error)
    if not hasattr(CALLBACK, "check"):
        raise AmqpcltError("check(message) missing in callback")
    callback_data = _treeval(CONFIG, "callback-data")
    if callback_data is None:
        _treeset(CONFIG, "callback-data", list())
    else:
        callback_data = callback_data.split(",")
        _treeset(CONFIG, "callback-data", callback_data)
    
def init_daemon():
    """ Call daemonize() when the daemon option is set. """
    if CONFIG.get("daemon"):
        daemonize()
        log_debug("daemonized", 1)

def initialize():
    """ Run the start-up steps in order, then write the pid file if any. """
    for step in (option_parse, init_debug, init_callback, init_daemon):
        step()
    pidfile = CONFIG.get("pidfile")
    if pidfile:
        pid_write(pidfile, os.getpid(), excl=True)
    
def handle_sig(signum, _):
    """
    Signal handler: SIGINT and SIGTERM clear the global running flag,
    SIGHUP is only logged, any other signal is silently ignored.
    """
    global running
    stopping = {
        signal.SIGINT: "caught SIGINT",
        signal.SIGTERM: "caught SIGTERM",
    }
    if signum in stopping:
        log_debug(stopping[signum], 1)
        running = False
    elif signum == signal.SIGHUP:
        log_debug("caught SIGHUP, ignoring it", 1)

@log_exceptions(re_raise=False)
def work():
    """
    Do it!

    Run the main loop of the program:

    - install the signal handlers and create the incoming and outgoing
      modules selected by the configuration;
    - repeatedly get a message from the incoming module, optionally pass
      it through the callback, put it to the outgoing module and
      acknowledge what the outgoing module reports, until the message
      count, the duration, the inactivity timeout, the end of the input,
      a signal or a quit request stops the loop;
    - linger to give pending acknowledgements a chance to complete;
    - optionally print statistics and stop all the modules.

    Raise AmqpcltError on invalid broker module names, on unexpected
    acknowledgement ids and when messages are still pending at the end.
    """
    global running
    pending = dict()
    put_list = list()
    timek = dict()
    timek["start"] = time.time()
    log_debug("starting", 1)
    signal.signal(signal.SIGINT, handle_sig)
    signal.signal(signal.SIGTERM, handle_sig)
    signal.signal(signal.SIGHUP, handle_sig)
    # incoming module
    if _treeval(CONFIG, "incoming-broker") is not None:
        mtype = _treeval(CONFIG, "incoming-broker-module") or "pika"
        if mtype == "kombu":
            incoming = KombuIncomingBroker()
        elif mtype == "pika":
            incoming = PikaIncomingBroker()
        else:
            raise AmqpcltError("invalid incoming broker module: %s" % (mtype))
    else:
        incoming = IncomingQueue()
    # outgoing module
    if _treeval(CONFIG, "outgoing-broker") is not None:
        mtype = _treeval(CONFIG, "outgoing-broker-module") or "pika"
        if mtype == "kombu":
            outgoing = KombuOutgoingBroker()
        elif mtype == "pika":
            outgoing = PikaOutgoingBroker()
        else:
            raise AmqpcltError("invalid outgoing broker module: %s" % (mtype))
    else:
        outgoing = OutgoingQueue()
    incoming.start()
    # in lazy mode the outgoing module is started on the first message
    if not CONFIG["lazy"]:
        outgoing.start()
    if CONFIG.get("callback", None) is not None:
        CALLBACK.start(*CONFIG["callback"]["data"])
    log_debug("running", 1)
    count = size = 0
    # a value of 0 means "no limit" for both deadlines below
    if CONFIG.get("duration") is not None:
        timek["max"] = time.time() + CONFIG["duration"]
    else:
        timek["max"] = 0
    tina = _treeval(CONFIG, "timeout-inactivity")
    if tina is not None:
        timek["ina"] = time.time() + tina
    else:
        timek["ina"] = 0
    running = True
    while running:
        # are we done
        if CONFIG.get("count") is not None and count >= CONFIG["count"]:
            break
        if timek["max"] and time.time() > timek["max"]:
            break
        # get message
        if CONFIG["reliable"]:
            # the window is looked up by key (CONFIG is a dict, it cannot
            # be called) and a missing window disables the check
            window = CONFIG.get("window")
            if window is not None and window >= 0 and len(pending) > window:
                incoming.idle()
                (msg, msg_id) = ("too many pending acks", None)
            else:
                (msg, msg_id) = incoming.get()
                if type(msg) != str:
                    if msg_id in pending:
                        log_debug("duplicate ack id: %s" % msg_id)
                        sys.exit(1)
                    else:
                        pending[msg_id] = True
        else:
            (msg, msg_id) = incoming.get()
        # check inactivity
        if timek.get("ina"):
            if isinstance(msg, Message):
                timek["ina"] = time.time() + tina
            elif time.time() >= timek["ina"]:
                break
        # count and statistics
        if isinstance(msg, Message):
            count += 1
            if CONFIG["statistics"]:
                size += msg.size()
                if count == 1:
                    timek["first"] = time.time()
        # callback
        if CONFIG.get("callback") is not None:
            if type(msg) != str:
                msg = CALLBACK.check(msg)
                if not isinstance(msg, Message):
                    log_debug("message discarded by callback: %s" % msg)
                    # message discarded by callback
                    if CONFIG["reliable"]:
                        if msg_id not in pending:
                            raise AmqpcltError(
                                    "unexpected ack id: %s" % (msg_id, ))
                        del(pending[msg_id])
                        incoming.ack(msg_id)
                    if CONFIG["statistics"]:
                        timek["last"] = time.time()
            else:
                CALLBACK.idle()
        # put | idle
        if isinstance(msg, Message):
            log_debug("loop got new message", 3)
            if CONFIG["lazy"]:
                outgoing.start()
                CONFIG["lazy"] = False
            put_list = outgoing.put(msg, msg_id)
            if CONFIG["statistics"]:
                timek["last"] = time.time()
        else:
            # a non-empty value is a reason for not having a message,
            # an empty one ends the loop
            if msg:
                log_debug("loop %s" % (msg,), 3)
            else:
                log_debug("loop end", 1)
                running = False
            if CONFIG["lazy"]:
                put_list = list()
            else:
                put_list = outgoing.idle()
        # ack
        for msg_id in put_list:
            if msg_id not in pending:
                raise AmqpcltError("unexpected ack id: %s" % (msg_id, ))
            del(pending[msg_id])
            incoming.ack(msg_id)
        # check --quit and show that we are alive
        if CONFIG.get("pidfile"):
            action = pid_check(CONFIG["pidfile"])
            if action == "quit":
                log_debug("asked to quit", 1)
                break
            pid_touch(CONFIG["pidfile"])
    # linger
    log_debug("linger", 1)
    timeout_linger = _treeval(CONFIG, "timeout-linger")
    if timeout_linger:
        timek["max"] = time.time() + timeout_linger
    else:
        timek["max"] = 0
    running = True
    while running:
        if not pending:
            break
        # NOTE(review): "max" is always set at this point, so without a
        # linger timeout (value 0) this test is true on the first pass
        # and no lingering takes place - confirm this is intended
        if "max" in timek and time.time() >= timek["max"]:
            break
        put_list = outgoing.idle()
        if put_list:
            for msg_id in put_list:
                if msg_id not in pending:
                    raise AmqpcltError("unexpected ack id: %s" % (msg_id, ))
                del(pending[msg_id])
                incoming.ack(msg_id)
        else:
            incoming.idle()
    if pending:
        raise AmqpcltError("%d pending messages" % len(pending))
    # report statistics
    if CONFIG.get("statistics"):
        if count == 0:
            print("no messages processed")
        elif count == 1:
            print("only 1 message processed")
        else:
            timek["elapsed"] = timek["last"] - timek["first"]
            print(("processed %d messages in %.3f seconds"
                   " (%.3f k messages/second)") %
                  (count, timek["elapsed"], count / timek["elapsed"] / 1000))
            print("troughput is around %.3f MB/second" %
                  (size / timek["elapsed"] / 1024 / 1024))
            print("average message size is around %d bytes" %
                  (int(size / count + 0.5)))
    # stop
    log_debug("stopping", 1)
    if CONFIG.get("callback", None) is not None:
        log_debug("stopping callback", 1)
        CALLBACK.stop()
    # a still lazy outgoing module has never been started
    if not CONFIG.get("lazy"):
        log_debug("stopping outgoing", 1)
        outgoing.stop()
    log_debug("stopping incoming", 1)
    incoming.stop()
    log_debug("incoming stopped", 1)
    timek["stop"] = time.time()
    timek["elapsed"] = timek["stop"] - timek["start"]
    log_debug("work processed %d messages in %.3f seconds" %
              (count, timek["elapsed"]), 1)
            
def clean():
    """ Remove the pid file and close syslog, each only when in use. """
    pidfile = CONFIG.get("pidfile")
    if pidfile:
        pid_remove(pidfile)
    if CONFIG.get("daemon"):
        syslog.closelog()

if __name__ == "__main__":
    # Script entry point: set things up, run the main loop, then clean up.
    initialize()
    work()
    clean()
