用Python写的限制长度的队列

标签:Python

由于某些应用经常要限制列表的长度,所以就自己写了个类。
其实直接继承list更方便,不过我还是用包含来实现了,正好巩固一下内置操作。
私有成员我也没写2个下划线,需要的自己替换下就行了。
后面那一大段代码是单元测试,引入模块时可以删去。
class Queue(object):
  def __init__(self, iterable=[], size=10):
    '''
    iterable is an iterable object the queue should contain.

    size is the max number of objects the queue can contain.
    '''

    if not isinstance(size, int) and not isinstance(size, long) or size <= 0:
      raise TypeError('size must be an integer and at least 1')
    self._size = size

    if iterable is None:
      self._list = []

    else:

      if len(iterable) > size:
        iterable = iterable[-size:]

      self._list = Queue._getList(iterable)

  def __len__(self):
    return len(self._list)

  def __getitem__(self, key):
    return self._list[key]

  def __setitem__(self, key, value):
    if not isinstance(key, int) and not isinstance(key, long):
      raise TypeError('key must be an integer')
    if (-self._size <= key < self._size):
      self._list[key] = value
    else:
      raise IndexError('list assignment index out of range')

  def __delitem__(self, key):
    self._list.__delitem__(key)

  def __iter__(self):
    return self._list.__iter__()

  def __contains__(self, item):
    return item in self._list

  def __getslice__(self, i, j):
    return (self._list)[i:j]

  def __setslice__(self, i, j, sequence):
    self._list[i:j] = sequence
    self._limit()

  def __delslice__(self, i, j):
    self._list.__delslice__(i, j)

  def __str__(self):
    return str(self._list)

  def __repr__(self):
    return repr(self._list)

  def __eq__(self, other):
    if self is other:
      return True
    return self._list == Queue._getList(other)

  def __ne__(self, other):
    return self._list != Queue._getList(other)

  def __lt__(self, other):
    return self._list < Queue._getList(other)

  def __le__(self, other):
    return self._list <= Queue._getList(other)

  def __gt__(self, other):
    return self._list > Queue._getList(other)

  def __ge__(self, other):
    return self._list >= Queue._getList(other)

  def __hash__(self):
    raise TypeError('queue objects are unhashable')

  def append(self, item):
    self._list.append(item)
    self._limit()

  def extend(self, iterable):
    self._list.extend(iterable)
    self._limit()

  def count(self, item):
    return self._list.count(item)

  def index(self, *item):
    return self._list.index(*item)

  def insert(self, index, item):
    self._list.insert(index, item)
    self._limit()

  def pop(self, index=0):
    return self._list.pop(index)

  def remove(self, item):
    self._list.remove(item)

  def reverse(self):
    self._list.reverse()

  def sort(self, *args, **kw):
    self._list.sort(*args, **kw)

  def min(self):
    return min(self._list)

  def max(self):
    return max(self._list)

  def __add__(self, iterable):
    return Queue(self._list + Queue._getList(iterable), self._size)

  def __radd__(self, iterable):
    return iterable + self._list

  def __mul__(self, times):
    return Queue(self._list * times, self._size)

  def __rmul__(self, times):
    return Queue(self._list * times, self._size)

  def toList(self):
    return self._list[:]

  def toDeepList(self):
    from copy import deepcopy
    return deepcopy(self._list)

  def getSize(self):
    return self._size

  def setSize(self, size):
    if not isinstance(size, int) and not isinstance(size, long) or size <= 0:
      raise TypeError('size must be an integer and at least 1')
    if size < len(self):
      self._list = self._list[-size:]
    self._size = size

  @staticmethod
  def _getList(iterable):
    if isinstance(iterable, Queue):
      return iterable._list
    elif isinstance(iterable, list):
      return iterable
    else:
      try:
        return list(iterable)
      except:
        return iterable

  def _limit(self):
    size = self._size
    list = self._list
    if len(list) > size:
      self._list = list[-size:]


