#! /usr/bin/perl 

# Copyright 1999-2011 University of Chicago
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
# http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require Pod::Usage;

=head1 NAME

globus-gram-audit - Upload audit records to a database.

=head1 SYNOPSIS

B<globus-gram-audit> [--conf I<CONFIG-FILE>] [--create | --update=OLD-VERSION] [--check] [--delete]
                     [-audit-directory I<DIR>]

=head1 DESCRIPTION

Upload audit records to a database. Reads
F<$GLOBUS_LOCATION/etc/globus-job-manager.conf> by default
to determine the audit directory and then uploads all files in that directory
that contain valid audit records to the database configured by the 
globus_gram_job_manager_auditing_setup_scripts package. If the upload completes
successfully, the audit files will be removed.

=head1 OPTIONS

=over

=item B<--conf> I<CONFIG-FILE>

Use CONFIG-FILE instead of the default for audit database information

=item B<--create> | B<--update=OLD-VERSION>

Create or update audit database tables.

=item B<--check>

Check whether the insertion of a record was successful. This is used in tests.

=item B<--delete>

Delete the audit record right after inserting it. This is used in tests.

=item B<--audit-directory> I<DIR>

Look for audit records in DIR, instead of looking in the directory specified in
the job manager configuration. This is used in tests.

=item B<--query> I<SQL>

Perform the give SQL query on the audit database.

=back

=cut

use Getopt::Long;
use strict;
use DBI qw(:sql_types);
use POSIX;
use Time::Local;
use Globus::Core::Paths;

my $rc = 0;
my $check = 0;
my $delete = 0;
my $audit_directory = "";
my $help = 0;
my $conf_path = '';
my %conf = ();
my $query = undef;
my $create = undef;
my $update = undef;
my $dbh;
my $sth;
my $check_sth;
my $delete_sth;
my $quiet = 0;

GetOptions(
  'help' => \$help,
  'conf=s' => \$conf_path,
  'check' => \$check,
  'delete' => \$delete,
  'query=s' => \$query,
  'create' => \$create,
  'update=s' => \$update,
  'audit-directory=s' => \$audit_directory,
  'quiet' => \$quiet)
|| Pod::Usage::pod2usage(1);

Pod::Usage::pod2usage(-verbose => 1, -exitval => 0) if $help;

# If the audit record directory wasn't passed as command-line argument,
# read it from job manager configuration file
if ($audit_directory eq '') {
    $audit_directory = get_audit_directory();
    if ($audit_directory eq '' && $query eq '' && (!$create) && (!$update)) {
        print STDERR "No -audit-directory specified in job-manager.conf. " .
            "Nothing to do\n" unless ($quiet);
        exit(1);
    }
}

# Set default audit configuration path if not specified on command-line
if ($conf_path eq '') {
    $conf_path = "$Globus::Core::Paths::sysconfdir/globus/gram-audit.conf";
}

# Parse configuration
(%conf = parse_conf($conf_path)) || exit(1);

# Deal with DB driver
$dbh = dbconnect(\%conf);
if (!defined($dbh))
{
    exit(1);
}

