-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathapi.rb
451 lines (390 loc) · 17.5 KB
/
api.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# frozen_string_literal: true
require 'matrix_sdk'
require 'erb'
require 'net/http'
require 'openssl'
require 'uri'
module MatrixSdk
class Api
# NOTE(review): presumably the source of the `ignore_inspect` macro used below - confirm in MatrixSdk::Extensions
extend MatrixSdk::Extensions
# NOTE(review): presumably supplies the `logger` used by the instance methods - confirm in MatrixSdk::Logging
include MatrixSdk::Logging
# Default value for the 'user-agent' header, embeds the SDK version
USER_AGENT = "Ruby Matrix SDK v#{MatrixSdk::VERSION}"
# Baseline headers; the constructor copies these into the per-instance global_headers
DEFAULT_HEADERS = {
'accept' => 'application/json',
'user-agent' => USER_AGENT
}.freeze
# Connection settings that can be read and written without side effects
attr_accessor :access_token, :connection_address, :connection_port, :device_id, :autoretry, :global_headers
# Reader-only here; most of these get a hand-written writer further down that also resets the HTTP connection
attr_reader :homeserver, :validate_certificate, :open_timeout, :read_timeout, :well_known, :proxy_uri, :threadsafe
# NOTE(review): looks like this keeps the listed fields out of #inspect output - confirm against MatrixSdk::Extensions
ignore_inspect :access_token, :logger
# @param homeserver [String,URI] The URL to the Matrix homeserver, without the /_matrix/ part
# @param params [Hash] Additional parameters on creation
# @option params [Symbol[]] :protocols The protocols to include (:AS, :CS, :IS, :SS), defaults to :CS
# @option params [String] :address The connection address to the homeserver, if different to the HS URL
# @option params [Integer] :port The connection port to the homeserver, if different to the HS URL
# @option params [String] :access_token The access token to use for the connection
# @option params [String] :device_id The ID of the logged in device to use
# @option params [Boolean] :autoretry (true) Should requests automatically be retried in case of rate limits
# @option params [Boolean] :validate_certificate (false) Should the connection require valid SSL certificates
# @option params [Integer] :transaction_id (0) The starting ID for transactions
# @option params [Numeric] :backoff_time (5000) The request backoff time in milliseconds
# @option params [Numeric] :open_timeout (60) The timeout in seconds to wait for a TCP session to open
# @option params [Numeric] :read_timeout (240) The timeout in seconds for reading responses
# @option params [Hash] :global_headers Additional headers to set for all requests
# @option params [Boolean] :skip_login Should the API skip logging in if the HS URL contains user information
# @option params [Boolean] :synapse (true) Is the API connecting to a Synapse instance
# @option params [Boolean,:multithread] :threadsafe (:multithread) Should the connection be threadsafe/mutexed - or
# safe for simultaneous multi-thread usage. Will default to +:multithread+ - a.k.a. per-thread HTTP connections
# and requests
# @note Using threadsafe +:multithread+ currently doesn't support connection re-use
def initialize(homeserver, **params)
  # Accept either a ready-made URI or a string, defaulting to HTTPS when no scheme is given
  @homeserver = case homeserver
                when URI
                  homeserver
                when String
                  scheme = homeserver.start_with?('http') ? nil : 'https://'
                  URI.parse("#{scheme}#{homeserver}")
                else
                  raise ArgumentError, 'Homeserver URL must be String or URI'
                end

  # Strip a trailing /_matrix/ from the path in place, but reject any URL that still contains it
  hs_path = @homeserver.path
  hs_path.gsub!(/\/?_matrix\/?/, '') if hs_path =~ /_matrix\/?$/
  raise ArgumentError, 'Please use the base URL for your HS (without /_matrix/)' if hs_path.include? '/_matrix/'

  # Options that default to nil when not given
  @proxy_uri = params[:proxy_uri]
  @connection_address = params[:address]
  @connection_port = params[:port]
  @access_token = params[:access_token]
  @device_id = params[:device_id]
  @open_timeout = params[:open_timeout]
  @read_timeout = params[:read_timeout]

  # Options with explicit defaults
  @autoretry = params.fetch(:autoretry, true)
  @validate_certificate = params.fetch(:validate_certificate, false)
  @transaction_id = params.fetch(:transaction_id, 0)
  @backoff_time = params.fetch(:backoff_time, 5000)
  @well_known = params.fetch(:well_known, {})
  @synapse = params.fetch(:synapse, true)

  # Caller-provided headers are layered on top of a private copy of the defaults
  @global_headers = DEFAULT_HEADERS.dup
  @global_headers.merge!(params.fetch(:global_headers)) if params.key? :global_headers

  @http = nil
  @inflight = []

  # Goes through the writer so the value is validated
  self.threadsafe = params.fetch(:threadsafe, :multithread)

  # Mix in every requested protocol module that the class doesn't already include
  wanted = [params.fetch(:protocols, [:CS])].flatten
  missing = wanted - protocols
  missing.each { |proto| self.class.include MatrixSdk::Protocols.const_get(proto) }

  # Log in with credentials embedded in the URL, then drop them from the stored URL
  skip_login = params[:skip_login]
  has_credentials = @homeserver.user && @homeserver.password
  if has_credentials && !@access_token && !skip_login && protocol?(:CS)
    login(user: @homeserver.user, password: @homeserver.password)
  end
  @homeserver.userinfo = '' unless skip_login
