This repository was archived by the owner on Nov 5, 2019. It is now read-only.
Description I've set up a simple asyncmongo/zeromq router to act as a message queue as well as manage writes to my datastore.
what I'm finding is that the connections are getting released or reused, so it eventually hits maxconnections and stops.
Here's what I'm working with. You can pass a few messages then it stops and starts throwing TooManyConnections
#!chatforus-ve/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Joseph Bowman
#
# Posting this in a Google group so I guess
# it's public domain now.
#
import asyncmongo
import zmq
from zmq.eventloop import ioloop, zmqstream
from tornado import escape
from tornado import gen
import datetime
loop = ioloop.IOLoop.instance()
db = asyncmongo.Client(pool_id='mydb', host='127.0.0.1', port=27017, maxconnections=5, dbname='chatforus')
ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
broadcast = ctx.socket(zmq.PUSH)
receiver.bind('tcp://127.0.0.1:5555')
broadcast.bind('tcp://127.0.0.1:5556')
receiver_stream = zmqstream.ZMQStream(receiver, loop)
broadcast_stream = zmqstream.ZMQStream(broadcast, loop)
@gen.engine
def handle_message(msg):
broadcast_stream.send(msg[0])
message = escape.json_decode(msg[0])
if message["type"] == "msg":
message["ts"] = datetime.datetime.now()
(response,), error = yield gen.Task(db.msgs.insert, message)
receiver_stream.on_recv(handle_message)
loop.start()