477 lines
15 KiB
Python
477 lines
15 KiB
Python
# py.test-3 --log-cli-level=DEBUG -vv share.py




import copy


import math


import logging


import itertools




#


# Implementation of the Hostea revenue sharing model


# See https://forum.hostea.org/t/decision-revenue-sharing-model/92


#


# The members argument is a list of Hostea members, represented as a list:


# [


# [


# ID, # username in the https://gitea.hostea.org/Hostea/organization repository


# EXPENSE, # integer, expense eligible for payment


# INCOME, # integer, total available income


# ],


# ...


# ]


#


# The function returns which share of the pending expenses each member


# is allowed to redeem and, if necessary, how much to invoice other


# peer members to get the rest.


#


# share_income(members) => (shares, payments)


#


# The 'shares' are as follows:


#


# [


# [ID, EXPENSE, INCOME],


# ...


# ]


#


# For instance, if the 'members' argument is [['isabelle', 200, 10]], the


# shares will be [['isabelle', 10, 10]] where the 200 expense was reduced to a


# share of 10 because there only is 10 income available.


#


# The 'payments' maps member IDs to the amount other peer members must be invoiced:


#


# {


# ID: [(ID, integer), (ID, integer), ...],


# ...


# }


#


# For instance, in the following marie will invoice isabelle for 200


# and john for 50 while lucie will invoice john for 10:


#


# {


# 'marie': [('isabelle', 200), ('john', 50)],


# 'lucie': [('john', 10)],


# }


#


# If a member has both income and expenses, they are not required to invoice


# themselves, it is implicit. For instance, if the 'shares' return value is


# [['isabelle', 10, 10]], the 'payments' return value will be empty and isabelle


# will be expected to reduce both her income and her expenses by 10.


#


def share_income(members):
    """Return ``(shares, payments)`` for the given members.

    ``shares`` is the list produced by ``payable_expenses(members)``.
    ``payments`` maps the ID of each receiver returned by ``distribute``
    to the list of ``(ID, amount)`` pairs they must invoice.
    """
    shares = payable_expenses(members)
    invoices = {}
    # Hand distribute() a deep copy so the shares returned to the caller
    # keep their computed values.
    for receiver, payers in distribute(copy.deepcopy(shares)):
        invoices[receiver[ID]] = payers
    return (shares, invoices)






def test_share_income():
    """share_income caps expenses to the available income and lists invoices."""
    shares, invoices = share_income([["a", 10, 5], ["b", 450, 25], ["c", 0, 0]])
    assert shares == [["a", 10, 5], ["b", 20, 25]]
    assert invoices == {"a": [("b", 5)]}






######################################################################


#


# Structure layout and utilities


#


######################################################################




# Index of the member identifier in a member entry.
ID = 0

# Index of the expense amount in a member entry.
EXPENSE = 1

# Index of the income amount in a member entry.
INCOME = 2

# Index where set_optimum_transactions() stores the optimum number of
# transactions in a receiver entry. It has the same value as INCOME, so
# the stored count overwrites that slot.
OPTIMUM_TRANSACTIONS = 2






def total(members, position):
    """Add up the value found at index ``position`` of every member entry."""
    amount = 0
    for entry in members:
        amount += entry[position]
    return amount






def total_expense(members):
    """Return the sum of the EXPENSE field over all members."""
    return total(members, position=EXPENSE)






def total_income(members):
    """Return the sum of the INCOME field over all members."""
    return total(members, position=INCOME)






######################################################################


#


# Figure out the share each member is entitled to


#


######################################################################


#


# For a given income, return a list of the members who will not be paid in full


# and how much will remain in their expense balance.


#


# The members list given in argument is a list of pairs of


#


# ['member', EXPENSE]


#


# Where 'member' is the unique id of the member and EXPENSE is a


# positive integer.


#


# The income argument must be an integer lower or equal to the sum of


# all members EXPENSE.


#


# The returned member list is a subset of the members list given in


# argument and only contains the members that cannot be paid in full


