Not fully tested, mostly working

This commit is contained in:
Lewis Diamond 2021-03-14 16:08:26 -04:00
commit ae6868975f
16 changed files with 1379 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

534
Cargo.lock generated Normal file
View File

@ -0,0 +1,534 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "act"
version = "0.1.0"
dependencies = [
"async-stream",
"clap",
"csv",
"serde",
"strum",
"strum_macros",
"tokio",
"tokio-stream",
"tokio-test",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "async-stream"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3670df70cbc01729f901f94c887814b3c68db038aad1329a418bae178bc5295c"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bstr"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a40b47ad93e1a5404e6c18dec46b628214fee441c70f4ab5d6942142cc268a3d"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "csv"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]]
name = "futures-core"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94"
[[package]]
name = "heck"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if",
]
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03b07a082330a35e43f63177cc01689da34fbffa0105e1246cf0311472cac73a"
[[package]]
name = "lock_api"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if",
]
[[package]]
name = "memchr"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "mio"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a"
dependencies = [
"libc",
"log",
"miow",
"ntapi",
"winapi",
]
[[package]]
name = "miow"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
dependencies = [
"socket2",
"winapi",
]
[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
name = "pin-project-lite"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905"
[[package]]
name = "proc-macro2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
dependencies = [
"unicode-xid",
]
[[package]]
name = "quote"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9"
dependencies = [
"bitflags",
]
[[package]]
name = "regex-automata"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "signal-hook-registry"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
dependencies = [
"libc",
]
[[package]]
name = "smallvec"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strum"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7318c509b5ba57f18533982607f24070a55d353e90d4cae30c467cdb2ad5ac5c"
dependencies = [
"strum_macros",
]
[[package]]
name = "strum_macros"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee8bc6b87a5112aeeab1f4a9f7ab634fe6cbefc4850006df31267f4cfb9e3149"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "syn"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "tokio"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f58403903e94d4bc56805e46597fced893410b2e753e229d3f7f22423ea03f67"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "unicode-segmentation"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796"
[[package]]
name = "unicode-width"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

20
Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "act"
version = "0.1.0"
authors = ["Lewis Diamond <git@lewisdiamond.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version = "1.0", features = ["derive"] }
strum = { version = "0.20.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
async-stream = "0.3"
csv = "1.1.5"
strum_macros = "0.20.1"
clap = "2.33.3"
[dev-dependencies]
tokio-test = "0.4.0"

7
input.csv Normal file
View File

@ -0,0 +1,7 @@
type,client,tx,amount
deposit,1,1,1.0
deposit,2,2,2.0
deposit,1,3,2.742
withdrawal,1,4,1.5
withdrawal,2,5,3.0
dispute,2,2,
1 type client tx amount
2 deposit 1 1 1.0
3 deposit 2 2 2.0
4 deposit 1 3 2.742
5 withdrawal 1 4 1.5
6 withdrawal 2 5 3.0
7 dispute 2 2

8
input2.csv Normal file
View File

@ -0,0 +1,8 @@
type,client,tx,amount
deposit,1,1,1.0
deposit,2,2,2.0
deposit,1,3,2.742
withdrawal,1,4,1.5
withdrawal,2,5,3.0
dispute,2,2,
resolve,2,2,
1 type client tx amount
2 deposit 1 1 1.0
3 deposit 2 2 2.0
4 deposit 1 3 2.742
5 withdrawal 1 4 1.5
6 withdrawal 2 5 3.0
7 dispute 2 2
8 resolve 2 2

8
input3.csv Normal file
View File

@ -0,0 +1,8 @@
type,client,tx,amount
deposit,1,1,1.0
deposit,2,2,2.0
deposit,1,3,2.742
withdrawal,1,4,1.5
withdrawal,2,5,3.0
dispute,2,2,
chargeback,2,2,
1 type client tx amount
2 deposit 1 1 1.0
3 deposit 2 2 2.0
4 deposit 1 3 2.742
5 withdrawal 1 4 1.5
6 withdrawal 2 5 3.0
7 dispute 2 2
8 chargeback 2 2

1
rustfmt.toml Normal file
View File

@ -0,0 +1 @@
edition = "2018"

43
src/bin/act.rs Normal file
View File

@ -0,0 +1,43 @@
use act::parse::parse;
use act::process::process;
use act::stores::MemActStore;
use act::types::Transaction;
use clap::{App, Arg};
use std::collections::HashMap;
use std::fs;
use std::io::{stdin, BufRead, BufReader};
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
let matches = App::new("act")
.version("0.1")
.about("Merges transactions into final account state")
.author("Lewis Diamond")
.arg(
Arg::with_name("input")
.required(false)
.index(1)
.help("Input file, stdin if omitted or -"),
)
.get_matches();
let input: Box<dyn BufRead> = match matches.value_of("input") {
Some("-") | Some("") | None => Box::new(BufReader::new(stdin())),
Some(f) => Box::new(BufReader::new(fs::File::open(f).unwrap())),
};
let mut act_store = MemActStore::new();
let mut tx_store: HashMap<u32, Transaction> = HashMap::new();
let s = parse(input);
tokio::pin!(s);
while let Some(v) = s.next().await {
process(v, &mut act_store, &mut tx_store);
}
let mut writer = csv::WriterBuilder::new().from_writer(std::io::stdout());
for act in act_store.into_iter() {
writer.serialize(act.1).unwrap();
}
}

4
src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
pub mod parse;
pub mod process;
pub mod stores;
pub mod types;

240
src/parse.rs Normal file
View File

@ -0,0 +1,240 @@
use crate::types::Transaction;
use async_stream::stream;
use std::io::Read;
use tokio_stream::Stream;
pub fn parse<R: Read>(input: R) -> impl Stream<Item = Transaction> {
let mut reader = csv::ReaderBuilder::new()
.trim(csv::Trim::All)
.flexible(true)
.has_headers(true)
.from_reader(input);
stream! {
for tx in reader.deserialize() {
match tx {
Ok(tx) => yield tx,
//Depending on the infrastructure, a specific output format
//might be used to add monitoring/alerting
Err(e) => eprintln!("Error reading CSV: {}", e),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::Transaction;
use crate::types::TransactionType;
use tokio_stream::StreamExt;
use tokio_test::block_on;
#[test]
fn valid_csv_is_parsed() {
block_on(async {
let data = "\
type,client,tx,amount
deposit,1,1,1.0
deposit,2,2,2.0
deposit,1,3,2.0
withdrawal,1,4,1.5
withdrawal,2,5,3.0";
let expected = vec![
Transaction {
tx_type: TransactionType::Deposit,
amount: 10000,
client: 1,
tx: 1,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20000,
client: 2,
tx: 2,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20000,
client: 1,
tx: 3,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 15000,
client: 1,
tx: 4,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 30000,
client: 2,
tx: 5,
},
];
let txs = parse(data.as_bytes()).collect::<Vec<Transaction>>().await;
assert_eq!(expected, txs);
});
}
#[test]
fn valid_csv_with_whitespaces_is_parsed() {
block_on(async {
let data = "\
type, client, tx, amount
deposit, 1, 1, 1.0
deposit, 2, 2, 2.0
deposit, 1, 3, 2.0
withdrawal, 1, 4, 1.5
withdrawal, 2, 5, 3.0";
let expected = vec![
Transaction {
tx_type: TransactionType::Deposit,
amount: 10000,
client: 1,
tx: 1,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20000,
client: 2,
tx: 2,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20000,
client: 1,
tx: 3,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 15000,
client: 1,
tx: 4,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 30000,
client: 2,
tx: 5,
},
];
let txs = parse(data.as_bytes()).collect::<Vec<Transaction>>().await;
assert_eq!(expected, txs);
});
}
#[test]
fn amounts_are_parsed_correctly() {
block_on(async {
let data = "\
type,client,tx,amount
deposit,1,1,1.0001
deposit,2,2,2.0010
deposit,1,3,10.01
withdrawal,1,4,01.10
withdrawal,2,5,10.0110101";
let expected = vec![
Transaction {
tx_type: TransactionType::Deposit,
amount: 10001,
client: 1,
tx: 1,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20010,
client: 2,
tx: 2,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 100100,
client: 1,
tx: 3,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 11000,
client: 1,
tx: 4,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 100110,
client: 2,
tx: 5,
},
];
let txs = parse(data.as_bytes()).collect::<Vec<Transaction>>().await;
assert_eq!(expected, txs);
});
}
#[test]
fn invalid_amounts_are_filtered() {
block_on(async {
let data = "\
type,client,tx,amount
deposit,1,1,99999999999999999
deposit,2,2,18446744073709551615
deposit,1,3,18446744073709551616
withdrawal,1,4,0
withdrawal,1,4,
withdrawal,1,4,a
withdrawal,2,5,-1
withdrawal,1,6,-99999999999999999
withdrawal,1,6,-18446744073709551615
withdrawal,1,7,-18446744073709551616";
let expected = vec![
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 0,
client: 1,
tx: 4,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 0,
client: 1,
tx: 4,
},
];
let txs = parse(data.as_bytes()).collect::<Vec<Transaction>>().await;
assert_eq!(expected, txs);
});
}
#[test]
fn disputes_are_parsed_correctly() {
block_on(async {
let data = "\
type,client,tx,amount
dispute,1,1,
dispute,1,2,";
let expected = vec![
Transaction {
tx_type: TransactionType::Dispute,
amount: 0,
client: 1,
tx: 1,
},
Transaction {
tx_type: TransactionType::Dispute,
amount: 0,
client: 1,
tx: 2,
},
];
let txs = parse(data.as_bytes()).collect::<Vec<Transaction>>().await;
assert_eq!(expected, txs);
});
}
}

150
src/process.rs Normal file
View File

@ -0,0 +1,150 @@
use crate::{
stores::ActStore,
types::{Transaction, TransactionType},
};
use std::collections::HashMap;
pub fn process(
t: Transaction,
act_store: &mut dyn ActStore,
tx_store: &mut HashMap<u32, Transaction>,
) {
match t.tx_type {
TransactionType::Deposit => {
act_store.deposit(t.client, t.amount);
tx_store.insert(t.tx, t);
}
TransactionType::Withdrawal => {
act_store.withdraw(t.client, t.amount);
}
TransactionType::Dispute => {
if let Some(orig) = tx_store.get(&t.tx) {
act_store.hold(t.client, orig.amount);
}
}
TransactionType::Resolve => {
if let Some(orig) = tx_store.get(&t.tx) {
act_store.unhold(t.client, orig.amount);
}
}
TransactionType::Chargeback => {
if let Some(orig) = tx_store.get(&t.tx) {
act_store.unhold(t.client, orig.amount);
act_store.withdraw(t.client, orig.amount);
act_store.lock_account(t.client);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::stores::MemActStore;
use crate::types::Transaction;
use crate::types::TransactionType;
#[test]
fn valid_tx_and_over_limit_withdraw() {
let mut act_store: Box<dyn ActStore> = Box::new(MemActStore::new());
let mut tx_store: HashMap<u32, Transaction> = HashMap::new();
let txs = vec![
Transaction {
tx_type: TransactionType::Deposit,
amount: 10000,
client: 1,
tx: 1,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20000,
client: 2,
tx: 2,
},
Transaction {
tx_type: TransactionType::Deposit,
amount: 20000,
client: 1,
tx: 3,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 15000,
client: 1,
tx: 4,
},
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 30000,
client: 2,
tx: 5,
},
];
for tx in txs {
process(tx, act_store.as_mut(), &mut tx_store);
}
let act = act_store.get_account(1).unwrap();
assert_eq!(0, act.held());
assert_eq!(15000, act.available());
let act = act_store.get_account(2).unwrap();
assert_eq!(0, act.held());
assert_eq!(20000, act.available());
}
#[test]
fn held_funds() {
let mut act_store: Box<dyn ActStore> = Box::new(MemActStore::new());
let mut tx_store: HashMap<u32, Transaction> = HashMap::new();
process(
Transaction {
tx_type: TransactionType::Deposit,
amount: 10000,
client: 1,
tx: 1,
},
act_store.as_mut(),
&mut tx_store,
);
process(
Transaction {
tx_type: TransactionType::Dispute,
amount: 0,
client: 1,
tx: 1,
},
act_store.as_mut(),
&mut tx_store,
);
let act = act_store.get_account(1).unwrap();
assert_eq!(10000, act.held());
assert_eq!(0, act.available());
process(
Transaction {
tx_type: TransactionType::Withdrawal,
amount: 10000,
client: 1,
tx: 2,
},
act_store.as_mut(),
&mut tx_store,
);
let act = act_store.get_account(1).unwrap();
assert_eq!(10000, act.held());
assert_eq!(0, act.available());
process(
Transaction {
tx_type: TransactionType::Resolve,
amount: 0,
client: 1,
tx: 1,
},
act_store.as_mut(),
&mut tx_store,
);
let act = act_store.get_account(1).unwrap();
assert_eq!(0, act.held());
assert_eq!(10000, act.available());
}
}

182
src/stores/mem.rs Normal file
View File

@ -0,0 +1,182 @@
use super::ActStore;
use crate::types::Account;
use std::collections::hash_map::IntoIter;
use std::collections::HashMap;
pub struct MemActStore(HashMap<u16, Account>);
enum Action {
Withdraw(u64),
Deposit(u64),
Hold(u64),
Unhold(u64),
}
impl MemActStore {
pub fn new() -> Self {
MemActStore(HashMap::new())
}
fn action_act(&mut self, client: u16, action: Action) -> u64 {
let act = self.0.entry(client).or_insert_with(|| Account::new(client));
match action {
Action::Withdraw(amnt) => act.withdraw(amnt),
Action::Deposit(amnt) => act.deposit(amnt),
Action::Hold(amnt) => act.hold(amnt),
Action::Unhold(amnt) => act.unhold(amnt),
}
}
}
impl Default for MemActStore {
fn default() -> Self {
Self::new()
}
}
impl IntoIterator for MemActStore {
type Item = (u16, Account);
type IntoIter = IntoIter<u16, Account>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl ActStore for MemActStore {
fn deposit(&mut self, client: u16, amnt: u64) -> u64 {
self.action_act(client, Action::Deposit(amnt))
}
fn withdraw(&mut self, client: u16, amnt: u64) -> u64 {
self.action_act(client, Action::Withdraw(amnt))
}
fn hold(&mut self, client: u16, amnt: u64) -> u64 {
self.action_act(client, Action::Hold(amnt))
}
fn unhold(&mut self, client: u16, amnt: u64) -> u64 {
self.action_act(client, Action::Unhold(amnt))
}
fn lock_account(&mut self, client: u16) -> bool {
if let Some(act) = self.0.get_mut(&client) {
act.lock()
} else {
false
}
}
fn unlock_account(&mut self, client: u16) -> bool {
if let Some(act) = self.0.get_mut(&client) {
act.unlock()
} else {
false
}
}
fn get_account(&self, client: u16) -> Option<&Account> {
self.0.get(&client)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_add_balance() {
let mut store = MemActStore::new();
let client_id = 200;
store.deposit(client_id, 200);
if let Some(act) = store.get_account(client_id) {
assert_eq!(200, act.id());
assert_eq!(200, act.available());
} else {
panic!("Could not get account from store");
}
let balance = store.deposit(client_id, 50);
if let Some(act) = store.get_account(client_id) {
assert_eq!(200, act.id());
assert_eq!(250, act.available());
assert_eq!(250, balance);
} else {
panic!("Could not get account from store");
}
}
#[test]
fn test_sub_balance() {
let mut store = MemActStore::new();
let client_id = 1;
store.deposit(client_id, 200);
if let Some(act) = store.get_account(client_id) {
assert_eq!(1, act.id());
assert_eq!(200, act.available());
} else {
panic!("Could not get account from store");
}
let balance = store.withdraw(client_id, 50);
if let Some(act) = store.get_account(client_id) {
assert_eq!(1, act.id());
assert_eq!(150, act.available());
assert_eq!(150, balance);
} else {
panic!("Could not get account from store");
}
}
#[test]
fn test_negative_balance() {
let mut store = MemActStore::new();
let client_id = 1;
store.withdraw(client_id, 1);
if let Some(act) = store.get_account(client_id) {
assert_eq!(1, act.id());
assert_eq!(0, act.available());
} else {
panic!("Could not get account from store");
}
}
#[test]
fn test_hold() {
let mut store = MemActStore::new();
let client_id = 1;
store.deposit(client_id, 100);
let avail = store.hold(client_id, 10);
assert_eq!(90, avail);
if let Some(act) = store.get_account(client_id) {
assert_eq!(1, act.id());
assert_eq!(90, act.available());
} else {
panic!("Could not get account from store");
}
assert_eq!(0, store.hold(client_id, 100));
assert_eq!(10, store.unhold(client_id, 20));
}
#[test]
fn test_lock() {
let mut store = MemActStore::new();
let client_id = 1;
store.deposit(client_id, 100);
let locked = store.lock_account(client_id);
assert!(locked);
if let Some(act) = store.get_account(client_id) {
assert_eq!(1, act.id());
assert!(act.is_locked());
} else {
panic!("Could not get account from store");
}
}
}

14
src/stores/mod.rs Normal file
View File

@ -0,0 +1,14 @@
pub mod mem;
pub use mem::MemActStore;
use crate::types::Account;
pub trait ActStore {
fn get_account(&self, client: u16) -> Option<&Account>;
fn deposit(&mut self, client: u16, amnt: u64) -> u64;
fn withdraw(&mut self, client: u16, amnt: u64) -> u64;
fn hold(&mut self, client: u16, amnt: u64) -> u64;
fn unhold(&mut self, client: u16, amnt: u64) -> u64;
fn lock_account(&mut self, client: u16) -> bool;
fn unlock_account(&mut self, client: u16) -> bool;
}

113
src/types/account.rs Normal file
View File

@ -0,0 +1,113 @@
use serde::{Serialize, Serializer};
const PRECISION: u32 = 4;
#[derive(Debug, PartialEq)]
pub struct Account {
id: u16,
total: u64,
held: u64,
locked: bool,
}
impl Account {
pub fn new(client_id: u16) -> Account {
Account {
id: client_id,
total: 0,
held: 0,
locked: false,
}
}
pub fn with_balance(client_id: u16, seed_balance: u64) -> Account {
Account {
id: client_id,
total: seed_balance,
held: 0,
locked: false,
}
}
pub fn id(&self) -> u16 {
self.id
}
pub fn available(&self) -> u64 {
self.total.saturating_sub(self.held)
}
pub fn held(&self) -> u64 {
self.held
}
pub fn hold(&mut self, amnt: u64) -> u64 {
if let Some(new_held) = self.held.checked_add(amnt) {
self.held = new_held;
}
self.available()
}
pub fn unhold(&mut self, amnt: u64) -> u64 {
if let Some(new_held) = self.held.checked_sub(amnt) {
self.held = new_held;
}
self.available()
}
pub fn deposit(&mut self, amnt: u64) -> u64 {
if let Some(new_bal) = self.total.checked_add(amnt) {
self.total = new_bal;
};
self.available()
}
pub fn withdraw(&mut self, amnt: u64) -> u64 {
if self.available().checked_sub(amnt).is_some() {
self.total -= amnt;
};
self.available()
}
pub fn lock(&mut self) -> bool {
self.locked = true;
self.locked
}
pub fn unlock(&mut self) -> bool {
self.locked = false;
self.locked
}
pub fn is_locked(&self) -> bool {
self.locked
}
}
#[derive(Debug, Serialize, PartialEq)]
pub struct AccountSer {
id: u16,
total: String,
held: String,
available: String,
locked: bool,
}
impl serde::Serialize for Account {
fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let total = format!(
"{0:.4}",
(self.total as f64) / (10u64.pow(PRECISION) as f64)
);
let held = format!("{0:.4}", (self.held as f64) / (10u64.pow(PRECISION) as f64));
let available = format!(
"{0:.4}",
(self.available() as f64) / (10u64.pow(PRECISION) as f64)
);
let ser = AccountSer {
id: self.id,
total,
held,
available,
locked: self.locked,
};
ser.serialize(s)
}
}

5
src/types/mod.rs Normal file
View File

@ -0,0 +1,5 @@
pub mod transaction;
pub use transaction::Transaction;
pub use transaction::TransactionType;
pub mod account;
pub use account::Account;

49
src/types/transaction.rs Normal file
View File

@ -0,0 +1,49 @@
use serde::de;
use serde::{Deserialize, Deserializer};
const PRECISION: u32 = 4;
#[derive(Eq, PartialEq, Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TransactionType {
Deposit,
Withdrawal,
Dispute,
Resolve,
Chargeback,
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct Transaction {
#[serde(rename = "type")]
pub tx_type: TransactionType,
pub client: u16,
pub tx: u32,
/// Amount of the smallest unit, e.g. 0.0001 as per the specification
#[serde(deserialize_with = "de_amount")]
pub amount: u64,
}
fn de_amount<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
D: Deserializer<'de>,
{
//TODO use something like num_bigint instead
let deserialized = String::deserialize(deserializer)?;
let mut splitted = deserialized.split('.');
let units = splitted
.next()
.map_or(Ok(0), |v| match v {
"" => Ok(0),
_ => v.parse::<u64>(),
})
.map_err(de::Error::custom)?
.checked_mul(10u64.pow(PRECISION))
.ok_or_else(|| de::Error::custom("Value too large"))?;
//TODO format! here isn't great!
let dec = splitted
.next()
.map_or(Ok(0u64), |v| format!("{:0<4.4}", v).parse::<u64>())
.map_err(de::Error::custom)?;
Ok(units + dec)
}