Skip to content

Commit

Permalink
Put --use-cache on all actions (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oddant1 authored Jun 4, 2024
1 parent 343f930 commit efac9fc
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 40 deletions.
9 changes: 9 additions & 0 deletions q2cli/click/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,17 @@ def add_option(self, obj, opts, dest, action=None, nargs=1, const=None):
self._long_opt[opt] = option

def parse_args(self, args):
from q2cli.core.artifact_cache_global import set_used_artifact_cache

backup = args.copy() # args will be mutated by super()
try:
# We need to set this before we would normally parse it out so we
# can use the requested cache for all the operations that use a
# cache. Look for all uses of USED_ARTIFACT_CACHE. Some of these
# are during arg parsing
if '--use-cache' in backup:
set_used_artifact_cache(backup)

return super().parse_args(args)
except exceptions.UsageError:
if '--help' in backup:
Expand Down
20 changes: 11 additions & 9 deletions q2cli/click/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,22 @@ def type_expr(self):

def convert(self, value, param, ctx):
import qiime2.sdk.util
from q2cli.core.artifact_cache_global import get_used_artifact_cache

if value is None:
return None # Them's the rules
with get_used_artifact_cache():
if value is None:
return None # Them's the rules

if self.is_output:
return self._convert_output(value, param, ctx)
if self.is_output:
return self._convert_output(value, param, ctx)

if qiime2.sdk.util.is_semantic_type(self.type_expr):
return self._convert_input(value, param, ctx)
if qiime2.sdk.util.is_semantic_type(self.type_expr):
return self._convert_input(value, param, ctx)

if qiime2.sdk.util.is_metadata_type(self.type_expr):
return self._convert_metadata(value, param, ctx)
if qiime2.sdk.util.is_metadata_type(self.type_expr):
return self._convert_metadata(value, param, ctx)

return self._convert_primitive(value, param, ctx)
return self._convert_primitive(value, param, ctx)

def _convert_output(self, value, param, ctx):
import os
Expand Down
72 changes: 42 additions & 30 deletions q2cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,23 @@ def __init__(self, name, plugin, action):
click.Option(['--parallel-config'], required=False,
type=click.Path(exists=True, dir_okay=False),
help='Execute your action in parallel using a '
'config at the indicated path.'),
click.Option(['--use-cache'], required=False,
type=click.Path(exists=True, file_okay=False),
help='Specify the cache to be used for the '
'intermediate work of this pipeline. If '
'not provided, the default cache under '
'$TMP/qiime2/<uname> will be used. '
'IMPORTANT FOR HPC USERS: If you are on an '
'HPC system and are using parallel '
'execution it is important to set this to '
'a location that is globally accessible to '
'all nodes in the cluster.')])
'config at the indicated path.')])

self._misc.extend([
q2cli.util.example_data_option(
self._get_plugin, self.action['id']),
q2cli.util.citations_option(self._get_citation_records)])
q2cli.util.citations_option(self._get_citation_records),
click.Option(['--use-cache'], required=False,
type=click.Path(exists=True, file_okay=False),
help='Specify the cache to be used for the '
'intermediate work of this action. If not '
'provided, the default cache under '
'$TMP/qiime2/<uname> will be used. '
'IMPORTANT FOR HPC USERS: If you are on an HPC '
'system and are using parallel execution it is '
'important to set this to a location that is '
'globally accessible to all nodes in the '
'cluster.')])

options = [*self._inputs, *self._params, *self._outputs, *self._misc]
help_ = [action['description']]
Expand Down Expand Up @@ -372,11 +372,16 @@ def __call__(self, **kwargs):
import click

import qiime2.util
from q2cli.util import (output_in_cache, _get_cache_path_and_key,
get_default_recycle_pool)
from qiime2.core.cache import Cache
from qiime2.sdk import ResultCollection

from q2cli.util import (output_in_cache, _get_cache_path_and_key,
get_default_recycle_pool)
from q2cli.core.artifact_cache_global import (
get_used_artifact_cache, unset_used_artifact_cache)

cache = get_used_artifact_cache()

