diff --git a/README.md b/README.md index 94dc1c4..ec375c9 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,25 @@ +Modification +============ +Modified to an object-oriented socket that one can create multiple socket object for different server connection +Modified by whuben(https://github.com/whuben) + +Motivation of Modification +========================== +The old version `resty.socket.lua`(written by Jiale Zhi (calio), CloudFlare Inc) is not support for multiple syslog server, if you want to send different log message to different remote server, you should make copies of the socket with different name and requrie them in the code, one copy just supports for one remote server. +So, just make it objective-oriented for supporting multiple syslog server + +Usage for creating multipul socket objects +========================================== +``` +local socket = require "lib.resty.socket" +--create two socket objects for remote log server A and B +local socket_obj_a = socket:init(syslog_config_a) --syslog_config_a is the config info of remote log server A +local socket_obj_b = socket:init(syslog_config_b) --syslog_config_b is the config info of remote log server B +--send msg to remote log server +socket_obj_a:log(msg_a.."\n") --send message msg_a to remote log server A +socket_obj_b:log(msg_b.."\n") --send message msg_b to remote log server B +``` + Name ==== diff --git a/lib/resty/logger/socket.lua b/lib/resty/logger/socket.lua index 0ef8bef..1a863b0 100644 --- a/lib/resty/logger/socket.lua +++ b/lib/resty/logger/socket.lua @@ -1,6 +1,7 @@ --- Copyright (C) 2013-2014 Jiale Zhi (calio), CloudFlare Inc. +--An Object-oriented socket that one can create multiple socket object for different server connection +--Modified the original template: resty.socket.lua(written by Jiale Zhi (calio), CloudFlare Inc) for object-oriented +--Modified by whuben(https://github.com/whuben) --require "luacov" - local concat = table.concat local tcp = ngx.socket.tcp local udp = ngx.socket.udp @@ -17,7 +18,6 @@ local CRIT = ngx.CRIT local MAX_PORT = 65535 - -- table.new(narr, nrec) local succ, new_tab = pcall(require, "table.new") if not succ then @@ -41,58 +41,25 @@ else is_exiting = ngx.worker.exiting end +local _M = { + _VERSION = "0.04" +} +local mt = {__index=_M} -_M._VERSION = '0.03' - --- user config -local flush_limit = 4096 -- 4KB -local drop_limit = 1048576 -- 1MB -local timeout = 1000 -- 1 sec -local host -local port -local ssl = false -local ssl_verify = true -local sni_host -local path -local max_buffer_reuse = 10000 -- reuse buffer for at most 10000 - -- times -local periodic_flush = nil -local need_periodic_flush = nil -local sock_type = 'tcp' - --- internal variables -local buffer_size = 0 --- 2nd level buffer, it stores logs ready to be sent out -local send_buffer = "" --- 1st level buffer, it stores incoming logs -local log_buffer_data = new_tab(20000, 0) --- number of log lines in current 1st level buffer, starts from 0 -local log_buffer_index = 0 local last_error -local connecting -local connected -local exiting -local retry_connect = 0 -local retry_send = 0 -local max_retry_times = 3 -local retry_interval = 100 -- 0.1s -local pool_size = 10 -local flushing -local logger_initted -local counter = 0 local ssl_session local function _write_error(msg) last_error = msg end -local function _do_connect() +local function _do_connect(socket_obj) local ok, err, sock - if not connected then - if (sock_type == 'udp') then + if not socket_obj.connected then + if (socket_obj.sock_type == 'udp') then sock, err = udp() else sock, err = tcp() @@ -103,18 +70,18 @@ local function _do_connect() return nil, err end - sock:settimeout(timeout) + sock:settimeout(socket_obj.timeout) end -- "host"/"port" and "path" have already been checked in init() - if host and port then - if (sock_type == 'udp') then - ok, err = sock:setpeername(host, port) + if socket_obj.host and socket_obj.port then + if (socket_obj.sock_type == 'udp') then + ok, err = sock:setpeername(socket_obj.host, socket_obj.port) else - ok, err = sock:connect(host, port) + ok, err = sock:connect(socket_obj.host, socket_obj.port) end - elseif path then - ok, err = sock:connect("unix:" .. path) + elseif socket_obj.path then + ok, err = sock:connect("unix:" .. socket_obj.path) end if not ok then @@ -124,13 +91,13 @@ local function _do_connect() return sock end -local function _do_handshake(sock) +local function _do_handshake(sock,socket_obj) if not ssl then return sock end - local session, err = sock:sslhandshake(ssl_session, sni_host or host, - ssl_verify) + local session, err = sock:sslhandshake(ssl_session, socket_obj.sni_host or socket_obj.host, + socket_obj.ssl_verify) if not session then return nil, err end @@ -139,28 +106,28 @@ local function _do_handshake(sock) return sock end -local function _connect() +local function _connect(socket_obj) local err, sock - if connecting then + if socket_obj.connecting then if debug then ngx_log(DEBUG, "previous connection not finished") end return nil, "previous connection not finished" end - connected = false - connecting = true + socket_obj.connected = false + socket_obj.connecting = true - retry_connect = 0 + socket_obj.retry_connect = 0 - while retry_connect <= max_retry_times do - sock, err = _do_connect() + while socket_obj.retry_connect <= socket_obj.max_retry_times do + sock, err = _do_connect(socket_obj) if sock then - sock, err = _do_handshake(sock) + sock, err = _do_handshake(sock,socket_obj) if sock then - connected = true + socket_obj.connected = true break end end @@ -170,43 +137,43 @@ local function _connect() end -- ngx.sleep time is in seconds - if not exiting then - ngx_sleep(retry_interval / 1000) + if not socket_obj.exiting then + ngx_sleep(socket_obj.retry_interval / 1000) end - retry_connect = retry_connect + 1 + socket_obj.retry_connect = socket_obj.retry_connect + 1 end - connecting = false - if not connected then + socket_obj.connecting = false + if not socket_obj.connected then return nil, "try to connect to the log server failed after " - .. max_retry_times .. " retries: " .. err + .. socket_obj.max_retry_times .. " retries: " .. err end return sock end -local function _prepare_stream_buffer() - local packet = concat(log_buffer_data, "", 1, log_buffer_index) - send_buffer = send_buffer .. packet +local function _prepare_stream_buffer(socket_obj) + local packet = concat(socket_obj.log_buffer_data, "", 1, socket_obj.log_buffer_index) + socket_obj.send_buffer = socket_obj.send_buffer .. packet - log_buffer_index = 0 - counter = counter + 1 - if counter > max_buffer_reuse then - log_buffer_data = new_tab(20000, 0) - counter = 0 + socket_obj.log_buffer_index = 0 + socket_obj.counter = socket_obj.counter + 1 + if socket_obj.counter > socket_obj.max_buffer_reuse then + socket_obj.log_buffer_data = new_tab(20000, 0) + socket_obj.counter = 0 if debug then - ngx_log(DEBUG, "log buffer reuse limit (" .. max_buffer_reuse + ngx_log(DEBUG, "log buffer reuse limit (" .. socket_obj.max_buffer_reuse .. ") reached, create a new \"log_buffer_data\"") end end end -local function _do_flush() +local function _do_flush(socket_obj) local ok, err, sock, bytes - local packet = send_buffer + local packet = socket_obj.send_buffer - sock, err = _connect() + sock, err = _connect(socket_obj) if not sock then return nil, err end @@ -222,8 +189,8 @@ local function _do_flush() ngx_log(DEBUG, ngx.now(), ":log flush:" .. bytes .. ":" .. packet) end - if (sock_type ~= 'udp') then - ok, err = sock:setkeepalive(0, pool_size) + if (socket_obj.sock_type ~= 'udp') then + ok, err = sock:setkeepalive(0, socket_obj.pool_size) if not ok then return nil, err end @@ -232,37 +199,36 @@ local function _do_flush() return bytes end -local function _need_flush() - if buffer_size > 0 then +local function _need_flush(socket_obj) + if socket_obj.buffer_size > 0 then return true end return false end -local function _flush_lock() - if not flushing then +local function _flush_lock(socket_obj) + if not socket_obj.flushing then if debug then ngx_log(DEBUG, "flush lock acquired") end - flushing = true + socket_obj.flushing = true return true end return false end -local function _flush_unlock() +local function _flush_unlock(socket_obj) if debug then ngx_log(DEBUG, "flush lock released") end - flushing = false + socket_obj.flushing = false end -local function _flush() +local function _flush(premature,socket_obj) local err - - -- pre check - if not _flush_lock() then + --pre check + if not _flush_lock(socket_obj) then if debug then ngx_log(DEBUG, "previous flush not finished") end @@ -270,27 +236,27 @@ local function _flush() return true end - if not _need_flush() then + if not _need_flush(socket_obj) then if debug then - ngx_log(DEBUG, "no need to flush:", log_buffer_index) + ngx_log(DEBUG, "no need to flush:", socket_obj.log_buffer_index) end - _flush_unlock() + _flush_unlock(socket_obj) return true end -- start flushing - retry_send = 0 + socket_obj.retry_send = 0 if debug then ngx_log(DEBUG, "start flushing") end local bytes - while retry_send <= max_retry_times do - if log_buffer_index > 0 then - _prepare_stream_buffer() + while socket_obj.retry_send <= socket_obj.max_retry_times do + if socket_obj.log_buffer_index > 0 then + _prepare_stream_buffer(socket_obj) end - bytes, err = _do_flush() + bytes, err = _do_flush(socket_obj) if bytes then break @@ -301,18 +267,18 @@ local function _flush() end -- ngx.sleep time is in seconds - if not exiting then - ngx_sleep(retry_interval / 1000) + if not socket_obj.exiting then + ngx_sleep(socket_obj.retry_interval / 1000) end - retry_send = retry_send + 1 + socket_obj.retry_send = socket_obj.retry_send + 1 end - _flush_unlock() + _flush_unlock(socket_obj) if not bytes then local err_msg = "try to send log messages to the log server " - .. "failed after " .. max_retry_times .. " retries: " + .. "failed after " .. socket_obj.max_retry_times .. " retries: " .. err _write_error(err_msg) return nil, err_msg @@ -322,38 +288,38 @@ local function _flush() end end - buffer_size = buffer_size - #send_buffer - send_buffer = "" + socket_obj.buffer_size = socket_obj.buffer_size - #socket_obj.send_buffer + socket_obj.send_buffer = "" return bytes end -local function _periodic_flush(premature) +local function _periodic_flush(premature,socket_obj) if premature then - exiting = true + socket_obj.exiting = true end - if need_periodic_flush or exiting then + if socket_obj.need_periodic_flush or socket_obj.exiting then -- no regular flush happened after periodic flush timer had been set if debug then ngx_log(DEBUG, "performing periodic flush") end - _flush() + _flush(nil,socket_obj) else if debug then ngx_log(DEBUG, "no need to perform periodic flush: regular flush " .. "happened before") end - need_periodic_flush = true + socket_obj.need_periodic_flush = true end - timer_at(periodic_flush, _periodic_flush) + timer_at(socket_obj.periodic_flush, _periodic_flush,socket_obj) end -local function _flush_buffer() - local ok, err = timer_at(0, _flush) +function _M:_flush_buffer() + local ok, err = timer_at(0, _flush,self) - need_periodic_flush = false + self.need_periodic_flush = false if not ok then _write_error(err) @@ -361,27 +327,38 @@ local function _flush_buffer() end end -local function _write_buffer(msg, len) - log_buffer_index = log_buffer_index + 1 - log_buffer_data[log_buffer_index] = msg +function _M:_write_buffer(msg, len) + self.log_buffer_index = self.log_buffer_index + 1 + self.log_buffer_data[self.log_buffer_index] = msg - buffer_size = buffer_size + len + self.buffer_size = self.buffer_size + len - return buffer_size + return self.buffer_size end -function _M.init(user_config) +function _M:init(user_config) if (type(user_config) ~= "table") then return nil, "user_config must be a table" end - + local socket_instance = {} + socket_instance.timeout = 1000 + socket_instance.drop_limit = 1048576 + socket_instance.flush_limit = 4096 + socket_instance.max_buffer_reuse = 10000 + socket_instance.max_retry_times = 3 + socket_instance.pool_size = 10 + socket_instance.retry_interval = 100 + socket_instance.sock_type = "tcp" + socket_instance.ssl = false + socket_instance.ssl_verify = true + socket_instance.counter = 0 for k, v in pairs(user_config) do if k == "host" then if type(v) ~= "string" then return nil, '"host" must be a string' end - host = v + socket_instance.host = v elseif k == "port" then if type(v) ~= "number" then return nil, '"port" must be a number' @@ -389,12 +366,12 @@ function _M.init(user_config) if v < 0 or v > MAX_PORT then return nil, ('"port" out of range 0~%s'):format(MAX_PORT) end - port = v + socket_instance.port = v elseif k == "path" then if type(v) ~= "string" then return nil, '"path" must be a string' end - path = v + socket_instance.path = v elseif k == "sock_type" then if type(v) ~= "string" then return nil, '"sock_type" must be a string' @@ -402,135 +379,127 @@ function _M.init(user_config) if v ~= "tcp" and v ~= "udp" then return nil, '"sock_type" must be "tcp" or "udp"' end - sock_type = v + socket_instance.sock_type = v elseif k == "flush_limit" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "flush_limit"' end - flush_limit = v + socket_instance.flush_limit = v elseif k == "drop_limit" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "drop_limit"' end - drop_limit = v + socket_instance.drop_limit = v elseif k == "timeout" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "timeout"' end - timeout = v + socket_instance.timeout = v elseif k == "max_retry_times" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "max_retry_times"' end - max_retry_times = v + socket_instance.max_retry_times = v elseif k == "retry_interval" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "retry_interval"' end -- ngx.sleep time is in seconds - retry_interval = v + socket_instance.retry_interval = v elseif k == "pool_size" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "pool_size"' end - pool_size = v + socket_instance.pool_size = v elseif k == "max_buffer_reuse" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "max_buffer_reuse"' end - max_buffer_reuse = v + socket_instance.max_buffer_reuse = v elseif k == "periodic_flush" then if type(v) ~= "number" or v < 0 then return nil, 'invalid "periodic_flush"' end - periodic_flush = v + socket_instance.periodic_flush = v elseif k == "ssl" then if type(v) ~= "boolean" then return nil, '"ssl" must be a boolean value' end - ssl = v + socket_instance.ssl = v elseif k == "ssl_verify" then if type(v) ~= "boolean" then return nil, '"ssl_verify" must be a boolean value' end - ssl_verify = v + socket_instance.ssl_verify = v elseif k == "sni_host" then if type(v) ~= "string" then return nil, '"sni_host" must be a string' end - sni_host = v + socket_instance.sni_host = v end end - if not (host and port) and not path then + if not (socket_instance.host and socket_instance.port) and not socket_instance.path then return nil, "no logging server configured. \"host\"/\"port\" or " .. "\"path\" is required." end - - - if (flush_limit >= drop_limit) then + if (socket_instance.flush_limit >= socket_instance.drop_limit) then return nil, "\"flush_limit\" should be < \"drop_limit\"" end - - flushing = false - exiting = false - connecting = false - - connected = false - retry_connect = 0 - retry_send = 0 - - logger_initted = true - - if periodic_flush then + socket_instance.flushing = false + socket_instance.exiting = false + socket_instance.connecting = false + socket_instance.connected = false + socket_instance.retry_connect = 0 + socket_instance.retry_send = 0 + socket_instance.logger_initted = true + socket_instance.send_buffer = "" + socket_instance.log_buffer_index = 0 + socket_instance.log_buffer_data = new_tab(20000, 0) + socket_instance.buffer_size = 0 + if socket_instance.periodic_flush then if debug then ngx_log(DEBUG, "periodic flush enabled for every " - .. periodic_flush .. " seconds") + .. socket_instance.periodic_flush .. " seconds") end - need_periodic_flush = true - timer_at(periodic_flush, _periodic_flush) + socket_instance.need_periodic_flush = true + timer_at(socket_instance.periodic_flush,_periodic_flush,setmetatable(socket_instance,mt)) end - - return logger_initted + return setmetatable(socket_instance,mt) end -function _M.log(msg) - if not logger_initted then +function _M:log(msg) + if not self.logger_initted then return nil, "not initialized" end - local bytes - if type(msg) ~= "string" then msg = tostring(msg) end - local msg_len = #msg - if (debug) then ngx.update_time() ngx_log(DEBUG, ngx.now(), ":log message length: " .. msg_len) end - -- response of "_flush_buffer" is not checked, because it writes -- error buffer if (is_exiting()) then exiting = true - _write_buffer(msg, msg_len) - _flush_buffer() + self:_write_buffer(msg, msg_len) + self:_flush_buffer() if (debug) then ngx_log(DEBUG, "Nginx worker is exiting") end bytes = 0 - elseif (msg_len + buffer_size < flush_limit) then - _write_buffer(msg, msg_len) + elseif (msg_len + self.buffer_size < self.flush_limit) then + self:_write_buffer(msg, msg_len) bytes = msg_len - elseif (msg_len + buffer_size <= drop_limit) then - _write_buffer(msg, msg_len) - _flush_buffer() + elseif (msg_len + self.buffer_size <= self.drop_limit) then + self:_write_buffer(msg, msg_len) + self:_flush_buffer() bytes = msg_len else - _flush_buffer() + self:_flush_buffer() if (debug) then ngx_log(DEBUG, "logger buffer is full, this log message will be " .. "dropped") @@ -548,11 +517,15 @@ function _M.log(msg) return bytes end -function _M.initted() - return logger_initted +function _M:initted() + return self.logger_initted end -_M.flush = _flush +function _M:close() + if self.logger_initted == true then + _flush(nil,self) + self.logger_initted = false + end +end return _M -