Skip to content

Commit 90985ba

Browse files
committed
Expire SRA and GEO metadata after 14 days
1 parent b5aa676 commit 90985ba

File tree

4 files changed

+72
-6
lines changed

4 files changed

+72
-6
lines changed

rnaseq_pipeline/sources/geo.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
This module contains all the logic to retrieve RNA-Seq data from GEO.
33
"""
44

5+
from datetime import timedelta
56
import gzip
67
import logging
78
from subprocess import Popen
@@ -21,6 +22,7 @@
2122
from ..config import rnaseq_pipeline
2223
from ..miniml_utils import collect_geo_samples, collect_geo_samples_info
2324
from ..platforms import Platform, BgiPlatform, IlluminaPlatform
25+
from ..targets import ExpirableLocalTarget
2426
from ..utils import RerunnableTaskMixin
2527
from .sra import DownloadSraExperiment
2628

@@ -67,13 +69,15 @@ class DownloadGeoSampleMetadata(RerunnableTaskMixin, luigi.Task):
6769
retry_count = 3
6870

6971
def run(self):
72+
if self.output().is_stale():
73+
logger.info('%s is stale, redownloading...', self.output())
7074
res = requests.get('https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi', params=dict(acc=self.gsm, form='xml'))
7175
res.raise_for_status()
7276
with self.output().open('w') as f:
7377
f.write(res.text)
7478

7579
def output(self):
76-
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}.xml'.format(self.gsm)))
80+
return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}.xml'.format(self.gsm)), ttl=timedelta(days=14))
7781

7882
@requires(DownloadGeoSampleMetadata)
7983
class DownloadGeoSample(TaskWithMetadataMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask):
@@ -113,14 +117,16 @@ class DownloadGeoSeriesMetadata(RerunnableTaskMixin, luigi.Task):
113117
retry_count = 3
114118

115119
def run(self):
120+
if self.output().is_stale():
121+
logger.info('%s is stale, redownloading...', self.output())
116122
res = requests.get('https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi', params=dict(acc=self.gse, form='xml', targ='gsm'))
117123
res.raise_for_status()
118124
with self.output().open('w') as f:
119125
f.write(res.text)
120126

121127
def output(self):
122128
# TODO: remove the _family suffix
123-
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}_family.xml'.format(self.gse)))
129+
return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}_family.xml'.format(self.gse)), ttl=timedelta(days=14))
124130

125131
@requires(DownloadGeoSeriesMetadata)
126132
class DownloadGeoSeries(DynamicTaskWithOutputMixin, DynamicWrapperTask):

rnaseq_pipeline/sources/sra.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import timedelta
12
import gzip
23
import logging
34
import os
@@ -14,6 +15,7 @@
1415
import pandas as pd
1516

1617
from ..config import rnaseq_pipeline
18+
from ..targets import ExpirableLocalTarget
1719
from ..utils import remove_task_output, RerunnableTaskMixin
1820

1921
class sra(luigi.Config):
@@ -115,11 +117,13 @@ class DownloadSraExperimentRunInfo(TaskWithMetadataMixin, RerunnableTaskMixin, l
115117
retry_count = 1
116118

117119
def run(self):
120+
if self.output().is_stale():
121+
logger.info('%s is stale, redownloading...', self.output())
118122
with self.output().open('w') as f:
119123
f.write(retrieve_runinfo(self.srx))
120124

121125
def output(self):
122-
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srx)))
126+
return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srx)), ttl=timedelta(days=14))
123127

124128
@requires(DownloadSraExperimentRunInfo)
125129
class DownloadSraExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask):
@@ -171,7 +175,7 @@ def run(self):
171175
f.write(retrieve_runinfo(self.srp))
172176

173177
def output(self):
174-
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srp)))
178+
return luigi.ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srp)), ttl=timedelta(days=14))
175179

176180
@requires(DownloadSraProjectRunInfo)
177181
class DownloadSraProject(DynamicTaskWithOutputMixin, DynamicWrapperTask):

rnaseq_pipeline/targets.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import os
2-
from os.path import join, exists
2+
from datetime import timedelta
3+
from os.path import join, exists, getctime, getmtime
4+
from time import time
35

46
import luigi
57
import requests
@@ -52,3 +54,33 @@ def __init__(self, dataset_short_name):
5254

5355
def exists(self):
5456
return self._gemma_api.dataset_has_batch(self.dataset_short_name)
57+
58+
class ExpirableLocalTarget(luigi.LocalTarget):
59+
"""
60+
A local target that can expire according to a TTL value
61+
62+
The TTL can either be a timedelta of a float representing the number of
63+
seconds past the creation time of the target that it will be considered
64+
fresh. Once that delay expired, the target will not be considered as
65+
existing.
66+
67+
By default, creation time is used as per os.path.getctime. Use the
68+
`use_mtime` parameter to use the modification time instead.
69+
"""
70+
def __init__(self, path, ttl, use_mtime=False):
71+
super().__init__(path)
72+
if not isinstance(ttl, timedelta):
73+
self._ttl = timedelta(seconds=ttl)
74+
else:
75+
self._ttl = ttl
76+
self._use_mtime = use_mtime
77+
78+
def is_stale(self):
79+
try:
80+
creation_time = getmtime(self.path) if self._use_mtime else getctime(self.path)
81+
except OSError:
82+
return False # file is missing, assume non-stale
83+
return creation_time + self._ttl.total_seconds() < time()
84+
85+
def exists(self):
86+
return super().exists() and not self.is_stale()

tests/test_targets.py

+25-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,29 @@
1-
from rnaseq_pipeline.targets import GemmaDatasetPlatform, GemmaDatasetHasBatch
1+
import tempfile
2+
3+
from datetime import timedelta
4+
from time import sleep
5+
from rnaseq_pipeline.targets import GemmaDatasetPlatform, GemmaDatasetHasBatch, ExpirableLocalTarget
26

37
def test_gemma_targets():
48
assert GemmaDatasetHasBatch('GSE110256').exists()
59
assert GemmaDatasetPlatform('GSE110256', 'Generic_mouse_ncbiIds').exists()
10+
11+
def test_expirable_local_target():
12+
with tempfile.TemporaryDirectory() as tmp_dir:
13+
t = ExpirableLocalTarget(tmp_dir + '/test', ttl=timedelta(seconds=1))
14+
assert not t.exists()
15+
with t.open('w') as f:
16+
pass
17+
assert t.exists()
18+
sleep(1)
19+
assert not t.exists()
20+
21+
def test_expirable_local_target_with_float_ttl():
22+
with tempfile.TemporaryDirectory() as tmp_dir:
23+
t = ExpirableLocalTarget(tmp_dir + '/test', ttl=1.0)
24+
assert not t.exists()
25+
with t.open('w') as f:
26+
pass
27+
assert t.exists()
28+
sleep(1)
29+
assert not t.exists()

0 commit comments

Comments
 (0)