|
60 | 60 |
|
61 | 61 | from tests import foreach_cnx, cnx_config
|
62 | 62 | from . import check_tls_versions_support
|
63 |
| -from mysql.connector import (connection, cursor, conversion, protocol, |
64 |
| - errors, constants, pooling) |
| 63 | +from mysql.connector import (connection, constants, conversion, cursor, |
| 64 | + errors, MySQLConnection, pooling, protocol,) |
65 | 65 | from mysql.connector.optionfiles import read_option_files
|
66 | 66 | from mysql.connector.pooling import PooledMySQLConnection
|
67 | 67 | import mysql.connector
|
@@ -5943,3 +5943,121 @@ def test_without_use_unicode_cursor_prepared(self):
|
5943 | 5943 | cursor=cursor.MySQLCursorPrepared,
|
5944 | 5944 | use_binary_charset=True,
|
5945 | 5945 | )
|
| 5946 | + |
| 5947 | + |
| 5948 | +class Bug32162928(tests.MySQLConnectorTests): |
| 5949 | + """BUG#32162928: change user command fails with pure python implementation. |
| 5950 | +
|
| 5951 | + The cmd_change_user() command fails with pure-python. |
| 5952 | + """ |
| 5953 | + def setUp(self): |
| 5954 | + self.connect_kwargs = tests.get_mysql_config() |
| 5955 | + cnx = MySQLConnection(**self.connect_kwargs) |
| 5956 | + self.users = [ |
| 5957 | + {'user': 'user_native1', |
| 5958 | + 'host': self.connect_kwargs['host'], |
| 5959 | + 'port': self.connect_kwargs['port'], |
| 5960 | + 'database': self.connect_kwargs['database'], |
| 5961 | + 'password': 'native1_pass', |
| 5962 | + 'auth_plugin': 'mysql_native_password'}, |
| 5963 | + {'user': 'user_native2', |
| 5964 | + 'host': self.connect_kwargs['host'], |
| 5965 | + 'port': self.connect_kwargs['port'], |
| 5966 | + 'database': self.connect_kwargs['database'], |
| 5967 | + 'password': 'native2_pass', |
| 5968 | + 'auth_plugin': 'mysql_native_password'}, |
| 5969 | + {'user': 'user_sha2561', |
| 5970 | + 'host': self.connect_kwargs['host'], |
| 5971 | + 'port': self.connect_kwargs['port'], |
| 5972 | + 'database': self.connect_kwargs['database'], |
| 5973 | + 'password': 'sha2561_pass', |
| 5974 | + 'auth_plugin': 'sha256_password'}, |
| 5975 | + {'user': 'user_sha2562', |
| 5976 | + 'host': self.connect_kwargs['host'], |
| 5977 | + 'port': self.connect_kwargs['port'], |
| 5978 | + 'database': self.connect_kwargs['database'], |
| 5979 | + 'password': 'sha2562_pass', |
| 5980 | + 'auth_plugin': 'sha256_password'}, |
| 5981 | + {'user': 'user_caching1', |
| 5982 | + 'host': self.connect_kwargs['host'], |
| 5983 | + 'port': self.connect_kwargs['port'], |
| 5984 | + 'database': self.connect_kwargs['database'], |
| 5985 | + 'password': 'caching1_pass', |
| 5986 | + 'auth_plugin': 'caching_sha2_password'}, |
| 5987 | + {'user': 'user_caching2', |
| 5988 | + 'host': self.connect_kwargs['host'], |
| 5989 | + 'port': self.connect_kwargs['port'], |
| 5990 | + 'database': self.connect_kwargs['database'], |
| 5991 | + 'password': 'caching2_pass', |
| 5992 | + 'auth_plugin': 'caching_sha2_password'}] |
| 5993 | + |
| 5994 | + # create users |
| 5995 | + if tests.MYSQL_VERSION < (8, 0, 0): |
| 5996 | + self.new_users = self.users[0:4] |
| 5997 | + else: |
| 5998 | + self.new_users = self.users |
| 5999 | + |
| 6000 | + for new_user in self.new_users: |
| 6001 | + cnx.cmd_query("DROP USER IF EXISTS '{user}'@'{host}'".format(**new_user)) |
| 6002 | + |
| 6003 | + stmt = ("CREATE USER IF NOT EXISTS '{user}'@'{host}' IDENTIFIED " |
| 6004 | + "WITH {auth_plugin} BY '{password}'").format(**new_user) |
| 6005 | + cnx.cmd_query(stmt) |
| 6006 | + |
| 6007 | + cnx.cmd_query("GRANT ALL PRIVILEGES ON {database}.* TO " |
| 6008 | + "'{user}'@'{host}'".format(**new_user)) |
| 6009 | + |
| 6010 | + @foreach_cnx() |
| 6011 | + def test_change_user(self): |
| 6012 | + # test users can connect |
| 6013 | + for user in self.new_users: |
| 6014 | + conn_args = user.copy() |
| 6015 | + try: |
| 6016 | + self.connect_kwargs.pop('auth_plugin') |
| 6017 | + except: |
| 6018 | + pass |
| 6019 | + cnx_test = self.cnx.__class__(**conn_args) |
| 6020 | + cnx_test.cmd_query("SELECT USER()") |
| 6021 | + logged_user = cnx_test.get_rows()[0][0][0] |
| 6022 | + self.assertEqual(u"{user}@{host}".format(**user), logged_user) |
| 6023 | + cnx_test.close() |
| 6024 | + |
| 6025 | + # tests change user |
| 6026 | + if tests.MYSQL_VERSION < (8, 0, 0): |
| 6027 | + # 5.6 does not support caching_sha2_password auth plugin |
| 6028 | + test_cases = [(0, 1), (1, 2), (2,3), (3, 0)] |
| 6029 | + else: |
| 6030 | + test_cases = [(0, 1), (1, 2), (2,3), (3, 0), (3, 4), (4, 5), |
| 6031 | + (5, 3), (5, 0)] |
| 6032 | + for user1, user2 in test_cases: |
| 6033 | + conn_args_user1 = self.users[user1].copy() |
| 6034 | + try: |
| 6035 | + conn_args_user1.pop('auth_plugin') |
| 6036 | + except: |
| 6037 | + pass |
| 6038 | + if tests.MYSQL_VERSION < (8, 0, 0): |
| 6039 | + # change user does not work in 5.x with charset utf8mb4 |
| 6040 | + conn_args_user1['charset'] = 'utf8' |
| 6041 | + |
| 6042 | + cnx_test = self.cnx.__class__(**conn_args_user1) |
| 6043 | + cnx_test.cmd_query("SELECT USER()") |
| 6044 | + first_user = cnx_test.get_rows()[0][0][0] |
| 6045 | + self.assertEqual( |
| 6046 | + u"{user}@{host}".format(**self.users[user1]), first_user) |
| 6047 | + |
| 6048 | + cnx_test.cmd_change_user(self.users[user2]['user'], |
| 6049 | + self.users[user2]['password'], |
| 6050 | + self.users[user2]['database']) |
| 6051 | + cnx_test.cmd_query("SELECT USER()") |
| 6052 | + rows = cnx_test.get_rows() |
| 6053 | + current_user = rows[0][0][0] |
| 6054 | + self.assertNotEqual(first_user, current_user) |
| 6055 | + self.assertEqual( |
| 6056 | + u"{user}@{host}".format(**self.users[user2]), current_user) |
| 6057 | + cnx_test.close() |
| 6058 | + |
| 6059 | + def tearDown(self): |
| 6060 | + cnx = MySQLConnection(**self.connect_kwargs) |
| 6061 | + # cleanup users |
| 6062 | + for new_user in self.users: |
| 6063 | + cnx.cmd_query("DROP USER IF EXISTS '{user}'@'{host}'".format(**new_user)) |
0 commit comments