在Go和Python之间通过ActiveMQ通讯
踩到的坑
首先说一句老话:Windows就是善于制造别的操作系统中不存在的问题。
本来我这里有一个提供文件服务的东西是基于Python开发的,在Linux上运行的还行。但是最近因为需要部署到Windows上,就碰到了一个在别的平台不存在的问题:常用的WSGI服务器都不支持Windows或者支持得不好。而如果不用WSGI服务器来跑的话,并发处理又不行。
找了半天也没有找到什么像样的解决办法,只好用Go改写了,但是因为还是有很多后续处理不方便用Go改写(因为没有相关的库可用),所以折衷方案是Web端用Go,后端继续用Python,二者之间通过消息队列解耦。因为正好服务器上有一个给其它Java应用使用的ActiveMQ,就拿来用了。
本来改写还挺顺利的,但是结果踩到一个坑:Go的Stomp库文档不够详细,所以拿它的例子代码写了个DEMO结果调不通。搜了半天也不知道问题在哪里。只能自己看源码研究,还好最后还是搞通了。
一个DEMO
下面的DEMO里,Go版和Python分别订阅了一个队列,然后Go向Python的队列发一条消息,Python收到后再向Go的队列发送一条消息。
Golang
package main
import (
"time"
"github.com/go-stomp/stomp"
)
var config map[string]interface{}
var stop = make(chan bool)
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Host("demo"),
stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second),
stomp.ConnOpt.HeartBeatError(360*time.Second),
}
func recvMessages(subscribed chan bool) {
conn, err := stomp.Dial("tcp", config["AMQ_SERVER"].(string), options...)
if err != nil {
println("Cannot connect to server: ", err.Error())
subscribed <- false
return
}
defer func() {
conn.Disconnect()
println("Go disconnected")
stop <- true
}()
sub, err := conn.Subscribe(config["AMQ_DEST_RESP"].(string), stomp.AckAuto)
if err != nil {
println("Cannot subscribe: ", config["AMQ_DEST_RESP"].(string), err.Error())
subscribed <- false
return
}
println("Go subscribed!")
subscribed <- true
close(subscribed)
msg := <- sub.C
println("Go received: ", string(msg.Body))
}
func sendMessage(content string) {
conn, err := stomp.Dial("tcp", config["AMQ_SERVER"].(string), options...)
if err != nil {
println("Cannot connect to server: ", err.Error())
return
}
defer func() {
conn.Disconnect()
stop <- true
}()
err = conn.Send(config["AMQ_DEST_REQ"].(string), "text/plain", []byte(content))
if err != nil {
println("Send fail: ", err.Error())
return
}
println("Go sent!")
}
func main() {
config = map[string]interface{} {
"AMQ_SERVER": "localhost:61613",
"AMQ_DEST_REQ": "demoRequest",
"AMQ_DEST_RESP": "demoResponse",
}
subscribed := make(chan bool)
go recvMessages(subscribed)
sub_res := <-subscribed
if sub_res {
println("Go Sending...")
go sendMessage("Hello world")
}
<-stop
<-stop
}
Python
# -*- coding:UTF-8 -*-
import atexit
import time
import logging
import stomp
__author__ = 'raptor'
__doc__ = """
"""
logger = logging.getLogger(__name__)
class AMQListener(stomp.ConnectionListener):
def __init__(self, amq):
super(AMQListener, self).__init__()
self.amq = amq
def on_error(self, headers, message):
logger.error('Python received an error "%s"' % message)
def on_message(self, headers, message):
logger.info("Python received: %s" % message)
self.amq.send("Hello ActiveMQ")
self.amq.term = True
class AMQConnection(object):
HOST = "localhost"
PORT = 61613
DEST_REQ = "demoRequest"
DEST_RESP = "demoResponse"
def __init__(self):
self.conn = stomp.Connection([(self.HOST, self.PORT)])
self.conn.set_listener(self.DEST_REQ, AMQListener(self))
self.conn.start()
self.conn.connect()
self.conn.subscribe(self.DEST_REQ, '1', 'auto')
logger.info("Python subscribed!")
atexit.register(self.close)
self.term = False
def run_forever(self):
while not self.term:
time.sleep(5)
def send(self, content):
logger.info("Python sending...")
self.conn.send(self.DEST_RESP, body=content)
logger.info("Python sent!")
def close(self):
self.conn.disconnect()
logger.info("Python disconnected")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
amq = AMQConnection()
amq.run_forever()
推送到[go4pro.org]