end
# Create an API connection to a domain entry
#
# This will follow the server discovery spec for client-server and federation
#
# @example Opening a Matrix API connection to a homeserver
# hs = MatrixSdk::API.new_for_domain 'example.com'
# hs.connection_address
# # => 'matrix.example.com'
# hs.connection_port
# # => 443
#
# @param domain [String] The domain to set up the API connection for, can contain a ':' to denote a port
# @param target [:client,:identity,:server] The target for the domain lookup
# @param keep_wellknown [Boolean] Should the .well-known response be kept for further handling
# @param params [Hash] Additional options to pass to .new
# @return [API] The API connection
def self.new_for_domain(domain, target: :client, keep_wellknown: false, ssl: true, **params)
  domain, port = domain.split(':')
  # `ssl` only affects the scheme of the logical homeserver URL
  uri = URI("http#{ssl ? 's' : ''}://#{domain}")
  well_known = nil
  target_uri = nil
  logger = ::Logging.logger[self]
  logger.debug "Resolving #{domain}"

  # Best-effort download of a .well-known document; every failure is logged and yields nil
  fetch_well_known = lambda do |kind|
    wk_uri = URI("https://#{domain}/.well-known/matrix/#{kind}")
    logger.debug "Trying #{wk_uri}..."
    data = Net::HTTP.start(wk_uri.host, wk_uri.port, use_ssl: true, open_timeout: 5, read_timeout: 5, write_timeout: 5) do |http|
      http.get(wk_uri.path).body
    end
    JSON.parse(data)
  rescue StandardError => e
    logger.debug "Well-known failed with #{e.class}: #{e.message}"
    nil
  end

  if !port.nil? && !port.empty?
    # If the domain is fully qualified according to Matrix (FQDN and port) then skip discovery
    target_uri = URI("https://#{domain}:#{port}")
  elsif target == :server
    # Attempt SRV record discovery
    srv_record = begin
      require 'resolv'
      resolver = Resolv::DNS.new
      srv = "_matrix._tcp.#{domain}"
      logger.debug "Trying DNS #{srv}..."
      resolver.getresource(srv, Resolv::DNS::Resource::IN::SRV)
    rescue StandardError => e
      logger.debug "DNS lookup failed with #{e.class}: #{e.message}"
      nil
    end

    if srv_record
      target_uri = URI("https://#{srv_record.target}:#{srv_record.port}")
    else
      # Attempt .well-known discovery for server-to-server
      well_known = fetch_well_known.call('server')
      delegated = well_known['m.server'] if well_known.is_a?(Hash)
      if delegated.is_a?(String) && !delegated.empty?
        begin
          # m.server is a "host[:port]" string, it has to be turned into a URI before
          # its host/port can be read; without an explicit port the federation port is used
          delegated_uri = URI("https://#{delegated}")
          delegated_uri.port = 8448 unless delegated.match?(/:\d+\z/)
          target_uri = delegated_uri
        rescue URI::InvalidURIError => e
          logger.debug "Well-known failed with #{e.class}: #{e.message}"
        end
      end
    end
  elsif %i[client identity].include? target
    # Attempt .well-known discovery
    well_known = fetch_well_known.call('client')

    key = target == :identity ? 'm.identity_server' : 'm.homeserver'
    section = well_known[key] if well_known.is_a?(Hash)
    base_url = section['base_url'] if section.is_a?(Hash)
    if base_url
      uri = URI(base_url)
      target_uri = uri
    end
  end

  # Fall back to direct domain connection
  target_uri ||= URI("https://#{domain}:8448")
  logger.debug "Using #{target_uri.inspect}"

  params[:well_known] = well_known if keep_wellknown

  new(
    uri,
    **params.merge(
      address: target_uri.host,
      port: target_uri.port
    )
  )
