From ae6868975f76bc6eb497ee6fdf5d17c3d2fca16c Mon Sep 17 00:00:00 2001 From: Lewis Diamond Date: Sun, 14 Mar 2021 16:08:26 -0400 Subject: [PATCH] Not fully tested, mostly working --- .gitignore | 1 + Cargo.lock | 534 +++++++++++++++++++++++++++++++++++++++ Cargo.toml | 20 ++ input.csv | 7 + input2.csv | 8 + input3.csv | 8 + rustfmt.toml | 1 + src/bin/act.rs | 43 ++++ src/lib.rs | 4 + src/parse.rs | 240 ++++++++++++++++++ src/process.rs | 150 +++++++++++ src/stores/mem.rs | 182 +++++++++++++ src/stores/mod.rs | 14 + src/types/account.rs | 113 +++++++++ src/types/mod.rs | 5 + src/types/transaction.rs | 49 ++++ 16 files changed, 1379 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 input.csv create mode 100644 input2.csv create mode 100644 input3.csv create mode 100644 rustfmt.toml create mode 100644 src/bin/act.rs create mode 100644 src/lib.rs create mode 100644 src/parse.rs create mode 100644 src/process.rs create mode 100644 src/stores/mem.rs create mode 100644 src/stores/mod.rs create mode 100644 src/types/account.rs create mode 100644 src/types/mod.rs create mode 100644 src/types/transaction.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..df38e39 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,534 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "act" +version = "0.1.0" +dependencies = [ + "async-stream", + "clap", + "csv", + "serde", + "strum", + "strum_macros", + "tokio", + "tokio-stream", + "tokio-test", +] + +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + +[[package]] +name = "async-stream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3670df70cbc01729f901f94c887814b3c68db038aad1329a418bae178bc5295c" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "bstr" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a40b47ad93e1a5404e6c18dec46b628214fee441c70f4ab5d6942142cc268a3d" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + +[[package]] +name = "futures-core" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94" + +[[package]] +name = "heck" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +dependencies = [ + "libc", +] + +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b07a082330a35e43f63177cc01689da34fbffa0105e1246cf0311472cac73a" + +[[package]] +name = "lock_api" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" + +[[package]] +name = "mio" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" +dependencies = [ + "socket2", + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" + +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905" + +[[package]] +name = "proc-macro2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex-automata" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +dependencies = [ + "byteorder", +] + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "serde" +version = "1.0.124" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.124" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "signal-hook-registry" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" + +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if", + "libc", + "winapi", +] + +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "strum" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7318c509b5ba57f18533982607f24070a55d353e90d4cae30c467cdb2ad5ac5c" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee8bc6b87a5112aeeab1f4a9f7ab634fe6cbefc4850006df31267f4cfb9e3149" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "syn" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "tokio" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f58403903e94d4bc56805e46597fced893410b2e753e229d3f7f22423ea03f67" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + +[[package]] +name = "unicode-segmentation" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" + +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + +[[package]] +name = "unicode-xid" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..18ed3e4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "act" +version = "0.1.0" +authors = ["Lewis Diamond "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +strum = { version = "0.20.0", features = ["derive"] } +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" +async-stream = "0.3" +csv = "1.1.5" +strum_macros = "0.20.1" +clap = "2.33.3" + +[dev-dependencies] +tokio-test = "0.4.0" diff --git a/input.csv b/input.csv new file mode 100644 index 0000000..c014a91 --- /dev/null +++ b/input.csv @@ -0,0 +1,7 @@ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.742 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0 +dispute,2,2, diff --git a/input2.csv b/input2.csv new file mode 100644 index 0000000..74100cb --- /dev/null +++ b/input2.csv @@ -0,0 +1,8 @@ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.742 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0 +dispute,2,2, +resolve,2,2, diff --git a/input3.csv b/input3.csv new file mode 100644 index 0000000..091b071 --- /dev/null +++ b/input3.csv @@ -0,0 +1,8 @@ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.742 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0 +dispute,2,2, +chargeback,2,2, diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..32a9786 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition = "2018" diff --git a/src/bin/act.rs b/src/bin/act.rs new file mode 100644 index 0000000..f312471 --- /dev/null +++ b/src/bin/act.rs @@ -0,0 +1,43 @@ +use act::parse::parse; +use act::process::process; +use act::stores::MemActStore; +use act::types::Transaction; +use clap::{App, Arg}; +use std::collections::HashMap; +use std::fs; +use std::io::{stdin, BufRead, BufReader}; +use tokio_stream::StreamExt; + +#[tokio::main] +async fn main() { + let matches = App::new("act") + .version("0.1") + .about("Merges transactions into final account state") + .author("Lewis Diamond") + .arg( + Arg::with_name("input") + .required(false) + .index(1) + .help("Input file, stdin if omitted or -"), + ) + .get_matches(); + + let input: Box = match matches.value_of("input") { + Some("-") | Some("") | None => Box::new(BufReader::new(stdin())), + Some(f) => Box::new(BufReader::new(fs::File::open(f).unwrap())), + }; + + let mut act_store = MemActStore::new(); + let mut tx_store: HashMap = HashMap::new(); + let s = parse(input); + tokio::pin!(s); + while let Some(v) = s.next().await { + process(v, &mut act_store, &mut tx_store); + } + + let mut writer = csv::WriterBuilder::new().from_writer(std::io::stdout()); + + for act in act_store.into_iter() { + writer.serialize(act.1).unwrap(); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3f6e737 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,4 @@ +pub mod parse; +pub mod process; +pub mod stores; +pub mod types; diff --git a/src/parse.rs b/src/parse.rs new file mode 100644 index 0000000..b7708c3 --- /dev/null +++ b/src/parse.rs @@ -0,0 +1,240 @@ +use crate::types::Transaction; +use async_stream::stream; +use std::io::Read; +use tokio_stream::Stream; + +pub fn parse(input: R) -> impl Stream { + let mut reader = csv::ReaderBuilder::new() + .trim(csv::Trim::All) + .flexible(true) + .has_headers(true) + .from_reader(input); + + stream! { + for tx in reader.deserialize() { + match tx { + Ok(tx) => yield tx, + //Depending on the infrastructure, a specific output format + //might be used to add monitoring/alerting + Err(e) => eprintln!("Error reading CSV: {}", e), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::Transaction; + use crate::types::TransactionType; + use tokio_stream::StreamExt; + use tokio_test::block_on; + + #[test] + fn valid_csv_is_parsed() { + block_on(async { + let data = "\ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.0 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 15000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 30000, + client: 2, + tx: 5, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + assert_eq!(expected, txs); + }); + } + + #[test] + fn valid_csv_with_whitespaces_is_parsed() { + block_on(async { + let data = "\ +type, client, tx, amount +deposit, 1, 1, 1.0 +deposit, 2, 2, 2.0 +deposit, 1, 3, 2.0 +withdrawal, 1, 4, 1.5 +withdrawal, 2, 5, 3.0"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 15000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 30000, + client: 2, + tx: 5, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + assert_eq!(expected, txs); + }); + } + + #[test] + fn amounts_are_parsed_correctly() { + block_on(async { + let data = "\ +type,client,tx,amount +deposit,1,1,1.0001 +deposit,2,2,2.0010 +deposit,1,3,10.01 +withdrawal,1,4,01.10 +withdrawal,2,5,10.0110101"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10001, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20010, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 100100, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 11000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 100110, + client: 2, + tx: 5, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + assert_eq!(expected, txs); + }); + } + + #[test] + fn invalid_amounts_are_filtered() { + block_on(async { + let data = "\ +type,client,tx,amount +deposit,1,1,99999999999999999 +deposit,2,2,18446744073709551615 +deposit,1,3,18446744073709551616 +withdrawal,1,4,0 +withdrawal,1,4, +withdrawal,1,4,a +withdrawal,2,5,-1 +withdrawal,1,6,-99999999999999999 +withdrawal,1,6,-18446744073709551615 +withdrawal,1,7,-18446744073709551616"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 0, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 0, + client: 1, + tx: 4, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + + assert_eq!(expected, txs); + }); + } + + #[test] + fn disputes_are_parsed_correctly() { + block_on(async { + let data = "\ +type,client,tx,amount +dispute,1,1, +dispute,1,2,"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Dispute, + amount: 0, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Dispute, + amount: 0, + client: 1, + tx: 2, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + + assert_eq!(expected, txs); + }); + } +} diff --git a/src/process.rs b/src/process.rs new file mode 100644 index 0000000..ead8e1e --- /dev/null +++ b/src/process.rs @@ -0,0 +1,150 @@ +use crate::{ + stores::ActStore, + types::{Transaction, TransactionType}, +}; +use std::collections::HashMap; + +pub fn process( + t: Transaction, + act_store: &mut dyn ActStore, + tx_store: &mut HashMap, +) { + match t.tx_type { + TransactionType::Deposit => { + act_store.deposit(t.client, t.amount); + tx_store.insert(t.tx, t); + } + TransactionType::Withdrawal => { + act_store.withdraw(t.client, t.amount); + } + TransactionType::Dispute => { + if let Some(orig) = tx_store.get(&t.tx) { + act_store.hold(t.client, orig.amount); + } + } + TransactionType::Resolve => { + if let Some(orig) = tx_store.get(&t.tx) { + act_store.unhold(t.client, orig.amount); + } + } + TransactionType::Chargeback => { + if let Some(orig) = tx_store.get(&t.tx) { + act_store.unhold(t.client, orig.amount); + act_store.withdraw(t.client, orig.amount); + act_store.lock_account(t.client); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stores::MemActStore; + use crate::types::Transaction; + use crate::types::TransactionType; + + #[test] + fn valid_tx_and_over_limit_withdraw() { + let mut act_store: Box = Box::new(MemActStore::new()); + let mut tx_store: HashMap = HashMap::new(); + let txs = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 15000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 30000, + client: 2, + tx: 5, + }, + ]; + for tx in txs { + process(tx, act_store.as_mut(), &mut tx_store); + } + + let act = act_store.get_account(1).unwrap(); + assert_eq!(0, act.held()); + assert_eq!(15000, act.available()); + + let act = act_store.get_account(2).unwrap(); + assert_eq!(0, act.held()); + assert_eq!(20000, act.available()); + } + + #[test] + fn held_funds() { + let mut act_store: Box = Box::new(MemActStore::new()); + let mut tx_store: HashMap = HashMap::new(); + process( + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + act_store.as_mut(), + &mut tx_store, + ); + process( + Transaction { + tx_type: TransactionType::Dispute, + amount: 0, + client: 1, + tx: 1, + }, + act_store.as_mut(), + &mut tx_store, + ); + let act = act_store.get_account(1).unwrap(); + assert_eq!(10000, act.held()); + assert_eq!(0, act.available()); + process( + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 10000, + client: 1, + tx: 2, + }, + act_store.as_mut(), + &mut tx_store, + ); + let act = act_store.get_account(1).unwrap(); + assert_eq!(10000, act.held()); + assert_eq!(0, act.available()); + process( + Transaction { + tx_type: TransactionType::Resolve, + amount: 0, + client: 1, + tx: 1, + }, + act_store.as_mut(), + &mut tx_store, + ); + let act = act_store.get_account(1).unwrap(); + assert_eq!(0, act.held()); + assert_eq!(10000, act.available()); + } +} diff --git a/src/stores/mem.rs b/src/stores/mem.rs new file mode 100644 index 0000000..423f57e --- /dev/null +++ b/src/stores/mem.rs @@ -0,0 +1,182 @@ +use super::ActStore; +use crate::types::Account; +use std::collections::hash_map::IntoIter; +use std::collections::HashMap; + +pub struct MemActStore(HashMap); + +enum Action { + Withdraw(u64), + Deposit(u64), + Hold(u64), + Unhold(u64), +} + +impl MemActStore { + pub fn new() -> Self { + MemActStore(HashMap::new()) + } + + fn action_act(&mut self, client: u16, action: Action) -> u64 { + let act = self.0.entry(client).or_insert_with(|| Account::new(client)); + match action { + Action::Withdraw(amnt) => act.withdraw(amnt), + Action::Deposit(amnt) => act.deposit(amnt), + Action::Hold(amnt) => act.hold(amnt), + Action::Unhold(amnt) => act.unhold(amnt), + } + } +} + +impl Default for MemActStore { + fn default() -> Self { + Self::new() + } +} + +impl IntoIterator for MemActStore { + type Item = (u16, Account); + + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl ActStore for MemActStore { + fn deposit(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Deposit(amnt)) + } + + fn withdraw(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Withdraw(amnt)) + } + + fn hold(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Hold(amnt)) + } + + fn unhold(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Unhold(amnt)) + } + + fn lock_account(&mut self, client: u16) -> bool { + if let Some(act) = self.0.get_mut(&client) { + act.lock() + } else { + false + } + } + + fn unlock_account(&mut self, client: u16) -> bool { + if let Some(act) = self.0.get_mut(&client) { + act.unlock() + } else { + false + } + } + + fn get_account(&self, client: u16) -> Option<&Account> { + self.0.get(&client) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_add_balance() { + let mut store = MemActStore::new(); + let client_id = 200; + store.deposit(client_id, 200); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(200, act.id()); + assert_eq!(200, act.available()); + } else { + panic!("Could not get account from store"); + } + let balance = store.deposit(client_id, 50); + if let Some(act) = store.get_account(client_id) { + assert_eq!(200, act.id()); + assert_eq!(250, act.available()); + assert_eq!(250, balance); + } else { + panic!("Could not get account from store"); + } + } + + #[test] + fn test_sub_balance() { + let mut store = MemActStore::new(); + let client_id = 1; + store.deposit(client_id, 200); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(200, act.available()); + } else { + panic!("Could not get account from store"); + } + let balance = store.withdraw(client_id, 50); + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(150, act.available()); + assert_eq!(150, balance); + } else { + panic!("Could not get account from store"); + } + } + + #[test] + fn test_negative_balance() { + let mut store = MemActStore::new(); + let client_id = 1; + store.withdraw(client_id, 1); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(0, act.available()); + } else { + panic!("Could not get account from store"); + } + } + + #[test] + fn test_hold() { + let mut store = MemActStore::new(); + let client_id = 1; + store.deposit(client_id, 100); + + let avail = store.hold(client_id, 10); + assert_eq!(90, avail); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(90, act.available()); + } else { + panic!("Could not get account from store"); + } + + assert_eq!(0, store.hold(client_id, 100)); + assert_eq!(10, store.unhold(client_id, 20)); + } + + #[test] + fn test_lock() { + let mut store = MemActStore::new(); + let client_id = 1; + store.deposit(client_id, 100); + let locked = store.lock_account(client_id); + assert!(locked); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert!(act.is_locked()); + } else { + panic!("Could not get account from store"); + } + } +} diff --git a/src/stores/mod.rs b/src/stores/mod.rs new file mode 100644 index 0000000..a1b277e --- /dev/null +++ b/src/stores/mod.rs @@ -0,0 +1,14 @@ +pub mod mem; +pub use mem::MemActStore; + +use crate::types::Account; + +pub trait ActStore { + fn get_account(&self, client: u16) -> Option<&Account>; + fn deposit(&mut self, client: u16, amnt: u64) -> u64; + fn withdraw(&mut self, client: u16, amnt: u64) -> u64; + fn hold(&mut self, client: u16, amnt: u64) -> u64; + fn unhold(&mut self, client: u16, amnt: u64) -> u64; + fn lock_account(&mut self, client: u16) -> bool; + fn unlock_account(&mut self, client: u16) -> bool; +} diff --git a/src/types/account.rs b/src/types/account.rs new file mode 100644 index 0000000..c3d5d2b --- /dev/null +++ b/src/types/account.rs @@ -0,0 +1,113 @@ +use serde::{Serialize, Serializer}; +const PRECISION: u32 = 4; + +#[derive(Debug, PartialEq)] +pub struct Account { + id: u16, + total: u64, + held: u64, + locked: bool, +} + +impl Account { + pub fn new(client_id: u16) -> Account { + Account { + id: client_id, + total: 0, + held: 0, + locked: false, + } + } + + pub fn with_balance(client_id: u16, seed_balance: u64) -> Account { + Account { + id: client_id, + total: seed_balance, + held: 0, + locked: false, + } + } + + pub fn id(&self) -> u16 { + self.id + } + pub fn available(&self) -> u64 { + self.total.saturating_sub(self.held) + } + pub fn held(&self) -> u64 { + self.held + } + pub fn hold(&mut self, amnt: u64) -> u64 { + if let Some(new_held) = self.held.checked_add(amnt) { + self.held = new_held; + } + self.available() + } + + pub fn unhold(&mut self, amnt: u64) -> u64 { + if let Some(new_held) = self.held.checked_sub(amnt) { + self.held = new_held; + } + self.available() + } + pub fn deposit(&mut self, amnt: u64) -> u64 { + if let Some(new_bal) = self.total.checked_add(amnt) { + self.total = new_bal; + }; + self.available() + } + pub fn withdraw(&mut self, amnt: u64) -> u64 { + if self.available().checked_sub(amnt).is_some() { + self.total -= amnt; + }; + self.available() + } + + pub fn lock(&mut self) -> bool { + self.locked = true; + self.locked + } + + pub fn unlock(&mut self) -> bool { + self.locked = false; + self.locked + } + + pub fn is_locked(&self) -> bool { + self.locked + } +} + +#[derive(Debug, Serialize, PartialEq)] +pub struct AccountSer { + id: u16, + total: String, + held: String, + available: String, + locked: bool, +} + +impl serde::Serialize for Account { + fn serialize(&self, s: S) -> Result + where + S: Serializer, + { + let total = format!( + "{0:.4}", + (self.total as f64) / (10u64.pow(PRECISION) as f64) + ); + let held = format!("{0:.4}", (self.held as f64) / (10u64.pow(PRECISION) as f64)); + let available = format!( + "{0:.4}", + (self.available() as f64) / (10u64.pow(PRECISION) as f64) + ); + let ser = AccountSer { + id: self.id, + total, + held, + available, + locked: self.locked, + }; + ser.serialize(s) + } +} diff --git a/src/types/mod.rs b/src/types/mod.rs new file mode 100644 index 0000000..205ba43 --- /dev/null +++ b/src/types/mod.rs @@ -0,0 +1,5 @@ +pub mod transaction; +pub use transaction::Transaction; +pub use transaction::TransactionType; +pub mod account; +pub use account::Account; diff --git a/src/types/transaction.rs b/src/types/transaction.rs new file mode 100644 index 0000000..102c3c2 --- /dev/null +++ b/src/types/transaction.rs @@ -0,0 +1,49 @@ +use serde::de; +use serde::{Deserialize, Deserializer}; +const PRECISION: u32 = 4; + +#[derive(Eq, PartialEq, Debug, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TransactionType { + Deposit, + Withdrawal, + Dispute, + Resolve, + Chargeback, +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct Transaction { + #[serde(rename = "type")] + pub tx_type: TransactionType, + pub client: u16, + pub tx: u32, + /// Amount of the smallest unit, e.g. 0.0001 as per the specification + #[serde(deserialize_with = "de_amount")] + pub amount: u64, +} + +fn de_amount<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + //TODO use something like num_bigint instead + let deserialized = String::deserialize(deserializer)?; + let mut splitted = deserialized.split('.'); + let units = splitted + .next() + .map_or(Ok(0), |v| match v { + "" => Ok(0), + _ => v.parse::(), + }) + .map_err(de::Error::custom)? + .checked_mul(10u64.pow(PRECISION)) + .ok_or_else(|| de::Error::custom("Value too large"))?; + //TODO format! here isn't great! + let dec = splitted + .next() + .map_or(Ok(0u64), |v| format!("{:0<4.4}", v).parse::()) + .map_err(de::Error::custom)?; + + Ok(units + dec) +}