output_dir = kwargs.pop('output_dir')
# If they gave us a cache and key combo as an output dir, we want to
# error out, so we check if their output dir contains a : and the part
Expand All @@ -385,24 +390,24 @@ def __call__(self, **kwargs):
potential_cache = output_dir.rsplit(':', 1)[0]
if potential_cache and os.path.exists(potential_cache) and \
Cache.is_cache(potential_cache):
raise ValueError(f"The given output dir '{output_dir}' "
exc = ValueError(f"The given output dir '{output_dir}' "
"appears to be a cache:key combo. Cache keys "
"cannot be used as output dirs.")
q2cli.util.exit_with_error(exc)

# Args pertaining to pipeline resumption
recycle_pool = kwargs.pop('recycle_pool', None)
no_recycle = kwargs.pop('no_recycle', False)

if recycle_pool is not None and no_recycle:
raise ValueError('Cannot set a pool to be used for recycling and '
exc = ValueError('Cannot set a pool to be used for recycling and '
'no recycle simultaneously.')
q2cli.util.exit_with_error(exc)

used_cache = kwargs.pop('use_cache', None)

if used_cache is not None and not Cache.is_cache(used_cache):
raise ValueError(f"The path '{used_cache}' is not a valid cache, "
"please supply a path to a valid pre-existing "
"cache.")
# We already dealt with this kwarg, we needed to do it earlier to make
# sure we used the desired cache for all operations. Look for the call
# site for set_used_artifact_cache
kwargs.pop('use_cache')

parallel = kwargs.pop('parallel', False)
parallel_config_fp = kwargs.pop('parallel_config', None)
Expand All @@ -419,8 +424,6 @@ def __call__(self, **kwargs):
else:
quiet = True

cache = Cache(path=used_cache)

arguments = {}
init_outputs = {}
for key, value in kwargs.items():
Expand All @@ -438,18 +441,20 @@ def __call__(self, **kwargs):
# that is not globally accessible to the cluster. The user should
# be using a cache that is in a globally accessible location. We
# need to ensure we put our artifacts in that cache.
elif prefix == 'i' and used_cache is not None:
elif prefix == 'i':
value_ = value

if isinstance(value, list):
value_ = [cache.process_pool.save(v) for v in value]
value_ = [cache.process_pool.save(v)
for v in value]
elif isinstance(value, dict) or \
isinstance(value, ResultCollection):
value_ = {
k: cache.process_pool.save(v)
for k, v in value.items()}
elif isinstance(value, set):
value_ = set([cache.process_pool.save(v) for v in value])
value_ = set([cache.process_pool.save(v)
for v in value])
elif value is not None:
value_ = cache.process_pool.save(value)

Expand All @@ -476,7 +481,8 @@ def __call__(self, **kwargs):
if recycle_pool is not None and recycle_pool != default_pool and \
recycle_pool not in cache.get_pools():
msg = ("The pool '%s' does not exist on the cache at '%s'. It "
"will be created." % (recycle_pool, cache.path))
"will be created." %
(recycle_pool, cache.path))
click.echo(CONFIG.cfg_style('warning', msg))

# `qiime2.util.redirected_stdio` defaults to stdout/stderr when
Expand Down Expand Up @@ -575,13 +581,19 @@ def __call__(self, **kwargs):
if recycle_pool == default_pool:
cache.remove(recycle_pool)

# Set the USED_ARTIFACT_CACHE back to the default cache. This is mostly
# useful for the tests that invoke actions back to back to back without
# exiting the process
unset_used_artifact_cache()

def _execute_action(self, action, arguments, cache, recycle_pool=None):
with cache:
if recycle_pool is None:
results = action(**arguments)
results = results._result()
else:
pool = cache.create_pool(key=recycle_pool, reuse=True)
pool = cache.create_pool(
key=recycle_pool, reuse=True)
with pool:
results = action(**arguments)

Expand Down
81 changes: 81 additions & 0 deletions q2cli/core/artifact_cache_global.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

from qiime2.core.cache import Cache


# Do not make this the default cache. If this is the default cache then we will
# instantiate the default cache when the module is imported which will write a
# process pool to the default cache which is undesirable if that isn't the
# cache we will be using for this action
_USED_ARTIFACT_CACHE = None


def set_used_artifact_cache(args):
    """Point the invocation-wide used-artifact cache at the cache in args.

    Looks up the value following ``--use-cache`` in the raw CLI arguments,
    validates that it is a path to an existing cache, and stores a ``Cache``
    for it in the module-level ``_USED_ARTIFACT_CACHE``. Exits the process
    with an error message if the value is missing or invalid.

    Parameters
    ----------
    args : List[str]
        The arguments provided on the cli to this QIIME 2 invocation.

    Notes
    -----
    Callers must already have verified that ``--use-cache`` is present in
    ``args``, and should call this at most once per invocation.
    """
    from q2cli.util import exit_with_error

    global _USED_ARTIFACT_CACHE

    # Index of the value that should immediately follow the flag itself
    value_idx = args.index('--use-cache') + 1

    # The flag was the final token, so no value was supplied for it
    if value_idx >= len(args):
        exit_with_error(ValueError(
            '--use-cache expected an argument but none was '
            'provided.'))

    cache_path = args[value_idx]

    # Whatever was supplied must name a pre-existing cache on disk
    if not Cache.is_cache(cache_path):
        exit_with_error(ValueError(
            '--use-cache expected a path to an existing cache as '
            f"an argument but received '{cache_path}' which is "
            'not a path to an existing cache.'))

    _USED_ARTIFACT_CACHE = Cache(cache_path)


def unset_used_artifact_cache():
    """Reset the module-level used-artifact cache back to None.

    After this runs, get_used_artifact_cache() falls back to the default
    cache again.
    """
    global _USED_ARTIFACT_CACHE
    _USED_ARTIFACT_CACHE = None


def get_used_artifact_cache():
    """Return the cache selected for this invocation, or the default cache.

    The default cache is only instantiated lazily here — when no cache was
    explicitly set — because instantiating it writes a process pool into it,
    which we want to avoid unless it really is the cache in use.

    Returns
    -------
    Cache
        The cache the user set via --use-cache if they set one, otherwise
        the default cache.
    """
    if _USED_ARTIFACT_CACHE is not None:
        return _USED_ARTIFACT_CACHE
    return Cache()
84 changes: 83 additions & 1 deletion q2cli/tests/test_cache_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,53 @@ def test_mixed_cached_uncached_inputs(self):
self.assertEqual(collection['bar'].view(int), 1)
self.assertEqual(list(collection.keys()), ['foo', 'bar'])

def test_use_cache_on_non_pipeline(self):
"""This test is run in a subprocess so we can assert it created a
process pool in the used cache but not in the default cache
"""
default_cache = Cache()

# Set the non-default cache here to ensure our artifact isn't in the
# cache from this step
with self.cache:
art1 = Artifact.import_data(IntSequence1, [0, 1, 2])
art1_path = os.path.join(self.tempdir, 'art1.qza')
art1.save(art1_path)

left_path = str(self.cache.path) + ':left'
right_path = str(self.cache.path) + ':right'

result = self._run_command(
'split-ints', '--i-ints', art1_path, '--o-left', left_path,
'--o-right', right_path, '--verbose'
)

# Without setting a cache the artifact should have been put in the
# default cache
self.assertEqual(result.exit_code, 0)
self.assertIn(str(art1.uuid), default_cache.get_data())

# Same as above make sure this step isn't putting the artifact in the
# default cache
with self.cache:
art2 = Artifact.import_data(IntSequence1, [3, 4, 5])
art2_path = os.path.join(self.tempdir, 'art2.qza')
art2.save(art2_path)

left_path = str(self.cache.path) + ':left'
right_path = str(self.cache.path) + ':right'

result = self._run_command(
'split-ints', '--i-ints', art2_path, '--o-left', left_path,
'--o-right', right_path, '--verbose', '--use-cache',
str(self.cache.path)
)

# Since we did set a cache the artifact should not be in the default
# cache
self.assertEqual(result.exit_code, 0)
self.assertNotIn(str(art2.uuid), default_cache.get_data())

def test_pipeline_resumption_default(self):
plugin_action = 'dummy_plugin_resumable_varied_pipeline'
default_pool = get_default_recycle_pool(plugin_action)
Expand Down Expand Up @@ -790,7 +837,7 @@ def test_output_dir_as_cache(self):

self.assertEqual(result.exit_code, 1)
self.assertIn(
'Cache keys cannot be used as output dirs.', str(result.exception))
'Cache keys cannot be used as output dirs.', result.output)

