forked from osbuild/osbuild-composer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck-snapshots
executable file
·180 lines (152 loc) · 5.54 KB
/
check-snapshots
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/python3
"""check-snapshots greps the directory tree for rpmrepo urls and checks them
against the current snapshot list"""
import argparse
import json
import os
import sys
import subprocess
import time
from urllib.parse import urlparse
import requests
# rpmrepo API address that the list of snapshots is fetched from.
SNAPSHOTS_URL = "https://rpmrepo.osbuild.org/v2/enumerate"

# Default number of seconds to wait for the snapshot list.
SNAPSHOTS_TIMEOUT = 2 * 60

# grep command that prints "file:url" for every rpmrepo snapshot url found
# recursively (-r), printing only the matching text (-o). The directory to
# search is appended by the caller.
SNAPSHOT_GREP = [
    "grep",
    "--color=never",
    "-or",
    r"http.*rpmrepo.osbuild.org.*-20[0-9]\+",
]
def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT):
    """Get the list of snapshots from the rpmrepo API.

    Args:
        url: Address to fetch the snapshot list from.
        timeout: Timeout in seconds handed to requests.get().

    Returns:
        The decoded JSON body of an HTTP 200 response, or None when the
        request fails, the server answers with any other status, or the
        body is not valid JSON. The reason is printed in each failure case.
    """
    print(f"Fetching list of snapshots from {url}")
    start = time.time()
    try:
        r = requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Only request-level failures (connection, timeout, ...) are turned
        # into a None result; anything else, e.g. KeyboardInterrupt, must
        # propagate instead of being silently swallowed.
        elapsed = time.time() - start
        print(f"Request to {url} failed after {elapsed:0.0f}s: {e}")
        return None
    elapsed = time.time() - start

    if r.status_code != 200:
        print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}")
        return None

    try:
        snapshots = r.json()
    except ValueError as e:
        # The JSON decode errors raised by r.json() derive from ValueError.
        print(f"Invalid JSON in response from {url}: {e}")
        return None

    print(f"Received snapshot list in {elapsed:0.0f}s")
    return snapshots
def find_snapshot_urls(directory):
    """grep the directory for rpmrepo snapshot urls.

    Args:
        directory: Directory tree to search; appended to SNAPSHOT_GREP.

    Returns:
        A map of urls to the list of files they are used in. The map is
        empty when grep does not find any snapshot url.

    Exits the process with status 1 when grep reports an error.
    """
    # check=False on purpose: grep exits with status 1 when no line was
    # selected, which is a valid empty result and not a failure. Only an
    # exit status above 1 signals a real error.
    grep_out = subprocess.run(SNAPSHOT_GREP + [directory],
                              check=False,
                              capture_output=True,
                              env={"LANG": "C"})
    if grep_out.returncode not in (0, 1):
        print("ERROR: " + grep_out.stderr.decode("utf-8"))
        sys.exit(1)

    urls = {}
    for line in grep_out.stdout.decode("utf-8").splitlines():
        # Each output line has the form "<file>:<matched url>".
        try:
            file, url = line.split(":", 1)
        except ValueError:
            print(f"Problem parsing {line}")
            continue
        urls.setdefault(url.strip(), []).append(file)

    return urls
def check_baseurl(repo, snapshots):
    """Validate the snapshot named by a repo url and look for a later one.

    The snapshot name is the last path component of repo. Snapshots are
    treated as belonging together when everything before their final "-"
    is identical.

    Returns an (invalid, newer) pair; each element is a message string, or
    None when there is nothing to report.
    """
    name = os.path.basename(urlparse(repo).path)
    prefix = name.rsplit("-", 1)[0]

    # An unknown name is reported, but is still compared with its siblings.
    problem = None if name in snapshots else f"{name} is not a valid snapshot name"

    # Names are compared as plain strings; the largest one wins.
    siblings = [candidate for candidate in snapshots
                if candidate.rsplit("-", 1)[0] == prefix]
    latest = max(siblings + [name])

    update = None if latest == name else f"{name} has a newer version - {latest}"
    return problem, update
