Skip to content

Commit 5426e6c

Browse files
authored
[NEXMANAGE-737] Sota cancel mode with threading event method (#567)
* [NEXMANAGE-737] Enable sota cancel mode. This PR implements the sota cancel mode using a threading event flag. When the thread is created, it will be added to a thread list. When the dispatcher receives the sota cancel request, it checks the current running thread and retrieves its sota type. If it is a download-only sota, the dispatcher sets the event flag to issue a cancel request. Please note that the first inbc command should be terminated before sending inbc cancel command as they will be sharing the same mqtt connection. Signed-off-by: yengliong <yeng.liong.wong@intel.com> * Cancel request in the middle of download * Fix unit test * Add unit tests to increase coverage * Add timeout in thread.join --------- Signed-off-by: yengliong <yeng.liong.wong@intel.com>
1 parent 4bfbbd0 commit 5426e6c

16 files changed

+497
-42
lines changed

inbc-program/inbc/parser/parser.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def parse_sota_args(self) -> None:
224224
parser_sota.add_argument('--reboot', '-rb', default='yes', required=False, choices=['yes', 'no'],
225225
help='Type of information [ yes | no ]')
226226
parser_sota.add_argument('--mode', '-m', default='full',
227-
required=False, choices=['full', 'download-only', 'no-download'])
227+
required=False, choices=['full', 'download-only', 'no-download', 'cancel'])
228228
parser_sota.add_argument('--package-list', '-p', required=False,
229229
type=lambda x: validate_package_list(x),
230230
help='Comma-separated list of package names to install')

inbm/Changelog.md

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
55

66
## NEXT - MMMM-DD-YY
7+
### Added
8+
- (NEXMANAGE-737) Enable sota cancel mode
9+
710
### Fixed
811
- (NEXMANAGE-872) Fix provision-tc issue in TiberOS - cannot overwrite /etc/dispatcher.environment
912
- (NEXMANAGE-846) Fix granular log raise error when granular log file is empty

inbm/dispatcher-agent/dispatcher/dispatcher_class.py

+44-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import signal
1313
import sys
1414
from queue import Queue
15-
from threading import Thread, active_count, Lock
15+
from threading import Thread, active_count, Lock, Event
1616
from time import sleep
1717
from typing import Optional, Any, Mapping, Tuple, Sequence
1818

@@ -55,6 +55,7 @@
5555
from .sota.os_factory import SotaOsFactory
5656
from .sota.sota import SOTA
5757
from .sota.sota_error import SotaError
58+
from .sota.cancel import cancel_thread
5859
from .workload_orchestration import WorkloadOrchestration
5960
from inbm_lib.xmlhandler import *
6061
from inbm_lib.version import get_friendly_inbm_version_commit
@@ -94,6 +95,8 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv
9495
# Initialize update_queue with a capacity of 1 to ensure serialized handling of updates.
9596
self.update_queue: Queue[Tuple[str, str, Optional[str]]] = Queue(1)
9697
self._thread_count = 1
98+
self._thread_list: list[Thread] = []
99+
self._active_thread_manifest: Optional[str] = None
97100
self._sota_repos = None
98101
self.sota_mode = None
99102
self._package_list: str = ""
@@ -115,6 +118,7 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv
115118

116119
self.sqlite_mgr = SqliteManager()
117120
self.ap_scheduler = APScheduler(sqlite_mgr=self.sqlite_mgr)
121+
self._cancel_event = Event()
118122

119123
def stop(self) -> None:
120124
self.RUNNING = False
@@ -186,7 +190,13 @@ def _sig_handler(signo, frame) -> None:
186190
if active_count() - active_start_count < self._thread_count:
187191
worker = Thread(target=handle_updates, args=(self,))
188192
worker.setDaemon(True)
193+
self._thread_list.append(worker)
189194
worker.start()
195+
196+
# Periodically check if processes have finished. If process finished, remove it from the list.
197+
for thread in self._thread_list[:]:
198+
if not thread.is_alive():
199+
self._thread_list.remove(thread)
190200
sleep(1)
191201

