diff --git a/zaza/openstack/charm_tests/ceph/tests.py b/zaza/openstack/charm_tests/ceph/tests.py index aa4b7a40e..46911c07c 100644 --- a/zaza/openstack/charm_tests/ceph/tests.py +++ b/zaza/openstack/charm_tests/ceph/tests.py @@ -900,6 +900,34 @@ def clean_rgw_multisite_config(self, app_name): '--rgw-zonegroup=default' ) + def enable_virtual_hosted_bucket(self): + """Enable virtual hosted bucket on primary rgw app.""" + zaza_model.set_application_config( + self.primary_rgw_app, + { + 'virtual-hosted-bucket-enabled': "true" + } + ) + + def set_os_public_hostname(self): + """Set os-public-hostname on primary rgw app.""" + zaza_model.set_application_config( + self.primary_rgw_app, + { + 'os-public-hostname': "rgw.example.com", + } + ) + + def clean_virtual_hosted_bucket(self): + """Clear virtual hosted bucket on primary app.""" + zaza_model.set_application_config( + self.primary_rgw_app, + { + 'os-public-hostname': "", + 'virtual-hosted-bucket-enabled': "false" + } + ) + def test_001_processes(self): """Verify Ceph processes. @@ -1196,6 +1224,71 @@ def test_004_migration_and_multisite_failover(self): self.purge_bucket(self.secondary_rgw_app, 'zaza-container') self.purge_bucket(self.secondary_rgw_app, 'failover-container') + def test_005_virtual_hosted_bucket(self): + """Test virtual hosted bucket.""" + primary_rgw_unit = zaza_model.get_unit_from_name(self.primary_rgw_unit) + if primary_rgw_unit.workload_status != "active": + logging.info('Skipping virtual hosted bucket test since ' + 'primary rgw unit is not in active state') + return + + logging.info('Testing virtual hosted bucket') + + # 0. Configure virtual hosted bucket + self.enable_virtual_hosted_bucket() + assert_state = { + self.primary_rgw_app: { + "workload-status": "blocked", + "workload-status-message-prefix": + "os-public-hostname must have a value " + "when virtual hosted bucket is enabled" + } + } + zaza_model.wait_for_application_states(self.model_name, + states=assert_state, + timeout=900) + self.set_os_public_hostname() + zaza_model.block_until_all_units_idle(self.model_name) + container_name = 'zaza-bucket' + obj_data = 'Test content from Zaza' + obj_name = 'testfile' + + # 1. Fetch Primary Endpoint Details + primary_endpoint = self.get_rgw_endpoint(self.primary_rgw_unit) + self.assertNotEqual(primary_endpoint, None) + + # 2. Create RGW Client and perform IO + access_key, secret_key = self.get_client_keys() + primary_client = boto3.resource("s3", + verify=False, + endpoint_url=primary_endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key) + primary_client.Bucket(container_name).create() + primary_object_one = primary_client.Object( + container_name, + obj_name + ) + primary_object_one.put(Body=obj_data) + primary_client.Bucket(container_name).Acl().put(ACL='public-read') + primary_client.Object(container_name, obj_name).Acl().put( + ACL='public-read' + ) + + # 3. Test if we can get content via virtual hosted bucket name + public_hostname = zaza_model.get_application_config( + self.primary_rgw_app + )["os-public-hostname"]["value"] + url = f"{primary_endpoint}/{obj_name}" + headers = {'host': f"{container_name}.{public_hostname}"} + f = requests.get(url, headers=headers, verify=False) + self.assertEqual(f.text, obj_data) + + # 4. Cleanup and de-configure virtual hosted bucket + self.clean_virtual_hosted_bucket() + zaza_model.block_until_all_units_idle(self.model_name) + self.purge_bucket(self.primary_rgw_app, container_name) + class CephProxyTest(unittest.TestCase): """Test ceph via proxy."""