#!/usr/bin/perl
#+##############################################################################
#                                                                              #
# File: stompclt                                                               #
#                                                                              #
# Description: versatile STOMP client                                          #
#                                                                              #
#-##############################################################################

#
# modules
#

use strict;
use warnings qw(FATAL all);
use sigtrap qw(die normal-signals);
use Authen::Credential qw();
use Config::General qw(ParseConfig);
use Config::Validator qw(*);
use Data::Dumper qw(Dumper);
use Getopt::Long qw(GetOptions);
use IO::Socket::INET qw();
use List::Util qw(min);
use Messaging::Message qw();
use Messaging::Message::Queue qw();
use Net::STOMP::Client 2.0 qw();
use No::Worries qw($ProgramName);
use No::Worries::Die qw(handler dief);
use No::Worries::File qw(file_read);
use No::Worries::Log qw(log_wants_debug log_debug log_filter);
use No::Worries::PidFile qw(*);
use No::Worries::Proc qw(proc_detach);
use No::Worries::Syslog qw(syslog_open syslog_close);
use No::Worries::Warn qw(handler warnf);
use Pod::Usage qw(pod2usage);
use Socket qw(SO_SNDBUF);
use Time::HiRes qw();

#
# constants
#

use constant VERSION           => "1.6";
use constant PURGE_INTERVAL    => 300;
use constant PREFETCH_MAX      => 100;
use constant PIDFILE_FRESHNESS =>  60;
use constant UDP_SEND_BUFFER   => 256 * 1024 * 1024;

#
# global variables
#

our(%Config, %NeedsCleanup, %Schema);

#
# prototypes
#

sub incoming::start();
sub incoming::get();
sub incoming::ack($);
sub incoming::idle();
sub incoming::stop();

sub outgoing::start();
sub outgoing::put($;$);
sub outgoing::idle();
sub outgoing::stop();

sub callback::start($);
sub callback::check($);
sub callback::idle();
sub callback::stop();

#+++############################################################################
#                                                                              #
# configuration schema                                                         #
#                                                                              #
#---############################################################################

$Schema{_} = {
    type   => "struct",
    fields => {
        callback    => { type => "valid(callback)",     optional => "true" },
        count       => { type => "integer",             optional => "true" },
        daemon      => { type => "boolean",             optional => "true" },
        debug       => { type => "integer",             optional => "true" },
        duration    => { type => "number",              optional => "true" },
        heart       => { type => "valid(heart)",        optional => "true" },
        incoming    => { type => "valid(anying)",       optional => "true" },
        lazy        => { type => "boolean",             optional => "true" },
        loop        => { type => "boolean",             optional => "true" },
        outgoing    => { type => "valid(anying)",       optional => "true" },
        pidfile     => { type => "string",              optional => "true" },
        prefetch    => { type => "integer",             optional => "true" },
        reliable    => { type => "boolean",             optional => "true" },
        remove      => { type => "boolean",             optional => "true" },
        statistics  => { type => "boolean",             optional => "true" },
        subscribe   => { type => "valid(subscribe)",    optional => "true" },
        timeout     => { type => "valid(timeout)",      optional => "true" },
        unsubscribe => { type => "valid(unsubscribe)",  optional => "true" },
        window      => { type => "integer",             optional => "true" },
    },
};

$Schema{anying} = {
    type   => "struct",
    fields => {
        broker      => { type => "valid(broker)",       optional => "true" },
        queue       => { type => "valid(hash)",         optional => "true" },
    },
};

$Schema{broker} = {
    type   => "struct",
    fields => {
        auth        => { type => "valid(auth)",         optional => "true" },
        connect     => { type => "valid(hash)",         optional => "true" },
        sockopts    => { type => "valid(hash)",         optional => "true" },
        stomp       => { type => "valid(broker_stomp)", optional => "true" },
        type        => { type => "valid(broker_type)",  optional => "true" },
        uri         => { type => "string" },
    },
};

$Schema{broker_stomp} = {
    type   => "struct",
    fields => {
        debug       => { type => "string",              optional => "true" },
    },
};

$Schema{broker_type} = {
    type   => "string",
    match  => qr/^(ActiveMQ|Apollo|RabbitMQ|UNKNOWN)$/,
};

$Schema{callback} = {
    type   => "struct",
    fields => {
        code        => { type => "string",              optional => "true" },
        path        => { type => "string",              optional => "true" },
        data        => { type => "valid(hash)",         optional => "true" },
    },
};

$Schema{heart} = {
    type   => "struct",
    fields => {
        beat        => { type => "boolean" },
    },
};

$Schema{timeout} = {
    type   => "struct",
    fields => {
        broker      => { type => "number",              optional => "true" },
        client      => { type => "number",              optional => "true" },
        connect     => { type => "number",              optional => "true" },
        disconnect  => { type => "number",              optional => "true" },
        flush       => { type => "number",              optional => "true" },
        inactivity  => { type => "number",              optional => "true" },
        linger      => { type => "number",              optional => "true" },
        server      => { type => "number",              optional => "true" },
        status      => { type => "number",              optional => "true" },
    },
};

$Schema{auth}        = { type => "list?(isa(Authen::Credential))" };
$Schema{hash}        = { type => "table(string)" };
$Schema{subscribe}   = { type => "list?(valid(hash))" };
$Schema{unsubscribe} = { type => "list?(valid(hash))" };

#+++############################################################################
#                                                                              #
# debug helpers                                                                #
#                                                                              #
#---############################################################################

sub log_debug0 (@) { log_debug(@_, { dl => 0 }) }
sub log_debug1 (@) { log_debug(@_, { dl => 1 }) }
sub log_debug2 (@) { log_debug(@_, { dl => 2 }) }
sub log_debug3 (@) { log_debug(@_, { dl => 3 }) }

#+++############################################################################
#                                                                              #
# STOMP helpers                                                                #
#                                                                              #
#---############################################################################

#
# return a connected UDP socket to a STOMP broker
#

sub stomp::udp ($$$) {
    my($direction, $host, $port) = @_;
    my($config, $socket);

    # setup
    $config = $Config{$direction}{broker};
    foreach my $option (qw(auth connect sockopts)) {
        dief("unsupported option for STOMP over UDP: %s-broker-%s",
             $direction, $option)
            if $config->{$option};
    }
    dief("unsupported option for STOMP over UDP: heart-beat")
        if $Config{heart}{beat};
    dief("unsupported option for STOMP over UDP: reliable")
        if $Config{reliable};
    # create the socket
    $socket = IO::Socket::INET->new(
        Proto    => "udp",
        PeerAddr => $host,
        PeerPort => $port,
    );
    dief("cannot connect to %s:%d: %s", $host, $port, $!) unless $socket;
    $socket->sockopt(SO_SNDBUF, UDP_SEND_BUFFER);
    # log information
    log_debug1("%s broker %s (%s) port %d using STOMP over UDP",
               $direction, $host, $socket->peerhost(), $socket->peerport());
    # so far so good
    return($socket);
}

