From f1aec4566559498fffd8d518ed1ce1061cf4333b Mon Sep 17 00:00:00 2001 From: Robin Hahling Date: Wed, 14 Mar 2018 22:14:55 +0100 Subject: [PATCH] add support for the LPUSH command as a list operation The LPUSH command inserts at the head of the list, as opposed to RPUSH which inserts at the head of the list. The default list operation remains RPUSH. However, LPUSH may be set via the list_operation configuration option. While here, fix documentation about the minimum version for RPUSH/LPUSH support, which is 1.0.0 according to Redis documentation. --- docs/index.asciidoc | 7 ++++--- lib/logstash/outputs/redis.rb | 35 ++++++++++++++++++++++++----------- logstash-output-redis.gemspec | 2 +- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 9406057..b61475c 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -20,9 +20,10 @@ include::{include_path}/plugin_header.asciidoc[] ==== Description -This output will send events to a Redis queue using RPUSH. -The RPUSH command is supported in Redis v0.0.7+. Using -PUBLISH to a channel requires at least v1.3.8+. +This output will send events to a Redis queue using RPUSH +(or, optionally, LPUSH). +The RPUSH and LPUSH commands are supported in Redis v1.0.0+. +Using PUBLISH to a channel requires at least v1.3.8+. While you may be able to make these Redis versions work, the best performance and stability will be found in more recent stable versions. Versions 2.6.0+ are recommended. diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 81e3c9b..f8e66c5 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -3,9 +3,10 @@ require "logstash/namespace" require "stud/buffer" -# This output will send events to a Redis queue using RPUSH. -# The RPUSH command is supported in Redis v0.0.7+. Using -# PUBLISH to a channel requires at least v1.3.8+. +# This output will send events to a Redis queue using RPUSH +# or LPUSH. +# The RPUSH and LPUSH commands are supported in Redis v1.0.0+. +# Using PUBLISH to a channel requires at least v1.3.8+. # While you may be able to make these Redis versions work, # the best performance and stability will be found in more # recent stable versions. Versions 2.6.0+ are recommended. @@ -56,15 +57,19 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # valid here, for example `logstash-%{type}`. config :key, :validate => :string, :required => true - # Either list or channel. If `redis_type` is list, then we will set - # RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. + # Either list or channel. If `redis_type` is list, then we will set RPUSH (or + # LPUSH) to key. If `redis_type` is channel, then we will PUBLISH to `key`. config :data_type, :validate => [ "list", "channel" ], :required => true - # Set to true if you want Redis to batch up values and send 1 RPUSH command - # instead of one command per value to push on the list. Note that this only - # works with `data_type="list"` mode right now. + # Either RPUSH or LPUSH. Only relevant if `data_type` is list. Defines if + # elements shall be inserted at the head of the list (LPUSH) or the tail (RPUSH). + config :list_operation, :validate => ["RPUSH", "LPUSH"], :default => "RPUSH" + + # Set to true if you want Redis to batch up values and send 1 RPUSH (or + # LPUSH) command instead of one command per value to push on the list. + # Note that this only works with `data_type="list"` mode right now. # - # If true, we send an RPUSH every "batch_events" events or + # If true, we send an RPUSH (or LPUSH) every "batch_events" events or # "batch_timeout" seconds (whichever comes first). # Only supported for `data_type` is "list". config :batch, :validate => :boolean, :default => false @@ -154,7 +159,11 @@ def flush(events, key, close=false) # we should not block due to congestion on close # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value. congestion_check(key) unless close - @redis.rpush(key, events) + if @list_operation == 'LPUSH' + @redis.lpush(key, events) + else + @redis.rpush(key, events) + end end # called from Stud::Buffer#buffer_flush when an error occurs def on_flush_error(e) @@ -219,7 +228,11 @@ def send_to_redis(event, payload) @redis ||= connect if @data_type == 'list' congestion_check(key) - @redis.rpush(key, payload) + if @list_operation == 'LPUSH' + @redis.lpush(key, payload) + else + @redis.rpush(key, payload) + end else @redis.publish(key, payload) end diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index 32e9cb1..3216100 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -3,7 +3,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-redis' s.version = '4.0.3' s.licenses = ['Apache License (2.0)'] - s.summary = "Sends events to a Redis queue using the `RPUSH` command" + s.summary = "Sends events to a Redis queue using the `RPUSH` or `LPUSH` command" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" s.authors = ["Elastic"] s.email = 'info@elastic.co'