def test_parallel(self):
output = os.path.join(self.tempdir, 'output')
Expand Down Expand Up @@ -934,6 +981,41 @@ def test_parallel_flags_on_non_pipeline(self):
self.assertEqual(result.exit_code, 1)
self.assertIn('No such option: --parallel-config', result.output)

def test_no_cache_arg(self):
art_path = os.path.join(self.tempdir, 'art:1.qza')
self.art1.save(art_path)

left_path = os.path.join(self.tempdir, 'left.qza')
right_path = os.path.join(self.tempdir, 'right.qza')

result = self._run_command(
'split-ints', '--i-ints', art_path, '--o-left', left_path,
'--o-right', right_path, '--verbose', '--use-cache'
)

self.assertEqual(result.exit_code, 1)
self.assertIn(
'--use-cache expected an argument but none was provided.',
result.output)

def test_cache_arg_invalid(self):
art_path = os.path.join(self.tempdir, 'art:1.qza')
self.art1.save(art_path)

left_path = os.path.join(self.tempdir, 'left.qza')
right_path = os.path.join(self.tempdir, 'right.qza')

# The path to our artifact is definitely not a cache
result = self._run_command(
'split-ints', '--i-ints', art_path, '--o-left', left_path,
'--o-right', right_path, '--verbose', '--use-cache', art_path
)

self.assertEqual(result.exit_code, 1)
self.assertIn(
f"received '{art_path}' which is not a path to an existing cache",
result.output)

def _load_alias_execution_contexts(self, collection):
execution_contexts = []

Expand Down

0 comments on commit efac9fc

Please sign in to comment.