# with the income given in argument. For instance:


#


# unpaid_expenses(3, [['a', 2], ['b', 2]]) == [['b', 1]]


#


# Means that with an income of 3, only member 'a' can be paid in full and


# the remaining expense for member 'b' is 1.


#


def unpaid_expenses(income, members):
    """Share ``income`` among ``members`` and return those not paid in full.

    Each entry of ``members`` is a list whose EXPENSE slot holds the amount
    still owed to that member. The entries are modified in place: on return
    the EXPENSE slot of each returned entry holds what remains unpaid.

    The income is handed out in rounds. In each round every remaining
    member receives the same share, and members whose expense reaches zero
    drop out of the following rounds.

    Members with no expense are ignored and never returned: they have
    nothing to be paid and must not absorb any part of the income.

    ``income`` is expected to be an integer lower or equal to the sum of
    all expenses. If it is larger, the surplus is left undistributed and
    an empty list is returned.
    """
    # Only members with something to be paid take part in the sharing.
    # Without this filter a member with no expense could consume a unit
    # of income in the low-income border case below.
    members = [m for m in members if m[EXPENSE] > 0]
    while income > 0 and members:
        if income <= len(members):
            #
            # The income is an integer number: when it cannot be divided
            # among members (2 // 3 == 0), some of them get 1 and the
            # others get nothing. If the income is consistently very low
            # some members may never get paid but the amount of money
            # involved would then be so low that no member would care.
            #
            share = 1
            count = income
        else:
            #
            # The share that each member gets in this round is the lowest
            # expense. If there is not enough income to give each member
            # the lowest expense, the income is divided evenly.
            #
            share = min(income // len(members), min(m[EXPENSE] for m in members))
            count = len(members)
        #
        # remaining is the list of members that cannot be paid in full
        # with the share.
        #
        remaining = []
        for m in members[:count]:
            m[EXPENSE] -= share
            if m[EXPENSE] > 0:
                # Still owed something: eligible for the next round.
                remaining.append(m)
        #
        # Border case when income <= len(members): the members beyond
        # count got nothing in this round because there was not enough
        # income to give 1 to everyone. They are still unpaid.
        #
        remaining.extend(members[count:])
        income -= share * count
        members = remaining
    return members






def test_unpaid_expenses():
    """Check unpaid_expenses on (income, members, expected) triples."""
    cases = [
        (1, [["a", 1], ["b", 1]], [["b", 1]]),
        (5, [["a", 10], ["b", 2]], [["a", 7]]),
        (5, [["a", 2], ["b", 10], ["c", 1]], [["b", 8]]),
        (
            5,
            [["a", 2], ["b", 10], ["c", 1], ["d", 40]],
            [["b", 9], ["d", 39]],
        ),
        (
            5,
            [["a", 2], ["b", 10], ["c", 1], ["d", 40], ["e", 3]],
            [["a", 1], ["b", 9], ["d", 39], ["e", 2]],
        ),
        (
            5,
            [["a", 2], ["b", 10], ["c", 1], ["d", 40], ["e", 3], ["f", 1]],
            [["a", 1], ["b", 9], ["d", 39], ["e", 2], ["f", 1]],
        ),
    ]
    for income, members, expected in cases:
        assert unpaid_expenses(income, members) == expected






def payable_expenses(members):
    """Return the members with EXPENSE reduced to the share they can redeem.

    The amount to share is the smaller of the total expense and the total
    income. ``unpaid_expenses`` reports what each member will NOT be paid;
    that amount is subtracted from the member's expense, leaving the
    payable part. The ``members`` argument is not modified.

    Members left with neither a payable expense nor any income are
    omitted from the result.
    """
    members = copy.deepcopy(members)
    by_id = {m[ID]: m for m in members}
    available = min(total_expense(members), total_income(members))
    # unpaid_expenses() modifies the entries it is given, so it works on
    # its own copy and the result is folded back into ours.
    for unpaid in unpaid_expenses(available, copy.deepcopy(members)):
        by_id[unpaid[ID]][EXPENSE] -= unpaid[EXPENSE]
    return [m for m in by_id.values() if m[EXPENSE] > 0 or m[INCOME] > 0]






def test_payable_expenses():
    """With an income of 1, only one of the two expenses is payable."""
    payable = payable_expenses([["a", 1, 0], ["b", 1, 1], ["c", 0, 0]])
    assert payable == [["a", 1, 0], ["b", 0, 1]]






######################################################################


#


# Distribute the income from senders to receivers, nothing tricky here


#


######################################################################






def distribute_to_receivers(receivers, senders):
    """Pay each receiver in turn and return ``[(receiver, payers), ...]``.

    Receivers are served in the order given. The sender list returned by
    ``get_paid`` for one receiver is the one handed to the next.
    """
    allocations = []
    pool = senders
    for beneficiary in receivers:
        payers, pool = get_paid(beneficiary, pool)
        allocations.append((beneficiary, payers))
    return allocations






def test_distribute_to_receivers():
    """Check distribute_to_receivers on (receivers, senders, expected) triples."""
    cases = [
        (
            [["a", 5, 0], ["b", 10, 0]],
            [["c", 0, 12], ["d", 0, 3]],
            [
                (["a", 0, 0], [("c", 5)]),
                (["b", 0, 0], [("c", 7), ("d", 3)]),
            ],
        ),
        (
            [["b", 12, 0], ["a", 5, 0]],
            [["c", 0, 10], ["d", 0, 5], ["e", 0, 1], ["f", 0, 1]],
            [
                (["b", 0, 0], [("c", 10), ("d", 2)]),
                (["a", 0, 0], [("d", 3), ("e", 1), ("f", 1)]),
            ],
        ),
        (
            [["a", 5, 0], ["b", 12, 0]],
            [["c", 0, 10], ["d", 0, 5], ["e", 0, 1], ["f", 0, 1]],
            [
                (["a", 0, 0], [("c", 5)]),
                (["b", 0, 0], [("c", 5), ("d", 5), ("e", 1), ("f", 1)]),
            ],
        ),
    ]
    for receivers, senders, expected in cases:
        assert distribute_to_receivers(receivers, senders) == expected






def get_paid(who, members):
    """Pay the expense of ``who`` out of the income of ``members``.

    Members are visited by decreasing income. Each one pays as much as
    it can, up to what ``who`` is still owed. Both ``who`` and the paying
    members are modified in place: the amount paid is subtracted from the
    payer's INCOME and from the EXPENSE of ``who``.

    Returns ``(paying_members, result)`` where ``paying_members`` is the
    list of ``(ID, amount)`` pairs for members that actually paid, and
    ``result`` is the list of all members in the order they were visited.

    Raises AssertionError if the combined income of ``members`` does not
    cover the expense of ``who``.
    """
    paying_members = []
    result = []
    for member in sorted(members, key=lambda m: m[INCOME], reverse=True):
        pay = min(member[INCOME], who[EXPENSE])
        if pay > 0:
            paying_members.append((member[ID], pay))
            member[INCOME] -= pay
            who[EXPENSE] -= pay
        result.append(member)
    assert who[EXPENSE] == 0, f"{who[ID]} is still owed {who[EXPENSE]}"
    return (paying_members, result)






def test_get_paid():
    """get_paid draws from the largest incomes first and updates in place."""
    # A single sender covers the whole expense.
    entries = [["a", 10, 0], ["c", 0, 6], ["b", 0, 10]]
    payers, after = get_paid(entries[0], entries)
    assert payers == [("b", 10)]
    assert after == [["b", 0, 0], ["c", 0, 6], ["a", 0, 0]]

    # Two senders are needed and both are emptied.
    entries = [["a", 10, 0], ["c", 0, 6], ["b", 0, 4]]
    payers, after = get_paid(entries[0], entries)
    assert payers == [("c", 6), ("b", 4)]
    assert after == [["c", 0, 0], ["b", 0, 0], ["a", 0, 0]]

    # Two receivers are paid one after the other from the same entries.
    entries = [["a", 10, 0], ["c", 0, 8], ["b", 0, 4], ["d", 2, 0]]
    payers, after = get_paid(entries[0], entries)
    assert payers == [("c", 8), ("b", 2)]
    assert after == [["c", 0, 0], ["b", 0, 2], ["a", 0, 0], ["d", 2, 0]]
    payers, after = get_paid(entries[3], entries)
    assert payers == [("b", 2)]
    assert after == [["b", 0, 0], ["a", 0, 0], ["c", 0, 0], ["d", 0, 0]]






def self_get_paid(members):
    """Let each member pay their own expense out of their own income.

    For every member the smaller of INCOME and EXPENSE is subtracted from
    both fields, in place, so at most one of the two stays positive.

    Returns the members that still have an expense to receive or an
    income to give; members with both at zero are dropped.
    """
    for member in members:
        pay = min(member[INCOME], member[EXPENSE])
        member[INCOME] -= pay
        member[EXPENSE] -= pay
    return [m for m in members if m[EXPENSE] > 0 or m[INCOME] > 0]






def test_self_get_paid():
    """Each member's own income is netted against their own expense."""
    netted = self_get_paid(
        [["a", 5, 10], ["b", 20, 10], ["c", 0, 30], ["d", 40, 0]]
    )
    assert netted == [["a", 0, 5], ["b", 10, 0], ["c", 0, 30], ["d", 40, 0]]






######################################################################


#


# For each receiver (i.e. a member that will have expenses paid), set


# the number of transactions they would need in the ideal situation


# where they would get paid by the senders who have the most income


# before anyone else.


#


# It will be used to stop iterating as soon as an optimal situation


# is detected instead of exploring all permutations.


#


######################################################################






def set_optimum_transactions(receivers, senders):
    """Store in each receiver the fewest transactions that could pay it.

    For each receiver, sender incomes are consumed from the largest to
    the smallest until the receiver's expense is covered. The number of
    senders needed is written, in place, at index OPTIMUM_TRANSACTIONS of
    the receiver entry. Senders are not modified: every receiver is
    evaluated against the full, untouched incomes.

    Returns the list of receivers, in the order given.

    Raises AssertionError if the total sender income cannot cover a
    receiver's expense.
    """
    # The ordering of incomes is the same for every receiver: sort once.
    incomes = sorted((sender[INCOME] for sender in senders), reverse=True)
    result = []
    for receiver in receivers:
        expense = receiver[EXPENSE]
        optimum = 0
        for income in incomes:
            optimum += 1
            expense -= income
            if expense <= 0:
                break
        assert expense <= 0
        receiver[OPTIMUM_TRANSACTIONS] = optimum
        result.append(receiver)
    return result






def test_set_optimum_transactions():
    """'a' can be paid by one sender, 'b' needs both."""
    annotated = set_optimum_transactions(
        [["a", 5, 0], ["b", 12, 0]],
        [["c", 0, 10], ["d", 0, 5]],
    )
    assert annotated == [["a", 5, 1], ["b", 12, 2]]






def is_optimal(result):
    """Tell whether ``result`` is considered optimal.

    ``result`` is a list of ``(receiver, paying_members)`` pairs. It is
    reported optimal when no receiver has an OPTIMUM_TRANSACTIONS value
    greater than its number of paying members.
    """
    # NOTE(review): a result is rejected only when a receiver used FEWER
    # transactions than its optimum. The comparison may be inverted --
    # confirm the intent before changing it.
    return all(
        receiver[OPTIMUM_TRANSACTIONS] <= len(paying_members)
        for receiver, paying_members in result
    )






######################################################################


#


# Computes which members must be invoiced to pay the expenses to other


# members, with a minimum number of transactions. The algorithm is based


# on the following assumptions:


#


# * The total of all expenses is not greater than the total of all


# incomes.


#


# * The members are sorted with greater income first and


# permutations where a better solution could be found with a


# different ordering are ignored.


#


# * Iterating over the permutations of the list of members that are


# allowed to get a share will produce a solution that is good enough.


#


# * An optimal solution (see is_optimal) will be found before


# iterating over a significant part of the permutations when there


# is a large (>10) number of members.


#


# The members argument is a list of Hostea members, represented as a list:


# [


# [


# ID, # username in the https://gitea.hostea.org/Hostea/organization repository


# EXPENSE, # integer, expense eligible for payment


# INCOME, # integer, total available income


# ],


# ...


# ]


#


#


######################################################################






def distribute(members):
    """Compute who pays whom, trying to keep the transaction count low.

    Members first pay themselves (``self_get_paid``), which modifies the
    ``members`` entries in place. The rest are split into senders
    (INCOME > 0) and receivers (EXPENSE > 0). Receiver orderings are then
    tried one after the other. The first ordering accepted by
    ``is_optimal`` is returned; if none is, the ordering with the fewest
    transactions is returned (``best_result``).

    Returns a list of ``(receiver, paying_members)`` pairs, where
    ``paying_members`` is a list of ``(ID, amount)`` pairs.
    """
    members = self_get_paid(members)
    senders = [m for m in members if m[INCOME] > 0]
    receivers = [m for m in members if m[EXPENSE] > 0]
    receivers = set_optimum_transactions(receivers, senders)
    logging.debug(
        f"distribute {receivers} to {senders} with at most {math.factorial(len(receivers))} iterations"
    )
    results = []
    orderings = itertools.permutations(receivers)
    for iterations, ordering in enumerate(orderings, start=1):
        #
        # distribute_to_receivers() consumes the expenses and incomes of
        # the entries it is given. Each ordering must start from the same
        # initial amounts, so it works on its own copies.
        #
        result = distribute_to_receivers(
            copy.deepcopy(ordering), copy.deepcopy(senders)
        )
        #
        # There may be other solutions but since this one is optimal,
        # no need to continue
        #
        if is_optimal(result):
            logging.debug(f"found optimal result after {iterations} iterations")
            return result
        results.append(result)
    return best_result(results)






def test_distribute():
    """distribute nets own income first, then has 'b' pay the receivers."""
    assert distribute([["a", 10, 5], ["b", 20, 25]]) == [
        (["a", 0, 1], [("b", 5)]),
    ]

    two_receivers = [(["a", 0, 1], [("b", 5)]), (["c", 0, 1], [("b", 3)])]
    assert distribute([["a", 10, 5], ["b", 20, 28], ["c", 3, 0]]) == two_receivers
    assert distribute([["a", 10, 5], ["b", 20, 280], ["c", 3, 0]]) == two_receivers

    # Ten receivers and a single sender able to pay all of them.
    small = "cdefghijk"
    members = [["a", 10, 5], ["b", 20, 280]] + [[name, 3, 0] for name in small]
    expected = [(["a", 0, 1], [("b", 5)])] + [
        ([name, 0, 1], [("b", 3)]) for name in small
    ]
    assert distribute(members) == expected






#


# The best result is the one with the fewest transactions


#


def best_result(results):
    """Return the result with the smallest total number of transactions.

    Each result is a list of ``(receiver, transactions)`` pairs. When
    several results tie, the earliest one in ``results`` is returned.
    Returns None when ``results`` is empty.
    """

    def transactions_count(result):
        return sum(len(transactions) for _, transactions in result)

    return min(results, key=transactions_count, default=None)






def test_best_result():
    """The candidate with two transactions beats those with three and four."""
    three = [(["a", 0, 0], [("c", 5)]), (["b", 0, 0], [("c", 7), ("d", 3)])]
    two = [(["a", 0, 0], [("c", 5)]), (["b", 0, 0], [("c", 7)])]
    four = [
        (["a", 0, 0], [("c", 5)]),
        (["b", 0, 0], [("c", 6), ("d", 2), ("e", 2)]),
    ]
    assert best_result([three, two, four]) == two
