350 lines
12 KiB
Python
Executable File
350 lines
12 KiB
Python
Executable File
# The contents of this file are subject to the BitTorrent Open Source License
|
|
# Version 1.1 (the License). You may not copy or use this file, in either
|
|
# source code or executable form, except in compliance with the License. You
|
|
# may obtain a copy of the License at http://www.bittorrent.com/license/.
|
|
#
|
|
# Software distributed under the License is distributed on an AS IS basis,
|
|
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
|
# for the specific language governing rights and limitations under the
|
|
# License.
|
|
|
|
from BitTorrent.platform import bttime as time
|
|
|
|
import const
|
|
|
|
from khash import intify
|
|
from ktable import KTable, K
|
|
from util import unpackNodes
|
|
from krpc import KRPCProtocolError, KRPCSelfNodeError
|
|
from bisect import insort
|
|
|
|
class NodeWrap(object):
|
|
def __init__(self, node, target):
|
|
self.num = target
|
|
self.node = node
|
|
|
|
def __cmp__(self, o):
|
|
""" this function is for sorting nodes relative to the ID we are looking for """
|
|
x, y = self.num ^ o.num, self.num ^ self.node.num
|
|
if x > y:
|
|
return 1
|
|
elif x < y:
|
|
return -1
|
|
return 0
|
|
|
|
class ActionBase(object):
|
|
""" base class for some long running asynchronous proccesses like finding nodes or values """
|
|
def __init__(self, table, target, callback, callLater):
|
|
self.table = table
|
|
self.target = target
|
|
self.callLater = callLater
|
|
self.num = intify(target)
|
|
self.found = {}
|
|
self.foundq = []
|
|
self.queried = {}
|
|
self.queriedip = {}
|
|
self.answered = {}
|
|
self.callback = callback
|
|
self.outstanding = 0
|
|
self.finished = 0
|
|
|
|
def sort(self, a, b):
|
|
""" this function is for sorting nodes relative to the ID we are looking for """
|
|
x, y = self.num ^ a.num, self.num ^ b.num
|
|
if x > y:
|
|
return 1
|
|
elif x < y:
|
|
return -1
|
|
return 0
|
|
|
|
def shouldQuery(self, node):
|
|
if node.id == self.table.node.id:
|
|
return False
|
|
elif (node.host, node.port) not in self.queriedip and node.id not in self.queried:
|
|
self.queriedip[(node.host, node.port)] = 1
|
|
self.queried[node.id] = 1
|
|
return True
|
|
return False
|
|
|
|
def _cleanup(self):
|
|
self.foundq = None
|
|
self.found = None
|
|
self.queried = None
|
|
self.queriedip = None
|
|
|
|
def goWithNodes(self, t):
|
|
pass
|
|
|
|
|
|
|
|
FIND_NODE_TIMEOUT = 15
|
|
|
|
class FindNode(ActionBase):
|
|
""" find node action merits it's own class as it is a long running stateful process """
|
|
def handleGotNodes(self, dict):
|
|
_krpc_sender = dict['_krpc_sender']
|
|
dict = dict['rsp']
|
|
sender = {'id' : dict["id"]}
|
|
sender['port'] = _krpc_sender[1]
|
|
sender['host'] = _krpc_sender[0]
|
|
sender = self.table.Node().initWithDict(sender)
|
|
try:
|
|
l = unpackNodes(dict.get("nodes", []))
|
|
if not self.answered.has_key(sender.id):
|
|
self.answered[sender.id] = sender
|
|
except:
|
|
l = []
|
|
self.table.invalidateNode(sender)
|
|
|
|
if self.finished:
|
|
# a day late and a dollar short
|
|
return
|
|
self.outstanding = self.outstanding - 1
|
|
for node in l:
|
|
n = self.table.Node().initWithDict(node)
|
|
if not self.found.has_key(n.id):
|
|
self.found[n.id] = n
|
|
insort(self.foundq, NodeWrap(n, self.num))
|
|
self.table.insertNode(n, contacted=0)
|
|
self.schedule()
|
|
|
|
def schedule(self):
|
|
"""
|
|
send messages to new peers, if necessary
|
|
"""
|
|
if self.finished:
|
|
return
|
|
l = [wrapper.node for wrapper in self.foundq[:K]]
|
|
for node in l:
|
|
if node.id == self.target:
|
|
self.finished=1
|
|
return self.callback([node])
|
|
if self.shouldQuery(node):
|
|
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
|
|
try:
|
|
df = node.findNode(self.target, self.table.node.id)
|
|
except KRPCSelfNodeError:
|
|
pass
|
|
else:
|
|
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
|
|
self.outstanding = self.outstanding + 1
|
|
if self.outstanding >= const.CONCURRENT_REQS:
|
|
break
|
|
assert(self.outstanding) >=0
|
|
if self.outstanding == 0:
|
|
## all done!!
|
|
self.finished=1
|
|
self._cleanup()
|
|
self.callLater(self.callback, 0, (l[:K],))
|
|
|
|
def makeMsgFailed(self, node):
|
|
return self._defaultGotNodes
|
|
|
|
def _defaultGotNodes(self, err):
|
|
self.outstanding = self.outstanding - 1
|
|
self.schedule()
|
|
|
|
def goWithNodes(self, nodes):
|
|
"""
|
|
this starts the process, our argument is a transaction with t.extras being our list of nodes
|
|
it's a transaction since we got called from the dispatcher
|
|
"""
|
|
for node in nodes:
|
|
if node.id == self.table.node.id:
|
|
continue
|
|
else:
|
|
self.found[node.id] = node
|
|
insort(self.foundq, NodeWrap(node, self.num))
|
|
self.schedule()
|
|
|
|
|
|
get_value_timeout = 15
|
|
class GetValue(FindNode):
|
|
def __init__(self, table, target, callback, callLater, find="findValue"):
|
|
FindNode.__init__(self, table, target, callback, callLater)
|
|
self.findValue = find
|
|
|
|
""" get value task """
|
|
def handleGotNodes(self, dict):
|
|
_krpc_sender = dict['_krpc_sender']
|
|
dict = dict['rsp']
|
|
sender = {'id' : dict["id"]}
|
|
sender['port'] = _krpc_sender[1]
|
|
sender['host'] = _krpc_sender[0]
|
|
sender = self.table.Node().initWithDict(sender)
|
|
|
|
if self.finished or self.answered.has_key(sender.id):
|
|
# a day late and a dollar short
|
|
return
|
|
self.outstanding = self.outstanding - 1
|
|
|
|
self.answered[sender.id] = sender
|
|
# go through nodes
|
|
# if we have any closer than what we already got, query them
|
|
if dict.has_key('nodes'):
|
|
try:
|
|
l = unpackNodes(dict.get('nodes',[]))
|
|
except:
|
|
l = []
|
|
del(self.answered[sender.id])
|
|
|
|
for node in l:
|
|
n = self.table.Node().initWithDict(node)
|
|
if not self.found.has_key(n.id):
|
|
self.table.insertNode(n)
|
|
self.found[n.id] = n
|
|
insort(self.foundq, NodeWrap(n, self.num))
|
|
elif dict.has_key('values'):
|
|
def x(y, z=self.results):
|
|
if not z.has_key(y):
|
|
z[y] = 1
|
|
return y
|
|
else:
|
|
return None
|
|
z = len(dict.get('values', []))
|
|
v = filter(None, map(x, dict.get('values',[])))
|
|
if(len(v)):
|
|
self.callLater(self.callback, 0, (v,))
|
|
self.schedule()
|
|
|
|
## get value
|
|
def schedule(self):
|
|
if self.finished:
|
|
return
|
|
for node in [wrapper.node for wrapper in self.foundq[:K]]:
|
|
if self.shouldQuery(node):
|
|
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
|
|
try:
|
|
f = getattr(node, self.findValue)
|
|
except AttributeError:
|
|
print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
|
|
else:
|
|
try:
|
|
df = f(self.target, self.table.node.id)
|
|
df.addCallback(self.handleGotNodes)
|
|
df.addErrback(self.makeMsgFailed(node))
|
|
self.outstanding = self.outstanding + 1
|
|
self.queried[node.id] = 1
|
|
except KRPCSelfNodeError:
|
|
pass
|
|
if self.outstanding >= const.CONCURRENT_REQS:
|
|
break
|
|
assert(self.outstanding) >=0
|
|
if self.outstanding == 0:
|
|
## all done, didn't find it!!
|
|
self.finished=1
|
|
self._cleanup()
|
|
self.callLater(self.callback,0, ([],))
|
|
|
|
## get value
|
|
def goWithNodes(self, nodes, found=None):
|
|
self.results = {}
|
|
if found:
|
|
for n in found:
|
|
self.results[n] = 1
|
|
for node in nodes:
|
|
if node.id == self.table.node.id:
|
|
continue
|
|
else:
|
|
self.found[node.id] = node
|
|
insort(self.foundq, NodeWrap(node, self.num))
|
|
self.schedule()
|
|
|
|
|
|
class StoreValue(ActionBase):
|
|
def __init__(self, table, target, value, callback, callLater, store="storeValue"):
|
|
ActionBase.__init__(self, table, target, callback, callLater)
|
|
self.value = value
|
|
self.stored = []
|
|
self.store = store
|
|
|
|
def storedValue(self, t, node):
|
|
self.outstanding -= 1
|
|
if self.finished:
|
|
return
|
|
self.stored.append(t)
|
|
if len(self.stored) >= const.STORE_REDUNDANCY:
|
|
self.finished=1
|
|
self.callback(self.stored)
|
|
else:
|
|
if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
|
|
self.schedule()
|
|
return t
|
|
|
|
def storeFailed(self, t, node):
|
|
self.outstanding -= 1
|
|
if self.finished:
|
|
return t
|
|
self.schedule()
|
|
return t
|
|
|
|
def schedule(self):
|
|
if self.finished:
|
|
return
|
|
num = const.CONCURRENT_REQS - self.outstanding
|
|
if num > const.STORE_REDUNDANCY - len(self.stored):
|
|
num = const.STORE_REDUNDANCY - len(self.stored)
|
|
if num == 0 and not self.finished:
|
|
self.finished=1
|
|
self.callback(self.stored)
|
|
while num > 0:
|
|
try:
|
|
node = self.nodes.pop()
|
|
except IndexError:
|
|
if self.outstanding == 0:
|
|
self.finished = 1
|
|
self._cleanup()
|
|
self.callback(self.stored)
|
|
return
|
|
else:
|
|
if not node.id == self.table.node.id:
|
|
try:
|
|
f = getattr(node, self.store)
|
|
except AttributeError:
|
|
print ">>> %s doesn't have a %s method!" % (node, self.store)
|
|
else:
|
|
try:
|
|
df = f(self.target, self.value, self.table.node.id)
|
|
except KRPCProtocolError:
|
|
self.table.table.invalidateNode(node)
|
|
except KRPCSelfNodeError:
|
|
pass
|
|
else:
|
|
df.addCallback(self.storedValue,(),{'node':node})
|
|
df.addErrback(self.storeFailed, (), {'node':node})
|
|
self.outstanding += 1
|
|
num -= 1
|
|
|
|
def goWithNodes(self, nodes):
|
|
self.nodes = nodes
|
|
self.nodes.sort(self.sort)
|
|
self.schedule()
|
|
|
|
|
|
class GetAndStore(GetValue):
|
|
def __init__(self, table, target, value, callback, storecallback, callLater, find="findValue", store="storeValue"):
|
|
self.store = store
|
|
self.value = value
|
|
self.cb2 = callback
|
|
self.storecallback = storecallback
|
|
def cb(res):
|
|
self.cb2(res)
|
|
if not(res):
|
|
n = StoreValue(self.table, self.target, self.value, self.doneStored, self.callLater, self.store)
|
|
n.goWithNodes(self.answered.values())
|
|
GetValue.__init__(self, table, target, cb, callLater, find)
|
|
|
|
def doneStored(self, dict):
|
|
self.storecallback(dict)
|
|
|
|
class KeyExpirer:
|
|
def __init__(self, store, callLater):
|
|
self.store = store
|
|
self.callLater = callLater
|
|
self.callLater(self.doExpire, const.KEINITIAL_DELAY)
|
|
|
|
def doExpire(self):
|
|
self.cut = time() - const.KE_AGE
|
|
self.store.expire(self.cut)
|
|
self.callLater(self.doExpire, const.KE_DELAY)
|