end
# Get a list of enabled protocols on the API client
#
# @example
# MatrixSdk::Api.new_for_domain('matrix.org').protocols
# # => [:IS, :CS]
#
# @return [Symbol[]] An array of enabled APIs
def protocols
  prefix = 'MatrixSdk::Protocols::'
  self.class.included_modules.each_with_object([]) do |mod, enabled|
    mod_name = mod&.name
    # Anonymous modules have no name, everything outside the protocol namespace is irrelevant
    next if mod_name.nil? || !mod_name.start_with?(prefix)

    enabled << mod_name.split('::').last.to_sym
  end
end
# Check if a protocol is enabled on the API connection
#
# @example Checking for identity server API support
# api.protocol? :IS
# # => false
#
# @param protocol [Symbol] The protocol to check
# @return [Boolean] Is the protocol enabled
def protocol?(protocol)
  protocols.member?(protocol)
end
# Set the timeout for opening the connection, closing a currently open
# connection if the value changes so it gets re-opened with the new timeout
#
# @param seconds [Numeric]
# @return [Numeric]
def open_timeout=(seconds)
  # Only a started session may be finished, Net::HTTP#finish raises IOError otherwise
  @http.finish if @http&.active? && @open_timeout != seconds
  @open_timeout = seconds
end
# Set the timeout for reading responses, closing a currently open
# connection if the value changes so it gets re-opened with the new timeout
#
# @param seconds [Numeric]
# @return [Numeric]
def read_timeout=(seconds)
  # Only a started session may be finished, Net::HTTP#finish raises IOError otherwise
  @http.finish if @http&.active? && @read_timeout != seconds
  @read_timeout = seconds
end
# Set whether SSL certificates should be validated
#
# @param validate [Boolean]
# @return [Boolean]
def validate_certificate=(validate)
  # The HTTP connection needs to be reopened if this changes, but only a started
  # session may be finished - Net::HTTP#finish raises IOError otherwise
  @http.finish if @http&.active? && validate != @validate_certificate
  @validate_certificate = validate
end
# Change the homeserver URL, values that aren't URI objects are silently ignored
#
# @param hs_info [URI]
# @return [URI]
def homeserver=(hs_info)
  # TODO: DNS query for SRV information about HS?
  return unless hs_info.is_a? URI

  if @http && @homeserver != hs_info
    # Only a started session may be finished, Net::HTTP#finish raises IOError otherwise
    @http.finish if @http.active?
    # The connection object is bound to the old host/port, so it can't be reused
    @http = nil
  end
  @homeserver = hs_info
end
# Change the proxy to connect through, anything that's not a URI is converted
# using its string representation
#
# @param [URI] proxy_uri The URI for the proxy to use
# @return [URI]
def proxy_uri=(proxy_uri)
  proxy_uri = URI(proxy_uri.to_s) unless proxy_uri.is_a? URI

  if @http && @proxy_uri != proxy_uri
    # Only a started session may be finished, Net::HTTP#finish raises IOError otherwise
    @http.finish if @http.active?
    # The proxy is fixed when the connection object is created, so a new one is required
    @http = nil
  end
  @proxy_uri = proxy_uri
end
# Change the thread-safety level, any existing request mutex is dropped
# unless the new level is +true+
#
# @param [Boolean,:multithread] threadsafe What level of thread-safety the API should use
# @return [Boolean,:multithread]
# @raise [ArgumentError] If the value is not a boolean or :multithread, or if +true+ is requested on JRuby
def threadsafe=(threadsafe)
  raise ArgumentError, 'Threadsafe must be either a boolean or :multithread' unless [true, false, :multithread].include? threadsafe
  raise ArgumentError, 'JRuby only support :multithread/false for threadsafe' if RUBY_ENGINE == 'jruby' && threadsafe == true

  @threadsafe = threadsafe
  @http_lock = nil unless threadsafe == true
  @threadsafe