#
# return a new connection to a STOMP broker
#

sub stomp::connection ($) {
    my($direction) = @_;
    my($config, %newopt, %conopt, $broker, $peer);

    # setup
    $config = $Config{$direction}{broker};
    %newopt = (debug => $config->{stomp}{debug}) if $config->{stomp}{debug};
    %conopt = %{ $config->{connect} } if $config->{connect};
    # broker connection
    $newopt{auth} = $config->{auth} if $config->{auth};
    $newopt{uri} = $config->{uri};
    $newopt{sockopts} = $config->{sockopts} if $config->{sockopts};
    $newopt{timeout} = $Config{timeout}{connect} if $Config{timeout}{connect};
    # heart-beating
    if ($Config{heart}{beat}) {
        $newopt{client_heart_beat} = $Config{timeout}{client};
        $newopt{server_heart_beat} = $Config{timeout}{server};
    }
    # the STOMP version must currently be given in new()... weird
    $newopt{version} = delete($conopt{version}) if $conopt{version};
    $broker = Net::STOMP::Client->new(%newopt);
    # STOMP connection
    $broker->connect(%conopt);
    # log information
    $peer = $broker->peer();
    log_debug1("%s broker %s (%s) port %d using STOMP %s", $direction,
               $peer->host(), $peer->addr(), $peer->port(), $broker->version());
    if ($Config{heart}{beat}) {
        log_debug1("heart-beating: client=%.3f server=%.3f",
                  $broker->client_heart_beat(), $broker->server_heart_beat());
    }
    # so far so good
    return($broker);
}

#
# carefully disconnect from broker
#

sub stomp::disconnect ($) {
    my($broker) = @_;
    my(%option);

    $option{timeout} = $Config{timeout}{disconnect}
        if $Config{timeout}{disconnect};
    if ($Config{reliable} and $broker->version() ge "1.1") {
        # use a receipt header when this makes sense
        $option{receipt} = $broker->uuid();
    } else {
        # otherwise check the next pending frame as it could be an ERROR frame
        $broker->wait_for_frames(timeout => 0.1);
    }
    $broker->disconnect(%option);
}

#
# wait for all receipts to come back
#

sub stomp::wait4receipts ($) {
    my($broker) = @_;
    my(%option);

    $option{timeout} = $Config{timeout}{broker} if $Config{timeout}{broker};
    $broker->wait_for_receipts(%option);
    dief("missing receipts: %s", join(", ", $broker->receipts()))
        if $broker->receipts();
}

#
# return the broker type, guessing it from the server and session headers
#

sub stomp::type ($) {
    my($broker) = @_;
    my($server, $session, $type);

    $server = $broker->server() || "";
    $session = $broker->session() || "";
    if ($server =~ /^ActiveMQ-Artemis\//) {
        $type = "Artemis";
    } elsif ($server =~ /^ActiveMQ\//) {
        $type = "ActiveMQ";
    } elsif ($server =~ /^apache-apollo\// or $session =~ /^apollo-/) {
        $type = "Apollo";
    } elsif ($server =~ /^RabbitMQ\// or $session =~ /^session-[\w\-]{22}$/) {
        $type = "RabbitMQ";
    } else {
        $type = "UNKNOWN";
    }
    log_debug1("server=%s session=%s => type=%s", $server, $session, $type);
    return($type);
}

#
# return the subscription options relevant to prefetch
#

sub stomp::prefetch ($) {
    my($type) = @_;

    return() unless $Config{prefetch};
    return("activemq.prefetchSize" => $Config{prefetch}) if $type eq "ActiveMQ";
    return("credit" => $Config{prefetch}) if $type eq "Apollo";
    return("prefetch-count" => $Config{prefetch}) if $type eq "RabbitMQ";
    dief("unsupported broker type for prefetch: %s", $type);
}

#
# do whatever is needed wrt heart-beating (STOMP >=1.1 only)
#

sub stomp::beat ($$) {
    my($direction, $broker) = @_;
    my($now, $delta, $latest);

    $now = Time::HiRes::time();
    # client -> server
    $delta = $broker->client_heart_beat();
    if ($delta) {
        $latest = $broker->last_sent();
        if ($now > $latest + $delta/4) {
            log_debug2("%s heart-beating", $direction);
            $broker->noop();
        }
    }
    # server -> client
    $delta = $broker->server_heart_beat();
    if ($delta) {
        $latest = $broker->last_received();
        if ($now > $latest + $delta*4) {
            # maybe we have pending reads?
            $broker->wait_for_frames(timeout => 0);
            $latest = $broker->last_received();
            dief("%s broker silent: heart-beat=%.3fs last-received=%.3fs ago",
                 $direction, $delta, $now-$latest) if $now > $latest + $delta*2;
        }
    }
}

#+++############################################################################
#                                                                              #
# option helpers                                                               #
#                                                                              #
#---############################################################################

#
# list the available command line options
#

sub option::list (@) {
    my(@options) = @_;

    print("Here are the supported command line options:\n");
    foreach my $option (sort(@options)) {
        # hide the special CONFIG* options
        $option =~ s/^CONFIG\w+\|//;
        printf("  --%s\n", $option);
    }
    exit(0);
}

#
# clean an option so that it complies to the schema
#

sub option::clean ($$$$@) {
    my($valid, $schema, $type, $data, @path) = @_;
    my($auth);

    if ($type eq "boolean") {
        return(0) if not defined($data) or is_true($data) or is_false($data);
        if ($data eq "0") {
            $_[3] = "false";
        } elsif ($data eq "1") {
            $_[3] = "true";
        }
        return(0);
    }
    if ($type eq "isa(Authen::Credential)") {
        if (ref($data) eq "HASH") {
            $auth = Authen::Credential->new($data);
        } elsif (ref($data) eq "") {
            $auth = Authen::Credential->parse($data);
        } else {
            dief("unexpected Authen::Credential: %s", $data);
        }
        $auth->check();
        $_[3] = $auth;
        return(0);
    }
    if ($type eq "valid(hash)") {
        if (ref($data) eq "HASH") {
            # as hash... nothing to do!
        } elsif (ref($data) eq "") {
            $_[3] = string2hash($data);
        } else {
            dief("unexpected hash: %s", $data);
        }
        return(0);
    }
    return(1)
}

#
# handle the configuration option (--config)
#

