-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathtest_auth.py
157 lines (129 loc) · 5.06 KB
/
test_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from arango.connection import BasicConnection, JwtConnection, JwtSuperuserConnection
from arango.errno import FORBIDDEN, HTTP_UNAUTHORIZED
from arango.exceptions import (
JWTAuthError,
JWTExpiredError,
JWTSecretListError,
JWTSecretReloadError,
ServerConnectionError,
ServerEncryptionError,
ServerTLSError,
ServerTLSReloadError,
ServerVersionError,
)
from tests.helpers import assert_raises, generate_jwt, generate_string
def test_auth_invalid_method(client, db_name, username, password):
    """Check that an unrecognised ``auth_method`` is rejected with ValueError."""
    connect_kwargs = {
        "name": db_name,
        "username": username,
        "password": password,
        "verify": True,
        "auth_method": "bad_method",
    }
    with assert_raises(ValueError) as raised:
        client.db(**connect_kwargs)

    # The error text must name the offending argument.
    message = str(raised.value)
    assert "invalid auth_method" in message
def test_auth_basic(client, db_name, username, password):
    """Connect with basic auth and check the connection type and two calls."""
    connect_kwargs = {
        "name": db_name,
        "username": username,
        "password": password,
        "verify": True,
        "auth_method": "basic",
    }
    basic_db = client.db(**connect_kwargs)

    # The handle must be backed by a basic-auth connection.
    assert isinstance(basic_db.conn, BasicConnection)

    # Simple requests through that connection must return the expected types.
    server_version = basic_db.version()
    assert isinstance(server_version, str)
    db_properties = basic_db.properties()
    assert isinstance(db_properties, dict)
def test_auth_jwt(client, db_name, username, password, secret):
    """Exercise JWT authentication via credentials and via a user token."""

    def check_jwt_database(database):
        # The handle must use a JWT connection and answer simple requests.
        assert isinstance(database.conn, JwtConnection)
        assert isinstance(database.version(), str)
        assert isinstance(database.properties(), dict)

    # JWT authentication with username and password.
    credentials_db = client.db(
        name=db_name,
        username=username,
        password=password,
        verify=True,
        auth_method="jwt",
    )
    check_jwt_database(credentials_db)

    # A generated bad password must be refused with HTTP_UNAUTHORIZED.
    wrong_password = generate_string()
    with assert_raises(JWTAuthError) as raised:
        client.db(db_name, username, wrong_password, auth_method="jwt")
    assert raised.value.error_code == HTTP_UNAUTHORIZED

    # JWT authentication with a user token built from the secret.
    user_token = generate_jwt(secret)
    token_db = client.db("_system", user_token=user_token)
    check_jwt_database(token_db)
# TODO: re-examine the commented-out checks in test_auth_superuser_token.
def test_auth_superuser_token(client, db_name, root_password, secret):
    """Exercise superuser-token access to JWT secret, TLS and encryption calls.

    One handle is built from a token generated from ``secret`` and one from
    the literal ``"bad_token"``. Every call made through the latter is
    expected to raise the matching error with the FORBIDDEN error code.
    """

    def expect_forbidden(error_type, request):
        # Invoke ``request`` and require ``error_type`` carrying FORBIDDEN.
        with assert_raises(error_type) as raised:
            request()
        assert raised.value.error_code == FORBIDDEN

    superuser_token = generate_jwt(secret)
    db = client.db("_system", superuser_token=superuser_token)
    bad_db = client.db("_system", superuser_token="bad_token")

    # The good handle must use a superuser JWT connection and answer requests.
    assert isinstance(db.conn, JwtSuperuserConnection)
    assert isinstance(db.version(), str)
    assert isinstance(db.properties(), dict)

    # # Test get JWT secrets
    # secrets = db.jwt_secrets()
    # assert 'active' in secrets
    # assert 'passive' in secrets

    # Get JWT secrets with the bad token.
    expect_forbidden(JWTSecretListError, bad_db.jwt_secrets)

    # # Test reload JWT secrets
    # secrets = db.reload_jwt_secrets()
    # assert 'active' in secrets
    # assert 'passive' in secrets

    # Reload JWT secrets with the bad token.
    expect_forbidden(JWTSecretReloadError, bad_db.reload_jwt_secrets)

    # Get TLS data.
    tls_data = db.tls()
    assert isinstance(tls_data, dict)

    # Get TLS data with the bad token.
    expect_forbidden(ServerTLSError, bad_db.tls)

    # Reload TLS.
    reloaded_tls = db.reload_tls()
    assert isinstance(reloaded_tls, dict)

    # Reload TLS with the bad token.
    expect_forbidden(ServerTLSReloadError, bad_db.reload_tls)

    # # Test get encryption
    # result = db.encryption()
    # assert isinstance(result, dict)

    # Call encryption() with the bad token.
    expect_forbidden(ServerEncryptionError, bad_db.encryption)
def test_auth_jwt_expiry(client, db_name, root_password, secret):
    """Check how expired JWTs are handled for user and superuser connections."""
    # Automatic token refresh: plant an expired token on a credential-based
    # JWT connection; the next request is still expected to succeed.
    db = client.db("_system", "root", root_password, auth_method="jwt")
    valid_token = generate_jwt(secret)
    expired_token = generate_jwt(secret, exp=-1000)
    connection = db.conn
    connection._token = expired_token
    connection._auth_header = f"bearer {expired_token}"
    assert isinstance(db.version(), str)

    # Superuser: instantiation with verify=True fails on the expired token.
    with assert_raises(ServerConnectionError):
        client.db("_system", superuser_token=expired_token, verify=True)

    # Superuser: without verify the handle is created, but the version
    # request is refused with FORBIDDEN.
    superuser_db = client.db("_system", superuser_token=expired_token)
    with assert_raises(ServerVersionError) as raised:
        superuser_db.version()
    assert raised.value.error_code == FORBIDDEN

    # Superuser: set_token rejects the expired token.
    superuser_db = client.db("_system", superuser_token=valid_token)
    with assert_raises(JWTExpiredError):
        superuser_db.conn.set_token(expired_token)

    # User: instantiation fails on the expired token.
    with assert_raises(JWTExpiredError):
        client.db("_system", user_token=expired_token)

    # User: set_token rejects the expired token.
    user_db = client.db("_system", user_token=valid_token)
    with assert_raises(JWTExpiredError):
        user_db.conn.set_token(expired_token)