diff --git a/setup.py b/setup.py index 1d68190d..6ae60f29 100644 --- a/setup.py +++ b/setup.py @@ -42,10 +42,14 @@ ], install_requires=[ 'colorama', + 'types-PyYAML', 'PyYAML>=5.1', 'pykwalify', 'setuptools', 'packaging', + 'types-requests', + 'requests', + 'py7zr', ], python_requires='>=3.8', entry_points={'console_scripts': ('west = west.app.main:main',)}, diff --git a/src/west/app/main.py b/src/west/app/main.py index ae0e904c..5fdf4bfe 100755 --- a/src/west/app/main.py +++ b/src/west/app/main.py @@ -34,7 +34,7 @@ from west.commands import WestCommand, extension_commands, \ CommandError, ExtensionCommandError, Verbosity from west.app.project import List, ManifestCommand, Compare, Diff, Status, \ - SelfUpdate, ForAll, Grep, Init, Update, Topdir + SelfUpdate, ForAll, Grep, Init, Update, Topdir, Sdk from west.app.config import Config from west.manifest import Manifest, MalformedConfig, MalformedManifest, \ ManifestVersionError, ManifestImportFailed, _ManifestImportDepth, \ @@ -1104,6 +1104,7 @@ def main(argv=None): Help, Config, Topdir, + Sdk, ], # None is for hidden commands we don't want to show to the user. diff --git a/src/west/app/project.py b/src/west/app/project.py index 0ffe1e5a..35e88bc6 100644 --- a/src/west/app/project.py +++ b/src/west/app/project.py @@ -7,14 +7,21 @@ import argparse from functools import partial +import hashlib import logging import os from os.path import abspath, relpath from pathlib import PurePath, Path +import platform +from py7zr import SevenZipFile +import re +import requests import shutil import shlex import subprocess import sys +import tarfile +import tempfile import textwrap from time import perf_counter from urllib.parse import urlparse @@ -1914,6 +1921,335 @@ def do_add_parser(self, parser_adder): def do_run(self, args, user_args): self.die(self.description) + +class Sdk(_ProjectCommand): + def __init__(self): + super().__init__( + "sdk", + "List and Install Zephyr SDK", + textwrap.dedent('''\ + Listing SDKs: + + Run 'west sdk' command information about available SDKs is displayed. + + Installing SDK: + + Run 'west sdk --install=latest' to simply install the latest SDK. + + Set --install option to install the specified version of the SDK. + If it is already installed, it will be used to continue processing. + + You can specify the installation directory with --install-base. + The SDK will be extracted under the specified directory. + You can only install to a directory where you have write permissions. + + The --toolchains and --hosttools options specify the modules to install. + If neither is specified, an interactive installer will be launched. + + --api-url specifies the REST API endpoint for GitHub releases information + when installing the SDK from a different GitHub repository.'''), + requires_workspace=False, + ) + + def do_add_parser(self, parser_adder): + parser = self._parser(parser_adder) + parser.add_argument( + "--install", + default=None, + dest="sdk_version", + metavar="SDK_VER", + help="Install the selected version of Zephyr SDK. " + "If you set this option to \"latest\", select the latest version." + ) + parser.add_argument( + "--install-base", + default=os.path.expanduser("~"), + metavar="BASE", + help="specify install base directory. Use $HOME as default", + ) + parser.add_argument( + "--toolchains", + nargs="*", + type=str, + help="specify installing toolchains", + ) + parser.add_argument( + "--hosttools", action="store_true", help="install host-tools" + ) + parser.add_argument( + "--api-url", + default="https://api.github.com/repos/zephyrproject-rtos/sdk-ng/releases", + type=str, + help="specifies the GitHub releases API endpoint for find SDKs.", + ) + + return parser + + def fetch_all_releases(self, url): + releases = [] + page = 1 + + while True: + params = {"page": page, "per_page": 100} + resp = requests.get(url, params=params) + if resp.status_code != 200: + raise Exception(f"Failed to fetch: {resp.status_code}, {resp.text}") + + data = resp.json() + if not data: + break + + releases.extend(data) + page += 1 + + return releases + + def sdk_basename(self, release): + return "zephyr-sdk-" + re.sub("^v", "", release["tag_name"]) + + def minimal_sdk_filename(self, release): + basename = self.sdk_basename(release) + system = platform.system() + machine = platform.machine() + + if system == 'Linux': + osname = 'linux' + elif system == 'Darwin': + osname = 'macos' + elif system == 'Windows': + osname = 'windows' + else: + self.die(f'Unsupported system: {system}') + + if machine == 'aarch64': + arch = 'aarch64' + elif machine == 'x86_64' or machine == 'AMD64': + arch = 'x86_64' + else: + self.die(f'Unsupported machine: {machine}') + + if osname == 'windows': + name = basename + '_' + osname + '-' + arch + '_minimal.7z' + else: + name = basename + '_' + osname + '-' + arch + '_minimal.tar.xz' + + return name + + def minimal_sdk_url(self, release): + name = self.minimal_sdk_filename(release) + assets = release.get('assets', []) + minimal_sdk_asset = next(filter(lambda x: x['name'] == name, assets)) + + return minimal_sdk_asset['browser_download_url'] + + def sha256_sum_url(self, release): + assets = release.get('assets', []) + minimal_sdk_asset = next(filter(lambda x: x['name'] == 'sha256.sum', assets)) + + return minimal_sdk_asset['browser_download_url'] + + def sha256_hashtable(self, sha256_list): + hashtable = {} + + for line in sha256_list.splitlines(): + tuple = re.split(r'\s+', line) + hashtable[tuple[1]] = tuple[0] + + return hashtable + + def download_and_extract(self, archive_url, sha256, install_base, sdk_dir): + resp = requests.get(archive_url, stream=True) + if resp.status_code != 200: + raise Exception(f'Failed to download {archive_url}: {resp.status_code}') + + with tempfile.TemporaryDirectory() as tempdir: + # download archive file + filename = os.path.join(tempdir, re.sub(r'^.*/', '', archive_url)) + file = open(filename, mode='wb') + total_length = int(resp.headers['Content-Length']) + count = 0 + for chunk in resp.iter_content(chunk_size=8192): + file.write(chunk) + count = count + len(chunk) + self.inf(f'\r {count}/{total_length}', end='') + self.inf() + self.inf(f'Downloaded: {file.name}') + file.close() + + # check sha256 hash + with open(file.name, 'rb') as sha256file: + digest = hashlib.sha256(sha256file.read()).hexdigest() + if sha256 != digest: + raise Exception(f'sha256 mismatched: {sha256}:{digest}') + + # extract archive file + self.inf(f'Extract: {file.name}') + if file.name.endswith('.tar.xz'): + with tarfile.open(file.name, mode='r:xz') as archive: + archive.extractall(path=install_base) + else: + with SevenZipFile(file.name, mode='r') as archive: + archive.extractall(path=install_base) + + def install_sdk(self, args, user_args): + if not os.path.exists(args.install_base): + self.die(f"{args.install_base} not exists") + + if not os.access(args.install_base, os.W_OK): + self.die(f"{args.install_base} not writable ") + + self.inf('Fetching Zephyr SDK list...') + releases = self.fetch_all_releases(args.api_url) + + if args.sdk_version == 'latest': + target_release = releases[0] + else: + target_release = next(filter(lambda x: x['tag_name'] == args.sdk_version, releases)) + + sdk_dir = self.sdk_basename(target_release) + + target_dir = os.path.join(args.install_base, sdk_dir) + if 'Windows' == platform.system(): + setup = os.path.join(target_dir, 'setup.cmd') + optsep = '/' + else: + setup = os.path.join(target_dir, 'setup.sh') + optsep = '-' + + cmds = [setup] + + if args.toolchains is not None or args.hosttools: + if 'all' in args.toolchains: + cmds.extend([optsep + 't', 'all']) + else: + for tc in args.toolchains: + cmds.extend([optsep + 't', tc]) + + if args.hosttools: + cmds.extend([optsep + 'h']) + + if not os.path.exists(setup): + sha256_url = self.sha256_sum_url(target_release) + sdk_url = self.minimal_sdk_url(target_release) + + self.inf('Fetching assets list...') + resp = requests.get(sha256_url, stream=True) + if resp.status_code != 200: + raise Exception(f'Failed to download {sha256_url}: {resp.status_code}') + + hashtable = self.sha256_hashtable(resp.content.decode('UTF-8')) + sha256 = hashtable[self.minimal_sdk_filename(target_release)] + + self.inf(f'Downloading {sdk_url}.') + self.download_and_extract(sdk_url, sha256, args.install_base, sdk_dir) + + else: + self.inf(f'{target_dir} exists. Use this.') + + self.dbg(cmds) + + ret = subprocess.run(cmds) + + if ret.returncode != 0: + self.die(f"command \"{' '.join(cmds)}\" failed") + + # Associate it so that it can be found. + cmds.extend([optsep + 'c']) + ret = subprocess.run(cmds) + if ret.returncode != 0: + self.die(f"command \"{' '.join(cmds)}\" failed") + + def list_sdk(self, args, user_args): + cmake_script = ''' +SET(zephyr_sdk_search_paths + /usr + /usr/local + /opt + $ENV{HOME} + $ENV{HOME}/.local + $ENV{HOME}/.local/opt + $ENV{HOME}/bin) + +# Search for Zephyr SDK version 0.0.0 which does not exist, this is needed to +# return a list of compatible versions and find the best suited version that +# is available. +find_package(Zephyr-sdk 0.0.0 EXACT QUIET CONFIG PATHS ${zephyr_sdk_search_paths}) + +# Remove duplicate entries and sort naturally in descending order. +set(zephyr_sdk_found_versions ${Zephyr-sdk_CONSIDERED_VERSIONS}) +set(zephyr_sdk_found_configs ${Zephyr-sdk_CONSIDERED_CONFIGS}) + +list(REMOVE_DUPLICATES Zephyr-sdk_CONSIDERED_VERSIONS) +list(SORT Zephyr-sdk_CONSIDERED_VERSIONS COMPARE NATURAL ORDER DESCENDING) + +# Loop over each found Zepher SDK version until one is found that is compatible. +foreach(zephyr_sdk_candidate ${Zephyr-sdk_CONSIDERED_VERSIONS}) + if('${zephyr_sdk_candidate}' VERSION_GREATER_EQUAL '${Zephyr-sdk_FIND_VERSION}') + # Find the path for the current version being checked and get the directory + # of the Zephyr SDK so it can be checked. + list(FIND zephyr_sdk_found_versions ${zephyr_sdk_candidate} zephyr_sdk_current_index) + list(GET zephyr_sdk_found_configs ${zephyr_sdk_current_index} zephyr_sdk_current_check_path) + get_filename_component(zephyr_sdk_current_check_path ${zephyr_sdk_current_check_path} DIRECTORY) + + # Then see if this version is compatible. + find_package(Zephyr-sdk ${Zephyr-sdk_FIND_VERSION} QUIET CONFIG PATHS + ${zephyr_sdk_current_check_path} NO_DEFAULT_PATH) + + if (${Zephyr-sdk_FOUND}) + message(STATUS ${Zephyr-sdk_DIR}) + endif() + endif() +endforeach() +''' + + sorted_sdks = [] + with tempfile.TemporaryDirectory() as d: + with open(os.path.join(d, 'listsdk.cmake'), mode='w') as f: + f.write(cmake_script) + f.close() + + result = subprocess.run(['cmake', '-P', f.name], capture_output=True, text=True) + + sorted_sdks = [x[3:] for x in result.stdout.strip().split('\n')] + + for sdk in sorted_sdks: + self.inf(f'{os.path.basename(sdk)}:') + self.inf() + self.inf(f' Path: {sdk}') + + with open(os.path.join(sdk, 'sdk_toolchains')) as f: + toolchains = f.readlines() + + self.inf() + self.inf(' Installed toolcains:') + + for tc in toolchains: + if os.path.exists(os.path.join(sdk, tc.strip())): + self.inf(f' {tc.strip()}') + + if os.path.exists(os.path.join(sdk, 'sysroots')): + self.inf() + self.inf(' Hosttools installed:') + + self.inf() + + home_cmake = os.path.join(os.environ.get('HOME'), '.cmake') + for root, ds, fs in os.walk(home_cmake): + for f in fs: + with open(os.path.join(root, f)) as file: + line = file.readline() + if line != os.path.join(sdk, 'cmake'): + continue + self.inf(' Zephyr SDK CMake package registered:') + self.inf() + + def do_run(self, args, user_args): + if args.sdk_version: + self.install_sdk(args, user_args) + else: + self.list_sdk(args, user_args) + + # # Private helper routines. #