end
# Perform a raw Matrix API request
#
# @example Simple API query
# api.request(:get, :client_r0, '/account/whoami')
# # => { :user_id => "@alice:matrix.org" }
#
# @example Advanced API request
# api.request(:post,
# :media_r0,
# '/upload',
# body_stream: open('./file'),
# headers: { 'content-type' => 'image/png' })
# # => { :content_uri => "mxc://example.com/AQwafuaFswefuhsfAFAgsw" }
#
# @param method [Symbol] The method to use, can be any of the ones under Net::HTTP
# @param api [Symbol] The API symbol to use, :client_r0 is the current CS one
# @param path [String] The API path to call, this is the part that comes after the API definition in the spec
# @param options [Hash] Additional options to pass along to the request
# @option options [Hash] :query Query parameters to set on the URL
# @option options [Hash,String] :body The body to attach to the request, will be JSON-encoded if sent as a hash
# @option options [IO] :body_stream A body stream to attach to the request
# @option options [Hash] :headers Additional headers to set on the request
# @option options [Boolean] :skip_auth (false) Skip authentication
def request(method, api, path, **options)
  # Build the full request URL from the homeserver URL, the API prefix, and any extra query parameters
  url = homeserver.dup.tap do |u|
    u.path = api_to_path(api) + path
    u.query = [u.query, URI.encode_www_form(options.fetch(:query))].flatten.compact.join('&') if options[:query]
    u.query = nil if u.query.nil? || u.query.empty?
  end

  failures = 0
  loop do
    raise MatrixConnectionError, "Server still too busy to handle request after #{failures} attempts, try again later" if failures >= 10

    # Short random tag to tie request and response together in the debug log
    req_id = ('A'..'Z').to_a.sample(4).join

    req_obj = construct_request(url: url, method: method, **options)

    print_http(req_obj, id: req_id)
    response = duration = nil

    loc_http = http
    perform_request = proc do
      @inflight << loc_http
      # Monotonic clock, so the duration is unaffected by wall-clock adjustments
      dur_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      response = loc_http.request req_obj
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - dur_start
    rescue EOFError
      logger.error 'Socket closed unexpectedly'
      raise
    ensure
      @inflight.delete loc_http
    end

    if @threadsafe == true
      http_lock.synchronize { perform_request.call }
    else
      begin
        perform_request.call
      ensure
        # Per-request connections are never reused, close them even if the request failed
        loc_http.finish if @threadsafe == :multithread && loc_http.active?
      end
    end
    print_http(response, duration: duration, id: req_id)

    begin
      data = JSON.parse(response.body, symbolize_names: true)
    rescue JSON::JSONError => e
      logger.debug "#{e.class} error when parsing response. #{e}"
      data = nil
    end

    if response.is_a? Net::HTTPTooManyRequests
      raise MatrixRequestError.new_by_code(data, response.code) unless autoretry

      failures += 1
      # The body may be missing or unparsable, and :error is usually a plain message
      # string - only read the hint from places where it can actually exist
      waittime = nil
      if data.is_a?(Hash)
        nested = data[:error]
        waittime = data[:retry_after_ms]
        waittime ||= nested[:retry_after_ms] if nested.is_a?(Hash)
      end
      waittime ||= @backoff_time
      sleep(waittime.to_f / 1000.0)
      next
    end

    if response.is_a? Net::HTTPSuccess
      unless data
        logger.error "Received non-parsable data in 200 response; #{response.body.inspect}"
        raise MatrixConnectionError, response
      end
      return MatrixSdk::Response.new self, data
    end
    raise MatrixRequestError.new_by_code(data, response.code) if data

    raise MatrixConnectionError.class_by_code(response.code), response
  end
end
# Generate a transaction ID
#
# @return [String] An arbitrary transaction ID
def transaction_id
  # Start counting from zero if no counter has been set up
  @transaction_id ||= 0
  current = @transaction_id
  @transaction_id = current.succ
  current
end
# Finish every HTTP connection that currently has a request in flight
#
# @return [Array] The list of tracked in-flight connections
def stop_inflight
  @inflight.each { |connection| connection.finish }
end
private
# Build the Net::HTTP request object for a call
#
# @param method [Symbol] Name of the request class under Net::HTTP, e.g. :get or :post
# @param url [URI] The full URL of the request, only the path and query end up in the request
# @param options [Hash] The :body, :body_stream, :headers, and :skip_auth options of the request
# @return [Net::HTTPRequest] The request, ready to be sent
def construct_request(method:, url:, **options)
  request = Net::HTTP.const_get(method.to_s.capitalize.to_sym).new url.request_uri

  # Strings are sent as-is, everything else is JSON-encoded
  if options.key? :body
    body = options[:body]
    request.body = body.is_a?(String) ? body : body.to_json
  end
  request.body_stream = options[:body_stream] if options.key? :body_stream

  global_headers.each { |h, v| request[h] = v }
  if request.body || request.body_stream
    request.content_type = 'application/json'
    # Content-Length counts bytes, which differs from the character count for non-ASCII bodies
    request.content_length = request.body ? request.body.bytesize : request.body_stream.size
  end

  request['authorization'] = "Bearer #{access_token}" if access_token && !options.fetch(:skip_auth, false)
  # Per-request headers go last so they can override everything set above
  if options.key? :headers
    options[:headers].each do |h, v|
      request[h.to_s.downcase] = v
    end
  end

  request
