在GAE上使用服务器推(Channel API)+聊天室示例

标签:Python, Google App Engine

GAE SDK 1.4.0中最重大的更新就是Channel API了,呼唤了那么久终于出来了,不去尝试下可对不起自己,于是便将今天下午花费在这上面了…

先把聊天室示例放在这里(不保证长期有效),有兴趣的可以继续往下看:http://chat.latest.gaejava.appspot.com/

之前也曾介绍过服务器推(server push)的原理,还有很多术语和它表达的基本上是一类技术:HTTP长连接、Comet、Long polling等。(顺带一提,Channel API是靠iframe实现的,应该是重用了Google Talk的infrastructure;为了避免被GFW,具体地址我就不放出了。)更新:据Google的Ikai Lan介绍,Channel API采用了不同的技术,老式的浏览器是用long polling,而更现代的浏览器会采用Web Socket
所以这里就不再重复介绍了,只说一下Channel API和普通的HTTP长连接有何不同。
简单来说,server push是用HTTP长连接来实现的,而Channel API作为一种server push的实现,它也是依赖于HTTP长连接的。但Channel API有个局限:必须由用户(或者说浏览器)发出的POST请求来驱动,不能在后台自动发送数据。
由于存在这个限制,你就无法让服务器每隔1分钟自动抓下Twitter上的新Tweet发给你,也不能做一个定时刷怪的网游出来——除非让客户端定时发送请求。

了解这个限制后,如果你还感兴趣,那么就继续往下看吧~
正如Channel API文档所述,你必须了解这些术语:

Javascript Client:这个实际上就是你的浏览器,需要做这些事:
  • 用channel的唯一token来连接channel,与sever建立长连接。
  • 监听channel的数据并更新给用户。
  • 发送数据给sever。

Server:就是GAE服务器,它负责:
  • 为每个Javascript client创建唯一的channel。
  • 创建和发送唯一token给client。
  • 接收client发送的POST数据。
  • 通过channel来给client发送数据。

Client ID:用于唯一标识一个Javascript Client。注意它必须是ASCII字符,最多64个字符。

Tokens:用于让client连接channel。它包含Client ID和过期时间。

Channel:Server与client之间建立的通信通道。这个通道在过期前会一直打开,只能由server单方面地向client发送数据,client向server发送数据是靠普通的POST方法。

Message:Server与client之间发送的数据,一条message最大为32kb。

Socket:Javascript Client使用token来打开与server之间的socket,这个socket用于接收channel发送过来的信息。


文档中有几张图可以让你更好地理解它们之间的关系:


这张图是Javascript Client与Server之间建立Channel的示意图。
  1. Client先发送请求,告诉Server它要建立一个Channel。
  2. Server接收到请求后,根据Client ID来创建一个Channel,然后把这个Channel的token返回给Client。(实际上这个例子并不好,Client ID并不需要Client来发送,Server可以用登录的用户名或随机数来生成。)
  3. Client接收到token,用这个token来建立Socket,连接到Channel。


这张图是Javascript Client与Server之间传递消息的示意图。
  1. Client B向Server发送POST请求。
  2. Server接收到请求后,解析这个请求,并且将消息转发给指定的Channel。(根据你的需求,也可以转发给所有Channel)
  3. Client A从Socket接收到Channel发来的消息,调用onmessage方法来响应这个消息。


流程说完后,就来看如何用代码实现吧。由于文档中的例子比较复杂,而且在IE下有bug,因此我就拿自己的聊天室来说明了。

很明显,要使用它首先需要创建Channel并告诉客户端Channel所对应的token:
class GetTokenHandler(webapp.RequestHandler):
	def get(self):
		user = users.get_current_user()
		if user:
			channel_id = id = user.email()
		else:
			id = random.randint(1, 1000)
			channel_id = 'anonymous(%s)' % id
		token = channel.create_channel(channel_id)
		tokens = memcache.get('tokens') or {}
		tokens[token] = id
		memcache.set('tokens', tokens)
		self.response.out.write(token)
这里我简单地取用用户的email作为channel_id,如果用户没登录的话,就生成一个随机数来作为channel_id。
为了让这个例子显得简单,我就不去做错误处理了(例如email可能长于64个字符,随机数可能会重复)。
注意channel_id和token都不能丢弃,因为稍后我们还会用到,所以就放到memcache里了(要保证不丢失的话,自然得改用datastore)。
最后输出这个token给Client即可。

考虑客户端的代码,它可以先载入HTML,当DOM ready后,发送AJAX请求来获取token:
var token;
function get_token() {
	$.get('/get_token', function(data){
		token = data;
		openChannel();
	});
}
拿到token后,就可以用它来连接Channel了:
function openChannel() {
	var channel = new goog.appengine.Channel(token);
	var handler = {
	  'onopen': onOpen,
	  'onmessage': onMessage,
	  'onerror': function() {},
	  'onclose': function() {}
	};
	channel.open(handler);
}
可以看到这个连接过程是有2步的:
  1. 用new goog.appengine.Channel(token)来创建Channel对象。(注意:需要载入/_ah/channel/jsapi这个JavaScript文件才能使用这个类。)
  2. 构造一个handler来处理Channel发送的信息。
  3. 将handler作为参数,调用Channel对象的open方法来建立socket连接。

