用Python写的限制长度的队列
2009 4 20 01:06 AM 2973次查看
其实直接继承list更方便,不过我还是用包含来实现了,正好巩固一下内置操作。
私有成员我也没写2个下划线,需要的自己替换下就行了。
后面那一大段代码是单元测试,引入模块时可以删去。
class Queue(object):
    '''
    A list-backed queue that holds at most ``size`` items.

    Whenever an operation would leave more than ``size`` items in the queue,
    only the last ``size`` items are kept (the items at the front are
    dropped).  ``pop()`` removes the item at the front by default.
    '''

    # Integer types accepted for sizes and indexes; ``long`` only exists on
    # Python 2, so fall back to plain ``int`` when the name is missing.
    try:
        _INT_TYPES = (int, long)
    except NameError:
        _INT_TYPES = (int,)

    def __init__(self, iterable=None, size=10):
        '''
        iterable is an iterable object the queue should contain
        (None means an empty queue).
        size is the max number of objects the queue can contain.

        Raises TypeError if size is not an integer >= 1 or if iterable
        cannot be iterated.
        '''
        Queue._checkSize(size)
        self._size = size
        if iterable is None:
            self._list = []
        else:
            # Always copy: the queue must never share its storage with the
            # caller's list, another Queue, or a default argument.
            items = list(iterable)
            if len(items) > size:
                del items[:-size]
            self._list = items

    def __len__(self):
        return len(self._list)

    def __getitem__(self, key):
        # Slice keys yield a plain list, exactly like list slicing.
        return self._list[key]

    def __setitem__(self, key, value):
        '''
        Assign by integer index or by slice.

        Raises TypeError for any other kind of key and IndexError for an
        integer index that is out of range.
        '''
        if isinstance(key, slice):
            # Slice assignment can grow the list, so re-apply the limit.
            self._list[key] = value
            self._limit()
            return
        if not isinstance(key, Queue._INT_TYPES):
            raise TypeError('key must be an integer')
        # The underlying list raises
        # IndexError('list assignment index out of range') by itself.
        self._list[key] = value

    def __delitem__(self, key):
        del self._list[key]

    def __iter__(self):
        return iter(self._list)

    def __contains__(self, item):
        return item in self._list

    # The three *slice* hooks below are only invoked by Python 2 for simple
    # ``q[i:j]`` slicing; elsewhere the *item* hooks above receive a slice.
    def __getslice__(self, i, j):
        return self._list[i:j]

    def __setslice__(self, i, j, sequence):
        self._list[i:j] = sequence
        self._limit()

    def __delslice__(self, i, j):
        del self._list[i:j]

    def __str__(self):
        return str(self._list)

    def __repr__(self):
        return repr(self._list)

    # Comparisons are performed on the underlying list; ``other`` may be a
    # Queue, a list, or any iterable (see _getList).
    def __eq__(self, other):
        if self is other:
            return True
        return self._list == Queue._getList(other)

    def __ne__(self, other):
        return self._list != Queue._getList(other)

    def __lt__(self, other):
        return self._list < Queue._getList(other)

    def __le__(self, other):
        return self._list <= Queue._getList(other)

    def __gt__(self, other):
        return self._list > Queue._getList(other)

    def __ge__(self, other):
        return self._list >= Queue._getList(other)

    def __hash__(self):
        raise TypeError('queue objects are unhashable')

    def append(self, item):
        '''Add item at the back, dropping the front item if the queue is full.'''
        self._list.append(item)
        self._limit()

    def extend(self, iterable):
        '''Append every item of iterable, keeping only the last ``size`` items.'''
        self._list.extend(iterable)
        self._limit()

    def count(self, item):
        return self._list.count(item)

    def index(self, *item):
        # Arguments are forwarded unchanged to list.index.
        return self._list.index(*item)

    def insert(self, index, item):
        '''Insert item before index, then drop front items beyond the limit.'''
        self._list.insert(index, item)
        self._limit()

    def pop(self, index=0):
        '''Remove and return the item at index (the front item by default).'''
        return self._list.pop(index)

    def remove(self, item):
        self._list.remove(item)

    def reverse(self):
        self._list.reverse()

    def sort(self, *args, **kw):
        # Arguments are forwarded unchanged to list.sort.
        self._list.sort(*args, **kw)

    def min(self):
        return min(self._list)

    def max(self):
        return max(self._list)

    def __add__(self, iterable):
        '''Return a new Queue (same size limit) of self followed by iterable.'''
        return Queue(self._list + Queue._getList(iterable), self._size)

    def __radd__(self, iterable):
        # Returns whatever ``iterable + list`` produces (a plain list when
        # iterable is a list); the size limit is not applied here.
        return iterable + self._list

    def __mul__(self, times):
        return Queue(self._list * times, self._size)

    def __rmul__(self, times):
        return Queue(self._list * times, self._size)

    def toList(self):
        '''Return a shallow copy of the items as a list.'''
        return self._list[:]

    def toDeepList(self):
        '''Return a deep copy of the items as a list.'''
        from copy import deepcopy
        return deepcopy(self._list)

    def getSize(self):
        return self._size

    def setSize(self, size):
        '''
        Change the size limit, keeping only the last ``size`` items when the
        queue currently holds more.

        Raises TypeError if size is not an integer >= 1.
        '''
        Queue._checkSize(size)
        if size < len(self):
            self._list = self._list[-size:]
        self._size = size

    @staticmethod
    def _checkSize(size):
        # Shared validation for __init__ and setSize.
        if not isinstance(size, Queue._INT_TYPES) or size <= 0:
            raise TypeError('size must be an integer and at least 1')

    @staticmethod
    def _getList(iterable):
        '''
        Return a list view of iterable for read-only use (comparison,
        concatenation).

        Queues and lists are returned without copying; other iterables are
        converted with list().  An object that cannot be iterated is returned
        unchanged so that the caller's list operation decides the outcome.
        '''
        if isinstance(iterable, Queue):
            return iterable._list
        if isinstance(iterable, list):
            return iterable
        try:
            return list(iterable)
        except TypeError:
            return iterable

    def _limit(self):
        # Keep only the last ``size`` items once the limit is exceeded.
        items = self._list
        if len(items) > self._size:
            self._list = items[-self._size:]