def _add_message(messages, file, kind, msg):
    """Record msg under messages[file][kind] unless it is already there."""
    entry = messages.setdefault(file, {"invalid": [], "newer": []})
    if msg not in entry[kind]:
        entry[kind].append(msg)


def check_snapshot_urls(urls, snapshots, skip=("test/data/manifests",), errors_only=False):
    """Check the urls against the current list of snapshots.

    Prints, grouped per file, an ERROR line for every invalid snapshot and a
    NEWER line for every snapshot that has a newer version.

    Args:
        urls: Map of snapshot urls to the list of files they are used in.
        snapshots: Collection of current snapshot names.
        skip: Substrings; a file whose path contains any of them is not
            reported and does not influence the return value.
        errors_only: When true, newer snapshots are neither reported nor
            reflected in the return value.

    Returns:
        A bitmask:
        0 if all were valid and no newer snapshots are available
        2 if there were invalid snapshots
        4 if there were newer snapshots
        6 if there were invalid and newer snapshots
    """
    # Gather up the messages for each file
    messages = {}
    ret = 0
    for url, files in urls.items():
        invalid, newer = check_baseurl(url, snapshots)
        reported = [f for f in files if not any(s in f for s in skip)]

        if invalid:
            for f in reported:
                ret |= 2
                _add_message(messages, f, "invalid", invalid)

        if errors_only:
            continue

        if newer:
            for f in reported:
                ret |= 4
                _add_message(messages, f, "newer", newer)

    # Print the messages for each file
    for f, found in messages.items():
        print(f"{f}:")
        for msg in found["invalid"]:
            print(f" ERROR: {msg}")
        for msg in found["newer"]:
            print(f" NEWER: {msg}")

    return ret
def parse_args():
    """Parse the command line arguments.

    Returns:
        The argparse namespace with the attributes verbose, timeout, cache,
        url, errors_only and directory.
    """
    parser = argparse.ArgumentParser(description="Check snapshot urls")
    # store_true makes --verbose a plain switch; without an action argparse
    # would require a value after it.
    # NOTE(review): args.verbose is not read anywhere in the visible code.
    parser.add_argument("--verbose", action="store_true",
                        help="Enable verbose output")
    parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT,
                        help="How long to wait for rpmrepo snapshot list")
    parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots")
    parser.add_argument("--url", default=SNAPSHOTS_URL,
                        help="URL to use for the list of rpmrepo snapshots")
    parser.add_argument("--errors-only", action="store_true",
                        help="Only return errors")
    parser.add_argument("directory",
                        help="Directory tree to search for rpmrepo snapshot urls")
    return parser.parse_args()
def main():
    """Check the snapshot urls found below the directory from the command line.

    The snapshot list is read from the --cache file when one is given and
    fetched from --url otherwise.

    Returns:
        The status computed by check_snapshot_urls(). The process exits with
        status 1 instead when the snapshot list cannot be loaded.
    """
    args = parse_args()
    urls = find_snapshot_urls(args.directory)

    if args.cache:
        try:
            with open(args.cache, encoding="utf8") as f:
                snapshots = json.load(f)
        except OSError as e:
            print(f"No snapshots cache found at {args.cache}: {e}")
            sys.exit(1)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError: the file exists but does not hold usable JSON.
            print(f"Cannot parse snapshots cache at {args.cache}: {e}")
            sys.exit(1)
    else:
        snapshots = fetch_snapshots_api(args.url, args.timeout)
        if not snapshots:
            print(f"Cannot download snapshots from {args.url}")
            sys.exit(1)

    return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only)
if __name__ == "__main__":
    # Hand the value returned by main() to the shell as the exit status.
    exit_status = main()
    sys.exit(exit_status)