Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 102 additions & 169 deletions text/5001-node-selection-algorithm/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
different rows for stewards, and different numbers for N and M, should also be supported.
'''

import csv, os, sys, re, unittest
import csv, os, sys, re, unittest, datetime
from itertools import combinations, islice, chain
#from concurrent.futures import ProcessPoolExecutor
#import numpy as np
import collections
from multiprocessing import Pool

max_f_for_steward_list = -1
f_from_data_file = 0

config_best = 10

def load_clean_csv(fname):
# Read file and parse into rows and cells.
Expand Down Expand Up @@ -53,7 +58,7 @@ def parse_headers(rows, skip_f=False):
rule = rules[rule_idx]
row = rows[row_idx]
row_idx += 1
#print('Testing rule %d against row %d (%s)' % (rule_idx, row_idx, ','.join(row)))
print('Testing rule %d against row %d (%s)' % (rule_idx, row_idx, ','.join(row)))
if apply_rule(row, vars, rule[0], rule[1], rule[2]):
rule_idx += 1
continue
Expand All @@ -76,6 +81,7 @@ def parse_stewards(rows, row_idx):
faults = []
while row_idx < len(rows):
row = rows[row_idx]
print(row)
if is_steward_row(row):
stewards.append(row[0])
mttrs.append(float(row[1]))
Expand Down Expand Up @@ -132,11 +138,17 @@ def max_f_for_steward_count(n):
return max(int((n - 1) / 3), 0)

def load_data(fname, requested_f=max_f_for_steward_list):
print("In 'Load Data'")
rows = load_clean_csv(fname)
print(rows)
file_f, scenarios, liks, row_idx = parse_headers(rows)
print(file_f, scenarios, liks, row_idx)
stewards, mttrs, faults = parse_stewards(rows, row_idx)
print(stewards, mttrs, faults)
# Check validity of the f value we've been given.
max_f = max_f_for_steward_count(len(stewards))
if max_f > 8:
max_f=8
if file_f > 0 and (requested_f == f_from_data_file):
requested_f = file_f
elif requested_f == max_f_for_steward_list:
Expand All @@ -162,27 +174,30 @@ class BestN:
worst item and keep replacing it with something better -- then sort once at the
end.
'''
def __init__(self, quantifier, max=default_best_N):
def __init__(self, max=default_best_N):
'''
Define a quantifier func that will be used to get numeric values for
items in the list. By default, the quantifier should return numbers that
are bigger if better; to sort ascending, make the quantifier return
such a sequence times -1. Set max size of list.'''
assert quantifier
#assert quantifier
assert type(max) is type(3)
assert max > 0
assert max <= 10000
self._items = []
self.quantifier = quantifier
#self.quantifier = lambda x: x.combined_score, config_best
self._sorted = False
self.max = max
self.worst_idx = invalid_worst_idx
self.worst_score = worst_best_score

def _quantifier(self, x):
return x.combined_score, config_best
def _find_worst(self):
self.worst_idx = invalid_worst_idx
self.worst_score = worst_best_score
for i in range(len(self._items)):
n = self.quantifier(self._items[i])
n = self._quantifier(self._items[i])
if (self.worst_idx == invalid_worst_idx) or (n < self.worst_score):
self.worst_idx = i
self.worst_score = n
Expand All @@ -201,22 +216,33 @@ def keep_if_better(self, candidate):
else:
if self.worst_idx == invalid_worst_idx:
self._find_worst()
score = self.quantifier(candidate)
score = self._quantifier(candidate)
if score > self.worst_score:
self._items[self.worst_idx] = candidate
self.worst_idx = invalid_worst_idx # Need to recalculate
self._sorted = False
def get_items(self):
return self._items


