This commit is contained in:
Lewis Diamond
2020-09-01 16:25:22 -04:00
parent 01a42cebf6
commit 68d0bf7ab2
2 changed files with 54 additions and 35 deletions

View File

@@ -1,15 +1,17 @@
use crate::message::Message; use crate::message::Message;
use crate::stores::{IMessageSearcher, IMessageStorage, IMessageStore, MessageStoreError}; use crate::stores::{IMessageSearcher, IMessageStorage, IMessageStore, MessageStoreError};
use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::future::join_all;
use log::error; use log::error;
use maildir::{MailEntry, Maildir}; use maildir::{MailEntry, Maildir};
use pbr::{MultiBar, ProgressBar}; use pbr::{MultiBar, ProgressBar};
use rayon::prelude::*;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::mpsc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task;
pub struct MessageStore { pub struct MessageStore {
pub searcher: Box<dyn IMessageSearcher>, pub searcher: Box<dyn IMessageSearcher>,
@@ -17,6 +19,7 @@ pub struct MessageStore {
progress: Option<ProgressBar<pbr::Pipe>>, progress: Option<ProgressBar<pbr::Pipe>>,
display_progress: bool, display_progress: bool,
} }
#[async_trait]
impl IMessageStore for MessageStore { impl IMessageStore for MessageStore {
fn get_message(&self, id: String) -> Result<Option<Message>, MessageStoreError> { fn get_message(&self, id: String) -> Result<Option<Message>, MessageStoreError> {
self.searcher.get_message(id) self.searcher.get_message(id)
@@ -32,8 +35,8 @@ impl IMessageStore for MessageStore {
Ok(id) Ok(id)
} }
fn add_maildir(&mut self, path: PathBuf, all: bool) -> Result<usize, MessageStoreError> { async fn add_maildir(&mut self, path: PathBuf, all: bool) -> Result<usize, MessageStoreError> {
self.index_mails(path, all) self.index_mails(path, all).await
} }
fn tag_message_id( fn tag_message_id(
&mut self, &mut self,
@@ -135,7 +138,28 @@ impl MessageStore {
} }
} }
fn do_index_mails(&mut self, maildir: Maildir, full: bool) -> Result<usize, MessageStoreError> { async fn parse_and_add_message(
mail: MailEntry,
) -> Result<(Message, String), MessageStoreError> {
let message = Message::from_mailentry(mail);
match message {
Ok(msg) => {
let parsed_body = msg.get_body().as_text();
Ok((msg, parsed_body))
}
Err(err) => {
error!("A message could not be parsed: {}", err.message);
Err(MessageStoreError::CouldNotAddMessage(
"Failed to parse email".to_string(),
))
}
}
}
async fn do_index_mails(
&mut self,
maildir: Maildir,
full: bool,
) -> Result<usize, MessageStoreError> {
let mails: Vec<Result<MailEntry, _>> = Self::mail_iterator(&maildir, full).collect(); let mails: Vec<Result<MailEntry, _>> = Self::mail_iterator(&maildir, full).collect();
let count = mails.len(); let count = mails.len();
self.start_indexing_process(count)?; self.start_indexing_process(count)?;
@@ -143,37 +167,26 @@ impl MessageStore {
if self.display_progress { if self.display_progress {
progress_handle = Some(self.init_progress(count)); progress_handle = Some(self.init_progress(count));
} }
let (tx, rx) = mpsc::channel(); let (tx, mut rx) = mpsc::channel(100);
let handles = mails
let t = thread::spawn(move || { .into_iter()
mails .map(|m| {
.into_par_iter() let mut tx = tx.clone();
.for_each_with(tx, |tx, msg| match msg { task::spawn(async move {
Ok(unparsed_msg) => { if let Ok(msg) = m {
let message = Message::from_mailentry(unparsed_msg); if let Ok((msg, body)) = MessageStore::parse_and_add_message(msg).await {
match message { tx.send((msg, body)).await;
Ok(msg) => { };
let parsed_body = msg.get_body().as_text();
tx.send((msg, parsed_body))
.expect("Could not send to channel?")
} }
Err(err) => { })
error!("A message could not be parsed: {}", err.message); })
} .collect::<Vec<task::JoinHandle<_>>>();
} while let Some((msg, parsed_body)) = rx.recv().await {
}
Err(e) => {
error!("Failed to get message {}", e);
}
});
});
while let Ok((msg, parsed_body)) = rx.recv() {
self.add_message(msg, parsed_body)?; self.add_message(msg, parsed_body)?;
self.inc_progress();
} }
join_all(handles).await;
self.finish_indexing_process()?; self.finish_indexing_process()?;
t.join().expect("Unable to join threads for some reason"); self.inc_progress();
self.finish_progress(); self.finish_progress();
if let Some(handle) = progress_handle { if let Some(handle) = progress_handle {
handle handle
@@ -183,11 +196,15 @@ impl MessageStore {
Ok(count) Ok(count)
} }
pub fn index_mails(&mut self, path: PathBuf, full: bool) -> Result<usize, MessageStoreError> { pub async fn index_mails(
&mut self,
path: PathBuf,
full: bool,
) -> Result<usize, MessageStoreError> {
let maildir = self.maildir(path); let maildir = self.maildir(path);
match maildir { match maildir {
Ok(maildir) => { Ok(maildir) => {
self.do_index_mails(maildir, full)?; self.do_index_mails(maildir, full).await?;
Ok(1) Ok(1)
} }
Err(_) => Err(MessageStoreError::CouldNotOpenMaildir( Err(_) => Err(MessageStoreError::CouldNotOpenMaildir(

View File

@@ -1,4 +1,5 @@
use crate::message::Message; use crate::message::Message;
use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
@@ -192,6 +193,7 @@ pub trait IMessageStorage {
) -> Result<Vec<Message>, MessageStoreError>; ) -> Result<Vec<Message>, MessageStoreError>;
} }
#[async_trait]
pub trait IMessageStore { pub trait IMessageStore {
fn get_message(&self, id: String) -> Result<Option<Message>, MessageStoreError>; fn get_message(&self, id: String) -> Result<Option<Message>, MessageStoreError>;
fn add_message( fn add_message(
@@ -199,7 +201,7 @@ pub trait IMessageStore {
msg: Message, msg: Message,
parsed_body: String, parsed_body: String,
) -> Result<String, MessageStoreError>; ) -> Result<String, MessageStoreError>;
fn add_maildir(&mut self, path: PathBuf, all: bool) -> Result<usize, MessageStoreError>; async fn add_maildir(&mut self, path: PathBuf, all: bool) -> Result<usize, MessageStoreError>;
fn tag_message_id( fn tag_message_id(
&mut self, &mut self,
id: String, id: String,