# Implement query or else insert records from the audit directory
if (defined($query))
{
    &execute_query($dbh, $query);
}
elsif ($create)
{
    # Create DB table in the audit database. For SQLite, additionally create the
    # directory for the table file if not present.
    my $auditschemadir = "$Globus::Core::Paths::datadir/globus/gram-audit";
    my $path;
    my $fh;
    my $table_sql;

    if ($conf{DRIVER} eq 'SQLite')
    {
        if ($conf{DATABASE} =~ m/dbname=([^;]*)/) {
            my $dbdir = $1;
            my $dbfile = $1;
            $dbdir =~ s|/[^/]*$||;
            mkdir $dbdir, 0700 if (! -d $dbdir);
        }
    }

    $path = "$auditschemadir/audit-$conf{DRIVER}-$conf{AUDITVERSION}.sql";

    open($fh, "<$path")
        || die "Unable to locate schema for driver $conf{DRIVER} and version $conf{AUDITVERSION} ($path)\n";

    $table_sql = join('', <$fh>);

    foreach (split(/;/, $table_sql)) {
        chomp;
        if ($_ ne '') {
            &execute_query($dbh, $_) || die "Error creating table\n";
        }
    }
    exit(1);
}
elsif ($update)
{
    # Update DB table in the audit database from $update version to currently
    # configured version. For SQLite, additionally create the
    # directory for the table file if not present.
    my $auditschemadir = "$Globus::Core::Paths::datadir/globus/gram-audit";
    my $path;
    my $fh;
    my $table_sql;

    if ($conf{DRIVER} eq 'SQLite')
    {
        if ($conf{DATABASE} =~ m/dbname=([^;]*)/) {
            my $dbdir = $1;
            $dbdir =~ s|/[^/]*$||;
            mkdir $dbdir, 0700 if (! -d $dbdir);
        }
    }

    $path = "$auditschemadir/audit-$conf{DRIVER}-$update-$conf{AUDITVERSION}.sql";

    open($fh, "<$path")
        || die "Unable to locate upgrade schema for driver $conf{DRIVER} from version $update to $conf{AUDITVERSION} ($path)\n";

    $table_sql = join('', <$fh>);

    foreach (split(/;/, $table_sql)) {
        chomp;
        if ($_ ne '') {
            &execute_query($dbh, $_) || die "Error updating tables\n";
        }
    }
    exit(1);
}
else
{
    $sth = insertstatement($dbh, \%conf) || exit(1);
    if ($delete) {
        $delete_sth = $dbh->prepare(
            "DELETE FROM gram_audit_table WHERE job_grid_id = ?");
    } else {
        $delete_sth = undef;
    }

    if ($check) {
        $check_sth = checkstatement($dbh, \%conf);
    } else {
        $check_sth = undef;
    }

    foreach my $f (glob("$audit_directory/*.gramaudit")) {
        my $r = upload_record($f, $dbh, $sth, $check_sth, $delete_sth); 
        $rc += $r;

        if ($r == 0) {
            remove($f);
        }
    }
}
exit($rc);

# Upload an audit record into the configured database. If $check_sth is
# defined, this function also retrieves the record and compares the input
# from the gramaudit file with the result from querying the database. If 
# $delete_sth is defined, this function will remove the record after inserting
# it. This is used in the test scripts, and is not expected to be generally
# used.
#
# \param $record_file
#     Path to the gramaudit file to parse and upload
# \param $dbh
#     Database handle
# \param $sth
#     Prepared INSERT statement handle
# \param $check_sth
#     Prepared SELECT statement handle to retrieve the row from the
#     database (for debugging).
# \param $delete_sth
#     Prepared DELETE statement handle to remove the row from the
#     database after inserting it (for debugging).
#
# \return
#     Returns true if the record was inserted (and optionally checked and/or
#     deleted) successfully, false otherwise.
sub upload_record {
    my $record_file = shift;
    my $dbh = shift;
    my $sth = shift;
    my $check_sth = shift;
    my $delete_sth = shift;
    my $owner;
    my $record;
    my @record_entries;
    my $record_fd;
    my $rv;
    local(*FH);

    # Use POSIX::open instead of perl open so that we can fstat the file
    # to safely get the ownership
    $record_fd = POSIX::open($record_file);
    if (!defined($record_fd))
    {
        print STDERR "Error opening $record_file\n";
        return 1;
    }

    $owner = (POSIX::fstat($record_fd))[4];

    # convert to a perl file handle for easier I/O with the <> operator
    open(FH, "<&=$record_fd");

    # read the record file and confirm that the owner of the file
    # is the local user id in the audit record 
    chomp($record = <FH>);
    $record =~ s/^"//;
    $record =~ s/"$//;
    @record_entries = split(/"[^"]"/, $record);
    close(FH);

    if ($conf{AUDITVERSION} eq '1') {
        if (scalar(@record_entries) == 16) {
            # Job manager is aware of TG audit extension, but the database is
            # not configured to use it, so we'll drop the last field
            # (gateway_user)
            pop(@record_entries);
        } elsif (scalar(@record_entries) != 15) {
            # Invalid audit record
            print STDERR "Skipping invalid audit record $record_file\n";
            return 1;
        }
        verify_audit_fields(\@record_entries, \%conf) || return 1;
    } elsif ($conf{AUDITVERSION} eq '1TG') {
        if (scalar(@record_entries) == 15) {
            push(@record_entries, 'NULL');
        } elsif (scalar(@record_entries) != 16) {
            print STDERR "Skipping invalid audit record $record_file\n";
            return 1;
        }
        verify_audit_fields(\@record_entries, \%conf) || return 1;
    }

    # don't process entries where the owner != the job user
    if ($record_entries[3] ne (getpwuid($owner))[0]) {
        print STDERR "Skipping record-file $record_file: "
                   . "record owner is different from local user id "
                   . "in audit record\n";
        return 1;
    }

    $rv = $sth->execute(map {
            s/&quot;/"/g;
            if ($_ eq 'NULL') {
                $_ = undef;
            }
            $_ } @record_entries); # "
    $sth->finish;
    if (!$rv)
    {
        print STDERR "Insert of $record_file failed: " . $dbh->errstr . "\n";
        return 1;
    }

    if (defined($check_sth))
    {
        my $rv = $check_sth->execute($record_entries[0]);
        my @row;

        if ($conf{DRIVER} ne 'SQLite') {
            $check_sth->bind_col(5, undef, {TYPE => SQL_DATETIME});
            $check_sth->bind_col(6, undef, {TYPE => SQL_DATETIME});
        }

        @row = $check_sth->fetchrow_array();

        if (scalar(@row) != scalar(@record_entries)) {
            print STDERR "Check failed: row contained " . scalar(@row) 
                       . " fields instead of expected "
                       . scalar(@record_entries) . "\n";
            return 1;
        }

        for (my $i = 0; $i < scalar(@record_entries); $i++) {
            if ((!defined($row[$i]) &&  defined($record_entries[$i])) ||
                ( defined($row[$i]) && !defined($record_entries[$i])) ||
                ( defined($row[$i]) && $row[$i] ne $record_entries[$i])) {
                print STDERR "Check failed: "
                           . $row[$i] . " != " . $record_entries[$i] . "\n";
                return 1;
            }
        }
    }

    if (defined($delete_sth))
    {
        $delete_sth->execute($record_entries[0]) || return 1;
    }

    close(FH);
    return 0;
}

