|
39 | 39 | import os
|
40 | 40 | import gc
|
41 | 41 | import tempfile
|
42 |
| -from datetime import datetime, timedelta, time |
| 42 | +from datetime import date, datetime, timedelta, time |
43 | 43 | from threading import Thread
|
44 | 44 | import traceback
|
45 | 45 | import time
|
@@ -4629,3 +4629,226 @@ def test_cursor_prepared_statement_with_charset_utf8(self):
|
4629 | 4629 | @foreach_cnx()
|
4630 | 4630 | def test_cursor_prepared_statement_with_charset_latin1(self):
|
4631 | 4631 | self._test_charset('latin1', [u'ñ', u'Ñ'])
|
| 4632 | + |
| 4633 | + |
| 4634 | +@unittest.skipIf(tests.MYSQL_VERSION < (5, 7, 8), |
| 4635 | + "Support for native JSON data types introduced on 5.7.8 ") |
| 4636 | +class BugOra24948186(tests.MySQLConnectorTests): |
| 4637 | + """BUG#24948186: MySQL JSON TYPES RETURNED AS BYTES INSTEAD OF PYTHON TYPES |
| 4638 | + """ |
| 4639 | + def setUp(self): |
| 4640 | + pass |
| 4641 | + |
| 4642 | + def tearDown(self): |
| 4643 | + pass |
| 4644 | + |
| 4645 | + def run_test_mysql_json_type(self, stm, test_values, expected_values, |
| 4646 | + mysql_type, expected_type): |
| 4647 | + config = tests.get_mysql_config() |
| 4648 | + config['charset'] = "utf8" |
| 4649 | + config['use_unicode'] = True |
| 4650 | + cnx = connection.MySQLConnection(**config) |
| 4651 | + cur = cnx.cursor() |
| 4652 | + |
| 4653 | + for test_value, expected_value in zip(test_values, expected_values): |
| 4654 | + cur.execute(stm.format(value=test_value)) |
| 4655 | + row = cur.fetchall()[0] |
| 4656 | + self.assertEqual(row[0], expected_value) #,"value is not the expected") |
| 4657 | + self.assertTrue(isinstance(row[0], expected_type), |
| 4658 | + u"value {} is not python type {}" |
| 4659 | + u"".format(row[0], expected_type)) |
| 4660 | + self.assertEqual(row[1], mysql_type, |
| 4661 | + u"value {} is mysql type {} but expected {}" |
| 4662 | + u"".format(row[0], row[1], mysql_type)) |
| 4663 | + cur.close() |
| 4664 | + cnx.close() |
| 4665 | + |
| 4666 | + @foreach_cnx() |
| 4667 | + def test_retrieve_mysql_json_boolean(self): |
| 4668 | + stm = ("SELECT j, JSON_TYPE(j)" |
| 4669 | + "from (SELECT CAST({value} AS JSON) as J) jdata") |
| 4670 | + test_values = ["true", "false"] |
| 4671 | + expected_values = [True, False] |
| 4672 | + mysql_type = "BOOLEAN" |
| 4673 | + expected_type = bool |
| 4674 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4675 | + mysql_type, expected_type) |
| 4676 | + |
| 4677 | + @foreach_cnx() |
| 4678 | + def test_retrieve_mysql_json_integer(self): |
| 4679 | + stm = ("SELECT j->>'$.foo', JSON_TYPE(j->>'$.foo')" |
| 4680 | + "FROM (SELECT json_object('foo', {value}) AS j) jdata") |
| 4681 | + test_values = [-2147483648, -1, 0, 1, 2147483647] |
| 4682 | + expected_values = test_values |
| 4683 | + mysql_type = "INTEGER" |
| 4684 | + expected_type = int |
| 4685 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4686 | + mysql_type, expected_type) |
| 4687 | + |
| 4688 | + test_values = [-9223372036854775808] |
| 4689 | + expected_values = test_values |
| 4690 | + mysql_type = "INTEGER" |
| 4691 | + expected_type = int |
| 4692 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4693 | + mysql_type, expected_type) |
| 4694 | + |
| 4695 | + test_values = [92233720368547760, |
| 4696 | + 18446744073709551615] |
| 4697 | + expected_values = test_values |
| 4698 | + mysql_type = "UNSIGNED INTEGER" |
| 4699 | + expected_type = int |
| 4700 | + self.run_test_mysql_json_type(stm, test_values[0:1], |
| 4701 | + expected_values[0:1], |
| 4702 | + mysql_type, expected_type) |
| 4703 | + if PY2: |
| 4704 | + expected_type = long |
| 4705 | + else: |
| 4706 | + expected_type = int |
| 4707 | + self.run_test_mysql_json_type(stm, test_values[1:], |
| 4708 | + expected_values[1:], |
| 4709 | + mysql_type, expected_type) |
| 4710 | + |
| 4711 | + @foreach_cnx() |
| 4712 | + def test_retrieve_mysql_json_double(self): |
| 4713 | + # Because floating-point values are approximate and not stored as exact |
| 4714 | + # values the test values are not the true maximun or minum values |
| 4715 | + stm = ("SELECT j->>'$.foo', JSON_TYPE(j->>'$.foo')" |
| 4716 | + "FROM (SELECT json_object('foo', {value}) AS j) jdata") |
| 4717 | + test_values = [-12345.555, -1.55, 1.55, 12345.555] |
| 4718 | + expected_values = test_values |
| 4719 | + mysql_type = "DOUBLE" |
| 4720 | + expected_type = float |
| 4721 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4722 | + mysql_type, expected_type) |
| 4723 | + |
| 4724 | + @foreach_cnx() |
| 4725 | + def test_retrieve_mysql_json_string(self): |
| 4726 | + stm = (u"SELECT j->>'$.foo', JSON_TYPE(j->>'$.foo')" |
| 4727 | + u"FROM (SELECT json_object('foo', {value}) AS j) jdata") |
| 4728 | + test_values = ['\'" "\'', '\'"some text"\'', u'\'"データベース"\''] |
| 4729 | + expected_values = ['" "', '"some text"', u'"データベース"'] |
| 4730 | + mysql_type = "STRING" |
| 4731 | + expected_type = STRING_TYPES |
| 4732 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4733 | + mysql_type, expected_type) |
| 4734 | + |
| 4735 | + @foreach_cnx() |
| 4736 | + def test_retrieve_mysql_json_datetime_types(self): |
| 4737 | + stm = ("SELECT j, JSON_TYPE(j)" |
| 4738 | + "from (SELECT CAST({value} AS JSON) as J) jdata") |
| 4739 | + test_values = ["cast('1972-01-01 00:42:49.000000' as DATETIME)", |
| 4740 | + "cast('2018-01-01 23:59:59.000000' as DATETIME)"] |
| 4741 | + expected_values = [datetime(1972, 1, 1, 0, 42, 49), |
| 4742 | + datetime(2018, 1, 1, 23, 59, 59)] |
| 4743 | + mysql_type = "DATETIME" |
| 4744 | + expected_type = datetime |
| 4745 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4746 | + mysql_type, expected_type) |
| 4747 | + |
| 4748 | + @foreach_cnx() |
| 4749 | + def test_retrieve_mysql_json_date_types(self): |
| 4750 | + stm = ("SELECT j, JSON_TYPE(j)" |
| 4751 | + "from (SELECT CAST({value} AS JSON) as J) jdata") |
| 4752 | + test_values = ["DATE('1972-01-01')", |
| 4753 | + "DATE('2018-12-31')"] |
| 4754 | + expected_values = [date(1972, 1, 1), |
| 4755 | + date(2018, 12, 31)] |
| 4756 | + mysql_type = "DATE" |
| 4757 | + expected_type = date |
| 4758 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4759 | + mysql_type, expected_type) |
| 4760 | + |
| 4761 | + @foreach_cnx() |
| 4762 | + def test_retrieve_mysql_json_time_types(self): |
| 4763 | + stm = ("SELECT j, JSON_TYPE(j)" |
| 4764 | + "from (SELECT CAST({value} AS JSON) as J) jdata") |
| 4765 | + test_values = ["TIME('00:42:49.000000')", |
| 4766 | + "TIME('23:59:59.000001')"] |
| 4767 | + expected_values = [timedelta(hours=0, minutes=42, seconds=49), |
| 4768 | + timedelta(hours=23, minutes=59, seconds=59, |
| 4769 | + microseconds=1)] |
| 4770 | + mysql_type = "TIME" |
| 4771 | + expected_type = timedelta |
| 4772 | + self.run_test_mysql_json_type(stm, test_values, expected_values, |
| 4773 | + mysql_type, expected_type) |
| 4774 | + |
| 4775 | + |
| 4776 | +class BugOra27364914(tests.MySQLConnectorTests): |
| 4777 | + """BUG#27364914: CURSOR PREPARED STATEMENTS DO NOT CONVERT STRINGS |
| 4778 | + """ |
| 4779 | + charsets_list = ('gbk', 'sjis', 'big5', 'utf8', 'utf8mb4', 'latin1') |
| 4780 | + |
| 4781 | + def setUp(self): |
| 4782 | + cnx = connection.MySQLConnection(**tests.get_mysql_config()) |
| 4783 | + cur = cnx.cursor(cursor_class=cursor.MySQLCursorPrepared) |
| 4784 | + |
| 4785 | + for charset in self.charsets_list: |
| 4786 | + tablename = '{0}_ps_test'.format(charset) |
| 4787 | + cur.execute("DROP TABLE IF EXISTS {0}".format(tablename)) |
| 4788 | + table = ( |
| 4789 | + "CREATE TABLE {table} (" |
| 4790 | + " id INT AUTO_INCREMENT KEY," |
| 4791 | + " c1 VARCHAR(40)," |
| 4792 | + " val2 datetime" |
| 4793 | + ") CHARACTER SET '{charset}'" |
| 4794 | + ).format(table=tablename, charset=charset) |
| 4795 | + cur.execute(table) |
| 4796 | + cnx.commit() |
| 4797 | + cur.close() |
| 4798 | + cnx.close() |
| 4799 | + |
| 4800 | + def tearDown(self): |
| 4801 | + cnx = connection.MySQLConnection(**tests.get_mysql_config()) |
| 4802 | + for charset in self.charsets_list: |
| 4803 | + tablename = '{0}_ps_test'.format(charset) |
| 4804 | + cnx.cmd_query("DROP TABLE IF EXISTS {0}".format(tablename)) |
| 4805 | + cnx.close() |
| 4806 | + |
| 4807 | + def _test_charset(self, charset, data): |
| 4808 | + config = tests.get_mysql_config() |
| 4809 | + config['charset'] = charset |
| 4810 | + config['use_unicode'] = True |
| 4811 | + self.cnx = connection.MySQLConnection(**tests.get_mysql_config()) |
| 4812 | + cur = self.cnx.cursor(cursor_class=cursor.MySQLCursorPrepared) |
| 4813 | + |
| 4814 | + tablename = '{0}_ps_test'.format(charset) |
| 4815 | + cur.execute("TRUNCATE {0}".format(tablename)) |
| 4816 | + self.cnx.commit() |
| 4817 | + |
| 4818 | + insert = "INSERT INTO {0} (c1) VALUES (%s)".format(tablename) |
| 4819 | + for value in data: |
| 4820 | + cur.execute(insert, (value,)) |
| 4821 | + self.cnx.commit() |
| 4822 | + |
| 4823 | + cur.execute("SELECT id, c1 FROM {0} ORDER BY id".format(tablename)) |
| 4824 | + for row in cur: |
| 4825 | + self.assertTrue(isinstance(row[1], STRING_TYPES), |
| 4826 | + "The value is expected to be a string") |
| 4827 | + self.assertEqual(data[row[0] - 1], row[1]) |
| 4828 | + |
| 4829 | + cur.close() |
| 4830 | + self.cnx.close() |
| 4831 | + |
| 4832 | + @foreach_cnx() |
| 4833 | + def test_cursor_prepared_statement_with_charset_gbk(self): |
| 4834 | + self._test_charset('gbk', [u'赵孟頫', u'赵\孟\頫\\', u'遜']) |
| 4835 | + |
| 4836 | + @foreach_cnx() |
| 4837 | + def test_cursor_prepared_statement_with_charset_sjis(self): |
| 4838 | + self._test_charset('sjis', ['\u005c']) |
| 4839 | + |
| 4840 | + @foreach_cnx() |
| 4841 | + def test_cursor_prepared_statement_with_charset_big5(self): |
| 4842 | + self._test_charset('big5', ['\u5C62']) |
| 4843 | + |
| 4844 | + @foreach_cnx() |
| 4845 | + def test_cursor_prepared_statement_with_charset_utf8mb4(self): |
| 4846 | + self._test_charset('utf8mb4', ['\u5C62']) |
| 4847 | + |
| 4848 | + @foreach_cnx() |
| 4849 | + def test_cursor_prepared_statement_with_charset_utf8(self): |
| 4850 | + self._test_charset('utf8', [u'データベース', u'데이터베이스']) |
| 4851 | + |
| 4852 | + @foreach_cnx() |
| 4853 | + def test_cursor_prepared_statement_with_charset_latin1(self): |
| 4854 | + self._test_charset('latin1', [u'ñ', u'Ñ']) |
0 commit comments