sub option::config (@) {
    my(@options) = @_;
    my(@list, %tmp, %opt);

    # first pass
    @list = @ARGV;
    GetOptions(\%tmp, @options) or pod2usage(2);
    pod2usage(1) if $tmp{help};
    pod2usage(exitstatus => 0, verbose => 2) if $tmp{manual};
    pod2usage(2) if @ARGV;
    option::list(@options) if $tmp{list};
    if ($tmp{version}) {
        printf("%s %s\n", $ProgramName, VERSION);
        exit(0);
    }
    if (defined($tmp{CONFIGPATH})) {
        if ($tmp{CONFIGOPTS}) {
            %opt = string2hash($tmp{CONFIGOPTS});
        }
        $opt{-CComments} = 0;
        $opt{-ConfigFile} = $tmp{CONFIGPATH};
        $opt{-IncludeAgain} = 1;
        $opt{-IncludeRelative} = 1;
        %Config = ParseConfig(%opt);
        %Config = %{ $Config{stompclt} }
            if $Config{stompclt} and ref($Config{stompclt}) eq "HASH";
        treeify(\%Config);
        delete(@Config{qw(quit status)});
        # second pass
        @ARGV = @list;
        GetOptions(\%Config, @options) or pod2usage(2);
        delete($Config{CONFIGPATH});
        delete($Config{CONFIGOPTS});
    } else {
        # first pass is enough
        %Config = %tmp;
    }
}

#
# handle the pid related options (--quit and --status) which are special
#

sub option::pid () {
    my($timeout, %option, $status, $message, $code);

     mutex(\%Config, "quit", "status");
    reqall(\%Config, "quit", "pidfile");
    reqall(\%Config, "status", "pidfile");
    if ($Config{quit}) {
        $timeout = treeval(\%Config, "timeout-linger") || 1;
        $timeout += $Config{timeout}{flush} + 10;
        %option = (
            linger   => $timeout,
            callback => sub { printf("%s %s\n", $ProgramName, $_[0]) },
        );
        pf_quit($Config{pidfile}, %option);
        exit(0);
    }
    if ($Config{status}) {
        $timeout = treeval(\%Config, "timeout-status");
        %option = (freshness => PIDFILE_FRESHNESS);
        $option{timeout} = $timeout if $timeout;
        ($status, $message, $code) = pf_status($Config{pidfile}, %option);
        printf("%s %s\n", $ProgramName, $message);
        exit($code);
    }
}

#
# flatten the given configuration sub-tree
#

sub option::flatten ($);
sub option::flatten ($) {
    my($hash) = @_;
    my(%flat);

    foreach my $key (keys(%{ $hash })) {
        next unless ref($hash->{$key}) eq "HASH";
        option::flatten($hash->{$key});
        foreach my $subkey (keys(%{ $hash->{$key} })) {
            $flat{"$key-$subkey"} = $hash->{$key}{$subkey};
        }
        delete($hash->{$key});
    }
    foreach my $key (keys(%flat)) {
        $hash->{$key} = $flat{$key};
    }
}

#
# parse the command line options
#

sub option::parse () {
    my($validator, @options, $value);

    # setup
    Getopt::Long::Configure(qw(posix_default no_ignore_case));
    $validator = Config::Validator->new(%Schema);
    @options = (
        $validator->options("_"),
        "CONFIGPATH|config|conf|cfg=s",
        "CONFIGOPTS|config-general=s",
        "help|h|?",
        "list|l",
        "manual|m",
        "quit",
        "status",
        "version",
    );
    foreach my $option (@options) {
        # --debug can be repeated
        $option =~ s/^(debug)(.+)$/$1|d+/;
        # --count and --statistics can be abbreviated
        $option =~ s/^(count)(.+)$/$1|c$2/;
        $option =~ s/^(statistics)(.+)$/$1|s$2/;
    }
    # handle --config and treeify (carefully)
    option::config(@options);
    treeify(\%Config);
    foreach my $name (qw(subscribe unsubscribe)) {
        option::flatten($Config{$name}) if $Config{$name};
    }
    # default debug and timeouts
    $Config{debug} = 0
        unless defined($Config{debug});
    $Config{timeout}{client} = 40
        unless defined($Config{timeout}{client});
    $Config{timeout}{disconnect} = 60
        unless defined($Config{timeout}{disconnect});
    $Config{timeout}{flush} = 60
        unless defined($Config{timeout}{flush});
    $Config{timeout}{server} = 10
        unless defined($Config{timeout}{server});
    # handle --quit and --status which are special
    option::pid();
    # cleanup and validation
    $validator->traverse(\&option::clean, \%Config, "_");
    printf(STDERR "# %s configuration: %s", $ProgramName, Dumper(\%Config))
        if $Config{debug} > 3;
    $validator->validate(\%Config, "_");
    # make sure boolean options can be tested directly
    foreach my $option (qw(daemon lazy loop reliable remove statistics)) {
        delete($Config{$option}) unless is_true($Config{$option});
    }
    delete($Config{heart}) unless is_true(treeval(\%Config, "heart-beat"));
    # yet further validation
    reqany(\%Config, undef, "incoming-broker", "incoming-queue");
    reqany(\%Config, undef, "outgoing-broker", "outgoing-queue");
    reqall(\%Config, "loop", "incoming-queue");
    reqall(\%Config, "remove", "incoming-queue");
    reqall(\%Config, "prefetch", "incoming-broker");
    reqall(\%Config, "subscribe", "incoming-broker");
    reqall(\%Config, "unsubscribe", "incoming-broker");
    reqall(\%Config, "incoming-broker", "subscribe");
     mutex(\%Config, "incoming-broker", "incoming-queue");
     mutex(\%Config, "outgoing-broker", "outgoing-queue");
     mutex(\%Config, "callback-code", "callback-path");
     mutex(\%Config, "daemon", "statistics");
    reqany(\%Config, "callback-data", "callback-code", "callback-path");
    reqany(\%Config, "heart-beat", "incoming-broker", "outgoing-broker");
    foreach my $option (qw(count debug duration prefetch window),
                        map("timeout-$_", keys(%{$Schema{timeout}{fields}}))) {
        $value = treeval(\%Config, $option);
        next if not defined($value) or $value >= 0;
        dief("option %s is not positive: %s", $option, $value);
    }
    # intelligent? tuning
    $Config{prefetch} = min($Config{count}, PREFETCH_MAX)
        if $Config{count} and $Config{reliable}
           and not defined($Config{prefetch});
}

#+++############################################################################
#                                                                              #
# initialization helpers                                                       #
#                                                                              #
#---############################################################################

#
# initialize debugging
#

sub init::debug () {
    my(@filter);

    push(@filter, "debug and dl<=$Config{debug}")
        if $Config{debug};
    push(@filter, "debug and caller=~^Net::STOMP::Client")
        if treeval(\%Config, "incoming-broker-stomp-debug")
        or treeval(\%Config, "outgoing-broker-stomp-debug");
    log_filter(join(" or ", @filter)) if @filter;
}

#
# initialize callback
#

