[![Build Status](https://travis-ci.org/avast/RabbitMQ-Consumer-Batcher.svg?branch=master)](https://travis-ci.org/avast/RabbitMQ-Consumer-Batcher) # NAME RabbitMQ::Consumer::Batcher - batch consumer of RMQ messages # SYNOPSIS use AnyEvent; use AnyEvent::RabbitMQ::PubSub; use AnyEvent::RabbitMQ::PubSub::Consumer; use RabbitMQ::Consumer::Batcher; my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect( host => 'localhost', port => 5672, user => 'guest', pass => 'guest', vhost => '/', ); my $exchange = { exchange => 'my_test_exchange', type => 'topic', durable => 0, auto_delete => 1, }; my $queue = { queue => 'my_test_queue'; auto_delete => 1, }; my $routing_key = 'my_rk'; my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new( channel => $channel, exchange => $exchange, queue => $queue, routing_key => $routing_key, ); $consumer->init(); #declares channel, queue and binding my $batcher = RabbitMQ::Consumer::Batcher->new( batch_size => $consumer->prefetch_count, on_add => sub { my ($batcher, $msg) = @_; my $decode_payload = decode_payload($msg->{header}, $msg->{body}->payload()); return $decode_payload; }, on_add_catch => sub { my ($batcher, $msg, $exception) = @_; if ($exception->$_isa('failure') && $exception->{payload}{stats_key}) { $stats->increment($exception->{payload}{stats_key}); } if ($exception->$_isa('failure') && $exception->{payload}{reject}) { $batcher->reject($msg); $log->error("consume failed - reject: $exception\n".$msg->{body}->payload()); } else { $batcher->reject_and_republish($msg); $log->error("consume failed - republish: $exception"); } }, on_batch_complete => sub { my ($batcher, $batch) = @_; path(...)->spew(join "\t", map { $_->value() } @$batch); }, on_batch_complete_catch => sub { my ($batcher, $batch, $exception) = @_; $log->error("save messages to file failed: $exception"); } ); my $cv = AnyEvent->condvar(); $consumer->consume($cv, $batcher->consume_code())->then(sub { say 'Consumer was started...'; }); # DESCRIPTION If you need batch of messages from RabbitMQ - this module is for you. This module work well with [AnyEvent::RabbitMQ::PubSub::Consumer](https://metacpan.org/pod/AnyEvent::RabbitMQ::PubSub::Consumer) Idea of this module is - in _on\_add_ phase is message validate and if is corrupted, can be reject. In _on\_batch\_complete_ phase we manipulated with message which we don't miss. If is some problem in this phase, messages are republished.. # METHODS ## new(%attributes) ### attributes #### batch\_size Max batch size (trigger for `on_batch_complete`) `batch_size` must be `prefetch_count` or bigger! this is required attribute #### on\_add this callback are called after consume one single message. Is usefully for decoding for example. return value of callback are used as value in batch item ([RabbitMQ::Consumer::Batcher::Item](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item)) default behaviour is payload of message is used as item in batch return sub { my($batcher, $msg) = @_; return $msg->{body}->payload() } parameters which are give to callback: - `$batcher` self instance of [RabbitMQ::Consumer::Batcher](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher) - `$msg` consumed message ["on\_consume" in AnyEvent::RabbitMQ::Channel](https://metacpan.org/pod/AnyEvent::RabbitMQ::Channel#on_consume) #### on\_add\_catch this callback are called if `on_add` callback throws default behaviour do reject message return sub { my ($batcher, $msg, $exception) = @_; $batcher->reject($msg); } parameters which are give to callback: - `$batcher` self instance of [RabbitMQ::Consumer::Batcher](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher) - `$msg` consumed message ["on\_consume" in AnyEvent::RabbitMQ::Channel](https://metacpan.org/pod/AnyEvent::RabbitMQ::Channel#on_consume) - `$exception` exception string #### on\_batch\_complete this callback is triggered if batch is complete (count of items is `batch_size`) this is required attribute parameters which are give to callback: - `$batcher` self instance of [RabbitMQ::Consumer::Batcher](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher) - `$batch` batch is _ArrayRef_ of [RabbitMQ::Consumer::Batcher::Item](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) example `on_batch_complete` _CodeRef_ (item _value_ are _string_s) return sub { my($batcher, $batch) = @_; print join "\n", map { $_->value() } @$batch; $batcher->ack($batch); } #### on\_batch\_complete\_catch this callback are called if `on_batch_complete` callback throws after this callback is batch _reject\_and\_republish_ If you need change _reject\_and\_republish_ of batch to (for example) _reject_, you can do: return sub { my ($batcher, $batch, $exception) = @_; $batcher->reject($batch); #batch_clean must be called, #because reject_and_republish after this exception handler will be called to... $batcher->batch_clean(); } parameters which are give to callback: - `$batcher` self instance of [RabbitMQ::Consumer::Batcher](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher) - `$batch` _ArrayRef_ of [RabbitMQ::Consumer::Batcher::Item](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item)s - `$exception` exception string ## consume\_code() return `sub{}` for handling messages in `consume` method of [AnyEvent::RabbitMQ::PubSub::Consumer](https://metacpan.org/pod/AnyEvent::RabbitMQ::PubSub::Consumer) $consumer->consume($cv, $batcher->consume_code()); ## ack(@items) ack all `@items` (instances of [RabbitMQ::Consumer::Batcher::Item](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) or [RabbitMQ::Consumer::Batcher::Message](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Message)) ## reject(@items) reject all `@items` (instances of [RabbitMQ::Consumer::Batcher::Item](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) or [RabbitMQ::Consumer::Batcher::Message](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Message)) ## reject\_and\_republish(@items) reject and republish all `@items` (instances of [RabbitMQ::Consumer::Batcher::Item](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) or [RabbitMQ::Consumer::Batcher::Message](https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Message)) # contributing for dependency use [cpanfile](https://metacpan.org/pod/cpanfile)... for resolve dependency use [Carton](https://metacpan.org/pod/Carton) (or [Carmel](https://metacpan.org/pod/Carmel) - is more experimental) carton install for run test use `minil test` carton exec minil test if you don't have perl environment, is best way use docker docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton install docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton exec minil test ## warning docker run default as root, all files which will be make in docker will be have root rights one solution is change rights in docker docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended bash -c "carton install; chmod -R 0777 ." or after docker command (but you must have root rights) # LICENSE Copyright (C) Avast Software. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. # AUTHOR Jan Seidl