From 016e1d8d760113a64afcc5d516f08010cb566d68 Mon Sep 17 00:00:00 2001 From: fxqnlr Date: Sun, 21 May 2023 13:43:52 +0200 Subject: added multithreaded downloads and progressbar --- src/files.rs | 160 +++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 101 insertions(+), 59 deletions(-) (limited to 'src/files.rs') diff --git a/src/files.rs b/src/files.rs index a4c128e..a4a1d3b 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1,9 +1,11 @@ use futures_util::StreamExt; +use indicatif::{ProgressBar, ProgressStyle, MultiProgress}; use reqwest::Client; +use tokio::task::JoinSet; use std::{ collections::HashMap, fs::{copy, read_dir, remove_file, rename, File}, - io::Write, + io::Write, cmp::min, }; use crate::{ @@ -15,83 +17,123 @@ use crate::{ List, }; -pub async fn download_versions(list: List, config: Cfg, versions: Vec) -> MLE { - let mut cached = get_cached_versions(&config.cache); +pub async fn download_versions(list: List, config: Cfg, versions: Vec) -> MLE<()> { + let cached = get_cached_versions(&config.cache); // println!("{:#?}", cached); - let dl_path = String::from(&list.download_folder); + // println!(" └Download mods to {}", dl_path); + + let mp = MultiProgress::new(); - println!(" └Download mods to {}", dl_path); + let mut js = JoinSet::new(); + let style = ProgressStyle::with_template("{spinner:.green}{msg}\t[{bar:.green/lime}] {bytes}/{total_bytes}") + .unwrap() + .progress_chars("#>-"); for ver in versions { - let project_info = mods_get_info(config.clone(), &ver.project_id)?; - - //Check cache if already downloaded - let c = cached.remove(&ver.id); - if c.is_some() { - print!( - "\t└({})Get version {} from cache", - project_info.title, ver.id - ); - //Force flush of stdout, else print! doesn't print instantly - std::io::stdout().flush()?; - copy_cached_version(&c.unwrap(), &dl_path); - println!(" ✓"); - } else { - print!("\t└({})Download version {}", project_info.title, ver.id); - //Force flush of stdout, else print! doesn't print instantly - std::io::stdout().flush().unwrap(); - let files = ver.files; - let file = match files.clone().into_iter().find(|f| f.primary) { - Some(f) => f, - None => files[0].clone() - }; - let mut splitname: Vec<&str> = file.filename.split('.').collect(); - let extension = match splitname.pop().ok_or("") { - Ok(e) => e, - Err(..) => return Err(MLError::new(ErrorType::Other, "NO_FILE_EXTENSION")), - }; - let filename = format!( - "{}.mr.{}.{}.{}", - splitname.join("."), - ver.project_id, - ver.id, - extension - ); - download_file( - file.url, - list.clone().download_folder, - filename.clone(), - ) - .await?; - println!(" ✓"); - //Copy file to cache - print!("\t └Copy to cache"); - //Force flush of stdout, else print! doesn't print instantly - std::io::stdout().flush().unwrap(); - let dl_path_file = format!("{}/{}", list.download_folder, filename); - let cache_path = format!("{}/{}", &config.clone().cache, filename); - // println!("{}:{}", dl_path_file, cache_path); - copy(dl_path_file, cache_path)?; - println!(" ✓"); - } + let p = mp.add(ProgressBar::new(1)); + p.set_style(style.clone()); + js.spawn(download_version(config.clone(), list.clone(), ver, cached.clone(), p)); + } + + mp.clear().unwrap(); + + while js.join_next().await.is_some() {} + + Ok(()) +} + +async fn download_version(config: Cfg, list: List, version: Version, mut cached: HashMap, progress: ProgressBar) -> MLE<()> { + let project_info = mods_get_info(config.clone(), &version.project_id)?; + + let dl_path = String::from(&list.download_folder); + + progress.set_message(String::from(&version.id)); + + //Check cache if already downloaded + let c = cached.remove(&version.id); + if c.is_some() { + print!( + "\t└({})Get version {} from cache", + project_info.title, version.id + ); + //Force flush of stdout, else print! doesn't print instantly + std::io::stdout().flush()?; + copy_cached_version(&c.unwrap(), &dl_path); + println!(" ✓"); + } else { + // print!("\t└({})Download version {}", project_info.title, version.id); + //Force flush of stdout, else print! doesn't print instantly + std::io::stdout().flush().unwrap(); + let files = version.files; + let file = match files.clone().into_iter().find(|f| f.primary) { + Some(f) => f, + None => files[0].clone() + }; + let mut splitname: Vec<&str> = file.filename.split('.').collect(); + let extension = match splitname.pop().ok_or("") { + Ok(e) => e, + Err(..) => return Err(MLError::new(ErrorType::Other, "NO_FILE_EXTENSION")), + }; + let filename = format!( + "{}.mr.{}.{}.{}", + splitname.join("."), + version.project_id, + version.id, + extension + ); + + download_file( + &file.url, + &list.download_folder, + &filename, + &progress + ) + .await?; + // println!(" ✓"); + //Copy file to cache + // print!("\t └Copy to cache"); + //Force flush of stdout, else print! doesn't print instantly + std::io::stdout().flush().unwrap(); + let dl_path_file = format!("{}/{}", list.download_folder, filename); + let cache_path = format!("{}/{}", &config.clone().cache, filename); + // println!("{}:{}", dl_path_file, cache_path); + copy(dl_path_file, cache_path)?; + // println!(" ✓"); } - Ok(dl_path) + progress.finish_with_message(format!("✓{}", version.id)); + + Ok(()) } -async fn download_file(url: String, path: String, name: String) -> MLE<()> { +async fn download_file(url: &str, path: &str, name: &str, progress: &ProgressBar) -> MLE<()> { let dl_path_file = format!("{}/{}", path, name); - let res = Client::new().get(String::from(&url)).send().await?; + let res = Client::new().get(url).send().await?; + + let size = res.content_length().expect("Couldn't get content length"); + + progress.set_length(size); + // progress.set_style(ProgressStyle::with_template("{spinner:.green}{msg}\t[{wide_bar:.green/lime}] {bytes}/{total_bytes}").unwrap().progress_chars("#>-")); // download chunks let mut file = File::create(&dl_path_file)?; let mut stream = res.bytes_stream(); + let mut downloaded: u64 = 0; + while let Some(item) = stream.next().await { + progress.inc(1); let chunk = item?; file.write_all(&chunk)?; + + // Progress bar + let new = min(downloaded + (chunk.len() as u64), size); + downloaded = new; + progress.set_position(new); + + // std::thread::sleep(std::time::Duration::from_millis(100)); } Ok(()) -- cgit v1.2.3