192202
self._dispatcher_broker.mqtt_publish(f'{AGENT}/state', 'dead', retain=True)
@@ -310,6 +320,8 @@ def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: st
310320
result: Result = Result()
311321
logger.debug("do_install")
312322
parsed_head = None
323+
# Assumption is that there is only one active OTA thread at a time
324+
self._active_thread_manifest = xml
313325
try: # TODO: Split into multiple try/except blocks
314326
type_of_manifest, parsed_head = \
315327
_check_type_validate_manifest(xml, schema_location=schema_location)
@@ -411,7 +423,8 @@ def _do_ota_update(self, ota_type: str, repo_type: str, resource: dict,
411423
self._sota_repos,
412424
self._install_check_service,
413425
self._update_logger,
414-
self.config_dbs)
426+
self.config_dbs,
427+
self._cancel_event)
415428

416429
p = factory.create_parser()
417430
# NOTE: p.parse can raise one of the *otaError exceptions
@@ -440,7 +453,8 @@ def _validate_pota_manifest(self, repo_type: str,
440453
self._sota_repos,
441454
self._install_check_service,
442455
self._update_logger,
443-
self.config_dbs)
456+
self.config_dbs,
457+
self._cancel_event)
444458
p = factory.create_parser()
445459
# NOTE: p.parse can raise one of the *otaError exceptions
446460
parsed_manifest = p.parse(ota_list[ota], kwargs, parsed_head)
@@ -497,7 +511,32 @@ def _on_cloud_request(self, topic: str, payload: str, qos: int) -> None:
497511
request_type = topic.split('/')[2]
498512
request_id = topic.split('/')[3] if len(topic.split('/')) > 3 else None
499513
manifest = payload
500-
self._add_request_to_queue(request_type, manifest, request_id)
514+
if not self._handle_cancel_request(request_type, manifest):
515+
self._add_request_to_queue(request_type, manifest, request_id)
516+
517+
def _handle_cancel_request(self, request_type: str, manifest: str) -> bool:
518+
"""
519+
Check if it is a SOTA cancel request. If it is, send the terminate signal to current process.
520+
521+
@param request_type: type of the request
522+
@param manifest: manifest to be processed
523+
@return: True if the request has been processed; False if no request has been handled.
524+
"""
525+
if request_type == "install":
526+
type_of_manifest, parsed_head = \
527+
_check_type_validate_manifest(manifest)
528+
type_of_active_manifest = active_thread_parsed_head = None
529+
if self._active_thread_manifest:
530+
type_of_active_manifest, active_thread_parsed_head = \
531+
_check_type_validate_manifest(self._active_thread_manifest)
532+
result = cancel_thread(type_of_manifest, parsed_head, self._thread_list,
533+
type_of_active_manifest, active_thread_parsed_head,
534+
self._dispatcher_broker, self._cancel_event)
535+
if result:
536+
logger.debug(f"Request cancel complete.")
537+
self._send_result(str(Result(CODE_OK, "Request complete.")))
538+
return True
539+
return False
501540

