1
+ package com .oembedler .moon .graphql .boot .publishers ;
2
+
3
+ import java .math .BigDecimal ;
4
+ import java .math .RoundingMode ;
5
+ import java .time .LocalDateTime ;
6
+ import java .util .ArrayList ;
7
+ import java .util .List ;
8
+ import java .util .Map ;
9
+ import java .util .Random ;
10
+ import java .util .concurrent .ConcurrentHashMap ;
11
+ import java .util .concurrent .Executors ;
12
+ import java .util .concurrent .ScheduledExecutorService ;
13
+ import java .util .concurrent .TimeUnit ;
14
+
15
+ import org .slf4j .Logger ;
16
+ import org .slf4j .LoggerFactory ;
17
+ import org .springframework .stereotype .Component ;
18
+ import com .oembedler .moon .graphql .boot .resolvers .StockPriceUpdate ;
19
+ import reactor .core .publisher .ConnectableFlux ;
20
+ import reactor .core .publisher .Flux ;
21
+ import reactor .core .publisher .FluxSink ;
22
+
23
+ @ Component
24
+ public class StockTickerReactorPublisher {
25
+ private static final Logger LOG = LoggerFactory .getLogger (StockTickerRxPublisher .class );
26
+
27
+ private final Flux <StockPriceUpdate > publisher ;
28
+
29
+ public StockTickerReactorPublisher () {
30
+ Flux <StockPriceUpdate > stockPriceUpdateFlux = Flux .create (emitter -> {
31
+ ScheduledExecutorService executorService = Executors .newScheduledThreadPool (1 );
32
+ executorService .scheduleAtFixedRate (newStockTick (emitter ), 0 , 2 , TimeUnit .SECONDS );
33
+ }, FluxSink .OverflowStrategy .BUFFER );
34
+ ConnectableFlux <StockPriceUpdate > connectableFlux = stockPriceUpdateFlux .share ().publish ();
35
+ connectableFlux .connect ();
36
+
37
+ publisher = Flux .from (connectableFlux );
38
+ }
39
+
40
+ private Runnable newStockTick (FluxSink <StockPriceUpdate > emitter ) {
41
+ return () -> {
42
+ List <StockPriceUpdate > stockPriceUpdates = getUpdates (rollDice (0 , 5 ));
43
+ if (stockPriceUpdates != null ) {
44
+ emitStocks (emitter , stockPriceUpdates );
45
+ }
46
+ };
47
+ }
48
+
49
+ private void emitStocks (FluxSink <StockPriceUpdate > emitter , List <StockPriceUpdate > stockPriceUpdates ) {
50
+ for (StockPriceUpdate stockPriceUpdate : stockPriceUpdates ) {
51
+ try {
52
+ emitter .next (stockPriceUpdate );
53
+ } catch (RuntimeException e ) {
54
+ LOG .error ("Cannot send StockUpdate" , e );
55
+ }
56
+ }
57
+ }
58
+
59
+ public Flux <StockPriceUpdate > getPublisher () {
60
+ return publisher ;
61
+ }
62
+
63
+ public Flux <StockPriceUpdate > getPublisher (List <String > stockCodes ) {
64
+ if (stockCodes != null ) {
65
+ return publisher .filter (stockPriceUpdate -> stockCodes .contains (stockPriceUpdate .getStockCode ()));
66
+ }
67
+ return publisher ;
68
+ }
69
+
70
+ private List <StockPriceUpdate > getUpdates (int number ) {
71
+ List <StockPriceUpdate > updates = new ArrayList <>();
72
+ for (int i = 0 ; i < number ; i ++) {
73
+ updates .add (rollUpdate ());
74
+ }
75
+ return updates ;
76
+ }
77
+
78
+ private final static Map <String , BigDecimal > CURRENT_STOCK_PRICES = new ConcurrentHashMap <>();
79
+
80
+ static {
81
+ CURRENT_STOCK_PRICES .put ("TEAM" , dollars (39 , 64 ));
82
+ CURRENT_STOCK_PRICES .put ("IBM" , dollars (147 , 10 ));
83
+ CURRENT_STOCK_PRICES .put ("AMZN" , dollars (1002 , 94 ));
84
+ CURRENT_STOCK_PRICES .put ("MSFT" , dollars (77 , 49 ));
85
+ CURRENT_STOCK_PRICES .put ("GOOGL" , dollars (1007 , 87 ));
86
+ }
87
+
88
+ private StockPriceUpdate rollUpdate () {
89
+ ArrayList <String > STOCK_CODES = new ArrayList <>(CURRENT_STOCK_PRICES .keySet ());
90
+
91
+ String stockCode = STOCK_CODES .get (rollDice (0 , STOCK_CODES .size () - 1 ));
92
+ BigDecimal currentPrice = CURRENT_STOCK_PRICES .get (stockCode );
93
+
94
+ BigDecimal incrementDollars = dollars (rollDice (0 , 1 ), rollDice (0 , 99 ));
95
+ if (rollDice (0 , 10 ) > 7 ) {
96
+ // 0.3 of the time go down
97
+ incrementDollars = incrementDollars .negate ();
98
+ }
99
+ BigDecimal newPrice = currentPrice .add (incrementDollars );
100
+
101
+ CURRENT_STOCK_PRICES .put (stockCode , newPrice );
102
+ return new StockPriceUpdate (stockCode , LocalDateTime .now (), newPrice , incrementDollars );
103
+ }
104
+
105
+ private static BigDecimal dollars (int dollars , int cents ) {
106
+ return truncate ("" + dollars + "." + cents );
107
+ }
108
+
109
+ private static BigDecimal truncate (final String text ) {
110
+ BigDecimal bigDecimal = new BigDecimal (text );
111
+ if (bigDecimal .scale () > 2 )
112
+ bigDecimal = new BigDecimal (text ).setScale (2 , RoundingMode .HALF_UP );
113
+ return bigDecimal .stripTrailingZeros ();
114
+ }
115
+
116
+ private final static Random rand = new Random ();
117
+
118
+ private static int rollDice (int min , int max ) {
119
+ return rand .nextInt ((max - min ) + 1 ) + min ;
120
+ }
121
+
122
+ }
0 commit comments