diff --git a/.travis.yml b/.travis.yml index 350c4eb..a50fc73 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,2 @@ -sudo: false -language: ruby -cache: bundler -rvm: - - jruby-1.7.23 -script: - - bundle exec rspec spec +import: +- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 20254c5..41311cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,29 @@ -# 3.0.2 - - Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash -# 3.0.1 - - New dependency requirements for logstash-core for the 5.0 release +## 4.0.5 + - Fix socket leaks + - Add configuration for connection retrying with backoff. + - Add support for plugin shutdown. + +## 4.0.4 + - Docs: Set the default_codec doc attribute. + +## 4.0.3 + - Update gemspec summary + +## 4.0.2 + - Fix some documentation issues + +## 4.0.0 + - internal: Upgrade event API dependency to support Logstash 2.4 & 5.x. + - internal: Bump dependency to ftw library to fix compatibility problem. + +## 3.0.2 + - Depend on logstash-core-plugin-api instead of logstash-core, + removing the need to mass update plugins on major releases of + logstash + +## 3.0.1 + - New dependency requirements for logstash-core for the 5.0 release + ## 3.0.0 - The "mode" option's "server" value has been removed since server mode isn't supported (and never has been). This could potentially affect @@ -10,7 +32,9 @@ - The "url" option is now mandatory. ## 2.0.0 - - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, - instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895 + - Plugins were updated to follow the new shutdown semantic, this mainly + allows Logstash to instruct input plugins to terminate gracefully, + instead of using Thread.raise on the plugins' threads. Ref: + https://github.com/elastic/logstash/pull/3895 - Dependency on logstash-core update to 2.0 diff --git a/Gemfile b/Gemfile index d926697..32cc6fb 100644 --- a/Gemfile +++ b/Gemfile @@ -1,2 +1,11 @@ source 'https://rubygems.org/' -gemspec \ No newline at end of file + +gemspec + +logstash_path = ENV["LOGSTASH_PATH"] || "../../logstash" +use_logstash_source = ENV["LOGSTASH_SOURCE"] && ENV["LOGSTASH_SOURCE"].to_s == "1" + +if Dir.exist?(logstash_path) && use_logstash_source + gem 'logstash-core', :path => "#{logstash_path}/logstash-core" + gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api" +end diff --git a/LICENSE b/LICENSE index 43976b7..a80a3fd 100644 --- a/LICENSE +++ b/LICENSE @@ -1,13 +1,202 @@ -Copyright (c) 2012–2016 Elasticsearch -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ - http://www.apache.org/licenses/LICENSE-2.0 + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2020 Elastic and contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 27720bd..91e3cb2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Logstash Plugin -[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-input-websocket.svg)](https://travis-ci.org/logstash-plugins/logstash-input-websocket) +[![Travis Build Status](https://travis-ci.com/logstash-plugins/logstash-input-websocket.svg)](https://travis-ci.com/logstash-plugins/logstash-input-websocket) This is a plugin for [Logstash](https://github.com/elastic/logstash). diff --git a/docs/index.asciidoc b/docs/index.asciidoc new file mode 100644 index 0000000..f251493 --- /dev/null +++ b/docs/index.asciidoc @@ -0,0 +1,106 @@ +:plugin: websocket +:type: input +:default_codec: json + +/////////////////////////////////////////// +START - GENERATED VARIABLES, DO NOT EDIT! +/////////////////////////////////////////// +:version: %VERSION% +:release_date: %RELEASE_DATE% +:changelog_url: %CHANGELOG_URL% +:include_path: ../../../../logstash/docs/include +/////////////////////////////////////////// +END - GENERATED VARIABLES, DO NOT EDIT! +/////////////////////////////////////////// + +[id="plugins-{type}s-{plugin}"] + +=== Websocket input plugin + +include::{include_path}/plugin_header.asciidoc[] + +==== Description + +Read events over the websocket protocol. + +[id="plugins-{type}s-{plugin}-options"] +==== Websocket Input Configuration Options + +This plugin supports the following configuration options plus the <> described later. + +[cols="<,<,<",options="header",] +|======================================================================= +|Setting |Input type|Required +| <> |<>|No +| <> |<>, one of `["client"]`|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|Yes +|======================================================================= + +Also see <> for a list of options supported by all +input plugins. + +  + +[id="plugins-{type}s-{plugin}-debug_status"] +===== `debug_status` + + * Value type is <> + * Default value is `[]` + +Logs responses with the given HTTP status codes as debug instead +of warning. + +[id="plugins-{type}s-{plugin}-mode"] +===== `mode` + + * Value can be any of: `client` + * Default value is `"client"` + +Select the plugin's mode of operation. Right now only client mode +is supported, i.e. this plugin connects to a websocket server and +receives events from the server as websocket messages. + +[id="plugins-{type}s-{plugin}-retry_initial"] +===== `retry_initial` + + * Value type is <> + * Default value is `1` + +The retry interval in seconds connections start with. + +[id="plugins-{type}s-{plugin}-retry_max"] +===== `retry_max` + + * Value type is <> + * Default value is `60` + +The maximum retry interval in seconds backoff will increase to. + +[id="plugins-{type}s-{plugin}-retry_reset"] +===== `retry_reset` + + * Value type is <> + * Default value is `1` + +The number of log entries that have to be processed before +the retry interval is reset to retry_initial. This allows +connections to quickly recover from a small interruption +after we have successfully connected and processed some +entries. + +[id="plugins-{type}s-{plugin}-url"] +===== `url` + + * This is a required setting. + * Value type is <> + * There is no default value for this setting. + +The URL to connect to. + +[id="plugins-{type}s-{plugin}-common-options"] +include::{include_path}/{type}.asciidoc[] + +:default_codec!: diff --git a/lib/logstash/inputs/websocket.rb b/lib/logstash/inputs/websocket.rb index 6236cb6..6f961c8 100644 --- a/lib/logstash/inputs/websocket.rb +++ b/lib/logstash/inputs/websocket.rb @@ -1,8 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" -require "socket" - +require "stud/interval" # Read events over the websocket protocol. class LogStash::Inputs::Websocket < LogStash::Inputs::Base @@ -13,6 +12,23 @@ class LogStash::Inputs::Websocket < LogStash::Inputs::Base # The URL to connect to. config :url, :validate => :string, :required => true + # The retry interval in seconds connections start with. + config :retry_initial, :validate => :number, :default => 1 + + # The maximum retry interval in seconds backoff will increase too. + config :retry_max, :validate => :number, :default => 60 + + # The number of log entries that have to be processed before + # the retry interval is reset to retry_initial. This allows + # connections to quickly recover from a small interruption + # after we have successfully connected and processed some + # entries. + config :retry_reset, :validate => :number, :default => 1 + + # Logs responses with the given HTTP status codes as debug instead + # of warning. + config :debug_status, :validate => :array, :default => [] + # Select the plugin's mode of operation. Right now only client mode # is supported, i.e. this plugin connects to a websocket server and # receives events from the server as websocket messages. @@ -20,25 +36,86 @@ class LogStash::Inputs::Websocket < LogStash::Inputs::Base def register require "ftw" + require "uri" + + p = URI.parse(@url) + p.userinfo = "***:***" unless p.userinfo.nil? + + @url_safe = p.to_s + @interval = @retry_initial + @agent = FTW::Agent.new + @processed = 0 end # def register public def run(output_queue) - agent = FTW::Agent.new - begin - websocket = agent.websocket!(@url) - websocket.each do |payload| + @logger.info("Starting", :url => @url_safe) + while !stop? + run_single(output_queue) + Stud.stoppable_sleep(@interval) { stop? } + backoff() + end # loop + end # def run + + def stop + # Force close all connections to escape any blocking reads. + cleanup() + end # def stop + + private + def cleanup() + @agent.shutdown() rescue nil + end # def cleanup + + def run_single(output_queue) + @processed = 0 + r = @agent.websocket!(@url) + if r.instance_of?(FTW::WebSocket) + r.each do |payload| @codec.decode(payload) do |event| decorate(event) output_queue << event + @processed += 1 end end - rescue => e - @logger.warn("websocket input client threw exception, restarting", - :exception => e) - sleep(1) - retry - end # begin - end # def run + elsif r.instance_of?(FTW::Response) + if @debug_status.include?(r.status) + @logger.debug("Request failed", + :status => r.status_line, + :url => @url_safe, + :retry => @interval) unless stop? + else + @logger.warn("Request failed", + :status => r.status_line, + :url => @url_safe, + :retry => @interval) unless stop? + end + else + @logger.warn("Request unexpected type", + :type => r.class.name, + :url => @url_safe, + :retry => @interval) unless stop? + end + rescue EOFError => e + @logger.debug("Run error ", + :exception => e, + :url => @url_safe, + :retry => @interval) unless stop? + rescue => e + @logger.warn("Run error ", + :exception => e, + :url => @url_safe, + :retry => @interval) unless stop? + ensure + cleanup() + end # def run_single + + def backoff() + if @processed > @retry_reset + @interval = @retry_initial + else + @interval = [@interval * 2, @retry_max].min + end + end # def backoff end # class LogStash::Inputs::Websocket diff --git a/logstash-input-websocket.gemspec b/logstash-input-websocket.gemspec index 86c31a4..1c746c9 100644 --- a/logstash-input-websocket.gemspec +++ b/logstash-input-websocket.gemspec @@ -1,9 +1,9 @@ Gem::Specification.new do |s| s.name = 'logstash-input-websocket' - s.version = '3.0.2' + s.version = '4.0.5' s.licenses = ['Apache License (2.0)'] - s.summary = "Read events over the websocket protocol." + s.summary = "Reads events from a websocket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" s.authors = ["Elastic"] s.email = 'info@elastic.co' @@ -11,7 +11,7 @@ Gem::Specification.new do |s| s.require_paths = ["lib"] # Files - s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT'] + s.files = Dir["lib/**/*","spec/**/*","*.gemspec","*.md","CONTRIBUTORS","Gemfile","LICENSE","NOTICE.TXT", "vendor/jar-dependencies/**/*.jar", "vendor/jar-dependencies/**/*.rb", "VERSION", "docs/**/*"] # Tests s.test_files = s.files.grep(%r{^(test|spec|features)/}) @@ -20,10 +20,10 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" } # Gem dependencies - s.add_runtime_dependency "logstash-core-plugin-api", "~> 1.0" + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json' - s.add_runtime_dependency 'ftw', ['~> 0.0.40'] + s.add_runtime_dependency 'ftw', ['~> 0.0.46'] s.add_development_dependency 'logstash-devutils' end