if __name__ == '__main__':
    # Unit tests for Queue; executed only when the file is run as a script.
    # The suite is written for Python 2: it relies on xrange, on range()
    # returning a list, and on the cmp= argument of list.sort.
    import unittest

    class TestQueue(unittest.TestCase):

        def setUp(self):
            # Every test starts with a full queue [0, 1, 2, 3, 4], limit 5.
            self.queue = Queue(range(5), 5)

        def testInit(self):
            self.assertTrue(Queue(range(3)))
            self.assertTrue(Queue(xrange(3)))
            self.assertTrue(Queue(range(10)))
            self.assertTrue(Queue(range(10), 5))
            self.assertTrue(Queue('12345', 3))
            self.assertTrue(Queue(Queue(range(3))))
            # Empty queues are falsy, so only check that construction works.
            self.assertTrue(Queue([], 100000000000000) is not None)
            self.assertTrue(Queue(None, 2) is not None)
            self.assertTrue(Queue(size=5) is not None)
            self.assertRaises(TypeError, Queue, size=0)
            self.assertRaises(TypeError, Queue, size=-1)
            self.assertRaises(TypeError, Queue, size=2.5)
            self.assertRaises(TypeError, Queue, size='2')
            self.assertRaises(TypeError, Queue, iterable=1)
            self.assertRaises(TypeError, Queue, iterable=1, size='2')
            self.assertRaises(TypeError, Queue, 0, 0)
            self.assertRaises(TypeError, Queue, 1)
            self.assertRaises(TypeError, Queue, 1.2)

        def testEq(self):
            self.assertEqual(self.queue, self.queue)
            self.assertEqual(self.queue, range(5))
            self.assertEqual(self.queue, Queue(self.queue))
            self.assertEqual(self.queue, Queue(self.queue, 5))
            self.assertEqual(Queue(range(10)), range(10))
            # Only the last `size` items of the input are kept.
            self.assertEqual(Queue(range(10), 5), range(5, 10))
            self.assertEqual(Queue('12345', 3), ['3', '4', '5'])

        def testNe(self):
            self.assertNotEqual(self.queue, None)
            self.assertNotEqual(self.queue, range(1, 5))
            self.assertNotEqual(self.queue, '01234')

        def testLt(self):
            self.assertTrue(self.queue < Queue(range(6)))
            self.assertTrue(self.queue < range(6))

        def testLe(self):
            self.assertTrue(self.queue <= Queue(range(6)))
            self.assertTrue(self.queue <= range(6))
            self.assertTrue(self.queue <= Queue(range(5)))
            self.assertTrue(self.queue <= range(5))
            self.assertTrue(self.queue <= self.queue)

        def testGt(self):
            self.assertTrue(self.queue > Queue(range(4)))
            self.assertTrue(self.queue > range(4))

        def testGe(self):
            self.assertTrue(self.queue >= Queue(range(4)))
            self.assertTrue(self.queue >= range(4))
            self.assertTrue(self.queue >= Queue(range(5)))
            self.assertTrue(self.queue >= range(5))
            self.assertTrue(self.queue >= self.queue)

        def testGetItem(self):
            self.assertEqual(self.queue[0], 0)
            self.assertEqual(self.queue[-5], 0)
            self.assertEqual(self.queue[4], 4)
            self.assertEqual(self.queue[-1], 4)
            for index, item in enumerate(self.queue):
                self.assertEqual(self.queue[index], item)
            self.assertRaises(IndexError, self.queue.__getitem__, 5)
            self.assertRaises(IndexError, self.queue.__getitem__, -10)
            self.assertRaises(TypeError, self.queue.__getitem__, 1.0)

        def testSetItem(self):
            self.queue[0] = 10
            self.assertEqual(self.queue[0], 10)
            self.queue[4] = 20
            self.assertEqual(self.queue[4], 20)
            self.queue[-1] = 30
            self.assertEqual(self.queue[-1], 30)
            self.queue[-5] = 40
            self.assertEqual(self.queue[-5], 40)
            self.assertRaises(IndexError, self.queue.__setitem__, 5, 10)
            self.assertRaises(IndexError, self.queue.__setitem__, -6, 10)
            self.assertRaises(TypeError, self.queue.__setitem__, 1.0, 10)

        def testDelItem(self):
            del self.queue[0]
            self.assertEqual(self.queue, range(1, 5))
            self.assertEqual(len(self.queue), 4)
            del self.queue[-1]
            self.assertEqual(self.queue, range(1, 4))
            self.assertEqual(len(self.queue), 3)
            self.assertRaises(IndexError, self.queue.__delitem__, 5)
            self.assertRaises(IndexError, self.queue.__delitem__, -5)
            self.assertRaises(TypeError, self.queue.__delitem__, 1.0)

        def testAdd(self):
            # The sum is a Queue with the same limit (5), so it is truncated.
            self.assertEqual(self.queue + range(10), range(5, 10))
            self.assertEqual(self.queue + self.queue, range(5))
            self.assertEqual(self.queue + range(3) + range(3), range(1, 3) + range(3))
            # The left operand itself is left untouched.
            self.assertEqual(self.queue, range(5))
            self.assertRaises(TypeError, self.queue.__add__, 1)

        def testRadd(self):
            # list + queue yields a plain list, so no truncation happens.
            self.assertEqual(range(5) + self.queue, range(5) * 2)
            self.assertEqual(range(5) + self.queue + range(5), range(5) * 3)
            self.assertEqual(self.queue, range(5))
            self.assertRaises(TypeError, self.queue.__radd__, 1)

        def testMul(self):
            self.assertEqual(self.queue * 5, range(5))
            self.assertEqual(self.queue * 2 * 5, range(5))
            self.assertEqual(self.queue, range(5))
            # Default limit is 10, so products longer than 10 are truncated.
            queue = Queue(range(3))
            self.assertEqual(queue * 0, [])
            self.assertEqual(queue * 1, range(3))
            self.assertEqual(queue * 2, range(3) * 2)
            self.assertEqual(queue * 3, range(3) * 3)
            self.assertEqual(queue * 4, [2] + range(3) * 3)
            self.assertEqual(queue * 2 * 2, [2] + range(3) * 3)
            self.assertEqual(queue, range(3))
            self.assertRaises(TypeError, self.queue.__mul__, '1')
            self.assertRaises(TypeError, self.queue.__mul__, 1.0)

        def testRmul(self):
            self.assertEqual(5 * self.queue, range(5))
            self.assertEqual(2 * self.queue * 5, range(5))
            self.assertEqual(self.queue, range(5))
            queue = Queue(range(3))
            self.assertEqual(0 * queue, [])
            self.assertEqual(1 * queue, range(3))
            self.assertEqual(2 * queue, range(3) * 2)
            self.assertEqual(3 * queue, range(3) * 3)
            self.assertEqual(4 * queue, [2] + range(3) * 3)
            self.assertEqual(2 * queue * 2, [2] + range(3) * 3)
            self.assertEqual(queue, range(3))
            self.assertRaises(TypeError, self.queue.__rmul__, '1')
            self.assertRaises(TypeError, self.queue.__rmul__, 1.0)

        def testIter(self):
            for index, item in enumerate(self.queue.__iter__()):
                self.assertEqual(index, item)

        def testContains(self):
            self.assertTrue(0 in self.queue)
            self.assertFalse(5 in self.queue)

        def testGetslice(self):
            self.assertEqual(self.queue[:], self.queue)
            self.assertEqual(self.queue[1:], range(1, 5))
            self.assertEqual(self.queue[-4:], range(1, 5))
            self.assertEqual(self.queue[:3], range(3))
            self.assertEqual(self.queue[:-2], range(3))
            self.assertEqual(self.queue[1:3], range(1, 3))
            self.assertEqual(self.queue[-4:-2], range(1, 3))
            self.assertEqual(self.queue[::-1], range(4, -1, -1))
            self.assertEqual(self.queue[4:1:-2], [4, 2])

        def testSetslice(self):
            self.queue[:] = range(3)
            self.assertEqual(self.queue, range(3))
            self.queue[:] = Queue(range(4))
            self.assertEqual(self.queue, range(4))
            self.queue[1:] = range(1, 5)
            self.assertEqual(self.queue, range(5))
            self.queue[:3] = [0] * 2
            self.assertEqual(self.queue, [0, 0, 3, 4])
            # Assigning more items than fit drops items from the front.
            self.queue[:2] = [1] * 10
            self.assertEqual(self.queue, [1] * 3 + [3, 4])
            self.queue[-2:] = [2] * 2
            self.assertEqual(self.queue, [1] * 3 + [2] * 2)
            self.queue[:-4] = [3] * 2
            self.assertEqual(self.queue, [3] + [1] * 2 + [2] * 2)
            self.queue[1:-1] = [4] * 2
            self.assertEqual(self.queue, [3] + [4] * 2 + [2])
            # __setslice__ takes (i, j, sequence); a fourth argument is rejected.
            self.assertRaises(TypeError, self.queue.__setslice__, None, None, -1, [])

        def testLen(self):
            self.assertEqual(len(self.queue), 5)

        def testAppend(self):
            # Appending to a full queue pushes the front item out.
            self.queue.append(123)
            self.assertEqual(self.queue, range(1, 5) + [123])
            self.queue.append('abc')
            self.assertEqual(self.queue, range(2, 5) + [123, 'abc'])
            self.assertEqual(len(self.queue), 5)
            self.queue = Queue()
            self.assertEqual(len(self.queue), 0)
            self.queue.append(-1)
            self.assertEqual(len(self.queue), 1)

        def testInsert(self):
            # The queue is full, so each insert also drops the front item.
            self.queue.insert(0, 0)
            self.assertEqual(self.queue, range(5))
            self.queue.insert(5, 5)
            self.assertEqual(self.queue, range(1, 6))
            self.queue.insert(3, 0)
            self.assertEqual(self.queue, [2, 3, 0, 4, 5])
            self.queue.insert(1, 0)
            self.assertEqual(self.queue, [0, 3, 0, 4, 5])
            self.queue.insert(-1, 0)
            self.assertEqual(self.queue, [3, 0, 4, 0, 5])

        def testRemove(self):
            self.queue.remove(0)
            self.assertEqual(self.queue, range(1, 5))
            self.queue.remove(1)
            self.assertEqual(self.queue, range(2, 5))
            self.assertRaises(ValueError, self.queue.remove, 0)
            # 8 items with limit 7: the queue starts as [0, 0, 1, 1, 1, 1, 1].
            queue = Queue([0] * 3 + [1] * 5, 7)
            queue.remove(0)
            self.assertEqual(queue, [0] + [1] * 5)
            queue.remove(1)
            self.assertEqual(queue, [0] + [1] * 4)
            queue.remove(0)
            self.assertEqual(queue, [1] * 4)

        def testExtend(self):
            self.queue.extend(xrange(10))
            self.assertEqual(self.queue, range(5, 10))
            self.queue.extend((1, 2, 3))
            self.assertEqual(self.queue, [8, 9, 1, 2, 3])
            self.queue.extend(Queue([4, 5]))
            self.assertEqual(self.queue, range(1, 6))

        def testPop(self):
            # pop() without an index removes from the front of the queue.
            item = self.queue.pop()
            self.assertEqual(item, 0)
            self.assertEqual(len(self.queue), 4)
            self.assertEqual(self.queue, range(1, 5))
            item = self.queue.pop()
            self.assertEqual(item, 1)
            self.assertEqual(len(self.queue), 3)
            self.assertEqual(self.queue, range(2, 5))
            self.queue.pop()
            self.queue.pop()
            self.queue.pop()
            self.assertRaises(IndexError, self.queue.pop)

        def testCount(self):
            self.assertEqual(self.queue.count(0), 1)
            self.assertEqual(self.queue.count(1), 1)
            self.assertEqual(self.queue.count(-1), 0)
            # One of the three leading zeros is dropped by the limit of 7.
            queue = Queue([0] * 3 + [1] * 5, 7)
            self.assertEqual(queue.count(0), 2)
            self.assertEqual(queue.count(1), 5)
            self.assertEqual(queue.count(-1), 0)

        def testIndex(self):
            for index, item in enumerate(self.queue):
                self.assertEqual(index, self.queue.index(item))
            self.assertRaises(ValueError, self.queue.index, 10)

        def testReverse(self):
            self.queue.reverse()
            self.assertEqual(self.queue, range(4, -1, -1))

        def testSort(self):
            self.queue.sort()
            self.assertEqual(self.queue, range(5))
            self.queue.reverse()
            self.queue.sort()
            self.assertEqual(self.queue, range(5))
            self.queue.sort(reverse=True)
            self.assertEqual(self.queue, range(4, -1, -1))
            self.queue.sort(cmp=lambda x,y: cmp(x, y))
            self.assertEqual(self.queue, range(5))
            self.queue.sort(cmp=lambda x,y: cmp(y, x))
            self.assertEqual(self.queue, range(4, -1, -1))
            self.queue.sort(cmp=lambda x,y: cmp(y, x), reverse=True)
            self.assertEqual(self.queue, range(5))
            # Only the last five inputs [9, 32, 1, -324, 0.2] are in the queue.
            queue = Queue([3, 9934, 23, 43, 9, 32, 1, -324, 0.2], 5)
            queue.sort()
            self.assertEqual(queue, [-324, 0.2, 1, 9, 32])

        def testMin(self):
            self.assertEqual(self.queue.min(), 0)
            queue = Queue([3, 9934, 23, 43, 9, 32, 1, -324, 0.2], 5)
            self.assertEqual(queue.min(), -324)

        def testMax(self):
            self.assertEqual(self.queue.max(), 4)
            # 9934 and 43 were dropped by the limit, so 32 is the maximum.
            queue = Queue([3, 9934, 23, 43, 9, 32, 1, -324, 0.2], 5)
            self.assertEqual(queue.max(), 32)

        def testSetSize(self):
            self.queue.setSize(8)
            self.assertEqual(self.queue.getSize(), 8)
            self.assertEqual(self.queue * 2, range(2, 5) + range(5))
            # Shrinking keeps only the last `size` items.
            self.queue.setSize(4)
            self.assertEqual(self.queue, range(1, 5))
            self.assertRaises(TypeError, self.queue.setSize, 0)
            self.assertRaises(TypeError, self.queue.setSize, "1")

        def testToList(self):
            self.assertTrue(isinstance(self.queue.toList(), list))
            self.assertEqual(self.queue.toList(), self.queue)
            self.assertEqual(self.queue.toList(), range(5))
            testList = [TestQueue.Test(TestQueue.Test(1)), TestQueue.Test([TestQueue.Test(9), TestQueue.Test(8)])]
            queue = Queue(testList)
            self.assertEqual(queue.toList(), testList)
            # Shallow copy: the very same element objects are returned.
            self.assertTrue(queue.toList()[0] is testList[0])
            self.assertTrue(queue.toList()[1] is testList[1])

        def testToDeepList(self):
            self.assertTrue(isinstance(self.queue.toDeepList(), list))
            self.assertEqual(self.queue.toDeepList(), self.queue)
            self.assertEqual(self.queue.toDeepList(), range(5))
            testList = [TestQueue.Test(TestQueue.Test(1)), TestQueue.Test([TestQueue.Test(9), TestQueue.Test(8)])]
            queue = Queue(testList)
            self.assertEqual(queue.toDeepList(), testList)
            # Deep copy: equal values, but no object is shared at any level.
            self.assertFalse(queue.toDeepList()[0] is testList[0])
            self.assertFalse(queue.toDeepList()[0].data is testList[0].data)
            self.assertFalse(queue.toDeepList()[1] is testList[1])
            self.assertFalse(queue.toDeepList()[1].data is testList[1].data)
            self.assertFalse(queue.toDeepList()[1].data[0] is testList[1].data[0])
            self.assertFalse(queue.toDeepList()[1].data[1] is testList[1].data[1])

        class Test(object):
            # Wrapper compared by value of `data`; used by the copy tests to
            # tell equality apart from identity.
            def __init__(self, data):
                self.data = data

            def __eq__(self, other):
                return self.data == other.data

    unittest.main()
顺便贴出测试结果:.................................
----------------------------------------------------------------------
Ran 33 tests in 0.015s
OK
向下滚动可载入更多评论,或者点这里禁止自动加载。