# get_audit_directory
#
# Parse the default job manager configuration file. Return the argument
# to -audit-directory if present in the file, or an empty string.
#
# \return
#     Path of the audit directory, or an empty string if not present in the
#     configuration
sub get_audit_directory {
    my $jmconf = "$Globus::Core::Paths::sysconfdir/globus/globus-gram-job-manager.conf";
    my $audit_directory = '';
    local(*F);
    open(F, "<$jmconf");
    while(<F>) {
        if (m/-audit-directory\s+(\S+)\s+/) {
            $audit_directory = Globus::Core::Paths::eval_path($1);
            last;
        }
    }
    close(F);
    return $audit_directory;
}

# parse_conf
#
# Parse configuration file named by $path. The configuration file
# is expected to contain key-value pairs separated by :
#
# \arg $path
#     Path to GRAM audit configuration file
# \return
#     Hash containing ($key, $value) pairs from $path
sub parse_conf
{
    my $path = shift;
    my %result = ();

    local(*CONF);

    if (!open(CONF, "<$path"))
    {
        print STDERR "Error opening configuration file \"$path\"\n"
            unless ($quiet);
        return ();
    }

    while (<CONF>)
    {
        my ($key, $value);
        chomp;
        ($key, $value) = split(/:/, $_, 2);

        $result{$key} = Globus::Core::Paths::eval_path($value);
    }
    return %result;
}

# Connect to database described in configuration hash %conf.
#
# \arg %conf
#     Hash containing DRIVER, DATABASE, USERNAME, and PASSWORD keys.
# \return
#     DBI handle connected to the selected database or undef.
sub dbconnect
{
    my $conf = $_[0];
    my $dbh;
    my $sth;
    my @available_drivers = DBI->available_drivers();
    my $driver_ok = 0;

    for (my $i = 0; $i < scalar(@available_drivers); $i++)
    {
        if ($available_drivers[$i] eq $conf->{'DRIVER'})
        {
            $driver_ok = 1;
        }
    }
    if (!$driver_ok)
    {
        print STDERR "Error connecting to database: driver "
                   . "\"$conf->{DRIVER}\" not installed\n" unless ($quiet);
        return undef;
    }

    $dbh = DBI->connect(
            "dbi:$conf->{DRIVER}:$conf->{DATABASE}",
            $conf->{USERNAME},
            $conf->{PASSWORD});

    if (!defined($dbh))
    {
        print STDERR "Error connecting to database\n" unless ($quiet);
        return undef;
    }

    return $dbh;
}

