#!/usr/bin/env perl

# This program is copyright 2009 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA  02111-1307  USA.

use strict;
use warnings FATAL => 'all';

our $VERSION = '0.9.3';
our $DISTRIB = '5014';
our $SVN_REV = sprintf("%d", (q$Revision: 4971 $ =~ m/(\d+)/g, 0));

# ###########################################################################
# DSNParser package 4823
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package DSNParser;

use DBI;
use Data::Dumper;
$Data::Dumper::Indent    = 0;
$Data::Dumper::Quotekeys = 0;
use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, @opts ) = @_;
   my $self = {
      opts => {
         A => {
            desc => 'Default character set',
            dsn  => 'charset',
            copy => 1,
         },
         D => {
            desc => 'Database to use',
            dsn  => 'database',
            copy => 1,
         },
         F => {
            desc => 'Only read default options from the given file',
            dsn  => 'mysql_read_default_file',
            copy => 1,
         },
         h => {
            desc => 'Connect to host',
            dsn  => 'host',
            copy => 1,
         },
         p => {
            desc => 'Password to use when connecting',
            dsn  => 'password',
            copy => 1,
         },
         P => {
            desc => 'Port number to use for connection',
            dsn  => 'port',
            copy => 1,
         },
         S => {
            desc => 'Socket file to use for connection',
            dsn  => 'mysql_socket',
            copy => 1,
         },
         u => {
            desc => 'User for login if not current user',
            dsn  => 'user',
            copy => 1,
         },
      },
   };
   foreach my $opt ( @opts ) {
      MKDEBUG && _d('Adding extra property', $opt->{key});
      $self->{opts}->{$opt->{key}} = { desc => $opt->{desc}, copy => $opt->{copy} };
   }
   return bless $self, $class;
}

sub prop {
   my ( $self, $prop, $value ) = @_;
   if ( @_ > 2 ) {
      MKDEBUG && _d('Setting', $prop, 'property');
      $self->{$prop} = $value;
   }
   return $self->{$prop};
}

sub parse {
   my ( $self, $dsn, $prev, $defaults ) = @_;
   if ( !$dsn ) {
      MKDEBUG && _d('No DSN to parse');
      return;
   }
   MKDEBUG && _d('Parsing', $dsn);
   $prev     ||= {};
   $defaults ||= {};
   my %given_props;
   my %final_props;
   my %opts = %{$self->{opts}};

   foreach my $dsn_part ( split(/,/, $dsn) ) {
      if ( my ($prop_key, $prop_val) = $dsn_part =~  m/^(.)=(.*)$/ ) {
         $given_props{$prop_key} = $prop_val;
      }
      else {
         MKDEBUG && _d('Interpreting', $dsn_part, 'as h=', $dsn_part);
         $given_props{h} = $dsn_part;
      }
   }

   foreach my $key ( keys %opts ) {
      MKDEBUG && _d('Finding value for', $key);
      $final_props{$key} = $given_props{$key};
      if (   !defined $final_props{$key}
           && defined $prev->{$key} && $opts{$key}->{copy} )
      {
         $final_props{$key} = $prev->{$key};
         MKDEBUG && _d('Copying value for', $key, 'from previous DSN');
      }
      if ( !defined $final_props{$key} ) {
         $final_props{$key} = $defaults->{$key};
         MKDEBUG && _d('Copying value for', $key, 'from defaults');
      }
   }

   foreach my $key ( keys %given_props ) {
      die "Unrecognized DSN part '$key' in '$dsn'\n"
         unless exists $opts{$key};
   }
   if ( (my $required = $self->prop('required')) ) {
      foreach my $key ( keys %$required ) {
         die "Missing DSN part '$key' in '$dsn'\n" unless $final_props{$key};
      }
   }

   return \%final_props;
}

sub parse_options {
   my ( $self, $o ) = @_;
   die 'I need an OptionParser object' unless ref $o eq 'OptionParser';
   my $dsn_string
      = join(',',
          map  { "$_=".$o->get($_); }
          grep { $o->has($_) && $o->get($_) }
          keys %{$self->{opts}}
        );
   MKDEBUG && _d('DSN string made from options:', $dsn_string);
   return $self->parse($dsn_string);
}

sub as_string {
   my ( $self, $dsn ) = @_;
   return $dsn unless ref $dsn;
   return join(',',
      map  { "$_=" . ($_ eq 'p' ? '...' : $dsn->{$_}) }
      grep { defined $dsn->{$_} && $self->{opts}->{$_} }
      sort keys %$dsn );
}

sub usage {
   my ( $self ) = @_;
   my $usage
      = "DSN syntax is key=value[,key=value...]  Allowable DSN keys:\n\n"
      . "  KEY  COPY  MEANING\n"
      . "  ===  ====  =============================================\n";
   my %opts = %{$self->{opts}};
   foreach my $key ( sort keys %opts ) {
      $usage .= "  $key    "
             .  ($opts{$key}->{copy} ? 'yes   ' : 'no    ')
             .  ($opts{$key}->{desc} || '[No description]')
             . "\n";
   }
   $usage .= "\n  If the DSN is a bareword, the word is treated as the 'h' key.\n";
   return $usage;
}

sub get_cxn_params {
   my ( $self, $info ) = @_;
   my $dsn;
   my %opts = %{$self->{opts}};
   my $driver = $self->prop('dbidriver') || '';
   if ( $driver eq 'Pg' ) {
      $dsn = 'DBI:Pg:dbname=' . ( $info->{D} || '' ) . ';'
         . join(';', map  { "$opts{$_}->{dsn}=$info->{$_}" }
                     grep { defined $info->{$_} }
                     qw(h P));
   }
   else {
      $dsn = 'DBI:mysql:' . ( $info->{D} || '' ) . ';'
         . join(';', map  { "$opts{$_}->{dsn}=$info->{$_}" }
                     grep { defined $info->{$_} }
                     qw(F h P S A))
         . ';mysql_read_default_group=client';
   }
   MKDEBUG && _d($dsn);
   return ($dsn, $info->{u}, $info->{p});
}

sub fill_in_dsn {
   my ( $self, $dbh, $dsn ) = @_;
   my $vars = $dbh->selectall_hashref('SHOW VARIABLES', 'Variable_name');
   my ($user, $db) = $dbh->selectrow_array('SELECT USER(), DATABASE()');
   $user =~ s/@.*//;
   $dsn->{h} ||= $vars->{hostname}->{Value};
   $dsn->{S} ||= $vars->{'socket'}->{Value};
   $dsn->{P} ||= $vars->{port}->{Value};
   $dsn->{u} ||= $user;
   $dsn->{D} ||= $db;
}

sub get_dbh {
   my ( $self, $cxn_string, $user, $pass, $opts ) = @_;
   $opts ||= {};
   my $defaults = {
      AutoCommit         => 0,
      RaiseError         => 1,
      PrintError         => 0,
      ShowErrorStatement => 1,
      mysql_enable_utf8 => ($cxn_string =~ m/charset=utf8/ ? 1 : 0),
   };
   @{$defaults}{ keys %$opts } = values %$opts;

   my $dbh;
   my $tries = 2;
   while ( !$dbh && $tries-- ) {
      MKDEBUG && _d($cxn_string, ' ', $user, ' ', $pass, ' {',
         join(', ', map { "$_=>$defaults->{$_}" } keys %$defaults ), '}');

      eval {
         $dbh = DBI->connect($cxn_string, $user, $pass, $defaults);

         if ( $cxn_string =~ m/mysql/i ) {
            my $sql;

            $sql = q{SET @@SQL_QUOTE_SHOW_CREATE = 1}
                 . q{/*!40101, @@SQL_MODE='NO_AUTO_VALUE_ON_ZERO'*/};
            MKDEBUG && _d($dbh, ':', $sql);
            $dbh->do($sql);

            if ( my ($charset) = $cxn_string =~ m/charset=(\w+)/ ) {
               $sql = "/*!40101 SET NAMES $charset*/";
               MKDEBUG && _d($dbh, ':', $sql);
               $dbh->do($sql);
               MKDEBUG && _d('Enabling charset for STDOUT');
               if ( $charset eq 'utf8' ) {
                  binmode(STDOUT, ':utf8')
                     or die "Can't binmode(STDOUT, ':utf8'): $OS_ERROR";
               }
               else {
                  binmode(STDOUT) or die "Can't binmode(STDOUT): $OS_ERROR";
               }
            }

            if ( $self->prop('set-vars') ) {
               $sql = "SET " . $self->prop('set-vars');
               MKDEBUG && _d($dbh, ':', $sql);
               $dbh->do($sql);
            }
         }
      };
      if ( !$dbh && $EVAL_ERROR ) {
         MKDEBUG && _d($EVAL_ERROR);
         if ( $EVAL_ERROR =~ m/not a compiled character set|character set utf8/ ) {
            MKDEBUG && _d('Going to try again without utf8 support');
            delete $defaults->{mysql_enable_utf8};
         }
         if ( !$tries ) {
            die $EVAL_ERROR;
         }
      }
   }

   MKDEBUG && _d('DBH info: ',
      $dbh,
      Dumper($dbh->selectrow_hashref(
         'SELECT DATABASE(), CONNECTION_ID(), VERSION()/*!50038 , @@hostname*/')),
      'Connection info:',      $dbh->{mysql_hostinfo},
      'Character set info:',   Dumper($dbh->selectall_arrayref(
                     'SHOW VARIABLES LIKE "character_set%"', { Slice => {}})),
      '$DBD::mysql::VERSION:', $DBD::mysql::VERSION,
      '$DBI::VERSION:',        $DBI::VERSION,
   );

   return $dbh;
}

sub get_hostname {
   my ( $self, $dbh ) = @_;
   if ( my ($host) = ($dbh->{mysql_hostinfo} || '') =~ m/^(\w+) via/ ) {
      return $host;
   }
   my ( $hostname, $one ) = $dbh->selectrow_array(
      'SELECT /*!50038 @@hostname, */ 1');
   return $hostname;
}

sub disconnect {
   my ( $self, $dbh ) = @_;
   MKDEBUG && $self->print_active_handles($dbh);
   $dbh->disconnect;
}

sub print_active_handles {
   my ( $self, $thing, $level ) = @_;
   $level ||= 0;
   printf("# Active %sh: %s %s %s\n", ($thing->{Type} || 'undef'), "\t" x $level,
      $thing, (($thing->{Type} || '') eq 'st' ? $thing->{Statement} || '' : ''))
      or die "Cannot print: $OS_ERROR";
   foreach my $handle ( grep {defined} @{ $thing->{ChildHandles} } ) {
      $self->print_active_handles( $handle, $level + 1 );
   }
}

sub copy {
   my ( $self, $dsn_1, $dsn_2, %args ) = @_;
   die 'I need a dsn_1 argument' unless $dsn_1;
   die 'I need a dsn_2 argument' unless $dsn_2;
   my %new_dsn = map {
      my $key = $_;
      my $val;
      if ( $args{overwrite} ) {
         $val = defined $dsn_1->{$key} ? $dsn_1->{$key} : $dsn_2->{$key};
      }
      else {
         $val = defined $dsn_2->{$key} ? $dsn_2->{$key} : $dsn_1->{$key};
      }
      $key => $val;
   } keys %{$self->{opts}};
   return \%new_dsn;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End DSNParser package
# ###########################################################################

# ###########################################################################
# MySQLDump package 4160
# ###########################################################################
package MySQLDump;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

( our $before = <<'EOF') =~ s/^   //gm;
   /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
   /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
   /*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
   /*!40101 SET NAMES utf8 */;
   /*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
   /*!40103 SET TIME_ZONE='+00:00' */;
   /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
   /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
   /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
   /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
EOF

( our $after = <<'EOF') =~ s/^   //gm;
   /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
   /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
   /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
   /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
   /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
   /*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
   /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
   /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
EOF

sub new {
   my ( $class, %args ) = @_;
   $args{cache} = 1 unless defined $args{cache};
   my $self = bless \%args, $class;
   return $self;
}

sub dump {
   my ( $self, $dbh, $quoter, $db, $tbl, $what ) = @_;

   if ( $what eq 'table' ) {
      my $ddl = $self->get_create_table($dbh, $quoter, $db, $tbl);
      return unless $ddl;
      if ( $ddl->[0] eq 'table' ) {
         return $before
            . 'DROP TABLE IF EXISTS ' . $quoter->quote($tbl) . ";\n"
            . $ddl->[1] . ";\n";
      }
      else {
         return 'DROP TABLE IF EXISTS ' . $quoter->quote($tbl) . ";\n"
            . '/*!50001 DROP VIEW IF EXISTS '
            . $quoter->quote($tbl) . "*/;\n/*!50001 "
            . $self->get_tmp_table($dbh, $quoter, $db, $tbl) . "*/;\n";
      }
   }
   elsif ( $what eq 'triggers' ) {
      my $trgs = $self->get_triggers($dbh, $quoter, $db, $tbl);
      if ( $trgs && @$trgs ) {
         my $result = $before . "\nDELIMITER ;;\n";
         foreach my $trg ( @$trgs ) {
            if ( $trg->{sql_mode} ) {
               $result .= qq{/*!50003 SET SESSION SQL_MODE='$trg->{sql_mode}' */;;\n};
            }
            $result .= "/*!50003 CREATE */ ";
            if ( $trg->{definer} ) {
               my ( $user, $host )
                  = map { s/'/''/g; "'$_'"; }
                    split('@', $trg->{definer}, 2);
               $result .= "/*!50017 DEFINER=$user\@$host */ ";
            }
            $result .= sprintf("/*!50003 TRIGGER %s %s %s ON %s\nFOR EACH ROW %s */;;\n\n",
               $quoter->quote($trg->{trigger}),
               @{$trg}{qw(timing event)},
               $quoter->quote($trg->{table}),
               $trg->{statement});
         }
         $result .= "DELIMITER ;\n\n/*!50003 SET SESSION SQL_MODE=\@OLD_SQL_MODE */;\n\n";
         return $result;
      }
      else {
         return undef;
      }
   }
   elsif ( $what eq 'view' ) {
      my $ddl = $self->get_create_table($dbh, $quoter, $db, $tbl);
      return '/*!50001 DROP TABLE IF EXISTS ' . $quoter->quote($tbl) . "*/;\n"
         . '/*!50001 DROP VIEW IF EXISTS ' . $quoter->quote($tbl) . "*/;\n"
         . '/*!50001 ' . $ddl->[1] . "*/;\n";
   }
   else {
      die "You didn't say what to dump.";
   }
}

sub _use_db {
   my ( $self, $dbh, $quoter, $new ) = @_;
   if ( !$new ) {
      MKDEBUG && _d('No new DB to use');
      return;
   }
   my $sql = 'SELECT DATABASE()';
   MKDEBUG && _d($sql);
   my $curr = $dbh->selectrow_array($sql);
   if ( $curr && $new && $curr eq $new ) {
      MKDEBUG && _d('Current and new DB are the same');
      return $curr;
   }
   $sql = 'USE ' . $quoter->quote($new);
   MKDEBUG && _d($sql);
   $dbh->do($sql);
   return $curr;
}

sub get_create_table {
   my ( $self, $dbh, $quoter, $db, $tbl ) = @_;
   if ( !$self->{cache} || !$self->{tables}->{$db}->{$tbl} ) {
      my $sql = '/*!40101 SET @OLD_SQL_MODE := @@SQL_MODE, '
         . q{@@SQL_MODE := REPLACE(REPLACE(@@SQL_MODE, 'ANSI_QUOTES', ''), ',,', ','), }
         . '@OLD_QUOTE := @@SQL_QUOTE_SHOW_CREATE, '
         . '@@SQL_QUOTE_SHOW_CREATE := 1 */';
      MKDEBUG && _d($sql);
      eval { $dbh->do($sql); };
      MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
      my $curr_db = $self->_use_db($dbh, $quoter, $db);
      $sql = "SHOW CREATE TABLE " . $quoter->quote($db, $tbl);
      MKDEBUG && _d($sql);
      my $href;
      eval { $href = $dbh->selectrow_hashref($sql); };
      if ( $EVAL_ERROR ) {
         warn "Failed to $sql.  The table may be damaged.\nError: $EVAL_ERROR";
         return;
      }
      $self->_use_db($dbh, $quoter, $curr_db);
      $sql = '/*!40101 SET @@SQL_MODE := @OLD_SQL_MODE, '
         . '@@SQL_QUOTE_SHOW_CREATE := @OLD_QUOTE */';
      MKDEBUG && _d($sql);
      $dbh->do($sql);
      my ($key) = grep { m/create table/i } keys %$href;
      if ( $key ) {
         MKDEBUG && _d('This table is a base table');
         $self->{tables}->{$db}->{$tbl} = [ 'table', $href->{$key} ];
      }
      else {
         MKDEBUG && _d('This table is a view');
         ($key) = grep { m/create view/i } keys %$href;
         $self->{tables}->{$db}->{$tbl} = [ 'view', $href->{$key} ];
      }
   }
   return $self->{tables}->{$db}->{$tbl};
}

sub get_columns {
   my ( $self, $dbh, $quoter, $db, $tbl ) = @_;
   MKDEBUG && _d('Get columns for', $db, $tbl);
   if ( !$self->{cache} || !$self->{columns}->{$db}->{$tbl} ) {
      my $curr_db = $self->_use_db($dbh, $quoter, $db);
      my $sql = "SHOW COLUMNS FROM " . $quoter->quote($db, $tbl);
      MKDEBUG && _d($sql);
      my $cols = $dbh->selectall_arrayref($sql, { Slice => {} });
      $self->_use_db($dbh, $quoter, $curr_db);
      $self->{columns}->{$db}->{$tbl} = [
         map {
            my %row;
            @row{ map { lc $_ } keys %$_ } = values %$_;
            \%row;
         } @$cols
      ];
   }
   return $self->{columns}->{$db}->{$tbl};
}

sub get_tmp_table {
   my ( $self, $dbh, $quoter, $db, $tbl ) = @_;
   my $result = 'CREATE TABLE ' . $quoter->quote($tbl) . " (\n";
   $result .= join(",\n",
      map { '  ' . $quoter->quote($_->{field}) . ' ' . $_->{type} }
      @{$self->get_columns($dbh, $quoter, $db, $tbl)});
   $result .= "\n)";
   MKDEBUG && _d($result);
   return $result;
}

sub get_triggers {
   my ( $self, $dbh, $quoter, $db, $tbl ) = @_;
   if ( !$self->{cache} || !$self->{triggers}->{$db} ) {
      $self->{triggers}->{$db} = {};
      my $sql = '/*!40101 SET @OLD_SQL_MODE := @@SQL_MODE, '
         . q{@@SQL_MODE := REPLACE(REPLACE(@@SQL_MODE, 'ANSI_QUOTES', ''), ',,', ','), }
         . '@OLD_QUOTE := @@SQL_QUOTE_SHOW_CREATE, '
         . '@@SQL_QUOTE_SHOW_CREATE := 1 */';
      MKDEBUG && _d($sql);
      eval { $dbh->do($sql); };
      MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
      $sql = "SHOW TRIGGERS FROM " . $quoter->quote($db);
      MKDEBUG && _d($sql);
      my $sth = $dbh->prepare($sql);
      $sth->execute();
      if ( $sth->rows ) {
         my $trgs = $sth->fetchall_arrayref({});
         foreach my $trg (@$trgs) {
            my %trg;
            @trg{ map { lc $_ } keys %$trg } = values %$trg;
            push @{ $self->{triggers}->{$db}->{ $trg{table} } }, \%trg;
         }
      }
      $sql = '/*!40101 SET @@SQL_MODE := @OLD_SQL_MODE, '
         . '@@SQL_QUOTE_SHOW_CREATE := @OLD_QUOTE */';
      MKDEBUG && _d($sql);
      $dbh->do($sql);
   }
   if ( $tbl ) {
      return $self->{triggers}->{$db}->{$tbl};
   }
   return values %{$self->{triggers}->{$db}};
}

sub get_databases {
   my ( $self, $dbh, $quoter, $like ) = @_;
   if ( !$self->{cache} || !$self->{databases} || $like ) {
      my $sql = 'SHOW DATABASES';
      my @params;
      if ( $like ) {
         $sql .= ' LIKE ?';
         push @params, $like;
      }
      my $sth = $dbh->prepare($sql);
      MKDEBUG && _d($sql, @params);
      $sth->execute( @params );
      my @dbs = map { $_->[0] } @{$sth->fetchall_arrayref()};
      $self->{databases} = \@dbs unless $like;
      return @dbs;
   }
   return @{$self->{databases}};
}

sub get_table_status {
   my ( $self, $dbh, $quoter, $db, $like ) = @_;
   if ( !$self->{cache} || !$self->{table_status}->{$db} || $like ) {
      my $sql = "SHOW TABLE STATUS FROM " . $quoter->quote($db);
      my @params;
      if ( $like ) {
         $sql .= ' LIKE ?';
         push @params, $like;
      }
      MKDEBUG && _d($sql, @params);
      my $sth = $dbh->prepare($sql);
      $sth->execute(@params);
      my @tables = @{$sth->fetchall_arrayref({})};
      @tables = map {
         my %tbl; # Make a copy with lowercased keys
         @tbl{ map { lc $_ } keys %$_ } = values %$_;
         $tbl{engine} ||= $tbl{type} || $tbl{comment};
         delete $tbl{type};
         \%tbl;
      } @tables;
      $self->{table_status}->{$db} = \@tables unless $like;
      return @tables;
   }
   return @{$self->{table_status}->{$db}};
}

sub get_table_list {
   my ( $self, $dbh, $quoter, $db, $like ) = @_;
   if ( !$self->{cache} || !$self->{table_list}->{$db} || $like ) {
      my $sql = "SHOW /*!50002 FULL*/ TABLES FROM " . $quoter->quote($db);
      my @params;
      if ( $like ) {
         $sql .= ' LIKE ?';
         push @params, $like;
      }
      MKDEBUG && _d($sql, @params);
      my $sth = $dbh->prepare($sql);
      $sth->execute(@params);
      my @tables = @{$sth->fetchall_arrayref()};
      @tables = map {
         my %tbl = (
            name   => $_->[0],
            engine => ($_->[1] || '') eq 'VIEW' ? 'VIEW' : '',
         );
         \%tbl;
      } @tables;
      $self->{table_list}->{$db} = \@tables unless $like;
      return @tables;
   }
   return @{$self->{table_list}->{$db}};
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End MySQLDump package
# ###########################################################################

# ###########################################################################
# TableParser package 4397
# ###########################################################################
package TableParser;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class ) = @_;
   return bless {}, $class;
}


sub parse {
   my ( $self, $ddl, $opts ) = @_;
   return unless $ddl;
   if ( ref $ddl eq 'ARRAY' ) {
      if ( lc $ddl->[0] eq 'table' ) {
         $ddl = $ddl->[1];
      }
      else {
         return {
            engine => 'VIEW',
         };
      }
   }

   if ( $ddl !~ m/CREATE (?:TEMPORARY )?TABLE `/ ) {
      die "Cannot parse table definition; is ANSI quoting "
         . "enabled or SQL_QUOTE_SHOW_CREATE disabled?";
   }

   $ddl =~ s/(`[^`]+`)/\L$1/g;

   my $engine = $self->get_engine($ddl);

   my @defs   = $ddl =~ m/^(\s+`.*?),?$/gm;
   my @cols   = map { $_ =~ m/`([^`]+)`/ } @defs;
   MKDEBUG && _d('Columns:', join(', ', @cols));

   my %def_for;
   @def_for{@cols} = @defs;

   my (@nums, @null);
   my (%type_for, %is_nullable, %is_numeric, %is_autoinc);
   foreach my $col ( @cols ) {
      my $def = $def_for{$col};
      my ( $type ) = $def =~ m/`[^`]+`\s([a-z]+)/;
      die "Can't determine column type for $def" unless $type;
      $type_for{$col} = $type;
      if ( $type =~ m/(?:(?:tiny|big|medium|small)?int|float|double|decimal|year)/ ) {
         push @nums, $col;
         $is_numeric{$col} = 1;
      }
      if ( $def !~ m/NOT NULL/ ) {
         push @null, $col;
         $is_nullable{$col} = 1;
      }
      $is_autoinc{$col} = $def =~ m/AUTO_INCREMENT/i ? 1 : 0;
   }

   my ($keys, $clustered_key) = $self->get_keys($ddl, $opts, \%is_nullable);

   return {
      cols           => \@cols,
      col_posn       => { map { $cols[$_] => $_ } 0..$#cols },
      is_col         => { map { $_ => 1 } @cols },
      null_cols      => \@null,
      is_nullable    => \%is_nullable,
      is_autoinc     => \%is_autoinc,
      clustered_key  => $clustered_key,
      keys           => $keys,
      defs           => \%def_for,
      numeric_cols   => \@nums,
      is_numeric     => \%is_numeric,
      engine         => $engine,
      type_for       => \%type_for,
   };
}

sub sort_indexes {
   my ( $self, $tbl ) = @_;

   my @indexes
      = sort {
         (($a ne 'PRIMARY') <=> ($b ne 'PRIMARY'))
         || ( !$tbl->{keys}->{$a}->{is_unique} <=> !$tbl->{keys}->{$b}->{is_unique} )
         || ( $tbl->{keys}->{$a}->{is_nullable} <=> $tbl->{keys}->{$b}->{is_nullable} )
         || ( scalar(@{$tbl->{keys}->{$a}->{cols}}) <=> scalar(@{$tbl->{keys}->{$b}->{cols}}) )
      }
      grep {
         $tbl->{keys}->{$_}->{type} eq 'BTREE'
      }
      sort keys %{$tbl->{keys}};

   MKDEBUG && _d('Indexes sorted best-first:', join(', ', @indexes));
   return @indexes;
}

sub find_best_index {
   my ( $self, $tbl, $index ) = @_;
   my $best;
   if ( $index ) {
      ($best) = grep { uc $_ eq uc $index } keys %{$tbl->{keys}};
   }
   if ( !$best ) {
      if ( $index ) {
         die "Index '$index' does not exist in table";
      }
      else {
         ($best) = $self->sort_indexes($tbl);
      }
   }
   MKDEBUG && _d('Best index found is', $best);
   return $best;
}

sub find_possible_keys {
   my ( $self, $dbh, $database, $table, $quoter, $where ) = @_;
   return () unless $where;
   my $sql = 'EXPLAIN SELECT * FROM ' . $quoter->quote($database, $table)
      . ' WHERE ' . $where;
   MKDEBUG && _d($sql);
   my $expl = $dbh->selectrow_hashref($sql);
   $expl = { map { lc($_) => $expl->{$_} } keys %$expl };
   if ( $expl->{possible_keys} ) {
      MKDEBUG && _d('possible_keys =', $expl->{possible_keys});
      my @candidates = split(',', $expl->{possible_keys});
      my %possible   = map { $_ => 1 } @candidates;
      if ( $expl->{key} ) {
         MKDEBUG && _d('MySQL chose', $expl->{key});
         unshift @candidates, grep { $possible{$_} } split(',', $expl->{key});
         MKDEBUG && _d('Before deduping:', join(', ', @candidates));
         my %seen;
         @candidates = grep { !$seen{$_}++ } @candidates;
      }
      MKDEBUG && _d('Final list:', join(', ', @candidates));
      return @candidates;
   }
   else {
      MKDEBUG && _d('No keys in possible_keys');
      return ();
   }
}

sub table_exists {
   my ( $self, $dbh, $db, $tbl, $q, $can_insert ) = @_;
   my $result = 0;
   my $db_tbl = $q->quote($db, $tbl);
   my $sql    = "SHOW FULL COLUMNS FROM $db_tbl";
   MKDEBUG && _d($sql);
   eval {
      my $sth = $dbh->prepare($sql);
      $sth->execute();
      my @columns = @{$sth->fetchall_arrayref({})};
      if ( $can_insert ) {
         $result = grep { ($_->{Privileges} || '') =~ m/insert/ } @columns;
      }
      else {
         $result = 1;
      }
   };
   if ( MKDEBUG && $EVAL_ERROR ) {
      _d($EVAL_ERROR);
   }
   return $result;
}

sub get_engine {
   my ( $self, $ddl, $opts ) = @_;
   my ( $engine ) = $ddl =~ m/\).*?(?:ENGINE|TYPE)=(\w+)/;
   MKDEBUG && _d('Storage engine:', $engine);
   return $engine || undef;
}

sub get_keys {
   my ( $self, $ddl, $opts, $is_nullable ) = @_;
   my $engine        = $self->get_engine($ddl);
   my $keys          = {};
   my $clustered_key = undef;

   KEY:
   foreach my $key ( $ddl =~ m/^  ((?:[A-Z]+ )?KEY .*)$/gm ) {

      next KEY if $key =~ m/FOREIGN/;

      MKDEBUG && _d('Parsed key:', $key);

      if ( $engine !~ m/MEMORY|HEAP/ ) {
         $key =~ s/USING HASH/USING BTREE/;
      }

      my ( $type, $cols ) = $key =~ m/(?:USING (\w+))? \((.+)\)/;
      my ( $special ) = $key =~ m/(FULLTEXT|SPATIAL)/;
      $type = $type || $special || 'BTREE';
      if ( $opts->{mysql_version} && $opts->{mysql_version} lt '004001000'
         && $engine =~ m/HEAP|MEMORY/i )
      {
         $type = 'HASH'; # MySQL pre-4.1 supports only HASH indexes on HEAP
      }

      my ($name) = $key =~ m/(PRIMARY|`[^`]*`)/;
      my $unique = $key =~ m/PRIMARY|UNIQUE/ ? 1 : 0;
      my @cols;
      my @col_prefixes;
      foreach my $col_def ( split(',', $cols) ) {
         my ($name, $prefix) = $col_def =~ m/`([^`]+)`(?:\((\d+)\))?/;
         push @cols, $name;
         push @col_prefixes, $prefix;
      }
      $name =~ s/`//g;

      MKDEBUG && _d('Key', $name, 'cols:', join(', ', @cols));

      $keys->{$name} = {
         name         => $name,
         type         => $type,
         colnames     => $cols,
         cols         => \@cols,
         col_prefixes => \@col_prefixes,
         is_unique    => $unique,
         is_nullable  => scalar(grep { $is_nullable->{$_} } @cols),
         is_col       => { map { $_ => 1 } @cols },
      };

      if ( $engine =~ m/InnoDB/i && !$clustered_key ) {
         my $this_key = $keys->{$name};
         if ( $this_key->{name} eq 'PRIMARY' ) {
            $clustered_key = 'PRIMARY';
         }
         elsif ( $this_key->{is_unique} && !$this_key->{is_nullable} ) {
            $clustered_key = $this_key->{name};
         }
         MKDEBUG && $clustered_key && _d('This key is the clustered key');
      }
   }

   return $keys, $clustered_key;
}

sub get_fks {
   my ( $self, $ddl, $opts ) = @_;
   my $fks = {};

   foreach my $fk (
      $ddl =~ m/CONSTRAINT .* FOREIGN KEY .* REFERENCES [^\)]*\)/mg )
   {
      my ( $name ) = $fk =~ m/CONSTRAINT `(.*?)`/;
      my ( $cols ) = $fk =~ m/FOREIGN KEY \(([^\)]+)\)/;
      my ( $parent, $parent_cols ) = $fk =~ m/REFERENCES (\S+) \(([^\)]+)\)/;

      if ( $parent !~ m/\./ && $opts->{database} ) {
         $parent = "`$opts->{database}`.$parent";
      }

      $fks->{$name} = {
         name           => $name,
         colnames       => $cols,
         cols           => [ map { s/[ `]+//g; $_; } split(',', $cols) ],
         parent_tbl     => $parent,
         parent_colnames=> $parent_cols,
         parent_cols    => [ map { s/[ `]+//g; $_; } split(',', $parent_cols) ],
      };
   }

   return $fks;
}

sub remove_auto_increment {
   my ( $self, $ddl ) = @_;
   $ddl =~ s/(^\).*?) AUTO_INCREMENT=\d+\b/$1/m;
   return $ddl;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableParser package
# ###########################################################################

# ###########################################################################
# Quoter package 3186
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package Quoter;

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class ) = @_;
   bless {}, $class;
}