sub init::callback () {
    my($code);

    return() unless $Config{callback};
    $code = "package callback; use strict; use warnings qw(FATAL all);\n";
    if ($Config{callback}{code}) {
        log_debug1("callback inline");
        if ($Config{callback}{code} =~ /^\s*sub\s/m) {
            $code .= $Config{callback}{code};
        } else {
            $Config{callback}{code} =~ s/^\s+//;
            $Config{callback}{code} =~ s/\s+$//;
            $Config{callback}{code} .= ";"
                unless substr($Config{callback}{code}, -1) eq ";";
            $Config{callback}{code} =~ s/^/  /mg;
            $code .= "our(\%hdr, \$bdy);\n" .
                "sub check (\$) {\n" .
                "  my \$msg = \$_[0];\n" .
                "  local *hdr = \$msg->header();\n" .
                "  local *bdy = \$msg->body_ref();\n" .
                   $Config{callback}{code} . "\n" .
                "  return(\$msg);\n" .
                "}\n";
        }
    } elsif ($Config{callback}{path}) {
        dief("invalid callback file: %s", $Config{callback}{path})
            unless -f $Config{callback}{path} and -s _;
        log_debug1("callback file %s", $Config{callback}{path});
        $code .= file_read($Config{callback}{path});
    } else {
        die;
    }
    eval($code);  ## no critic 'ProhibitStringyEval'
    if ($@) {
        $code =~ s/^/  /mg;
        $code =~ s/\s+$//;
        print(STDERR "callback code:\n$code\n");
        dief("error in callback: %s", $@);
    }
    dief("missing function in callback: check(\$)")
        unless defined(&callback::check);
    $Config{callback}{data} ||= {};
}

#
# initialize daemon
#

sub init::daemon () {
    return unless $Config{daemon};
    $No::Worries::Log::Handler = \&No::Worries::Syslog::log2syslog;
    $No::Worries::Die::Syslog = 1;
    $No::Worries::Warn::Syslog = 1;
    syslog_open(ident => $ProgramName, facility => "user");
    $NeedsCleanup{syslog}++;
    proc_detach(callback => sub {
        printf("%s (pid %d) started\n", $ProgramName, $_[0])
    });
    log_debug1("daemonized");
}

#+++############################################################################
#                                                                              #
# main code                                                                    #
#                                                                              #
#---############################################################################

#
# initialization
#

sub init () {
    $| = 1;
    $Data::Dumper::Indent = 1;
    $Data::Dumper::Sortkeys = 1;
    option::parse();
    init::debug();
    init::callback();
    init::daemon();
    if ($Config{pidfile}) {
        pf_set($Config{pidfile});
        $NeedsCleanup{pidfile}++;
    }
}

#
# main loop
#

sub main () { ## no critic 'ProhibitExcessComplexity'
    my($running, $count, $size, %pending, $msg, $id, @list, %time);

    #
    # start
    #
    $time{start} = Time::HiRes::time();
    log_debug1("main starting");
    $SIG{HUP}  = sub { log_debug1("caught SIGHUP (ignored)") };
    $SIG{INT}  = sub { log_debug1("caught SIGINT");  $running = 0 };
    $SIG{QUIT} = sub { log_debug1("caught SIGQUIT"); $running = 0 };
    $SIG{TERM} = sub { log_debug1("caught SIGTERM"); $running = 0 };
    incoming::start();
    outgoing::start() unless $Config{lazy};
    callback::start($Config{callback}{data})
        if $Config{callback} and defined(&callback::start);
    #
    # run
    #
    log_debug1("main running");
    $count = $size = 0;
    $time{max} = defined($Config{duration}) ?
        Time::HiRes::time() + $Config{duration} : 0;
    $time{ina} = defined($Config{timeout}{inactivity}) ?
        Time::HiRes::time() + $Config{timeout}{inactivity} : 0;
    $running = 1;
    while ($running) {
        # done?
        last if defined($Config{count}) and $Config{count} <= $count;
        last if $time{max} and $time{max} <= Time::HiRes::time();
        # get
        if ($Config{reliable}) {
            if ($Config{window} and keys(%pending) > $Config{window}) {
                incoming::idle();
                ($msg, $id) = ("too many pending acks", undef);
            } else {
                ($msg, $id) = incoming::get();
                $pending{$id}++ if ref($msg);
            }
        } else {
            $msg = incoming::get();
        }
        # check inactivity
        if ($time{ina}) {
            if (ref($msg)) {
                $time{ina} = Time::HiRes::time() + $Config{timeout}{inactivity};
            } else {
                last if $time{ina} <= Time::HiRes::time();
            }
        }
        # update count and statistics (_before_ the callback, this is a feature)
        if (ref($msg)) {
            $count++;
            if ($Config{statistics}) {
                $size += $msg->size();
                $time{first} = Time::HiRes::time() if $count == 1;
            }
        }
        # callback
        if ($Config{callback}) {
            if (ref($msg)) {
                $msg = callback::check($msg);
                unless (ref($msg)) {
                    # ooops... message discarded by callback
                    if ($Config{reliable}) {
                        dief("unexpected ack id: %s", $id)
                            unless delete($pending{$id});
                        incoming::ack($id);
                    }
                    $time{last} = Time::HiRes::time() if $Config{statistics};
                }
            } else {
                callback::idle() if defined(&callback::idle);
            }
        }
        # put|idle
        if (ref($msg)) {
            log_debug3("loop got new message");
            if ($Config{lazy}) {
                outgoing::start();
                delete($Config{lazy});
            }
            @list = outgoing::put($msg, $id);
            $time{last} = Time::HiRes::time() if $Config{statistics};
        } else {
            if ($msg) {
                log_debug3("loop %s", $msg);
            } else {
                log_debug1("loop end");
                $running = 0;
            }
            if ($Config{lazy}) {
                @list = ();
            } else {
                @list = outgoing::idle();
            }
        }
        # ack
        foreach my $ackid (@list) {
            dief("unexpected ack id: %s", $ackid)
                unless delete($pending{$ackid});
            incoming::ack($ackid);
        }
        # check --quit and show that we are alive
        if ($Config{pidfile}) {
            if (pf_check($Config{pidfile}) eq "quit") {
                log_debug1("told to quit");
                last;
            }
            pf_touch($Config{pidfile});
        }
    }
    #
    # linger
    #
    log_debug1("main lingering");
    if (defined($Config{timeout}{linger})) {
        $time{max} = Time::HiRes::time() + $Config{timeout}{linger};
        $time{sleep} = min(0.1, $Config{timeout}{linger} / 10);
    } else {
        $time{max} = 0;
        $time{sleep} = 0.1;
    }
    $running = 1;
    while ($running) {
        # done?
        last unless keys(%pending);
        last if $time{max} and $time{max} <= Time::HiRes::time();
        @list = outgoing::idle();
        if (@list) {
            foreach my $ackid (@list) {
                dief("unexpected ack id: %s", $ackid)
                    unless delete($pending{$ackid});
                incoming::ack($ackid);
            }
        } else {
            incoming::idle();
            Time::HiRes::sleep($time{sleep}) if $time{sleep} >= 0.001;
        }
    }
    @list = keys(%pending);
    dief("%d pending messages", scalar(@list)) if @list;
    #
    # report statistics
    #
    if ($Config{statistics}) {
        if ($count == 0) {
            print("processed no messages\n");
        } elsif ($count == 1) {
            print("processed only 1 message\n");
        } else {
            $time{elapsed} = $time{last} - $time{first};
            printf("processed %d messages in %.3f seconds (%.3f k msg/sec)\n",
                   $count, $time{elapsed}, $count / $time{elapsed} / 1000);
            printf("throughput is around %.3f MB/second\n",
                   $size / $time{elapsed} / 1024 / 1024);
            printf("average message size is around %d bytes\n",
                   int($size / $count + 0.5));
        }
    }
    #
    # stop
    #
    log_debug1("main stopping");
    callback::stop() if $Config{callback} and defined(&callback::stop);
    outgoing::stop() unless $Config{lazy};
    incoming::stop();
    $time{stop} = Time::HiRes::time();
    $time{elapsed} = $time{stop} - $time{start};
    log_debug1("main processed %d messages in %.3f seconds",
              $count, $time{elapsed});
}

