forked from opensearch-project/opensearch-build
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathvalidation_rpm.py
68 lines (58 loc) · 2.88 KB
/
validation_rpm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
import logging
import os
from system.execute import execute
from test_workflow.integ_test.utils import get_password
from validation_workflow.api_test_cases import ApiTestCases
from validation_workflow.download_utils import DownloadUtils
from validation_workflow.validation import Validation
from validation_workflow.validation_args import ValidationArgs
class ValidateRpm(Validation, DownloadUtils):
    """Validation workflow for RPM packages.

    Each step shells out through ``execute`` with ``sudo``: the packages are
    installed with ``rpm``, started and stopped with ``systemctl``, and
    removed with ``yum``.
    """

    def __init__(self, args: ValidationArgs) -> None:
        """Forward the parsed validation arguments to the parent initializer."""
        super().__init__(args)

    def installation(self) -> bool:
        """Import the OpenSearch public key and install every requested project.

        For each project in ``self.args.projects`` the package file name is
        taken from ``self.args.file_path`` (and stored on ``self.filename``),
        the project is removed with ``yum`` and then installed with
        ``rpm -ivh`` from ``self.tmp_dir.path``.

        Returns:
            True when every command completed without raising.

        Raises:
            Exception: if any step fails; the original error is chained as
                the cause.
        """
        try:
            execute('sudo rpm --import https://artifacts.opensearch.org/publickeys/opensearch.pgp', str(self.tmp_dir.path), True, False)
            for project in self.args.projects:
                self.filename = os.path.basename(self.args.file_path.get(project))
                # Presumably clears a pre-existing install before installing
                # the package under test -- confirm against the workflow docs.
                execute(f'sudo yum remove {project} -y', ".")
                admin_password = get_password(str(self.args.version))
                rpm_path = os.path.join(self.tmp_dir.path, self.filename)
                install_command = (
                    f'sudo env OPENSEARCH_INITIAL_ADMIN_PASSWORD={admin_password} '
                    f'rpm -ivh {rpm_path}'
                )
                execute(install_command, str(self.tmp_dir.path), True, False)
        except Exception as e:
            # Catch Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit propagate untouched, and chain the cause so the
            # underlying failure stays visible in the traceback.
            raise Exception('Failed to install Opensearch') from e
        return True

    def start_cluster(self) -> bool:
        """Start each project's systemd service and log its reported status.

        Returns:
            True when every command completed without raising.

        Raises:
            Exception: if starting or querying a service fails; the original
                error is chained as the cause.
        """
        try:
            for project in self.args.projects:
                execute(f'sudo systemctl start {project}', ".")
                # NOTE(review): this unpacking assumes execute() returns
                # (stdout, stderr, status) in that order -- confirm against
                # system.execute, which is not visible from this file.
                (stdout, stderr, status) = execute(f'sudo systemctl status {project}', ".")
                if status == 0:
                    logging.info(stdout)
                else:
                    logging.info(stderr)
        except Exception as e:
            # See installation(): narrow the catch and keep the cause.
            raise Exception('Failed to Start Cluster') from e
        return True

    def validation(self) -> bool:
        """Run the API test cases once the cluster reports ready.

        The third argument given to ``ApiTestCases.test_apis`` is the result
        of ``check_for_security_plugin`` on the ``/usr/share/opensearch``
        directory when ``self.args.allow_http`` is truthy, and ``True``
        otherwise.

        Returns:
            True when all API tests pass.

        Raises:
            Exception: if the cluster is not ready, or if any API test fails.
        """
        if not self.check_cluster_readiness():
            raise Exception("Cluster is not ready for API test")

        api_tests = ApiTestCases()
        if self.args.allow_http:
            third_arg = self.check_for_security_plugin(os.path.join(os.sep, "usr", "share", "opensearch"))
        else:
            third_arg = True
        test_result, counter = api_tests.test_apis(self.args.version, self.args.projects, third_arg)

        if not test_result:
            raise Exception(f'Not all tests Pass : {counter}')
        logging.info(f'All tests Pass : {counter}')
        return True

    def cleanup(self) -> bool:
        """Stop each project's service and remove its package.

        Returns:
            True when every command completed without raising.

        Raises:
            Exception: if stopping or removing any project fails; the message
                embeds the original error text and the original error is
                chained as the cause.
        """
        try:
            for project in self.args.projects:
                execute(f'sudo systemctl stop {project}', ".")
                execute(f'sudo yum remove {project} -y', ".")
        except Exception as e:
            raise Exception(f'Exception occurred either while attempting to stop cluster or removing OpenSearch/OpenSearch-Dashboards. {str(e)}') from e
        return True