sub quote {
   my ( $self, @vals ) = @_;
   foreach my $val ( @vals ) {
      $val =~ s/`/``/g;
   }
   return join('.', map { '`' . $_ . '`' } @vals);
}

sub quote_val {
   my ( $self, @vals ) = @_;
   return join(', ',
      map {
         if ( defined $_ ) {
            $_ =~ s/(['\\])/\\$1/g;
            $_ eq '' || $_ =~ m/^0|\D/ ? "'$_'" : $_;
         }
         else {
            'NULL';
         }
      } @vals
   );
}

sub split_unquote {
   my ( $self, $db_tbl, $default_db ) = @_;
   $db_tbl =~ s/`//g;
   my ( $db, $tbl ) = split(/[.]/, $db_tbl);
   if ( !$tbl ) {
      $tbl = $db;
      $db  = $default_db;
   }
   return ($db, $tbl);
}

1;

# ###########################################################################
# End Quoter package
# ###########################################################################

# ###########################################################################
# OptionParser package 4805
# ###########################################################################
package OptionParser;

use strict;
use warnings FATAL => 'all';

use Getopt::Long;
use List::Util qw(max);
use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

my $POD_link_re = '[LC]<"?([^">]+)"?>';

my %attributes = (
   'type'       => 1,
   'short form' => 1,
   'group'      => 1,
   'default'    => 1,
   'cumulative' => 1,
   'negatable'  => 1,
);

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(description) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($program_name) = $PROGRAM_NAME =~ m/([.A-Za-z-]+)$/;
   $program_name ||= $PROGRAM_NAME;
   my $home = $ENV{HOME} || $ENV{HOMEPATH} || $ENV{USERPROFILE} || '.';

   my $self = {
      description    => $args{description},
      prompt         => $args{prompt} || '<options>',
      strict         => (exists $args{strict} ? $args{strict} : 1),
      dp             => $args{dp}     || undef,
      program_name   => $program_name,
      opts           => {},
      got_opts       => 0,
      short_opts     => {},
      defaults       => {},
      groups         => {},
      allowed_groups => {},
      errors         => [],
      rules          => [],  # desc of rules for --help
      mutex          => [],  # rule: opts are mutually exclusive
      atleast1       => [],  # rule: at least one opt is required
      disables       => {},  # rule: opt disables other opts 
      defaults_to    => {},  # rule: opt defaults to value of other opt
      default_files  => [
         "/etc/maatkit/maatkit.conf",
         "/etc/maatkit/$program_name.conf",
         "$home/.maatkit.conf",
         "$home/.$program_name.conf",
      ],
   };
   return bless $self, $class;
}

sub get_specs {
   my ( $self, $file ) = @_;
   my @specs = $self->_pod_to_specs($file);
   $self->_parse_specs(@specs);
   return;
}

sub get_defaults_files {
   my ( $self ) = @_;
   return @{$self->{default_files}};
}

sub _pod_to_specs {
   my ( $self, $file ) = @_;
   $file ||= __FILE__;
   open my $fh, '<', $file or die "Cannot open $file: $OS_ERROR";

   my %types = (
      string => 's', # standard Getopt type
      'int'  => 'i', # standard Getopt type
      float  => 'f', # standard Getopt type
      Hash   => 'H', # hash, formed from a comma-separated list
      hash   => 'h', # hash as above, but only if a value is given
      Array  => 'A', # array, similar to Hash
      array  => 'a', # array, similar to hash
      DSN    => 'd', # DSN, as provided by a DSNParser which is in $self->{dp}
      size   => 'z', # size with kMG suffix (powers of 2^10)
      'time' => 'm', # time, with an optional suffix of s/h/m/d
   );
   my @specs = ();
   my @rules = ();
   my $para;

   local $INPUT_RECORD_SEPARATOR = '';
   while ( $para = <$fh> ) {
      next unless $para =~ m/^=head1 OPTIONS/;
      last;
   }

   while ( $para = <$fh> ) {
      last if $para =~ m/^=over/;
      chomp $para;
      $para =~ s/\s+/ /g;
      $para =~ s/$POD_link_re/$1/go;
      MKDEBUG && _d('Option rule:', $para);
      push @rules, $para;
   }

   die 'POD has no OPTIONS section' unless $para;

   do {
      if ( my ($option) = $para =~ m/^=item --(.*)/ ) {
         chomp $para;
         MKDEBUG && _d($para);
         my %attribs;

         $para = <$fh>; # read next paragraph, possibly attributes

         if ( $para =~ m/: / ) { # attributes
            $para =~ s/\s+\Z//g;
            %attribs = map {
                  my ( $attrib, $val) = split(/: /, $_);
                  die "Unrecognized attribute for --$option: $attrib"
                     unless $attributes{$attrib};
                  ($attrib, $val);
               } split(/; /, $para);
            if ( $attribs{'short form'} ) {
               $attribs{'short form'} =~ s/-//;
            }
            $para = <$fh>; # read next paragraph, probably short help desc
         }
         else {
            MKDEBUG && _d('Option has no attributes');
         }

         $para =~ s/\s+\Z//g;
         $para =~ s/\s+/ /g;
         $para =~ s/$POD_link_re/$1/go;

         $para =~ s/\.(?:\n.*| [A-Z].*|\Z)//s;
         MKDEBUG && _d('Short help:', $para);

         die "No description after option spec $option" if $para =~ m/^=item/;

         if ( my ($base_option) =  $option =~ m/^\[no\](.*)/ ) {
            $option = $base_option;
            $attribs{'negatable'} = 1;
         }

         push @specs, {
            spec  => $option
               . ($attribs{'short form'} ? '|' . $attribs{'short form'} : '' )
               . ($attribs{'negatable'}  ? '!'                          : '' )
               . ($attribs{'cumulative'} ? '+'                          : '' )
               . ($attribs{'type'}       ? '=' . $types{$attribs{type}} : '' ),
            desc  => $para
               . ($attribs{default} ? " (default $attribs{default})" : ''),
            group => ($attribs{'group'} ? $attribs{'group'} : 'default'),
         };
      }
      while ( $para = <$fh> ) {
         last unless $para;


         if ( $para =~ m/^=head1/ ) {
            $para = undef; # Can't 'last' out of a do {} block.
            last;
         }
         last if $para =~ m/^=item --/;
      }
   } while ( $para );

   die 'No valid specs in POD OPTIONS' unless @specs;

   close $fh;
   return @specs, @rules;
}

sub _parse_specs {
   my ( $self, @specs ) = @_;
   my %disables; # special rule that requires deferred checking

   foreach my $opt ( @specs ) {
      if ( ref $opt ) { # It's an option spec, not a rule.
         MKDEBUG && _d('Parsing opt spec:',
            map { ($_, '=>', $opt->{$_}) } keys %$opt);

         my ( $long, $short ) = $opt->{spec} =~ m/^([\w-]+)(?:\|([^!+=]*))?/;
         if ( !$long ) {
            die "Cannot parse long option from spec $opt->{spec}";
         }
         $opt->{long} = $long;

         die "Duplicate long option --$long" if exists $self->{opts}->{$long};
         $self->{opts}->{$long} = $opt;

         if ( length $long == 1 ) {
            MKDEBUG && _d('Long opt', $long, 'looks like short opt');
            $self->{short_opts}->{$long} = $long;
         }

         if ( $short ) {
            die "Duplicate short option -$short"
               if exists $self->{short_opts}->{$short};
            $self->{short_opts}->{$short} = $long;
            $opt->{short} = $short;
         }
         else {
            $opt->{short} = undef;
         }

         $opt->{is_negatable}  = $opt->{spec} =~ m/!/        ? 1 : 0;
         $opt->{is_cumulative} = $opt->{spec} =~ m/\+/       ? 1 : 0;
         $opt->{is_required}   = $opt->{desc} =~ m/required/ ? 1 : 0;

         $opt->{group} ||= 'default';
         $self->{groups}->{ $opt->{group} }->{$long} = 1;

         $opt->{value} = undef;
         $opt->{got}   = 0;

         my ( $type ) = $opt->{spec} =~ m/=(.)/;
         $opt->{type} = $type;
         MKDEBUG && _d($long, 'type:', $type);

         if ( $type && $type eq 'd' && !$self->{dp} ) {
            die "$opt->{long} is type DSN (d) but no dp argument "
               . "was given when this OptionParser object was created";
         }

         $opt->{spec} =~ s/=./=s/ if ( $type && $type =~ m/[HhAadzm]/ );

         if ( (my ($def) = $opt->{desc} =~ m/default\b(?: ([^)]+))?/) ) {
            $self->{defaults}->{$long} = defined $def ? $def : 1;
            MKDEBUG && _d($long, 'default:', $def);
         }

         if ( $long eq 'config' ) {
            $self->{defaults}->{$long} = join(',', $self->get_defaults_files());
         }

         if ( (my ($dis) = $opt->{desc} =~ m/(disables .*)/) ) {
            $disables{$long} = $dis;
            MKDEBUG && _d('Deferring check of disables rule for', $opt, $dis);
         }

         $self->{opts}->{$long} = $opt;
      }
      else { # It's an option rule, not a spec.
         MKDEBUG && _d('Parsing rule:', $opt); 
         push @{$self->{rules}}, $opt;
         my @participants = $self->_get_participants($opt);
         my $rule_ok = 0;

         if ( $opt =~ m/mutually exclusive|one and only one/ ) {
            $rule_ok = 1;
            push @{$self->{mutex}}, \@participants;
            MKDEBUG && _d(@participants, 'are mutually exclusive');
         }
         if ( $opt =~ m/at least one|one and only one/ ) {
            $rule_ok = 1;
            push @{$self->{atleast1}}, \@participants;
            MKDEBUG && _d(@participants, 'require at least one');
         }
         if ( $opt =~ m/default to/ ) {
            $rule_ok = 1;
            $self->{defaults_to}->{$participants[0]} = $participants[1];
            MKDEBUG && _d($participants[0], 'defaults to', $participants[1]);
         }
         if ( $opt =~ m/restricted to option groups/ ) {
            $rule_ok = 1;
            my ($groups) = $opt =~ m/groups ([\w\s\,]+)/;
            my @groups = split(',', $groups);
            %{$self->{allowed_groups}->{$participants[0]}} = map {
               s/\s+//;
               $_ => 1;
            } @groups;
         }

         die "Unrecognized option rule: $opt" unless $rule_ok;
      }
   }

   foreach my $long ( keys %disables ) {
      my @participants = $self->_get_participants($disables{$long});
      $self->{disables}->{$long} = \@participants;
      MKDEBUG && _d('Option', $long, 'disables', @participants);
   }

   return; 
}

sub _get_participants {
   my ( $self, $str ) = @_;
   my @participants;
   foreach my $long ( $str =~ m/--(?:\[no\])?([\w-]+)/g ) {
      die "Option --$long does not exist while processing rule $str"
         unless exists $self->{opts}->{$long};
      push @participants, $long;
   }
   MKDEBUG && _d('Participants for', $str, ':', @participants);
   return @participants;
}

sub opts {
   my ( $self ) = @_;
   my %opts = %{$self->{opts}};
   return %opts;
}

sub short_opts {
   my ( $self ) = @_;
   my %short_opts = %{$self->{short_opts}};
   return %short_opts;
}

sub set_defaults {
   my ( $self, %defaults ) = @_;
   $self->{defaults} = {};
   foreach my $long ( keys %defaults ) {
      die "Cannot set default for nonexistent option $long"
         unless exists $self->{opts}->{$long};
      $self->{defaults}->{$long} = $defaults{$long};
      MKDEBUG && _d('Default val for', $long, ':', $defaults{$long});
   }
   return;
}

sub get_defaults {
   my ( $self ) = @_;
   return $self->{defaults};
}

sub get_groups {
   my ( $self ) = @_;
   return $self->{groups};
}

sub _set_option {
   my ( $self, $opt, $val ) = @_;
   my $long = exists $self->{opts}->{$opt}       ? $opt
            : exists $self->{short_opts}->{$opt} ? $self->{short_opts}->{$opt}
            : die "Getopt::Long gave a nonexistent option: $opt";

   $opt = $self->{opts}->{$long};
   if ( $opt->{is_cumulative} ) {
      $opt->{value}++;
   }
   else {
      $opt->{value} = $val;
   }
   $opt->{got} = 1;
   MKDEBUG && _d('Got option', $long, '=', $val);
}

sub get_opts {
   my ( $self ) = @_; 

   foreach my $long ( keys %{$self->{opts}} ) {
      $self->{opts}->{$long}->{got} = 0;
      $self->{opts}->{$long}->{value}
         = exists $self->{defaults}->{$long}       ? $self->{defaults}->{$long}
         : $self->{opts}->{$long}->{is_cumulative} ? 0
         : undef;
   }
   $self->{got_opts} = 0;

   $self->{errors} = [];

   if ( @ARGV && $ARGV[0] eq "--config" ) {
      shift @ARGV;
      $self->_set_option('config', shift @ARGV);
   }
   if ( $self->has('config') ) {
      my @extra_args;
      foreach my $filename ( split(',', $self->get('config')) ) {
         eval {
            push @extra_args, $self->_read_config_file($filename);
         };
         if ( $EVAL_ERROR ) {
            if ( $self->got('config') ) {
               die $EVAL_ERROR;
            }
            elsif ( MKDEBUG ) {
               _d($EVAL_ERROR);
            }
         }
      }
      unshift @ARGV, @extra_args;
   }

   Getopt::Long::Configure('no_ignore_case', 'bundling');
   GetOptions(
      map    { $_->{spec} => sub { $self->_set_option(@_); } }
      grep   { $_->{long} ne 'config' } # --config is handled specially above.
      values %{$self->{opts}}
   ) or $self->save_error('Error parsing options');

   if ( exists $self->{opts}->{version} && $self->{opts}->{version}->{got} ) {
      printf("%s  Ver %s Distrib %s Changeset %s\n",
         $self->{program_name}, $main::VERSION, $main::DISTRIB, $main::SVN_REV)
            or die "Cannot print: $OS_ERROR";
      exit 0;
   }

   if ( @ARGV && $self->{strict} ) {
      $self->save_error("Unrecognized command-line options @ARGV");
   }

   foreach my $mutex ( @{$self->{mutex}} ) {
      my @set = grep { $self->{opts}->{$_}->{got} } @$mutex;
      if ( @set > 1 ) {
         my $err = join(', ', map { "--$self->{opts}->{$_}->{long}" }
                      @{$mutex}[ 0 .. scalar(@$mutex) - 2] )
                 . ' and --'.$self->{opts}->{$mutex->[-1]}->{long}
                 . ' are mutually exclusive.';
         $self->save_error($err);
      }
   }

   foreach my $required ( @{$self->{atleast1}} ) {
      my @set = grep { $self->{opts}->{$_}->{got} } @$required;
      if ( @set == 0 ) {
         my $err = join(', ', map { "--$self->{opts}->{$_}->{long}" }
                      @{$required}[ 0 .. scalar(@$required) - 2] )
                 .' or --'.$self->{opts}->{$required->[-1]}->{long};
         $self->save_error("Specify at least one of $err");
      }
   }

   foreach my $long ( keys %{$self->{opts}} ) {
      my $opt = $self->{opts}->{$long};
      if ( $opt->{got} ) {
         if ( exists $self->{disables}->{$long} ) {
            my @disable_opts = @{$self->{disables}->{$long}};
            map { $self->{opts}->{$_}->{value} = undef; } @disable_opts;
            MKDEBUG && _d('Unset options', @disable_opts,
               'because', $long,'disables them');
         }

         if ( exists $self->{allowed_groups}->{$long} ) {

            my @restricted_groups = grep {
               !exists $self->{allowed_groups}->{$long}->{$_}
            } keys %{$self->{groups}};

            my @restricted_opts;
            foreach my $restricted_group ( @restricted_groups ) {
               RESTRICTED_OPT:
               foreach my $restricted_opt (
                  keys %{$self->{groups}->{$restricted_group}} )
               {
                  next RESTRICTED_OPT if $restricted_opt eq $long;
                  push @restricted_opts, $restricted_opt
                     if $self->{opts}->{$restricted_opt}->{got};
               }
            }

            if ( @restricted_opts ) {
               my $err;
               if ( @restricted_opts == 1 ) {
                  $err = "--$restricted_opts[0]";
               }
               else {
                  $err = join(', ',
                            map { "--$self->{opts}->{$_}->{long}" }
                            grep { $_ } 
                            @restricted_opts[0..scalar(@restricted_opts) - 2]
                         )
                       . ' or --'.$self->{opts}->{$restricted_opts[-1]}->{long};
               }
               $self->save_error("--$long is not allowed with $err");
            }
         }

      }
      elsif ( $opt->{is_required} ) { 
         $self->save_error("Required option --$long must be specified");
      }

      $self->_validate_type($opt);
   }

   $self->{got_opts} = 1;
   return;
}

sub _validate_type {
   my ( $self, $opt ) = @_;
   return unless $opt && $opt->{type};
   my $val = $opt->{value};

   if ( $val && $opt->{type} eq 'm' ) {  # type time
      MKDEBUG && _d('Parsing option', $opt->{long}, 'as a time value');
      my ( $prefix, $num, $suffix ) = $val =~ m/([+-]?)(\d+)([a-z])?$/;
      if ( !$suffix ) {
         my ( $s ) = $opt->{desc} =~ m/\(suffix (.)\)/;
         $suffix = $s || 's';
         MKDEBUG && _d('No suffix given; using', $suffix, 'for',
            $opt->{long}, '(value:', $val, ')');
      }
      if ( $suffix =~ m/[smhd]/ ) {
         $val = $suffix eq 's' ? $num            # Seconds
              : $suffix eq 'm' ? $num * 60       # Minutes
              : $suffix eq 'h' ? $num * 3600     # Hours
              :                  $num * 86400;   # Days
         $opt->{value} = ($prefix || '') . $val;
         MKDEBUG && _d('Setting option', $opt->{long}, 'to', $val);
      }
      else {
         $self->save_error("Invalid time suffix for --$opt->{long}");
      }
   }
   elsif ( $val && $opt->{type} eq 'd' ) {  # type DSN
      MKDEBUG && _d('Parsing option', $opt->{long}, 'as a DSN');
      my $prev = {};
      my $from_key = $self->{defaults_to}->{ $opt->{long} };
      if ( $from_key ) {
         MKDEBUG && _d($opt->{long}, 'DSN copies from', $from_key, 'DSN');
         $prev = $self->{opts}->{$from_key}->{value};
      }
      my $defaults = $self->{dp}->parse_options($self);
      $opt->{value} = $self->{dp}->parse($val, $prev, $defaults);
   }
   elsif ( $val && $opt->{type} eq 'z' ) {  # type size
      MKDEBUG && _d('Parsing option', $opt->{long}, 'as a size value');
      my %factor_for = (k => 1_024, M => 1_048_576, G => 1_073_741_824);
      my ($pre, $num, $factor) = $val =~ m/^([+-])?(\d+)([kMG])?$/;
      if ( defined $num ) {
         if ( $factor ) {
            $num *= $factor_for{$factor};
            MKDEBUG && _d('Setting option', $opt->{y},
               'to num', $num, '* factor', $factor);
         }
         $opt->{value} = ($pre || '') . $num;
      }
      else {
         $self->save_error("Invalid size for --$opt->{long}");
      }
   }
   elsif ( $opt->{type} eq 'H' || (defined $val && $opt->{type} eq 'h') ) {
      $opt->{value} = { map { $_ => 1 } split(',', ($val || '')) };
   }
   elsif ( $opt->{type} eq 'A' || (defined $val && $opt->{type} eq 'a') ) {
      $opt->{value} = [ split(/(?<!\\),/, ($val || '')) ];
   }
   else {
      MKDEBUG && _d('Nothing to validate for option',
         $opt->{long}, 'type', $opt->{type}, 'value', $val);
   }

   return;
}

sub get {
   my ( $self, $opt ) = @_;
   my $long = (length $opt == 1 ? $self->{short_opts}->{$opt} : $opt);
   die "Option $opt does not exist"
      unless $long && exists $self->{opts}->{$long};
   return $self->{opts}->{$long}->{value};
}

sub got {
   my ( $self, $opt ) = @_;
   my $long = (length $opt == 1 ? $self->{short_opts}->{$opt} : $opt);
   die "Option $opt does not exist"
      unless $long && exists $self->{opts}->{$long};
   return $self->{opts}->{$long}->{got};
}

sub has {
   my ( $self, $opt ) = @_;
   my $long = (length $opt == 1 ? $self->{short_opts}->{$opt} : $opt);
   return defined $long ? exists $self->{opts}->{$long} : 0;
}

sub set {
   my ( $self, $opt, $val ) = @_;
   my $long = (length $opt == 1 ? $self->{short_opts}->{$opt} : $opt);
   die "Option $opt does not exist"
      unless $long && exists $self->{opts}->{$long};
   $self->{opts}->{$long}->{value} = $val;
   return;
}

sub save_error {
   my ( $self, $error ) = @_;
   push @{$self->{errors}}, $error;
}

sub errors {
   my ( $self ) = @_;
   return $self->{errors};
}

sub prompt {
   my ( $self ) = @_;
   return "Usage: $PROGRAM_NAME $self->{prompt}\n";
}

sub descr {
   my ( $self ) = @_;
   my $descr  = $self->{program_name} . ' ' . ($self->{description} || '')
              . "  For more details, please use the --help option, "
              . "or try 'perldoc $PROGRAM_NAME' "
              . "for complete documentation.";
   $descr = join("\n", $descr =~ m/(.{0,80})(?:\s+|$)/g);
   $descr =~ s/ +$//mg;
   return $descr;
}

sub usage_or_errors {
   my ( $self ) = @_;
   if ( $self->{opts}->{help}->{got} ) {
      print $self->print_usage() or die "Cannot print usage: $OS_ERROR";
      exit 0;
   }
   elsif ( scalar @{$self->{errors}} ) {
      print $self->print_errors() or die "Cannot print errors: $OS_ERROR";
      exit 0;
   }
   return;
}

sub print_errors {
   my ( $self ) = @_;
   my $usage = $self->prompt() . "\n";
   if ( (my @errors = @{$self->{errors}}) ) {
      $usage .= join("\n  * ", 'Errors in command-line arguments:', @errors)
              . "\n";
   }
   return $usage . "\n" . $self->descr();
}

sub print_usage {
   my ( $self ) = @_;
   die "Run get_opts() before print_usage()" unless $self->{got_opts};
   my @opts = values %{$self->{opts}};

   my $maxl = max(
      map { length($_->{long}) + ($_->{is_negatable} ? 4 : 0) }
      @opts);

   my $maxs = max(0,
      map { length($_) + ($self->{opts}->{$_}->{is_negatable} ? 4 : 0) }
      values %{$self->{short_opts}});

   my $lcol = max($maxl, ($maxs + 3));
   my $rcol = 80 - $lcol - 6;
   my $rpad = ' ' x ( 80 - $rcol );

   $maxs = max($lcol - 3, $maxs);

   my $usage = $self->descr() . "\n" . $self->prompt();

   my @groups = reverse sort grep { $_ ne 'default'; } keys %{$self->{groups}};
   push @groups, 'default';

   foreach my $group ( reverse @groups ) {
      $usage .= "\n".($group eq 'default' ? 'Options' : $group).":\n\n";
      foreach my $opt (
         sort { $a->{long} cmp $b->{long} }
         grep { $_->{group} eq $group }
         @opts )
      {
         my $long  = $opt->{is_negatable} ? "[no]$opt->{long}" : $opt->{long};
         my $short = $opt->{short};
         my $desc  = $opt->{desc};
         if ( $opt->{type} && $opt->{type} eq 'm' ) {
            my ($s) = $desc =~ m/\(suffix (.)\)/;
            $s    ||= 's';
            $desc =~ s/\s+\(suffix .\)//;
            $desc .= ".  Optional suffix s=seconds, m=minutes, h=hours, "
                   . "d=days; if no suffix, $s is used.";
         }
         $desc = join("\n$rpad", grep { $_ } $desc =~ m/(.{0,$rcol})(?:\s+|$)/g);
         $desc =~ s/ +$//mg;
         if ( $short ) {
            $usage .= sprintf("  --%-${maxs}s -%s  %s\n", $long, $short, $desc);
         }
         else {
            $usage .= sprintf("  --%-${lcol}s  %s\n", $long, $desc);
         }
      }
   }

   if ( (my @rules = @{$self->{rules}}) ) {
      $usage .= "\nRules:\n\n";
      $usage .= join("\n", map { "  $_" } @rules) . "\n";
   }
   if ( $self->{dp} ) {
      $usage .= "\n" . $self->{dp}->usage();
   }
   $usage .= "\nOptions and values after processing arguments:\n\n";
   foreach my $opt ( sort { $a->{long} cmp $b->{long} } @opts ) {
      my $val   = $opt->{value};
      my $type  = $opt->{type} || '';
      my $bool  = $opt->{spec} =~ m/^[\w-]+(?:\|[\w-])?!?$/;
      $val      = $bool                     ? ( $val ? 'TRUE' : 'FALSE' )
                : !defined $val             ? '(No value)'
                : $type eq 'd'              ? $self->{dp}->as_string($val)
                : $type =~ m/H|h/           ? join(',', sort keys %$val)
                : $type =~ m/A|a/           ? join(',', @$val)
                :                             $val;
      $usage .= sprintf("  --%-${lcol}s  %s\n", $opt->{long}, $val);
   }
   return $usage;
}

sub prompt_noecho {
   shift @_ if ref $_[0] eq __PACKAGE__;
   my ( $prompt ) = @_;
   local $OUTPUT_AUTOFLUSH = 1;
   print $prompt
      or die "Cannot print: $OS_ERROR";
   my $response;
   eval {
      require Term::ReadKey;
      Term::ReadKey::ReadMode('noecho');
      chomp($response = <STDIN>);
      Term::ReadKey::ReadMode('normal');
      print "\n"
         or die "Cannot print: $OS_ERROR";
   };
   if ( $EVAL_ERROR ) {
      die "Cannot read response; is Term::ReadKey installed? $EVAL_ERROR";
   }
   return $response;
}