#
# cleanup
#

END {
    return if $No::Worries::Proc::Transient;
    log_debug1("cleanup");
    pf_unset($Config{pidfile}) if $NeedsCleanup{pidfile};
    syslog_close() if $NeedsCleanup{syslog};
}

#
# just do it ;-)
#

init();
main();

#+++############################################################################
#                                                                              #
# incoming modules                                                             #
#                                                                              #
#---############################################################################

#
# start() -> ()
# get()   -> (msg) | (msg, id) | ("text") | ("")
# ack(id) -> ()
# idle()  -> ()
# stop()  -> ()
#
# notes:
#  - "text" means there is no message this time and here is why
#  - "" means there will be no more messages (e.g. end of message queue)
#  - id is something meaningful for the module to keep track of not yet ack'ed messages
#

package incoming;

sub start () {
    if ($Config{incoming}{broker}) {
        incoming::broker::start();
        *incoming::get  = \&incoming::broker::get;
        *incoming::ack  = \&incoming::broker::ack;
        *incoming::idle = \&incoming::broker::idle;
        *incoming::stop = \&incoming::broker::stop;
    } elsif ($Config{incoming}{queue}) {
        incoming::queue::start();
        *incoming::get  = \&incoming::queue::get;
        *incoming::ack  = \&incoming::queue::ack;
        *incoming::idle = \&incoming::queue::idle;
        *incoming::stop = \&incoming::queue::stop;
    } else {
        die;
    }
}

############################################################################

package incoming::broker;

{

    our($Broker, $MsgBuf, $AckId, %Pending);

    sub start () {
        my(%option);
        $Broker = stomp::connection("incoming");
        $Config{incoming}{broker}{type} ||= stomp::type($Broker);
        $Broker->message_callback(sub {
            my($self, $frame) = @_;
            push(@{ $MsgBuf }, $frame) if $MsgBuf;
            return($frame);
        });
        foreach my $subscription (::listof($Config{subscribe})) {
            %option = (stomp::prefetch($Config{incoming}{broker}{type}),
                       %{ $subscription });
            if ($Config{reliable}) {
                $option{"ack"} = $Broker->version() ge "1.1"
                    ? "client-individual" : "client";
            } else {
                $option{"ack"} = "auto";
            }
            $option{"receipt"} ||= $Broker->uuid();
            $option{"id"} ||= $Broker->uuid();
            $subscription->{id} = $option{"id"};
            $Broker->subscribe(%option);
        }
        $MsgBuf = [];
        stomp::wait4receipts($Broker);
        if (@{ $MsgBuf }) {
            ::log_debug1("received %d messages while waiting for receipts",
                        scalar(@{ $MsgBuf }));
        } else {
            $MsgBuf = undef;
        }
        $AckId = "a";
    }

    sub get () {
        my($frame);
        if ($MsgBuf) {
            $frame = shift(@{ $MsgBuf });
            $MsgBuf = undef unless @{ $MsgBuf };
        } else {
            $frame = $Broker->wait_for_frames(timeout => 1);
        }
        stomp::beat("incoming", $Broker) if $Config{heart}{beat};
        return("no frames received") unless $frame;
        return($frame->messagify()) unless $Config{reliable};
        $Pending{$AckId} = $frame;
        return($frame->messagify(), $AckId++);
    }

    sub ack ($) {
        my($id) = @_;
        my($frame, $value, %option);
        $frame = delete($Pending{$id});
        $value = $frame->header("message-id");
        $option{"message-id"} = $value if defined($value);
        $value = $frame->header("subscription");
        $option{"subscription"} = $value if defined($value);
        $value = $frame->header("ack");
        $option{"id"} = $value if defined($value);
        $frame = Net::STOMP::Client::Frame->new(
            command => "ACK",
            headers => \%option,
        );
        $Broker->queue_frame($frame);
        $Broker->send_data(timeout => 0);
    }

    sub idle () {
        # maybe send pending data...
        $Broker->send_data(timeout => 0);
        stomp::beat("incoming", $Broker) if $Config{heart}{beat};
    }

    sub stop () {
        my(@unsubscribe, %option);
        # first flush pending bytes
        $Broker->send_data(timeout => $Config{timeout}{flush});
        ::dief("failed to flush outgoing buffer (%d bytes)!",
               $Broker->outgoing_buffer_length())
            if $Broker->outgoing_buffer_length();
        # then optionally unsubscribe
        if ($Config{unsubscribe}) {
            @unsubscribe = ::listof($Config{unsubscribe});
            foreach my $subscription (::listof($Config{subscribe})) {
                %option = @unsubscribe ? %{ shift(@unsubscribe) } : ();
                $option{"receipt"} ||= $Broker->uuid();
                $option{"id"} = $subscription->{id};
                $Broker->unsubscribe(%option);
            }
            stomp::wait4receipts($Broker);
        }
        $Broker->message_callback(sub { return(1) });
        # then carefully disconnect from broker
        stomp::disconnect($Broker);
        $Broker = undef;
    }

}

############################################################################

package incoming::queue;

