


@ 1,13 +1,110 @@





# py.test3 logclilevel=DEBUG v k test_share_income share.py





# py.test3 logclilevel=DEBUG vv share.py










import copy





import math





import logging





import itertools










#





# Implementation of the Hostea revenue sharing model





# See https://forum.hostea.org/t/decisionrevenuesharingmodel/92





#





# The members argument is a list of Hostea members, represented as a list:





# [





# [





# ID, # username in the https://gitea.hostea.org/Hostea/organization repository





# EXPENSE, # integer, expense elligible for payment





# INCOME, # integer, total available income





# ],





# ...





# ]





#





# The function returns which share of the pending expenses each member





# is allowed to redeem and, if necessary, how much to invoice other





# peer members to get the rest.





#





# share_income(members) => (shares, payments)





#





# The 'shares' are as follows:





#





# [





# [ID, EXPENSE, INCOME],





# ...





# }





#





# For instance in the if the 'members' argument is [['isabelle', 200, 10]], the





# shares will be [['isabelle', 10, 10]] where the 200 expense was reduced to a





# share of 10 because there only is 10 income available.





#





# The 'payments' maps member IDs to the amount other peer members must be invoiced:





#





# {





# ID: [(ID, integer), (ID, integer), ...],





# ...





# }





#





# For instance in the following marie will invoice isable for 200





# and john for 50 while lucie will invoice john for 10:





#





# {





# 'marie': [('isabelle', 200), ('john', 50)],





# 'lucie': [('john', 10)],





# }





#





# If a member has both income and expenses, they are not required to invoice





# themselves, it is implicit. For instance, if the 'payable' return value is





# [['isabelle', 10, 10]], the 'payments' return value will be empty and isabelle





# will be expected to reduce both her income and her expenses by 10.





#





def share_income(members):





payable = payable_expenses(members)





payments = {





member[ID]: paying_members





for member, paying_members in distribute(copy.deepcopy(payable))





}





return (payable, payments)















def test_share_income():





members = [["a", 10, 5], ["b", 450, 25], ["c", 0, 0]]





payable, payments = share_income(members)





assert payable == [["a", 10, 5], ["b", 20, 25]]





assert payments == {"a": [("b", 5)]}















######################################################################





#





# Structure layout and utilities





#





######################################################################










ID = 0





EXPENSE = 1





INCOME = 2





OPTIMUM_TRANSACTIONS = 2















def total(members, position):





return sum(member[position] for member in members)















def total_expense(members):





return total(members, EXPENSE)















def total_income(members):





return total(members, INCOME)















######################################################################





#





# Figure out the share each member is entitled to





#





######################################################################





#





# For a given income, return a list of the members who will not be paid in full





# and how much will remain in their expense balance.





#





# See https://forum.hostea.org/t/decisionrevenuesharingmodel/92





#





# The members list given in argument is a list of pairs of





#





# ['member', EXPENSE]




# argument and only contains the members that cannot be paid in full





# with the income given in argument. For instance:





#





# share_income(3, [['a', 2], ['b', 2]]) == [['b', 1]]





# unpaid_expenses(3, [['a', 2], ['b', 2]]) == [['b', 1]]





#





# Means that with an income of 3, only member 'a' can be paid in full and





# the remaining expense for member 'b' is 1.





#





def share_income(income, members):





def unpaid_expenses(income, members):





if income <= 0:





#





# Recursion termination: no more income to share, none of the




# income to give each member the lowest expense, it is divided





# evenly.





#





share = min(int(income / len(members)), min(members, key=lambda m: m[EXPENSE])[EXPENSE])





share = min(





int(income / len(members)), min(members, key=lambda m: m[EXPENSE])[EXPENSE]





)





count = len(members)





#





# remaining is the list of members that cannot be paid in full




# case above) recurse, but only with the members that expect to be





# paid more.





#





return share_income(income  share * count, remaining)





return unpaid_expenses(income  share * count, remaining)










def test_share_income():





assert share_income(1, [['a', 1], ['b', 1]]) == [['b', 1]]





assert share_income(5, [['a', 10], ['b', 2]]) == [['a', 7]]





assert share_income(5, [['a', 2], ['b', 10], ['c', 1]]) == [['b', 8]]





assert share_income(5, [['a', 2], ['b', 10], ['c', 1], ['d', 40]]) == [['b', 9], ['d', 39]]





assert share_income(5, [['a', 2], ['b', 10], ['c', 1], ['d', 40], ['e', 3]]) == [['a', 1], ['b', 9], ['d', 39], ['e', 2]]





assert share_income(5, [['a', 2], ['b', 10], ['c', 1], ['d', 40], ['e', 3], ['f', 1]]) == [['a', 1], ['b', 9], ['d', 39], ['e', 2], ['f', 1]]










def test_unpaid_expenses():





assert unpaid_expenses(1, [["a", 1], ["b", 1]]) == [["b", 1]]