if ( MKDEBUG ) {
   print '# ', $^X, ' ', $], "\n";
   my $uname = `uname -a`;
   if ( $uname ) {
      $uname =~ s/\s+/ /g;
      print "# $uname\n";
   }
   printf("# %s  Ver %s Distrib %s Changeset %s line %d\n",
      $PROGRAM_NAME, ($main::VERSION || ''), ($main::DISTRIB || ''),
      ($main::SVN_REV || ''), __LINE__);
   print('# Arguments: ',
      join(' ', map { my $a = "_[$_]_"; $a =~ s/\n/\n# /g; $a; } @ARGV), "\n");
}

sub _read_config_file {
   my ( $self, $filename ) = @_;
   open my $fh, "<", $filename or die "Cannot open $filename: $OS_ERROR\n";
   my @args;
   my $prefix = '--';
   my $parse  = 1;

   LINE:
   while ( my $line = <$fh> ) {
      chomp $line;
      next LINE if $line =~ m/^\s*(?:\#|\;|$)/;
      $line =~ s/\s+#.*$//g;
      $line =~ s/^\s+|\s+$//g;
      if ( $line eq '--' ) {
         $prefix = '';
         $parse  = 0;
         next LINE;
      }
      if ( $parse
         && (my($opt, $arg) = $line =~ m/^\s*([^=\s]+?)(?:\s*=\s*(.*?)\s*)?$/)
      ) {
         push @args, grep { defined $_ } ("$prefix$opt", $arg);
      }
      elsif ( $line =~ m/./ ) {
         push @args, $line;
      }
      else {
         die "Syntax error in file $filename at line $INPUT_LINE_NUMBER";
      }
   }
   close $fh;
   return @args;
}

sub read_para_after {
   my ( $self, $file, $regex ) = @_;
   open my $fh, "<", $file or die "Can't open $file: $OS_ERROR";
   local $INPUT_RECORD_SEPARATOR = '';
   my $para;
   while ( $para = <$fh> ) {
      next unless $para =~ m/^=pod$/m;
      last;
   }
   while ( $para = <$fh> ) {
      next unless $para =~ m/$regex/;
      last;
   }
   $para = <$fh>;
   chomp($para);
   close $fh or die "Can't close $file: $OS_ERROR";
   return $para;
}

sub clone {
   my ( $self ) = @_;

   my %clone = map {
      my $hashref  = $self->{$_};
      my $val_copy = {};
      foreach my $key ( keys %$hashref ) {
         my $ref = ref $hashref->{$key};
         $val_copy->{$key} = !$ref           ? $hashref->{$key}
                           : $ref eq 'HASH'  ? { %{$hashref->{$key}} }
                           : $ref eq 'ARRAY' ? [ @{$hashref->{$key}} ]
                           : $hashref->{$key};
      }
      $_ => $val_copy;
   } qw(opts short_opts defaults);

   foreach my $scalar ( qw(got_opts) ) {
      $clone{$scalar} = $self->{$scalar};
   }

   return bless \%clone;     
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End OptionParser package
# ###########################################################################

# ###########################################################################
# Transformers package 4299
# ###########################################################################

package Transformers;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Time::Local qw(timelocal);
use Digest::MD5 qw(md5_hex);

use constant MKDEBUG => $ENV{MKDEBUG};

require Exporter;
our @ISA         = qw(Exporter);
our %EXPORT_TAGS = ();
our @EXPORT      = ();
our @EXPORT_OK   = qw(
   micro_t
   percentage_of
   secs_to_time
   shorten
   ts
   parse_timestamp
   unix_timestamp
   any_unix_timestamp
   make_checksum
);

our $mysql_ts  = qr/(\d\d)(\d\d)(\d\d) +(\d+):(\d+):(\d+)(\.\d+)?/;
our $proper_ts = qr/(\d\d\d\d)-(\d\d)-(\d\d)[T ](\d\d):(\d\d):(\d\d)(?:\.\d+)?/;
our $n_ts      = qr/(\d{1,5})([shmd]?)/; # Limit \d{1,5} because \d{6} looks

sub micro_t {
   my ( $t, %args ) = @_;
   my $p_ms = defined $args{p_ms} ? $args{p_ms} : 0;  # precision for ms vals
   my $p_s  = defined $args{p_s}  ? $args{p_s}  : 0;  # precision for s vals
   my $f;

   $t = 0 if $t < 0;

   $t = sprintf('%.17f', $t) if $t =~ /e/;

   $t =~ s/\.(\d{1,6})\d*/\.$1/;

   if ($t > 0 && $t <= 0.000999) {
      $f = ($t * 1000000) . 'us';
   }
   elsif ($t >= 0.001000 && $t <= 0.999999) {
      $f = sprintf("%.${p_ms}f", $t * 1000);
      $f = ($f * 1) . 'ms'; # * 1 to remove insignificant zeros
   }
   elsif ($t >= 1) {
      $f = sprintf("%.${p_s}f", $t);
      $f = ($f * 1) . 's'; # * 1 to remove insignificant zeros
   }
   else {
      $f = 0;  # $t should = 0 at this point
   }

   return $f;
}

sub percentage_of {
   my ( $is, $of, %args ) = @_;
   my $p   = $args{p} || 0; # float precision
   my $fmt = $p ? "%.${p}f" : "%d";
   return sprintf $fmt, ($is * 100) / ($of ||= 1);
}

sub secs_to_time {
   my ( $secs, $fmt ) = @_;
   $secs ||= 0;
   return '00:00' unless $secs;

   $fmt ||= $secs >= 86_400 ? 'd'
          : $secs >= 3_600  ? 'h'
          :                   'm';

   return
      $fmt eq 'd' ? sprintf(
         "%d+%02d:%02d:%02d",
         int($secs / 86_400),
         int(($secs % 86_400) / 3_600),
         int(($secs % 3_600) / 60),
         $secs % 60)
      : $fmt eq 'h' ? sprintf(
         "%02d:%02d:%02d",
         int(($secs % 86_400) / 3_600),
         int(($secs % 3_600) / 60),
         $secs % 60)
      : sprintf(
         "%02d:%02d",
         int(($secs % 3_600) / 60),
         $secs % 60);
}

sub shorten {
   my ( $num, %args ) = @_;
   my $p = defined $args{p} ? $args{p} : 2;     # float precision
   my $d = defined $args{d} ? $args{d} : 1_024; # divisor
   my $n = 0;
   my @units = ('', qw(k M G T P E Z Y));
   while ( $num >= $d && $n < @units - 1 ) {
      $num /= $d;
      ++$n;
   }
   return sprintf(
      $num =~ m/\./ || $n
         ? "%.${p}f%s"
         : '%d',
      $num, $units[$n]);
}

sub ts {
   my ( $time ) = @_;
   my ( $sec, $min, $hour, $mday, $mon, $year )
      = localtime($time);
   $mon  += 1;
   $year += 1900;
   return sprintf("%d-%02d-%02dT%02d:%02d:%02d",
      $year, $mon, $mday, $hour, $min, $sec);
}

sub parse_timestamp {
   my ( $val ) = @_;
   if ( my($y, $m, $d, $h, $i, $s, $f)
         = $val =~ m/^$mysql_ts$/ )
   {
      return sprintf "%d-%02d-%02d %02d:%02d:"
                     . (defined $f ? '%02.6f' : '%02d'),
                     $y + 2000, $m, $d, $h, $i, (defined $f ? $s + $f : $s);
   }
   return $val;
}

sub unix_timestamp {
   my ( $val ) = @_;
   if ( my($y, $m, $d, $h, $i, $s)
     = $val =~ m/^$proper_ts$/ )
   {
      return timelocal($s, $i, $h, $d, $m - 1, $y);
   }
   return $val;
}

sub any_unix_timestamp {
   my ( $val, $callback ) = @_;

   if ( my ($n, $suffix) = $val =~ m/^$n_ts$/ ) {
      $n = $suffix eq 's' ? $n            # Seconds
         : $suffix eq 'm' ? $n * 60       # Minutes
         : $suffix eq 'h' ? $n * 3600     # Hours
         : $suffix eq 'd' ? $n * 86400    # Days
         :                  $n;           # default: Seconds
      MKDEBUG && _d('ts is now - N[shmd]:', $n);
      return time - $n;
   }
   elsif ( my ($ymd, $hms) = $val =~ m/^(\d{6})(?:\s+(\d+:\d+:\d+))?/ ) {
      MKDEBUG && _d('ts is MySQL slow log timestamp');
      $val .= ' 00:00:00' unless $hms;
      return unix_timestamp(parse_timestamp($val));
   }
   elsif ( ($ymd, $hms) = $val =~ m/^(\d{4}-\d\d-\d\d)(?:[T ](\d+:\d+:\d+))?/) {
      MKDEBUG && _d('ts is properly formatted timestamp');
      $val .= ' 00:00:00' unless $hms;
      return unix_timestamp($val);
   }
   else {
      MKDEBUG && _d('ts is MySQL expression');
      return $callback->($val) if $callback && ref $callback eq 'CODE';
   }

   MKDEBUG && _d('Unknown ts type:', $val);
   return;
}

sub make_checksum {
   my ( $val ) = @_;
   my $checksum = uc substr(md5_hex($val), -16);
   MKDEBUG && _d($checksum, 'checksum for', $val);
   return $checksum;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End Transformers package
# ###########################################################################

# ###########################################################################
# QueryRewriter package 4569
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package QueryRewriter;

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

our $verbs   = qr{^SHOW|^FLUSH|^COMMIT|^ROLLBACK|^BEGIN|SELECT|INSERT
                  |UPDATE|DELETE|REPLACE|^SET|UNION|^START|^LOCK}xi;
my $quote_re = qr/"(?:(?!(?<!\\)").)*"|'(?:(?!(?<!\\)').)*'/; # Costly!
my $bal;
$bal         = qr/
                  \(
                  (?:
                     (?> [^()]+ )    # Non-parens without backtracking
                     |
                     (??{ $bal })    # Group with matching parens
                  )*
                  \)
                 /x;

my $olc_re = qr/(?:--|#)[^'"\r\n]*(?=[\r\n]|\Z)/;  # One-line comments
my $mlc_re = qr#/\*[^!].*?\*/#sm;                  # But not /*!version */

sub new {
   my ( $class, %args ) = @_;
   my $self = { %args };
   return bless $self, $class;
}

sub strip_comments {
   my ( $self, $query ) = @_;
   $query =~ s/$olc_re//go;
   $query =~ s/$mlc_re//go;
   return $query;
}

sub shorten {
   my ( $self, $query, $length ) = @_;
   $query =~ s{
      \A(
         (?:INSERT|REPLACE)
         (?:\s+LOW_PRIORITY|DELAYED|HIGH_PRIORITY|IGNORE)?
         (?:\s\w+)*\s+\S+\s+VALUES\s*\(.*?\)
      )
      \s*,\s*\(.*?(ON\s+DUPLICATE|\Z)}
      {$1 /*... omitted ...*/$2}xsi;

   return $query unless $query =~ m/IN\s*\(\s*(?!select)/i;

   if ( $length && length($query) > $length ) {
      my ($left, $mid, $right) = $query =~ m{
         (\A.*?\bIN\s*\()     # Everything up to the opening of IN list
         ([^\)]+)             # Contents of the list
         (\).*\Z)             # The rest of the query
      }xsi;
      if ( $left ) {
         my $targ = $length - length($left) - length($right);
         my @vals = split(/,/, $mid);
         my @left = shift @vals;
         my @right;
         my $len  = length($left[0]);
         while ( @vals && $len < $targ / 2 ) {
            $len += length($vals[0]) + 1;
            push @left, shift @vals;
         }
         while ( @vals && $len < $targ ) {
            $len += length($vals[-1]) + 1;
            unshift @right, pop @vals;
         }
         $query = $left . join(',', @left)
                . (@right ? ',' : '')
                . " /*... omitted " . scalar(@vals) . " items ...*/ "
                . join(',', @right) . $right;
      }
   }

   return $query;
}

sub fingerprint {
   my ( $self, $query ) = @_;

   $query =~ m#\ASELECT /\*!40001 SQL_NO_CACHE \*/ \* FROM `# # mysqldump query
      && return 'mysqldump';
   $query =~ m#/\*\w+\.\w+:[0-9]/[0-9]\*/#     # mk-table-checksum, etc query
      && return 'maatkit';
   $query =~ m/\A# administrator command: /
      && return $query;
   $query =~ m/\A\s*(call\s+\S+)\(/i
      && return lc($1); # Warning! $1 used, be careful.
   if ( my ($beginning) = $query =~ m/\A((?:INSERT|REPLACE)(?: IGNORE)? INTO .+? VALUES \(.*?\)),\(/i ) {
      $query = $beginning; # Shorten multi-value INSERT statements ASAP
   }

   $query =~ s/$olc_re//go;
   $query =~ s/$mlc_re//go;
   $query =~ s/\Ause \S+\Z/use ?/i       # Abstract the DB in USE
      && return $query;

   $query =~ s/\\["']//g;                # quoted strings
   $query =~ s/".*?"/?/sg;               # quoted strings
   $query =~ s/'.*?'/?/sg;               # quoted strings
   $query =~ s/[0-9+-][0-9a-f.xb+-]*/?/g;# Anything vaguely resembling numbers
   $query =~ s/[xb.+-]\?/?/g;            # Clean up leftovers
   $query =~ s/\A\s+//;                  # Chop off leading whitespace
   chomp $query;                         # Kill trailing whitespace
   $query =~ tr[ \n\t\r\f][ ]s;          # Collapse whitespace
   $query = lc $query;
   $query =~ s/\bnull\b/?/g;             # Get rid of NULLs
   $query =~ s{                          # Collapse IN and VALUES lists
               \b(in|values?)(?:[\s,]*\([\s?,]*\))+
              }
              {$1(?+)}gx;
   $query =~ s{                          # Collapse UNION
               \b(select\s.*?)(?:(\sunion(?:\sall)?)\s\1)+
              }
              {$1 /*repeat$2*/}xg;
   $query =~ s/\blimit \?(?:, ?\?| offset \?)?/limit ?/; # LIMIT
   return $query;
}

sub distill {
   my ( $self, $query, %args ) = @_;
   my $qp = $args{qp} || $self->{QueryParser};
   die "I need a qp argument" unless $qp;

   $query =~ m/\A\s*call\s+(\S+)\(/i
      && return "CALL $1"; # Warning! $1 used, be careful.
   $query =~ m/\A# administrator/
      && return "ADMIN";
   $query =~ m/\A\s*use\s+/
      && return "USE";
   $query =~ m/\A\s*UNLOCK TABLES/i
      && return "UNLOCK";

   eval $QueryParser::data_def_stmts;
   eval $QueryParser::tbl_ident;
   my ( $dds ) = $query =~ /^\s*($QueryParser::data_def_stmts)\b/i;
   if ( $dds ) {
      my ( $obj ) = $query =~ m/$dds.+(DATABASE|TABLE)\b/i;
      $obj = uc $obj if $obj;
      MKDEBUG && _d('Data def statment:', $dds, $obj);
      my ($db_or_tbl)
         = $query =~ m/(?:TABLE|DATABASE)\s+($QueryParser::tbl_ident)(\s+.*)?/i;
      MKDEBUG && _d('Matches db or table:', $db_or_tbl);
      $obj .= ($db_or_tbl ? " $db_or_tbl" : '');
      return uc($dds) . ($obj ? " $obj" : '');
   }

   my @verbs = $query =~ m/\b($verbs)\b/gio;
   @verbs    = do {
      my $last = '';
      grep { my $pass = $_ ne $last; $last = $_; $pass } map { uc } @verbs;
   };
   my $verbs = join(q{ }, @verbs);
   $verbs =~ s/( UNION SELECT)+/ UNION/g;

   my @tables = map {
      $_ =~ s/`//g;
      $_ =~ s/(_?)[0-9]+/$1?/g;
      $_;
   } $qp->get_tables($query);

   @tables = do {
      my $last = '';
      grep { my $pass = $_ ne $last; $last = $_; $pass } @tables;
   };

   $query = join(q{ }, $verbs, @tables);
   return $query;
}

sub convert_to_select {
   my ( $self, $query ) = @_;
   return unless $query;
   $query =~ s{
                 \A.*?
                 update\s+(.*?)
                 \s+set\b(.*?)
                 (?:\s*where\b(.*?))?
                 (limit\s*[0-9]+(?:\s*,\s*[0-9]+)?)?
                 \Z
              }
              {__update_to_select($1, $2, $3, $4)}exsi
      || $query =~ s{
                    \A.*?
                    (?:insert|replace)\s+
                    .*?\binto\b(.*?)\(([^\)]+)\)\s*
                    values?\s*(\(.*?\))\s*
                    (?:\blimit\b|on\s*duplicate\s*key.*)?\s*
                    \Z
                 }
                 {__insert_to_select($1, $2, $3)}exsi
      || $query =~ s{
                    \A.*?
                    delete\s+(.*?)
                    \bfrom\b(.*)
                    \Z
                 }
                 {__delete_to_select($1, $2)}exsi;
   $query =~ s/\s*on\s+duplicate\s+key\s+update.*\Z//si;
   $query =~ s/\A.*?(?=\bSELECT\s*\b)//ism;
   return $query;
}

sub convert_select_list {
   my ( $self, $query ) = @_;
   $query =~ s{
               \A\s*select(.*?)\bfrom\b
              }
              {$1 =~ m/\*/ ? "select 1 from" : "select isnull(coalesce($1)) from"}exi;
   return $query;
}

sub __delete_to_select {
   my ( $delete, $join ) = @_;
   if ( $join =~ m/\bjoin\b/ ) {
      return "select 1 from $join";
   }
   return "select * from $join";
}

sub __insert_to_select {
   my ( $tbl, $cols, $vals ) = @_;
   MKDEBUG && _d('Args:', @_);
   my @cols = split(/,/, $cols);
   MKDEBUG && _d('Cols:', @cols);
   $vals =~ s/^\(|\)$//g; # Strip leading/trailing parens
   my @vals = $vals =~ m/($quote_re|[^,]*${bal}[^,]*|[^,]+)/g;
   MKDEBUG && _d('Vals:', @vals);
   if ( @cols == @vals ) {
      return "select * from $tbl where "
         . join(' and ', map { "$cols[$_]=$vals[$_]" } (0..$#cols));
   }
   else {
      return "select * from $tbl limit 1";
   }
}

sub __update_to_select {
   my ( $from, $set, $where, $limit ) = @_;
   return "select $set from $from "
      . ( $where ? "where $where" : '' )
      . ( $limit ? " $limit "      : '' );
}

sub wrap_in_derived {
   my ( $self, $query ) = @_;
   return unless $query;
   return $query =~ m/\A\s*select/i
      ? "select 1 from ($query) as x limit 1"
      : $query;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End QueryRewriter package
# ###########################################################################

# ###########################################################################
# SlowLogParser package 4462
# ###########################################################################
package SlowLogParser;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Data::Dumper;

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class ) = @_;
   bless {}, $class;
}

my $slow_log_ts_line = qr/^# Time: ([0-9: ]{15})/;
my $slow_log_uh_line = qr/# User\@Host: ([^\[]+|\[[^[]+\]).*?@ (\S*) \[(.*)\]/;
my $slow_log_hd_line = qr{
      ^(?:
      T[cC][pP]\s[pP]ort:\s+\d+ # case differs on windows/unix
      |
      [/A-Z].*mysqld,\sVersion.*(?:started\swith:|embedded\slibrary)
      |
      Time\s+Id\s+Command
      ).*\n
   }xm;

sub parse_event {
   my ( $self, $fh, $misc, @callbacks ) = @_;
   my $oktorun_here = 1;
   my $oktorun      = $misc->{oktorun} ? $misc->{oktorun} : \$oktorun_here;
   my $num_events   = 0;

   my @pending;
   local $INPUT_RECORD_SEPARATOR = ";\n#";
   my $trimlen    = length($INPUT_RECORD_SEPARATOR);
   my $pos_in_log = tell($fh);
   my $stmt;

   EVENT:
   while ( $$oktorun
           && (defined($stmt = shift @pending) or defined($stmt = <$fh>)) ) {
      my @properties = ('cmd', 'Query', 'pos_in_log', $pos_in_log);
      $pos_in_log = tell($fh);

      if ( $stmt =~ s/$slow_log_hd_line//go ){ # Throw away header lines in log
         my @chunks = split(/$INPUT_RECORD_SEPARATOR/o, $stmt);
         if ( @chunks > 1 ) {
            MKDEBUG && _d("Found multiple chunks");
            $stmt = shift @chunks;
            unshift @pending, @chunks;
         }
      }

      $stmt = '#' . $stmt unless $stmt =~ m/\A#/;
      $stmt =~ s/;\n#?\Z//;


      my ($got_ts, $got_uh, $got_ac, $got_db, $got_set, $got_embed);
      my $pos = 0;
      my $len = length($stmt);
      my $found_arg = 0;
      LINE:
      while ( $stmt =~ m/^(.*)$/mg ) { # /g is important, requires scalar match.
         $pos     = pos($stmt);  # Be careful not to mess this up!
         my $line = $1;          # Necessary for /g and pos() to work.
         MKDEBUG && _d($line);

         if ($line =~ m/^(?:#|use |SET (?:last_insert_id|insert_id|timestamp))/o) {

            if ( !$got_ts && (my ( $time ) = $line =~ m/$slow_log_ts_line/o)) {
               MKDEBUG && _d("Got ts", $time);
               push @properties, 'ts', $time;
               ++$got_ts;
               if ( !$got_uh
                  && ( my ( $user, $host, $ip ) = $line =~ m/$slow_log_uh_line/o )
               ) {
                  MKDEBUG && _d("Got user, host, ip", $user, $host, $ip);
                  push @properties, 'user', $user, 'host', $host, 'ip', $ip;
                  ++$got_uh;
               }
            }

            elsif ( !$got_uh
                  && ( my ( $user, $host, $ip ) = $line =~ m/$slow_log_uh_line/o )
            ) {
               MKDEBUG && _d("Got user, host, ip", $user, $host, $ip);
               push @properties, 'user', $user, 'host', $host, 'ip', $ip;
               ++$got_uh;
            }

            elsif (!$got_ac && $line =~ m/^# (?:administrator command:.*)$/) {
               MKDEBUG && _d("Got admin command");
               push @properties, 'cmd', 'Admin', 'arg', $line;
               push @properties, 'bytes', length($properties[-1]);
               ++$found_arg;
               ++$got_ac;
            }

            elsif ( $line =~ m/^# +[A-Z][A-Za-z_]+: \S+/ ) { # Make the test cheap!
               MKDEBUG && _d("Got some line with properties");
               my @temp = $line =~ m/(\w+):\s+(\S+|\Z)/g;
               push @properties, @temp;
            }

            elsif ( !$got_db && (my ( $db ) = $line =~ m/^use ([^;]+)/ ) ) {
               MKDEBUG && _d("Got a default database:", $db);
               push @properties, 'db', $db;
               ++$got_db;
            }

            elsif (!$got_set && (my ($setting) = $line =~ m/^SET\s+([^;]*)/)) {
               MKDEBUG && _d("Got some setting:", $setting);
               push @properties, split(/,|\s*=\s*/, $setting);
               ++$got_set;
            }

            if ( !$found_arg && $pos == $len ) {
               MKDEBUG && _d("Did not find arg, looking for special cases");
               local $INPUT_RECORD_SEPARATOR = ";\n";
               if ( defined(my $l = <$fh>) ) {
                  chomp $l;
                  MKDEBUG && _d("Found admin statement", $l);
                  push @properties, 'cmd', 'Admin', 'arg', '#' . $l;
                  push @properties, 'bytes', length($properties[-1]);
                  $found_arg++;
               }
               else {
                  MKDEBUG && _d("I can't figure out what to do with this line");
                  next EVENT;
               }
            }
         }
         else {
            MKDEBUG && _d("Got the query/arg line");
            my $arg = substr($stmt, $pos - length($line));
            push @properties, 'arg', $arg, 'bytes', length($arg);
            if ( $misc && $misc->{embed}
               && ( my ($e) = $arg =~ m/($misc->{embed})/)
            ) {
               push @properties, $e =~ m/$misc->{capture}/g;
            }
            last LINE;
         }
      }

      MKDEBUG && _d('Properties of event:', Dumper(\@properties));
      my $event = { @properties };
      foreach my $callback ( @callbacks ) {
         last unless $event = $callback->($event);
      }
      ++$num_events;
      last EVENT unless @pending;
   }
   return $num_events;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End SlowLogParser package
# ###########################################################################

# ###########################################################################
# EventAggregator package 4462
# ###########################################################################
package EventAggregator;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);

use constant MKDEBUG      => $ENV{MKDEBUG};
use constant BUCK_SIZE    => 1.05;
use constant BASE_LOG     => log(BUCK_SIZE);
use constant BASE_OFFSET  => abs(1 - log(0.000001) / BASE_LOG); # 284.1617969
use constant NUM_BUCK     => 1000;
use constant MIN_BUCK     => .000001;

our @buckets  = map { 0 } (0..NUM_BUCK-1);

my @buck_vals = map { bucket_value($_); } (0..NUM_BUCK-1);

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(groupby worst) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $attributes = $args{attributes} || {};
   my $self = {
      groupby        => $args{groupby},
      detect_attribs => scalar keys %$attributes == 0 ? 1 : 0,
      all_attribs    => [ keys %$attributes ],
      ignore_attribs => {
         map  { $_ => $args{attributes}->{$_} }
         grep { $_ ne $args{groupby} }
         @{$args{ignore_attributes}}
      },
      attributes     => {
         map  { $_ => $args{attributes}->{$_} }
         grep { $_ ne $args{groupby} }
         keys %$attributes
      },
      alt_attribs    => {
         map  { $_ => make_alt_attrib(@{$args{attributes}->{$_}}) }
         grep { $_ ne $args{groupby} }
         keys %$attributes
      },
      worst        => $args{worst},
      unroll_limit => $args{unroll_limit} || 1000,
      attrib_limit => $args{attrib_limit},
      result_classes => {},
      result_globals => {},
      result_samples => {},
      n_events       => 0,
      unrolled_loops => undef,
      type_for       => { %{$args{type_for} || { Query_time => 'num' }} },
   };
   return bless $self, $class;
}

sub reset_aggregated_data {
   my ( $self ) = @_;
   foreach my $class ( values %{$self->{result_classes}} ) {
      foreach my $attrib ( values %$class ) {
         delete @{$attrib}{keys %$attrib};
      }
   }
   foreach my $class ( values %{$self->{result_globals}} ) {
      delete @{$class}{keys %$class};
   }
   delete @{$self->{result_samples}}{keys %{$self->{result_samples}}};
   $self->{n_events} = 0;
}

sub aggregate {
   my ( $self, $event ) = @_;

   my $group_by = $event->{$self->{groupby}};
   return unless defined $group_by;

   $self->{n_events}++;
   MKDEBUG && _d('event', $self->{n_events});

   return $self->{unrolled_loops}->($self, $event, $group_by)
      if $self->{unrolled_loops};

   if ( $self->{n_events} <= $self->{unroll_limit} ) {

      $self->add_new_attributes($event) if $self->{detect_attribs};

      ATTRIB:
      foreach my $attrib ( keys %{$self->{attributes}} ) {

         if ( !exists $event->{$attrib} ) {
            MKDEBUG && _d("attrib doesn't exist in event:", $attrib);
            my $alt_attrib = $self->{alt_attribs}->{$attrib}->($event);
            MKDEBUG && _d('alt attrib:', $alt_attrib);
            next ATTRIB unless $alt_attrib;
         }

         GROUPBY:
         foreach my $val ( ref $group_by ? @$group_by : ($group_by) ) {
            my $class_attrib  = $self->{result_classes}->{$val}->{$attrib} ||= {};
            my $global_attrib = $self->{result_globals}->{$attrib} ||= {};
            my $samples       = $self->{result_samples};
            my $handler = $self->{handlers}->{ $attrib };
            if ( !$handler ) {
               $handler = $self->make_handler(
                  $attrib,
                  $event,
                  wor => $self->{worst} eq $attrib,
                  alt => $self->{attributes}->{$attrib},
               );
               $self->{handlers}->{$attrib} = $handler;
            }
            next GROUPBY unless $handler;
            $samples->{$val} ||= $event; # Initialize to the first event.
            $handler->($event, $class_attrib, $global_attrib, $samples, $group_by);
         }
      }
   }
   else {
      $self->_make_unrolled_loops($event);
      $self->{unrolled_loops}->($self, $event, $group_by);
   }

   return;
}

sub _make_unrolled_loops {
   my ( $self, $event ) = @_;

   my $group_by = $event->{$self->{groupby}};

   my @attrs   = grep { $self->{handlers}->{$_} } keys %{$self->{attributes}};
   my $globs   = $self->{result_globals}; # Global stats for each
   my $samples = $self->{result_samples};

   my @lines = (
      'my ( $self, $event, $group_by ) = @_;',
      'my ($val, $class, $global, $idx);',
      (ref $group_by ? ('foreach my $group_by ( @$group_by ) {') : ()),
      'my $temp = $self->{result_classes}->{ $group_by }
         ||= { map { $_ => { } } @attrs };',
      '$samples->{$group_by} ||= $event;', # Always start with the first.
   );
   foreach my $i ( 0 .. $#attrs ) {
      push @lines, (
         '$class  = $temp->{"'  . $attrs[$i] . '"};',
         '$global = $globs->{"' . $attrs[$i] . '"};',
         $self->{unrolled_for}->{$attrs[$i]},
      );
   }
   if ( ref $group_by ) {
      push @lines, '}'; # Close the loop opened above
   }
   @lines = map { s/^/   /gm; $_ } @lines; # Indent for debugging
   unshift @lines, 'sub {';
   push @lines, '}';

   my $code = join("\n", @lines);
   MKDEBUG && _d('Unrolled subroutine:', @lines);
   my $sub = eval $code;
   die $EVAL_ERROR if $EVAL_ERROR;
   $self->{unrolled_loops} = $sub;

   return;
}

sub results {
   my ( $self ) = @_;
   return {
      classes => $self->{result_classes},
      globals => $self->{result_globals},
      samples => $self->{result_samples},
   };
}

sub attributes {
   my ( $self ) = @_;
   return $self->{type_for};
}

sub type_for {
   my ( $self, $attrib ) = @_;
   return $self->{type_for}->{$attrib};
}

sub make_handler {
   my ( $self, $attrib, $event, %args ) = @_;
   die "I need an attrib" unless defined $attrib;
   my ($val) = grep { defined $_ } map { $event->{$_} } @{ $args{alt} };
   my $is_array = 0;
   if (ref $val eq 'ARRAY') {
      $is_array = 1;
      $val      = $val->[0];
   }
   return unless defined $val; # Can't decide type if it's undef.

   my $float_re = qr{[+-]?(?:(?=\d|[.])\d+(?:[.])\d{0,})(?:E[+-]?\d+)?}i;
   my $type = $self->type_for($attrib)         ? $self->type_for($attrib)
            : $val  =~ m/^(?:\d+|$float_re)$/o ? 'num'
            : $val  =~ m/^(?:Yes|No)$/         ? 'bool'
            :                                    'string';
   MKDEBUG && _d('Type for', $attrib, 'is', $type,
      '(sample:', $val, '), is array:', $is_array);
   $self->{type_for}->{$attrib} = $type;

   %args = ( # Set up defaults
      min => 1,
      max => 1,
      sum => $type =~ m/num|bool/    ? 1 : 0,
      cnt => 1,
      unq => $type =~ m/bool|string/ ? 1 : 0,
      all => $type eq 'num'          ? 1 : 0,
      glo => 1,
      trf => ($type eq 'bool') ? q{(($val || '') eq 'Yes') ? 1 : 0} : undef,
      wor => 0,
      alt => [],
      %args,
   );

   my @lines = ("# type: $type"); # Lines of code for the subroutine
   if ( $args{trf} ) {
      push @lines, q{$val = } . $args{trf} . ';';
   }

   foreach my $place ( qw($class $global) ) {
      my @tmp;
      if ( $args{min} ) {
         my $op   = $type eq 'num' ? '<' : 'lt';
         push @tmp, (
            'PLACE->{min} = $val if !defined PLACE->{min} || $val '
               . $op . ' PLACE->{min};',
         );
      }
      if ( $args{max} ) {
         my $op = ($type eq 'num') ? '>' : 'gt';
         push @tmp, (
            'PLACE->{max} = $val if !defined PLACE->{max} || $val '
               . $op . ' PLACE->{max};',
         );
      }
      if ( $args{sum} ) {
         push @tmp, 'PLACE->{sum} += $val;';
      }
      if ( $args{cnt} ) {
         push @tmp, '++PLACE->{cnt};';
      }
      if ( $args{all} ) {
         push @tmp, (
            'exists PLACE->{all} or PLACE->{all} = [ @buckets ];',
            '++PLACE->{all}->[ EventAggregator::bucket_idx($val) ];',
         );
      }
      push @lines, map { s/PLACE/$place/g; $_ } @tmp;
   }

   if ( $args{unq} ) {
      push @lines, '++$class->{unq}->{$val};';
   }
   if ( $args{wor} ) {
      my $op = $type eq 'num' ? '>=' : 'ge';
      push @lines, (
         'if ( $val ' . $op . ' ($class->{max} || 0) ) {',
         '   $samples->{$group_by} = $event;',
         '}',
      );
   }

   my @broken_query_time;
   if ( $attrib eq 'Query_time' ) {
      push @broken_query_time, (
         '$val =~ s/^(\d+(?:\.\d+)?).*/$1/;',
         '$event->{\''.$attrib.'\'} = $val;',
      );
   }

   my @limit;
   if ( $args{all} && $type eq 'num' && $self->{attrib_limit} ) {
      push @limit, (
         "if ( \$val > $self->{attrib_limit} ) {",
         '   $val = $class->{last} ||= 0;',
         '}',
         '$class->{last} = $val;',
      );
   }

   my @unrolled = (
      "\$val = \$event->{'$attrib'};",
      ($is_array ? ('foreach my $val ( @$val ) {') : ()),
      (map { "\$val = \$event->{'$_'} unless defined \$val;" }
         grep { $_ ne $attrib } @{$args{alt}}),
      'defined $val && do {',
      ( map { s/^/   /gm; $_ } (@broken_query_time, @limit, @lines) ), # Indent for debugging
      '};',
      ($is_array ? ('}') : ()),
   );
   $self->{unrolled_for}->{$attrib} = join("\n", @unrolled);

   unshift @lines, (
      'sub {',
      'my ( $event, $class, $global, $samples, $group_by ) = @_;',
      'my ($val, $idx);', # NOTE: define all variables here
      "\$val = \$event->{'$attrib'};",
      (map { "\$val = \$event->{'$_'} unless defined \$val;" }
         grep { $_ ne $attrib } @{$args{alt}}),
      'return unless defined $val;',
      ($is_array ? ('foreach my $val ( @$val ) {') : ()),
      @broken_query_time,
      @limit,
      ($is_array ? ('}') : ()),
   );
   push @lines, '}';
   my $code = join("\n", @lines);
   $self->{code_for}->{$attrib} = $code;

   MKDEBUG && _d('Metric handler for', $attrib, ':', @lines);
   my $sub = eval join("\n", @lines);
   die if $EVAL_ERROR;
   return $sub;
}

sub bucket_idx {
   my ( $val ) = @_;
   return 0 if $val < MIN_BUCK;
   my $idx = int(BASE_OFFSET + log($val)/BASE_LOG);
   return $idx > (NUM_BUCK-1) ? (NUM_BUCK-1) : $idx;
}

sub bucket_value {
   my ( $bucket ) = @_;
   return 0 if $bucket == 0;
   die "Invalid bucket: $bucket" if $bucket < 0 || $bucket > (NUM_BUCK-1);
   return (BUCK_SIZE**($bucket-1)) * MIN_BUCK;
}

{
   my @buck_tens;
   sub buckets_of {
      return @buck_tens if @buck_tens;

      my $start_bucket  = 0;
      my @base10_starts = (0);
      map { push @base10_starts, (10**$_)*MIN_BUCK } (1..7);

      for my $base10_bucket ( 0..($#base10_starts-1) ) {
         my $next_bucket = bucket_idx( $base10_starts[$base10_bucket+1] );
         MKDEBUG && _d('Base 10 bucket', $base10_bucket, 'maps to',
            'base 1.05 buckets', $start_bucket, '..', $next_bucket-1);
         for my $base1_05_bucket ($start_bucket..($next_bucket-1)) {
            $buck_tens[$base1_05_bucket] = $base10_bucket;
         }
         $start_bucket = $next_bucket;
      }

      map { $buck_tens[$_] = 7 } ($start_bucket..(NUM_BUCK-1));

      return @buck_tens;
   }
}

sub calculate_statistical_metrics {
   my ( $self, $vals, $args ) = @_;
   my $statistical_metrics = {
      pct_95    => 0,
      stddev    => 0,
      median    => 0,
      cutoff    => undef,
   };

   return $statistical_metrics
      unless defined $vals && @$vals && $args->{cnt};

   my $n_vals = $args->{cnt};
   if ( $n_vals == 1 || $args->{max} == $args->{min} ) {
      my $v      = $args->{max} || 0;
      my $bucket = int(6 + ( log($v > 0 ? $v : MIN_BUCK) / log(10)));
      $bucket    = $bucket > 7 ? 7 : $bucket < 0 ? 0 : $bucket;
      return {
         pct_95 => $v,
         stddev => 0,
         median => $v,
         cutoff => $n_vals,
      };
   }
   elsif ( $n_vals == 2 ) {
      foreach my $v ( $args->{min}, $args->{max} ) {
         my $bucket = int(6 + ( log($v && $v > 0 ? $v : MIN_BUCK) / log(10)));
         $bucket = $bucket > 7 ? 7 : $bucket < 0 ? 0 : $bucket;
      }
      my $v      = $args->{max} || 0;
      my $mean = (($args->{min} || 0) + $v) / 2;
      return {
         pct_95 => $v,
         stddev => sqrt((($v - $mean) ** 2) *2),
         median => $mean,
         cutoff => $n_vals,
      };
   }

   my $cutoff = $n_vals >= 10 ? int ( $n_vals * 0.95 ) : $n_vals;
   $statistical_metrics->{cutoff} = $cutoff;

   my $total_left = $n_vals;
   my $top_vals   = $n_vals - $cutoff; # vals > 95th
   my $sum_excl   = 0;
   my $sum        = 0;
   my $sumsq      = 0;
   my $mid        = int($n_vals / 2);
   my $median     = 0;
   my $prev       = NUM_BUCK-1; # Used for getting median when $cutoff is odd
   my $bucket_95  = 0; # top bucket in 95th

   MKDEBUG && _d('total vals:', $total_left, 'top vals:', $top_vals, 'mid:', $mid);

   BUCKET:
   for my $bucket ( reverse 0..(NUM_BUCK-1) ) {
      my $val = $vals->[$bucket];
      next BUCKET unless $val; 

      $total_left -= $val;
      $sum_excl   += $val;
      $bucket_95   = $bucket if !$bucket_95 && $sum_excl > $top_vals;

      if ( !$median && $total_left <= $mid ) {
         $median = (($cutoff % 2) || ($val > 1)) ? $buck_vals[$bucket]
                 : ($buck_vals[$bucket] + $buck_vals[$prev]) / 2;
      }

      $sum    += $val * $buck_vals[$bucket];
      $sumsq  += $val * ($buck_vals[$bucket]**2);
      $prev   =  $bucket;
   }

   my $var      = $sumsq/$n_vals - ( ($sum/$n_vals) ** 2 );
   my $stddev   = $var > 0 ? sqrt($var) : 0;
   my $maxstdev = (($args->{max} || 0) - ($args->{min} || 0)) / 2;
   $stddev      = $stddev > $maxstdev ? $maxstdev : $stddev;

   MKDEBUG && _d('sum:', $sum, 'sumsq:', $sumsq, 'stddev:', $stddev,
      'median:', $median, 'prev bucket:', $prev,
      'total left:', $total_left, 'sum excl', $sum_excl,
      'bucket 95:', $bucket_95, $buck_vals[$bucket_95]);

   $statistical_metrics->{stddev} = $stddev;
   $statistical_metrics->{pct_95} = $buck_vals[$bucket_95];
   $statistical_metrics->{median} = $median;

   return $statistical_metrics;
}

sub metrics {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(attrib where) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $stats = $self->results;
   my $store = $stats->{classes}->{$args{where}}->{$args{attrib}};

   my $global_cnt = $stats->{globals}->{$args{attrib}}->{cnt};
   my $metrics    = $self->calculate_statistical_metrics($store->{all}, $store);

   return {
      cnt    => $store->{cnt},
      pct    => $global_cnt && $store->{cnt} ? $store->{cnt} / $global_cnt : 0,
      sum    => $store->{sum},
      min    => $store->{min},
      max    => $store->{max},
      avg    => $store->{sum} && $store->{cnt} ? $store->{sum} / $store->{cnt} : 0,
      median => $metrics->{median},
      pct_95 => $metrics->{pct_95},
      stddev => $metrics->{stddev},
   };
}

sub top_events {
   my ( $self, %args ) = @_;
   my $classes = $self->{result_classes};
   my @sorted = reverse sort { # Sorted list of $groupby values
      $classes->{$a}->{$args{attrib}}->{$args{orderby}}
         <=> $classes->{$b}->{$args{attrib}}->{$args{orderby}}
      } grep {
         defined $classes->{$_}->{$args{attrib}}->{$args{orderby}}
      } keys %$classes;
   my @chosen;
   my ($total, $count) = (0, 0);
   foreach my $groupby ( @sorted ) {
      if ( 
         (!$args{total} || $total < $args{total} )
         && ( !$args{count} || $count < $args{count} )
      ) {
         push @chosen, [$groupby, 'top'];
      }

      elsif ( $args{ol_attrib} && (!$args{ol_freq}
         || $classes->{$groupby}->{$args{ol_attrib}}->{cnt} >= $args{ol_freq})
      ) {
         MKDEBUG && _d('Calculating statistical_metrics');
         my $stats = $self->calculate_statistical_metrics(
            $classes->{$groupby}->{$args{ol_attrib}}->{all},
            $classes->{$groupby}->{$args{ol_attrib}}
         );
         if ( $stats->{pct_95} >= $args{ol_limit} ) {
            push @chosen, [$groupby, 'outlier'];
         }
      }

      $total += $classes->{$groupby}->{$args{attrib}}->{$args{orderby}};
      $count++;
   }
   return @chosen;
}

sub add_new_attributes {
   my ( $self, $event ) = @_;
   return unless $event;

   map {
      my $attrib = $_;
      $self->{attributes}->{$attrib}  = [$attrib];
      $self->{alt_attribs}->{$attrib} = make_alt_attrib($attrib);
      push @{$self->{all_attribs}}, $attrib;
      MKDEBUG && _d('Added new attribute:', $attrib);
   }
   grep {
      $_ ne $self->{groupby}
      && !exists $self->{attributes}->{$_}
      && !exists $self->{ignore_attribs}->{$_}
   }
   keys %$event;

   return;
}

sub get_attributes {
   my ( $self ) = @_;
   return @{$self->{all_attribs}};
}

sub events_processed {
   my ( $self ) = @_;
   return $self->{n_events};
}

sub make_alt_attrib {
   my ( @attribs ) = @_;

   my $attrib = shift @attribs;  # Primary attribute.
   return sub {} unless @attribs;  # No alternates.

   my @lines;
   push @lines, 'sub { my ( $event ) = @_; my $alt_attrib;';
   push @lines, map  {
         "\$alt_attrib = '$_' if !defined \$alt_attrib "
         . "&& exists \$event->{'$_'};"
      } @attribs;
   push @lines, 'return $alt_attrib; }';
   MKDEBUG && _d('alt attrib sub for', $attrib, ':', @lines);
   my $sub = eval join("\n", @lines);
   die if $EVAL_ERROR;
   return $sub;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End EventAggregator package
# ###########################################################################

# ###########################################################################
# QueryParser package 4704
# ###########################################################################
package QueryParser;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};
our $tbl_ident = qr/(?:`[^`]+`|\w+)(?:\.(?:`[^`]+`|\w+))?/;
our $tbl_regex = qr{
         \b(?:FROM|JOIN|(?<!KEY\s)UPDATE|INTO) # Words that precede table names
         \b\s*
         ($tbl_ident
            (?: (?:\s+ (?:AS\s+)? \w+)?, \s*$tbl_ident )*
         )
      }xio;
our $has_derived = qr{
      \b(?:FROM|JOIN|,)
      \s*\(\s*SELECT
   }xi;

our $data_def_stmts = qr/(?:CREATE|ALTER|TRUNCATE|DROP|RENAME)/i;

sub new {
   my ( $class ) = @_;
   bless {}, $class;
}

sub get_tables {
   my ( $self, $query ) = @_;
   return unless $query;
   MKDEBUG && _d('Getting tables for', $query);

   my ( $ddl_stmt ) = $query =~ /^\s*($data_def_stmts)\b/i;
   if ( $ddl_stmt ) {
      MKDEBUG && _d('Special table type:', $ddl_stmt);
      $query =~ s/IF NOT EXISTS//i;
      if ( $query =~ m/$ddl_stmt DATABASE\b/i ) {
         MKDEBUG && _d('Query alters a database, not a table');
         return ();
      }
      if ( $ddl_stmt =~ m/CREATE/i && $query =~ m/$ddl_stmt\b.+?\bSELECT\b/i ) {
         my ($select) = $query =~ m/\b(SELECT\b.+)/is;
         MKDEBUG && _d('CREATE TABLE ... SELECT:', $select);
         return $self->get_tables($select);
      }
      my ($tbl) = $query =~ m/TABLE\s+($tbl_ident)(\s+.*)?/i;
      MKDEBUG && _d('Matches table:', $tbl);
      return ($tbl);
   }

   $query =~ s/ (?:LOW_PRIORITY|IGNORE|STRAIGHT_JOIN)//ig;

   if ( $query =~ /^\s*LOCK TABLES/i ) {
      MKDEBUG && _d('Special table type: LOCK TABLES');
      $query =~ s/^(\s*LOCK TABLES\s+)//;
      $query =~ s/\s+(?:READ|WRITE|LOCAL)+\s*//g;
      MKDEBUG && _d('Locked tables:', $query);
      $query = "FROM $query";
   }

   $query =~ s/\\["']//g;                # quoted strings
   $query =~ s/".*?"/?/sg;               # quoted strings
   $query =~ s/'.*?'/?/sg;               # quoted strings

   my @tables;
   foreach my $tbls ( $query =~ m/$tbl_regex/gio ) {
      MKDEBUG && _d('Match tables:', $tbls);
      foreach my $tbl ( split(',', $tbls) ) {
         $tbl =~ s/\s*($tbl_ident)(\s+.*)?/$1/gio;

         if ( $tbl !~ m/[a-zA-Z]/ ) {
            MKDEBUG && _d('Skipping suspicious table name:', $tbl);
            next;
         }

         push @tables, $tbl;
      }
   }
   return @tables;
}

sub has_derived_table {
   my ( $self, $query ) = @_;
   my $match = $query =~ m/$has_derived/;
   MKDEBUG && _d($query, 'has ' . ($match ? 'a' : 'no') . ' derived table');
   return $match;
}

sub get_aliases {
   my ( $self, $query ) = @_;
   return unless $query;
   my $aliases;

   $query =~ s/ (?:LOW_PRIORITY|IGNORE|STRAIGHT_JOIN)//ig;

   $query =~ s/ (?:INNER|OUTER|CROSS|LEFT|RIGHT|NATURAL)//ig;

   my ($tbl_refs, $from) = $query =~ m{
      (
         (FROM|INTO|UPDATE)\b\s*   # Keyword before table refs
         .+?                       # Table refs
      )
      (?:\s+|\z)                   # If the query does not end with the table
      (?:WHERE|ORDER|LIMIT|HAVING|SET|VALUES|\z) # Keyword after table refs
   }ix;

   die "Failed to parse table references from $query"
      unless $tbl_refs && $from;

   MKDEBUG && _d('tbl refs:', $tbl_refs);

   my $before_tbl = qr/(?:,|JOIN|\s|$from)+/i;

   my $after_tbl  = qr/(?:,|JOIN|ON|USING|\z)/i;

   $tbl_refs =~ s/ = /=/g;

   while (
      $tbl_refs =~ m{
         $before_tbl\b\s*
            ( ($tbl_ident) (?:\s+ (?:AS\s+)? (\w+))? )
         \s*$after_tbl
      }xgio )
   {
      my ( $tbl_ref, $db_tbl, $alias ) = ($1, $2, $3);
      MKDEBUG && _d('Match table:', $tbl_ref);

      if ( $tbl_ref =~ m/^AS\s+\w+/i ) {
         MKDEBUG && _d('Subquery', $tbl_ref);
         $aliases->{$alias} = undef;
         next;
      }

      my ( $db, $tbl ) = $db_tbl =~ m/^(?:(.*?)\.)?(.*)/;
      $aliases->{$alias || $tbl} = $tbl;
      $aliases->{DATABASE}->{$tbl} = $db if $db;
   }
   return $aliases;
}

sub split {
   my ( $self, $query ) = @_;
   return unless $query;
   $query = clean_query($query);
   MKDEBUG && _d('Splitting', $query);

   my $verbs = qr{SELECT|INSERT|UPDATE|DELETE|REPLACE|UNION|CREATE}i;

   my @split_statements = grep { $_ } split(m/\b($verbs\b(?!(?:\s*\()))/io, $query);

   my @statements;
   if ( @split_statements == 1 ) {
      push @statements, $query;
   }
   else {
      for ( my $i = 0; $i <= $#split_statements; $i += 2 ) {
         push @statements, $split_statements[$i].$split_statements[$i+1];
      }
   }

   MKDEBUG && _d('statements:', map { $_ ? "<$_>" : 'none' } @statements);
   return @statements;
}

sub clean_query {
   my ( $query ) = @_;
   return unless $query;
   $query =~ s!/\*.*?\*/! !g;  # Remove /* comment blocks */
   $query =~ s/^\s+//;         # Remove leading spaces
   $query =~ s/\s+$//;         # Remove trailing spaces
   $query =~ s/\s{2,}/ /g;     # Remove extra spaces
   return $query;
}

sub split_subquery {
   my ( $self, $query ) = @_;
   return unless $query;
   $query = clean_query($query);
   $query =~ s/;$//;

   my @subqueries;
   my $sqno = 0;  # subquery number
   my $pos  = 0;
   while ( $query =~ m/(\S+)(?:\s+|\Z)/g ) {
      $pos = pos($query);
      my $word = $1;
      MKDEBUG && _d($word, $sqno);
      if ( $word =~ m/^\(?SELECT\b/i ) {
         my $start_pos = $pos - length($word) - 1;
         if ( $start_pos ) {
            $sqno++;
            MKDEBUG && _d('Subquery', $sqno, 'starts at', $start_pos);
            $subqueries[$sqno] = {
               start_pos => $start_pos,
               end_pos   => 0,
               len       => 0,
               words     => [$word],
               lp        => 1, # left parentheses
               rp        => 0, # right parentheses
               done      => 0,
            };
         }
         else {
            MKDEBUG && _d('Main SELECT at pos 0');
         }
      }
      else {
         next unless $sqno;  # next unless we're in a subquery
         MKDEBUG && _d('In subquery', $sqno);
         my $sq = $subqueries[$sqno];
         if ( $sq->{done} ) {
            MKDEBUG && _d('This subquery is done; SQL is for',
               ($sqno - 1 ? "subquery $sqno" : "the main SELECT"));
            next;
         }
         push @{$sq->{words}}, $word;
         my $lp = ($word =~ tr/\(//) || 0;
         my $rp = ($word =~ tr/\)//) || 0;
         MKDEBUG && _d('parentheses left', $lp, 'right', $rp);
         if ( ($sq->{lp} + $lp) - ($sq->{rp} + $rp) == 0 ) {
            my $end_pos = $pos - 1;
            MKDEBUG && _d('Subquery', $sqno, 'ends at', $end_pos);
            $sq->{end_pos} = $end_pos;
            $sq->{len}     = $end_pos - $sq->{start_pos};
         }
      }
   }

   for my $i ( 1..$#subqueries ) {
      my $sq = $subqueries[$i];
      next unless $sq;
      $sq->{sql} = join(' ', @{$sq->{words}});
      substr $query,
         $sq->{start_pos} + 1,  # +1 for (
         $sq->{len} - 1,        # -1 for )
         "__subquery_$i";
   }

   return $query, map { $_->{sql} } grep { defined $_ } @subqueries;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End QueryParser package
# ###########################################################################

# ###########################################################################
# Daemon package 4565
# ###########################################################################

package Daemon;

use strict;
use warnings FATAL => 'all';

use POSIX qw(setsid);
use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(o) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $o = $args{o};
   my $self = {
      o        => $o,
      log_file => $o->has('log') ? $o->get('log') : undef,
      PID_file => $o->has('pid') ? $o->get('pid') : undef,
   };

   check_PID_file(undef, $self->{PID_file});

   MKDEBUG && _d('Daemonized child will log to', $self->{log_file});
   return bless $self, $class;
}

sub daemonize {
   my ( $self ) = @_;

   MKDEBUG && _d('About to fork and daemonize');
   defined (my $pid = fork()) or die "Cannot fork: $OS_ERROR";
   if ( $pid ) {
      MKDEBUG && _d('I am the parent and now I die');
      exit;
   }

   $self->{child} = 1;

   POSIX::setsid() or die "Cannot start a new session: $OS_ERROR";
   chdir '/'       or die "Cannot chdir to /: $OS_ERROR";

   $self->_make_PID_file();

   $OUTPUT_AUTOFLUSH = 1;

   if ( -t STDIN ) {
      close STDIN;
      open  STDIN, '/dev/null'
         or die "Cannot reopen STDIN to /dev/null: $OS_ERROR";
   }

   if ( $self->{log_file} ) {
      close STDOUT;
      open  STDOUT, '>>', $self->{log_file}
         or die "Cannot open log file $self->{log_file}: $OS_ERROR";

      close STDERR;
      open  STDERR, ">&STDOUT"
         or die "Cannot dupe STDERR to STDOUT: $OS_ERROR"; 
   }
   else {
      if ( -t STDOUT ) {
         close STDOUT;
         open  STDOUT, '>', '/dev/null'
            or die "Cannot reopen STDOUT to /dev/null: $OS_ERROR";
      }
      if ( -t STDERR ) {
         close STDERR;
         open  STDERR, '>', '/dev/null'
            or die "Cannot reopen STDERR to /dev/null: $OS_ERROR";
      }
   }

   MKDEBUG && _d('I am the child and now I live daemonized');
   return;
}

sub check_PID_file {
   my ( $self, $file ) = @_;
   my $PID_file = $self ? $self->{PID_file} : $file;
   MKDEBUG && _d('Checking PID file', $PID_file);
   if ( $PID_file && -f $PID_file ) {
      my $pid;
      eval { chomp($pid = `cat $PID_file`); };
      die "Cannot cat $PID_file: $OS_ERROR" if $EVAL_ERROR;
      MKDEBUG && _d('PID file exists; it contains PID', $pid);
      if ( $pid ) {
         my $pid_is_alive = kill 0, $pid;
         if ( $pid_is_alive ) {
            die "The PID file $PID_file already exists "
               . " and the PID that it contains, $pid, is running";
         }
         else {
            warn "Overwriting PID file $PID_file because the PID that it "
               . "contains, $pid, is not running";
         }
      }
      else {
         die "The PID file $PID_file already exists but it does not "
            . "contain a PID";
      }
   }
   else {
      MKDEBUG && _d('No PID file');
   }
   return;
}

sub make_PID_file {
   my ( $self ) = @_;
   if ( exists $self->{child} ) {
      die "Do not call Daemon::make_PID_file() for daemonized scripts";
   }
   $self->_make_PID_file();
   $self->{rm_PID_file} = 1;
   return;
}

sub _make_PID_file {
   my ( $self ) = @_;

   my $PID_file = $self->{PID_file};
   if ( !$PID_file ) {
      MKDEBUG && _d('No PID file to create');
      return;
   }

   $self->check_PID_file();

   open my $PID_FH, '>', $PID_file
      or die "Cannot open PID file $PID_file: $OS_ERROR";
   print $PID_FH $PID
      or die "Cannot print to PID file $PID_file: $OS_ERROR";
   close $PID_FH
      or die "Cannot close PID file $PID_file: $OS_ERROR";

   MKDEBUG && _d('Created PID file:', $self->{PID_file});
   return;
}

sub _remove_PID_file {
   my ( $self ) = @_;
   if ( $self->{PID_file} && -f $self->{PID_file} ) {
      unlink $self->{PID_file}
         or warn "Cannot remove PID file $self->{PID_file}: $OS_ERROR";
      MKDEBUG && _d('Removed PID file');
   }
   else {
      MKDEBUG && _d('No PID to remove');
   }
   return;
}

sub DESTROY {
   my ( $self ) = @_;
   $self->_remove_PID_file() if $self->{child} || $self->{rm_PID_file};
   return;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End Daemon package
# ###########################################################################

# ###########################################################################
# QueryExecutor package 4719
# ###########################################################################
package QueryExecutor;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);
use Time::HiRes qw(time);
use Data::Dumper;
$Data::Dumper::Indent    = 1;
$Data::Dumper::Sortkeys  = 1;
$Data::Dumper::Quotekeys = 0;

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw() ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $self = {};
   return bless $self, $class;
}

sub exec {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(query hosts callbacks) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $query      = $args{query};
   my $callbacks  = $args{callbacks};
   my $hosts      = $args{hosts};
   my $dp         = $args{DSNParser};

   MKDEBUG && _d('Executing query:', $query);

   my @results;
   my $hostno = -1;
   HOST:
   foreach my $host ( @$hosts ) {
      $hostno++;  # Increment this now because we might not reach loop's end.
      $results[$hostno] = {};
      my $results       = $results[$hostno];
      my $dbh           = $host->{dbh};
      my $dsn           = $host->{dsn};
      my $host_name     = $dp && $dsn ? $dp->as_string($dsn) : $hostno + 1;
      my %callback_args = (
         query     => $query,
         dbh       => $dbh,
         dsn       => $dsn,
         host_name => $host_name,
         results   => $results,
      );

      MKDEBUG && _d('Starting execution on host', $host_name);
      foreach my $callback ( @$callbacks ) {
         my ($name, $res);
         eval {
            ($name, $res) = $callback->(%callback_args);
         };
         if ( $EVAL_ERROR ) {
            __die(
               "A callback sub had an unhandled error: $EVAL_ERROR",
               $name,
               $res,
               $host_name,
               \@results
            );
         };
         _check_results($name, $res, $host_name, \@results);
         $results->{$name} = $res;
      }
      MKDEBUG && _d('Results for host', $host_name, ':', Dumper($results));
   } # HOST

   return @results;
}

sub Query_time {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(query dbh) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $query = $args{query};
   my $dbh   = $args{dbh};
   my $error = undef;
   my $name  = 'Query_time';
   my $res   = { error => undef, Query_time => -1, };
   MKDEBUG && _d($name);

   my ( $start, $end, $query_time );
   eval {
      $start = time();
      $dbh->do($query);
      $end   = time();
      $query_time = sprintf '%.6f', $end - $start;
   };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d('Error executing query on host', $args{host_name}, ':',
         $EVAL_ERROR);
      $res->{error} = $EVAL_ERROR;
   }
   else {
      $res->{Query_time} = $query_time;
   }

   return $name, $res;
}

sub get_warnings {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(dbh) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $dbh   = $args{dbh};
   my $error = undef;
   my $name  = 'warnings';
   MKDEBUG && _d($name);

   my $warnings;
   my $warning_count;
   eval {
      $warnings      = $dbh->selectall_hashref('SHOW WARNINGS', 'Code');
      $warning_count = $dbh->selectall_arrayref('SELECT @@warning_count',
         { Slice => {} });
   };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d('Error getting warnings:', $EVAL_ERROR);
      $error = $EVAL_ERROR;
   }

   my $results = {
      error => $error,
      codes => $warnings,
      count => $warning_count->[0]->{'@@warning_count'} || 0,
   };
   return $name, $results;
}

sub clear_warnings {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(dbh query QueryParser) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $dbh     = $args{dbh};
   my $query   = $args{query};
   my $qparser = $args{QueryParser};
   my $error   = undef;
   my $name    = 'clear_warnings';
   MKDEBUG && _d($name);

   my @tables = $qparser->get_tables($query);
   if ( @tables ) {
      MKDEBUG && _d('tables:', @tables);
      my $sql = "SELECT * FROM $tables[0] LIMIT 0";
      MKDEBUG && _d($sql);
      eval {
         $dbh->do($sql);
      };
      if ( $EVAL_ERROR ) {
         MKDEBUG && _d('Error clearning warnings:', $EVAL_ERROR);
         $error = $EVAL_ERROR;
      }
   }
   else {
      $error = "Cannot clear warnings because the tables for this query cannot "
         . "be parsed.";
   }

   return $name, { error=>$error };
}

sub pre_checksum_results {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(dbh database tmp_table Quoter) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $dbh     = $args{dbh};
   my $db      = $args{database};
   my $tmp_tbl = $args{tmp_table};
   my $q       = $args{Quoter};
   my $error   = undef;
   my $name    = 'pre_checksum_results';
   MKDEBUG && _d($name);

   my $tmp_db_tbl = $q->quote($db, $tmp_tbl);
   eval {
      $dbh->do("DROP TABLE IF EXISTS $tmp_db_tbl");
      $dbh->do("SET storage_engine=MyISAM");
   };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d('Error dropping table', $tmp_db_tbl, ':', $EVAL_ERROR);
      $error = $EVAL_ERROR;
   }
   return $name, { error=>$error };
}

sub checksum_results {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(dbh database tmp_table MySQLDump TableParser Quoter) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $dbh     = $args{dbh};
   my $db      = $args{database};
   my $tmp_tbl = $args{tmp_table};
   my $du      = $args{MySQLDump};
   my $tp      = $args{TableParser};
   my $q       = $args{Quoter};
   my $error   = undef;
   my @errors  = ();
   my $name    = 'checksum_results';
   MKDEBUG && _d($name);

   my $tmp_db_tbl = $q->quote($db, $tmp_tbl);
   my $tbl_checksum;
   my $n_rows;
   my $tbl_struct;
   eval {
      $n_rows = $dbh->selectall_arrayref("SELECT COUNT(*) FROM $tmp_db_tbl")->[0]->[0];
      $tbl_checksum = $dbh->selectall_arrayref("CHECKSUM TABLE $tmp_db_tbl")->[0]->[1];
   };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d('Error counting rows or checksumming', $tmp_db_tbl, ':',
         $EVAL_ERROR);
      $error = $EVAL_ERROR;
      push @errors, $error;
   }
   else {
      MKDEBUG && _d('n rows:', $n_rows, 'tbl checksum:', $tbl_checksum);

      eval {
         my $ddl = $du->get_create_table($dbh, $q, $db, $tmp_tbl);
         MKDEBUG && _d('tmp table ddl:', Dumper($ddl));
         if ( $ddl->[0] eq 'table' ) {
            $tbl_struct = $tp->parse($ddl)
         }
      };
      if ( $EVAL_ERROR ) {
         MKDEBUG && _d('Failed to parse', $tmp_db_tbl, ':', $EVAL_ERROR); 
         $error = $EVAL_ERROR;
         push @errors, $error;
      }
   }

   my $sql = "DROP TABLE IF EXISTS $tmp_db_tbl";
   MKDEBUG && _d($sql);
   eval { $dbh->do($sql); };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d('Error dropping tmp table:', $EVAL_ERROR);
      $error = $EVAL_ERROR;
      push @errors, $error;
   }

   if ( !defined $n_rows ) { # 0 rows returned is ok.
      $error = "SELECT COUNT(*) for getting the number of rows didn't return a value";
      push @errors, $error;
      MKDEBUG && _d($error);
   }
   if ( $n_rows && !$tbl_checksum ) {
      $error = "CHECKSUM TABLE didn't return a value";
      push @errors, $error;
      MKDEBUG && _d($error);
   }

   @errors = () if @errors == 1;

   my $results = {
      error        => $error,
      errors       => \@errors,
      checksum     => $tbl_checksum || 0,
      n_rows       => $n_rows || 0,
      table_struct => $tbl_struct,
   };
   return $name, $results;
}



sub get_row_sths {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(query dbh) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $query      = $args{query};
   my $dbh        = $args{dbh};
   my $error      = undef;
   my $name       = 'get_row_sths';
   my $Query_time = { error => undef, Query_time => -1, };
   my ( $start, $end, $query_time );
   MKDEBUG && _d($name);

   my $sth;
   eval {
      $sth = $dbh->prepare($query);
   };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d('Error on prepare:', $EVAL_ERROR);
      $error = $EVAL_ERROR;
   }
   else {
      eval {
         $start = time();
         $sth->execute();
         $end   = time();
         $query_time = sprintf '%.6f', $end - $start;
      };
      if ( $EVAL_ERROR ) {
         MKDEBUG && _d('Error on execute:', $EVAL_ERROR);
         $error = $EVAL_ERROR;
         $Query_time->{error} = $error;
      }
      else {
         $Query_time->{Query_time} = $query_time;
      }
   }

   my $results = {
      error      => $error,
      sth        => $error ? undef : $sth,  # Only pass sth if no errors.
      Query_time => $Query_time,
   };
   return $name, $results;
}

sub _check_results {
   my ( $name, $res, $host_name, $all_res ) = @_;
   __die('Operation did not return a name!', @_)
      unless $name;
   __die('Operation did not return any results!', @_)
      unless $res || (scalar keys %$res);
   __die("Operation results do no have an 'error' key")
      unless exists $res->{error};
   __die("Operation error is blank string!")
      if defined $res->{error} && !$res->{error};
   __die("Operation errors is not an arrayref!")
      if $res->{errors} && ref $res->{errors} ne 'ARRAY';
   return;
}

sub __die {
   my ( $msg, $name, $res, $host_name, $all_res ) = @_;
   die "$msg\n"
      . "Host name: " . ($host_name ? $host_name : 'UNKNOWN') . "\n"
      . "Current results: " . Dumper($res)
      . "Prior results: "   . Dumper($all_res)
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End QueryExecutor package
# ###########################################################################

# ###########################################################################
# QueryRanker package 4970
# ###########################################################################
package QueryRanker;


use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);
use POSIX qw(floor);

use constant MKDEBUG => $ENV{MKDEBUG};

my @bucket_threshold = qw(500 100  100   500 50   50    20 1   );
my @bucket_labels    = qw(1us 10us 100us 1ms 10ms 100ms 1s 10s+);

my %ranker_for = (
   Query_time       => \&rank_query_times,
   warnings         => \&rank_warnings,
   checksum_results => \&rank_result_sets,
);

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw() ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $self = {
      %args,
   };
   return bless $self, $class;
}

sub set_ranker {
   my ( $self, $res, $code ) = @_;
   $self->{ranker_for}->{$res} = $code;
   return;
}

sub rank_results {
   my ( $self, $results, %args ) = @_;
   return unless @$results > 1;

   my $rank      = 0;
   my @reasons   = ();
   my $host1     = $results->[0];

   RESULTS:
   foreach my $op ( keys %$host1 ) {  # Each key the name of some operation
      my $compare = $self->{ranker_for}->{$op} || $ranker_for{$op};
      if ( !$compare ) {
         MKDEBUG && _d('No ranker for', $op);
         next RESULTS;
      }
      MKDEBUG && _d('Ranking', $op, 'results');

      my $host1_results = $host1->{$op};

      HOST:
      for my $i ( 1..(@$results-1) ) {
         my $hostN = $results->[$i];
         if ( !exists $hostN->{$op} ) {
            warn "Host", $i+1, " doesn't have $op results";
            next HOST;
         }

         my @res = $compare->($host1_results, $hostN->{$op}, %args);
         $rank += shift @res;
         push @reasons, @res;
      } 
   }

   return $rank, @reasons;
}

sub rank_query_times {
   my ( $host1, $host2 ) = @_;
   my $rank    = 0;   # total rank
   my @reasons = ();  # all reasons
   my @res     = ();  # ($rank, @reasons) for each comparison

   if ( $host1->{Query_time} == -1 ) {
      $rank += 100;
      push @reasons, 'Query failed to execute on host1: '
            . ($host1->{error} || 'unknown error')
            . " (rank+100)";
   }
   if ( $host2->{Query_time} == -1 ) {
      $rank += 100;
      push @reasons, 'Query failed to execute on host2: '
            . ($host2->{error} || 'unknown error')
            . " (rank+100)";
   }

   if ( $host1->{Query_time} >= 0 && $host2->{Query_time} >= 0 ) {
      @res = compare_query_times(
         $host1->{Query_time}, $host2->{Query_time});
      $rank += shift @res;
      push @reasons, @res;
   }

   return $rank, @reasons;
}

sub rank_warnings {
   my ( $host1, $host2 ) = @_;
   my $rank    = 0;   # total rank
   my @reasons = ();  # all reasons
   my @res     = ();  # ($rank, @reasons) for each comparison

   if ( $host1->{count} > 0 || $host2->{count} > 0 ) {
      $rank += 1;
      push @reasons, "Query has warnings (rank+1)";
   }

   if ( my $diff = abs($host1->{count} - $host2->{count}) ) {
      $rank += $diff;
      push @reasons, "Warning counts differ by $diff (rank+$diff)";
   }

   @res = compare_warnings($host1->{codes}, $host2->{codes});
   $rank += shift @res;
   push @reasons, @res;

   return $rank, @reasons;
}

sub compare_query_times {
   my ( $t1, $t2 ) = @_;
   die "I need a t1 argument" unless defined $t1;
   die "I need a t2 argument" unless defined $t2;

   MKDEBUG && _d('host1 query time:', $t1, 'host2 query time:', $t2);

   my $t1_bucket = bucket_for($t1);
   my $t2_bucket = bucket_for($t2);

   if ( $t1_bucket != $t2_bucket ) {
      my $rank_inc = 2 * abs($t1_bucket - $t2_bucket);
      return $rank_inc, "Query times differ significantly: "
         . "host1 in ".$bucket_labels[$t1_bucket]." range, "
         . "host2 in ".$bucket_labels[$t2_bucket]." range (rank+2)";
   }

   my $inc = percentage_increase($t1, $t2);
   if ( $inc >= $bucket_threshold[$t1_bucket] ) {
      return 1, "Query time increase $inc\% exceeds "
         . $bucket_threshold[$t1_bucket] . "\% increase threshold for "
         . $bucket_labels[$t1_bucket] . " range (rank+1)";
   }

   return (0);  # No significant difference.
}

sub compare_warnings {
   my ( $warnings1, $warnings2 ) = @_;
   die "I need a warnings1 argument" unless defined $warnings1;
   die "I need a warnings2 argument" unless defined $warnings2;

   my %new_warnings;
   my $rank_inc = 0;
   my @reasons;

   foreach my $code ( keys %$warnings1 ) {
      if ( exists $warnings2->{$code} ) {
         if ( $warnings2->{$code}->{Level} ne $warnings1->{$code}->{Level} ) {
            $rank_inc += 2;
            push @reasons, "Error $code changes level: "
               . $warnings1->{$code}->{Level} . " on host1, "
               . $warnings2->{$code}->{Level} . " on host2 (rank+2)";
         }
      }
      else {
         MKDEBUG && _d('New warning on host1:', $code);
         push @reasons, "Error $code on host1 is new (rank+3)";
         %{ $new_warnings{$code} } = %{ $warnings1->{$code} };
      }
   }

   foreach my $code ( keys %$warnings2 ) {
      if ( !exists $warnings1->{$code} && !exists $new_warnings{$code} ) {
         MKDEBUG && _d('New warning on host2:', $code);
         push @reasons, "Error $code on host2 is new (rank+3)";
         %{ $new_warnings{$code} } = %{ $warnings2->{$code} };
      }
   }

   $rank_inc += 3 * scalar keys %new_warnings;


   return $rank_inc, @reasons;
}

sub rank_result_sets {
   my ( $host1, $host2 ) = @_;
   my $rank    = 0;   # total rank
   my @reasons = ();  # all reasons
   my @res     = ();  # ($rank, @reasons) for each comparison

   if ( $host1->{checksum} ne $host2->{checksum} ) {
      $rank += 50;
      push @reasons, "Table checksums do not match (rank+50)";
   }

   if ( $host1->{n_rows} != $host2->{n_rows} ) {
      $rank += 50;
      push @reasons, "Number of rows do not match (rank+50)";
   }

   if ( $host1->{table_struct} && $host2->{table_struct} ) {
      @res = compare_table_structs(
         $host1->{table_struct},
         $host2->{table_struct}
      );
      $rank += shift @res;
      push @reasons, @res;
   }
   else {
      $rank += 10;
      push @reasons, 'The temporary tables could not be parsed (rank+10)';
   }

   return $rank, @reasons;
}

sub compare_table_structs {
   my ( $s1, $s2 ) = @_;
   die "I need a s1 argument" unless defined $s1;
   die "I need a s2 argument" unless defined $s2;

   my $rank_inc = 0;
   my @reasons  = ();

   if ( scalar @{$s1->{cols}} != scalar @{$s2->{cols}} ) {
      my $inc = 2 * abs( scalar @{$s1->{cols}} - scalar @{$s2->{cols}} );
      $rank_inc += $inc;
      push @reasons, 'Tables have different columns counts: '
         . scalar @{$s1->{cols}} . ' columns on host1, '
         . scalar @{$s2->{cols}} . " columns on host2 (rank+$inc)";
   }

   my %host1_missing_cols = %{$s2->{type_for}};  # Make a copy to modify.
   my @host2_missing_cols;
   foreach my $col ( keys %{$s1->{type_for}} ) {
      if ( exists $s2->{type_for}->{$col} ) {
         if ( $s1->{type_for}->{$col} ne $s2->{type_for}->{$col} ) {
            $rank_inc += 3;
            push @reasons, "Types for $col column differ: "
               . "'$s1->{type_for}->{$col}' on host1, "
               . "'$s2->{type_for}->{$col}' on host2 (rank+3)";
         }
         delete $host1_missing_cols{$col};
      }
      else {
         push @host2_missing_cols, $col;
      }
   }

   foreach my $col ( @host2_missing_cols ) {
      $rank_inc += 5;
      push @reasons, "Column $col exists on host1 but not on host2 (rank+5)";
   }
   foreach my $col ( keys %host1_missing_cols ) {
      $rank_inc += 5;
      push @reasons, "Column $col exists on host2 but not on host1 (rank+5)";
   }

   return $rank_inc, @reasons;
}

sub bucket_for {
   my ( $val ) = @_;
   die "I need a val" unless defined $val;
   return 0 if $val == 0;
   my $bucket = floor(log($val) / log(10)) + 6;
   $bucket = $bucket > 7 ? 7 : $bucket < 0 ? 0 : $bucket;
   return $bucket;
}

sub percentage_increase {
   my ( $x, $y ) = @_;
   return 0 if $x == $y;

   if ( $x > $y ) {
      my $z = $y;
         $y = $x;
         $x = $z;
   }

   if ( $x == 0 ) {
      return 1000;  # This should trigger all buckets' thresholds.
   }

   return sprintf '%.2f', (($y - $x) / $x) * 100;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End QueryRanker package
# ###########################################################################

# ###########################################################################
# ChangeHandler package 3186
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package ChangeHandler;

use English qw(-no_match_vars);

my $DUPE_KEY  = qr/Duplicate entry/;
our @ACTIONS  = qw(DELETE REPLACE INSERT UPDATE);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(quoter database table sdatabase stable replace queue)
   ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   my $self = { %args, map { $_ => [] } @ACTIONS };
   $self->{db_tbl}  = $self->{quoter}->quote(@args{qw(database table)});
   $self->{sdb_tbl} = $self->{quoter}->quote(@args{qw(sdatabase stable)});
   $self->{changes} = { map { $_ => 0 } @ACTIONS };
   return bless $self, $class;
}

sub fetch_back {
   my ( $self, $dbh ) = @_;
   $self->{fetch_back} = $dbh;
   MKDEBUG && _d('Will fetch rows from source when updating destination');
}

sub take_action {
   my ( $self, @sql ) = @_;
   MKDEBUG && _d('Calling subroutines on', @sql);
   foreach my $action ( @{$self->{actions}} ) {
      $action->(@sql);
   }
}

sub change {
   my ( $self, $action, $row, $cols ) = @_;
   MKDEBUG && _d($action, 'where', $self->make_where_clause($row, $cols));
   $self->{changes}->{
      $self->{replace} && $action ne 'DELETE' ? 'REPLACE' : $action
   }++;
   if ( $self->{queue} ) {
      $self->__queue($action, $row, $cols);
   }
   else {
      eval {
         my $func = "make_$action";
         $self->take_action($self->$func($row, $cols));
      };
      if ( $EVAL_ERROR =~ m/$DUPE_KEY/ ) {
         MKDEBUG && _d('Duplicate key violation; will queue and rewrite');
         $self->{queue}++;
         $self->{replace} = 1;
         $self->__queue($action, $row, $cols);
      }
      elsif ( $EVAL_ERROR ) {
         die $EVAL_ERROR;
      }
   }
}

sub __queue {
   my ( $self, $action, $row, $cols ) = @_;
   MKDEBUG && _d('Queueing change for later');
   if ( $self->{replace} ) {
      $action = $action eq 'DELETE' ? $action : 'REPLACE';
   }
   push @{$self->{$action}}, [ $row, $cols ];
}

sub process_rows {
   my ( $self, $queue_level ) = @_;
   my $error_count = 0;
   TRY: {
      if ( $queue_level && $queue_level < $self->{queue} ) { # see redo below!
         MKDEBUG && _d('Not processing now', $queue_level, '<', $self->{queue});
         return;
      }

      my ($row, $cur_act);
      eval {
         foreach my $action ( @ACTIONS ) {
            my $func = "make_$action";
            my $rows = $self->{$action};
            MKDEBUG && _d(scalar(@$rows), 'to', $action);
            $cur_act = $action;
            while ( @$rows ) {
               $row = shift @$rows;
               $self->take_action($self->$func(@$row));
            }
         }
         $error_count = 0;
      };
      if ( !$error_count++ && $EVAL_ERROR =~ m/$DUPE_KEY/ ) {
         MKDEBUG
            && _d('Duplicate key violation; re-queueing and rewriting');
         $self->{queue}++; # Defer rows to the very end
         $self->{replace} = 1;
         $self->__queue($cur_act, @$row);
         redo TRY;
      }
      elsif ( $EVAL_ERROR ) {
         die $EVAL_ERROR;
      }
   }
}

sub make_DELETE {
   my ( $self, $row, $cols ) = @_;
   return "DELETE FROM $self->{db_tbl} WHERE "
      . $self->make_where_clause($row, $cols)
      . ' LIMIT 1';
}

sub make_UPDATE {
   my ( $self, $row, $cols ) = @_;
   if ( $self->{replace} ) {
      return $self->make_row('REPLACE', $row, $cols);
   }
   my %in_where = map { $_ => 1 } @$cols;
   my $where = $self->make_where_clause($row, $cols);
   if ( my $dbh = $self->{fetch_back} ) {
      my $sql = "SELECT * FROM $self->{sdb_tbl} WHERE $where LIMIT 1";
      MKDEBUG && _d('Fetching data for UPDATE:', $sql);
      my $res = $dbh->selectrow_hashref($sql);
      @{$row}{keys %$res} = values %$res;
      $cols = [sort keys %$res];
   }
   else {
      $cols = [ sort keys %$row ];
   }
   return "UPDATE $self->{db_tbl} SET "
      . join(', ', map {
            $self->{quoter}->quote($_)
            . '=' .  $self->{quoter}->quote_val($row->{$_})
         } grep { !$in_where{$_} } @$cols)
      . " WHERE $where LIMIT 1";
}

sub make_INSERT {
   my ( $self, $row, $cols ) = @_;
   if ( $self->{replace} ) {
      return $self->make_row('REPLACE', $row, $cols);
   }
   return $self->make_row('INSERT', $row, $cols);
}

sub make_REPLACE {
   my ( $self, $row, $cols ) = @_;
   return $self->make_row('REPLACE', $row, $cols);
}

sub make_row {
   my ( $self, $verb, $row, $cols ) = @_;
   my @cols = sort keys %$row;
   if ( my $dbh = $self->{fetch_back} ) {
      my $where = $self->make_where_clause($row, $cols);
      my $sql = "SELECT * FROM $self->{sdb_tbl} WHERE $where LIMIT 1";
      MKDEBUG && _d('Fetching data for UPDATE:', $sql);
      my $res = $dbh->selectrow_hashref($sql);
      @{$row}{keys %$res} = values %$res;
      @cols = sort keys %$res;
   }
   return "$verb INTO $self->{db_tbl}("
      . join(', ', map { $self->{quoter}->quote($_) } @cols)
      . ') VALUES ('
      . $self->{quoter}->quote_val( @{$row}{@cols} )
      . ')';
}

sub make_where_clause {
   my ( $self, $row, $cols ) = @_;
   my @clauses = map {
      my $val = $row->{$_};
      my $sep = defined $val ? '=' : ' IS ';
      $self->{quoter}->quote($_) . $sep . $self->{quoter}->quote_val($val);
   } @$cols;
   return join(' AND ', @clauses);
}

sub get_changes {
   my ( $self ) = @_;
   return %{$self->{changes}};
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End ChangeHandler package
# ###########################################################################

# ###########################################################################
# RowDiff package 4561
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package RowDiff;

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   die "I need a dbh" unless $args{dbh};
   my $self = \%args;
   return bless $self, $class;
}

sub compare_sets {
   my ( $self, %args ) = @_;
   my ( $left, $right, $syncer, $tbl )
      = @args{qw(left right syncer tbl)};

   my ($lr, $rr);  # Current row from the left/right sources.
   my $done = $self->{done};

   my ($left_done, $right_done) = (0, 0);

   do {
      if ( !$lr && !$left_done ) {
         MKDEBUG && _d('Fetching row from left');
         eval { $lr = $left->fetchrow_hashref(); };
         MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
         $left_done = !$lr || $EVAL_ERROR ? 1 : 0;
      }
      elsif ( MKDEBUG ) {
         _d('Left still has rows');
      }

      if ( !$rr && !$right_done ) {
         MKDEBUG && _d('Fetching row from right');
         eval { $rr = $right->fetchrow_hashref(); };
         MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
         $right_done = !$rr || $EVAL_ERROR ? 1 : 0;
      }
      elsif ( MKDEBUG ) {
         _d('Right still has rows');
      }

      my $cmp;
      if ( $lr && $rr ) {
         $cmp = $self->key_cmp($lr, $rr, $syncer->key_cols(), $tbl);
         MKDEBUG && _d('Key comparison on left and right:', $cmp);
      }
      if ( $lr || $rr ) {
         if ( $lr && $rr && defined $cmp && $cmp == 0 ) {
            MKDEBUG && _d('Left and right have the same key');
            $syncer->same_row($lr, $rr);
            $self->{same_row}->($lr, $rr) if $self->{same_row};
            $lr = $rr = undef; # Fetch another row from each side.
         }
         elsif ( !$rr || ( defined $cmp && $cmp < 0 ) ) {
            MKDEBUG && _d('Left is not in right');
            $syncer->not_in_right($lr);
            $self->{not_in_right}->($lr) if $self->{not_in_right};
            $lr = undef;
         }
         else {
            MKDEBUG && _d('Right is not in left');
            $syncer->not_in_left($rr);
            $self->{not_in_left}->($rr) if $self->{not_in_left};
            $rr = undef;
         }
      }
      $left_done = $right_done = 1 if $done && $done->($left, $right);
   } while ( !($left_done && $right_done) );
   MKDEBUG && _d('No more rows');
   $syncer->done_with_rows();
}

sub key_cmp {
   my ( $self, $lr, $rr, $key_cols, $tbl ) = @_;
   MKDEBUG && _d('Comparing keys using columns:', join(',', @$key_cols));
   my $callback = $self->{key_cmp};
   my $trf      = $self->{trf};
   foreach my $col ( @$key_cols ) {
      my $l = $lr->{$col};
      my $r = $rr->{$col};
      if ( !defined $l || !defined $r ) {
         MKDEBUG && _d($col, 'is not defined in both rows');
         return defined $l ? 1 : defined $r ? -1 : 0;
      }
      else {
         if ($tbl->{is_numeric}->{$col} ) {   # Numeric column
            MKDEBUG && _d($col, 'is numeric');
            ($l, $r) = $trf->($l, $r, $tbl, $col) if $trf;
            my $cmp = $l <=> $r;
            if ( $cmp ) {
               MKDEBUG && _d('Column', $col, 'differs:', $l, '!=', $r);
               $callback->($col, $l, $r) if $callback;
               return $cmp;
            }
         }
         elsif ( $l ne $r ) {
            my $cmp;
            my $coll = $tbl->{collation_for}->{$col};
            if ( $coll && ( $coll ne 'latin1_swedish_ci'
                           || $l =~ m/[^\040-\177]/ || $r =~ m/[^\040-\177]/) ) {
               MKDEBUG && _d('Comparing', $col, 'via MySQL');
               $cmp = $self->db_cmp($coll, $l, $r);
            }
            else {
               MKDEBUG && _d('Comparing', $col, 'in lowercase');
               $cmp = lc $l cmp lc $r;
            }
            if ( $cmp ) {
               MKDEBUG && _d('Column', $col, 'differs:', $l, 'ne', $r);
               $callback->($col, $l, $r) if $callback;
               return $cmp;
            }
         }
      }
   }
   return 0;
}

sub db_cmp {
   my ( $self, $collation, $l, $r ) = @_;
   if ( !$self->{sth}->{$collation} ) {
      if ( !$self->{charset_for} ) {
         MKDEBUG && _d('Fetching collations from MySQL');
         my @collations = @{$self->{dbh}->selectall_arrayref(
            'SHOW COLLATION', {Slice => { collation => 1, charset => 1 }})};
         foreach my $collation ( @collations ) {
            $self->{charset_for}->{$collation->{collation}}
               = $collation->{charset};
         }
      }
      my $sql = "SELECT STRCMP(_$self->{charset_for}->{$collation}? COLLATE $collation, "
         . "_$self->{charset_for}->{$collation}? COLLATE $collation) AS res";
      MKDEBUG && _d($sql);
      $self->{sth}->{$collation} = $self->{dbh}->prepare($sql);
   }
   my $sth = $self->{sth}->{$collation};
   $sth->execute($l, $r);
   return $sth->fetchall_arrayref()->[0]->[0];
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End RowDiff package
# ###########################################################################

# ###########################################################################
# TableChunker package 3186
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package TableChunker;

use English qw(-no_match_vars);
use POSIX qw(ceil);
use List::Util qw(min max);
use Data::Dumper;
$Data::Dumper::Quotekeys = 0;
$Data::Dumper::Indent    = 0;

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   die "I need a quoter" unless $args{quoter};
   bless { %args }, $class;
}

my $EPOCH      = '1970-01-01';
my %int_types  = map { $_ => 1 }
   qw( bigint date datetime int mediumint smallint time timestamp tinyint year );
my %real_types = map { $_ => 1 }
   qw( decimal double float );

sub find_chunk_columns {
   my ( $self, $table, $opts ) = @_;
   $opts ||= {};

   my %prefer;
   if ( $opts->{possible_keys} && @{$opts->{possible_keys}} ) {
      my $i = 1;
      %prefer = map { $_ => $i++ } @{$opts->{possible_keys}};
      MKDEBUG && _d('Preferred indexes for chunking:',
         join(', ', @{$opts->{possible_keys}}));
   }

   my @possible_keys;
   KEY:
   foreach my $key ( values %{ $table->{keys} } ) {

      next unless $key->{type} eq 'BTREE';

      defined $_ && next KEY for @{ $key->{col_prefixes} };

      if ( $opts->{exact} ) {
         next unless $key->{is_unique} && @{$key->{cols}} == 1;
      }

      push @possible_keys, $key;
   }

   @possible_keys = sort {
      ($prefer{$a->{name}} || 9999) <=> ($prefer{$b->{name}} || 9999)
   } @possible_keys;

   MKDEBUG && _d('Possible keys in order:',
      join(', ', map { $_->{name} } @possible_keys));

   my $can_chunk_exact = 0;
   my @candidate_cols;
   foreach my $key ( @possible_keys ) { 
      my $col = $key->{cols}->[0];

      next unless ( $int_types{$table->{type_for}->{$col}}
                    || $real_types{$table->{type_for}->{$col}} );

      push @candidate_cols, { column => $col, index => $key->{name} };
   }

   $can_chunk_exact = 1 if ( $opts->{exact} && scalar @candidate_cols );

   if ( MKDEBUG ) {
      my $chunk_type = $opts->{exact} ? 'Exact' : 'Inexact';
      _d($chunk_type, 'chunkable:',
         join(', ', map { "$_->{column} on $_->{index}" } @candidate_cols));
   }

   my @result;
   if ( !%prefer ) {
      MKDEBUG && _d('Ordering columns by order in tbl, PK first');
      if ( $table->{keys}->{PRIMARY} ) {
         my $pk_first_col = $table->{keys}->{PRIMARY}->{cols}->[0];
         @result = grep { $_->{column} eq $pk_first_col } @candidate_cols;
         @candidate_cols = grep { $_->{column} ne $pk_first_col } @candidate_cols;
      }
      my $i = 0;
      my %col_pos = map { $_ => $i++ } @{$table->{cols}};
      push @result, sort { $col_pos{$a->{column}} <=> $col_pos{$b->{column}} }
                       @candidate_cols;
   }
   else {
      @result = @candidate_cols;
   }

   if ( MKDEBUG ) {
      _d('Chunkable columns:',
         join(', ', map { "$_->{column} on $_->{index}" } @result));
      _d('Can chunk exactly:', $can_chunk_exact);
   }

   return ($can_chunk_exact, \@result);
}

sub calculate_chunks {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(table col min max rows_in_range size dbh) ) {
      die "Required argument $arg not given or undefined"
         unless defined $args{$arg};
   }
   MKDEBUG && _d('Arguments:',
      join(', ',
         map { "$_=" . (defined $args{$_} ? $args{$_} : 'undef') } keys %args));

   my @chunks;
   my ($range_func, $start_point, $end_point);
   my $col_type = $args{table}->{type_for}->{$args{col}};
   MKDEBUG && _d('Chunking on', $args{col}, '(',$col_type,')');


   if ( $col_type =~ m/(?:int|year|float|double|decimal)$/ ) {
      $start_point = $args{min};
      $end_point   = $args{max};
      $range_func  = 'range_num';
   }
   elsif ( $col_type eq 'timestamp' ) {
      my $sql = "SELECT UNIX_TIMESTAMP('$args{min}'), UNIX_TIMESTAMP('$args{max}')";
      MKDEBUG && _d($sql);
      ($start_point, $end_point) = $args{dbh}->selectrow_array($sql);
      $range_func  = 'range_timestamp';
   }
   elsif ( $col_type eq 'date' ) {
      my $sql = "SELECT TO_DAYS('$args{min}'), TO_DAYS('$args{max}')";
      MKDEBUG && _d($sql);
      ($start_point, $end_point) = $args{dbh}->selectrow_array($sql);
      $range_func  = 'range_date';
   }
   elsif ( $col_type eq 'time' ) {
      my $sql = "SELECT TIME_TO_SEC('$args{min}'), TIME_TO_SEC('$args{max}')";
      MKDEBUG && _d($sql);
      ($start_point, $end_point) = $args{dbh}->selectrow_array($sql);
      $range_func  = 'range_time';
   }
   elsif ( $col_type eq 'datetime' ) {
      $start_point = $self->timestampdiff($args{dbh}, $args{min});
      $end_point   = $self->timestampdiff($args{dbh}, $args{max});
      $range_func  = 'range_datetime';
   }
   else {
      die "I don't know how to chunk $col_type\n";
   }

   if ( !defined $start_point ) {
      MKDEBUG && _d('Start point is undefined');
      $start_point = 0;
   }
   if ( !defined $end_point || $end_point < $start_point ) {
      MKDEBUG && _d('End point is undefined or before start point');
      $end_point = 0;
   }
   MKDEBUG && _d('Start and end of chunk range:',$start_point,',', $end_point);

   my $interval = $args{size} * ($end_point - $start_point) / $args{rows_in_range};
   if ( $int_types{$col_type} ) {
      $interval = ceil($interval);
   }
   $interval ||= $args{size};
   if ( $args{exact} ) {
      $interval = $args{size};
   }
   MKDEBUG && _d('Chunk interval:', $interval, 'units');

   my $col = "`$args{col}`";
   if ( $start_point < $end_point ) {
      my ( $beg, $end );
      my $iter = 0;
      for ( my $i = $start_point; $i < $end_point; $i += $interval ) {
         ( $beg, $end ) = $self->$range_func($args{dbh}, $i, $interval, $end_point);

         if ( $iter++ == 0 ) {
            push @chunks, "$col < " . $self->quote($end);
         }
         else {
            push @chunks, "$col >= " . $self->quote($beg) . " AND $col < " . $self->quote($end);
         }
      }

      my $nullable = $args{table}->{is_nullable}->{$args{col}};
      pop @chunks;
      if ( @chunks ) {
         push @chunks, "$col >= " . $self->quote($beg);
      }
      else {
         push @chunks, $nullable ? "$col IS NOT NULL" : '1=1';
      }
      if ( $nullable ) {
         push @chunks, "$col IS NULL";
      }

   }
   else {
      push @chunks, '1=1';
   }

   return @chunks;
}

sub get_first_chunkable_column {
   my ( $self, $table, $opts ) = @_;
   my ($exact, $cols) = $self->find_chunk_columns($table, $opts);
   return ( $cols->[0]->{column}, $cols->[0]->{index} );
}

sub size_to_rows {
   my ( $self, $dbh, $db, $tbl, $size, $dumper ) = @_;
  
   my ( $num, $suffix ) = $size =~ m/^(\d+)([MGk])?$/;
   if ( $suffix ) { # Convert to bytes.
      $size = $suffix eq 'k' ? $num * 1_024
            : $suffix eq 'M' ? $num * 1_024 * 1_024
            :                  $num * 1_024 * 1_024 * 1_024;
   }
   elsif ( $num ) {
      return $num;
   }
   else {
      die "Invalid size spec $size; must be an integer with optional suffix kMG";
   }

   my @status = $dumper->get_table_status($dbh, $self->{quoter}, $db);
   my ($status) = grep { $_->{name} eq $tbl } @status;
   my $avg_row_length = $status->{avg_row_length};
   return $avg_row_length ? ceil($size / $avg_row_length) : undef;
}

sub get_range_statistics {
   my ( $self, $dbh, $db, $tbl, $col, $where ) = @_;
   my $q = $self->{quoter};
   my $sql = "SELECT MIN(" . $q->quote($col) . "), MAX(" . $q->quote($col)
      . ") FROM " . $q->quote($db, $tbl)
      . ($where ? " WHERE $where" : '');
   MKDEBUG && _d($sql);
   my ( $min, $max );
   eval {
      ( $min, $max ) = $dbh->selectrow_array($sql);
   };
   if ( $EVAL_ERROR ) {
      chomp $EVAL_ERROR;
      if ( $EVAL_ERROR =~ m/in your SQL syntax/ ) {
         die "$EVAL_ERROR (WHERE clause: $where)";
      }
      else {
         die $EVAL_ERROR;
      }
   }
   $sql = "EXPLAIN SELECT * FROM " . $q->quote($db, $tbl)
      . ($where ? " WHERE $where" : '');
   MKDEBUG && _d($sql);
   my $expl = $dbh->selectrow_hashref($sql);
   return (
      min           => $min,
      max           => $max,
      rows_in_range => $expl->{rows},
   );
}

sub quote {
   my ( $self, $val ) = @_;
   return $val =~ m/\d[:-]/ ? qq{"$val"} : $val;
}

sub inject_chunks {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(database table chunks chunk_num query) ) {
      die "$arg is required" unless defined $args{$arg};
   }
   MKDEBUG && _d('Injecting chunk', $args{chunk_num});
   my $comment = sprintf("/*%s.%s:%d/%d*/",
      $args{database}, $args{table},
      $args{chunk_num} + 1, scalar @{$args{chunks}});
   $args{query} =~ s!/\*PROGRESS_COMMENT\*/!$comment!;
   my $where = "WHERE (" . $args{chunks}->[$args{chunk_num}] . ')';
   if ( $args{where} && grep { $_ } @{$args{where}} ) {
      $where .= " AND ("
         . join(" AND ", map { "($_)" } grep { $_ } @{$args{where}} )
         . ")";
   }
   my $db_tbl     = $self->{quoter}->quote(@args{qw(database table)});
   my $index_hint = defined $args{index_hint}
                    ? "USE INDEX (`$args{index_hint}`)"
                    : '';
   MKDEBUG && _d('Parameters:',
      Dumper({WHERE => $where, DB_TBL => $db_tbl, INDEX_HINT => $index_hint}));
   $args{query} =~ s!/\*WHERE\*/! $where!;
   $args{query} =~ s!/\*DB_TBL\*/!$db_tbl!;
   $args{query} =~ s!/\*INDEX_HINT\*/! $index_hint!;
   $args{query} =~ s!/\*CHUNK_NUM\*/! $args{chunk_num} AS chunk_num,!;
   return $args{query};
}

sub range_num {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   my $end = min($max, $start + $interval);


   $start = sprintf('%.17f', $start) if $start =~ /e/;
   $end   = sprintf('%.17f', $end)   if $end   =~ /e/;

   $start =~ s/\.(\d{5}).*$/.$1/;
   $end   =~ s/\.(\d{5}).*$/.$1/;

   if ( $end > $start ) {
      return ( $start, $end );
   }
   else {
      die "Chunk size is too small: $end !> $start\n";
   }
}

sub range_time {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   my $sql = "SELECT SEC_TO_TIME($start), SEC_TO_TIME(LEAST($max, $start + $interval))";
   MKDEBUG && _d($sql);
   return $dbh->selectrow_array($sql);
}

sub range_date {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   my $sql = "SELECT FROM_DAYS($start), FROM_DAYS(LEAST($max, $start + $interval))";
   MKDEBUG && _d($sql);
   return $dbh->selectrow_array($sql);
}

sub range_datetime {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   my $sql = "SELECT DATE_ADD('$EPOCH', INTERVAL $start SECOND), "
       . "DATE_ADD('$EPOCH', INTERVAL LEAST($max, $start + $interval) SECOND)";
   MKDEBUG && _d($sql);
   return $dbh->selectrow_array($sql);
}

sub range_timestamp {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   my $sql = "SELECT FROM_UNIXTIME($start), FROM_UNIXTIME(LEAST($max, $start + $interval))";
   MKDEBUG && _d($sql);
   return $dbh->selectrow_array($sql);
}

sub timestampdiff {
   my ( $self, $dbh, $time ) = @_;
   my $sql = "SELECT (COALESCE(TO_DAYS('$time'), 0) * 86400 + TIME_TO_SEC('$time')) "
      . "- TO_DAYS('$EPOCH 00:00:00') * 86400";
   MKDEBUG && _d($sql);
   my ( $diff ) = $dbh->selectrow_array($sql);
   $sql = "SELECT DATE_ADD('$EPOCH', INTERVAL $diff SECOND)";
   MKDEBUG && _d($sql);
   my ( $check ) = $dbh->selectrow_array($sql);
   die <<"   EOF"
   Incorrect datetime math: given $time, calculated $diff but checked to $check.
   This is probably because you are using a version of MySQL that overflows on
   large interval values to DATE_ADD().  If not, please report this as a bug.
   EOF
      unless $check eq $time;
   return $diff;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableChunker package
# ###########################################################################

# ###########################################################################
# TableNibbler package 4610
# ###########################################################################
package TableNibbler;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   return bless {}, shift;
}

sub generate_asc_stmt {
   my ( $self, %args ) = @_;

   my $tbl  = $args{tbl};
   my @cols = $args{cols} ? @{$args{cols}} : @{$tbl->{cols}};
   my $q    = $args{quoter};

   my @asc_cols;
   my @asc_slice;

   my $index = $args{parser}->find_best_index($tbl, $args{index});
   die "Cannot find an ascendable index in table" unless $index;

   @asc_cols = @{$tbl->{keys}->{$index}->{cols}};
   MKDEBUG && _d('Will ascend index', $index);
   MKDEBUG && _d('Will ascend columns', join(', ', @asc_cols));
   if ( $args{ascfirst} ) {
      @asc_cols = $asc_cols[0];
      MKDEBUG && _d('Ascending only first column');
   }

   my %col_posn = do { my $i = 0; map { $_ => $i++ } @cols };
   foreach my $col ( @asc_cols ) {
      if ( !exists $col_posn{$col} ) {
         push @cols, $col;
         $col_posn{$col} = $#cols;
      }
      push @asc_slice, $col_posn{$col};
   }
   MKDEBUG && _d('Will ascend, in ordinal position:', join(', ', @asc_slice));

   my $asc_stmt = {
      cols  => \@cols,
      index => $index,
      where => '',
      slice => [],
      scols => [],
   };

   if ( @asc_slice ) {
      my $cmp_where;
      foreach my $cmp ( qw(< <= >= >) ) {
         $cmp_where = $self->generate_cmp_where(
            type        => $cmp,
            slice       => \@asc_slice,
            cols        => \@cols,
            quoter      => $q,
            is_nullable => $tbl->{is_nullable},
         );
         $asc_stmt->{boundaries}->{$cmp} = $cmp_where->{where};
      }
      my $cmp = $args{asconly} ? '>' : '>=';
      $asc_stmt->{where} = $asc_stmt->{boundaries}->{$cmp};
      $asc_stmt->{slice} = $cmp_where->{slice};
      $asc_stmt->{scols} = $cmp_where->{scols};
   }

   return $asc_stmt;
}

sub generate_cmp_where {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(type slice cols quoter is_nullable) ) {
      die "I need a $arg arg" unless defined $args{$arg};
   }

   my @slice       = @{$args{slice}};
   my @cols        = @{$args{cols}};
   my $q           = $args{quoter};
   my $is_nullable = $args{is_nullable};
   my $type        = $args{type};

   (my $cmp = $type) =~ s/=//;

   my @r_slice;    # Resulting slice columns, by ordinal
   my @r_scols;    # Ditto, by name

   my @clauses;
   foreach my $i ( 0 .. $#slice ) {
      my @clause;

      foreach my $j ( 0 .. $i - 1 ) {
         my $ord = $slice[$j];
         my $col = $cols[$ord];
         my $quo = $q->quote($col);
         if ( $is_nullable->{$col} ) {
            push @clause, "((? IS NULL AND $quo IS NULL) OR ($quo = ?))";
            push @r_slice, $ord, $ord;
            push @r_scols, $col, $col;
         }
         else {
            push @clause, "$quo = ?";
            push @r_slice, $ord;
            push @r_scols, $col;
         }
      }

      my $ord = $slice[$i];
      my $col = $cols[$ord];
      my $quo = $q->quote($col);
      my $end = $i == $#slice; # Last clause of the whole group.
      if ( $is_nullable->{$col} ) {
         if ( $type =~ m/=/ && $end ) {
            push @clause, "(? IS NULL OR $quo $type ?)";
         }
         elsif ( $type =~ m/>/ ) {
            push @clause, "((? IS NULL AND $quo IS NOT NULL) OR ($quo $cmp ?))";
         }
         else { # If $type =~ m/</ ) {
            push @clause, "((? IS NOT NULL AND $quo IS NULL) OR ($quo $cmp ?))";
         }
         push @r_slice, $ord, $ord;
         push @r_scols, $col, $col;
      }
      else {
         push @r_slice, $ord;
         push @r_scols, $col;
         push @clause, ($type =~ m/=/ && $end ? "$quo $type ?" : "$quo $cmp ?");
      }

      push @clauses, '(' . join(' AND ', @clause) . ')';
   }
   my $result = '(' . join(' OR ', @clauses) . ')';
   my $where = {
      slice => \@r_slice,
      scols => \@r_scols,
      where => $result,
   };
   return $where;
}

sub generate_del_stmt {
   my ( $self, %args ) = @_;

   my $tbl  = $args{tbl};
   my @cols = $args{cols} ? @{$args{cols}} : ();
   my $q    = $args{quoter};

   my @del_cols;
   my @del_slice;

   my $index = $args{parser}->find_best_index($tbl, $args{index});
   die "Cannot find an ascendable index in table" unless $index;

   if ( $index ) {
      @del_cols = @{$tbl->{keys}->{$index}->{cols}};
   }
   else {
      @del_cols = @{$tbl->{cols}};
   }
   MKDEBUG && _d('Columns needed for DELETE:', join(', ', @del_cols));

   my %col_posn = do { my $i = 0; map { $_ => $i++ } @cols };
   foreach my $col ( @del_cols ) {
      if ( !exists $col_posn{$col} ) {
         push @cols, $col;
         $col_posn{$col} = $#cols;
      }
      push @del_slice, $col_posn{$col};
   }
   MKDEBUG && _d('Ordinals needed for DELETE:', join(', ', @del_slice));

   my $del_stmt = {
      cols  => \@cols,
      index => $index,
      where => '',
      slice => [],
      scols => [],
   };

   my @clauses;
   foreach my $i ( 0 .. $#del_slice ) {
      my $ord = $del_slice[$i];
      my $col = $cols[$ord];
      my $quo = $q->quote($col);
      if ( $tbl->{is_nullable}->{$col} ) {
         push @clauses, "((? IS NULL AND $quo IS NULL) OR ($quo = ?))";
         push @{$del_stmt->{slice}}, $ord, $ord;
         push @{$del_stmt->{scols}}, $col, $col;
      }
      else {
         push @clauses, "$quo = ?";
         push @{$del_stmt->{slice}}, $ord;
         push @{$del_stmt->{scols}}, $col;
      }
   }

   $del_stmt->{where} = '(' . join(' AND ', @clauses) . ')';

   return $del_stmt;
}

sub generate_ins_stmt {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(ins_tbl sel_cols) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $ins_tbl  = $args{ins_tbl};
   my @sel_cols = @{$args{sel_cols}};

   die "You didn't specify any SELECT columns" unless @sel_cols;

   my @ins_cols;
   my @ins_slice;
   for my $i ( 0..$#sel_cols ) {
      next unless $ins_tbl->{is_col}->{$sel_cols[$i]};
      push @ins_cols, $sel_cols[$i];
      push @ins_slice, $i;
   }

   return {
      cols  => \@ins_cols,
      slice => \@ins_slice,
   };
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableNibbler package
# ###########################################################################

# ###########################################################################
# TableChecksum package 4508
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package TableChecksum;

use English qw(-no_match_vars);
use List::Util qw(max);

use constant MKDEBUG => $ENV{MKDEBUG};

our %ALGOS = (
   CHECKSUM => { pref => 0, hash => 0 },
   BIT_XOR  => { pref => 2, hash => 1 },
   ACCUM    => { pref => 3, hash => 1 },
);

sub new {
   bless {}, shift;
}

sub crc32 {
   my ( $self, $string ) = @_;
   my $poly = 0xEDB88320;
   my $crc  = 0xFFFFFFFF;
   foreach my $char ( split(//, $string) ) {
      my $comp = ($crc ^ ord($char)) & 0xFF;
      for ( 1 .. 8 ) {
         $comp = $comp & 1 ? $poly ^ ($comp >> 1) : $comp >> 1;
      }
      $crc = (($crc >> 8) & 0x00FFFFFF) ^ $comp;
   }
   return $crc ^ 0xFFFFFFFF;
}

sub get_crc_wid {
   my ( $self, $dbh, $func ) = @_;
   my $crc_wid = 16;
   if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) {
      eval {
         my ($val) = $dbh->selectrow_array("SELECT $func('a')");
         $crc_wid = max(16, length($val));
      };
   }
   return $crc_wid;
}

sub get_crc_type {
   my ( $self, $dbh, $func ) = @_;
   my $type   = '';
   my $length = 0;
   my $sql    = "SELECT $func('a')";
   my $sth    = $dbh->prepare($sql);
   eval {
      $sth->execute();
      $type   = $sth->{mysql_type_name}->[0];
      $length = $sth->{mysql_length}->[0];
      MKDEBUG && _d($sql, $type, $length);
      if ( $type eq 'bigint' && $length < 20 ) {
         $type = 'int';
      }
   };
   $sth->finish;
   MKDEBUG && _d('crc_type:', $type, 'length:', $length);
   return ($type, $length);
}

sub best_algorithm {
   my ( $self, %args ) = @_;
   my ($alg, $vp, $dbh) = @args{ qw(algorithm vp dbh) };
   my @choices = sort { $ALGOS{$a}->{pref} <=> $ALGOS{$b}->{pref} } keys %ALGOS;
   die "Invalid checksum algorithm $alg"
      if $alg && !$ALGOS{$alg};

   if (
      $args{where} || $args{chunk}        # CHECKSUM does whole table
      || $args{replicate}                 # CHECKSUM can't do INSERT.. SELECT
      || !$vp->version_ge($dbh, '4.1.1')) # CHECKSUM doesn't exist
   {
      MKDEBUG && _d('Cannot use CHECKSUM algorithm');
      @choices = grep { $_ ne 'CHECKSUM' } @choices;
   }

   if ( !$vp->version_ge($dbh, '4.1.1') ) {
      MKDEBUG && _d('Cannot use BIT_XOR algorithm because MySQL < 4.1.1');
      @choices = grep { $_ ne 'BIT_XOR' } @choices;
   }

   if ( $alg && grep { $_ eq $alg } @choices ) {
      MKDEBUG && _d('User requested', $alg, 'algorithm');
      return $alg;
   }

   if ( $args{count} && grep { $_ ne 'CHECKSUM' } @choices ) {
      MKDEBUG && _d('Not using CHECKSUM algorithm because COUNT desired');
      @choices = grep { $_ ne 'CHECKSUM' } @choices;
   }

   MKDEBUG && _d('Algorithms, in order:', @choices);
   return $choices[0];
}

sub is_hash_algorithm {
   my ( $self, $algorithm ) = @_;
   return $ALGOS{$algorithm} && $ALGOS{$algorithm}->{hash};
}

sub choose_hash_func {
   my ( $self, %args ) = @_;
   my @funcs = qw(CRC32 FNV1A_64 FNV_64 MD5 SHA1);
   if ( $args{func} ) {
      unshift @funcs, $args{func};
   }
   my ($result, $error);
   do {
      my $func;
      eval {
         $func = shift(@funcs);
         my $sql = "SELECT $func('test-string')";
         MKDEBUG && _d($sql);
         $args{dbh}->do($sql);
         $result = $func;
      };
      if ( $EVAL_ERROR && $EVAL_ERROR =~ m/failed: (.*?) at \S+ line/ ) {
         $error .= qq{$func cannot be used because "$1"\n};
         MKDEBUG && _d($func, 'cannot be used because', $1);
      }
   } while ( @funcs && !$result );

   die $error unless $result;
   MKDEBUG && _d('Chosen hash func:', $result);
   return $result;
}

sub optimize_xor {
   my ( $self, %args ) = @_;
   my ( $dbh, $func ) = @args{qw(dbh func)};

   die "$func never needs the BIT_XOR optimization"
      if $func =~ m/^(?:FNV1A_64|FNV_64|CRC32)$/i;

   my $opt_slice = 0;
   my $unsliced  = uc $dbh->selectall_arrayref("SELECT $func('a')")->[0]->[0];
   my $sliced    = '';
   my $start     = 1;
   my $crc_wid   = length($unsliced) < 16 ? 16 : length($unsliced);

   do { # Try different positions till sliced result equals non-sliced.
      MKDEBUG && _d('Trying slice', $opt_slice);
      $dbh->do('SET @crc := "", @cnt := 0');
      my $slices = $self->make_xor_slices(
         query     => "\@crc := $func('a')",
         crc_wid   => $crc_wid,
         opt_slice => $opt_slice,
      );

      my $sql = "SELECT CONCAT($slices) AS TEST FROM (SELECT NULL) AS x";
      $sliced = ($dbh->selectrow_array($sql))[0];
      if ( $sliced ne $unsliced ) {
         MKDEBUG && _d('Slice', $opt_slice, 'does not work');
         $start += 16;
         ++$opt_slice;
      }
   } while ( $start < $crc_wid && $sliced ne $unsliced );

   if ( $sliced eq $unsliced ) {
      MKDEBUG && _d('Slice', $opt_slice, 'works');
      return $opt_slice;
   }
   else {
      MKDEBUG && _d('No slice works');
      return undef;
   }
}

sub make_xor_slices {
   my ( $self, %args ) = @_;
   my ( $query, $crc_wid, $opt_slice )
      = @args{qw(query crc_wid opt_slice)};

   my @slices;
   for ( my $start = 1; $start <= $crc_wid; $start += 16 ) {
      my $len = $crc_wid - $start + 1;
      if ( $len > 16 ) {
         $len = 16;
      }
      push @slices,
         "LPAD(CONV(BIT_XOR("
         . "CAST(CONV(SUBSTRING(\@crc, $start, $len), 16, 10) AS UNSIGNED))"
         . ", 10, 16), $len, '0')";
   }

   if ( defined $opt_slice && $opt_slice < @slices ) {
      $slices[$opt_slice] =~ s/\@crc/\@crc := $query/;
   }
   else {
      map { s/\@crc/$query/ } @slices;
   }

   return join(', ', @slices);
}

sub make_row_checksum {
   my ( $self, %args ) = @_;
   my ( $table, $quoter, $func )
      = @args{ qw(table quoter func) };

   my $sep = $args{sep} || '#';
   $sep =~ s/'//g;
   $sep ||= '#';

   my %ignorecols = map { $_ => 1 } @{$args{ignorecols}};

   my %cols = map { lc($_) => 1 }
              grep { !exists $ignorecols{$_} }
              ($args{cols} ? @{$args{cols}} : @{$table->{cols}});
   my @cols =
      map {
         my $type = $table->{type_for}->{$_};
         my $result = $quoter->quote($_);
         if ( $type eq 'timestamp' ) {
            $result .= ' + 0';
         }
         elsif ( $type =~ m/float|double/ && $args{precision} ) {
            $result = "ROUND($result, $args{precision})";
         }
         elsif ( $type =~ m/varchar/ && $args{trim} ) {
            $result = "TRIM($result)";
         }
         $result;
      }
      grep {
         $cols{$_}
      }
      @{$table->{cols}};

   my $query;
   if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) {
      my @nulls = grep { $cols{$_} } @{$table->{null_cols}};
      if ( @nulls ) {
         my $bitmap = "CONCAT("
            . join(', ', map { 'ISNULL(' . $quoter->quote($_) . ')' } @nulls)
            . ")";
         push @cols, $bitmap;
      }

      $query = @cols > 1
             ? "$func(CONCAT_WS('$sep', " . join(', ', @cols) . '))'
             : "$func($cols[0])";
   }
   else {
      my $fnv_func = uc $func;
      $query = "$fnv_func(" . join(', ', @cols) . ')';
   }

   return $query;
}

sub make_checksum_query {
   my ( $self, %args ) = @_;
   my @arg_names = qw(dbname tblname table quoter algorithm
        func crc_wid crc_type opt_slice);
   foreach my $arg( @arg_names ) {
      die "You must specify argument $arg" unless exists $args{$arg};
   }
   my ( $dbname, $tblname, $table, $quoter, $algorithm,
        $func, $crc_wid, $crc_type, $opt_slice ) = @args{ @arg_names };
   die "Invalid or missing checksum algorithm"
      unless $algorithm && $ALGOS{$algorithm};

   my $result;

   if ( $algorithm eq 'CHECKSUM' ) {
      return "CHECKSUM TABLE " . $quoter->quote($dbname, $tblname);
   }

   my $expr = $self->make_row_checksum(%args);

   if ( $algorithm eq 'BIT_XOR' ) {
      if ( $crc_type =~ m/int$/ ) {
         $result = "LOWER(CONV(BIT_XOR(CAST($expr AS UNSIGNED)), 10, 16)) AS crc ";
      }
      else {
         my $slices = $self->make_xor_slices( query => $expr, %args );
         $result = "LOWER(CONCAT($slices)) AS crc ";
      }
   }
   else {
      if ( $crc_type =~ m/int$/ ) {
         $result = "RIGHT(MAX("
            . "\@crc := CONCAT(LPAD(\@cnt := \@cnt + 1, 16, '0'), "
            . "CONV(CAST($func(CONCAT(\@crc, $expr)) AS UNSIGNED), 10, 16))"
            . "), $crc_wid) AS crc ";
      }
      else {
         $result = "RIGHT(MAX("
            . "\@crc := CONCAT(LPAD(\@cnt := \@cnt + 1, 16, '0'), "
            . "$func(CONCAT(\@crc, $expr)))"
            . "), $crc_wid) AS crc ";
      }
   }
   if ( $args{replicate} ) {
      $result = "REPLACE /*PROGRESS_COMMENT*/ INTO $args{replicate} "
         . "(db, tbl, chunk, boundaries, this_cnt, this_crc) "
         . "SELECT ?, ?, /*CHUNK_NUM*/ ?, COUNT(*) AS cnt, $result";
   }
   else {
      $result = "SELECT "
         . ($args{buffer} ? 'SQL_BUFFER_RESULT ' : '')
         . "/*PROGRESS_COMMENT*//*CHUNK_NUM*/ COUNT(*) AS cnt, $result";
   }
   return $result . "FROM /*DB_TBL*//*INDEX_HINT*//*WHERE*/";
}

sub find_replication_differences {
   my ( $self, $dbh, $table ) = @_;

   (my $sql = <<"   EOF") =~ s/\s+/ /gm;
      SELECT db, tbl, chunk, boundaries,
         COALESCE(this_cnt-master_cnt, 0) AS cnt_diff,
         COALESCE(
            this_crc <> master_crc OR ISNULL(master_crc) <> ISNULL(this_crc),
            0
         ) AS crc_diff,
         this_cnt, master_cnt, this_crc, master_crc
      FROM $table
      WHERE master_cnt <> this_cnt OR master_crc <> this_crc
      OR ISNULL(master_crc) <> ISNULL(this_crc)
   EOF

   MKDEBUG && _d($sql);
   my $diffs = $dbh->selectall_arrayref($sql, { Slice => {} });
   return @$diffs;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableChecksum package
# ###########################################################################

# ###########################################################################
# TableSyncer package 4590
# ###########################################################################
package TableSyncer;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

our %ALGOS = map { lc $_ => $_ } qw(Stream Chunk Nibble GroupBy);

sub new {
   bless {}, shift;
}

sub best_algorithm {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(tbl_struct parser nibbler chunker) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $result;

   my ($exact, $cols) = $args{chunker}
      ->find_chunk_columns($args{tbl_struct}, { exact => 1 });
   if ( $exact ) {
      MKDEBUG && _d('Chunker says', $cols->[0], 'supports chunking exactly');
      $result = 'Chunk';
   }
   else {
      my ($idx) = $args{parser}->find_best_index($args{tbl_struct});
      if ( $idx ) {
         MKDEBUG && _d('Parser found best index', $idx, 'so Nibbler will work');
         $result = 'Nibble';
      }
      else {
         MKDEBUG && _d('No primary or unique non-null key in table');
         $result = 'GroupBy';
      }
   }
   MKDEBUG && _d('Algorithm:', $result);
   return $result;
}

sub sync_table {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(
      buffer checksum chunker chunksize dst_db dst_dbh dst_tbl execute lock
      misc_dbh quoter replace replicate src_db src_dbh src_tbl test tbl_struct
      timeoutok transaction versionparser wait where possible_keys cols
      nibbler parser master_slave func dumper trim skipslavecheck bufferinmysql) )
   {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   MKDEBUG && _d('Syncing table with args',
      join(', ',
         map { "$_=" . (defined $args{$_} ? $args{$_} : 'undef') }
         sort keys %args));

   my $sleep = $args{sleep} || 0;

   my $can_replace
      = grep { $_->{is_unique} } values %{$args{tbl_struct}->{keys}};
   MKDEBUG && _d('This table\'s replace-ability:', $can_replace);
   my $use_replace = $args{replace} || $args{replicate};

   my $update_func;
   my $change_dbh;
   if ( $args{execute} ) {
      if ( $args{replicate} ) {
         $change_dbh = $args{src_dbh};
         $self->check_permissions(@args{qw(src_dbh src_db src_tbl quoter)});
         if ( !$can_replace ) {
            die "Can't make changes on the master: no unique index exists";
         }
      }
      else {
         $change_dbh = $args{dst_dbh};
         $self->check_permissions(@args{qw(dst_dbh dst_db dst_tbl quoter)});
         my $slave_status = $args{master_slave}->get_slave_status($change_dbh);
         my (undef, $log_bin) = $change_dbh->selectrow_array(
            'SHOW VARIABLES LIKE "log_bin"');
         my ($sql_log_bin) = $change_dbh->selectrow_array(
            'SELECT @@SQL_LOG_BIN');
         MKDEBUG && _d('Variables: log_bin=',
            (defined $log_bin ? $log_bin : 'NULL'),
            ' @@SQL_LOG_BIN=',
            (defined $sql_log_bin ? $sql_log_bin : 'NULL'));
         if ( !$args{skipslavecheck} && $slave_status && $sql_log_bin
            && ($log_bin || 'OFF') eq 'ON' )
         {
            die "Can't make changes on $change_dbh because it's a slave: see "
               . "the documentation section 'REPLICATION SAFETY' for solutions "
               . "to this problem.";
         }
      }
      MKDEBUG && _d('Will make changes via', $change_dbh);
      $update_func = sub {
         map {
            MKDEBUG && _d('About to execute:', $_);
            $change_dbh->do($_);
         } @_;
      };

      $args{src_dbh}->do("USE `$args{src_db}`");
      $args{dst_dbh}->do("USE `$args{dst_db}`");
   }

   my $ch = new ChangeHandler(
      queue     => $args{buffer} ? 0 : 1,
      quoter    => $args{quoter},
      database  => $args{dst_db},
      table     => $args{dst_tbl},
      sdatabase => $args{src_db},
      stable    => $args{src_tbl},
      replace   => $use_replace,
      actions   => [
         ( $update_func ? $update_func            : () ),
         ( $args{print}
            ? sub { print(@_, ";\n") or die "Cannot print: $OS_ERROR" }
            : () ),
      ],
   );
   my $rd = $args{RowDiff} || new RowDiff( dbh => $args{misc_dbh} );

   $args{algorithm} ||= $self->best_algorithm(
      map { $_ => $args{$_} } qw(tbl_struct parser nibbler chunker));

   if ( !$ALGOS{ lc $args{algorithm} } ) {
      die "No such algorithm $args{algorithm}; try one of "
         . join(', ', values %ALGOS) . "\n";
   }
   $args{algorithm} = $ALGOS{ lc $args{algorithm} };

   if ( $args{test} ) {
      return ($ch->get_changes(), ALGORITHM => $args{algorithm});
   }

   my $chunksize = $args{chunker}->size_to_rows(
         @args{qw(src_dbh src_db src_tbl chunksize dumper)}),

   my $class  = "TableSync$args{algorithm}";
   my $plugin = $class->new(
      handler   => $ch,
      cols      => $args{cols},
      dbh       => $args{src_dbh},
      database  => $args{src_db},
      dumper    => $args{dumper},
      table     => $args{src_tbl},
      chunker   => $args{chunker},
      nibbler   => $args{nibbler},
      parser    => $args{parser},
      struct    => $args{tbl_struct},
      checksum  => $args{checksum},
      vp        => $args{versionparser},
      quoter    => $args{quoter},
      chunksize => $chunksize,
      where     => $args{where},
      possible_keys => [],
      versionparser => $args{versionparser},
      func          => $args{func},
      trim          => $args{trim},
      bufferinmysql => $args{bufferinmysql},
   );

   $self->lock_and_wait(%args, lock_level => 2);

   my $cycle = 0;
   while ( !$plugin->done ) {

      MKDEBUG && _d('Beginning sync cycle', $cycle);
      my $src_sql = $plugin->get_sql(
         quoter     => $args{quoter},
         database   => $args{src_db},
         table      => $args{src_tbl},
         where      => $args{where},
         index_hint => $args{index_hint} ? $plugin->{index} : undef,
      );
      my $dst_sql = $plugin->get_sql(
         quoter     => $args{quoter},
         database   => $args{dst_db},
         table      => $args{dst_tbl},
         where      => $args{where},
         index_hint => $args{index_hint} ? $plugin->{index} : undef,
      );
      if ( $args{transaction} ) {
         if ( $change_dbh && $change_dbh eq $args{src_dbh} ) {
            $src_sql .= ' FOR UPDATE';
            $dst_sql .= ' LOCK IN SHARE MODE';
         }
         elsif ( $change_dbh ) {
            $src_sql .= ' LOCK IN SHARE MODE';
            $dst_sql .= ' FOR UPDATE';
         }
         else {
            $src_sql .= ' LOCK IN SHARE MODE';
            $dst_sql .= ' LOCK IN SHARE MODE';
         }
      }
      $plugin->prepare($args{src_dbh});
      $plugin->prepare($args{dst_dbh});
      MKDEBUG && _d('src:', $src_sql);
      MKDEBUG && _d('dst:', $dst_sql);
      my $src_sth = $args{src_dbh}
         ->prepare( $src_sql, { mysql_use_result => !$args{buffer} } );
      my $dst_sth = $args{dst_dbh}
         ->prepare( $dst_sql, { mysql_use_result => !$args{buffer} } );

      my $executed_src = 0;
      if ( !$cycle || !$plugin->pending_changes() ) {
         $executed_src
            = $self->lock_and_wait(%args, src_sth => $src_sth, lock_level => 1);
      }

      $src_sth->execute() unless $executed_src;
      $dst_sth->execute();

      $rd->compare_sets(
         left   => $src_sth,
         right  => $dst_sth,
         syncer => $plugin,
         tbl    => $args{tbl_struct},
      );
      MKDEBUG && _d('Finished sync cycle', $cycle);
      $ch->process_rows(1);

      $cycle++;

      sleep $sleep;
   }

   $ch->process_rows();

   $self->unlock(%args, lock_level => 2);

   return ($ch->get_changes(), ALGORITHM => $args{algorithm});
}

sub check_permissions {
   my ( $self, $dbh, $db, $tbl, $quoter ) = @_;
   my $db_tbl = $quoter->quote($db, $tbl);
   my $sql = "SHOW FULL COLUMNS FROM $db_tbl";
   MKDEBUG && _d('Permissions check:', $sql);
   my $cols = $dbh->selectall_arrayref($sql, {Slice => {}});
   my ($hdr_name) = grep { m/privileges/i } keys %{$cols->[0]};
   my $privs = $cols->[0]->{$hdr_name};
   die "$privs does not include all needed privileges for $db_tbl"
      unless $privs =~ m/select/ && $privs =~ m/insert/ && $privs =~ m/update/;
   $sql = "DELETE FROM $db_tbl LIMIT 0"; # FULL COLUMNS doesn't show all privs
   MKDEBUG && _d('Permissions check:', $sql);
   $dbh->do($sql);
}

sub lock_table {
   my ( $self, $dbh, $where, $db_tbl, $mode ) = @_;
   my $query = "LOCK TABLES $db_tbl $mode";
   MKDEBUG && _d($query);
   $dbh->do($query);
   MKDEBUG && _d('Acquired table lock on', $where, 'in', $mode, 'mode');
}

sub unlock {
   my ( $self, %args ) = @_;

   foreach my $arg ( qw(
      dst_db dst_dbh dst_tbl lock quoter replicate src_db src_dbh src_tbl
      timeoutok transaction wait lock_level) )
   {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   return unless $args{lock} && $args{lock} <= $args{lock_level};

   foreach my $dbh( @args{qw(src_dbh dst_dbh)} ) {
      if ( $args{transaction} ) {
         MKDEBUG && _d('Committing', $dbh);
         $dbh->commit;
      }
      else {
         my $sql = 'UNLOCK TABLES';
         MKDEBUG && _d($dbh, $sql);
         $dbh->do($sql);
      }
   }
}

sub lock_and_wait {
   my ( $self, %args ) = @_;
   my $result = 0;

   foreach my $arg ( qw(
      dst_db dst_dbh dst_tbl lock quoter replicate src_db src_dbh src_tbl
      timeoutok transaction wait lock_level misc_dbh master_slave) )
   {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   return unless $args{lock} && $args{lock} == $args{lock_level};

   foreach my $dbh( @args{qw(src_dbh dst_dbh)} ) {
      if ( $args{transaction} ) {
         MKDEBUG && _d('Committing', $dbh);
         $dbh->commit;
      }
      else {
         my $sql = 'UNLOCK TABLES';
         MKDEBUG && _d($dbh, $sql);
         $dbh->do($sql);
      }
   }

   if ( $args{lock} == 3 ) {
      my $sql = 'FLUSH TABLES WITH READ LOCK';
      MKDEBUG && _d($args{src_dbh}, ',', $sql);
      $args{src_dbh}->do($sql);
   }
   else {
      if ( $args{transaction} ) {
         if ( $args{src_sth} ) {
            MKDEBUG && _d('Executing statement on source to lock rows');
            $args{src_sth}->execute();
            $result = 1;
         }
      }
      else {
         $self->lock_table($args{src_dbh}, 'source',
            $args{quoter}->quote($args{src_db}, $args{src_tbl}),
            $args{replicate} ? 'WRITE' : 'READ');
      }
   }

   eval {
      if ( $args{wait} ) {
         $args{master_slave}->wait_for_master(
            $args{misc_dbh}, $args{dst_dbh}, $args{wait}, $args{timeoutok});
      }

      if ( $args{replicate} ) {
         MKDEBUG
            && _d('Not locking destination because syncing via replication');
      }
      else {
         if ( $args{lock} == 3 ) {
            my $sql = 'FLUSH TABLES WITH READ LOCK';
            MKDEBUG && _d($args{dst_dbh}, ',', $sql);
            $args{dst_dbh}->do($sql);
         }
         elsif ( !$args{transaction} ) {
            $self->lock_table($args{dst_dbh}, 'dest',
               $args{quoter}->quote($args{dst_db}, $args{dst_tbl}),
               $args{execute} ? 'WRITE' : 'READ');
         }
      }
   };

   if ( $EVAL_ERROR ) {
      if ( $args{src_sth}->{Active} ) {
         $args{src_sth}->finish();
      }
      foreach my $dbh ( @args{qw(src_dbh dst_dbh misc_dbh)} ) {
         next unless $dbh;
         MKDEBUG && _d('Caught error, unlocking/committing on', $dbh);
         $dbh->do('UNLOCK TABLES');
         $dbh->commit() unless $dbh->{AutoCommit};
      }
      die $EVAL_ERROR;
   }

   return $result;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableSyncer package
# ###########################################################################

# ###########################################################################
# TableSyncStream package 3186
# ###########################################################################
package TableSyncStream;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(handler cols) ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   return bless { %args }, $class;
}

sub get_sql {
   my ( $self, %args ) = @_;
   return "SELECT "
      . ($self->{bufferinmysql} ? 'SQL_BUFFER_RESULT ' : '')
      . join(', ', map { $args{quoter}->quote($_) } @{$self->{cols}})
      . ' FROM ' . $args{quoter}->quote(@args{qw(database table)})
      . ' WHERE ' . ( $args{where} || '1=1' );
}

sub same_row {
   my ( $self, $lr, $rr ) = @_;
}

sub not_in_right {
   my ( $self, $lr ) = @_;
   $self->{handler}->change('INSERT', $lr, $self->key_cols());
}

sub not_in_left {
   my ( $self, $rr ) = @_;
   $self->{handler}->change('DELETE', $rr, $self->key_cols());
}

sub done_with_rows {
   my ( $self ) = @_;
   $self->{done} = 1;
}

sub done {
   my ( $self ) = @_;
   return $self->{done};
}

sub key_cols {
   my ( $self ) = @_;
   return $self->{cols};
}

sub prepare {
   my ( $self, $dbh ) = @_;
}

sub pending_changes {
   my ( $self ) = @_;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableSyncStream package
# ###########################################################################

# ###########################################################################
# TableSyncChunk package 4493
# ###########################################################################
package TableSyncChunk;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);
use List::Util qw(max);
use Data::Dumper;
$Data::Dumper::Indent    = 0;
$Data::Dumper::Quotekeys = 0;

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(dbh database table handler chunker quoter struct
                        checksum cols vp chunksize where possible_keys
                        dumper trim) ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   $args{crc_col} = '__crc';
   while ( $args{struct}->{is_col}->{$args{crc_col}} ) {
      $args{crc_col} = "_$args{crc_col}"; # Prepend more _ until not a column.
   }
   MKDEBUG && _d('CRC column will be named', $args{crc_col});

   my @chunks;
   my ( $col, $idx ) = $args{chunker}->get_first_chunkable_column(
      $args{struct}, { possible_keys => $args{possible_keys} });
   $args{index} = $idx;
   if ( $col ) {
      my %params = $args{chunker}->get_range_statistics(
         $args{dbh}, $args{database}, $args{table}, $col,
         $args{where});
      if ( !grep { !defined $params{$_} }
            qw(min max rows_in_range) )
      {
         @chunks = $args{chunker}->calculate_chunks(
            dbh      => $args{dbh},
            table    => $args{struct},
            col      => $col,
            size     => $args{chunksize},
            %params,
         );
      }
      else {
         @chunks = '1=1';
      }
      $args{chunk_col} = $col;
   }
   die "Cannot chunk $args{database}.$args{table}" unless @chunks;
   $args{chunks}     = \@chunks;
   $args{chunk_num}  = 0;

   $args{algorithm} = $args{checksum}->best_algorithm(
      algorithm   => 'BIT_XOR',
      vp          => $args{vp},
      dbh         => $args{dbh},
      where       => 1,
      chunk       => 1,
      count       => 1,
   );
   $args{func} = $args{checksum}->choose_hash_func(
      func => $args{func},
      dbh  => $args{dbh},
   );
   $args{crc_wid}    = $args{checksum}->get_crc_wid($args{dbh}, $args{func});
   ($args{crc_type}) = $args{checksum}->get_crc_type($args{dbh}, $args{func});
   if ( $args{algorithm} eq 'BIT_XOR' && $args{crc_type} !~ m/int$/ ) {
      $args{opt_slice}
         = $args{checksum}->optimize_xor(dbh => $args{dbh}, func => $args{func});
   }
   $args{chunk_sql} ||= $args{checksum}->make_checksum_query(
      dbname    => $args{database},
      tblname   => $args{table},
      table     => $args{struct},
      quoter    => $args{quoter},
      algorithm => $args{algorithm},
      func      => $args{func},
      crc_wid   => $args{crc_wid},
      crc_type  => $args{crc_type},
      opt_slice => $args{opt_slice},
      cols      => $args{cols},
      trim      => $args{trim},
      buffer    => $args{bufferinmysql},
   );
   $args{row_sql} ||= $args{checksum}->make_row_checksum(
      table     => $args{struct},
      quoter    => $args{quoter},
      func      => $args{func},
      cols      => $args{cols},
      trim      => $args{trim},
   );

   $args{state} = 0;
   $args{handler}->fetch_back($args{dbh});
   return bless { %args }, $class;
}

sub get_sql {
   my ( $self, %args ) = @_;
   if ( $self->{state} ) {
      my $index_hint = defined $args{index_hint}
                       ? " USE INDEX (`$args{index_hint}`) "
                       : '';
      return 'SELECT '
         . ($self->{bufferinmysql} ? 'SQL_BUFFER_RESULT ' : '')
         . join(', ', map { $self->{quoter}->quote($_) } @{$self->key_cols()})
         . ', ' . $self->{row_sql} . " AS $self->{crc_col}"
         . ' FROM ' . $self->{quoter}->quote(@args{qw(database table)})
         . $index_hint 
         . ' WHERE (' . $self->{chunks}->[$self->{chunk_num}] . ')'
         . ($args{where} ? " AND ($args{where})" : '');
   }
   else {
      return $self->{chunker}->inject_chunks(
         database   => $args{database},
         table      => $args{table},
         chunks     => $self->{chunks},
         chunk_num  => $self->{chunk_num},
         query      => $self->{chunk_sql},
         where      => [$args{where}],
         quoter     => $self->{quoter},
         index_hint => $args{index_hint},
      );
   }
}

sub prepare {
   my ( $self, $dbh ) = @_;
   my $sql = 'SET @crc := "", @cnt := 0';
   MKDEBUG && _d($sql);
   $dbh->do($sql);
   return;
}

sub same_row {
   my ( $self, $lr, $rr ) = @_;
   if ( $self->{state} ) {
      if ( $lr->{$self->{crc_col}} ne $rr->{$self->{crc_col}} ) {
         $self->{handler}->change('UPDATE', $lr, $self->key_cols());
      }
   }
   elsif ( $lr->{cnt} != $rr->{cnt} || $lr->{crc} ne $rr->{crc} ) {
      MKDEBUG && _d('Rows:', Dumper($lr, $rr));
      MKDEBUG && _d('Will examine this chunk before moving to next');
      $self->{state} = 1; # Must examine this chunk row-by-row
   }
}

sub not_in_right {
   my ( $self, $lr ) = @_;
   die "Called not_in_right in state 0" unless $self->{state};
   $self->{handler}->change('INSERT', $lr, $self->key_cols());
}

sub not_in_left {
   my ( $self, $rr ) = @_;
   die "Called not_in_left in state 0" unless $self->{state};
   $self->{handler}->change('DELETE', $rr, $self->key_cols());
}

sub done_with_rows {
   my ( $self ) = @_;
   if ( $self->{state} == 1 ) {
      $self->{state} = 2;
      MKDEBUG && _d('Setting state =', $self->{state});
   }
   else {
      $self->{state} = 0;
      $self->{chunk_num}++;
      MKDEBUG && _d('Setting state =', $self->{state},
         'chunk_num =', $self->{chunk_num});
   }
}

sub done {
   my ( $self ) = @_;
   MKDEBUG && _d('Done with', $self->{chunk_num}, 'of',
      scalar(@{$self->{chunks}}), 'chunks');
   MKDEBUG && $self->{state} && _d('Chunk differs; must examine rows');
   return $self->{state} == 0
      && $self->{chunk_num} >= scalar(@{$self->{chunks}})
}

sub pending_changes {
   my ( $self ) = @_;
   if ( $self->{state} ) {
      MKDEBUG && _d('There are pending changes');
      return 1;
   }
   else {
      MKDEBUG && _d('No pending changes');
      return 0;
   }
}

sub key_cols {
   my ( $self ) = @_;
   my @cols;
   if ( $self->{state} == 0 ) {
      @cols = qw(chunk_num);
   }
   else {
      @cols = $self->{chunk_col};
   }
   MKDEBUG && _d('State', $self->{state},',', 'key cols', join(', ', @cols));
   return \@cols;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableSyncChunk package
# ###########################################################################

# ###########################################################################
# TableSyncNibble package 4664
# ###########################################################################
package TableSyncNibble;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);
use List::Util qw(max);
use Data::Dumper;
$Data::Dumper::Indent    = 0;
$Data::Dumper::Quotekeys = 0;

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(dbh database table handler nibbler quoter struct
                        parser checksum cols vp chunksize where chunker
                        versionparser possible_keys trim) ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   $args{crc_col} = '__crc';
   while ( $args{struct}->{is_col}->{$args{crc_col}} ) {
      $args{crc_col} = "_$args{crc_col}"; # Prepend more _ until not a column.
   }
   MKDEBUG && _d('CRC column will be named', $args{crc_col});

   $args{sel_stmt} = $args{nibbler}->generate_asc_stmt(
      parser   => $args{parser},
      tbl      => $args{struct},
      index    => $args{possible_keys}->[0],
      quoter   => $args{quoter},
      asconly  => 1,
   );

   die "No suitable index found"
      unless $args{sel_stmt}->{index}
         && $args{struct}->{keys}->{$args{sel_stmt}->{index}}->{is_unique};
   $args{key_cols} = $args{struct}->{keys}->{$args{sel_stmt}->{index}}->{cols};

   $args{algorithm} = $args{checksum}->best_algorithm(
      algorithm   => 'BIT_XOR',
      vp          => $args{vp},
      dbh         => $args{dbh},
      where       => 1,
      chunk       => 1,
      count       => 1,
   );
   $args{func} = $args{checksum}->choose_hash_func(
      dbh  => $args{dbh},
      func => $args{func},
   );
   $args{crc_wid}    = $args{checksum}->get_crc_wid($args{dbh}, $args{func});
   ($args{crc_type}) = $args{checksum}->get_crc_type($args{dbh}, $args{func});
   if ( $args{algorithm} eq 'BIT_XOR' && $args{crc_type} !~ m/int$/ ) {
      $args{opt_slice}
         = $args{checksum}->optimize_xor(dbh => $args{dbh}, func => $args{func});
   }

   $args{nibble_sql} ||= $args{checksum}->make_checksum_query(
      dbname    => $args{database},
      tblname   => $args{table},
      table     => $args{struct},
      quoter    => $args{quoter},
      algorithm => $args{algorithm},
      func      => $args{func},
      crc_wid   => $args{crc_wid},
      crc_type  => $args{crc_type},
      opt_slice => $args{opt_slice},
      cols      => $args{cols},
      trim      => $args{trim},
      buffer    => $args{bufferinmysql},
   );
   $args{row_sql} ||= $args{checksum}->make_row_checksum(
      table     => $args{struct},
      quoter    => $args{quoter},
      func      => $args{func},
      cols      => $args{cols},
      trim      => $args{trim},
   );

   $args{state}  = 0;
   $args{nibble} = 0;
   $args{handler}->fetch_back($args{dbh});
   return bless { %args }, $class;
}

sub get_sql {
   my ( $self, %args ) = @_;
   if ( $self->{state} ) {
      return 'SELECT /*rows in nibble*/ '
         . ($self->{bufferinmysql} ? 'SQL_BUFFER_RESULT ' : '')
         . join(', ', map { $self->{quoter}->quote($_) } @{$self->key_cols()})
         . ', ' . $self->{row_sql} . " AS $self->{crc_col}"
         . ' FROM ' . $self->{quoter}->quote(@args{qw(database table)})
         . ' WHERE (' . $self->__get_boundaries() . ')'
         . ($args{where} ? " AND ($args{where})" : '');
   }
   else {
      my $where = $self->__get_boundaries();
      return $self->{chunker}->inject_chunks(
         database  => $args{database},
         table     => $args{table},
         chunks    => [$where],
         chunk_num => 0,
         query     => $self->{nibble_sql},
         where     => [$args{where}],
         quoter    => $self->{quoter},
      );
   }
}

sub __get_boundaries {
   my ( $self ) = @_;
   my $q = $self->{quoter};
   my $s = $self->{sel_stmt};
   my $lb;   # Lower boundary part of WHERE
   my $ub;   # Upper boundary part of WHERE
   my $row;  # Next upper boundary row or cached_row

   if ( $self->{cached_boundaries} ) {
      MKDEBUG && _d('Using cached boundaries');
      return $self->{cached_boundaries};
   }

   if ( $self->{cached_row} && $self->{cached_nibble} == $self->{nibble} ) {
      MKDEBUG && _d('Using cached row for boundaries');
      $row = $self->{cached_row};
   }
   else {
      MKDEBUG && _d('Getting next upper boundary row');
      my $sql;
      ($sql, $lb) = $self->__make_boundary_sql();  # $lb from outer scope!

      if ( $self->{nibble} == 0 ) {
         my $explain_index = $self->__get_explain_index($sql);
         if ( ($explain_index || '') ne $s->{index} ) {
         die 'Cannot nibble table '.$q->quote($self->{database},$self->{table})
            . " because MySQL chose "
            . ($explain_index ? "the `$explain_index`" : 'no') . ' index'
            . " instead of the `$s->{index}` index";
         }
      }

      $row = $self->{dbh}->selectrow_hashref($sql);
      MKDEBUG && _d($row ? 'Got a row' : "Didn't get a row");
   }

   if ( $row ) {
      my $i = 0;
      $ub   = $s->{boundaries}->{'<='};
      $ub   =~ s/\?/$q->quote_val($row->{$s->{scols}->[$i++]})/eg;
   }
   else {
      MKDEBUG && _d('No upper boundary');
      $ub = '1=1';
   }

   my $where = $lb ? "($lb AND $ub)" : $ub;

   $self->{cached_row}        = $row;
   $self->{cached_nibble}     = $self->{nibble};
   $self->{cached_boundaries} = $where;

   MKDEBUG && _d('WHERE clause:', $where);
   return $where;
}

sub __make_boundary_sql {
   my ( $self ) = @_;
   my $lb;
   my $q   = $self->{quoter};
   my $s   = $self->{sel_stmt};
   my $sql = "SELECT /*nibble boundary $self->{nibble}*/ "
      . join(',', map { $q->quote($_) } @{$s->{cols}})
      . " FROM " . $q->quote($self->{database}, $self->{table})
      . ($self->{versionparser}->version_ge($self->{dbh}, '4.0.9')
         ? " FORCE" : " USE")
      . " INDEX(" . $q->quote($s->{index}) . ")";
   if ( $self->{nibble} ) {
      my $tmp = $self->{cached_row};
      my $i   = 0;
      $lb     = $s->{boundaries}->{'>'};
      $lb     =~ s/\?/$q->quote_val($tmp->{$s->{scols}->[$i++]})/eg;
      $sql   .= ' WHERE ' . $lb;
   }
   $sql .= " ORDER BY " . join(',', map { $q->quote($_) } @{$self->{key_cols}})
         . ' LIMIT ' . ($self->{chunksize} - 1) . ', 1';
   MKDEBUG && _d('Lower boundary:', $lb);
   MKDEBUG && _d('Next boundary sql:', $sql);
   return $sql, $lb;
}

sub __get_explain_index {
   my ( $self, $sql ) = @_;
   return unless $sql;
   my $explain;
   eval {
      $explain = $self->{dbh}->selectall_arrayref("EXPLAIN $sql",{Slice => {}});
   };
   if ( $EVAL_ERROR ) {
      MKDEBUG && _d($EVAL_ERROR);
      return;
   }
   MKDEBUG && _d('EXPLAIN key:', $explain->[0]->{key}); 
   return $explain->[0]->{key}
}

sub prepare {
   my ( $self, $dbh ) = @_;
   my $sql = 'SET @crc := "", @cnt := 0';
   MKDEBUG && _d($sql);
   $dbh->do($sql);
   return;
}

sub same_row {
   my ( $self, $lr, $rr ) = @_;
   if ( $self->{state} ) {
      if ( $lr->{$self->{crc_col}} ne $rr->{$self->{crc_col}} ) {
         $self->{handler}->change('UPDATE', $lr, $self->key_cols());
      }
   }
   elsif ( $lr->{cnt} != $rr->{cnt} || $lr->{crc} ne $rr->{crc} ) {
      MKDEBUG && _d('Rows:', Dumper($lr, $rr));
      MKDEBUG && _d('Will examine this nibble before moving to next');
      $self->{state} = 1; # Must examine this nibble row-by-row
   }
}

sub not_in_right {
   my ( $self, $lr ) = @_;
   die "Called not_in_right in state 0" unless $self->{state};
   $self->{handler}->change('INSERT', $lr, $self->key_cols());
}

sub not_in_left {
   my ( $self, $rr ) = @_;
   die "Called not_in_left in state 0" unless $self->{state};
   $self->{handler}->change('DELETE', $rr, $self->key_cols());
}

sub done_with_rows {
   my ( $self ) = @_;
   if ( $self->{state} == 1 ) {
      $self->{state} = 2;
      MKDEBUG && _d('Setting state =', $self->{state});
   }
   else {
      $self->{state} = 0;
      $self->{nibble}++;
      delete $self->{cached_boundaries};
      MKDEBUG && _d('Setting state =', $self->{state},
         ', nibble =', $self->{nibble});
   }
}

sub done {
   my ( $self ) = @_;
   MKDEBUG && _d('Done with nibble', $self->{nibble});
   MKDEBUG && $self->{state} && _d('Nibble differs; must examine rows');
   return $self->{state} == 0 && $self->{nibble} && !$self->{cached_row};
}

sub pending_changes {
   my ( $self ) = @_;
   if ( $self->{state} ) {
      MKDEBUG && _d('There are pending changes');
      return 1;
   }
   else {
      MKDEBUG && _d('No pending changes');
      return 0;
   }
}

sub key_cols {
   my ( $self ) = @_;
   my @cols;
   if ( $self->{state} == 0 ) {
      @cols = qw(chunk_num);
   }
   else {
      @cols = @{$self->{key_cols}};
   }
   MKDEBUG && _d('State', $self->{state},',', 'key cols', join(', ', @cols));
   return \@cols;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableSyncNibble package
# ###########################################################################

# ###########################################################################
# TableSyncGroupBy package 3186
# ###########################################################################
package TableSyncGroupBy;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(handler cols) ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   $args{count_col} = '__maatkit_count';
   while ( $args{struct}->{is_col}->{$args{count_col}} ) {
      $args{count_col} = "_$args{count_col}";
   }
   MKDEBUG && _d('COUNT column will be named', $args{count_col});
   return bless { %args }, $class;
}

sub get_sql {
   my ( $self, %args ) = @_;
   my $cols = join(', ', map { $args{quoter}->quote($_) } @{$self->{cols}});
   return "SELECT"
      . ($self->{bufferinmysql} ? ' SQL_BUFFER_RESULT' : '')
      . " $cols, COUNT(*) AS $self->{count_col}"
      . ' FROM ' . $args{quoter}->quote(@args{qw(database table)})
      . ' WHERE ' . ( $args{where} || '1=1' )
      . " GROUP BY $cols ORDER BY $cols";
}

sub same_row {
   my ( $self, $lr, $rr ) = @_;
   my $cc = $self->{count_col};
   my $lc = $lr->{$cc};
   my $rc = $rr->{$cc};
   my $diff = abs($lc - $rc);
   return unless $diff;
   $lr = { %$lr };
   delete $lr->{$cc};
   $rr = { %$rr };
   delete $rr->{$cc};
   foreach my $i ( 1 .. $diff ) {
      if ( $lc > $rc ) {
         $self->{handler}->change('INSERT', $lr, $self->key_cols());
      }
      else {
         $self->{handler}->change('DELETE', $rr, $self->key_cols());
      }
   }
}

sub not_in_right {
   my ( $self, $lr ) = @_;
   $lr = { %$lr };
   my $cnt = delete $lr->{$self->{count_col}};
   foreach my $i ( 1 .. $cnt ) {
      $self->{handler}->change('INSERT', $lr, $self->key_cols());
   }
}

sub not_in_left {
   my ( $self, $rr ) = @_;
   $rr = { %$rr };
   my $cnt = delete $rr->{$self->{count_col}};
   foreach my $i ( 1 .. $cnt ) {
      $self->{handler}->change('DELETE', $rr, $self->key_cols());
   }
}

sub done_with_rows {
   my ( $self ) = @_;
   $self->{done} = 1;
}

sub done {
   my ( $self ) = @_;
   return $self->{done};
}

sub key_cols {
   my ( $self ) = @_;
   return $self->{cols};
}

sub prepare {
   my ( $self, $dbh ) = @_;
}

sub pending_changes {
   my ( $self ) = @_;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End TableSyncGroupBy package
# ###########################################################################

# ###########################################################################
# Outfile package 4510
# ###########################################################################
package Outfile;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);

use List::Util qw(min);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   my $self = {};
   return bless $self, $class;
}

sub write {
   my ( $self, $fh, $rows ) = @_;
   foreach my $row ( @$rows ) {
      print $fh escape($row), "\n"
         or die "Cannot write to outfile: $OS_ERROR\n";
   }
   return;
}

sub escape {
   my ( $row ) = @_;
   return join("\t", map {
      s/([\t\n\\])/\\$1/g if defined $_;  # Escape tabs etc
      defined $_ ? $_ : '\N';             # NULL = \N
   } @$row);
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
# ###########################################################################
# End Outfile package
# ###########################################################################

# ###########################################################################
# MockSyncStream package 4559
# ###########################################################################
package MockSyncStream;


use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(query cols same_row not_in_left not_in_right) ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   return bless { %args }, $class;
}

sub get_sql {
   my ( $self ) = @_;
   return $self->{query};
}

sub same_row {
   my ( $self, $lr, $rr ) = @_;
   return $self->{same_row}->($lr, $rr);
}

sub not_in_right {
   my ( $self, $lr ) = @_;
   return $self->{not_in_right}->($lr);
}

sub not_in_left {
   my ( $self, $rr ) = @_;
   return $self->{not_in_left}->($rr);
}

sub done_with_rows {
   my ( $self ) = @_;
   $self->{done} = 1;
}

sub done {
   my ( $self ) = @_;
   return $self->{done};
}

sub key_cols {
   my ( $self ) = @_;
   return $self->{cols};
}

sub prepare {
   my ( $self, $dbh ) = @_;
   return;
}

sub pending_changes {
   my ( $self ) = @_;
   return;
}

sub get_result_set_struct {
   my ( $dbh, $sth ) = @_;
   my @cols     = @{$sth->{NAME}};
   my @types    = map { $dbh->type_info($_)->{TYPE_NAME} } @{$sth->{TYPE}};
   my @nullable = map { $dbh->type_info($_)->{NULLABLE} == 1 ? 1 : 0 } @{$sth->{TYPE}};
   my @p = @{$sth->{PRECISION}};
   my @s = @{$sth->{SCALE}};

   my $struct   = {
      cols => \@cols, 
   };

   for my $i ( 0..$#cols ) {
      my $col  = $cols[$i];
      my $type = $types[$i];
      $struct->{is_col}->{$col}      = 1;
      $struct->{col_posn}->{$col}    = $i;
      $struct->{type_for}->{$col}    = $type;
      $struct->{is_nullable}->{$col} = $nullable[$i];
      $struct->{is_numeric}->{$col} 
         = ($type =~ m/(?:(?:tiny|big|medium|small)?int|float|double|decimal|year)/ ? 1 : 0);
      $struct->{precision}->{$col}
         = ($type =~ m/(?:float|double|decimal)/ ? "($p[$i],$s[$i])" : undef);
   }

   return $struct;
}

sub as_arrayref {
   my ( $sth, $row ) = @_;
   my @cols = @{$sth->{NAME}};
   my @row  = @{$row}{@cols};
   return \@row;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End MockSyncStream package
# ###########################################################################

# ###########################################################################
# VersionParser package 3186
# ###########################################################################
package VersionParser;

use strict;
use warnings FATAL => 'all';

use English qw(-no_match_vars);

use constant MKDEBUG => $ENV{MKDEBUG};

sub new {
   my ( $class ) = @_;
   bless {}, $class;
}

sub parse {
   my ( $self, $str ) = @_;
   my $result = sprintf('%03d%03d%03d', $str =~ m/(\d+)/g);
   MKDEBUG && _d($str, 'parses to', $result);
   return $result;
}

sub version_ge {
   my ( $self, $dbh, $target ) = @_;
   if ( !$self->{$dbh} ) {
      $self->{$dbh} = $self->parse(
         $dbh->selectrow_array('SELECT VERSION()'));
   }
   my $result = $self->{$dbh} ge $self->parse($target) ? 1 : 0;
   MKDEBUG && _d($self->{$dbh}, 'ge', $target, ':', $result);
   return $result;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;

# ###########################################################################
# End VersionParser package
# ###########################################################################

# ###########################################################################
# This is a combination of modules and programs in one -- a runnable module.
# http://www.perl.com/pub/a/2006/07/13/lightning-articles.html?page=last
# Or, look it up in the Camel book on pages 642 and 643 in the 3rd edition.
#
# Check at the end of this package for the call to main() which actually runs
# the program.
# ###########################################################################
package mk_upgrade;

use English qw(-no_match_vars);

use Data::Dumper;
$Data::Dumper::Indent    = 1;
$Data::Dumper::Sortkeys  = 1;
$Data::Dumper::Quotekeys = 0;

Transformers->import(qw());

use constant MKDEBUG => $ENV{MKDEBUG};

use sigtrap 'handler', \&sig_int, 'normal-signals';

# Global variables.  Only really essential variables should be here.
my $oktorun = 1;

sub main {
   @ARGV = @_;  # set global ARGV for this package

   # ##########################################################################
   # Get configuration information.
   # ##########################################################################
   my $dp = new DSNParser();
   my $o  = new OptionParser(
      strict      => 0,
      dp          => $dp,
      prompt      => '[OPTION...] HOST-1-DSN HOST-2-DSN FILE',
      description => q{compares query execution on two hosts by executing }
                   . q{queries in the given file (or STDIN if no file given) }
                   . q{and examining the results, errors, warnings, etc. }
                   . q{produced on each.},
   );
   $o->get_specs();
   $o->get_opts();

   if ( !$o->get('help') ) {
      if ( @ARGV < 2 ) {
         $o->save_error('Specify at least one host DSNs');
      }
      if ( my $compare_method = $o->get('compare-results-method') ) {
         my %valid_method = qw(checksum 1 rows 1);
         $o->save_error("Invalid --compare-results-method: $compare_method")
            unless $valid_method{lc $compare_method};
      }
   }

   $o->set('all-errors', 0) unless $o->get('errors');

   $o->usage_or_errors();

   # ########################################################################
   # Parse the host DSNs and make sure that we can connect to each.
   # ########################################################################
   my $hosts        = [];
   my $dsn_defaults = $dp->parse_options($o);

   my $host1_dsn = $dp->parse(shift @ARGV, $dsn_defaults);
   my $host1_dbh = get_cxn($dp, $o, $host1_dsn);
   push @$hosts, { dbh => $host1_dbh, dsn => $host1_dsn };

   my $host2_dsn;
   my $host2_dbh;
   if ( @ARGV && !$o->get('single-host') ) {
      $host2_dsn = $dp->parse(shift @ARGV, $host1_dsn, $dsn_defaults);
      $host2_dbh = get_cxn($dp, $o, $host2_dsn);
      push @$hosts, { dbh => $host2_dbh, dsn => $host2_dsn };
   }

   # ########################################################################
   # Make some common modules.
   # ########################################################################
   my $parser  = new SlowLogParser();
   my $qparser = new QueryParser();
   my $qexec   = new QueryExecutor();
   my $qranker = new QueryRanker(
      ranker_for => {
         get_row_sths => \&rank_row_sths,
      }
   );
   # Most of these modules are only used in rank_row_sths() if
   # the result sets differ.
   my $du       = new MySQLDump(cache => 0);
   my $tp       = new TableParser();
   my $q        = new Quoter();
   my $vp       = new VersionParser();
   my $chunker  = new TableChunker( quoter => $q );
   my $nibbler  = new TableNibbler();
   my $checksum = new TableChecksum();
   my $syncer   = new TableSyncer();
   my %common_modules = (
      OptionParser  => $o,
      QueryParser   => $qparser,
      MySQLDump     => $du,
      TableParser   => $tp,
      Quoter        => $q,
      VersionParser => $vp,
      TableChunker  => $chunker,
      TableNibbler  => $nibbler,
      TableChecksum => $checksum,
      TableSyncer   => $syncer,
   );

   if ( !$o->get('compare-query-times') ) {
      $qranker->set_ranker('Query_time',
         sub { MKDEBUG && _d('Not comparing query times'); return 0; });
   }

   # ########################################################################
   # Create QueryExecutor::exec() callbacks for each operation.
   # These subs return to QueryExecutor::exec(), not mk-upgrade.
   # exec() returns a list of each host's results to mk-upgrade.
   # ########################################################################
   my $compare_method
      = $o->get('compare-results') ? lc($o->get('compare-results-method') || '')
      : '';
   MKDEBUG && _d('compare method:', $compare_method);
   my @qe_callbacks;
   my $tmp_tbl    = 'mk_upgrade';  
   my $tmp_tbl_db = $o->get('tmp-table-database');
   my $tmp_tbl_wrap;
   my $current_db;

   # Pre-query execution operations.
   if ( $compare_method eq 'checksum' ) {
      # This drops the tmp table and sets the default engine to MyISAM.
      push @qe_callbacks, sub {
         return $qexec->pre_checksum_results(
            @_,
            database  => $tmp_tbl_db || $current_db,
            tmp_table => $tmp_tbl,
            Quoter    => $q,
         );
      };
   }

   # The actual query execution.
   if ( $compare_method eq 'rows' ) {
      push @qe_callbacks, sub { return $qexec->get_row_sths(@_); };
      push @qe_callbacks, sub {
         my ( %args ) = @_;
         MKDEBUG && _d('Take Query_time results out of get_row_sths results');
         # Results for the previous operation, get_row_sths(), will
         # have Query_time results.  Since each callback can only return
         # on result, we take out the Query_time results and return them.
         my $Query_time = $args{results}->{get_row_sths}->{Query_time};
         delete $args{results}->{get_row_sths}->{Query_time};  # Keep clean
         return 'Query_time', $Query_time;
      };
   }
   else {
      push @qe_callbacks, sub { return $qexec->Query_time(@_); };
   }

   # Post-query execution operations.
   if ( $o->get('compare-warnings') ) {
      # Do this first after execution so we don't taint the warnings.
      push @qe_callbacks, sub { return $qexec->get_warnings(@_); };
      push @qe_callbacks, sub { return $qexec->clear_warnings(@_, QueryParser => $qparser); };
   }

   if ( $compare_method eq 'checksum' ) {
      push @qe_callbacks, sub {
         return $qexec->checksum_results(
            @_,
            database    => $tmp_tbl_db || $current_db,
            tmp_table   => $tmp_tbl,
            Quoter      => $q,
            TableParser => $tp,
            MySQLDump   => $du,
         );
      };
   }

   # ########################################################################
   # Set up an array of callbacks to transform, filter and process events.
   # Results are saved in %results and compared and reported after all
   # events have been processed.
   # ########################################################################
   my @callbacks;
   my %results;

   # For the moment we only support deterministic SELECT queries.
   # There's no filter/transformation for non-deterministic functions yet,
   # just a simple filter to remove non-SELECTs.
   push @callbacks, sub {
      my ( $event ) = @_;
      if ( $event->{cmd} ne 'Query' ) {
         MKDEBUG && _d('Skipping non-Query cmd');
         return;
      }
      if ( !$event->{arg} ) {
         MKDEBUG && _d('Skipping empty arg');
         return;
      }
      if ( $event->{arg} !~ m/(?:^SELECT|(?:\*\/\s*SELECT))/i ) {
         MKDEBUG && _d('Skipping non-SELECT query');
         return;
      }
      return $event;
   };

   # User-defined filter.
   if ( $o->get('filter') ) {
      my $filter = $o->get('filter');
      if ( -f $filter && -r $filter ) {
         MKDEBUG && _d('Reading file', $filter, 'for --filter code');
         open my $fh, "<", $filter or die "Cannot open $filter: $OS_ERROR";
         $filter = do { local $/ = undef; <$fh> };
         close $fh;
      }
      else {
         $filter = "( $filter )";  # issue 565
      }
      my $code   = "sub { MKDEBUG && _d('callback: filter');  my(\$event) = shift; $filter && return \$event; };";
      MKDEBUG && _d('--filter code:', $code);
      my $sub = eval $code
         or die "Error compiling --filter code: $code\n$EVAL_ERROR";
      push @callbacks, $sub;
   }

   # Keep the default database update-to-date.  This helps when queries
   # don't use db-qualified tables.
   push @callbacks, sub {
      my ( $event ) = @_;
      my $db = $event->{db} || $event->{Schema};
      if ( $db && (!$current_db || $db ne $current_db) ) {
         eval {
            $host1_dbh->do("USE $db");
            $host2_dbh->do("USE $db") if $host2_dbh;
         };
         if ( $EVAL_ERROR ) {
            warn "Failed to USE $db: $EVAL_ERROR";
         }
         else {
            $current_db  = $db;
            MKDEBUG && _d('USE', $db);
         }
      }
      return $event;
   };

   if ( $compare_method eq 'checksum'  ) {
      # exec() does not wrap the query in "CREATE TEMPORARY TABLE ... AS".
      # We must do this.  This is an mk-upgrade callback sub.  Later, we'll
      # unwrap it in another mk-upgrade callback.
      push @callbacks, sub {
         my ( $event ) = @_;
         MKDEBUG && _d('callback: wrap query in CREATE TEMPORARY TABLE AS');
         my $query = $event->{arg};
         return unless $query;
         my $tmp_db_tbl = $q->quote($tmp_tbl_db || $current_db, $tmp_tbl);
         $tmp_tbl_wrap = "CREATE TEMPORARY TABLE $tmp_db_tbl AS ";
         $event->{arg} = "$tmp_tbl_wrap$query";
         return $event;
      };
   }

   # This is the main mk-upgrade callback which calls QueryExecutor::exec().
   # This callback gets the results and passes them to the next callback
   # which will rank them.
   push @callbacks, sub {
      my ( $event ) = @_;
      my $query = $event->{arg};
      eval {
         my @results = $qexec->exec(
            query      => $query,
            hosts      => $hosts,
            DSNParser  => $dp,
            callbacks  => \@qe_callbacks,
         );
         $event->{results} = \@results;
      };
      if ( $EVAL_ERROR ) {
         # This shouldn't happen because QueryExecutor operations are supposed
         # to eval everything they do and capture all their own errors.
         MKDEBUG && _d($EVAL_ERROR, Dumper($event));
         print "The query executor failed to execute this query:\n"
            . "$query\n"
            . "Error: $EVAL_ERROR";
         return;
      }
      return $event;
   };

   if ( $compare_method eq 'checksum'  ) {
      # Unwrap the query from inside CREATE TEMPORARY TABLE AS.  This is an
      # mk-upgrade callback because it's not something QueryExecutor does.
      push @callbacks, sub {
         my ( $event ) = @_;
         MKDEBUG && _d('callback: unwrap query in CREATE TEMPORARY TABLE AS');
         my $query = $event->{arg};
         return unless $query;
         $query =~ s/^$tmp_tbl_wrap//;
         $event->{arg} = $query;
         return $event;
      };
   }

   # Rank and save the query's results from the previous callback.
   # After all queries have been executed and ranked, they will be
   # sorted and reported by rank.  For now we save the whole event,
   # but later when we do aggregation this will cost too much memory.
   my @events;
   push @callbacks, sub {
      my ( $event ) = @_;
      my $results = $event->{results};
      die "Got an event without results" unless $results;

      # At least 2 hosts are needed compare results.  If we have
      # only 1 host, then we'll just print its results later.
      if ( scalar @$hosts == 1 ) {
         $event->{rank}    = 0;
         $event->{reasons} = ['No rank for single host'];
         push @events, $event;
         return $event;
      }

      eval {
         my ($rank, @reasons) = $qranker->rank_results(
            $results,
            # These args are passed through to the ranker subs.
            hosts => $hosts,
            event => $event,
            %common_modules,
         ); 
         $event->{rank}    = $rank;
         $event->{reasons} = \@reasons;
         push @events, $event;
      };
      if ( $EVAL_ERROR ) {
         # This really should not happen.
         MKDEBUG && _d($EVAL_ERROR, Dumper($event));
         print "An unexpected error occurred while ranking the "
            . "query's results: $EVAL_ERROR";
         return;
      }

      return $event;
   };

   # ########################################################################
   # Daemonize now that everything is setup and ready to work.
   # ########################################################################
   my $daemon;
   if ( $o->get('daemonize') ) {
      $daemon = new Daemon(o=>$o);
      $daemon->daemonize();
      MKDEBUG && _d('I am a daemon now');
   }
   elsif ( $o->get('pid') ) {
      # We're not daemoninzing, it just handles PID stuff.
      $daemon = new Daemon(o=>$o);
      $daemon->make_PID_file();
   }

   # ##########################################################################
   # Parse input and process events.
   # ##########################################################################
   if ( @ARGV == 0 ) {
      MKDEBUG && _d('Reading STDIN');
      push @ARGV, '-'; # Magical STDIN filename.
   }

   my $start = time();
   my $end   = $start + ($o->get('run-time') || 0); # When we should exit
   my $now   = $start;

   FILE:
   while (                                 # Quit if:
      $oktorun                             # instructed to quit
      && ($start == $end || $now < $end)   # or time is exceeded
      && @ARGV )                           # or there's no more files
   {
      my $file = shift @ARGV;
      MKDEBUG && _d('Parsing', $file);
      my $fh;
      if ( $file eq '-' ) {
         $fh = *STDIN;
      }
      else {
         if ( !open $fh, "<", $file ) {
            warn "Cannot open $file: $OS_ERROR";
            next FILE;
         }
      }

      my $events;
      EVENT:
      while ( $oktorun
              && ($start == $end || $now < $end) ) {
         eval {
            $events = $parser->parse_event($fh, undef, @callbacks);
         };
         if ( $EVAL_ERROR ) {
            _d($EVAL_ERROR);
            # Don't ignore failure to open a file, else we'll get
            # "tell() on closed filehandle" errors.
            last EVENT if $EVAL_ERROR =~ m/Cannot open/;
            last EVENT unless $o->get('continue-on-error');
         }
         last EVENT unless $events;
         $now = time();
      }

      close $fh;
      $now = time();
   }

   # ######################################################################
   # Report results.
   # ######################################################################
   QUERY:
   foreach my $event ( sort { $b->{rank} <=> $a->{rank} } @events ) {
      my $query         = $event->{arg};
      my $host1_results = $event->{results}->[0];
      my $host2_results = $event->{results}->[1] || $host1_results;

      if ( $o->get('only-failed-queries') ) {
         my $has_errors = 0;
         HOST:
         foreach my $host_results ( @{$event->{results}} ) {
            foreach my $op ( keys %$host_results ) {
               if ( $host_results->{$op}->{error} ) {
                  $has_errors = 1;
                  last HOST;
               }
            }
         }
         if ( !$has_errors ) {
            MKDEBUG && _d('Query has no errors:', $query);
            next QUERY;
         }
      }

      if ( $o->get('dump-results') ) {
         print Dumper($event->{results});
         next QUERY;
      }

      print "# Rank: $event->{rank}\n";

      my $t1 = $host1_results->{Query_time}->{Query_time};
      my $t2 = $host2_results->{Query_time}->{Query_time};
      printf "# Host1_Query_time: %.6f  Host2_Query_time: %.6f\n",
         ($t1 >= 0 ? $t1 : 0),  # Query_time may be -1 if the query failed to
         ($t2 >= 0 ? $t2 : 0);  # execute.  Make it 0 though to print pretty.

      if ( $o->get('compare-warnings') ) {
         printf "# Host1_Warning_count: %d  Host2_Warning_count: %d\n",
            $host1_results->{warnings}->{count},
            $host2_results->{warnings}->{count};
      }

      if ( $compare_method eq 'checksum' ) {
         printf "# Host1_Resultset_checksum: %d  Host2_Resultset_checksum: %d\n",
            $host1_results->{checksum_results}->{checksum},
            $host2_results->{checksum_results}->{checksum};
         printf "# Host1_Resultset_rows: %d  Host2_Resultset_rows: %d\n",
            $host1_results->{checksum_results}->{n_rows},
            $host2_results->{checksum_results}->{n_rows};
      }
      if ( $compare_method eq 'rows' && !$o->get('single-host') ) {
          printf "# Missing_Rows: %d  Different_rows: %d\n",
            scalar @{$host1_results->{get_row_sths}->{missing_rows}},
            scalar @{$host2_results->{get_row_sths}->{different_rows}};
      }

      # Print errors and reasons in a /* comment block */ before the query.
      if ( $o->get('reasons') || $o->get('errors') ) {
         print "/*\n";
         print_errors($event->{results}, $o->get('all-errors'))
            if $o->get('errors');
         print_reasons($event->{reasons})
            if $o->get('reasons');
         print_row_differences($host1_results)
            if $compare_method eq 'rows';
         print "*/\n";
      }

      print "$query\n";
   }

   $host1_dbh->disconnect();
   $host2_dbh->disconnect() if $host2_dbh;
   return 0;

} # End main().

# ############################################################################
# Subroutines.
# ############################################################################

# rank_row_sths() implements part of an idea discussed by Mark Callaghan,
# Baron Schwartz and Daniel Nichter.  See:
# http://groups.google.com/group/maatkit-discuss/browse_thread/thread/5d0f208f4e76ec0f 
# http://groups.google.com/group/maatkit-discuss/browse_thread/thread/49f4564111c78a2f

# This is a QueryRanker callback for the QueryExecutor::get_row_sths
# operation.  It's not a trivial rank and comparison so we do it here
# instead of QueryRanker.
sub rank_row_sths {
   my ( $host1, $host2, %args ) = @_;
   foreach my $arg ( qw(hosts event OptionParser) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $dbh   = $args{hosts}->[0]->{dbh};  # Doesn't matter which one.
   my $event = $args{event};
   my $rank  = 0;
   my @reasons;

   $host1->{missing_rows}   = [];
   $host1->{different_rows} = [];

   if ( $host1->{error} ) {
      $rank += 100;
      push @reasons, 'Query failed to execute on host1: '
         . ($host1->{error} || 'unknown error')
         . " (rank+100)";
   }
   if ( $host2->{error} ) {
      $rank += 100;
      push @reasons, 'Query failed to execute on host2: '
         . ($host2->{error} || 'unknown error')
         . " (rank+100)";
   }

   # Return now unless we have two sths.
   my $left  = $host1->{sth};
   my $right = $host2->{sth};
   return $rank, @reasons unless $left && $right;

   # The sths are ok; start the comparison.
   my $struct = MockSyncStream::get_result_set_struct($dbh, $left);
   MKDEBUG && _d('result set struct:', Dumper($struct));

   # Identical rows are ignored.  Once a difference on either side is found,
   # we gobble up the remaining rows in that sth and print them to an outfile.
   # This short circuits RowDiff::compare_sets() which is what we want to do.

   my $no_diff      = 1;  # results are identical; this catches 0 row results
   my $outfile      = new Outfile();
   my ($host1_outfile, $host2_outfile);
   my $same_row     = sub { return; };  # ignore/discard identical rows
   my $not_in_left  = sub {
      my ( $rr ) = @_;
      $no_diff = 0;
      $host2_outfile = write_to_outfile(
         host    => 'host2',
         sth     => $right,
         row     => $rr,
         outfile => $outfile,
         %args,
      );
      return;
   };
   my $not_in_right = sub {
      my ( $lr ) = @_;
      $no_diff = 0;
      $host1_outfile = write_to_outfile(
         host    => 'host1',
         sth     => $left,
         row     => $lr,
         outfile => $outfile,
         %args,
      );
      return;
   };

   my $rd       = new RowDiff(dbh => $dbh);
   my $mocksync = new MockSyncStream(
      query        => $event->{arg},
      cols         => $struct->{cols},
      same_row     => $same_row,
      not_in_left  => $not_in_left,
      not_in_right => $not_in_right,
   );

   MKDEBUG && _d('Comparing result sets with MockSyncStream');
   $rd->compare_sets(
      left   => $left,
      right  => $right,
      syncer => $mocksync,
      tbl    => $struct,
   );

   if ( $no_diff ) {
      MKDEBUG && _d('Result sets are identical');
      return $rank, @reasons;  # rank should be 0 and no reasons
   }

   # The result sets differ, so now we must begin the difficult
   # work: finding and determining the nature of those differences.
   MKDEBUG && _d('Result sets are different');
   my ($missing_rows, $different_rows) = diff_rows(
      outfiles => [$host1_outfile, $host2_outfile],
      struct   => $struct,
      %args,
   );

   my $n_missing_rows = scalar @$missing_rows;
   if ( $n_missing_rows ) {
      my $inc = 10 * $n_missing_rows;
      push @reasons, "$n_missing_rows missing rows (rank+$inc)";
   }

   my $n_different_rows = scalar @$different_rows;
   if ( $n_different_rows ) {
      my $inc = 5 * $n_different_rows;
      push @reasons, "$n_different_rows different rows (rank+$inc)";
   }

   # Doesn't matter in which host's results we save these.  Just remember
   # which one we used when we go to print these results.
   $host1->{missing_rows}   = $missing_rows;
   $host1->{different_rows} = $different_rows;

   return $rank, @reasons;
}

sub diff_rows {
   my ( %args ) = @_;
   my @required_args = qw(hosts event OptionParser outfiles struct TableSyncer);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($hosts, $event, $o, $outfiles, $struct) = @args{@required_args};

   my $dbh  = $args{hosts}->[0]->{dbh};  # Doesn't matter which one.
   my $db   = $o->get('tmp-table-database') || $event->{db} || $event->{Schema};

   my $host1_outfile = $outfiles->[0];
   my $host1_dbh     = $hosts->[0]->{dbh};
   my $host1_tbl     = "`$db`.`mk_upgrade_host1`";

   my $host2_outfile = $outfiles->[1];
   my $host2_dbh     = $hosts->[1]->{dbh};
   my $host2_tbl     = "`$db`.`mk_upgrade_host2`";

   # First thing, make two temps tables into which the outfiles can
   # be loaded.  This requires that we make a CREATE TABLE statement
   # for the result sets' columns.
   my $table_ddl = make_table_ddl($struct);
   $host1_dbh->do("DROP TABLE IF EXISTS $host1_tbl");
   $host1_dbh->do("CREATE TABLE $host1_tbl $table_ddl");
   $host2_dbh->do("DROP TABLE IF EXISTS $host2_tbl");
   $host2_dbh->do("CREATE TABLE $host2_tbl $table_ddl");
   $host1_dbh->do("LOAD DATA LOCAL INFILE '$host1_outfile' "
      . "INTO TABLE $host1_tbl");
   $host2_dbh->do("LOAD DATA LOCAL INFILE '$host2_outfile' "
      . "INTO TABLE $host2_tbl");
   MKDEBUG && _d('Loaded', $host1_outfile, 'into table', $host1_tbl, 'and',
      $host2_outfile, 'into table', $host2_tbl);

   # Now we need to get all indexes from all tables used by the query
   # and add them to the temp tbl.  Some indexes may be invalid, dupes,
   # or generally useless, but we'll let the sync algo decide that later.
   add_source_indexes(
      query  => $event->{arg},
      dest   => [
         { dbh => $host1_dbh, tbl => $host1_tbl, },
         { dbh => $host2_dbh, tbl => $host2_tbl, },
      ],
      %args
   );

   # Create a RowDiff with callbacks that will do what we want when rows and
   # columns differ.  This RowDiff is passed to TableSyncer which calls it.
   # TODO: explain how these callbacks work together.
   my $max_diff = $o->get('max-differences');
   my $n_diff   = 0;
   my @missing_rows;
   my @different_rows;
   use constant LEFT  => 0;
   use constant RIGHT => 1;
   my @l_r = (undef, undef);
   my @last_diff_col;
   my $last_diff = 0;
   my $key_cmp      = sub {
      push @last_diff_col, [@_];
      $last_diff--;
      return;
   };
   my $same_row = sub {
      my ( $lr, $rr ) = @_;
      if ( $l_r[LEFT] && $l_r[RIGHT] ) {
         MKDEBUG && _d('Saving different row');
         push @different_rows, [@l_r, $last_diff_col[$last_diff]];
         $n_diff++;
      }
      elsif ( $l_r[LEFT] ) {
         MKDEBUG && _d('Saving not in right row');
         push @missing_rows, [$l_r[LEFT], undef];
         $n_diff++;
      }
      elsif ( $l_r[RIGHT] ) {
         MKDEBUG && _d('Saving not in left row');
         push @missing_rows, [undef, $l_r[RIGHT]];
         $n_diff++;
      }
      else {
         MKDEBUG && _d('No missing or different rows in queue');
      }
      @l_r           = (undef, undef);
      @last_diff_col = ();
      $last_diff     = 0;
      return;
   };
   my $not_in_left  = sub {
      my ( $rr ) = @_;
      $same_row->() if $l_r[RIGHT];  # last missing row
      $l_r[RIGHT] = $rr;
      $same_row->(@l_r) if $l_r[LEFT] && $l_r[RIGHT];
      return;
   };
   my $not_in_right = sub {
      my ( $lr ) = @_;
      $same_row->() if $l_r[LEFT];  # last missing row
      $l_r[LEFT] = $lr;
      $same_row->(@l_r) if $l_r[LEFT] && $l_r[RIGHT];
      return;
   };
   my $done = sub {
      my ( $left, $right ) = @_;
      MKDEBUG && _d('Found', $n_diff, 'of', $max_diff, 'max differences');
      if ( $n_diff >= $max_diff ) {
         MKDEBUG && _d('Done comparing rows, got --max-differences', $max_diff);
         $left->finish();
         $right->finish();
         return 1;
      }
      return 0;
   };
   my $trf;
   if ( my $n = $o->get('float-precision') ) {
      $trf = sub {
         my ( $l, $r, $tbl, $col ) = @_;
         return $l, $r
            unless $tbl->{type_for}->{$col} =~ m/(?:float|double|decimal)/;
         my $l_rounded = sprintf "%.${n}f", $l;
         my $r_rounded = sprintf "%.${n}f", $r;
         MKDEBUG && _d('Rounded', $l, 'to', $l_rounded,
            'and', $r, 'to', $r_rounded);
         return $l_rounded, $r_rounded;
      };
   };

   my $rd = new RowDiff(
      dbh          => $dbh,
      key_cmp      => $key_cmp,
      same_row     => $same_row,
      not_in_left  => $not_in_left,
      not_in_right => $not_in_right,
      done         => $done,
      trf          => $trf,
   );

   # With whatever index we may have, let TableSyncer choose an
   # algorithm and find were rows differ.  We don't actually sync
   # the tables (execute=>0).  Instead, the callbacks above will
   # save rows in @missing_rows and @different_rows.
   my $syncer = $args{TableSyncer};
   my %status = $syncer->sync_table(
      algorithm     => '',  # determine best algo
      buffer        => 0,
      bufferinmysql => 0,
      checksum      => $args{TableChecksum},
      chunker       => $args{TableChunker},
      chunksize     => 1_000,
      cols          => $struct->{cols},
      dst_db        => $db,
      dst_dbh       => $host2_dbh,
      dst_tbl       => 'mk_upgrade_host2',
      dumper        => $args{MySQLDump},
      execute       => 0,
      lock          => 0,
      misc_dbh      => $host1_dbh,
      nibbler       => $args{TableNibbler},
      possible_keys => [],  # not actually used; bug?
      parser        => $args{TableParser},
      print         => 0,
      quoter        => $args{Quoter},
      replace       => 0,
      replicate     => 0,
      transaction   => 0,
      sleep         => 0,
      RowDiff       => $rd,
      src_db        => $db,
      src_dbh       => $host1_dbh,
      src_tbl       => 'mk_upgrade_host1',
      test          => 0,
      tbl_struct    => $struct,
      timeoutok     => 0,
      trim          => 0,
      versionparser => $args{VersionParser},
      wait          => 0,
      where         => '',
      master_slave  => 1,  # shouldn't be used for what we're doing
      func          => $o->get('function') || '',
      skipslavecheck=> 1,
   );

   if ( $n_diff < $max_diff ) {
      $same_row->() if $l_r[LEFT] || $l_r[RIGHT];  # save remaining rows
   }

   return \@missing_rows, \@different_rows;
}

# Writes the current row and all remaining rows to an outfile.
# Returns the outfile's name.
sub write_to_outfile {
   my ( %args ) = @_;
   foreach my $arg ( qw(host row sth outfile OptionParser) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ( $host, $sth, $row, $outfile ) = @args{qw(host sth row outfile)};
   my ( $fh, $file ) = open_outfile($host, %args);

   # Write this one row.
   $outfile->write($fh, [ MockSyncStream::as_arrayref($sth, $row) ]);

   # Get and write all remaining rows.
   my $remaing_rows = $sth->fetchall_arrayref();
   $outfile->write($fh, $remaing_rows);

   close $fh or warn "Cannot close $file: $OS_ERROR";
   return $file;
}

sub open_outfile {
   my ( $host, %args ) = @_;
   my $o = $args{OptionParser};
   die "I need an OptionParser argument" unless $o;
   my $outfile = $o->get('base-dir') . "/$host-outfile.txt";
   open my $fh, '>', $outfile or die "Cannot open $outfile: $OS_ERROR";
   MKDEBUG && _d('outfile for', $host, ':', $outfile);
   return $fh, $outfile;
}

# Returns just the column definitions for the given struct.
# Example:
#   (
#     `i` integer,
#     `f` float(10,8)
#   )
sub make_table_ddl {
   my ( $struct ) = @_;
   my $sql = "(\n"
           . (join("\n",
                 map {
                    my $name = $_;
                    my $type = $struct->{type_for}->{$_};
                    my $prec = $struct->{precision}->{$_} || '';
                    "  `$name` $type$prec,";
                 } @{$struct->{cols}}))
           . ')';
   # The last column will be like "`i` integer,)" which is invalid.
   $sql =~ s/,\)$/\n)/;
   MKDEBUG && _d('table ddl:', $sql);
   return $sql;
}

# Adds every index from every table used by the query to all the
# dest tables.  dest is an arrayref of hashes, one for each destination.
# Each hash needs a dbh and tbl key; e.g.:
#   [
#     {
#       dbh => $dbh,
#       tbl => 'db.tbl',
#     },
#   ],
# For the moment, the sub returns nothing.  In the future, it should
# add to $args{struct}->{keys} the keys that it was able to add.
sub add_source_indexes {
   my ( %args ) = @_;
   my @required_args = qw(query dest QueryParser TableParser Quoter MySQLDump struct);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ( $query, $dest, $qp, $tp, $q, $du ) = @args{@required_args};
   my $default_db = $args{db};          # optional
   my $dbh        = $dest->[0]->{dbh};  # doesn't matter which one

   my @src_tbls = $qp->get_tables($query);
   my $struct;
   my @keys;
   foreach my $db_tbl ( @src_tbls ) {
      my ($db, $tbl) = $q->split_unquote($db_tbl, $default_db);
      if ( $db ) {
         eval {
            $struct = $tp->parse($du->get_create_table($dbh, $q, $db, $tbl));
         };
         if ( $EVAL_ERROR ) {
            MKDEBUG && _d('Error parsing', $db, '.', $tbl, ':', $EVAL_ERROR);
            next;
         }
         push @keys, map {
            my $def = ($_->{is_unique} ? 'UNIQUE ' : '')."KEY ($_->{colnames})";
            [$def, $_];
         } grep { $_->{type} eq 'BTREE' } values %{$struct->{keys}};
      }
      else {
         MKDEBUG && _d('Cannot get indexes from', $db_tbl, 'because its '
            . 'database is unknown');
      }
   }
   MKDEBUG && _d('Source keys:', Dumper(\@keys));
   return unless @keys;

   for my $i ( 0..(@$dest - 1) ) {
      MKDEBUG && _d("Adding source keys to host".($i+1));
      foreach my $key ( @keys ) {
         my $def = $key->[0];
         my $sql = "ALTER TABLE `$dest->[$i]->{tbl}` ADD $key->[0]";
         MKDEBUG && _d($sql);
         eval {
            $dest->[$i]->{dbh}->do($sql);
         };
         if ( $EVAL_ERROR ) {
            MKDEBUG && _d($EVAL_ERROR);
         }
         else {
            # TODO: $args{struct}->{keys}->{ $key->[1]->{name} } = $key->[1];
         }
      }
   }

   # If the query uses only 1 table then return its struct.
   # TODO: $args{struct} = $struct if @src_tbls == 1;
   return;
}

sub print_errors {
   my ( $results, $all ) = @_;
   my $n_errors = 0;

   print "ERRORS: ";
   for my $i ( 1..(scalar @$results) ) {
      my $hostname = "Host$i";

      # Each element is a hashref of operation results, and each op
      # result should have an error key and maybe an errors arrayref.
      my $ops = $results->[$i - 1]; 

      foreach my $op ( keys %$ops ) {
         my $op_res = $ops->{$op};
         my $last_error = $op_res->{error};
         if ( $last_error ) {
            chomp $last_error;  # Usually an $EVAL_ERROR that has a \n already.
            print "\n\n$hostname $op Last_error: $last_error\n";
            $n_errors++;
         }
         if ( $all && $op_res->{errors} && scalar @{$op_res->{errors}} ) {
            for my $j ( 1..(scalar @{$op_res->{errors}}) ) {
               my $error = $op_res->{errors}->[$j-1];
               chomp $error;
               print "Error_$j: $error\n";
            }
         }
      }
   }
   print '', ($n_errors ? "\n" : "None\n\n");

   return;
}
sub print_reasons {
   my ( $reasons ) = @_;
   print "REASONS: ";
   if ( @$reasons ) {
      print "\n\n";
      for my $i ( 0..(scalar @$reasons - 1) ) {
         print $i+1, ". $reasons->[$i]\n";
      }
      print "\n";
   }
   else {
      print "None\n\n";
   }
   return;
}

sub print_row_differences {
   my ( $host1_results ) = @_;
   my $missing = $host1_results->{get_row_sths}->{missing_rows};
   my $diff    = $host1_results->{get_row_sths}->{different_rows};
   if ( @$missing ) {
      print "MISSING ROWS: ";
      print "\n\n";
      for my $i ( 0..(scalar @$missing - 1) ) {
         print $i+1, '. Missing on host',
            ($missing->[$i]->[0] ? "2: ".Dumper($missing->[$i]->[0])
                                 : "1: ".Dumper($missing->[$i]->[1])
            ),
      }
      print "\n";
   }
   if ( @$diff ) {
      print "DIFFERENT ROWS: ";
      print "\n\n";
      for my $i ( 0..(scalar @$diff - 1) ) {
         print $i+1, '. ', Dumper($diff->[$i]);
      }
      print "\n";
   }
   return;
}

sub get_cxn {
   my ( $dp, $o, $dsn ) = @_;
   if ( $o->get('ask-pass') ) {
      $dsn->{p} = OptionParser::prompt_noecho("Enter password: ");
   }
   my $dbh = $dp->get_dbh($dp->get_cxn_params($dsn), {AutoCommit => 1});
   return $dbh;
}

# Catches signals so we can exit gracefully.
sub sig_int {
   my ( $signal ) = @_;
   if ( $oktorun ) {
      print STDERR "# Caught SIG$signal.\n";
      $oktorun = 0;
   }
   else {
      print STDERR "# Exiting on SIG$signal.\n";
      exit(1);
   }
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

# ############################################################################
# Run the program.
# ############################################################################
if ( !caller ) { exit main(@ARGV); }

1; # Because this is a module as well as a script.

# #############################################################################
# Documentation.
# #############################################################################

=pod

=head1 NAME

mk-upgrade - Execute SQL statements against two MySQL servers and compare the results.

=head1 SYNOPSIS

Compare all queries in slow.log on host1 to host2:

   mk-upgrade h=host1 h=host2 slow.log

Use mk-query-digest to compare only 1 sample of each unique query:

  mk-query-digest --no-report --print --sample 1 slow.log \
    | mk-upgrade h=host1 h=host2

=head1 RISKS

The following section is included to inform users about the potential risks,
whether known or unknown, of using this tool.  The two main categories of risks
are those created by the nature of the tool (e.g. read-only tools vs. read-write
tools) and those created by bugs.

mk-upgrade is in active development.  It should only be used on test servers.

mk-upgrade executes queries on the given servers.  It filters out everything
except SELECT queries for safety.  If there were to be a bug in the filter,
other queries might be executed.  The queries it executes might cause extra load
on the server.

At the time of this release, we know of no bugs that could cause serious harm to
users.

The authoritative source for updated information is always the online issue
tracking system.  Issues that affect this tool will be marked as such.  You can
see a list of such issues at the following URL:
L<http://www.maatkit.org/bugs/mk-upgrade>.

See also L<"BUGS"> for more information on filing bugs and getting help.

=head1 DESCRIPTION

mk-upgrade measures differences in execution plans on two MySQL servers and
ranks the results.  This helps to test for bugs, changes in results, and
performance regressions before an upgrade or configuration change.  The higher a
query ranks, the greater its results differ.

mk-upgrade reads queries from slowlogs.  You should use mk-query-digest to
extract only one unique sample of each query from the original slowlog.  For
example:

  mk-query-digest --no-report --print --sample 1 slow.log \
    | mk-upgrade h=host1 h=host2

You can also use mk-query-digest to transform other input sources into a
slowlog by its C<--print --no-report> options.  For example, to get queries
from a tcpdump dump and then compare them with mk-audit:

  mk-query-digest --no-report --print --sample 1 --type tcpdump dump.txt \
    | mk-upgrade h=host1 h=host2

Queries are not yet fingerprinted or aggregated.  Every query is executed,
compared and ranked individually.

=head1 OUTPUT

Output is like a slow log: header lines that start with C<#> followed by a
query.  Like a slow log, the headers have attribute-value pairs such as
C<attribute: value>, where each pair is the result of some L<"COMPARISONS">
option.  Each attribute name begins with the server to which its value applies,
and each header line corresponds to one part of a comparison.  The first header
line is special: it's the query's difference rank.  The higher its rank, the
greater the difference in execution between the servers.

Here's a simple example:

  # Rank: 5
  # host1_Query_time: 0.123003  host2_Query_time: 0.231002
  # host1_warnings_count: 0  host2_warnings_count: 1
  SELECT id FROM table WHERE id < 10;

=head1 OPTIONS

=over

=item --all-errors

Print all errors for all comparisons.

Normally only the last error for each comparion is printed, but some
comparisons will save multiple errors.

=item --ask-pass

Prompt for a password when connecting to MySQL.

=item --base-dir

type: string; default: /tmp

Save result set outfiles in this directory.

=item --charset

short form: -A; type: string

Default character set.  If the value is utf8, sets Perl's binmode on
STDOUT to utf8, passes the mysql_enable_utf8 option to DBD::mysql, and
runs SET NAMES UTF8 after connecting to MySQL.  Any other value sets
binmode on STDOUT without the utf8 layer, and runs SET NAMES after
connecting to MySQL.

=item --config

type: Array

Read this comma-separated list of config files; if specified, this must be the
first option on the command line.

=item --continue-on-error

Continue working even if there is an error.

=item --daemonize

Fork to the background and detach from the shell.  POSIX
operating systems only.

=item --dump-results

Don't print a human-readable report, just dump the raw results.

=item --[no]errors

default: yes

Print last error for each operation (if any).

The last error for each query and each comparison is printed in a /* comment
block */ before the query.  C<--no-errors> will disable L<"--all-errors">.
See also L<"--all-errors"> and L<"--[no]reasons">.

=item --filter

type: string

Discard events for which this Perl code doesn't return true.

This option is a string of Perl code or a file containing Perl code that gets
compiled into a subroutine with one argument: $event.  This is a hashref.
If the given value is a readable file, then mk-upgrade reads the entire
file and uses its contents as the code.  The file should not contain
a shebang (#!/usr/bin/perl) line.

If the code returns true, the chain of callbacks continues; otherwise it ends.
The code is the last statement in the subroutine other than C<return $event>. 
The subroutine template is:

  sub { $event = shift; filter && return $event; }

Filters given on the command line are wrapped inside parentheses like like
C<( filter )>.  For complex, multi-line filters, you must put the code inside
a file so it will not be wrapped inside parentheses.  Either way, the filter
must produce syntactically valid code given the template.  For example, an
if-else branch given on the command line would not be valid:

  --filter 'if () { } else { }'  # WRONG

Since it's given on the command line, the if-else branch would be wrapped inside
parentheses which is not syntactically valid.  So to accomplish something more
complex like this would require putting the code in a file, for example
filter.txt:

  my $event_ok; if (...) { $event_ok=1; } else { $event_ok=0; } $event_ok

Then specify C<--filter filter.txt> to read the code from filter.txt.

If the filter code won't compile, mk-upgrade will die with an error.
If the filter code does compile, an error may still occur at runtime if the
code tries to do something wrong (like pattern match an undefined value).
mk-upgrade does not provide any safeguards so code carefully!

An example filter that discards everything but SELECT statements:

  --filter '$event->{arg} =~ m/^select/i'

This is compiled into a subroutine like the following:

  sub { $event = shift; ( $event->{arg} =~ m/^select/i ) && return $event; }

It is permissible for the code to have side effects (to alter $event).

You can find an explanation of the structure of $event at
L<http://code.google.com/p/maatkit/wiki/EventAttributes>.

=item --function

type: string

Which hash function you'd like to use for checksums.

The default is C<CRC32>.  Other good choices include C<MD5> and C<SHA1>.  If you
have installed the C<FNV_64> user-defined function, C<mk-table-sync> will detect
it and prefer to use it, because it is much faster than the built-ins.  You can
also use MURMUR_HASH if you've installed that user-defined function.  Both of
these are distributed with Maatkit.  See L<mk-table-checksum> for more
information and benchmarks.

=item --help

Show help and exit.

=item --host

short form: -h; type: string

Connect to host.

=item --log

type: string

Print all output to this file when daemonized.

=item --only-failed-queries

Report only queries that had at least one failure of any kind.

=item --password

short form: -p; type: string

Password to use when connecting.

=item --pid

type: string

Create the given PID file when daemonized.  The file contains the process
ID of the daemonized instance.  The PID file is removed when the
daemonized instance exits.  The program checks for the existence of the
PID file when starting; if it exists and the process with the matching PID
exists, the program exits.

=item --port

short form: -P; type: int

Port number to use for connection.

=item --[no]reasons

default: yes

Print reasons for each query explaining its rank.

The rank reasons are printed in a /* comment block */ before the query.
See also L<"--[no]errors">.

=item --run-time

type: time

How long to run before exiting.  The default is to run forever (you can
interrupt with CTRL-C).

=item --single-host

Run tests on a single host and print results.  The printed report will
still list Host2 values, but these are identical to the Host1 values.

=item --socket

short form: -S; type: string

Socket file to use for connection.

=item --tmp-table-database

type: string

Use this database for creating temporary tables.

If given, this database is used for creating temporary tables for the
results comparison (see L<"--[no]compare-results">).  Otherwise, the current
database (from the last event that specified its database) is used.

=item --user

short form: -u; type: string

User for login if not current user.

=item --version

Show version and exit.

=back

=head2 COMPARISONS

These comparisons determine the differences for various aspects of a query
when executed on the different hosts.  The more comparisons made, the more
difference can be detected.  Results for each comparison and each host are
reported in the query's header (see L<"OUTPUT">).

=over

=item --[no]compare-query-times

default: yes

Compare query execution times.

=item --[no]compare-results

default: yes; group: Comparisons

Compare result sets to find differences in rows, columns, etc.

What differences can be found depends on the L<"--compare-results-method"> used.

=item --compare-results-method

type: string; default: CHECKSUM

Method to use for L<"--[no]compare-results">.  This option has no effect
if C<--no-compare-results> is given.

Available compare methods (case-insensitive):

=over

=item CHECKSUM

Do C<CREATE TEMPORARY TABLE `mk_upgrade` AS query> then
C<CHECKSUM TABLE `mk_upgrade`>.

=item rows

Compare rows to find those which are missing or different.  Does not work
with L<"--single-host">.

=back

=item --[no]compare-warnings

default: yes; group: Comparisons

Compare warnings and errors.

=item --float-precision

type: int

Round float, double and decimal values to this many places.

This option helps eliminate false-positives caused by floating-point
imprecision.

=item --max-differences

type: int; default: 10

Stop comparing rows for C<--compare-results-method rows> after this many differences are found.

This counts for both missing and different rows.  The query's rank will not
be increased for the different rows past this value.  Therefore, this option
may lower the query's real rank because all different rows may not be found.

=back

=head1 DOWNLOADING

You can download Maatkit from Google Code at
L<http://code.google.com/p/maatkit/>, or you can get any of the tools
easily with a command like the following:

   wget http://www.maatkit.org/get/toolname
   or
   wget http://www.maatkit.org/trunk/toolname

Where C<toolname> can be replaced with the name (or fragment of a name) of any
of the Maatkit tools.  Once downloaded, they're ready to run; no installation is
needed.  The first URL gets the latest released version of the tool, and the
second gets the latest trunk code from Subversion.

=head1 ENVIRONMENT

The environment variable C<MKDEBUG> enables verbose debugging output in all of
the Maatkit tools:

   MKDEBUG=1 mk-....

=head1 SYSTEM REQUIREMENTS

You need Perl and some core packages that ought to be installed in any
reasonably new version of Perl.

=head1 BUGS

For list of known bugs see L<http://www.maatkit.org/bugs/mk-upgrade>.

Please use Google Code Issues and Groups to report bugs or request support:
L<http://code.google.com/p/maatkit/>.  You can also join #maatkit on Freenode to
discuss Maatkit.

Please include the complete command-line used to reproduce the problem you are
seeing, the version of all MySQL servers involved, the complete output of the
tool when run with L<"--version">, and if possible, debugging output produced by
running with the C<MKDEBUG=1> environment variable.

=head1 COPYRIGHT, LICENSE AND WARRANTY

This program is copyright 2009 Percona, Inc.
Feedback and improvements are welcome.

THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
systems, you can issue `man perlgpl' or `man perlartistic' to read these
licenses.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place, Suite 330, Boston, MA  02111-1307  USA.

=head1 AUTHOR

Baron Schwartz, Daniel Nichter

=head1 VERSION

This manual page documents Ver 0.9.3 Distrib 5014 $Revision: 4971 $.

=cut
