Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions testfiles/displaySummary_workload.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[1] ADD,oY01WVirLr,63511.53
[2] QUOTE,oY01WVirLr,S
[3] SET_SELL_AMOUNT,oY01WVirLr,S,216.83
[4] BUY,oY01WVirLr,S,303.83
[5] SET_BUY_AMOUNT,oY01WVirLr,S,658.38
[6] SET_SELL_TRIGGER,oY01WVirLr,S,30.04
[7] ADD,oY01WVirLr,63511.53
[8] QUOTE,oY01WVirLr,S
[9] SET_SELL_AMOUNT,oY01WVirLr,S,216.83
[10] BUY,oY01WVirLr,S,303.83
[11] SET_BUY_AMOUNT,oY01WVirLr,S,658.38
[12] SET_SELL_TRIGGER,oY01WVirLr,S,30.04
[13] DISPLAY_SUMMARY,oY01WVirLr
16 changes: 16 additions & 0 deletions transactionServer/runcode/mqDatabaseServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class databaseFunctions:
RELEASE_PORTFOLIO = 13
BUY_TRIGGER = 14
SELL_TRIGGER = 15
SUMMARY = 16

# @classmethod makes it so you dont have to instantiate the class. just call databaseFunctions.createAddRequest()

Expand Down Expand Up @@ -199,6 +200,11 @@ def createSellTriggerRequest(cls, userId, costPer, portfolioCommitAmount, portfo
'symbol': symbol
}

@classmethod
def createSummaryRequest(cls, userId, args):
args.update({'function': cls.SUMMARY, "userId": userId})
return args

@classmethod
def listOptions(cls):
return [attr for attr in dir(databaseFunctions) if not callable(attr) and not attr.startswith("__") and attr != "listOptions" ]
Expand Down Expand Up @@ -671,6 +677,15 @@ def handleTriggerSell(payload):
return create_response(200, user)
return create_response(400, "not enough portfolio reserved")

def handleSummary(payload):
userId = payload["userId"]

user = databaseServer.getOrAddUser(userId)
payload['response'] = 200
payload['user'] = user

return payload


def on_request(ch, method, props, payload):
print "payload: ", payload
Expand Down Expand Up @@ -714,6 +729,7 @@ def on_request(ch, method, props, payload):
databaseFunctions.RELEASE_PORTFOLIO: handleReleasePortfolio,
databaseFunctions.BUY_TRIGGER: handleTriggerBuy,
databaseFunctions.SELL_TRIGGER: handleTriggerSell,
databaseFunctions.SUMMARY: handleSummary,
}
# Object to send back to Transaction client
print "create publisher"
Expand Down
46 changes: 44 additions & 2 deletions transactionServer/runcode/mqTriggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TriggerFunctions:
ACTIVATE_SELL = 5
CANCEL_SELL = 6
GET_SELL = 7
SUMMARY = 8

@classmethod
def createAddBuyRequest(cls, command, userId, stockSymbol, amount, lineNum):
Expand Down Expand Up @@ -113,6 +114,11 @@ def createGetSellRequest(cls, command, userId, symbol, cash, lineNum):
'lineNum': lineNum,
}

@classmethod
def createSummaryRequest(cls, userId, args):
args.update({'function': cls.SUMMARY, "userId": userId})
return args

@classmethod
def listOptions(cls):
return [attr for attr in dir(databaseFunctions) if not callable(attr) and not attr.startswith("__") and attr != "listOptions" ]
Expand All @@ -130,13 +136,31 @@ def getBuyTrigger(self, userId, symbol):
if self._triggerExists(userId, symbol, self.buyTriggers):
return self.buyTriggers[symbol][userId]

def getUserBuyTriggers(self, userId):
triggers = []
for symbol in self.buyTriggers:
for user in self.buyTriggers[symbol]:
if user == userId:
triggers.append(self.buyTriggers[symbol][user])

return triggers

def getSellTriggers(self):
return self.sellTriggers

def getSellTrigger(self, userId, symbol):
if self._triggerExists(userId, symbol, self.sellTriggers):
return self.sellTriggers[symbol][userId]

def getUserSellTriggers(self, userId):
triggers = []
for symbol in self.sellTriggers:
for user in self.sellTriggers[symbol]:
if user == userId:
triggers.append(self.sellTriggers[symbol][user])

return triggers

def addBuyTrigger(self, userId, sym, cashReserved, transactionNum):
if sym not in self.buyTriggers:
self.buyTriggers[sym] = {}
Expand Down Expand Up @@ -379,6 +403,22 @@ def handleGetSell(payload):
payload['errorString'] = "trigger doesnt exist"
return payload

def handleSummary(payload):
# TODO: might have to hold onto triggers in 2 ways if it is a performance problem, currently have to look through all triggers to get single users
userId = payload["userId"]

buyTriggers = triggers.getUserBuyTriggers(userId)
sellTriggers = triggers.getUserSellTriggers(userId)

