11import { ObjLogger } from "@scramjet/obj-logger" ;
2- import { APIExpose , OpResponse , ParsedMessage } from "@scramjet/types" ;
2+ import { APIExpose , OpResponse } from "@scramjet/types" ;
33import { ReasonPhrases } from "http-status-codes" ;
44import { ServiceDiscovery } from "./sd-adapter" ;
5- import { IncomingMessage } from "http" ;
5+ import { IncomingMessage } from "http" ;
66import { StreamOrigin } from "./streamHandler" ;
77import { TopicState } from "./topicHandler" ;
88import { WorkState } from "./streamHandler" ;
@@ -20,12 +20,12 @@ type TopicsPostRes = {
2020}
2121type TopicDeleteReq = { }
2222
23- type TopicDownstreamReq = IncomingMessage & {
24- headers : {
25- "content-type" : string ,
23+ type TopicStreamReq = IncomingMessage & {
24+ headers ? : {
25+ "content-type" ? : string ,
2626 cpm ?: string
2727 } ,
28- params : { topic : string }
28+ params ? : { topic ? : string }
2929}
3030
3131class TopicRouter {
@@ -36,7 +36,7 @@ class TopicRouter {
3636 this . serviceDiscovery = serviceDiscovery ;
3737 apiServer . get ( `${ apiBaseUrl } /topics` , ( ) => this . serviceDiscovery . getTopics ( ) ) ;
3838 apiServer . op ( "post" , `${ apiBaseUrl } /topics` , this . topicsPost )
39- apiServer . get ( `${ apiBaseUrl } /topic/:topic` , ( ) => this . deleteTopic ) ;
39+ apiServer . op ( "delete" , `${ apiBaseUrl } /topic/:topic` , ( ) => this . deleteTopic ) ;
4040 apiServer . downstream ( `${ apiBaseUrl } /topic/:topic` , this . topicDownstream , { checkContentType : false } ) ;
4141 apiServer . upstream ( `${ apiBaseUrl } /topic/:topic` , this . topicUpstream ) ;
4242 }
@@ -63,16 +63,16 @@ class TopicRouter {
6363 }
6464 }
6565
66- async topicDownstream ( req : TopicDownstreamReq ) {
66+ async topicDownstream ( req : TopicStreamReq ) {
6767 const { "content-type" : contentType = "" , cpm } = req . headers ;
68- const { topic : topicName } = req . params ;
68+ const { topic : name = "" } = req . params || { } ;
6969 if ( ! isContentType ( contentType ) ) return { opStatus : ReasonPhrases . BAD_REQUEST , error : "Unsupported content-type" }
70- if ( ! TopicName . validate ( topicName ) ) return { opStatus : ReasonPhrases . BAD_REQUEST , error : "Topic name incorrect format" }
70+ if ( ! TopicName . validate ( name ) ) return { opStatus : ReasonPhrases . BAD_REQUEST , error : "Topic name incorrect format" }
7171
72- const name = new TopicName ( topicName ) ;
72+ const topicName = new TopicName ( name ) ;
7373 this . logger . debug ( `Incoming topic '${ name } ' request` ) ;
7474
75- let topic = this . serviceDiscovery . topicsController . get ( name ) ;
75+ let topic = this . serviceDiscovery . topicsController . get ( topicName ) ;
7676 if ( topic ) {
7777 const topicContentType = topic . options ( ) . contentType ;
7878 if ( contentType !== topicContentType ) {
@@ -82,35 +82,39 @@ class TopicRouter {
8282 } ;
8383 }
8484 } else {
85- topic = new Topic ( name , contentType , { id : "TopicDownstream" , type : "hub" } ) ;
85+ // FIXME: Single responsibility rule validation
86+ topic = new Topic ( topicName , contentType , { id : "TopicDownstream" , type : "hub" } ) ;
8687 }
8788 req . pipe ( topic , { end : false } ) ;
8889
8990 if ( ! cpm ) {
9091 await this . serviceDiscovery . update ( {
91- provides : topic , contentType : contentType , topicName : topic
92+ provides : topic . id ( ) , contentType : contentType , topicName : topic . id ( )
9293 } ) ;
9394 } else {
9495 this . logger . debug ( `Incoming Downstream CPM request for topic '${ topic } '` ) ;
9596 }
96- return { } ;
97+ return { opStatus : ReasonPhrases . OK } ;
9798 }
9899
99- async topicUpstream ( req : ParsedMessage ) {
100+ async topicUpstream ( req : TopicStreamReq ) {
101+ const { "content-type" : contentType = "" , cpm } = req . headers ;
102+ const { topic : name = "" } = req . params || { } ;
103+ if ( ! isContentType ( contentType ) ) return { opStatus : ReasonPhrases . BAD_REQUEST , error : "Unsupported content-type" }
104+ if ( ! TopicName . validate ( name ) ) return { opStatus : ReasonPhrases . BAD_REQUEST , error : "Topic name incorrect format" }
105+
106+ const topicName = new TopicName ( name ) ;
100107 //TODO: what should be the default content type and where to store this information?
101- const contentType = req . headers [ "content-type" ] || "application/x-ndjson" ;
102- const { topic } = req . params || { } ;
103- const { cpm } = req . headers ;
104108
105109 if ( ! cpm ) {
106110 await this . serviceDiscovery . update ( {
107- requires : topic , contentType, topicName : topic
111+ requires : name , contentType, topicName : topicName . toString ( )
108112 } ) ;
109113 } else {
110- this . logger . debug ( `Incoming Upstream CPM request for topic '${ topic } '` ) ;
114+ this . logger . debug ( `Incoming Upstream CPM request for topic '${ name } '` ) ;
111115 }
112116
113- return this . serviceDiscovery . createTopicIfNotExist ( { topic, contentType } ) ;
117+ return this . serviceDiscovery . createTopicIfNotExist ( { topic : topicName , contentType } ) ;
114118 }
115119}
116120
0 commit comments