diff --git a/appengine/standard/localtesting/datastore_test.py b/appengine/standard/localtesting/datastore_test.py deleted file mode 100644 index 8a505d05b1a..00000000000 --- a/appengine/standard/localtesting/datastore_test.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright 2015 Google Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START imports] -import unittest - -from google.appengine.api import memcache -from google.appengine.ext import ndb -from google.appengine.ext import testbed -# [END imports] - - -# [START datastore_example_1] -class TestModel(ndb.Model): - """A model class used for testing.""" - number = ndb.IntegerProperty(default=42) - text = ndb.StringProperty() - - -class TestEntityGroupRoot(ndb.Model): - """Entity group root""" - pass - - -def GetEntityViaMemcache(entity_key): - """Get entity from memcache if available, from datastore if not.""" - entity = memcache.get(entity_key) - if entity is not None: - return entity - key = ndb.Key(urlsafe=entity_key) - entity = key.get() - if entity is not None: - memcache.set(entity_key, entity) - return entity -# [END datastore_example_1] - - -# [START datastore_example_test] -class DatastoreTestCase(unittest.TestCase): - - def setUp(self): - # First, create an instance of the Testbed class. - self.testbed = testbed.Testbed() - # Then activate the testbed, which prepares the service stubs for use. - self.testbed.activate() - # Next, declare which service stubs you want to use. - self.testbed.init_datastore_v3_stub() - self.testbed.init_memcache_stub() - # Clear ndb's in-context cache between tests. - # This prevents data from leaking between tests. - # Alternatively, you could disable caching by - # using ndb.get_context().set_cache_policy(False) - ndb.get_context().clear_cache() - -# [END datastore_example_test] - - # [START datastore_example_teardown] - def tearDown(self): - self.testbed.deactivate() - # [END datastore_example_teardown] - - # [START datastore_example_insert] - def testInsertEntity(self): - TestModel().put() - self.assertEqual(1, len(TestModel.query().fetch(2))) - # [END datastore_example_insert] - - # [START datastore_example_filter] - def testFilterByNumber(self): - root = TestEntityGroupRoot(id="root") - TestModel(parent=root.key).put() - TestModel(number=17, parent=root.key).put() - query = TestModel.query(ancestor=root.key).filter( - TestModel.number == 42) - results = query.fetch(2) - self.assertEqual(1, len(results)) - self.assertEqual(42, results[0].number) - # [END datastore_example_filter] - - # [START datastore_example_memcache] - def testGetEntityViaMemcache(self): - entity_key = TestModel(number=18).put().urlsafe() - retrieved_entity = GetEntityViaMemcache(entity_key) - self.assertNotEqual(None, retrieved_entity) - self.assertEqual(18, retrieved_entity.number) - # [END datastore_example_memcache] - - -# [START HRD_example_1] -from google.appengine.datastore import datastore_stub_util # noqa - - -class HighReplicationTestCaseOne(unittest.TestCase): - - def setUp(self): - # First, create an instance of the Testbed class. - self.testbed = testbed.Testbed() - # Then activate the testbed, which prepares the service stubs for use. - self.testbed.activate() - # Create a consistency policy that will simulate the High Replication - # consistency model. - self.policy = datastore_stub_util.PseudoRandomHRConsistencyPolicy( - probability=0) - # Initialize the datastore stub with this policy. - self.testbed.init_datastore_v3_stub(consistency_policy=self.policy) - # Initialize memcache stub too, since ndb also uses memcache - self.testbed.init_memcache_stub() - # Clear in-context cache before each test. - ndb.get_context().clear_cache() - - def tearDown(self): - self.testbed.deactivate() - - def testEventuallyConsistentGlobalQueryResult(self): - class TestModel(ndb.Model): - pass - - user_key = ndb.Key('User', 'ryan') - - # Put two entities - ndb.put_multi([ - TestModel(parent=user_key), - TestModel(parent=user_key) - ]) - - # Global query doesn't see the data. - self.assertEqual(0, TestModel.query().count(3)) - # Ancestor query does see the data. - self.assertEqual(2, TestModel.query(ancestor=user_key).count(3)) -# [END HRD_example_1] - - # [START HRD_example_2] - def testDeterministicOutcome(self): - # 50% chance to apply. - self.policy.SetProbability(.5) - # Use the pseudo random sequence derived from seed=2. - self.policy.SetSeed(2) - - class TestModel(ndb.Model): - pass - - TestModel().put() - - self.assertEqual(0, TestModel.query().count(3)) - self.assertEqual(0, TestModel.query().count(3)) - # Will always be applied before the third query. - self.assertEqual(1, TestModel.query().count(3)) - # [END HRD_example_2] - - -# [START main] -if __name__ == '__main__': - unittest.main() -# [END main] diff --git a/appengine/standard/localtesting/env_vars_test.py b/appengine/standard/localtesting/env_vars_test.py deleted file mode 100644 index 31e374def64..00000000000 --- a/appengine/standard/localtesting/env_vars_test.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2015 Google Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START env_example] -import os -import unittest - -from google.appengine.ext import testbed - - -class EnvVarsTestCase(unittest.TestCase): - def setUp(self): - self.testbed = testbed.Testbed() - self.testbed.activate() - self.testbed.setup_env( - app_id='your-app-id', - my_config_setting='example', - overwrite=True) - - def tearDown(self): - self.testbed.deactivate() - - def testEnvVars(self): - self.assertEqual(os.environ['APPLICATION_ID'], 'your-app-id') - self.assertEqual(os.environ['MY_CONFIG_SETTING'], 'example') -# [END env_example] - - -if __name__ == '__main__': - unittest.main() diff --git a/appengine/standard/localtesting/login_test.py b/appengine/standard/localtesting/login_test.py deleted file mode 100644 index 8886b8cb6e6..00000000000 --- a/appengine/standard/localtesting/login_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2015 Google Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START login_example] -import unittest - -from google.appengine.api import users -from google.appengine.ext import testbed - - -class LoginTestCase(unittest.TestCase): - # [START setup] - def setUp(self): - self.testbed = testbed.Testbed() - self.testbed.activate() - self.testbed.init_user_stub() - # [END setup] - - def tearDown(self): - self.testbed.deactivate() - - # [START login] - def loginUser(self, email='user@example.com', id='123', is_admin=False): - self.testbed.setup_env( - user_email=email, - user_id=id, - user_is_admin='1' if is_admin else '0', - overwrite=True) - # [END login] - - # [START test] - def testLogin(self): - self.assertFalse(users.get_current_user()) - self.loginUser() - self.assertEquals(users.get_current_user().email(), 'user@example.com') - self.loginUser(is_admin=True) - self.assertTrue(users.is_current_user_admin()) - # [END test] -# [END login_example] - - -if __name__ == '__main__': - unittest.main() diff --git a/appengine/standard/localtesting/mail_test.py b/appengine/standard/localtesting/mail_test.py deleted file mode 100644 index a362aa210eb..00000000000 --- a/appengine/standard/localtesting/mail_test.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2015 Google Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START mail_example] -import unittest - -from google.appengine.api import mail -from google.appengine.ext import testbed - - -class MailTestCase(unittest.TestCase): - - def setUp(self): - self.testbed = testbed.Testbed() - self.testbed.activate() - self.testbed.init_mail_stub() - self.mail_stub = self.testbed.get_stub(testbed.MAIL_SERVICE_NAME) - - def tearDown(self): - self.testbed.deactivate() - - def testMailSent(self): - mail.send_mail(to='alice@example.com', - subject='This is a test', - sender='bob@example.com', - body='This is a test e-mail') - messages = self.mail_stub.get_sent_messages(to='alice@example.com') - self.assertEqual(1, len(messages)) - self.assertEqual('alice@example.com', messages[0].to) -# [END mail_example] - - -if __name__ == '__main__': - unittest.main() diff --git a/appengine/standard/localtesting/memcache_test.py b/appengine/standard/localtesting/memcache_test.py new file mode 100644 index 00000000000..ffecc2474e6 --- /dev/null +++ b/appengine/standard/localtesting/memcache_test.py @@ -0,0 +1,62 @@ +# Copyright 2015 Google Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import calendar +import datetime +import unittest +import mock +import time + +from google.appengine.api import memcache +from google.appengine.ext import testbed + + +class MemcacheTestCase(unittest.TestCase): + def setUp(self): + self.testbed = testbed.Testbed() + self.testbed.activate() + self.testbed.init_memcache_stub() + + def tearDown(self): + self.testbed.deactivate() + + @mock.patch('time.time') + def testMemcache(self, time_mock): + initial_datetime = datetime.datetime(year=2018, month=7, day=5, hour=9, minute=1, second=0) + time_mock.return_value = time.mktime(initial_datetime.timetuple()) + + day_after = datetime.datetime(year=2018, month=7, day=6, hour=9, minute=1, second=0) + memcache.set('TEST_K', 1, time=calendar.timegm(day_after.timetuple())) + self.assertEqual(memcache.get('TEST_K'), 1) + + @mock.patch('time.time') + def testMemcacheExpiration(self, time_mock): + initial_datetime = datetime.datetime(year=2018, month=7, day=1, hour=9, minute=1, second=0) + time_mock.return_value = time.mktime(initial_datetime.timetuple()) + #mock.patch('time.time', mock.MagicMock(return_value=time.mktime(initial_datetime.timetuple()))) + + day_after = datetime.datetime(year=2018, month=7, day=2, hour=9, minute=0, second=0) + memcache.set('TEST_K', 1, time=calendar.timegm(day_after.timetuple())) + self.assertEqual(memcache.get('TEST_K'), 1) + + day_after_after = datetime.datetime(year=2018, month=7, day=3, hour=9, minute=1, second=0) + time_mock.return_value = time.mktime(day_after_after.timetuple()) + #mock.patch('time.time', mock.MagicMock(return_value=time.mktime(day_after_tomorrow.timetuple()))) + + self.assertIsNone(memcache.get('TEST_K')) + + + +if __name__ == '__main__': + unittest.main() diff --git a/appengine/standard/localtesting/task_queue_test.py b/appengine/standard/localtesting/task_queue_test.py deleted file mode 100644 index 2472fe18887..00000000000 --- a/appengine/standard/localtesting/task_queue_test.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright 2015 Google Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START taskqueue] -import operator -import os -import unittest - -from google.appengine.api import taskqueue -from google.appengine.ext import deferred -from google.appengine.ext import testbed - - -class TaskQueueTestCase(unittest.TestCase): - def setUp(self): - self.testbed = testbed.Testbed() - self.testbed.activate() - - # root_path must be set the the location of queue.yaml. - # Otherwise, only the 'default' queue will be available. - self.testbed.init_taskqueue_stub( - root_path=os.path.join(os.path.dirname(__file__), 'resources')) - self.taskqueue_stub = self.testbed.get_stub( - testbed.TASKQUEUE_SERVICE_NAME) - - def tearDown(self): - self.testbed.deactivate() - - def testTaskAddedToQueue(self): - taskqueue.Task(name='my_task', url='/url/of/my/task/').add() - tasks = self.taskqueue_stub.get_filtered_tasks() - self.assertEqual(len(tasks), 1) - self.assertEqual(tasks[0].name, 'my_task') -# [END taskqueue] - - # [START filtering] - def testFiltering(self): - taskqueue.Task(name='task_one', url='/url/of/task/1/').add('queue-1') - taskqueue.Task(name='task_two', url='/url/of/task/2/').add('queue-2') - - # All tasks - tasks = self.taskqueue_stub.get_filtered_tasks() - self.assertEqual(len(tasks), 2) - - # Filter by name - tasks = self.taskqueue_stub.get_filtered_tasks(name='task_one') - self.assertEqual(len(tasks), 1) - self.assertEqual(tasks[0].name, 'task_one') - - # Filter by URL - tasks = self.taskqueue_stub.get_filtered_tasks(url='/url/of/task/1/') - self.assertEqual(len(tasks), 1) - self.assertEqual(tasks[0].name, 'task_one') - - # Filter by queue - tasks = self.taskqueue_stub.get_filtered_tasks(queue_names='queue-1') - self.assertEqual(len(tasks), 1) - self.assertEqual(tasks[0].name, 'task_one') - - # Multiple queues - tasks = self.taskqueue_stub.get_filtered_tasks( - queue_names=['queue-1', 'queue-2']) - self.assertEqual(len(tasks), 2) - # [END filtering] - - # [START deferred] - def testTaskAddedByDeferred(self): - deferred.defer(operator.add, 1, 2) - - tasks = self.taskqueue_stub.get_filtered_tasks() - self.assertEqual(len(tasks), 1) - - result = deferred.run(tasks[0].payload) - self.assertEqual(result, 3) - # [END deferred] - - -if __name__ == '__main__': - unittest.main()