class ComboAnalysis:
'''Encapsulate info about a single combination of stewards.'''
def __init__(self, combo, stewards):
def __init__(self, faults, combo, stewards, mttrs):
self.combo = sorted(combo)
self.steward_indexes = [stewards.index(s) for s in combo]
self.steward_indexes = {} # row index
self.combo_faults = [0,0,0,0,0,0,0,0,0,0,0,0]
for i in combo: # needs to be optamized, maybe use panda....
stewards_index = stewards.index(i) # row
indexes = mttrs[stewards_index]
self.steward_indexes[stewards_index] = indexes
fault = faults[stewards_index]
self.combo_faults = [x+y for x, y in zip(self.combo_faults, fault)]
#print(self.combo_faults)
self.results = []
self._total = None
def __getattr__(self, item):
if item == 'combined_score':
if self._total is None:
if not self._total:
self._total = sum([r.score for r in self.results])
return self._total
raise AttributeError(item)
Expand All @@ -227,30 +253,33 @@ def __str__(self):

class ScenarioResult:
'''Encapsulate info about one combination of stewards in one scenario.'''
def __init__(self, scenario, scenarios, liks, faults, combo_indexes, f, mttrs):
self.name = scenario
self.idx = scenarios.index(scenario)
self.likelihood = liks[self.idx]
def __init__(self, index, liks, combo_indexes, combo_summed_faults, f, mttrs):
#self.name = scenario
#self.idx = scenarios.index(scenario)
self.likelihood = liks[index]
self.combo_indexes = combo_indexes
self.f = f
self.fault_count = 0
relevant_mttrs = []
profile = []
for ci in combo_indexes:
relevant_mttrs.append(mttrs[ci])
faults_for_member = faults[ci]
n = faults_for_member[self.idx]
profile.append(n)
#self.fault_count = 0
#relevant_mttrs = []
#profile = []
""" for ci in combo_indexes.keys():
#relevant_mttrs.append(mttrs[ci])
n = faults[ci][index]
#profile.append(n)
if n:
self.fault_count += 1
self.profile = profile
self.failure_distance = self.f - self.fault_count
self.fault_count += 1 """
#self.profile = profile
combo_values = combo_indexes.values()
relevant_mttrs = list( combo_values )
self.failure_distance = self.f - combo_summed_faults
if self.failure_distance < 0:
relevant_mttrs.sort()
# The MTTR of the scenario is the time it will take for the i-th node to
# repair its fault, where i is the number of the node that finally
# gets the whole network back into consensus.
self.mttr = relevant_mttrs[-(self.failure_distance + 1)]
index = self.failure_distance + 1
index = -index
self.mttr = relevant_mttrs[index]
self.importance = self.likelihood * self.mttr
else:
# We don't have any repair time if we never lost consensus.
Expand All @@ -263,36 +292,20 @@ def __init__(self, scenario, scenarios, liks, faults, combo_indexes, f, mttrs):
def __lt__(self, other):
return self.score < other.score

def analyze(f, scenarios, liks, stewards, mttrs, faults, bestN, quiet=False):
m = (3 * f) + 1
n = len(stewards)
total_combinations = factorial(n) / (factorial(m) * factorial(n - m))
if not quiet:
print('Analyzing %d total %d-steward combinations (n=%d, f=%d).' % (total_combinations, m, n, f))
best = BestN(lambda x: x.combined_score, bestN)
for combo in unique_combinations(stewards, m):
analyze_combo(combo, best, scenarios, liks, stewards, mttrs, faults, f)
return best

def analyze_combo(combo, best, scenarios, liks, stewards, mttrs, faults, f):
def analyze_combo(combo, scenarios, liks, stewards, mttrs, faults, f):
'''
Given a single combination, evaluate its downtime across all scenarios. If it has less
downtime than one of the current "top N" combinations, put this one into the "top N"
list.
'''
ca = ComboAnalysis(combo, stewards)
for scenario in scenarios:
sr = ScenarioResult(scenario, scenarios, liks, faults, ca.steward_indexes, f, mttrs)
ca = ComboAnalysis(faults, combo, stewards, mttrs)
for index in range(len(scenarios)):
#print(ca.combo_summed_faults)
summed_faults = ca.combo_faults[index]
sr = ScenarioResult(index, liks, ca.steward_indexes, summed_faults, f, mttrs)
ca.results.append(sr)
best.keep_if_better(ca)

