diff --git a/dask-metrics/README.md b/dask-metrics/README.md
index 535cc54..4037acc 100644
--- a/dask-metrics/README.md
+++ b/dask-metrics/README.md
@@ -32,6 +32,7 @@ pip install .
 * **`timestamp`**: the time, in seconds, since the start of metric collection.
 
 ### Optional
+* **`tasks`**: the keys of the currently executing tasks.
 * **`total-mem`**: the memory used, in bytes, by each GPU on a worker
 * **`mem-util`**: the memory utilization of each GPU on a worker as a percentage of total memory available.
 * **`compute-util`**: the compute utilization of each GPU on a worker as a percentage of maximum compute ability.
diff --git a/dask-metrics/conda.recipe/meta.yaml b/dask-metrics/conda.recipe/meta.yaml
index 4522b70..d36ac0a 100644
--- a/dask-metrics/conda.recipe/meta.yaml
+++ b/dask-metrics/conda.recipe/meta.yaml
@@ -1,9 +1,9 @@
 package:
   name: dask-metrics
-  version: "2021.8.12"
+  version: "2021.8.20"
 
 build:
-  number: 0
+  number: 2
   noarch: python
 
 source:
diff --git a/dask-metrics/dask_metrics/monitor.py b/dask-metrics/dask_metrics/monitor.py
index fb6fb0a..994291d 100644
--- a/dask-metrics/dask_metrics/monitor.py
+++ b/dask-metrics/dask_metrics/monitor.py
@@ -677,6 +677,20 @@ def mem_util(worker, handle):
         def compute_util(worker, handle):
             return pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
 
+        @operation("tasks", per_device=False)
+        def tasks(worker, handles):
+            """Return the keys of the worker's executing tasks, joined by ", ".
+
+            `handles` is unused here. An empty string is returned when no
+            task is in the "executing" state.
+            """
+            # str() keeps the join from raising on non-string (e.g. tuple) keys.
+            return ", ".join(
+                str(key)
+                for key, state in worker.tasks.items()
+                if state.state == "executing"
+            )
+
         self.tracking_list = [o for o in operations if o["name"] in tracking]
         self.tracking_list += custom
         self.clear_metrics(clear_disk=True)
diff --git a/dask-metrics/dask_metrics/visualize.py b/dask-metrics/dask_metrics/visualize.py
index b6d2aca..df3c0e7 100644
--- a/dask-metrics/dask_metrics/visualize.py
+++ b/dask-metrics/dask_metrics/visualize.py
@@ -13,13 +13,18 @@ def _get_data(worker_file, job, metrics):
     df = cudf.read_csv(worker_file)
     section = df[df["job"] == job]
 
+    if len(section) == 0:
+        raise ValueError(
+            f"The requested job '{job}' was not found in the provided file"
+        )
+
     sections = []
     for metric in metrics:
         if metric not in section:
             raise ValueError(
                 f"The requested metric '{metric}' was not found in the provided file"
             )
 
         m = section[metric].str.split(pat=", ")
         m = m.to_arrow().to_pylist()
         m = cudf.DataFrame(m, dtype="int64")
diff --git a/dask-metrics/setup.py b/dask-metrics/setup.py
index 3eb8740..ac7f86e 100755
--- a/dask-metrics/setup.py
+++ b/dask-metrics/setup.py
@@ -11,7 +11,7 @@
 
 setup(
     name="dask-metrics",
-    version="2021.8.12",
+    version="2021.8.20",
     description="A tool for collecting metrics on distributed Dask clusters",
     author="Travis Hester",
     packages=find_packages(include=["dask_metrics"]),