{

    our($Queue, $Path, $EndOfQueue, $LockFailed, $PurgeTime, $AckId, %Pending);

    # purge at most every 5 minutes
    sub _purge () {
        return unless not $PurgeTime or time() >= $PurgeTime;
        $Queue->purge();
        ::log_debug2("incoming queue has been purged");
        $PurgeTime = time() + ::PURGE_INTERVAL;
    }

    sub start () {
        $Config{incoming}{queue}{type} ||= "DQS";
        $Queue = Messaging::Message::Queue->new($Config{incoming}{queue});
        $Path = $Queue->path();
        $EndOfQueue = 1;
        ::log_debug1("incoming queue %s %s",
                    $Config{incoming}{queue}{type}, $Path);
        $AckId = "a";
    }

    sub get () {
        my($elt, $msg);
        if ($EndOfQueue) {
            # (re)start from beginning
            _purge();
            ::log_debug2("incoming queue has %d messages", $Queue->count())
                if ::log_wants_debug();
            $EndOfQueue = 0;
            $LockFailed = 0;
            $elt = $Queue->first();
        } else {
            # progress from where we were
            $elt = $Queue->next();
        }
        unless ($elt) {
            # reached the end
            return("") unless $Config{loop};
            $EndOfQueue = 1;
            sleep(1) if $LockFailed == $Queue->count();
            return("end of queue");
        }
        unless ($Queue->lock($elt)) {
            # cannot lock this one this time...
            $LockFailed++;
            return("failed to lock");
        }
        if ($Config{incoming}{queue}{type} eq "DQS"
            and not -s $Queue->get_path($elt)) {
            ::warnf("removed empty file %s/%s", $Path, $elt);
            $Queue->remove($elt);
            return("removed empty file");
        }
        ::log_debug3("incoming message get %s/%s", $Path, $elt);
        $msg = $Queue->get_message($elt);
        if ($Config{reliable}) {
            $Pending{$AckId} = $elt;
            return($msg, $AckId++);
        } else {
            if ($Config{remove}) {
                $Queue->remove($elt);
            } else {
                $Queue->unlock($elt);
            }
            return($msg);
        }
    }

    sub ack ($) {
        my($id) = @_;
        my($elt, $path);
        $elt = delete($Pending{$id});
        $path = "$Path/$elt";
        if ($Config{incoming}{queue}{type} !~ /^DQ/ or -e $path) {
            if ($Config{remove}) {
                ::log_debug3("incoming message remove %s", $path);
                $Queue->remove($elt);
            } else {
                ::log_debug3("incoming message unlock %s", $path);
                $Queue->unlock($elt);
            }
        } else {
            # this can happen if we did not receive an ack quickly enough,
            # purged the queue removing the stale lock, re-sent the same
            # message and eventually received both acks for the same element
            ::warnf("ignored duplicate ack for %s", $path);
        }
    }

    sub idle () {
        _purge();
    }

    sub stop () {
        foreach my $elt (values(%Pending)) {
            ::log_debug3("incoming message unlock %s/%s", $Path, $elt);
            $Queue->unlock($elt);
        }
        $Queue = undef;
    }

}

#+++############################################################################
#                                                                              #
# outgoing modules                                                             #
#                                                                              #
#---############################################################################

#
# start()      -> ()
# put(msg)     -> ()
# put(msg, id) -> (id...)
# idle()       -> (id...)
# stop()       -> ()
#
# notes:
#  - id is what has been given by the incoming module
#

package outgoing;

sub start () {
    if ($Config{outgoing}{broker}) {
        outgoing::broker::start();
        *outgoing::put  = \&outgoing::broker::put;
        *outgoing::idle = \&outgoing::broker::idle;
        *outgoing::stop = \&outgoing::broker::stop;
    } elsif ($Config{outgoing}{queue}) {
        outgoing::queue::start();
        *outgoing::put  = \&outgoing::queue::put;
        *outgoing::idle = \&outgoing::queue::idle;
        *outgoing::stop = \&outgoing::queue::stop;
    } else {
        die;
    }
}

############################################################################

package outgoing::broker;

{

    our($Broker, %Pending);

    # return the recently received acks
    sub _acks () {
        my(%missing, @done);
        return() unless keys(%Pending);
        1 while $Broker->wait_for_frames(timeout => 0);
        grep($missing{$_}++, $Broker->receipts());
        @done = grep(!$missing{$_}, keys(%Pending));
        return(delete(@Pending{@done}));
    }

    sub start () {
        if ($Config{outgoing}{broker}{uri} =~ /^udp:\/\/([\w\-\.]+):(\d+)$/) {
            $Broker = stomp::udp("outgoing", $1, $2);
        } else {
            $Broker = stomp::connection("outgoing");
            $Config{outgoing}{broker}{type} ||= stomp::type($Broker);
        }
    }

    sub put ($;$) {
        my($msg, $id) = @_;
        my($rid, $frame, $sent);
        if ($id) {
            $rid = $Broker->uuid();
            $Pending{$rid} = $id;
            $msg->header_field("receipt", $rid);
        }
        $frame = Net::STOMP::Client::Frame::demessagify($msg);
        if ($Broker->isa("IO::Socket::INET")) {
            $sent = $Broker->send(${ $frame->encode(version => "1.1") });
            ::dief("failed to send(): %s", $!) unless $sent;
            return();
        }
        $Broker->queue_frame($frame);
        $Broker->send_data(timeout => 0);
        stomp::beat("outgoing", $Broker) if $Config{heart}{beat};
        return(_acks());
    }

    sub idle () {
        # maybe send pending data...
        return() if $Broker->isa("IO::Socket::INET");
        $Broker->send_data(timeout => 0);
        stomp::beat("outgoing", $Broker) if $Config{heart}{beat};
        return(_acks());
    }

    sub stop () {
        unless ($Broker->isa("IO::Socket::INET")) {
            # first flush pending bytes
            $Broker->send_data(timeout => $Config{timeout}{flush});
            ::dief("failed to flush outgoing buffer (%d bytes)!",
                   $Broker->outgoing_buffer_length())
                if $Broker->outgoing_buffer_length();
            # then carefully disconnect from broker
            stomp::disconnect($Broker);
        }
        $Broker = undef;
    }

}

############################################################################

package outgoing::queue;

{

    our($Queue, $Path);

    sub start () {
        $Config{outgoing}{queue}{type} ||= "DQS";
        $Queue = Messaging::Message::Queue->new($Config{outgoing}{queue});
        $Path = $Queue->path();
        ::log_debug1("outgoing queue %s %s",
                    $Config{outgoing}{queue}{type}, $Path);
    }

    sub put ($;$) {
        my($msg, $id) = @_;
        my($elt);
        $elt = $Queue->add_message($msg);
        ::log_debug3("outgoing message added %s/%s", $Path, $elt);
        return() unless $id;
        return($id);
    }

    sub idle () {
        return();
    }

    sub stop () {
        $Queue = undef;
    }

}

############################################################################

__END__

=head1 NAME

stompclt - versatile STOMP client

=head1 SYNOPSIS

B<stompclt> [I<OPTIONS>]

=head1 DESCRIPTION

B<stompclt> is a versatile tool to interact with messaging brokers speaking
STOMP and/or message queues (see L<Messaging::Message::Queue>) on disk.

It receives messages (see L<Messaging::Message>) from an incoming module,
optionally massaging them (i.e. filtering and/or modifying), and sends them to
an outgoing module. Depending on which modules are used, the tool can perform
different operations.

Here are the supported incoming modules:

=over

=item *

broker: connect to a messaging broker using STOMP, subscribe to one or more
destinations and receive the messages sent by the broker

=item *

queue: read messages from a message queue on disk

=back

Here are the supported outgoing modules:

=over

=item *

broker: connect to a messaging broker using STOMP and send the messages

=item *

queue: store the messages in a message queue on disk

=back

Here are some frequently used combinations:

=over

=item *

I<incoming broker + outgoing queue>: drain some destinations, storing the
messages on disk

=item *

I<incoming queue + outgoing broker>: (re-)send messages that have been
previously stored on disk, optionally with modifications (such as altering the
destination)

=item *

I<incoming broker + outgoing broker>: shovel messages from one broker to
another

=back

See the L</"EXAMPLES"> sections for concrete examples.

=head1 OPTIONS