def unique_combinations(items, n):
if n == 0:
yield []
else:
for i in range(len(items)):
for cc in unique_combinations(items[i + 1:], n - 1):
yield [items[i]] + cc
return ca

def report(best, f):
m = (3 * f) + 1
Expand All @@ -303,124 +316,25 @@ def report(best, f):
combo = best.items[i]
print('%d: %s' % (i + 1, combo))

def select(fname, suggested_f, bestN):
f, scenarios, liks, stewards, mttrs, faults = load_data(fname, suggested_f)
best = analyze(f, scenarios, liks, stewards, mttrs, faults, bestN)
report(best, f)

class Tests(unittest.TestCase):

def test_ScenarioResult(self):
stewards = 'A,B,C,D,E,F'.split(',')
scenarios = 'a,b,c'.split(',')
liks = [.5, .4, .3]
mttrs = [6,7,8,9,10,11]
combo_indexes = [0,2,3,4]
faults = [[0,1,1],[1,0,0],[0,0,0],[1,1,1],[0,1,0],[1,0,1]]
sr = ScenarioResult('b', scenarios, liks, faults, combo_indexes, 1, mttrs)
self.assertEquals(sr.name, 'b')
self.assertEquals(sr.idx, 1)
self.assertAlmostEquals(sr.likelihood, .4)
self.assertEquals(sr.mttr, 8)
self.assertAlmostEquals(sr.score, -6.4)

def test_analyze(self):
stewards = 'A,B,C,D,E,F'.split(',')
scenarios = 'a,b,c'.split(',')
liks = [.5, .4, .3]
mttrs = [6,7,8,9,10,11]
faults = [[0,1,1],[1,0,0],[0,0,0],[1,1,1],[0,1,0],[1,0,1]]
best = analyze(1,scenarios,liks,stewards,mttrs,faults,3, quiet=True)
self.assertTrue(best.items[0].combined_score > best.items[1].combined_score)
self.assertTrue(best.items[1].combined_score > best.items[2].combined_score)

def test_unique_combinations(self):
fruit = 'apple,banana,orange,pear'.split(',')
combos = '\n'.join(sorted(['+'.join(x) for x in unique_combinations(fruit, 2)]))
self.assertEquals(combos, 'apple+banana\napple+orange\napple+pear\nbanana+orange\nbanana+pear\norange+pear')

def test_is_empty_row(self):
self.assertTrue(is_empty_row((None,None,None)))
self.assertTrue(is_empty_row(('','','')))
self.assertFalse(is_empty_row((1,'',None)))

def test_convert_float(self):
self.assertAlmostEquals(convert_float('1'), 1.0)
self.assertAlmostEquals(convert_float(1), 1.0)
self.assertAlmostEquals(convert_float('0.01'), 0.01)
self.assertAlmostEquals(convert_float('10%'), 0.1)

def test_factorial(self):
self.assertEquals(factorial(1), 1)
self.assertEquals(factorial(2), 2)
self.assertEquals(factorial(3), 6)
self.assertEquals(factorial(4), 24)

def test_has_string(self):
self.assertTrue(has_string(' abc '))
self.assertTrue(has_string('X'))
self.assertFalse(has_string(' 123'))

def test_has_num(self):
self.assertFalse(has_num(' abc '))
self.assertFalse(has_num('X'))
self.assertTrue(has_num(' 123'))
self.assertTrue(has_num('-2'))
self.assertTrue(has_num('1%'))

def test_is_steward_row(self):
self.assertTrue(is_steward_row(('x', '25', 1, '0', '0', '1')))
self.assertFalse(is_steward_row(('x', '25', 5, '0', '0', '1')))
self.assertFalse(is_steward_row(('x', '25', '1', '0', '0.1', '1')))

def test_max_f_for_steward_count(self):
self.assertEquals(max_f_for_steward_count(0), 0)
self.assertEquals(max_f_for_steward_count(3), 0)
self.assertEquals(max_f_for_steward_count(4), 1)
self.assertEquals(max_f_for_steward_count(5), 1)
self.assertEquals(max_f_for_steward_count(7), 2)