if __name__ == '__main__':

  import unittest

  class TestQueue(unittest.TestCase):

    def setUp(self):
      self.queue = Queue(range(5), 5)

    def testInit(self):
      self.assert_(Queue(range(3)))
      self.assert_(Queue(xrange(3)))
      self.assert_(Queue(range(10)))
      self.assert_(Queue(range(10), 5))
      self.assert_(Queue('12345', 3))
      self.assert_(Queue(Queue(range(3))))
      self.assert_(Queue([], 100000000000000) is not None)
      self.assert_(Queue(None, 2) is not None)
      self.assert_(Queue(size=5) is not None)

      self.assertRaises(TypeError, Queue, size=0)
      self.assertRaises(TypeError, Queue, size=-1)
      self.assertRaises(TypeError, Queue, size=2.5)
      self.assertRaises(TypeError, Queue, size='2')
      self.assertRaises(TypeError, Queue, iterable=1)
      self.assertRaises(TypeError, Queue, iterable=1, size='2')
      self.assertRaises(TypeError, Queue, 0, 0)
      self.assertRaises(TypeError, Queue, 1)
      self.assertRaises(TypeError, Queue, 1.2)

    def testEq(self):
      self.assertEqual(self.queue, self.queue)
      self.assertEqual(self.queue, range(5))
      self.assertEqual(self.queue, Queue(self.queue))
      self.assertEqual(self.queue, Queue(self.queue, 5))
      self.assertEqual(Queue(range(10)), range(10))
      self.assertEqual(Queue(range(10), 5), range(5, 10))
      self.assertEqual(Queue('12345', 3), ['3', '4', '5'])

    def testNe(self):
      self.assertNotEqual(self.queue, None)
      self.assertNotEqual(self.queue, range(1, 5))
      self.assertNotEqual(self.queue, '01234')

    def testLt(self):
      self.assert_(self.queue < Queue(range(6)))
      self.assert_(self.queue < range(6))

    def testLe(self):
      self.assert_(self.queue <= Queue(range(6)))
      self.assert_(self.queue <= range(6))
      self.assert_(self.queue <= Queue(range(5)))
      self.assert_(self.queue <= range(5))
      self.assert_(self.queue <= self.queue)

    def testGt(self):
      self.assert_(self.queue > Queue(range(4)))
      self.assert_(self.queue > range(4))

    def testGe(self):
      self.assert_(self.queue >= Queue(range(4)))
      self.assert_(self.queue >= range(4))
      self.assert_(self.queue >= Queue(range(5)))
      self.assert_(self.queue >= range(5))
      self.assert_(self.queue >= self.queue)

    def testGetItem(self):
      self.assertEqual(self.queue[0], 0)
      self.assertEqual(self.queue[-5], 0)
      self.assertEqual(self.queue[4], 4)
      self.assertEqual(self.queue[-1], 4)

      for index, item in enumerate(self.queue):
        self.assertEqual(self.queue[index], item)

      self.assertRaises(IndexError, self.queue.__getitem__, 5)
      self.assertRaises(IndexError, self.queue.__getitem__, -10)
      self.assertRaises(TypeError, self.queue.__getitem__, 1.0)

    def testSetItem(self):
      self.queue[0] = 10
      self.assertEqual(self.queue[0], 10)
      self.queue[4] = 20
      self.assertEqual(self.queue[4], 20)
      self.queue[-1] = 30
      self.assertEqual(self.queue[-1], 30)
      self.queue[-5] = 40
      self.assertEqual(self.queue[-5], 40)

      self.assertRaises(IndexError, self.queue.__setitem__, 5, 10)
      self.assertRaises(IndexError, self.queue.__setitem__, -6, 10)
      self.assertRaises(TypeError, self.queue.__setitem__, 1.0, 10)

    def testDelItem(self):
      del self.queue[0]
      self.assertEqual(self.queue, range(1, 5))
      self.assertEqual(len(self.queue), 4)
      del self.queue[-1]
      self.assertEqual(self.queue, range(1, 4))
      self.assertEqual(len(self.queue), 3)

      self.assertRaises(IndexError, self.queue.__delitem__, 5)
      self.assertRaises(IndexError, self.queue.__delitem__, -5)
      self.assertRaises(TypeError, self.queue.__delitem__, 1.0)

    def testAdd(self):
      self.assertEqual(self.queue + range(10), range(5, 10))
      self.assertEqual(self.queue + self.queue, range(5))
      self.assertEqual(self.queue + range(3) + range(3), range(1, 3) + range(3))
      self.assertEqual(self.queue, range(5))

      self.assertRaises(TypeError, self.queue.__add__, 1)

    def testRadd(self):
      self.assertEqual(range(5) + self.queue, range(5) * 2)
      self.assertEqual(range(5) + self.queue + range(5), range(5) * 3)
      self.assertEqual(self.queue, range(5))

      self.assertRaises(TypeError, self.queue.__radd__, 1)

    def testMul(self):
      self.assertEqual(self.queue * 5, range(5))
      self.assertEqual(self.queue * 2 * 5, range(5))
      self.assertEqual(self.queue, range(5))

      queue = Queue(range(3))
      self.assertEqual(queue * 0, [])
      self.assertEqual(queue * 1, range(3))
      self.assertEqual(queue * 2, range(3) * 2)
      self.assertEqual(queue * 3, range(3) * 3)
      self.assertEqual(queue * 4, [2] + range(3) * 3)
      self.assertEqual(queue * 2 * 2, [2] + range(3) * 3)
      self.assertEqual(queue, range(3))

      self.assertRaises(TypeError, self.queue.__mul__, '1')
      self.assertRaises(TypeError, self.queue.__mul__, 1.0)

    def testRmul(self):
      self.assertEqual(5 * self.queue, range(5))
      self.assertEqual(2 * self.queue * 5, range(5))
      self.assertEqual(self.queue, range(5))

      queue = Queue(range(3))
      self.assertEqual(0 * queue, [])
      self.assertEqual(1 * queue, range(3))
      self.assertEqual(2 * queue, range(3) * 2)
      self.assertEqual(3 * queue, range(3) * 3)
      self.assertEqual(4 * queue, [2] + range(3) * 3)
      self.assertEqual(2 * queue * 2, [2] + range(3) * 3)
      self.assertEqual(queue, range(3))

      self.assertRaises(TypeError, self.queue.__rmul__, '1')
      self.assertRaises(TypeError, self.queue.__rmul__, 1.0)

    def testIter(self):
      for index, item in enumerate(self.queue.__iter__()):
        self.assertEqual(index, item)

    def testContains(self):
      self.assert_(0 in self.queue)
      self.failIf(5 in self.queue)

    def testGetslice(self):
      self.assertEqual(self.queue[:], self.queue)
      self.assertEqual(self.queue[1:], range(1, 5))
      self.assertEqual(self.queue[-4:], range(1, 5))
      self.assertEqual(self.queue[:3], range(3))
      self.assertEqual(self.queue[:-2], range(3))
      self.assertEqual(self.queue[1:3], range(1, 3))
      self.assertEqual(self.queue[-4:-2], range(1, 3))
      self.assertEqual(self.queue[::-1], range(4, -1, -1))
      self.assertEqual(self.queue[4:1:-2], [4, 2])

    def testSetslice(self):
      self.queue[:] = range(3)
      self.assertEqual(self.queue, range(3))
      self.queue[:] = Queue(range(4))
      self.assertEqual(self.queue, range(4))
      self.queue[1:] = range(1, 5)
      self.assertEqual(self.queue, range(5))
      self.queue[:3] = [0] * 2
      self.assertEqual(self.queue, [0, 0, 3, 4])
      self.queue[:2] = [1] * 10
      self.assertEqual(self.queue, [1] * 3 + [3, 4])
      self.queue[-2:] = [2] * 2
      self.assertEqual(self.queue, [1] * 3 + [2] * 2)
      self.queue[:-4] = [3] * 2
      self.assertEqual(self.queue, [3] + [1] * 2 + [2] * 2)
      self.queue[1:-1] = [4] * 2
      self.assertEqual(self.queue, [3] + [4] * 2 + [2])

      self.assertRaises(TypeError, self.queue.__setslice__, None, None, -1, [])

    def testLen(self):
      self.assertEqual(len(self.queue), 5)

    def testAppend(self):
      self.queue.append(123)
      self.assertEqual(self.queue, range(1, 5) + [123])
      self.queue.append('abc')
      self.assertEqual(self.queue, range(2, 5) + [123, 'abc'])
      self.assertEqual(len(self.queue), 5)
      self.queue = Queue()
      self.assertEqual(len(self.queue), 0)
      self.queue.append(-1)
      self.assertEqual(len(self.queue), 1)

    def testInsert(self):
      self.queue.insert(0, 0)
      self.assertEqual(self.queue, range(5))
      self.queue.insert(5, 5)
      self.assertEqual(self.queue, range(1, 6))
      self.queue.insert(3, 0)
      self.assertEqual(self.queue, [2, 3, 0, 4, 5])
      self.queue.insert(1, 0)
      self.assertEqual(self.queue, [0, 3, 0, 4, 5])
      self.queue.insert(-1, 0)
      self.assertEqual(self.queue, [3, 0, 4, 0, 5])

    def testRemove(self):
      self.queue.remove(0)
      self.assertEqual(self.queue, range(1, 5))
      self.queue.remove(1)
      self.assertEqual(self.queue, range(2, 5))

      self.assertRaises(ValueError, self.queue.remove, 0)

      queue = Queue([0] * 3 + [1] * 5, 7)
      queue.remove(0)
      self.assertEqual(queue, [0] + [1] * 5)
      queue.remove(1)
      self.assertEqual(queue, [0] + [1] * 4)
      queue.remove(0)
      self.assertEqual(queue, [1] * 4)

    def testExtend(self):
      self.queue.extend(xrange(10))
      self.assertEqual(self.queue, range(5, 10))
      self.queue.extend((1, 2, 3))
      self.assertEqual(self.queue, [8, 9, 1, 2, 3])
      self.queue.extend(Queue([4, 5]))
      self.assertEqual(self.queue, range(1, 6))

    def testPop(self):
      item = self.queue.pop()
      self.assertEqual(item, 0)
      self.assertEqual(len(self.queue), 4)
      self.assertEqual(self.queue, range(1, 5))

      item = self.queue.pop()
      self.assertEqual(item, 1)
      self.assertEqual(len(self.queue), 3)
      self.assertEqual(self.queue, range(2, 5))

      self.queue.pop()
      self.queue.pop()
      self.queue.pop()
      self.assertRaises(IndexError, self.queue.pop)

    def testCount(self):
      self.assertEqual(self.queue.count(0), 1)
      self.assertEqual(self.queue.count(1), 1)
      self.assertEqual(self.queue.count(-1), 0)

      queue = Queue([0] * 3 + [1] * 5, 7)
      self.assertEqual(queue.count(0), 2)
      self.assertEqual(queue.count(1), 5)
      self.assertEqual(queue.count(-1), 0)

    def testIndex(self):
      for index, item in enumerate(self.queue):
        self.assertEqual(index, self.queue.index(item))

      self.assertRaises(ValueError, self.queue.index, 10)

    def testReverse(self):
      self.queue.reverse()
      self.assertEqual(self.queue, range(4, -1, -1))

    def testSort(self):
      self.queue.sort()
      self.assertEqual(self.queue, range(5))

      self.queue.reverse()
      self.queue.sort()
      self.assertEqual(self.queue, range(5))

      self.queue.sort(reverse=True)
      self.assertEqual(self.queue, range(4, -1, -1))

      self.queue.sort(cmp=lambda x,y: cmp(x, y))
      self.assertEqual(self.queue, range(5))

      self.queue.sort(cmp=lambda x,y: cmp(y, x))
      self.assertEqual(self.queue, range(4, -1, -1))

      self.queue.sort(cmp=lambda x,y: cmp(y, x), reverse=True)
      self.assertEqual(self.queue, range(5))

      queue = Queue([3, 9934, 23, 43, 9, 32, 1, -324, 0.2], 5)
      queue.sort()
      self.assertEqual(queue, [-324, 0.2, 1, 9, 32])

    def testMin(self):
      self.assertEqual(self.queue.min(), 0)
      queue = Queue([3, 9934, 23, 43, 9, 32, 1, -324, 0.2], 5)
      self.assertEqual(queue.min(), -324)

    def testMax(self):
      self.assertEqual(self.queue.max(), 4)
      queue = Queue([3, 9934, 23, 43, 9, 32, 1, -324, 0.2], 5)
      self.assertEqual(queue.max(), 32)

    def testSetSize(self):
      self.queue.setSize(8)
      self.assertEqual(self.queue.getSize(), 8)
      self.assertEqual(self.queue * 2, range(2, 5) + range(5))

      self.queue.setSize(4)
      self.assertEqual(self.queue, range(1, 5))

      self.assertRaises(TypeError, self.queue.setSize, 0)
      self.assertRaises(TypeError, self.queue.setSize, "1")

    def testToList(self):
      self.assert_(isinstance(self.queue.toList(), list))
      self.assertEqual(self.queue.toList(), self.queue)
      self.assertEqual(self.queue.toList(), range(5))

      testList = [TestQueue.Test(TestQueue.Test(1)), TestQueue.Test([TestQueue.Test(9), TestQueue.Test(8)])]

      queue = Queue(testList)
      self.assertEqual(queue.toList(), testList)
      self.assert_(queue.toList()[0] is testList[0])
      self.assert_(queue.toList()[1] is testList[1])

    def testToDeepList(self):
      self.assert_(isinstance(self.queue.toDeepList(), list))
      self.assertEqual(self.queue.toDeepList(), self.queue)
      self.assertEqual(self.queue.toDeepList(), range(5))

      testList = [TestQueue.Test(TestQueue.Test(1)), TestQueue.Test([TestQueue.Test(9), TestQueue.Test(8)])]

      queue = Queue(testList)
      self.assertEqual(queue.toDeepList(), testList)
      self.failIf(queue.toDeepList()[0] is testList[0])
      self.failIf(queue.toDeepList()[0].data is testList[0].data)
      self.failIf(queue.toDeepList()[1] is testList[1])
      self.failIf(queue.toDeepList()[1].data is testList[1].data)
      self.failIf(queue.toDeepList()[1].data[0] is testList[1].data[0])
      self.failIf(queue.toDeepList()[1].data[1] is testList[1].data[1])

    class Test(object):
      def __init__(self, data):
        self.data = data

      def __eq__(self, other):
        return self.data == other.data

  unittest.main()
顺便贴出测试结果:
.................................
----------------------------------------------------------------------
Ran 33 tests in 0.015s

OK

1条评论 你不来一发么↓ 顺序排列 倒序排列

    向下滚动可载入更多评论,或者点这里禁止自动加载

    想说点什么呢?