502541
def _on_message(self, topic: str, payload: Any, qos: int) -> None:
503542
"""Called when a message is received from _telemetry-agent
@@ -619,6 +658,7 @@ def invoke_sota(self, snapshot: Optional[Any] = None, action: Optional[Any] = No
619658
self._update_logger,
620659
self._sota_repos,
621660
self._install_check_service,
661+
self._cancel_event,
622662
snapshot, action)
623663

624664
sota_instance.execute(self.proceed_without_rollback)

inbm/dispatcher-agent/dispatcher/ota_factory.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77
import abc
88
import logging
9+
import threading
910
from typing import Any, Optional, Mapping
1011

1112
from .config_dbs import ConfigDbs
@@ -53,7 +54,8 @@ def get_factory(ota_type,
5354
sota_repos: Optional[str],
5455
install_check_service: InstallCheckService,
5556
update_logger: UpdateLogger,
56-
dbs: ConfigDbs) -> "OtaFactory":
57+
dbs: ConfigDbs,
58+
cancel_event: threading.Event) -> "OtaFactory":
5759
"""Create an OTA factory of a specified OTA type
5860
5961
@param ota_type: The OTA type
@@ -64,6 +66,7 @@ def get_factory(ota_type,
6466
@param install_check_service: provides install_check
6567
@param update_logger: UpdateLogger (expected to update after OTA)
6668
@param dbs: ConfigDbs.ON or ConfigDbs.WARN or ConfigDbs.OFF
69+
@param cancel_event: Event used to stop the downloading process
6770
@raise ValueError: Unsupported OTA type
6871
"""
6972

@@ -72,7 +75,7 @@ def get_factory(ota_type,
7275
return FotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger)
7376
if ota_type == OtaType.SOTA.name:
7477
return SotaFactory(repo_type, dispatcher_broker, proceed_without_rollback,
75-
sota_repos, install_check_service, update_logger)
78+
sota_repos, install_check_service, update_logger, cancel_event)
7679
if ota_type == OtaType.AOTA.name:
7780
return AotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger, dbs=dbs)
7881
if ota_type == OtaType.POTA.name:
@@ -116,7 +119,8 @@ class SotaFactory(OtaFactory):
116119
@param proceed_without_rollback: Is it OK to run SOTA without rollback ability?
117120
@param install_check_service: provides InstallCheckService
118121
@param sota_repos: new Ubuntu/Debian mirror (or None)
119-
@param update_logger: UpdateLogger (expected to update after OTA)
122+
@param update_logger: UpdateLogger (expected to update after OTA)
123+
@param cancel_event: Event used to stop the downloading process
120124
"""
121125

122126
def __init__(self,
@@ -125,13 +129,15 @@ def __init__(self,
125129
proceed_without_rollback: bool,
126130
sota_repos: Optional[str],
127131
install_check_service: InstallCheckService,
128-
update_logger: UpdateLogger) -> None:
132+
update_logger: UpdateLogger,
133+
cancel_event: threading.Event) -> None:
129134

130135
super().__init__(repo_type, install_check_service)
131136
self._sota_repos = sota_repos
132137
self._proceed_without_rollback = proceed_without_rollback
133138
self._update_logger = update_logger
134139
self._dispatcher_broker = dispatcher_broker
140+
self._cancel_event = cancel_event
135141

136142
def create_parser(self) -> OtaParser:
137143
logger.debug(" ")
@@ -145,7 +151,8 @@ def create_thread(self, parsed_manifest: Mapping[str, Optional[Any]]) -> OtaThre
145151
self._sota_repos,
146152
self._install_check_service,
147153
parsed_manifest,
148-
self._update_logger)
154+
self._update_logger,
155+
self._cancel_event)
149156

150157

151158
class AotaFactory(OtaFactory):

inbm/dispatcher-agent/dispatcher/ota_thread.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import abc
88
import logging
99
import os
10+
import threading
1011
from threading import Lock
1112
from typing import Optional, Any, Mapping
1213

