-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
124 lines (92 loc) · 4.18 KB
/
utils.py
File metadata and controls
124 lines (92 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import logging
import math
import random
import sys

# Make the parent directory importable BEFORE pulling in the project-local
# modules below. In the original file the append came after these imports,
# so it only worked when `job`/`policy` happened to already be resolvable
# from the working directory.
sys.path.append("..")

import pandas as pd

from job import Job, Trace
from policy import DRR, ElasticFlow, FGD, GPUPacking, Hops
def simulate(trace, vc, log_dir, policy, logger, start_ts, args):
    """Run one scheduling simulation for a single virtual cluster.

    Args:
        trace: Trace of jobs to schedule.
        vc: Virtual cluster to simulate on.
        log_dir: Directory where the scheduler writes its logs.
        policy: Scheduler name, one of "ElasticFlow", "R&P", "FGD",
            "DRR", "Hops".
        logger: Shared logging.Logger instance.
        start_ts: Simulation start timestamp.
        args: Parsed command-line arguments forwarded to the scheduler.

    Returns:
        True when the simulation completes.

    Raises:
        ValueError: If `policy` is not a recognized scheduler name.
    """
    schedulers = {
        "ElasticFlow": ElasticFlow,
        "R&P": GPUPacking,  # the "R&P" policy is implemented by GPUPacking
        "FGD": FGD,
        "DRR": DRR,
        "Hops": Hops,
    }
    try:
        scheduler_cls = schedulers[policy]
    except KeyError:
        # Originally an unrecognized policy fell through every elif and
        # crashed with UnboundLocalError on `scheduler`; fail clearly instead.
        raise ValueError(f"Unknown scheduling policy: {policy!r}")
    scheduler = scheduler_cls(trace, vc, log_dir, logger, start_ts, args)
    scheduler.simulate()
    return True
def trace_pai_process(dir, date_range, read_full, args):
    """Load the PAI trace CSV and replace submit times with synthetic
    Poisson-process arrivals (seeded, so runs are reproducible).

    Args:
        dir: Trace directory (name shadows the builtin; kept for caller
            compatibility).
        date_range: (start, end) positional slice applied to the frame.
        read_full: Unused here; kept for signature parity with the other
            trace loaders.
        args: Must provide `trace_file` (CSV name) and `interarrival_time`
            (mean seconds between arrivals).

    Returns:
        (df, begin): the prepared DataFrame and the first submit time
        (always 0 after the shift below).
    """
    df = pd.read_csv(dir + "/" + args.trace_file)
    # Normalize the PAI column names to the simulator's vocabulary.
    df.rename(
        columns={
            "runtime": "duration",
            "inst_num": "node_num",
            "plan_cpu": "cpu_num",
            "plan_gpu": "gpu_num",
            "norm_start_time": "submit_time",
        },
        inplace=True,
    )
    df["remain"] = df["duration"]
    # Sentinel "not yet scheduled/finished" markers.
    df[["start_time", "end_time"]] = sys.maxsize
    df[["ckpt_times", "queue", "jct"]] = 0
    df["status"] = None

    df = df[date_range[0] : date_range[1]]
    df.sort_values(by="submit_time", inplace=True)
    df.reset_index(inplace=True, drop=True)

    # Dedicated RNG with a fixed seed so arrival times are reproducible
    # and independent of any other random consumers.
    interarrival_time_generator = random.Random()
    interarrival_time_generator.seed(42)

    def sample_arrival_time_delta(rate_parameter):
        """Sample an exponential interarrival delta, i.e. arrivals follow a
        Poisson process with the given rate parameter (inverse transform)."""
        return -math.log(1.0 - interarrival_time_generator.random()) / rate_parameter

    next_job_arrival_time = 0
    lam = 1 / args.interarrival_time  # arrival rate = 1 / mean interarrival
    for index in df.index:
        # Write straight into the frame; the original mutated the iterrows
        # row copy first, which has no effect on the DataFrame itself.
        df.at[index, "submit_time"] = int(next_job_arrival_time)
        next_job_arrival_time += sample_arrival_time_delta(lam)

    # The first synthetic submit time is int(0) == 0, so this shift is a
    # no-op kept for safety; `begin` is therefore always 0.
    df["submit_time"] -= df.iloc[0]["submit_time"]
    begin = df.iloc[0]["submit_time"]
    return df, begin
def trace_typical_process(dir, date_range, read_full):
    """Load the typical-workload trace (`filtered_traces.csv`) from `dir`.

    `date_range` and `read_full` are accepted only for signature parity with
    `trace_pai_process`; they are not used here. Column names are normalized
    to the simulator's vocabulary and bookkeeping columns are initialized.
    """
    column_map = {
        "runtime": "duration",
        "inst_num": "node_num",
        "plan_cpu": "cpu_num",
        "plan_gpu": "gpu_num",
        "norm_start_time": "submit_time",
    }
    frame = pd.read_csv(dir + "/" + "filtered_traces.csv")
    frame.rename(columns=column_map, inplace=True)

    frame["remain"] = frame["duration"]
    # Sentinel "not yet scheduled/finished" markers.
    frame[["start_time", "end_time"]] = sys.maxsize
    frame[["ckpt_times", "queue", "jct"]] = 0
    frame["status"] = None

    frame.reset_index(inplace=True, drop=True)
    return frame
def trace_parser(df, experiment_name, trace_typical, args):
    """Build a Trace object from the job DataFrame.

    Every row of `df` becomes a Job appended to the trace, which is then
    sorted by submit time. When the scheduler is FGD, the typical-workload
    rows are additionally appended via `append_job2`.
    """
    trace = Trace(experiment_name, args)
    for _, job_row in df.iterrows():
        trace.append_job(Job(job_row))
    trace.sort_jobs("submit_time")

    # FGD additionally consumes the typical-workload jobs.
    if args.scheduler == "FGD":
        for _, job_row in trace_typical.iterrows():
            trace.append_job2(Job(job_row))
    return trace
def logger_init(file):
    """Create (or re-configure) the shared "mylogger" logger, writing INFO
    records both to `file`.log (truncated) and to the console.

    Args:
        file: Log file path without the ".log" extension.

    Returns:
        The configured logging.Logger instance.
    """
    logger = logging.getLogger("mylogger")
    logger.setLevel(logging.INFO)

    # logging.getLogger caches loggers by name, so the original version
    # stacked a fresh pair of handlers on every call and emitted each
    # record multiple times; drop handlers from any previous call first.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)

    formatter = logging.Formatter(
        "%(asctime)s | %(processName)s | %(message)s", datefmt="%Y %b %d %H:%M:%S"
    )
    handler_file = logging.FileHandler(f"{file}.log", "w")
    handler_stream = logging.StreamHandler()  # defaults to sys.stderr
    for handler in (handler_file, handler_stream):
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def get_trace(experiment_name, trace_dir, read_full, range=None, args=None):
    """Load both the PAI trace and the typical-workload trace.

    NOTE: the `range` parameter shadows the builtin; the name is kept so
    existing keyword callers keep working.

    Returns:
        (trace_df, start_ts, trace_typical)
    """
    selected_range = range
    trace_df, start_ts = trace_pai_process(
        trace_dir, selected_range, read_full, args
    )
    trace_typical = trace_typical_process(trace_dir, selected_range, read_full)
    return trace_df, start_ts, trace_typical