# Generate an audit-version specific SQL statement to insert a row into
# the database, with placeholders in the same order as the gramaudit file.
# 
# \param $dbh
#     Database handle.
# \param $conf
#     Reference to the configuration hash from the globus-gram-auditing.conf
#     file
#
# \return
#     A prepared SQL statement or undef if an error occurs.
sub insertstatement
{
    my $dbh = shift;
    my $conf = shift;
    my $sth;

    if ($conf->{AUDITVERSION} eq '1')
    {
        $sth = $dbh->prepare("INSERT INTO gram_audit_table("
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag)"
             . " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
    }
    elsif ($conf->{AUDITVERSION} eq '1TG')
    {
        $sth = $dbh->prepare("INSERT INTO gram_audit_table("
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag,"
             . "gateway_user) "
             . "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
    }
    else
    {
        print STDERR "Unsupported audit version $conf->{AUDITVERSION}\n";
        $sth = undef;
    }

    return $sth;
}

# Generate an audit-version specific SQL statement to retrieve the columns
# from the database in the same order that they are in the original gramaudit
# file.
# 
# \param $dbh
#     Database handle.
# \param $conf
#     Reference to the configuration hash from the globus-gram-auditing.conf
#     file
#
# \return
#     A prepared SQL statement or undef if an error occurs.
sub checkstatement
{
    my $dbh = shift;
    my $conf = shift;
    my $sth;

    if ($conf->{AUDITVERSION} eq '1')
    {
        $sth = $dbh->prepare("SELECT  "
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag "
             . "FROM gram_audit_table "
             . "WHERE job_grid_id = ?");
    }
    elsif ($conf->{AUDITVERSION} eq '1TG')
    {
        $sth = $dbh->prepare("SELECT "
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag, "
             . "gateway_user "
             . "FROM gram_audit_table "
             . "WHERE job_grid_id = ?");
    }
    else
    {
        print STDERR "Unsupported audit version $conf->{AUDITVERSION}\n";
        $sth = undef;
    }

    return $sth;
}

# Check that the fields of the audit record array are valid for the typed
# columns and normalize those to SQL92 syntax.
# 
# \param $record_entries
#     Reference to the list of audit record fields from the gramaudit file.
#     Modified as data is normalized
# \param %conf
#     Configuration values from the GRAM audit configuration.
#
# \return
#     A true value if the audit records match the expected types, false
#     otherwise.
sub verify_audit_fields
{
    my $record_entries = $_[0];
    my %conf = %{$_[1]};
    my $ts;

    if (! parse_timestamp(\$record_entries->[5]))
    {
        return 0;
    }

    if ((! parse_timestamp(\$record_entries->[6])) &&
        $record_entries->[6] ne 'NULL')
    {
        return 0;
    }

    if (!defined(parse_boolean(\$record_entries->[13])))
    {
        return 0;
    }

    if (!defined(parse_boolean(\$record_entries->[14])))
    {
        return 0;
    }

    return 1;
}

# Translate boolean from text in the auditfile into an integer
#
# \param $boolean_ref
#     Reference to the boolean string to parse. The referred-to string is
#     modified by this function to be formatted as an integer (1 for true,
#     0 for false).
#
# \return
#     Integer representation of the boolean value.
sub parse_boolean
{
    my $boolean_ref = $_[0];
    my %values = ( 'true' => 1, 'false' => 0 );

    if (exists($values{$$boolean_ref}))
    {
        $$boolean_ref = $values{$$boolean_ref};
    }
    else
    {
        return undef;
    }

    return $$boolean_ref;
}

# Translate timestamp from ctime() in the auditfile into an SQL92 TIMESTAMP
#
# \param $timestamp_string_ref
#     Reference to the timestamp string to parse. The referred-to string is
#     modified by this function to be formatted in SQL92 TIMESTAMP format
#
# \return
#     Timestamp string value in SQL92 TIMESTAMP format or undef if
#     the string could not be parsed.
#
sub parse_timestamp
{
    my $timestamp_string_ref = $_[0];
    my %months = ( 'Jan' => 1, 'Feb' => 2, 'Mar' => 3, 'Apr' => 4, 'May' => 5,
                   'Jun' => 6, 'Jul' => 7, 'Aug' => 8, 'Sep' => 9, 'Oct' => 10,
                   'Nov' => 11, 'Dec' => 12 );

    # Convert to SQL standard date-time format 
    if ($$timestamp_string_ref !~
            m/(\S{3})\s+(\S{3})\s+(\d+)\s+(\d+):(\d+):(\d+) UTC (\d+)/)
    {
        return undef;
    }

    $$timestamp_string_ref =
            sprintf("%04d-%02d-%02d %02d:%02d:%02d",
                   $7, $months{$2}, $3, $4, $5, $6);
    return $$timestamp_string_ref;
}

sub execute_query
{
    my ($dbh, $query) = @_;
    my $st;
    my $rc;

    $st = $dbh->prepare($query) || exit(1);
    $rc = $st->execute;

    if (defined($rc) && $rc ne '0E0') {
        while (my @row = $st->fetchrow_array) {
            print DBI::neat_list(\@row, 2560, ",") . "\n";
        }
    }

    $st->finish;
}

END {
    if ($dbh)
    {
        $dbh->disconnect;
    }
}