=over

=item B<--callback-code> I<CODE>

execute the given Perl code on each message, see the L</"CALLBACK"> section
below for more information

=item B<--callback-data> I<KEY=VALUE...>

pass this data to the user supplied callback code, see the L</"CALLBACK">
section below for more information

=item B<--callback-path> I<PATH>

execute the Perl code in the given file on each message, see the
L</"CALLBACK"> section below for more information

=item B<--config>, B<--conf>, B<--cfg> I<PATH>

use the given configuration file, see the L</"CONFIGURATION FILE"> section
below for more information

=item B<--config-general> I<KEY=VALUE...>

use the given L<Config::General> options when creating the configuration parser

=item B<--count>, B<-c> I<INTEGER>

process at most the given number of messages; note: when using an incoming
broker, to avoid consuming more messages, it is recommended to enable the
B<--reliable> option

=item B<--daemon> | B<--no-daemon>

detach B<stompclt> so that it becomes a daemon running in the background;
debug, warning and error messages will get sent to syslog; this option can be
negated

=item B<--debug>, B<-d>

show debugging information

=item B<--duration> I<SECONDS>

process messages during at most the given number of seconds and then stop; can
be fractional

=item B<--heart-beat> | B<--no-heart-beat>

enable STOMP 1.1 heart-beats between B<stompclt> and the broker(s); this
option can be negated

=item B<--help>, B<-h>, B<-?>

show some help

=item B<--incoming-broker-auth> I<STRING>

use this authentication string (see L<Authen::Credential>) to authenticate to
the incoming broker; this option can be given multiple times

=item B<--incoming-broker-connect> I<KEY=VALUE...>

use these options in the STOMP CONNECT frame sent to the incoming broker

=item B<--incoming-broker-sockopts> I<KEY=VALUE...>

use these socket options when connecting to the incoming broker

=item B<--incoming-broker-stomp-debug> I<STRING>

set the STOMP debug flags (see L<Net::STOMP::Client>) when interacting with
the incoming broker

=item B<--incoming-broker-type> I<STRING>

set the incoming broker type; this can be useful when using STOMP features
which are broker specific

=item B<--incoming-broker-uri> I<URI>

use this connection URI (see L<Net::STOMP::Client>) to connect to the incoming
broker

=item B<--incoming-queue> I<KEY=VALUE...>

read incoming messages from the given message queue (see
L<Messaging::Message::Queue>)

=item B<--lazy> | B<--no-lazy>

initialize the outgoing module only after having received the first message;
this option can be negated

=item B<--list>, B<-l>

show all supported options

=item B<--loop> | B<--no-loop>

when using an incoming message queue, loop over it; this option can be negated

=item B<--manual>, B<-m>

show this manual

=item B<--outgoing-broker-auth> I<STRING>

use this authentication string (see L<Authen::Credential>) to authenticate to
the outgoing broker; this option can be given multiple times

=item B<--outgoing-broker-connect> I<KEY=VALUE...>

use these options in the STOMP CONNECT frame sent to the outgoing broker

=item B<--outgoing-broker-sockopts> I<KEY=VALUE...>

use these socket options when connecting to the outgoing broker

=item B<--outgoing-broker-stomp-debug> I<STRING>

set the STOMP debug flags (see L<Net::STOMP::Client>) when interacting with
the outgoing broker

=item B<--outgoing-broker-type> I<STRING>

set the outgoing broker type; this can be useful when using STOMP features
which are broker specific

=item B<--outgoing-broker-uri> I<URI>

use this connection URI (see L<Net::STOMP::Client>) to connect to the outgoing
broker

=item B<--outgoing-queue> I<KEY=VALUE...>

store outgoing messages into the given message queue (see
L<Messaging::Message::Queue>)

=item B<--pidfile> I<PATH>

use this pid file

=item B<--prefetch> I<INTEGER>

set the prefetch value (i.e. the maximum number of messages to received
without acknowledging them) on the incoming broker

=item B<--quit>

tell another instance of B<stompclt> (identified by its pid file, as specified
by the B<--pidfile> option) to quit

=item B<--reliable> | B<--no-reliable>

use STOMP features for more reliable messaging (i.e. client side
acknowledgments and receipts) at the cost of less performance; this option can
be negated

=item B<--remove> | B<--no-remove>

when using an incoming message queue, remove the processed messages; this
option can be negated

=item B<--statistics>, B<-s> | B<--no-statistics>

report statistics at the end of the execution; this option can be negated

=item B<--status>

get the status of another instance of B<stompclt> (identified by its pid file,
as specified by the B<--pidfile> option); the exit code will be zero if the
instance is alive and non-zero otherwise

=item B<--subscribe> I<KEY=VALUE...>

use these options in the STOMP SUBSCRIBE frame used with the incoming broker;
this option can be given multiple times

=item B<--timeout-broker> I<SECONDS>

use this timeout when interacting with the broker (e.g. getting receipts
back); can be fractional

=item B<--timeout-client> I<SECONDS>

use this timeout for the client heart-beat; can be fractional (default: 40)

=item B<--timeout-connect> I<SECONDS>

use this timeout when connecting to the broker; can be fractional

=item B<--timeout-disconnect> I<SECONDS>

use this timeout when disconnecting from the broker; can be fractional
(default: 60)

=item B<--timeout-flush> I<SECONDS>

use this timeout when attempting to send the last bytes to the broker just
before disconnecting; can be fractional (default: 60)

=item B<--timeout-inactivity> I<SECONDS>

use this timeout in the incoming module to stop B<stompclt> when no new
messages have been received (aka drain mode); can be fractional

=item B<--timeout-linger> I<SECONDS>

when stopping B<stompclt>, use this timeout to finish interacting with the
broker; can be fractional

=item B<--timeout-server> I<SECONDS>

use this timeout for the server heart-beat; can be fractional (default: 10)

=item B<--timeout-status> I<SECONDS>

use this timeout when checking the status with B<--status>; can be fractional

=item B<--unsubscribe> I<KEY=VALUE...>

use these options in the STOMP UNSUBSCRIBE frame used with the incoming
broker; this option can be given multiple times and should match the
B<--subscribe> options

=item B<--version>

display version information

=item B<--window> I<INTEGER>

keep at most the given number of not-yet-acknowledged messages in memory

=back

To list all the available options in a compact form, type:

  $ stompclt -l

=head1 CONFIGURATION FILE

B<stompclt> can read its options from a configuration file. For this, the
L<Config::General> module is used and the option names are the same as on the
command line. For instance:

  daemon = true
  pidfile = /var/run/stompclt.pid
  incoming-queue = path=/var/spool/stompclt
  outgoing-broker-uri = stomp://broker.acme.com:6163
  outgoing-broker-auth = "plain name=guest pass=guest"

Alternatively, options can be nested:

  <outgoing-broker>
      uri = stomp://broker.acme.com:6163
      auth = "plain name=guest pass=guest"
  </outgoing-broker>