if (buyTriggers is not None) and (sellTriggers is not None):
payload['response'] = 200
payload['buyTriggers'] = buyTriggers
payload['sellTriggers'] = sellTriggers
else:
payload['response'] = 500
payload['errorString'] = "error finding triggers"
return payload


def on_request(ch, method, props, payload):
print "payload: ", payload
Expand All @@ -391,6 +431,7 @@ def on_request(ch, method, props, payload):
payload['errorString'] = "function not found"
response = payload

print "response to transactionClient:", response
transactionClient.send(response)


Expand All @@ -408,7 +449,8 @@ def create_error_response(status, response):
TriggerFunctions.SELL: handleAddSell,
TriggerFunctions.ACTIVATE_SELL: handleSetSellActive,
TriggerFunctions.CANCEL_SELL: handleCancelSell,
TriggerFunctions.GET_SELL: handleGetSell
TriggerFunctions.GET_SELL: handleGetSell,
TriggerFunctions.SUMMARY: handleSummary,
}

# self.start() currently commented out in both threads
Expand All @@ -435,7 +477,7 @@ def create_error_response(status, response):
payload = msg[1]
props = msg[0]
print "queue size: ", P2Q_rabbit.qsize()
# on_request(None, None, props, payload)
on_request(None, None, props, payload)
continue
except:
pass
Expand Down
70 changes: 49 additions & 21 deletions transactionServer/runcode/transactionServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,28 @@
from threading import Thread
import multiprocessing
from multiprocessing import Process
from pprint import pprint

#
# class rabbitConsumer():
# def __init__(self, queueName,Q1, Q2, Q3):
# self.rabbitPQueue1 = Q1
# self.rabbitPQueue2 = Q2
# self.rabbitPQueue3 = Q3
# self.connection = RabbitMQReceiver(self.consume, queueName)
#
# def consume(self, ch, method, props, body):
# payload = json.loads(body)
# print "Received :", payload
# print "priority = ",props.priority
#
#
# if props.priority == 1:
# self.rabbitPQueue1.put((1, payload))
# elif props.priority == 2:
# self.rabbitPQueue2.put((2, payload))
# else:
# self.rabbitPQueue3.put((3, payload))
# userDisplaySummary shape: {userId: [command, command ...], ...}
class DisplaySummary():
def __init__(self, max=10):
self.max = max
self.userDisplaySummary = {}

def addCommand(self, command):
userId = command["userId"]

userSummary = self.userDisplaySummary.get(userId)
if not userSummary:
self.userDisplaySummary[userId] = []
userSummary = self.userDisplaySummary[userId]

userSummary.append(command)
if len(userSummary) > self.max:
del userSummary[0]

def getDisplaySummary(self, userId):
return self.userDisplaySummary.get(userId)


# quote shape: symbol: {value: string, retrieved: epoch time, user: string, cryptoKey: string}
Expand Down Expand Up @@ -389,6 +389,28 @@ def handleCommandDumplog(args):
auditClient.send(requestBody, 3)
return "DUMPLOG SENT TO AUDIT"

def handleDisplaySummary(args):
userId = args["userId"]

buyTriggers = args.get("buyTriggers")
sellTriggers = args.get("sellTriggers")

user = args.get("user")
print userId, buyTriggers, sellTriggers, user

if user is not None:
args["commandSummary"] = localDisplaySummary.getDisplaySummary(userId)
return args
elif (buyTriggers is not None) and (sellTriggers is not None):
databaseClient.send(
databaseFunctions.createSummaryRequest(userId, args)
)
else:
triggerClient.send(
TriggerFunctions.createSummaryRequest(userId, args)
)

return None


def errorPrint(args, error):
Expand Down Expand Up @@ -434,6 +456,10 @@ def delegate(ch , method, prop, args):
# send command to audit, if it is from web server
if prop == 1 or prop == 3:
if args["command"] != "DUMPLOG":
# add command to local memory for the user
localDisplaySummary.addCommand(args)

# send command to audit
requestBody = auditFunctions.createUserCommand(
int(time.time() * 1000),
"transactionServer",
Expand Down Expand Up @@ -518,6 +544,7 @@ def delegate(ch , method, prop, args):


localQuoteCache = Quotes()
localDisplaySummary = DisplaySummary()

functionSwitch = {
"QUOTE": handleCommandQuote,
Expand All @@ -534,7 +561,8 @@ def delegate(ch , method, prop, args):
"SET_SELL_AMOUNT": handleCommandSetSellAmount,
"CANCEL_SET_SELL": handleCommandCancelSetSell,
"SET_SELL_TRIGGER": handleCommandSetSellTrigger,
"DUMPLOG": handleCommandDumplog
"DUMPLOG": handleCommandDumplog,
"DISPLAY_SUMMARY": handleDisplaySummary
}


Expand Down