const strom = require("stromjs");
const { allInUSD } = require("./pure/balancer");

// Message types; the demux stage below routes on the `type` field.
const REBALANCE = "rebalance";
const PRICING = "pricing";

// Pricing as of July 12th
const SEED = {
  AAPL: 383.68,
  GOOG: 1541.74,
  ACAD: 55.45,
  GFN: 6.23,
  CYBR: 109.0,
  ABB: 24.49,
};

// The returned store is not referenced again in this script; the call is kept
// because it hands the seed prices to the pricing store module.
const pricingStore = require("./stores/pricing").global({ seed: SEED });
const pricing = require("./processors/pricing")();
const balancer = require("./processors/balancer")();

/**
 * Converts one trimmed, non-empty line of manual input into a pipeline message.
 *
 * The line is split on ":" and only the first two segments are used:
 *   "rebalance:<accountId>" -> { type: REBALANCE, accountId }
 *   "<ticker>:<price>"      -> { type: PRICING, ticker, price }
 *
 * @param {string} line - A single input line.
 * @returns {{type: string, accountId: (string|undefined)}|{type: string, ticker: string, price: (string|undefined)}}
 */
function parseCommand(line) {
  const [cmd, param] = line.split(":");
  if (cmd === REBALANCE) {
    return { type: REBALANCE, accountId: param };
  }
  // NOTE(review): `price` is forwarded exactly as typed (a string, or
  // undefined when no ":" is present) — confirm the pricing processor
  // converts/validates it.
  return { type: PRICING, ticker: cmd, price: param };
}

/**
 * Builds a per-ticker { buy, sell } table from the rebalancing actions.
 * A ticker present on only one side gets 0 on the other.
 *
 * @param {{buy: Object<string, number>, sell: Object<string, number>}} actions
 * @returns {Object<string, {buy: number, sell: number}>}
 */
function buildTransactionsPivot(actions) {
  const pivot = {};
  // Keys of the merged object are already unique, so no extra de-duplication
  // is needed.
  for (const ticker of Object.keys({ ...actions.buy, ...actions.sell })) {
    pivot[ticker] = {
      buy: actions.buy[ticker] || 0,
      sell: actions.sell[ticker] || 0,
    };
  }
  return pivot;
}

/**
 * Builds a per-ticker comparison of the target weight against the weight of
 * the balanced and previous holdings. Both weights are expressed relative to
 * `totalValue`.
 *
 * A target ticker that is missing from `balanced` or from
 * `account.openPositions` counts as a holding of 0 instead of producing NaN.
 *
 * @param {{targets: Object<string, number>, openPositions: Object<string, number>}} account
 * @param {Object<string, number>} balanced - Holdings after rebalancing.
 * @param {Object<string, number>} prices - Price per ticker.
 * @param {number} totalValue - Denominator used for both weights.
 * @returns {Object<string, {target: number, balanced: number, previous: number}>}
 */
function buildAllocationTable(account, balanced, prices, totalValue) {
  const table = {};
  for (const [ticker, target] of Object.entries(account.targets)) {
    const price = prices[ticker];
    const balancedQty = balanced[ticker] || 0;
    const previousQty = account.openPositions[ticker] || 0;
    table[ticker] = {
      target,
      balanced: (balancedQty * price) / totalValue,
      previous: (previousQty * price) / totalValue,
    };
  }
  return table;
}

/**
 * Pretty-prints a rebalancing result to the console and passes the message
 * through unchanged. Messages that lack any of `account`, `balanced`,
 * `actions` or `pricing` are passed through without printing anything.
 *
 * @param {Object} data - Message flowing through the pipeline.
 * @returns {Object} The same `data` object.
 */
function logRebalance(data) {
  // Aliased to `prices` so it does not shadow the module-level `pricing`
  // processor.
  const { account, balanced, actions, pricing: prices } = data;
  if (!(account && balanced && actions && prices)) {
    return data;
  }

  // Both totals are computed before anything is printed.
  const balancedValue = allInUSD(balanced, prices);
  const previousValue = allInUSD(account.openPositions, prices);

  console.log("Previous positions");
  console.table(account.openPositions);

  console.log("Balanced portfolio");
  console.table(balanced);

  console.log("Transactions required");
  console.table(buildTransactionsPivot(actions));

  console.log("Total value");
  console.table({
    balanced: balancedValue,
    previous: previousValue,
  });

  // Weights for both portfolios are taken relative to the previous total.
  console.table(buildAllocationTable(account, balanced, prices, previousValue));

  return data;
}

// Conversion layer for manual input: split stdin into chunks, trim them, drop
// the empty ones and turn each remaining one into a typed message.
const stdinReader = strom.compose([
  strom.split(),
  strom.map(data => data.trim()),
  strom.filter(str => !!str),
  strom.map(str => parseCommand(str)),
]);

// This pipeline could be built in a configurable way such that the input is
// either stdin + interactive, stdin as JSON/protobuf, a Kafka topic, a NATS
// topic, a RabbitMQ topic, a file, etc.
process.stdin
  // The conversion layer would be added conditionally
  .pipe(stdinReader)
  .pipe(
    // Route each message to a processor based on its `type` field.
    strom.demux(key => {
      switch (key) {
        case REBALANCE:
          return balancer;
        case PRICING:
          return pricing;
      }
    }, "type"),
  )
  // This is logging for presentation only. This could also be a module added
  // conditionally.
  .pipe(strom.map(data => logRebalance(data)))
  // Final stage: print every message as JSON. As in the original, the mapper
  // itself returns undefined.
  .pipe(
    strom.map(out => {
      console.log(JSON.stringify(out));
    }),
  );