@@ -142,6 +143,7 @@ class SotaThread(OtaThread):
142143
@param install_check_service: provides install_check
143144
@param parsed_manifest: parameters from OTA manifest
144145
@param update_logger: UpdateLogger instance; expected to update when done with OTA
146+
@param cancel_event: Event used to stop the downloading process
145147
@return (dict): dict representation of COMMAND_SUCCESS or OTA_FAILURE/OTA_FAILURE_IN_PROGRESS
146148
"""
147149

@@ -152,13 +154,15 @@ def __init__(self,
152154
sota_repos: Optional[str],
153155
install_check_service: InstallCheckService,
154156
parsed_manifest: Mapping[str, Optional[Any]],
155-
update_logger: UpdateLogger) -> None:
157+
update_logger: UpdateLogger,
158+
cancel_event: threading.Event) -> None:
156159
super().__init__(repo_type, parsed_manifest,
157160
install_check_service=install_check_service)
158161
self._sota_repos = sota_repos
159162
self._proceed_without_rollback = proceed_without_rollback
160163
self._update_logger = update_logger
161164
self._dispatcher_broker = dispatcher_broker
165+
self._cancel_event = cancel_event
162166

163167
def start(self) -> Result: # pragma: no cover
164168
"""Starts the SOTA thread and which checks for existing locks before delegating to
@@ -177,6 +181,7 @@ def start(self) -> Result: # pragma: no cover
177181
dispatcher_broker=self._dispatcher_broker,
178182
update_logger=self._update_logger,
179183
sota_repos=self._sota_repos,
184+
cancel_event=self._cancel_event,
180185
install_check_service=self._install_check_service)
181186
try:
182187
sota_instance.execute(self._proceed_without_rollback)
@@ -200,6 +205,7 @@ def check(self) -> None:
200205
dispatcher_broker=self._dispatcher_broker,
201206
update_logger=self._update_logger,
202207
sota_repos=self._sota_repos,
208+
cancel_event=self._cancel_event,
203209
install_check_service=self._install_check_service)
204210
sota_instance.check()
205211
except SotaError as e:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
Method to handle cancel request
3+
4+
Copyright (C) 2017-2024 Intel Corporation
5+
SPDX-License-Identifier: Apache-2.0
6+
"""
7+
8+
import logging
9+
from typing import Optional
10+
from threading import Event
11+
from inbm_lib.xmlhandler import XmlHandler
12+
from threading import Thread
13+
from .constants import SOTA_CACHE
14+
from ..constants import OtaType
15+
from dispatcher.dispatcher_exception import DispatcherException
16+
from dispatcher.packagemanager.local_repo import DirectoryRepo
17+
from dispatcher.dispatcher_broker import DispatcherBroker
18+
from dispatcher.common.result_constants import Result, CODE_OK, CODE_BAD_REQUEST
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
def cancel_thread(type_of_manifest: str, parsed_head: XmlHandler, thread_list: list[Thread],
24+
type_of_active_manifest: Optional[str], active_thread_parsed_head: Optional[XmlHandler],
25+
dispatcher_broker: DispatcherBroker, cancel_event: Event) -> bool:
26+
"""
27+
Cancel the current active thread by sending the terminate signal.
28+
29+
@param type_of_manifest: type of the request
30+
@param parsed_head: The root parsed xml
31+
@param thread_list: List of the active thread
32+
@param type_of_active_manifest: type of the request on running thread
33+
@param active_thread_parsed_head: The root parsed xml of running thread
34+
@param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services
35+
@param cancel_event: Event used to stop the downloading process
36+
@return: True if the request has been processed; False if no request has been handled.
37+
"""
38+
if type_of_manifest == 'ota':
39+
header = parsed_head.get_children('ota/header')
40+
ota_type = header['type']
41+
resource = parsed_head.get_children(f'ota/type/{ota_type}')
42+
if ota_type == OtaType.SOTA.name.lower():
43+
sota_mode = resource.get('mode', None)
44+
if sota_mode == 'cancel':
45+
logger.debug(f"Receive sota cancel request.")
46+
# If the active thread is not SOTA download-only, forbid the cancel request.
47+
if type_of_active_manifest and active_thread_parsed_head:
48+
if not is_active_ota_sota_download_only(type_of_active_manifest, active_thread_parsed_head):
49+
dispatcher_broker.send_result(
50+
str(Result(CODE_BAD_REQUEST, "Current thread is not SOTA download-only. "
51+
"Cannot proceed with the cancel request.")))
52+
return True
53+
else:
54+
dispatcher_broker.send_result(str(Result(CODE_BAD_REQUEST, "Running thread manifest not found.")))
55+
return True
56+
57+
# The list should only contain one OTA process.
58+
for thread in thread_list:
59+
if thread.is_alive():
60+
cancel_event.set()
61+
# Wait thread to gracefully exit
62+
logger.debug(f"Waiting thread to exit...")
63+
thread.join(timeout=300)
64+
logger.debug(f"Request cancel complete.")
65+
# Reset the event flag
66+
cancel_event.clear()
67+
return True
68+
return False
69+
70+
71+
def is_active_ota_sota_download_only(type_of_active_manifest: str, active_parsed_head: XmlHandler) -> bool:
72+
"""
73+
Check whether the current active thread is SOTA download-only mode.
74+
75+
@param type_of_active_manifest: type of the request
76+
@param active_parsed_head: The root parsed xml
77+
@return: True if it is SOTA download-only; False if not.
78+
"""
79+
logger.debug("")
80+
if type_of_active_manifest == 'ota':
81+
header = active_parsed_head.get_children('ota/header')
82+
ota_type = header['type']
83+
resource = active_parsed_head.get_children(f'ota/type/{ota_type}')
84+
if ota_type == OtaType.SOTA.name.lower():
85+
sota_mode = resource.get('mode', None)
86+
if sota_mode == 'download-only':
87+
return True
88+
return False

0 commit comments

Comments
 (0)