diff --git a/.gitignore b/.gitignore index 2288186..b862e19 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ pkg # # For vim: #*.swp + diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..3be9c3c --- /dev/null +++ b/Gemfile @@ -0,0 +1,2 @@ +source "https://rubygems.org/" +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..32e1ff6 --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,21 @@ +PATH + remote: . + specs: + statsd-ruby (0.4.0.github) + +GEM + remote: https://rubygems.org/ + specs: + minitest (5.9.0) + rake (11.2.2) + +PLATFORMS + ruby + +DEPENDENCIES + minitest (~> 5.9) + rake (~> 11.2) + statsd-ruby! + +BUNDLED WITH + 1.11.2 diff --git a/README.rdoc b/README.rdoc index 29588e9..4843c55 100644 --- a/README.rdoc +++ b/README.rdoc @@ -5,7 +5,7 @@ A Ruby statsd client (https://github.com/etsy/statsd) = Installing Bundler: - gem "statsd-ruby", :require => "statsd" + gem "statsd-ruby", :require => "github/statsd" = Testing @@ -14,7 +14,7 @@ Run the specs with rake spec Run the specs and include live integration specs with LIVE=true rake spec. Note: This will test over a real UDP socket. == Contributing to statsd - + * Check out the latest master to make sure the feature hasn't been implemented or the bug hasn't been fixed yet * Check out the issue tracker to make sure someone already hasn't requested it and/or contributed it * Fork the project diff --git a/Rakefile b/Rakefile index 342e4b4..eb1f8d0 100644 --- a/Rakefile +++ b/Rakefile @@ -1,23 +1,6 @@ require 'rubygems' require 'rake' -require 'jeweler' -Jeweler::Tasks.new do |gem| - # gem is a Gem::Specification... see http://docs.rubygems.org/read/chapter/20 for more options - gem.name = "statsd-ruby" - gem.homepage = "http://github.com/reinh/statsd" - gem.license = "MIT" - gem.summary = %Q{A Statsd client in Ruby} - gem.description = %Q{A Statsd client in Ruby} - gem.email = "rein@phpfog.com" - gem.authors = ["Rein Henrichs"] - gem.add_development_dependency "minitest", ">= 0" - gem.add_development_dependency "yard", "~> 0.6.0" - gem.add_development_dependency "jeweler", "~> 1.5.2" - gem.add_development_dependency "rcov", ">= 0" -end -Jeweler::RubygemsDotOrgTasks.new - require 'rake/testtask' Rake::TestTask.new(:spec) do |spec| spec.libs << 'lib' << 'spec' @@ -25,15 +8,4 @@ Rake::TestTask.new(:spec) do |spec| spec.verbose = true end -require 'rcov/rcovtask' -Rcov::RcovTask.new do |spec| - spec.libs << 'lib' << 'spec' - spec.pattern = 'spec/**/*_spec.rb' - spec.verbose = true - spec.rcov_opts << "--exclude spec,gems" -end - task :default => :spec - -require 'yard' -YARD::Rake::YardocTask.new diff --git a/lib/github/statsd.rb b/lib/github/statsd.rb new file mode 100644 index 0000000..56730e8 --- /dev/null +++ b/lib/github/statsd.rb @@ -0,0 +1,255 @@ +require 'socket' +require 'zlib' + +module GitHub + # = Statsd: A Statsd client (https://github.com/etsy/statsd) + # + # @example Set up a global Statsd client for a server on localhost:8125 + # $statsd = Statsd.new 'localhost', 8125 + # @example Send some stats + # $statsd.increment 'garets' + # $statsd.timing 'glork', 320 + # @example Use {#time} to time the execution of a block + # $statsd.time('account.activate') { @account.activate! } + # @example Create a namespaced statsd client and increment 'account.activate' + # statsd = Statsd.new('localhost').tap{|sd| sd.namespace = 'account'} + # statsd.increment 'activate' + class Statsd + class UDPClient + attr_reader :sock + + def initialize(address, port = nil) + address, port = address.split(':') if address.include?(':') + addrinfo = Addrinfo.ip(address) + + @sock = UDPSocket.new(addrinfo.pfamily) + @sock.connect(addrinfo.ip_address, port) + end + + def send(msg) + sock.write(msg) + rescue SystemCallError + nil + end + end + + class SecureUDPClient < UDPClient + def initialize(address, port, key) + super(address, port) + @key = key + end + + def send(msg) + super(signed_payload(msg)) + end + + private + # defer loading openssl and securerandom unless needed. this shaves ~10ms off + # of baseline require load time for environments that don't require message signing. + def self.setup_openssl + @sha256 ||= begin + require 'securerandom' + require 'openssl' + OpenSSL::Digest::SHA256.new + end + end + + def signed_payload(message) + sha256 = SecureUDPClient.setup_openssl + payload = timestamp + nonce + message + signature = OpenSSL::HMAC.digest(sha256, @key, payload) + signature + payload + end + + def timestamp + [Time.now.to_i].pack("Q<") + end + + def nonce + SecureRandom.random_bytes(4) + end + end + + # A namespace to prepend to all statsd calls. + attr_reader :namespace + + def namespace=(namespace) + @namespace = namespace + @prefix = namespace ? "#{@namespace}." : "".freeze + end + + # All the endpoints where StatsD will report metrics + attr_reader :shards + + # The client class used to initialize shard instances and send metrics. + attr_reader :client_class + + #characters that will be replaced with _ in stat names + RESERVED_CHARS_REGEX = /[\:\|\@]/ + + COUNTER_TYPE = "c".freeze + TIMING_TYPE = "ms".freeze + GAUGE_TYPE = "g".freeze + HISTOGRAM_TYPE = "h".freeze + + def initialize(client_class = nil) + @shards = [] + @client_class = client_class || UDPClient + self.namespace = nil + end + + def self.simple(addr, port = nil) + self.new.add_shard(addr, port) + end + + def add_shard(*args) + @shards << @client_class.new(*args) + self + end + + def enable_buffering(buffer_size = nil) + return if @buffering + @shards.map! { |client| Buffer.new(client, buffer_size) } + @buffering = true + end + + def disable_buffering + return unless @buffering + flush_all + @shards.map! { |client| client.base_client } + @buffering = false + end + + def flush_all + return unless @buffering + @shards.each { |client| client.flush } + end + + + # Sends an increment (count = 1) for the given stat to the statsd server. + # + # @param stat (see #count) + # @param sample_rate (see #count) + # @see #count + def increment(stat, sample_rate=1); count stat, 1, sample_rate end + + # Sends a decrement (count = -1) for the given stat to the statsd server. + # + # @param stat (see #count) + # @param sample_rate (see #count) + # @see #count + def decrement(stat, sample_rate=1); count stat, -1, sample_rate end + + # Sends an arbitrary count for the given stat to the statsd server. + # + # @param [String] stat stat name + # @param [Integer] count count + # @param [Integer] sample_rate sample rate, 1 for always + def count(stat, count, sample_rate=1); send stat, count, COUNTER_TYPE, sample_rate end + + # Sends an arbitary gauge value for the given stat to the statsd server. + # + # @param [String] stat stat name. + # @param [Numeric] gauge value. + # @example Report the current user count: + # $statsd.gauge('user.count', User.count) + def gauge(stat, value) + send stat, value, GAUGE_TYPE + end + + # Sends a timing (in ms) for the given stat to the statsd server. The + # sample_rate determines what percentage of the time this report is sent. The + # statsd server then uses the sample_rate to correctly track the average + # timing for the stat. + # + # @param stat stat name + # @param [Integer] ms timing in milliseconds + # @param [Integer] sample_rate sample rate, 1 for always + def timing(stat, ms, sample_rate=1); send stat, ms, TIMING_TYPE, sample_rate end + + # Reports execution time of the provided block using {#timing}. + # + # @param stat (see #timing) + # @param sample_rate (see #timing) + # @yield The operation to be timed + # @see #timing + # @example Report the time (in ms) taken to activate an account + # $statsd.time('account.activate') { @account.activate! } + def time(stat, sample_rate=1) + start = Time.now + result = yield + timing(stat, ((Time.now - start) * 1000).round(5), sample_rate) + result + end + + # Sends a histogram measurement for the given stat to the statsd server. The + # sample_rate determines what percentage of the time this report is sent. The + # statsd server then uses the sample_rate to correctly track the average + # for the stat. + def histogram(stat, value, sample_rate=1); send stat, value, HISTOGRAM_TYPE, sample_rate end + + private + def sampled(sample_rate) + yield unless sample_rate < 1 and rand > sample_rate + end + + def send(stat, delta, type, sample_rate=1) + sampled(sample_rate) do + stat = stat.to_s.dup + stat.gsub!(/::/, ".".freeze) + stat.gsub!(RESERVED_CHARS_REGEX, "_".freeze) + + msg = String.new + msg << @prefix + msg << stat + msg << ":".freeze + msg << delta.to_s + msg << "|".freeze + msg << type + if sample_rate < 1 + msg << "|@".freeze + msg << sample_rate.to_s + end + + shard = select_shard(stat) + shard.send(msg) + end + end + + def select_shard(stat) + if @shards.size == 1 + @shards.first + else + @shards[Zlib.crc32(stat) % @shards.size] + end + end + + class Buffer + DEFAULT_BUFFER_CAP = 512 + + attr_reader :base_client + attr_accessor :flush_count + + def initialize(client, buffer_cap = nil) + @base_client = client + @buffer = String.new + @buffer_cap = buffer_cap || DEFAULT_BUFFER_CAP + @flush_count = 0 + end + + def flush + return unless @buffer.bytesize > 0 + @base_client.send(@buffer) + @buffer.clear + @flush_count += 1 + end + + def send(msg) + flush if @buffer.bytesize + msg.bytesize >= @buffer_cap + @buffer << msg + @buffer << "\n".freeze + nil + end + end + end +end diff --git a/lib/statsd.rb b/lib/statsd.rb deleted file mode 100644 index cbd3e539..0000000 --- a/lib/statsd.rb +++ /dev/null @@ -1,161 +0,0 @@ -require 'openssl' -require 'securerandom' -require 'socket' -require 'time' -require 'zlib' - -# = Statsd: A Statsd client (https://github.com/etsy/statsd) -# -# @example Set up a global Statsd client for a server on localhost:8125 -# $statsd = Statsd.new 'localhost', 8125 -# @example Send some stats -# $statsd.increment 'garets' -# $statsd.timing 'glork', 320 -# @example Use {#time} to time the execution of a block -# $statsd.time('account.activate') { @account.activate! } -# @example Create a namespaced statsd client and increment 'account.activate' -# statsd = Statsd.new('localhost').tap{|sd| sd.namespace = 'account'} -# statsd.increment 'activate' -class Statsd - class Host - attr_reader :ip, :port, :key - def initialize(host, port, key = nil) - @ip = Addrinfo.ip(host).ip_address - @port = port - @key = key - end - end - - # A namespace to prepend to all statsd calls. - attr_accessor :namespace - - #characters that will be replaced with _ in stat names - RESERVED_CHARS_REGEX = /[\:\|\@]/ - - # Digest object as a constant - SHA256 = OpenSSL::Digest::SHA256.new - - class << self - # Set to any standard logger instance (including stdlib's Logger) to enable - # stat logging using logger.debug - attr_accessor :logger - end - - # @param [String] host your statsd host - # @param [Integer] port your statsd port - def initialize(host, port=8125, key=nil) - @hosts = [] - add_host(host, port, key) - end - - def add_host(host, port = nil, key = nil) - host, port = host.split(':') if host.include?(':') - @hosts << Host.new(host, port.to_i, key) - end - - # Sends an increment (count = 1) for the given stat to the statsd server. - # - # @param stat (see #count) - # @param sample_rate (see #count) - # @see #count - def increment(stat, sample_rate=1); count stat, 1, sample_rate end - - # Sends a decrement (count = -1) for the given stat to the statsd server. - # - # @param stat (see #count) - # @param sample_rate (see #count) - # @see #count - def decrement(stat, sample_rate=1); count stat, -1, sample_rate end - - # Sends an arbitrary count for the given stat to the statsd server. - # - # @param [String] stat stat name - # @param [Integer] count count - # @param [Integer] sample_rate sample rate, 1 for always - def count(stat, count, sample_rate=1); send stat, count, 'c', sample_rate end - - # Sends an arbitary gauge value for the given stat to the statsd server. - # - # @param [String] stat stat name. - # @param [Numeric] gauge value. - # @example Report the current user count: - # $statsd.gauge('user.count', User.count) - def gauge(stat, value) - send stat, value, 'g' - end - - # Sends a timing (in ms) for the given stat to the statsd server. The - # sample_rate determines what percentage of the time this report is sent. The - # statsd server then uses the sample_rate to correctly track the average - # timing for the stat. - # - # @param stat stat name - # @param [Integer] ms timing in milliseconds - # @param [Integer] sample_rate sample rate, 1 for always - def timing(stat, ms, sample_rate=1); send stat, ms, 'ms', sample_rate end - - # Reports execution time of the provided block using {#timing}. - # - # @param stat (see #timing) - # @param sample_rate (see #timing) - # @yield The operation to be timed - # @see #timing - # @example Report the time (in ms) taken to activate an account - # $statsd.time('account.activate') { @account.activate! } - def time(stat, sample_rate=1) - start = Time.now - result = yield - timing(stat, ((Time.now - start) * 1000).round(5), sample_rate) - result - end - - private - - def sampled(sample_rate) - yield unless sample_rate < 1 and rand > sample_rate - end - - def send(stat, delta, type, sample_rate=1) - sampled(sample_rate) do - prefix = "#{@namespace}." unless @namespace.nil? - stat = stat.to_s.gsub('::', '.').gsub(RESERVED_CHARS_REGEX, '_') - msg = "#{prefix}#{stat}:#{delta}|#{type}#{'|@' << sample_rate.to_s if sample_rate < 1}" - send_to_socket(select_host(stat), msg) - end - end - - def send_to_socket(host, message) - self.class.logger.debug {"Statsd: #{message}"} if self.class.logger - if host.key.nil? - socket.send(message, 0, host.ip, host.port) - else - socket.send(signed_payload(host.key, message), 0, host.ip, host.port) - end - rescue => boom - self.class.logger.error {"Statsd: #{boom.class} #{boom}"} if self.class.logger - end - - def select_host(stat) - if @hosts.size == 1 - @hosts.first - else - @hosts[Zlib.crc32(stat) % @hosts.size] - end - end - - def signed_payload(key, message) - payload = timestamp + nonce + message - signature = OpenSSL::HMAC.digest(SHA256, key, payload) - signature + payload - end - - def timestamp - [Time.now.to_i].pack("Q<") - end - - def nonce - SecureRandom.random_bytes(4) - end - - def socket; @socket ||= UDPSocket.new end -end diff --git a/spec/helper.rb b/spec/helper.rb index caffc48..5e5c524 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -3,7 +3,7 @@ $LOAD_PATH.unshift(File.dirname(__FILE__)) $LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) -require 'statsd' +require 'github/statsd' require 'logger' class FakeUDPSocket diff --git a/spec/statsd_spec.rb b/spec/statsd_spec.rb index ab6c582..14d0b21 100644 --- a/spec/statsd_spec.rb +++ b/spec/statsd_spec.rb @@ -1,39 +1,39 @@ -require 'helper' +require_relative './helper' -describe Statsd do +describe GitHub::Statsd do before do - @statsd = Statsd.new('localhost', 1234) + @statsd = GitHub::Statsd.new(FakeUDPSocket) + @statsd.add_shard class << @statsd public :sampled # we need to test this - attr_reader :host, :port # we also need to test this - def socket; @socket ||= FakeUDPSocket.new end end end - after { @statsd.socket.clear } + after { @statsd.shards.first.clear } describe "#initialize" do - it "should set the host and port" do - @statsd.host.must_equal 'localhost' - @statsd.port.must_equal 1234 + it "should default client class to UDPClient" do + statsd = GitHub::Statsd.new + statsd.client_class.must_equal GitHub::Statsd::UDPClient end - it "should default the port to 8125" do - Statsd.new('localhost').instance_variable_get('@port').must_equal 8125 + it "should allow changing client class" do + statsd = GitHub::Statsd.new(FakeUDPSocket) + statsd.client_class.must_equal FakeUDPSocket end end describe "#increment" do it "should format the message according to the statsd spec" do @statsd.increment('foobar') - @statsd.socket.recv.must_equal ['foobar:1|c'] + @statsd.shards.first.recv.must_equal ['foobar:1|c'] end describe "with a sample rate" do before { class << @statsd; def rand; 0; end; end } # ensure delivery it "should format the message according to the statsd spec" do @statsd.increment('foobar', 0.5) - @statsd.socket.recv.must_equal ['foobar:1|c|@0.5'] + @statsd.shards.first.recv.must_equal ['foobar:1|c|@0.5'] end end end @@ -41,14 +41,14 @@ def socket; @socket ||= FakeUDPSocket.new end describe "#decrement" do it "should format the message according to the statsd spec" do @statsd.decrement('foobar') - @statsd.socket.recv.must_equal ['foobar:-1|c'] + @statsd.shards.first.recv.must_equal ['foobar:-1|c'] end describe "with a sample rate" do before { class << @statsd; def rand; 0; end; end } # ensure delivery it "should format the message according to the statsd spec" do @statsd.decrement('foobar', 0.5) - @statsd.socket.recv.must_equal ['foobar:-1|c|@0.5'] + @statsd.shards.first.recv.must_equal ['foobar:-1|c|@0.5'] end end end @@ -56,14 +56,14 @@ def socket; @socket ||= FakeUDPSocket.new end describe "#timing" do it "should format the message according to the statsd spec" do @statsd.timing('foobar', 500) - @statsd.socket.recv.must_equal ['foobar:500|ms'] + @statsd.shards.first.recv.must_equal ['foobar:500|ms'] end describe "with a sample rate" do before { class << @statsd; def rand; 0; end; end } # ensure delivery it "should format the message according to the statsd spec" do @statsd.timing('foobar', 500, 0.5) - @statsd.socket.recv.must_equal ['foobar:500|ms|@0.5'] + @statsd.shards.first.recv.must_equal ['foobar:500|ms|@0.5'] end end end @@ -71,10 +71,10 @@ def socket; @socket ||= FakeUDPSocket.new end describe "#time" do it "should format the message according to the statsd spec" do @statsd.time('foobar') { sleep(0.001); 'test' } - data = @statsd.socket.recv + data = @statsd.shards.first.recv key, value, unit = data.first.split(/[:|]/) key.must_equal "foobar" - value.must_match /^\d\.\d{3}$/ + value.must_match /^\d\.\d+$/ unit.must_equal "ms" end @@ -88,10 +88,10 @@ def socket; @socket ||= FakeUDPSocket.new end it "should format the message according to the statsd spec" do result = @statsd.time('foobar', 0.5) { sleep(0.001); 'test' } - data = @statsd.socket.recv + data = @statsd.shards.first.recv key, value, unit, frequency = data.first.split(/[:|]/) key.must_equal "foobar" - value.must_match /^\d\.\d{3}$/ + value.must_match /^\d\.\d+$/ unit.must_equal "ms" frequency.must_equal "@0.5" end @@ -132,42 +132,20 @@ def socket; @socket ||= FakeUDPSocket.new end it "should add namespace to increment" do @statsd.increment('foobar') - @statsd.socket.recv.must_equal ['service.foobar:1|c'] + @statsd.shards.first.recv.must_equal ['service.foobar:1|c'] end it "should add namespace to decrement" do @statsd.decrement('foobar') - @statsd.socket.recv.must_equal ['service.foobar:-1|c'] + @statsd.shards.first.recv.must_equal ['service.foobar:-1|c'] end it "should add namespace to timing" do @statsd.timing('foobar', 500) - @statsd.socket.recv.must_equal ['service.foobar:500|ms'] + @statsd.shards.first.recv.must_equal ['service.foobar:500|ms'] end end - describe "with logging" do - require 'stringio' - before { Statsd.logger = Logger.new(@log = StringIO.new)} - - it "should write to the log in debug" do - Statsd.logger.level = Logger::DEBUG - - @statsd.increment('foobar') - - @log.string.must_match "Statsd: foobar:1|c" - end - - it "should not write to the log unless debug" do - Statsd.logger.level = Logger::INFO - - @statsd.increment('foobar') - - @log.string.must_be_empty - end - - end - describe "stat names" do it "should accept anything as stat" do @@ -175,29 +153,29 @@ def socket; @socket ||= FakeUDPSocket.new end end it "should replace ruby constant delimeter with graphite package name" do - class Statsd::SomeClass; end - @statsd.increment(Statsd::SomeClass, 1) + class GitHub::Statsd::SomeClass; end + @statsd.increment(GitHub::Statsd::SomeClass, 1) - @statsd.socket.recv.must_equal ['Statsd.SomeClass:1|c'] + @statsd.shards.first.recv.must_equal ['GitHub.Statsd.SomeClass:1|c'] end it "should replace statsd reserved chars in the stat name" do @statsd.increment('ray@hostname.blah|blah.blah:blah', 1) - @statsd.socket.recv.must_equal ['ray_hostname.blah_blah.blah_blah:1|c'] + @statsd.shards.first.recv.must_equal ['ray_hostname.blah_blah.blah_blah:1|c'] end end end -describe Statsd do +describe GitHub::Statsd do describe "with a real UDP socket" do it "should actually send stuff over the socket" do socket = UDPSocket.new host, port = 'localhost', 12345 socket.bind(host, port) - statsd = Statsd.new(host, port) + statsd = GitHub::Statsd.new(host, port) statsd.increment('foobar') message = socket.recvfrom(16).first message.must_equal 'foobar:1|c' diff --git a/statsd-ruby.gemspec b/statsd-ruby.gemspec index 1b53551..6b9a7de 100644 --- a/statsd-ruby.gemspec +++ b/statsd-ruby.gemspec @@ -5,7 +5,7 @@ Gem::Specification.new do |s| s.name = %q{statsd-ruby} - s.version = "0.3.0.github.5" + s.version = "0.4.0.github" s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= s.authors = ["Rein Henrichs"] @@ -22,7 +22,7 @@ Gem::Specification.new do |s| "README.rdoc", "Rakefile", "VERSION", - "lib/statsd.rb", + "lib/github/statsd.rb", "spec/helper.rb", "spec/statsd_spec.rb", "statsd-ruby.gemspec" @@ -32,26 +32,6 @@ Gem::Specification.new do |s| s.require_paths = ["lib"] s.rubygems_version = %q{1.3.9.1} s.summary = %q{A Statsd client in Ruby} - - if s.respond_to? :specification_version then - s.specification_version = 3 - - if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then - s.add_development_dependency(%q, [">= 0"]) - s.add_development_dependency(%q, ["~> 0.6.0"]) - s.add_development_dependency(%q, ["~> 1.5.2"]) - s.add_development_dependency(%q, [">= 0"]) - else - s.add_dependency(%q, [">= 0"]) - s.add_dependency(%q, ["~> 0.6.0"]) - s.add_dependency(%q, ["~> 1.5.2"]) - s.add_dependency(%q, [">= 0"]) - end - else - s.add_dependency(%q, [">= 0"]) - s.add_dependency(%q, ["~> 0.6.0"]) - s.add_dependency(%q, ["~> 1.5.2"]) - s.add_dependency(%q, [">= 0"]) - end + s.add_development_dependency "rake", "~> 11.2" + s.add_development_dependency "minitest", "~> 5.9" end -