Handler一共可处理4种事件:
  • onopen:当socket成功建立时调用。是否定义它都无所谓,这里我让它通知服务器有新用户加入聊天室:
    function onOpen() {
    	$.post('/open', {'token': token});
    }
    class OpenHandler(webapp.RequestHandler):
    	def post(self):
    		token = self.request.get('token')
    		if not token:
    			return
    		tokens = memcache.get('tokens')
    		if tokens:
    			id = tokens.get(token, '')
    			if id:
    				if isinstance(id, int):
    					user_name = u'天朝匿名用户(%s)' % id
    				else:
    					user_name = id.split('@')[0]
    				message = user_name + u'加入了聊天室'
    				message = simplejson.dumps(message)
    				send_to_all(message, tokens)
    这里我让客户端发送token,服务器端则在memcache里根据token来获取Client ID,并用send_to_all()来广播通知其他Client有新用户加入:
    def send_to_all(message, tokens=None):
    	if not tokens:
    		tokens = memcache.get('tokens')
    	if tokens:
    		for token, id in tokens.iteritems():
    			if isinstance(id, int):
    				id = 'anonymous(%s)' % id
    			channel.send_message(id, message)
    注意就算没有用户接收Channel,channel.send_message()方法也不会抛出异常,所以需要自己来清除不再使用的channel。
    另外还得注意,send_message的参数是Client ID,而不是token。实际上token只在建立连接时有效,之后就没用了,但是我需要用它来获取Client ID,所以不能丢弃。
  • onmessage:当接收到Channel发送的消息时调用。你不实现它就没有意义了,这里我用它来显示用户发送的消息:
    function onMessage(m) {
    	var message = $.parseJSON(m.data);
    	message = message.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;');
    	$msg.prepend('<blockquote><pre>' + message + '</pre></blockquote>');
    }
    class ReceiveHandler(webapp.RequestHandler):
    	def post(self):
    		token = self.request.get('token')
    		if not token:
    			return
    		message = self.request.get('content')
    		if not message:
    			return
    		tokens = memcache.get('tokens')
    		if tokens:
    			id = tokens.get(token, '')
    			if id:
    				if isinstance(id, int):
    					user_name = u'天朝匿名用户(%s)' % id
    				else:
    					user_name = id.split('@')[0]
    				message = '%s: %s' % (user_name, message)
    				message = simplejson.dumps(message)
    				if len(message) > channel.MAXIMUM_MESSAGE_LENGTH:
    					return
    				send_to_all(message)
    注意接收到的数据是个JSON对象,它的data属性才是真正的message。由于不能发送非ASCII字符,所以服务器端我采用了JSON编码,客户端则还需要parseJSON。
  • onerror:socket出错时调用。为了简化,这里我就不做处理。
  • onclose:Channel关闭时调用。注意Channel建立后2小时会过期,此时会调用onclose和onerror,可以捕捉这个事件,再次获取token来连接Channel。为了简化,这里我就不做处理。

处理逻辑完成后,还要让客户端能够POST数据:
function submit() {
	$.ajax({
		url: '/post_msg',
		type: 'POST',
		data: {'token': token, 'content': $content.val()}
	});
	$content.val('').focus();
}

$content.keypress(function(e) {
	if (e.shiftKey && e.keyCode == 13) {
		submit();
		return false;
	}
});
$('#submit_msg').click(submit);
这段代码会调用服务端的ReceiveHandler.post(),从而与onmessage关联起来。

现在最主要的逻辑已经实现了,接下来做些其他的辅助功能。

正如上文所述,用户退出时Channel仍然是开放的,这会浪费遍历调用channel.send_message()的时间。
为此我捕捉了window.onbeforeunload事件来释放token(可惜Opera这个破浏览器不支持):
$(window).bind('beforeunload', function() {
	$.post('/del_token', {'token': token});
})
class ReleaseTokenHandler(webapp.RequestHandler):
	def post(self):
		token = self.request.get('token')
		if not token:
			return
		tokens = memcache.get('tokens')
		if tokens:
			id = tokens.get(token, '')
			if id:
				if isinstance(id, int):
					user_name = u'天朝匿名用户(%s)' % id
				else:
					user_name = id.split('@')[0]
				message = user_name + u'离开了聊天室'
				message = simplejson.dumps(message)
				del tokens[token]
				memcache.set('tokens', tokens)
				send_to_all(message, tokens)
这里不但释放了token,而且还可以通知其他用户有人离开了。
当然,最谨慎的实现是让客户端定时用AJAX访问服务器,服务器一段时间没有接收到用户发来的数据,就认为其断开,然后释放token。

然后是毫无技术含量的登录代码:
class LoginOrOut(webapp.RequestHandler):
	def get(self):
		if users.get_current_user():
			self.redirect(users.create_logout_url('/'))
		else:
			self.redirect(users.create_login_url('/'))

代码已经放到bitbucket了,稍微与本文中的不同,做了一些错误处理,但原理是一样的。

顺便提醒一下这2个限制:
  • 一个Channel只能用于一个Client。不过我在测试时,2个浏览器同时使用一个Channel也成功了。
  • 一个页面上,一个Client只能连接一个Channel。(这个是HTTP协议里规定的,其他server push实现也会有这种限制的。)
而且目前每天Channels Created的限制是8640个,所以不能弄万人同时在线的聊天室。开启支付后是95040,觉得不够的话应该是可以要求Google增加的。
而且channel.create_channel()会用掉2770ms的CPU API时间(响应时间只有几ms而已),比很多数据库的操作还昂贵。

最后还得说下,本地开发服务器和云端的服务器是不同的。本地测试时会用AJAX poll来保持链接,云端则是真正的HTTP长连接。
至于原因,我估计是本地开发服务器是单线程,长连接会一直阻塞,于是无法处理其他请求了。

4条评论 你不来一发么↓ 顺序排列 倒序排列

    向下滚动可载入更多评论,或者点这里禁止自动加载

    想说点什么呢?