commit ae6868975f76bc6eb497ee6fdf5d17c3d2fca16c Author: Lewis Diamond Date: Sun Mar 14 16:08:26 2021 -0400 Not fully tested, mostly working diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..df38e39 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,534 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "act" +version = "0.1.0" +dependencies = [ + "async-stream", + "clap", + "csv", + "serde", + "strum", + "strum_macros", + "tokio", + "tokio-stream", + "tokio-test", +] + +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + +[[package]] +name = "async-stream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3670df70cbc01729f901f94c887814b3c68db038aad1329a418bae178bc5295c" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "bstr" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a40b47ad93e1a5404e6c18dec46b628214fee441c70f4ab5d6942142cc268a3d" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + +[[package]] +name = "futures-core" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94" + +[[package]] +name = "heck" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +dependencies = [ + "libc", +] + +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b07a082330a35e43f63177cc01689da34fbffa0105e1246cf0311472cac73a" + +[[package]] +name = "lock_api" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" + +[[package]] +name = "mio" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" +dependencies = [ + "socket2", + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" + +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905" + +[[package]] +name = "proc-macro2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex-automata" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +dependencies = [ + "byteorder", +] + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "serde" +version = "1.0.124" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.124" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "signal-hook-registry" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" + +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if", + "libc", + "winapi", +] + +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "strum" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7318c509b5ba57f18533982607f24070a55d353e90d4cae30c467cdb2ad5ac5c" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee8bc6b87a5112aeeab1f4a9f7ab634fe6cbefc4850006df31267f4cfb9e3149" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "syn" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "tokio" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f58403903e94d4bc56805e46597fced893410b2e753e229d3f7f22423ea03f67" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + +[[package]] +name = "unicode-segmentation" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" + +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + +[[package]] +name = "unicode-xid" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..18ed3e4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "act" +version = "0.1.0" +authors = ["Lewis Diamond "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +strum = { version = "0.20.0", features = ["derive"] } +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" +async-stream = "0.3" +csv = "1.1.5" +strum_macros = "0.20.1" +clap = "2.33.3" + +[dev-dependencies] +tokio-test = "0.4.0" diff --git a/input.csv b/input.csv new file mode 100644 index 0000000..c014a91 --- /dev/null +++ b/input.csv @@ -0,0 +1,7 @@ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.742 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0 +dispute,2,2, diff --git a/input2.csv b/input2.csv new file mode 100644 index 0000000..74100cb --- /dev/null +++ b/input2.csv @@ -0,0 +1,8 @@ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.742 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0 +dispute,2,2, +resolve,2,2, diff --git a/input3.csv b/input3.csv new file mode 100644 index 0000000..091b071 --- /dev/null +++ b/input3.csv @@ -0,0 +1,8 @@ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.742 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0 +dispute,2,2, +chargeback,2,2, diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..32a9786 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition = "2018" diff --git a/src/bin/act.rs b/src/bin/act.rs new file mode 100644 index 0000000..f312471 --- /dev/null +++ b/src/bin/act.rs @@ -0,0 +1,43 @@ +use act::parse::parse; +use act::process::process; +use act::stores::MemActStore; +use act::types::Transaction; +use clap::{App, Arg}; +use std::collections::HashMap; +use std::fs; +use std::io::{stdin, BufRead, BufReader}; +use tokio_stream::StreamExt; + +#[tokio::main] +async fn main() { + let matches = App::new("act") + .version("0.1") + .about("Merges transactions into final account state") + .author("Lewis Diamond") + .arg( + Arg::with_name("input") + .required(false) + .index(1) + .help("Input file, stdin if omitted or -"), + ) + .get_matches(); + + let input: Box = match matches.value_of("input") { + Some("-") | Some("") | None => Box::new(BufReader::new(stdin())), + Some(f) => Box::new(BufReader::new(fs::File::open(f).unwrap())), + }; + + let mut act_store = MemActStore::new(); + let mut tx_store: HashMap = HashMap::new(); + let s = parse(input); + tokio::pin!(s); + while let Some(v) = s.next().await { + process(v, &mut act_store, &mut tx_store); + } + + let mut writer = csv::WriterBuilder::new().from_writer(std::io::stdout()); + + for act in act_store.into_iter() { + writer.serialize(act.1).unwrap(); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3f6e737 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,4 @@ +pub mod parse; +pub mod process; +pub mod stores; +pub mod types; diff --git a/src/parse.rs b/src/parse.rs new file mode 100644 index 0000000..b7708c3 --- /dev/null +++ b/src/parse.rs @@ -0,0 +1,240 @@ +use crate::types::Transaction; +use async_stream::stream; +use std::io::Read; +use tokio_stream::Stream; + +pub fn parse(input: R) -> impl Stream { + let mut reader = csv::ReaderBuilder::new() + .trim(csv::Trim::All) + .flexible(true) + .has_headers(true) + .from_reader(input); + + stream! { + for tx in reader.deserialize() { + match tx { + Ok(tx) => yield tx, + //Depending on the infrastructure, a specific output format + //might be used to add monitoring/alerting + Err(e) => eprintln!("Error reading CSV: {}", e), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::Transaction; + use crate::types::TransactionType; + use tokio_stream::StreamExt; + use tokio_test::block_on; + + #[test] + fn valid_csv_is_parsed() { + block_on(async { + let data = "\ +type,client,tx,amount +deposit,1,1,1.0 +deposit,2,2,2.0 +deposit,1,3,2.0 +withdrawal,1,4,1.5 +withdrawal,2,5,3.0"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 15000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 30000, + client: 2, + tx: 5, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + assert_eq!(expected, txs); + }); + } + + #[test] + fn valid_csv_with_whitespaces_is_parsed() { + block_on(async { + let data = "\ +type, client, tx, amount +deposit, 1, 1, 1.0 +deposit, 2, 2, 2.0 +deposit, 1, 3, 2.0 +withdrawal, 1, 4, 1.5 +withdrawal, 2, 5, 3.0"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 15000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 30000, + client: 2, + tx: 5, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + assert_eq!(expected, txs); + }); + } + + #[test] + fn amounts_are_parsed_correctly() { + block_on(async { + let data = "\ +type,client,tx,amount +deposit,1,1,1.0001 +deposit,2,2,2.0010 +deposit,1,3,10.01 +withdrawal,1,4,01.10 +withdrawal,2,5,10.0110101"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10001, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20010, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 100100, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 11000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 100110, + client: 2, + tx: 5, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + assert_eq!(expected, txs); + }); + } + + #[test] + fn invalid_amounts_are_filtered() { + block_on(async { + let data = "\ +type,client,tx,amount +deposit,1,1,99999999999999999 +deposit,2,2,18446744073709551615 +deposit,1,3,18446744073709551616 +withdrawal,1,4,0 +withdrawal,1,4, +withdrawal,1,4,a +withdrawal,2,5,-1 +withdrawal,1,6,-99999999999999999 +withdrawal,1,6,-18446744073709551615 +withdrawal,1,7,-18446744073709551616"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 0, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 0, + client: 1, + tx: 4, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + + assert_eq!(expected, txs); + }); + } + + #[test] + fn disputes_are_parsed_correctly() { + block_on(async { + let data = "\ +type,client,tx,amount +dispute,1,1, +dispute,1,2,"; + + let expected = vec![ + Transaction { + tx_type: TransactionType::Dispute, + amount: 0, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Dispute, + amount: 0, + client: 1, + tx: 2, + }, + ]; + let txs = parse(data.as_bytes()).collect::>().await; + + assert_eq!(expected, txs); + }); + } +} diff --git a/src/process.rs b/src/process.rs new file mode 100644 index 0000000..ead8e1e --- /dev/null +++ b/src/process.rs @@ -0,0 +1,150 @@ +use crate::{ + stores::ActStore, + types::{Transaction, TransactionType}, +}; +use std::collections::HashMap; + +pub fn process( + t: Transaction, + act_store: &mut dyn ActStore, + tx_store: &mut HashMap, +) { + match t.tx_type { + TransactionType::Deposit => { + act_store.deposit(t.client, t.amount); + tx_store.insert(t.tx, t); + } + TransactionType::Withdrawal => { + act_store.withdraw(t.client, t.amount); + } + TransactionType::Dispute => { + if let Some(orig) = tx_store.get(&t.tx) { + act_store.hold(t.client, orig.amount); + } + } + TransactionType::Resolve => { + if let Some(orig) = tx_store.get(&t.tx) { + act_store.unhold(t.client, orig.amount); + } + } + TransactionType::Chargeback => { + if let Some(orig) = tx_store.get(&t.tx) { + act_store.unhold(t.client, orig.amount); + act_store.withdraw(t.client, orig.amount); + act_store.lock_account(t.client); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stores::MemActStore; + use crate::types::Transaction; + use crate::types::TransactionType; + + #[test] + fn valid_tx_and_over_limit_withdraw() { + let mut act_store: Box = Box::new(MemActStore::new()); + let mut tx_store: HashMap = HashMap::new(); + let txs = vec![ + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 2, + tx: 2, + }, + Transaction { + tx_type: TransactionType::Deposit, + amount: 20000, + client: 1, + tx: 3, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 15000, + client: 1, + tx: 4, + }, + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 30000, + client: 2, + tx: 5, + }, + ]; + for tx in txs { + process(tx, act_store.as_mut(), &mut tx_store); + } + + let act = act_store.get_account(1).unwrap(); + assert_eq!(0, act.held()); + assert_eq!(15000, act.available()); + + let act = act_store.get_account(2).unwrap(); + assert_eq!(0, act.held()); + assert_eq!(20000, act.available()); + } + + #[test] + fn held_funds() { + let mut act_store: Box = Box::new(MemActStore::new()); + let mut tx_store: HashMap = HashMap::new(); + process( + Transaction { + tx_type: TransactionType::Deposit, + amount: 10000, + client: 1, + tx: 1, + }, + act_store.as_mut(), + &mut tx_store, + ); + process( + Transaction { + tx_type: TransactionType::Dispute, + amount: 0, + client: 1, + tx: 1, + }, + act_store.as_mut(), + &mut tx_store, + ); + let act = act_store.get_account(1).unwrap(); + assert_eq!(10000, act.held()); + assert_eq!(0, act.available()); + process( + Transaction { + tx_type: TransactionType::Withdrawal, + amount: 10000, + client: 1, + tx: 2, + }, + act_store.as_mut(), + &mut tx_store, + ); + let act = act_store.get_account(1).unwrap(); + assert_eq!(10000, act.held()); + assert_eq!(0, act.available()); + process( + Transaction { + tx_type: TransactionType::Resolve, + amount: 0, + client: 1, + tx: 1, + }, + act_store.as_mut(), + &mut tx_store, + ); + let act = act_store.get_account(1).unwrap(); + assert_eq!(0, act.held()); + assert_eq!(10000, act.available()); + } +} diff --git a/src/stores/mem.rs b/src/stores/mem.rs new file mode 100644 index 0000000..423f57e --- /dev/null +++ b/src/stores/mem.rs @@ -0,0 +1,182 @@ +use super::ActStore; +use crate::types::Account; +use std::collections::hash_map::IntoIter; +use std::collections::HashMap; + +pub struct MemActStore(HashMap); + +enum Action { + Withdraw(u64), + Deposit(u64), + Hold(u64), + Unhold(u64), +} + +impl MemActStore { + pub fn new() -> Self { + MemActStore(HashMap::new()) + } + + fn action_act(&mut self, client: u16, action: Action) -> u64 { + let act = self.0.entry(client).or_insert_with(|| Account::new(client)); + match action { + Action::Withdraw(amnt) => act.withdraw(amnt), + Action::Deposit(amnt) => act.deposit(amnt), + Action::Hold(amnt) => act.hold(amnt), + Action::Unhold(amnt) => act.unhold(amnt), + } + } +} + +impl Default for MemActStore { + fn default() -> Self { + Self::new() + } +} + +impl IntoIterator for MemActStore { + type Item = (u16, Account); + + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl ActStore for MemActStore { + fn deposit(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Deposit(amnt)) + } + + fn withdraw(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Withdraw(amnt)) + } + + fn hold(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Hold(amnt)) + } + + fn unhold(&mut self, client: u16, amnt: u64) -> u64 { + self.action_act(client, Action::Unhold(amnt)) + } + + fn lock_account(&mut self, client: u16) -> bool { + if let Some(act) = self.0.get_mut(&client) { + act.lock() + } else { + false + } + } + + fn unlock_account(&mut self, client: u16) -> bool { + if let Some(act) = self.0.get_mut(&client) { + act.unlock() + } else { + false + } + } + + fn get_account(&self, client: u16) -> Option<&Account> { + self.0.get(&client) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_add_balance() { + let mut store = MemActStore::new(); + let client_id = 200; + store.deposit(client_id, 200); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(200, act.id()); + assert_eq!(200, act.available()); + } else { + panic!("Could not get account from store"); + } + let balance = store.deposit(client_id, 50); + if let Some(act) = store.get_account(client_id) { + assert_eq!(200, act.id()); + assert_eq!(250, act.available()); + assert_eq!(250, balance); + } else { + panic!("Could not get account from store"); + } + } + + #[test] + fn test_sub_balance() { + let mut store = MemActStore::new(); + let client_id = 1; + store.deposit(client_id, 200); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(200, act.available()); + } else { + panic!("Could not get account from store"); + } + let balance = store.withdraw(client_id, 50); + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(150, act.available()); + assert_eq!(150, balance); + } else { + panic!("Could not get account from store"); + } + } + + #[test] + fn test_negative_balance() { + let mut store = MemActStore::new(); + let client_id = 1; + store.withdraw(client_id, 1); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(0, act.available()); + } else { + panic!("Could not get account from store"); + } + } + + #[test] + fn test_hold() { + let mut store = MemActStore::new(); + let client_id = 1; + store.deposit(client_id, 100); + + let avail = store.hold(client_id, 10); + assert_eq!(90, avail); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert_eq!(90, act.available()); + } else { + panic!("Could not get account from store"); + } + + assert_eq!(0, store.hold(client_id, 100)); + assert_eq!(10, store.unhold(client_id, 20)); + } + + #[test] + fn test_lock() { + let mut store = MemActStore::new(); + let client_id = 1; + store.deposit(client_id, 100); + let locked = store.lock_account(client_id); + assert!(locked); + + if let Some(act) = store.get_account(client_id) { + assert_eq!(1, act.id()); + assert!(act.is_locked()); + } else { + panic!("Could not get account from store"); + } + } +} diff --git a/src/stores/mod.rs b/src/stores/mod.rs new file mode 100644 index 0000000..a1b277e --- /dev/null +++ b/src/stores/mod.rs @@ -0,0 +1,14 @@ +pub mod mem; +pub use mem::MemActStore; + +use crate::types::Account; + +pub trait ActStore { + fn get_account(&self, client: u16) -> Option<&Account>; + fn deposit(&mut self, client: u16, amnt: u64) -> u64; + fn withdraw(&mut self, client: u16, amnt: u64) -> u64; + fn hold(&mut self, client: u16, amnt: u64) -> u64; + fn unhold(&mut self, client: u16, amnt: u64) -> u64; + fn lock_account(&mut self, client: u16) -> bool; + fn unlock_account(&mut self, client: u16) -> bool; +} diff --git a/src/types/account.rs b/src/types/account.rs new file mode 100644 index 0000000..c3d5d2b --- /dev/null +++ b/src/types/account.rs @@ -0,0 +1,113 @@ +use serde::{Serialize, Serializer}; +const PRECISION: u32 = 4; + +#[derive(Debug, PartialEq)] +pub struct Account { + id: u16, + total: u64, + held: u64, + locked: bool, +} + +impl Account { + pub fn new(client_id: u16) -> Account { + Account { + id: client_id, + total: 0, + held: 0, + locked: false, + } + } + + pub fn with_balance(client_id: u16, seed_balance: u64) -> Account { + Account { + id: client_id, + total: seed_balance, + held: 0, + locked: false, + } + } + + pub fn id(&self) -> u16 { + self.id + } + pub fn available(&self) -> u64 { + self.total.saturating_sub(self.held) + } + pub fn held(&self) -> u64 { + self.held + } + pub fn hold(&mut self, amnt: u64) -> u64 { + if let Some(new_held) = self.held.checked_add(amnt) { + self.held = new_held; + } + self.available() + } + + pub fn unhold(&mut self, amnt: u64) -> u64 { + if let Some(new_held) = self.held.checked_sub(amnt) { + self.held = new_held; + } + self.available() + } + pub fn deposit(&mut self, amnt: u64) -> u64 { + if let Some(new_bal) = self.total.checked_add(amnt) { + self.total = new_bal; + }; + self.available() + } + pub fn withdraw(&mut self, amnt: u64) -> u64 { + if self.available().checked_sub(amnt).is_some() { + self.total -= amnt; + }; + self.available() + } + + pub fn lock(&mut self) -> bool { + self.locked = true; + self.locked + } + + pub fn unlock(&mut self) -> bool { + self.locked = false; + self.locked + } + + pub fn is_locked(&self) -> bool { + self.locked + } +} + +#[derive(Debug, Serialize, PartialEq)] +pub struct AccountSer { + id: u16, + total: String, + held: String, + available: String, + locked: bool, +} + +impl serde::Serialize for Account { + fn serialize(&self, s: S) -> Result + where + S: Serializer, + { + let total = format!( + "{0:.4}", + (self.total as f64) / (10u64.pow(PRECISION) as f64) + ); + let held = format!("{0:.4}", (self.held as f64) / (10u64.pow(PRECISION) as f64)); + let available = format!( + "{0:.4}", + (self.available() as f64) / (10u64.pow(PRECISION) as f64) + ); + let ser = AccountSer { + id: self.id, + total, + held, + available, + locked: self.locked, + }; + ser.serialize(s) + } +} diff --git a/src/types/mod.rs b/src/types/mod.rs new file mode 100644 index 0000000..205ba43 --- /dev/null +++ b/src/types/mod.rs @@ -0,0 +1,5 @@ +pub mod transaction; +pub use transaction::Transaction; +pub use transaction::TransactionType; +pub mod account; +pub use account::Account; diff --git a/src/types/transaction.rs b/src/types/transaction.rs new file mode 100644 index 0000000..102c3c2 --- /dev/null +++ b/src/types/transaction.rs @@ -0,0 +1,49 @@ +use serde::de; +use serde::{Deserialize, Deserializer}; +const PRECISION: u32 = 4; + +#[derive(Eq, PartialEq, Debug, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TransactionType { + Deposit, + Withdrawal, + Dispute, + Resolve, + Chargeback, +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct Transaction { + #[serde(rename = "type")] + pub tx_type: TransactionType, + pub client: u16, + pub tx: u32, + /// Amount of the smallest unit, e.g. 0.0001 as per the specification + #[serde(deserialize_with = "de_amount")] + pub amount: u64, +} + +fn de_amount<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + //TODO use something like num_bigint instead + let deserialized = String::deserialize(deserializer)?; + let mut splitted = deserialized.split('.'); + let units = splitted + .next() + .map_or(Ok(0), |v| match v { + "" => Ok(0), + _ => v.parse::(), + }) + .map_err(de::Error::custom)? + .checked_mul(10u64.pow(PRECISION)) + .ok_or_else(|| de::Error::custom("Value too large"))?; + //TODO format! here isn't great! + let dec = splitted + .next() + .map_or(Ok(0u64), |v| format!("{:0<4.4}", v).parse::()) + .map_err(de::Error::custom)?; + + Ok(units + dec) +}