This repository was archived by the owner on May 4, 2020. It is now read-only.
67 changes: 64 additions & 3 deletions mlbench/refimpls/pytorch/controlflow/controlflow.py
@@ -1,11 +1,13 @@
import torch
import torch.distributed as dist
import numpy as np
import torch.nn.functional as F

from utils import checkpoint
from utils import log
from utils.metrics import AverageMeter
from utils.helper import Timeit, maybe_range, update_best_runtime_metric
from utils.communication import aggregate_gradients, global_average
from utils.communication import aggregate_gradients, global_average, aggregate_sparsified_gradients
from utils.utils import convert_dtype

from datasets import create_dataset
@@ -22,6 +24,7 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit):
scheduler.step()

data = convert_dtype(options.dtype, data)

if options.force_target_dtype:
target = convert_dtype(options.dtype, target)

@@ -32,9 +35,27 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit):
output = model(data)
loss = criterion(output, target)
loss.backward()
aggregate_gradients(model, options.world_size)

if options.opt_name == 'sparsified_sgd':
aggregate_sparsified_gradients(model, options.world_size,
options.sparse_grad_size,
options.random_sparse,
optimizer,
scheduler.get_lr())
else:
aggregate_gradients(model, options.world_size)

optimizer.step()

if options.model_name == 'logistic_regression' and options.train_validate:
t = options.runtime['current_epoch'] * options.train_num_samples_per_device + batch_idx * options.batch_size
Member: Single letter variable names are usually not that good for readability (except for something like i as a counter in a for loop); verbose variable names make the code more readable.

Collaborator Author: I agree, I'll fix it.

optimizer.update_estimated_weights(model, t, options.sparse_grad_size)

if t % options.compute_loss_every == 0:
print("Train validation....")
timeit.pause()
train_validate(optimizer, model, options)
timeit.resume()
with torch.no_grad():
loss = loss.item()
loss = global_average(loss, 1).item()
@@ -46,6 +67,45 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit):
timeit.resume()


def train_validate(optimizer, model, options):
""" Validation on train data by using weighted average of parameters """
estimated_weights = optimizer.get_estimated_weights(model)
num_samples = 0
l1 = options.l1_coef
l2 = options.l2_coef

loss = 0

for batch_idx, (data, target) in zip(maybe_range(options.max_batch_per_epoch),
options.val_loader):
data = convert_dtype(options.dtype, data)
if options.force_target_dtype:
target = convert_dtype(options.dtype, target)

if options.use_cuda:
data, target = data.cuda(), target.cuda()
target = target * 2 - 1

for weight in estimated_weights:
w = weight.squeeze()
batch_loss = np.log(1 + np.exp(-target * (data @ w)))
Member: This is a softmargin loss, right? Could use https://pytorch.org/docs/stable/nn.html#softmarginloss and https://pytorch.org/docs/stable/torch.html#torch.matmul here.

Especially since numpy ops run on the CPU, not the GPU, so the .cuda() earlier would just waste time.

Collaborator Author: Yes, I'll use the softmargin loss instead, thanks.
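A minimal, self-contained sketch of what the suggested change could look like (hypothetical shapes; the real loop would reuse data, target and the squeezed weight from above):

import torch
import torch.nn.functional as F

# Hypothetical batch: 4 samples, 3 features, one weight vector.
data = torch.randn(4, 3)
w = torch.randn(3)
target = torch.randint(0, 2, (4,)).float() * 2 - 1   # map {0, 1} -> {-1, +1}

# Same quantity as np.log(1 + np.exp(-target * (data @ w))).sum(), but computed
# with torch ops, so it stays on whatever device the tensors live on.
logits = torch.matmul(data, w)
batch_loss = F.soft_margin_loss(logits, target, reduction='sum')

# Reference computation for comparison.
reference = torch.log1p(torch.exp(-target * (data @ w))).sum()
assert torch.allclose(batch_loss, reference)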

loss += batch_loss.sum().item()

num_samples += data.size()[0]

train_loss = global_average(loss, num_samples).item()

l2_loss = sum(weight.norm(2) ** 2 for weight in estimated_weights).item()
Collaborator Author: As we just need to calculate the L1 and L2 norms of a tensor here, I think using loss functions makes it complicated.

train_loss += l2 / 2 * l2_loss
l1_loss = sum(weight.norm(1) for weight in estimated_weights).item()
train_loss += l1 * l1_loss

print("Global Train Loss: " + str(train_loss))

with open(options.ckpt_run_dir + "/" + str(dist.get_rank()) + "_train_validation.txt", "a+") as file:
file.write(str(train_loss) + "\n")
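For reference, the value written to the log above is, as a sketch assuming a single estimated weight tensor w, labels y_i already mapped to {-1, +1}, and N samples aggregated across workers:

train_loss = (1/N) * sum_i log(1 + exp(-y_i * x_i^T w)) + (l2 / 2) * ||w||_2^2 + l1 * ||w||_1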


def validate(model, optimizer, criterion, metrics, options):
model.eval()

@@ -123,7 +183,7 @@ def __call__(self, model, optimizer, criterion, metrics, scheduler, options):
options.batch_size), 0)

# train the model and evaluate the model per args.eval_freq
max_epochs = min(options.train_epochs, options.max_train_steps)\
max_epochs = min(options.train_epochs, options.max_train_steps) \
if options.max_train_steps else options.train_epochs
start_epoch = options.runtime['current_epoch'] if options.resume else 0
options.runtime['records'] = options.runtime.get('records', [])
@@ -134,6 +194,7 @@ def __call__(self, model, optimizer, criterion, metrics, scheduler, options):

timeit = Timeit(0 if len(options.runtime['cumu_time_val']) == 0
else options.runtime['cumu_time_val'][-1])

for epoch in range(start_epoch, max_epochs):
options.runtime['current_epoch'] = epoch

4 changes: 3 additions & 1 deletion mlbench/refimpls/pytorch/datasets/load_libsvm_dataset.py
@@ -76,6 +76,7 @@ def f(x):
for ind, (from_index, to_index) in from_to_indices:
if from_index <= x and x < to_index:
return ind, x - from_index

return f

def _get_matched_index(self, index):
@@ -200,7 +201,8 @@ def get_dataset_info(name):
def load_libsvm_lmdb(name, lmdb_path):
stats = get_dataset_info(name)
dataset = IMDBPT(lmdb_path, transform=maybe_transform_sparse(stats),
target_transform=lambda x: torch.Tensor(x), is_image=False)
# target_transform=lambda x: torch.Tensor(x), is_image=False)
target_transform=None, is_image=False)
return dataset


3 changes: 2 additions & 1 deletion mlbench/refimpls/pytorch/datasets/partition_data.py
Expand Up @@ -39,7 +39,8 @@ class DataPartitioner(Partitioner):
def __init__(self, data, rank, shuffle, sizes=[0.7, 0.2, 0.1]):
# prepare info.
self.data = data
self.data_size = len(self.data)
# Drop a few of the last samples to make the dataset size divisible by the number of workers
self.data_size = int(len(self.data)/len(sizes)) * len(sizes)
self.partitions = []

# get shuffled/unshuffled data.
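A quick, hypothetical illustration of the new data_size computation (numbers chosen only for the example; sizes as in the default [0.7, 0.2, 0.1]):

# Truncate to a multiple of the number of partitions, as DataPartitioner now does.
data_len, num_partitions = 1001, 3
data_size = int(data_len / num_partitions) * num_partitions
assert data_size == 999   # the last two samples are dropped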
1 change: 1 addition & 0 deletions mlbench/refimpls/pytorch/models/linear_models.py
@@ -2,6 +2,7 @@
import math
import torch.nn.functional as F
from torch.nn.parameter import Parameter
import numpy as np


class LogisticRegression(torch.nn.Module):
96 changes: 78 additions & 18 deletions mlbench/refimpls/pytorch/optim/lr.py
Expand Up @@ -45,7 +45,7 @@ class SchedulerParser(argparse.ArgumentParser):
def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamma=True,
warmup=True, warmup_init_lr=True, warmup_linear_scaling=True, warmup_durations=True,
clr_cycle_length=True, clr_base_lr=True, clr_max_lr=True, clr_mode=True,
clr_gamma=True, clr_extra=True):
clr_gamma=True, clr_extra=True, sgd_lr_alpha=True, sgd_lr_beta=True, sgd_lr_gamma=True):
super(SchedulerParser, self).__init__(add_help=add_help)

if multisteplr_milestones:
@@ -60,8 +60,8 @@ def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamm
if warmup:
self.add_argument("--warmup", default=False, action="store_true",
help="[default: %(default)s] linearly warmup learning rate before other scheduling."
"For the moment, only implemented for multistep learning rate with warmup."
"The warmup is used for training with more than one process.")
"For the moment, only implemented for multistep learning rate with warmup."
"The warmup is used for training with more than one process.")

if warmup_init_lr:
warmup_init_lr_group = self.add_mutually_exclusive_group()
@@ -70,32 +70,32 @@ def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamm

warmup_init_lr_group.add_argument("--warmup_init_lr_nonscale", action='store_true', default=False,
help="[default: %(default)s] Use nonscaled lr for initial warmup lr"
"for training. If this flag is true, then ignore")
"for training. If this flag is true, then ignore")

if warmup_linear_scaling:
self.add_argument("--warmup_linear_scaling", action='store_true', default=False,
help="[default: %(default)s] scale the learning rate by a factor after warmup."
"For linear scaling rule, this factor is the number of machines.")
"For linear scaling rule, this factor is the number of machines.")

if warmup_durations:
self.add_argument("--warmup_durations", type=parse_batch_epoch, default={'batch': 1}, metavar='<MLSRSI>',
help="[default: % (default)s] duration for the warmup."
"The warumup should be a batch level.")
"The warumup should be a batch level.")

if clr_cycle_length:
self.add_argument("--clr_cycle_length", type=parse_batch_epoch, default='batch:2000', metavar='<CLRCL>',
help="[default: %(default)s] cycle length in a cyclical learning rates training."
"It can be `batch:int_batches` or `epoch:float_epochs`.")
"It can be `batch:int_batches` or `epoch:float_epochs`.")

if clr_base_lr:
self.add_argument("--clr_base_lr", type=float, default=0.001, metavar='<CLRBLR>',
help="[default: %(default)s] minimum and initial learning rate in cyclical"
"learning rates training.")
"learning rates training.")
if clr_max_lr:
self.add_argument("--clr_max_lr", type=float, default=0.1, metavar='<CLRMLR>',
help="[default: %(default)s] maximum learning rate in cyclical"
"learning rates training. Note this maximum value might not be reached "
"depending on the chosen scaling mode.")
"learning rates training. Note this maximum value might not be reached "
"depending on the chosen scaling mode.")

if clr_mode:
self.add_argument("--clr_mode", type=str, default='triangular', metavar='<CLRM>',
@@ -104,12 +104,25 @@ def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamm
if clr_gamma:
self.add_argument("--clr_gamma", type=float, default=0.99, metavar='<CLRG>',
help="[default: %(default)s] constant in 'exp_range' scaling function"
" in cyclical learning rate schedule.")
" in cyclical learning rate schedule.")

if clr_extra:
self.add_argument("--clr_extra", type=float, default=0.1, metavar='<CLRE>',
help="[default: %(default)s] Extra number of iterations of training for one cycle.")

if sgd_lr_alpha:
self.add_argument("--alpha", type=float, default=100,
help="[default: %(default)s] Constant used to calculate the optimal learning rate for "
"SGD (alpha / (beta + t)).")
if sgd_lr_beta:
self.add_argument("--beta", type=float, default=100,
help="[default: %(default)s] Constant used to calculate the optimal learning rate for "
"SGD (alpha / (beta + t)).")
if sgd_lr_gamma:
self.add_argument("--sgd_lr_gamma", type=float, default=100,
help="[default: %(default)s] Constant used to calculate the optimal learning rate for "
"sparsified SGD (gamma / (lambda * (a + t))).")


def const(optimizer):
return LambdaLR(optimizer, lr_lambda=lambda x: 1.0)
@@ -140,16 +153,16 @@ def triangular_learning_rates(optimizer, base_lr, max_lr, cycle_length, scale_fn
def f(iterations):
if iterations <= cycle_length:
cycle = np.floor(1 + iterations / (2 * step_size))
x = np.abs(iterations/step_size - 2 * cycle + 1)
lr = base_lr + (max_lr-base_lr) * np.maximum(0, (1-x)) * scale_fn(cycle, iterations)
x = np.abs(iterations / step_size - 2 * cycle + 1)
lr = base_lr + (max_lr - base_lr) * np.maximum(0, (1 - x)) * scale_fn(cycle, iterations)
else:
lr = base_lr * extra
return lr / base_lr
else:
def f(iterations):
cycle = np.floor(1 + iterations / (2 * step_size))
x = np.abs(iterations/step_size - 2 * cycle + 1)
lr = base_lr + (max_lr-base_lr) * np.maximum(0, (1-x)) * scale_fn(cycle, iterations)
x = np.abs(iterations / step_size - 2 * cycle + 1)
lr = base_lr + (max_lr - base_lr) * np.maximum(0, (1 - x)) * scale_fn(cycle, iterations)
return lr / base_lr

# Use base_lr to overwrite the --lr
@@ -175,11 +188,14 @@ def cyclical_learning_rates(options, optimizer):
mode = options.clr_mode
gamma = options.clr_gamma
if mode in ['linear', 'triangular', 'one_cycle']:
def scale_fn(cycle, iterations): return 1.
def scale_fn(cycle, iterations):
return 1.
elif mode == 'triangular2':
def scale_fn(cycle, iterations): return 1 / (2. ** (cycle - 1))
def scale_fn(cycle, iterations):
return 1 / (2. ** (cycle - 1))
elif mode == 'exp_range':
def scale_fn(cycle, iterations): return gamma ** iterations
def scale_fn(cycle, iterations):
return gamma ** iterations
else:
raise ValueError("Cycle mode {} not support.".format(mode))

@@ -249,13 +265,57 @@ def f(durations):
return LambdaLR(optimizer, lr_lambda=f)


def sgd_optimal_learning_rates(options, optimizer):
"""
Learning rate schedule for SGD (alpha / (t + beta))
:param options: all configs
:param optimizer: optimizer associated with the scheduler
"""
beta = options.beta
alpha = options.alpha

def f(iterations):
return beta / (beta + iterations)

for group in optimizer.param_groups:
group['initial_lr'] = alpha / beta

optimizer.base_lrs = [alpha / beta for _ in optimizer.param_groups]
return LambdaLR(optimizer, lr_lambda=f)


def sparsified_sgd_optimal_learning_rate(options, optimizer):
"""
Learning rate schedule for sparsified SGD (gamma / (lambda * (t + a)))
:param options: all configs
:param optimizer: optimizer associated with the scheduler
"""
# TODO get feature size from the config file
a = 2000 / options.sparse_grad_size
l2_coef = options.l2_coef
gamma = options.sgd_lr_gamma

def f(iterations):
return 1 / max(1, (a + iterations))

optimizer.base_lrs = [gamma / l2_coef for _ in optimizer.param_groups]
for group in optimizer.param_groups:
group['initial_lr'] = gamma / l2_coef

return LambdaLR(optimizer, lr_lambda=f)


def get_scheduler(options, optimizer):
if options.lr_scheduler == 'const':
return const(optimizer)
elif options.lr_scheduler == 'CLR':
return cyclical_learning_rates(options, optimizer)
elif options.lr_scheduler == 'MultiStepLRW':
return multistep_learning_rates_with_warmup(options, optimizer)
elif options.lr_scheduler == 'sgd_optimal':
return sgd_optimal_learning_rates(options, optimizer)
elif options.lr_scheduler == 'sparsified_sgd':
return sparsified_sgd_optimal_learning_rate(options, optimizer)
else:
raise NotImplementedError
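For reference, a small self-contained check of how the new sgd_optimal schedule composes with LambdaLR, assuming a single parameter group and the default alpha = beta = 100; the effective rate works out to alpha / (beta + t):

import torch
from torch.optim.lr_scheduler import LambdaLR

alpha, beta = 100.0, 100.0
params = [torch.nn.Parameter(torch.zeros(1))]
optimizer = torch.optim.SGD(params, lr=alpha / beta)   # picked up as initial_lr by LambdaLR
scheduler = LambdaLR(optimizer, lr_lambda=lambda t: beta / (beta + t))

for t in range(3):
    # effective lr = initial_lr * beta / (beta + t) = alpha / (beta + t)
    assert abs(optimizer.param_groups[0]['lr'] - alpha / (beta + t)) < 1e-12
    optimizer.step()
    scheduler.step()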
