11 changes: 10 additions & 1 deletion docs/index.asciidoc
@@ -24,7 +24,7 @@ include::{include_path}/plugin_header.asciidoc[]
 Read rows from an sqlite database.
 
 This is most useful in cases where you are logging directly to a table.
-Any tables being watched must have an `id` column that is monotonically
+Any tables being watched must have a specified column that is monotonically
 increasing.
 
 All tables are read by default except:
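
The monotonic-column requirement above is what lets the plugin resume: it remembers the highest value seen per table and only fetches larger ones on the next poll. A minimal sketch of a compatible table, created through Sequel as the plugin itself uses (the table and column names here are made up for illustration):

[source,ruby]
----
require "sequel"

# Hypothetical table: an INTEGER PRIMARY KEY in SQLite is assigned in
# increasing order for appended rows, so it satisfies the monotonic
# requirement and can be named via `id_field`.
db = Sequel.sqlite("/tmp/example.db")
db.create_table?(:app_logs) do
  primary_key :seq
  String :level
  String :message
end

db[:app_logs].insert(level: "info", message: "service started")
----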
@@ -85,6 +85,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
 |Setting |Input type|Required
 | <<plugins-{type}s-{plugin}-batch>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-exclude_tables>> |<<array,array>>|No
+| <<plugins-{type}s-{plugin}-id_field>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-path>> |<<string,string>>|Yes
 |=======================================================================
 
@@ -110,6 +111,14 @@ How many rows to fetch at a time from each `SELECT` call.
 Any tables to exclude by name.
 By default all tables are followed.
 
+[id="plugins-{type}s-{plugin}-id_field"]
+===== `id_field`
+
+* Value type is <<string,string>>
+* Default value is `id`.
+
+The name of the column in the database that is monotonically increasing.
+
 [id="plugins-{type}s-{plugin}-path"]
 ===== `path`
 
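To make the new option concrete, a pipeline snippet along these lines could sit in the section above (a hedged sketch, not part of this change; the path and column name are hypothetical):

[source,ruby]
----
input {
  sqlite {
    path     => "/var/lib/app/metrics.db"   # hypothetical database file
    id_field => "seq"                       # hypothetical monotonic column
    batch    => 50
  }
}
----
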
13 changes: 8 additions & 5 deletions lib/logstash/inputs/sqlite.rb
@@ -72,6 +72,9 @@ class LogStash::Inputs::Sqlite < LogStash::Inputs::Base
   # How many rows to fetch at a time from each `SELECT` call.
   config :batch, :validate => :number, :default => 5
 
+  # The name of the monotonically increasing "id" column.
+  config :id_field, :validate => :string, :default => "id"
+
   SINCE_TABLE = :since_table
 
   public
@@ -121,7 +124,7 @@ def get_all_tables(db)
   public
   def get_n_rows_from_table(db, table, offset, limit)
     dataset = db["SELECT * FROM #{table}"]
-    return db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY 'id' LIMIT #{limit}"].map { |row| row }
+    return db["SELECT * FROM #{table} WHERE (#{@id_field} > #{offset}) ORDER BY #{@id_field} LIMIT #{limit}"].map { |row| row }
   end
 
   public
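
The new query above still splices `@id_field` and the table name directly into SQL, and `ORDER BY` on an interpolated name relies on the configured value being a plain identifier. A hedged alternative sketch (not part of this change) using Sequel's dataset methods, which quote identifiers and bind values:

[source,ruby]
----
def get_n_rows_from_table(db, table, offset, limit)
  id_column = Sequel.identifier(@id_field)   # quoted as an identifier, not a value
  db[table.to_sym]
    .where(id_column > offset)               # offset is bound, not interpolated
    .order(id_column)
    .limit(limit)
    .all                                     # array of row hashes, like .map above
end
----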
@@ -156,16 +159,16 @@ def run(queue)
       @logger.debug("offset is #{offset}", :k => k, :table => table_name)
       rows = get_n_rows_from_table(@db, table_name, offset, @batch)
       count += rows.count
-      rows.each do |row|
-        event = LogStash::Event.new("host" => @host, "db" => @db)
+      rows.each do |row|
+        event = LogStash::Event.new("host" => @host, "db" => @path)
         decorate(event)
         # store each column as a field in the event.
         row.each do |column, element|
-          next if column == :id
+          next if column == @id_field.to_sym
           event.set(column.to_s, element)
         end
         queue << event
-        @table_data[k][:place] = row[:id]
+        @table_data[k][:place] = row[@id_field.to_sym]
       end
       # Store the last-seen row in the database
       update_placeholder(@db, table_name, @table_data[k][:place])
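
To illustrate the loop above with `id_field => "seq"`: Sequel yields rows as hashes with symbol keys, every column except the tracking column becomes an event field, and the bookmark advances to the row's id. A made-up row, traced by hand:

[source,ruby]
----
id_sym = "seq".to_sym    # @id_field, converted to match Sequel's symbol keys
row    = { seq: 42, level: "warn", message: "disk 90% full" }

row.reject { |column, _| column == id_sym }
# => { :level => "warn", :message => "disk 90% full" }  (event fields)
row[id_sym]
# => 42  (stored via update_placeholder for the next poll)
----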