def test_parse_headers(self):
rows = [x.split(',') for x in ''',,scheduled maintenance coincidence,botched upgrade,foo
likelihood per year (from MTBF),,1%,85%,17%
Steward,MTTR,fault?,fault?,fault?
Bank A,5,1,1,0
Tech Firm B,13,1,0,1'''.replace('\r', '').split('\n')]
f, scenarios, liks, row_idx = parse_headers(rows)
self.assertEquals(row_idx, 3)
self.assertEquals(liks, [0.01,0.85,0.17])
self.assertEquals(f, 0)

def test_load_data_good(self):
fname = os.path.join(os.path.dirname(__file__), 'sample-data.csv')
f, scenarios, liks, stewards, mttrs, faults = load_data(fname, f_from_data_file)
self.assertEquals(f, 1)
self.assertEquals(scenarios[0], 'scheduled maintenance coincidence')
self.assertEquals(scenarios[11], 'major natural disaster, US East Coast')
self.assertEquals(liks, [0.01, 0.85, 0.0001, 0.05, 0.01, 0.8, 0.03, 0.5, 0.02, 0.03, 0.1, 0.0001])
self.assertEquals(stewards, ['Bank A', 'Tech Firm B', 'University C', 'Law Firm D', 'NGO E', 'Government F', 'Consortium G', 'Tech Firm H', 'Biotech Firm J'])
self.assertEquals(mttrs, [5.0, 13.0, 11.0, 6.0, 6.0, 9.0, 12.0, 8.0, 7.0])
self.assertEquals(faults[0], [1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0])
self.assertEquals(faults[8], [1, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0])

def test_not_enough_stewards(self):
fname = os.path.join(os.path.dirname(__file__), 'sample-data.csv')
self.assertRaises(Exception, load_data, fname, 5)

def test_BestN(self):
# Keep a list of the 3 best integers. Always give it back in sorted form.
b = BestN(lambda x: x, 3)
def try_this(*args):
b = BestN(lambda x: x, 3)
for arg in args:
b.keep_if_better(arg)
return b
b = try_this(3,10,4,11,9)
self.assertEquals(b.items, [11,10,9])
self.assertEquals(b.worst_idx, 2)
self.assertEquals(b.worst_score, 9)
b = try_this(-10,-9,-3.14,4.0,-1)
self.assertEquals(b.items, [4,-1,-3.14])
def batch(iterable, n=1): # https://stackoverflow.com/questions/8290397/how-to-split-an-iterable-in-constant-size-chunks
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]

""" def grouper(n, iterable):
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args) """

def grouper_it(n, iterable): # https://pastebin.com/YkKFvm8b
it = iter(iterable)
while True:
chunk_it = islice(it, n)
try:
first_el = next(chunk_it)
except StopIteration:
return
yield chain((first_el,), chunk_it)

if __name__ == '__main__':
if len(sys.argv) >= 2 and sys.argv[1] == 'test':
Expand All @@ -433,4 +347,23 @@ def try_this(*args):
parser.add_argument('--f', '-f', help='Number of faulted nodes to allow before consensus is lost. -1=max allowed by steward list (default); 0=as in data file', type=int, default=max_f_for_steward_list)
parser.add_argument('--best', help='Specify how many of the best steward combinations to show.', type=int, default=10)
args = parser.parse_args()
select(args.fname, args.f, args.best)

f, scenarios, liks, stewards, mttrs, faults = load_data(args.fname, args.f)
# analyze
m = (3 * f) + 1
print ("m= %s",m)
n = len(stewards)
print ("n= %s",n)
total_combinations = factorial(n) / (factorial(m) * factorial(n - m))
print('Analyzing %d total %d-steward combinations (n=%d, f=%d).' % (total_combinations, m, n, f))

def calculate_combination(combo):
return analyze_combo(combo, scenarios, liks, stewards, mttrs, faults, f)

with Pool() as pool:
best = BestN()
combos = grouper_it(800, combinations(stewards, m))
for cs in combos:
combos_with_score = pool.map_async( calculate_combination, cs)
[best.keep_if_better(combo) for combo in combos_with_score.get() ]
report(best, f)