end
# Write a request or response to the debug log, with credentials redacted
#
# Logging is best-effort; any error raised while printing is logged as a
# warning instead of being passed on to the caller.
#
# @param http [Net::HTTPRequest,Net::HTTPResponse] The object to print
# @param body [Boolean] Should the (JSON) body be printed as well
# @param duration [Numeric,nil] The request duration in seconds, printed for responses
# @param id [String,nil] An identifier to prefix all printed lines with
def print_http(http, body: true, duration: nil, id: nil)
  return unless logger.debug?

  if http.is_a? Net::HTTPRequest
    dir = "#{id ? "#{id} : " : nil}>"
    logger.debug "#{dir} Sending a #{http.method} request to `#{http.path}`:"
  else
    dir = "#{id ? "#{id} : " : nil}<"
    logger.debug "#{dir} Received a #{http.code} #{http.message} response:#{duration ? " [#{(duration * 1000).to_i}ms]" : nil}"
  end
  http.to_hash.map { |k, v| "#{k}: #{k == 'authorization' ? '[ REDACTED ]' : v.join(', ')}" }.each do |h|
    logger.debug "#{dir} #{h}"
  end
  logger.debug dir

  return unless body && http.body

  # Bodies that aren't JSON are expected here and are just not printed
  parsed = begin
    JSON.parse(http.body)
  rescue StandardError
    nil
  end
  return unless parsed

  # Replaces sensitive values at any nesting depth, building a copy of the data
  redact = lambda do |value|
    case value
    when Hash
      value.each_with_object({}) do |(k, v), out|
        out[k] = %w[password access_token].include?(k) ? '[ REDACTED ]' : redact.call(v)
      end
    when Array
      value.map { |v| redact.call(v) }
    else
      value
    end
  end

  clean_body = if parsed.is_a?(Hash) || parsed.is_a?(Array)
                 redact.call(parsed).to_json
               else
                 parsed.to_s
               end
  clean_body = "#{clean_body.slice(0, 200)}... [truncated, #{clean_body.bytesize} Bytes]" if clean_body.length > 200
  logger.debug "#{dir} #{clean_body}"
rescue StandardError => e
  logger.warn "#{e.class} occured while printing request debug; #{e.message}\n#{e.backtrace.join "\n"}"
end
# Convert an API symbol into the URL path prefix it lives under
#
# @param api [Symbol,String] The API identifier, e.g. :client_r0
# @return [String] The path prefix, e.g. "/_matrix/client/r0"
def api_to_path(api)
  api_name = api.to_s
  # Admin APIs live under a separate root when the Synapse flag is set
  root = (@synapse && api_name.start_with?('admin_')) ? '/_synapse' : '/_matrix'
  # TODO: <api>_current / <api>_latest
  "#{root}/#{api_name.split('_').join('/')}"
end
# Get a started HTTP connection to the homeserver
#
# An already active stored connection is returned as-is. Otherwise a connection
# is (re)configured and started, and stored for later calls unless the API is in
# :multithread mode, where every call gets a connection of its own.
#
# @return [Net::HTTP] A started connection
def http
  return @http if @http&.active?

  shared = @threadsafe != :multithread
  target_host = @connection_address || homeserver.host
  target_port = @connection_port || homeserver.port

  # Reuse the stored (inactive) connection object when connections are shared
  conn = @http if shared
  conn ||= begin
    proxy_args = proxy_uri ? [proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password] : []
    Net::HTTP.new(target_host, target_port, *proxy_args)
  end

  conn.open_timeout = open_timeout if open_timeout
  conn.read_timeout = read_timeout if read_timeout
  conn.use_ssl = homeserver.scheme == 'https'
  conn.verify_mode = validate_certificate ? ::OpenSSL::SSL::VERIFY_PEER : ::OpenSSL::SSL::VERIFY_NONE
  conn.start

  @http = conn if shared
  conn
end
# The mutex guarding requests, created on first use
#
# @return [Mutex,nil] The lock, or nil unless the thread-safety level is exactly +true+
def http_lock
  return unless @threadsafe == true

  @http_lock ||= Mutex.new
end
end
end