Skip to content

Commit c82a56b

Browse files
Alon Ben Tamar and Jongy authored
Feature/stacks container name (#35)
Send the container name for stacks from processes that are inside a Docker container

* Add container and hostname metadata to profiles
* Use absolute imports
* Update docs and DaemonSet file to add the Docker sock mount
* Remove redundant container code
* Add missing import
* Add missing Process import
* Strip container data from local stack file
* Separate long line into multiple lines
* Add option to disable the container names in profiles
* Reformat file
* Reformat using black
* Check if container name is not None before adding to container names list
* Update gprofiler/main.py (Co-authored-by: Yonatan Goldschmidt <yonatan.goldschmidt@granulate.io>)
* Update gprofiler/main.py (Co-authored-by: Yonatan Goldschmidt <yonatan.goldschmidt@granulate.io>)
* Update gprofiler/merge.py (Co-authored-by: Yonatan Goldschmidt <yonatan.goldschmidt@granulate.io>)
* Fix argument renaming
* Ignore files in linting and activate venv on linting script
* Reordered imports
* Remove lxc from cgroup search
* Update gprofiler/main.py (Co-authored-by: Yonatan Goldschmidt <yonatan.goldschmidt@granulate.io>)
* Remove redundant --log-file param
* Use absolute imports
* Reformat imports

Co-authored-by: Yonatan Goldschmidt <yonatan.goldschmidt@granulate.io>
1 parent 3013002 commit c82a56b

16 files changed

+214
-49
lines changed

.flake8

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
max-line-length = 120
33
# options for compatibility with black - see https://black.readthedocs.io/en/stable/compatible_configs.html#flake8
44
extend-ignore = E203, W503
5+
exclude = .git,__pycache__,build,gprofiler.egg-info,venv

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ docker pull granulate/gprofiler:latest
6161
docker run --name gprofiler -d --restart=always \
6262
--network=host --pid=host --userns=host --privileged \
6363
-v /lib/modules:/lib/modules:ro -v /usr/src:/usr/src:ro \
64-
granulate/gprofiler:latest -cu --token <token> --service-name <service> [options]
64+
-v /var/run/docker.sock:/var/run/docker.sock \
65+
granulate/gprofiler:latest -cu --token <token> --service-name <service> [options]
6566
```
6667

6768
For profiling with eBPF, kernel headers must be accessible from within the container at

deploy/k8s/gprofiler.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ spec:
2626
- name: usr-src
2727
hostPath:
2828
path: /usr/src
29+
- name: docker-sock # Allow containers to fetch the names
30+
hostPath:
31+
path: /var/run/docker.sock
2932
hostPID: true
3033
securityContext:
3134
runAsUser: 0
@@ -51,6 +54,8 @@ spec:
5154
- name: usr-src
5255
mountPath: /usr/src
5356
readOnly: true
57+
- name: docker-sock
58+
mountPath: /var/run/docker.sock
5459
args:
5560
- -cu
5661
- --token

dev-requirements.txt

-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,5 @@ flake8
33
black
44
mypy
55
isort
6-
docker
76
pyinstaller==4.0
87
staticx

gprofiler/__main__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Copyright (c) Granulate. All rights reserved.
33
# Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
44
#
5-
from .main import main
5+
from gprofiler.main import main
66

77
if __name__ == "__main__":
88
main()

gprofiler/client.py

+13-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import requests
1313
from requests import Session
1414

15-
from .utils import get_iso8061_format_time
15+
from gprofiler.utils import get_iso8061_format_time
1616

1717
logger = logging.getLogger(__name__)
1818

@@ -48,11 +48,18 @@ def _init_session(self, key: str, service: str):
4848
# Raises on failure
4949
self.get_health()
5050

51-
def get_base_url(self) -> str:
52-
return "{}/{}/{}".format(self._host.rstrip("/"), self.BASE_PATH, self._version)
51+
def get_base_url(self, api_version: str = None) -> str:
52+
version = api_version if api_version is not None else self._version
53+
return "{}/{}/{}".format(self._host.rstrip("/"), self.BASE_PATH, version)
5354

5455
def _send_request(
55-
self, method: str, path: str, data: Dict, files: Dict = None, timeout: float = DEFAULT_REQUEST_TIMEOUT
56+
self,
57+
method: str,
58+
path: str,
59+
data: Dict,
60+
files: Dict = None,
61+
timeout: float = DEFAULT_REQUEST_TIMEOUT,
62+
api_version: str = None,
5663
) -> Dict:
5764
opts: dict = {"headers": {}, "files": files, "timeout": timeout}
5865

@@ -66,7 +73,7 @@ def _send_request(
6673
json.dump(data, gzip_file, ensure_ascii=False) # type: ignore
6774
opts["data"] = buffer.getvalue()
6875

69-
resp = self._session.request(method, "{}/{}".format(self.get_base_url(), path), **opts)
76+
resp = self._session.request(method, "{}/{}".format(self.get_base_url(api_version), path), **opts)
7077
if 400 <= resp.status_code < 500:
7178
try:
7279
data = resp.json()
@@ -107,4 +114,5 @@ def submit_profile(
107114
"profile": profile,
108115
},
109116
timeout=self._upload_timeout,
117+
api_version="v2",
110118
)

gprofiler/docker_client.py

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#
2+
# Copyright (c) Granulate. All rights reserved.
3+
# Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
4+
#
5+
import logging
6+
import re
7+
from typing import Dict, List, Optional, Set
8+
9+
import docker
10+
11+
DOCKER_SYSTEMD_CGROUPS = [re.compile(r"/system.slice/docker-([a-z0-9]{64})\.scope")]
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class DockerClient:
    """Resolves container names for processes by matching their cgroup data
    against the containers reported by the Docker daemon.

    If the Docker daemon cannot be reached (e.g. the Docker socket is not
    mounted into the gProfiler container), the client degrades gracefully:
    every lookup returns an empty string.
    """

    def __init__(self):
        try:
            self._client = docker.from_env()
        except Exception:
            logger.warning(
                "Could not initiate the Docker client, so the profiling data will not include the container"
                " names. If you are running the gProfiler in a container, please mount the Docker sock file"
                " by running the Docker run command with the following argument: "
                '"-v /var/run/docker.sock:/var/run/docker.sock". Otherwise, please open a new issue here: '
                "https://github.com/Granulate/gprofiler/issues/new"
            )
            self._client = None

        # pid -> container name ("" when the process is not in a container);
        # cleared on every reset_cache() call.
        self._pid_to_container_name_cache: Dict[int, str] = {}
        # Container names observed since the last reset_cache(); exposed via
        # the container_names property for the profile metadata.
        self._current_container_names: Set[str] = set()
        # container id -> container name; a None value is a negative cache
        # entry for ids we failed to resolve.
        self._container_id_to_name_cache: Dict[str, Optional[str]] = {}

    def reset_cache(self):
        """Clear the per-snapshot state (called once per profiling snapshot)."""
        self._pid_to_container_name_cache.clear()
        self._current_container_names.clear()

    @property
    def container_names(self) -> List[str]:
        """Names of all containers seen since the last reset_cache()."""
        return list(self._current_container_names)

    def get_container_name(self, pid: int) -> str:
        """Return the name of the container running `pid`, or "" if the process
        is not containerized or the Docker client is unavailable."""
        if self._client is None:
            return ""
        if pid not in self._pid_to_container_name_cache:
            # Cache "" for failures too, so dead or host processes are not retried.
            name = self._safely_get_process_container_name(pid)
            self._pid_to_container_name_cache[pid] = name if name is not None else ""
        return self._pid_to_container_name_cache[pid]

    def _safely_get_process_container_name(self, pid: int) -> Optional[str]:
        """Best-effort resolution of pid -> container name; never raises."""
        try:
            container_id = self._get_process_container_id(pid)
            if container_id is None:
                return None
            return self._get_container_name(container_id)
        except Exception:
            logger.warning(f"Could not get a container name for PID {pid}", exc_info=True)
            return None

    def _get_container_name(self, container_id: str) -> Optional[str]:
        """Map a container id to its name, refreshing the cache from the Docker
        daemon on a miss. Ids still unknown after a refresh are negatively
        cached (as None) until the next refresh."""
        if container_id not in self._container_id_to_name_cache:
            self._refresh_container_names_cache()
            # Still unknown after refreshing: negatively cache it so we don't
            # query the daemon again for this id.
            self._container_id_to_name_cache.setdefault(container_id, None)
        container_name = self._container_id_to_name_cache[container_id]
        if container_name is not None:
            # Might happen a few times for the same container name, so we use a set to have unique values
            self._current_container_names.add(container_name)
        return container_name

    def _refresh_container_names_cache(self):
        # We re-fetch all of the currently running containers, so in order to keep the cache small we clear it
        self._container_id_to_name_cache.clear()
        for container in self._client.containers.list():
            self._container_id_to_name_cache[container.id] = container.name

    @staticmethod
    def _get_process_container_id(pid: int) -> Optional[str]:
        """Extract the container id from /proc/<pid>/cgroup, or None when the
        process is not in a container (or died before the file was read).

        Known cgroup layouts:
          ECS:             .../ecs/<uuid>/<container-id>
          standard Docker: .../docker/<container-id>
          k8s:             /kubepods/{burstable,besteffort}/<uuid>/<container-id>
          systemd Docker:  /system.slice/docker-<container-id>.scope
        """
        try:
            with open(f"/proc/{pid}/cgroup", "r") as cgroup_file:
                cgroup = cgroup_file.read()
        except FileNotFoundError:
            # The process died before we got to this point
            return None
        for line in cgroup.splitlines():
            if any(s in line for s in (":/docker/", ":/ecs/", ":/kubepods")):
                return line.split("/")[-1]
            for pattern in DOCKER_SYSTEMD_CGROUPS:
                # search(), not match(): cgroup lines look like
                # "<hierarchy-id>:<controllers>:<path>", so the path never
                # starts at the beginning of the line.
                match = pattern.search(line)
                if match is not None:
                    return match.group(1)
        return None

gprofiler/java.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
import psutil
1515
from psutil import Process
1616

17-
from .exceptions import StopEventSetException
18-
from .merge import parse_one_collapsed
19-
from .profiler_base import ProfilerBase
20-
from .utils import (
17+
from gprofiler.exceptions import StopEventSetException
18+
from gprofiler.merge import parse_one_collapsed
19+
from gprofiler.profiler_base import ProfilerBase
20+
from gprofiler.utils import (
2121
TEMPORARY_STORAGE_PATH,
2222
is_same_ns,
2323
pgrep_exe,

gprofiler/main.py

+39-9
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020
import configargparse
2121
from requests import RequestException, Timeout
2222

23-
from . import __version__, merge
24-
from .client import DEFAULT_UPLOAD_TIMEOUT, GRANULATE_SERVER_HOST, APIClient, APIError
25-
from .java import JavaProfiler
26-
from .perf import SystemProfiler
27-
from .profiler_base import NoopProfiler
28-
from .python import get_python_profiler
29-
from .utils import (
23+
from gprofiler import __version__, merge
24+
from gprofiler.client import DEFAULT_UPLOAD_TIMEOUT, GRANULATE_SERVER_HOST, APIClient, APIError
25+
from gprofiler.docker_client import DockerClient
26+
from gprofiler.java import JavaProfiler
27+
from gprofiler.perf import SystemProfiler
28+
from gprofiler.profiler_base import NoopProfiler
29+
from gprofiler.python import get_python_profiler
30+
from gprofiler.utils import (
3031
TEMPORARY_STORAGE_PATH,
3132
TemporaryDirectoryWithMode,
3233
atomically_symlink,
@@ -76,6 +77,7 @@ def __init__(
7677
rotating_output: bool,
7778
runtimes: Dict[str, bool],
7879
client: APIClient,
80+
include_container_names=True,
7981
):
8082
self._frequency = frequency
8183
self._duration = duration
@@ -101,6 +103,8 @@ def __init__(
101103
self._frequency, self._duration, self._stop_event, self._temp_storage_dir.name
102104
)
103105
self.initialize_python_profiler()
106+
self._docker_client = DockerClient()
107+
self._include_container_names = include_container_names
104108

105109
def __enter__(self):
106110
self.start()
@@ -145,6 +149,7 @@ def _generate_output_files(
145149
base_filename = os.path.join(self._output_dir, "profile_{}".format(end_ts))
146150

147151
collapsed_path = base_filename + ".col"
152+
collapsed_data = self._strip_container_data(collapsed_data)
148153
Path(collapsed_path).write_text(collapsed_data)
149154

150155
# point last_profile.col at the new file; and possibly, delete the previous one.
@@ -172,6 +177,15 @@ def _generate_output_files(
172177

173178
logger.info(f"Saved flamegraph to {flamegraph_path}")
174179

180+
@staticmethod
181+
def _strip_container_data(collapsed_data):
182+
lines = []
183+
for line in collapsed_data.splitlines():
184+
if line.startswith("#"):
185+
continue
186+
lines.append(line[line.find(';') + 1 :])
187+
return '\n'.join(lines)
188+
175189
def start(self):
176190
self._stop_event.clear()
177191

@@ -213,7 +227,9 @@ def _snapshot(self):
213227
logger.exception(f"{future.name} profiling failed")
214228

215229
local_end_time = local_start_time + datetime.timedelta(seconds=(time.monotonic() - monotonic_start_time))
216-
merged_result = merge.merge_perfs(system_future.result(), process_perfs)
230+
merged_result = merge.merge_perfs(
231+
system_future.result(), process_perfs, self._docker_client, self._include_container_names
232+
)
217233

218234
if self._output_dir:
219235
self._generate_output_files(merged_result, local_start_time, local_end_time)
@@ -357,6 +373,13 @@ def parse_cmd_args():
357373
dest="log_rotate_backup_count",
358374
default=DEFAULT_LOG_BACKUP_COUNT,
359375
)
376+
parser.add_argument(
377+
"--disable-container-names",
378+
action="store_true",
379+
dest="disable_container_names",
380+
default=False,
381+
help="gProfiler won't gather the container names of processes that run in containers",
382+
)
360383

361384
continuous_command_parser = parser.add_argument_group("continuous")
362385
continuous_command_parser.add_argument(
@@ -459,7 +482,14 @@ def main():
459482

460483
runtimes = {"java": args.java, "python": args.python}
461484
gprofiler = GProfiler(
462-
args.frequency, args.duration, args.output_dir, args.flamegraph, args.rotating_output, runtimes, client
485+
args.frequency,
486+
args.duration,
487+
args.output_dir,
488+
args.flamegraph,
489+
args.rotating_output,
490+
runtimes,
491+
client,
492+
not args.disable_container_names,
463493
)
464494
logger.info("gProfiler initialized and ready to start profiling")
465495

gprofiler/merge.py

+32-7
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22
# Copyright (c) Granulate. All rights reserved.
33
# Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
44
#
5+
import json
56
import logging
67
import re
8+
import socket
79
from collections import Counter, defaultdict
810
from typing import Iterable, Mapping, MutableMapping
911

12+
from gprofiler.docker_client import DockerClient
13+
1014
logger = logging.getLogger(__name__)
1115

1216
SAMPLE_REGEX = re.compile(
@@ -63,7 +67,7 @@ def parse_many_collapsed(text: str) -> Mapping[int, Mapping[str, int]]:
6367
return results
6468

6569

66-
def collapse_stack(stack: str, comm: str) -> str:
70+
def collapse_stack(comm: str, stack: str) -> str:
6771
"""
6872
Collapse a single stack from "perf".
6973
"""
@@ -97,7 +101,12 @@ def parse_perf_script(script: str):
97101
logger.exception(f"Error processing sample: {sample}")
98102

99103

100-
def merge_perfs(perf_all: Iterable[Mapping[str, str]], process_perfs: Mapping[int, Mapping[str, int]]) -> str:
104+
def merge_perfs(
105+
perf_all: Iterable[Mapping[str, str]],
106+
process_perfs: Mapping[int, Mapping[str, int]],
107+
docker_client: DockerClient,
108+
should_determine_container_names: bool,
109+
) -> str:
101110
per_process_samples: MutableMapping[int, int] = Counter()
102111
new_samples: MutableMapping[str, int] = Counter()
103112
process_names = {}
@@ -108,7 +117,10 @@ def merge_perfs(perf_all: Iterable[Mapping[str, str]], process_perfs: Mapping[in
108117
per_process_samples[pid] += 1
109118
process_names[pid] = parsed["comm"]
110119
elif parsed["stack"] is not None:
111-
new_samples[collapse_stack(parsed["stack"], parsed["comm"])] += 1
120+
container_name = _get_container_name(pid, docker_client, should_determine_container_names)
121+
collapsed_stack = collapse_stack(parsed["comm"], parsed["stack"])
122+
stack_line = f'{container_name};{collapsed_stack}'
123+
new_samples[stack_line] += 1
112124
except Exception:
113125
logger.exception(f"Error processing sample: {parsed}")
114126

@@ -118,7 +130,20 @@ def merge_perfs(perf_all: Iterable[Mapping[str, str]], process_perfs: Mapping[in
118130
if process_perf_count > 0:
119131
ratio = perf_all_count / process_perf_count
120132
for stack, count in process_stacks.items():
121-
full_stack = ";".join([process_names[pid], stack])
122-
new_samples[full_stack] += round(count * ratio)
123-
124-
return "\n".join((f"{stack} {count}" for stack, count in new_samples.items()))
133+
container_name = _get_container_name(pid, docker_client, should_determine_container_names)
134+
stack_line = ";".join([container_name, process_names[pid], stack])
135+
new_samples[stack_line] += round(count * ratio)
136+
container_names = docker_client.container_names
137+
docker_client.reset_cache()
138+
profile_metadata = {
139+
'containers': container_names,
140+
'hostname': socket.gethostname(),
141+
'container_names_enabled': should_determine_container_names,
142+
}
143+
output = [f"#{json.dumps(profile_metadata)}"]
144+
output += [f"{stack} {count}" for stack, count in new_samples.items()]
145+
return "\n".join(output)
146+
147+
148+
def _get_container_name(pid: int, docker_client: DockerClient, should_determine_container_names: bool):
149+
return docker_client.get_container_name(pid) if should_determine_container_names else ""

0 commit comments

Comments (0)