Or even:

  <outgoing>
      <broker>
          uri = stomp://broker.acme.com:6163
          <auth>
              scheme = plain
              name = guest
              pass = guest
          </auth>
      </broker>
  </outgoing>

The options specified on the command line have precedence over the ones found
in the configuration file.

=head1 CALLBACK

B<stompclt> can be given Perl code to execute on all processed messages. This
can be used for different purposes:

=over

=item *

massaging: the code can change any part of the message, including setting or
removing header fields

=item *

filtering: the code can decide if the message must be given to the outgoing
module or not

=item *

displaying: the code can print any part of the message

=item *

copying: the code can store a copy of the message into files or message queues

=back

To use callbacks, the B<--callback-path> or B<--callback-code> option must be
used. The Perl code must provide functions with the following signature:

=over

=item start(DATA)

(optional) this will be called when the program starts, with the supplied data
(see the B<--callback-data> option) as a hash reference

=item check(MESSAGE)

(mandatory) this will be called when the program has one message to process;
it will be given the message (see L<Messaging::Message>) and must return
either a message (it could be the same one or a new one) or a string
describing why the message has been dropped

=item idle()

(optional) this will be called when the program has no message to process

=item stop()

(optional) this will be called when the program stops

=back

The code can be put in a file, on the command line or in the B<stompclt>
configuration file, using the "here document" syntax.

Here is an example (to be put in the B<stompclt> configuration file) that
prints on stdout a JSON array of messages:

  callback-code = <<EOF
      my($count);
      sub start ($) {
          $count = 0;
      }
      sub check ($) {
          my($msg) = @_;
          print($count++ ? "," : "[");
          print($msg->serialize(), "\n");
          return($msg);
      }
      sub stop () {
          print($count ? "]\n" : "[]\n");
      }
  EOF

For simple callback code that only needs the C<check> subroutine, it is enough
to supply the "inside code". If the subroutine definition is missing, the
supplied code will be wrapped with:

  sub check ($) {
      my($msg) = @_;
      local *hdr = $msg->header();
      local *bdy = $msg->body_ref();
      ... your code goes here ...
      return($msg);
  }

This allows for instance to remove the C<message-id> header with something
like:

  $ stompclt ... --callback-code 'delete($hdr{"message-id"})'

Or to filter on message bodies with:

  $ stompclt ... --callback-code 'return("skip") unless $bdy =~ /error/'

=head1 SUBSCRIPTIONS

In the case of an incoming broker, B<stompclt> deals with the subscriptions
defined by the B<--subscribe> option.

Regardless of the B<--reliable> option, subscriptions are always made using
receipts. Also, if missing, an C<id> header is always added.

Here is for instance how to create a named durable topic subscription using
Apollo:

  $ stompclt ... --subscribe 'destination=/topic/foo persistent=true id=mysub'

By default, when it finishes, B<stompclt> does not unsubscribe. It simply
disconnects from the broker and the latter will perform the necessary cleanup
when terminating the STOMP connection.

If the B<--unsubscribe> option is given, even if it is empty, B<stompclt> will
explicitly unsubscribe before disconnecting, also using receipts.

Here is for instance how to destroy, when B<stompclt> ends, the durable topic
subscription created above:

  $ stompclt ... --unsubscribe 'persistent=true'

There is no need to give the subscription C<id> in the B<--unsubscribe> option
because, by default, it comes from the matching B<--subscribe> option.

=head1 UDP SUPPORT

B<stompclt> has experimental UDP support (outgoing only). This has been tested
with Apollo.

To use it, simply specify an outgoing URI that uses UDP such as:

  $ stompclt ... --outgoing-broker-uri udp://broker.acme.com:6163

Features such as authentication, heart beating, reliability and socket options
are not supported over UDP.

=head1 EXAMPLES

=head2 SENDING

Here is an example of a configuration file for a message sender daemon (from
queue to broker), forcing the C<persistent> header to C<true> (something which
is highly recommended for reliable messaging) and setting the destination:

  # define the source message queue
  <incoming-queue>
      path = /var/spool/sender
  </incoming-queue>
  # modify the message header on the fly
  callback-code = <<EOF
      $hdr{destination} = "/queue/app1.data";
      $hdr{persistent} = "true";
  EOF
  # define the destination broker
  <outgoing-broker>
      uri = "stomp://broker.acme.com:6163"
  </outgoing-broker>
  # miscellaneous options
  reliable = true
  pidfile = /var/run/sender.pid
  daemon = true
  loop = true
  remove = true

=head2 RECEIVING

Here is an example of a configuration file for a message receiver daemon (from
broker to queue):

  # define the source broker
  <incoming-broker>
      uri = "stomp://broker.acme.com:6163"
      <auth>
          scheme = plain
          name = receiver
          pass = secret
      </auth>
  </incoming-broker>
  # define the subscriptions
  <subscribe>
      destination = /queue/app1.data
  </subscribe>
  <subscribe>
      destination = /queue/app2.data
  </subscribe>
  # define the destination message queue
  <outgoing-queue>
      path = /var/spool/receiver
  </outgoing-queue>
  # miscellaneous options
  pidfile = /var/run/receiver.pid
  daemon = true

Here is how to use the configuration file above with some options overridden
on the command line to drain the queues in the foreground:

  $ stompclt --config test.conf --no-daemon --timeout-inactivity 10

=head2 SHOVELING

Here is an example of a configuration file for a message shoveler (from broker
to broker), clearing some headers on the fly so that messages can be replayed
safely:

  # define the source broker
  <incoming-broker>
      uri = "stomp://broker.acme.com:6163"
  </incoming-broker>
  # define the subscriptions
  <subscribe>
      destination = /queue/app1.data
  </subscribe>
  <subscribe>
      destination = /queue/app2.data
  </subscribe>
  # define the destination broker
  <outgoing-broker>
      uri = "stomp://dev-broker.acme.com:6163"
  </outgoing-broker>
  # modify the message so that it can be replayed
  callback-code = <<EOF
      foreach my $name (qw(message-id timestamp expires)) {
          delete($hdr{$name});
      }
  EOF

=head2 TAPPING

Callback code can also be used to tap messages, i.e. get a copy of all
messages processed by B<stompclt>. Here is some callback code for this purpose
that could for instance be merged with the shoveling code above. It also shows
how to use the B<--callback-data> option:

  callback-code = <<EOF
      my($queue);
      sub start ($) {
          my($data) = @_;
          $queue = Messaging::Message::Queue->new($data);
      }
      sub check ($) {
          my($msg) = @_;
          $queue->add_message($msg);
          return($msg);
      }
  EOF

Callback data must be given to specify which message queue to use:

  $ stompclt --config tap.conf --callback-data "path=/tmp/tap type=DQS"

=head1 SEE ALSO

L<Authen::Credential>,
L<Config::General>,
L<Messaging::Message>,
L<Messaging::Message::Queue>,
L<Net::STOMP::Client>.

=head1 AUTHOR

Lionel Cons L<http://cern.ch/lionel.cons>

Copyright (C) CERN 2012-2019