assert unpaid_expenses(5, [["a", 10], ["b", 2]]) == [["a", 7]]





assert unpaid_expenses(5, [["a", 2], ["b", 10], ["c", 1]]) == [["b", 8]]





assert unpaid_expenses(5, [["a", 2], ["b", 10], ["c", 1], ["d", 40]]) == [





["b", 9],





["d", 39],





]





assert unpaid_expenses(5, [["a", 2], ["b", 10], ["c", 1], ["d", 40], ["e", 3]]) == [





["a", 1],





["b", 9],





["d", 39],





["e", 2],





]





assert unpaid_expenses(





5, [["a", 2], ["b", 10], ["c", 1], ["d", 40], ["e", 3], ["f", 1]]





) == [["a", 1], ["b", 9], ["d", 39], ["e", 2], ["f", 1]]















def payable_expenses(members):





members = copy.deepcopy(members)





id2members = {m[ID]: m for m in members}





for unpaid in unpaid_expenses(





min(total_expense(members), total_income(members)), copy.deepcopy(members)





):





id2members[unpaid[ID]][EXPENSE] = unpaid[EXPENSE]





return [m for m in id2members.values() if m[EXPENSE] > 0 or m[INCOME] > 0]















def test_payable_expenses():





members = [["a", 1, 0], ["b", 1, 1], ["c", 0, 0]]





expected = [["a", 1, 0], ["b", 0, 1]]





assert payable_expenses(members) == expected















######################################################################





#





# Distribute the income from senders to receivers, nothing tricky here





#





######################################################################















def distribute_to_receivers(receivers, senders):





result = []





for receiver in receivers:





paying_members, senders = get_paid(receiver, senders)





result.append((receiver, paying_members))





return result















def test_distribute_to_receivers():





receivers = [["a", 5, 0], ["b", 10, 0]]





senders = [["c", 0, 12], ["d", 0, 3]]





assert distribute_to_receivers(receivers, senders) == [





(["a", 0, 0], [("c", 5)]),





(["b", 0, 0], [("c", 7), ("d", 3)]),





]










receivers = [["b", 12, 0], ["a", 5, 0]]





senders = [["c", 0, 10], ["d", 0, 5], ["e", 0, 1], ["f", 0, 1]]





expected = [





(["b", 0, 0], [("c", 10), ("d", 2)]),





(["a", 0, 0], [("d", 3), ("e", 1), ("f", 1)]),





]





assert distribute_to_receivers(receivers, senders) == expected










receivers = [["a", 5, 0], ["b", 12, 0]]





senders = [["c", 0, 10], ["d", 0, 5], ["e", 0, 1], ["f", 0, 1]]





expected = [





(["a", 0, 0], [("c", 5)]),





(["b", 0, 0], [("c", 5), ("d", 5), ("e", 1), ("f", 1)]),





]





assert distribute_to_receivers(receivers, senders) == expected















def get_paid(who, members):





paying_members = []





result = []





for member in sorted(members, key=lambda m: m[INCOME], reverse=True):





pay = min(member[INCOME], who[EXPENSE])





if pay > 0:





paying_members.append((member[ID], pay))





member[INCOME] = pay





who[EXPENSE] = pay





result.append(member)





assert who[EXPENSE] == 0





return (paying_members, result)















def test_get_paid():





orig = [["a", 10, 0], ["c", 0, 6], ["b", 0, 10]]





paying_members, modified = get_paid(orig[0], orig)





assert paying_members == [("b", 10)]





assert modified == [["b", 0, 0], ["c", 0, 6], ["a", 0, 0]]










orig = [["a", 10, 0], ["c", 0, 6], ["b", 0, 4]]





paying_members, modified = get_paid(orig[0], orig)





assert paying_members == [("c", 6), ("b", 4)]





assert modified == [["c", 0, 0], ["b", 0, 0], ["a", 0, 0]]










orig = [["a", 10, 0], ["c", 0, 8], ["b", 0, 4], ["d", 2, 0]]





paying_members, modified = get_paid(orig[0], orig)





assert paying_members == [("c", 8), ("b", 2)]





assert modified == [["c", 0, 0], ["b", 0, 2], ["a", 0, 0], ["d", 2, 0]]





paying_members, modified = get_paid(orig[3], orig)





assert paying_members == [("b", 2)]





assert modified == [["b", 0, 0], ["a", 0, 0], ["c", 0, 0], ["d", 0, 0]]















def self_get_paid(members):





for member in members:





pay = min(member[INCOME], member[EXPENSE])





member[INCOME] = pay





member[EXPENSE] = pay





return [m for m in members if m[EXPENSE] > 0 or m[INCOME] > 0]















def test_self_get_paid():





members = [["a", 5, 10], ["b", 20, 10], ["c", 0, 30], ["d", 40, 0]]





expected = [["a", 0, 5], ["b", 10, 0], ["c", 0, 30], ["d", 40, 0]]





assert self_get_paid(members) == expected















######################################################################





#





# For each receiver (i.e. a member that will have expenses paid), set





# the number of transactions they would need in the ideal situation





# where they would get paid by the senders who have the most income





# before anyone else.





#





# It will be used to stop iterating as soon as an optimal situation





# is detected instead of exploring all permutations.





#





######################################################################















def set_optimum_transactions(receivers, senders):





result = []





for receiver in receivers:





expense = receiver[EXPENSE]





optimum = 0





for sender in sorted(senders, key=lambda m: m[INCOME], reverse=True):





optimum += 1





expense = sender[INCOME]





if expense <= 0:





break





assert expense <= 0





receiver[OPTIMUM_TRANSACTIONS] = optimum





result.append(receiver)





return result















def test_set_optimum_transactions():





receivers = [["a", 5, 0], ["b", 12, 0]]





senders = [["c", 0, 10], ["d", 0, 5]]





expected = [["a", 5, 1], ["b", 12, 2]]





modified = set_optimum_transactions(receivers, senders)





assert modified == expected















def is_optimal(result):





for receiver, paying_members in result:





if receiver[OPTIMUM_TRANSACTIONS] > len(paying_members):





return False





return True















######################################################################





#





# Computes which members must be invoiced to pay the expenses to other





# members, with a minimum number of transactions. The algorithm is based





# on the following assumptions:





#





# * The total of all expenses is not greater than the total of all





# incomes.





#





# * The members with are sorted with greater income first and





# permutations where a better solution could be found with a





# different ordering are ignored.





#





# * Iterating over the permutations of the list of members that are





# allowed to get a share will produce a solution that is good enough.





#





# * An optimal solution (see is_optimal) will be found before





# iterating over a significant part of the permutations when there





# is a large (>10) number of members.





#





# The members argument is a list of Hostea members, represented as a list:





# [





# [





# ID, # username in the https://gitea.hostea.org/Hostea/organization repository





# EXPENSE, # integer, expense elligible for payment





# INCOME, # integer, total available income





# ],





# ...





# ]





#





#





######################################################################















def distribute(members):





members = self_get_paid(members)





senders = [m for m in members if m[INCOME] > 0]





receivers = [m for m in members if m[EXPENSE] > 0]





receivers = set_optimum_transactions(receivers, senders)





logging.debug(





f"distribute {receivers} to {senders} with at most {math.factorial(len(receivers))} iterations"





)





results = []





iter = 0





for receivers in itertools.permutations(receivers):





iter += 1





result = distribute_to_receivers(receivers, senders)





#





# There may be other solutions but since this one is optimal,





# no need to continue





#





if is_optimal(result):





logging.debug(f"found optimal result after {iter} iterations")





return result





results.append(result)





return best_result(results)















def test_distribute():





members = [["a", 10, 5], ["b", 20, 25]]





assert distribute(members) == [(["a", 0, 1], [("b", 5)])]










members = [["a", 10, 5], ["b", 20, 28], ["c", 3, 0]]





assert distribute(members) == [(["a", 0, 1], [("b", 5)]), (["c", 0, 1], [("b", 3)])]










members = [["a", 10, 5], ["b", 20, 280], ["c", 3, 0]]





assert distribute(members) == [(["a", 0, 1], [("b", 5)]), (["c", 0, 1], [("b", 3)])]










members = [





["a", 10, 5],





["b", 20, 280],





["c", 3, 0],





["d", 3, 0],





["e", 3, 0],





["f", 3, 0],





["g", 3, 0],





["h", 3, 0],





["i", 3, 0],





["j", 3, 0],





["k", 3, 0],





]





expected = [





(["a", 0, 1], [("b", 5)]),





(["c", 0, 1], [("b", 3)]),





(["d", 0, 1], [("b", 3)]),





(["e", 0, 1], [("b", 3)]),





(["f", 0, 1], [("b", 3)]),





(["g", 0, 1], [("b", 3)]),





(["h", 0, 1], [("b", 3)]),





(["i", 0, 1], [("b", 3)]),





(["j", 0, 1], [("b", 3)]),





(["k", 0, 1], [("b", 3)]),





]





assert distribute(members) == expected















#





# The best result is the one with less transactions





#





def best_result(results):





best = None





best_transactions_count = None





for result in results:





transactions_count = sum(





[len(transactions) for receiver, transactions in result]





)





if (





best_transactions_count is None





or transactions_count < best_transactions_count





):





best_transactions_count = transactions_count





best = result





return best















def test_best_result():





result1 = [(["a", 0, 0], [("c", 5)]), (["b", 0, 0], [("c", 7), ("d", 3)])]





result2 = [(["a", 0, 0], [("c", 5)]), (["b", 0, 0], [("c", 7)])]





result3 = [(["a", 0, 0], [("c", 5)]), (["b", 0, 0], [("c", 6), ("d", 2), ("e", 2)])]





results = [





result1,





result2,





result3,





]





assert best_result(results) == result2




