commit 6116f744308210e8f722a63b08a5d714ec7b7c40 from: murilo ijanc date: Tue Nov 18 18:42:18 2025 UTC Add users to groups commit - 159646d3ac75937fe0b83a2f97f52ce9418510eb commit + 6116f744308210e8f722a63b08a5d714ec7b7c40 blob - ea8c4bf7f35f6f77f75d92ad8ce8349f6e81ddba blob + ed7cf036e55b15528829f63580e1131885dabf1d --- .gitignore +++ .gitignore @@ -1 +1,2 @@ /target +.mail_sync blob - c01caf510419879c5bc6f7cbec1102e9df4669f0 blob + ce03734d18a04a393b07ddb9f8f2fa8f46efcdff --- Cargo.lock +++ Cargo.lock @@ -1273,6 +1273,15 @@ source = "registry+https://github.com/rust-lang/crates checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" [[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] name = "log" version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1378,6 +1387,29 @@ source = "registry+https://github.com/rust-lang/crates checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" [[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] name = "percent-encoding" version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1445,6 +1477,15 @@ source = "registry+https://github.com/rust-lang/crates checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" [[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] name = "regex" version = "1.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1620,6 +1661,12 @@ dependencies = [ ] [[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] name = "sct" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1866,6 +1913,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.6.1", blob - 9ad96f231a7da19d7f9ad5121831913f19159994 blob + 2221b8157522e67a09fad9f0c67101f15f5aff01 --- Cargo.toml +++ Cargo.toml @@ -14,7 +14,7 @@ anyhow = "1.0.100" aws-config = { version = "1.8.10", features = ["behavior-version-latest"] } aws-sdk-cognitoidentityprovider = "1.103.0" clap = { version = "4.5.52", features = ["derive", "env"] } -tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "fs", "io-std"] } +tokio = { version = "1.48.0", features = ["full"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } blob - 896804d1a7e2f3f51bae5bad0af3dfab9a297776 blob + aaac2ae2d7af614e1b350ea0fe82a5934b1aaba6 --- src/main.rs +++ src/main.rs @@ -21,22 +21,25 @@ // accept timeout // accept poolid - mod helper; +use std::collections::HashMap; +use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; -use clap::{ArgAction, Parser, Subcommand}; -use tracing::{debug, info}; -use tracing_subscriber::EnvFilter; -use tokio::fs::File; -use tokio::io::{self, AsyncWrite, AsyncWriteExt}; use aws_sdk_cognitoidentityprovider::Client as CognitoClient; use aws_sdk_cognitoidentityprovider::types::UserType; +use clap::{ArgAction, Parser, Subcommand}; +use tokio::fs::File; +use tokio::io::{self, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::sync::Semaphore; +use tokio::task::JoinHandle; +use tracing::{debug, error, info}; +use tracing_subscriber::EnvFilter; - const LONG_VERSION: &str = concat!( env!("CARGO_PKG_NAME"), " ", @@ -81,12 +84,75 @@ enum Commands { Sync(SyncArgs), /// Add users to one or more Cognito groups. - Add(GroupOperationArgs), + Add(AddArgs), /// Remove users from one or more Cognito groups. Del(GroupOperationArgs), } +/// Arguments for the `add` operation. +/// +/// High-level flow: +/// - `sync_file` is the CSV produced by `sync` (username,email). +/// - `emails_file` is a plain text file with one e-mail per line, +/// representing the users that should be added to the given groups. +/// - `groups` is the list of Cognito group names. +/// - `pool_id` is the Cognito User Pool ID. +/// - `concurrency` defines how many users are processed in parallel. +#[derive(Debug, Parser)] +pub struct AddArgs { + /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). + #[arg(long = "pool-id")] + pub pool_id: String, + + /// CSV file generated by the `sync` command (username,email). + /// + /// This file is loaded into a HashMap and used + /// as the source of truth for resolving usernames from e-mail. + #[arg( + long = "sync-file", + value_name = "CSV_PATH", + help = "CSV file produced by `sync` containing username,email columns" + )] + pub sync_file: PathBuf, + + /// Text file containing one e-mail per line. + /// + /// Each e-mail will be normalized (trim + lowercase) and then + /// looked up in the sync CSV map to obtain the corresponding username. + #[arg( + long = "emails-file", + value_name = "TXT_PATH", + help = "Plain text file with one e-mail per line to be added to the groups" + )] + pub emails_file: PathBuf, + + /// One or more Cognito group names. + /// + /// All resolved users will be added to every group listed here. + #[arg( + long = "group", + alias = "groups", + value_name = "GROUP", + num_args = 1.., + required = true + )] + pub groups: Vec, + + /// Maximum number of users to process concurrently. + /// + /// This controls how many users are handled in parallel when calling + /// the AdminAddUserToGroup API. + #[arg(long = "concurrency", value_name = "N", default_value_t = 4)] + pub concurrency: usize, + + /// Global timeout for the operation, in seconds. + /// + /// When set, the entire add operation is bounded by this timeout. + #[arg(long = "timeout", value_name = "SECONDS")] + pub timeout: Option, +} + /// Common arguments shared by group-based operations. #[derive(clap::Args, Debug, Clone)] pub struct CommonOperationArgs { @@ -175,6 +241,11 @@ struct GroupOperationArgs { #[tokio::main] async fn main() -> Result<()> { + // let users = read_sync_file_to_map(".mail_sync").await?; + + // for (email, username) in &users { + // println!("{email} => {username}"); + // } let cli = Cli::parse(); init_tracing(cli.verbose); @@ -191,18 +262,9 @@ async fn main() -> Result<()> { run_sync(&common).await?; } - _ => unimplemented!(), - // Commands::Add(args) => { - // let common = CommonOperationArgs { - // pool_id: args.pool_id, - // groups: args.groups, - // emails_file: Some(args.emails_file), - // concurrency: args.concurrency, - // timeout: args.timeout, - // }; - - // run_add_groups(common).await?; - // } + Commands::Add(args) => { + run_add_groups(args).await?; + } // Commands::Del(args) => { // let common = CommonOperationArgs { // pool_id: args.pool_id, @@ -214,6 +276,7 @@ async fn main() -> Result<()> { // run_remove_groups(common).await?; // } + _ => unimplemented!(), } Ok(()) @@ -265,9 +328,8 @@ pub async fn run_sync(args: &CommonOperationArgs) -> R "Starting users sync from Cognito user pool" ); - let config = aws_config::load_from_env() - .await; - // .context("failed to load AWS configuration")?; + let config = aws_config::load_from_env().await; + // .context("failed to load AWS configuration")?; let client = CognitoClient::new(&config); let timeout = args.timeout.map(Duration::from_secs); @@ -294,21 +356,28 @@ pub async fn run_sync(args: &CommonOperationArgs) -> R Ok(()) } -async fn run_add_groups(args: CommonOperationArgs) -> Result<()> { - info!( - pool_id = %args.pool_id, - emails_file = ?args.emails_file, - concurrency = ?args.concurrency, - timeout = ?args.timeout, - "add groups operation requested (not implemented yet)" - ); +async fn run_add_groups(args: AddArgs) -> Result<()> { + // info!( + // pool_id = %args.pool_id, + // emails_file = ?args.emails_file, + // concurrency = ?args.concurrency, + // timeout = ?args.timeout, + // "add groups operation requested (not implemented yet)" + // ); - if let Some(seconds) = args.timeout { - let _timeout = Duration::from_secs(seconds); - debug!(?seconds, "add operation timeout configured"); - } + let config = aws_config::load_from_env().await; + let client = CognitoClient::new(&config); - // TODO: implement add-to-groups logic. + add_users_to_groups_from_files( + &client, + &args.pool_id, + &args.sync_file, + &args.emails_file, + &args.groups, + args.concurrency, + ) + .await?; + Ok(()) } @@ -334,15 +403,19 @@ async fn run_remove_groups(args: CommonOperationArgs) /// /// If `args.emails_file` is set, the CSV is written to that file. /// Otherwise, the CSV is written to stdout. -pub(crate)async fn sync_users_to_csv(client: &CognitoClient, args: &CommonOperationArgs) -> Result<()> { - let mut writer: Box = if let Some(path) = &args.emails_file { - let file = File::create(path) - .await - .with_context(|| format!("failed to create output file at '{}'", path.display()))?; - Box::new(file) - } else { - Box::new(io::stdout()) - }; +pub(crate) async fn sync_users_to_csv( + client: &CognitoClient, + args: &CommonOperationArgs, +) -> Result<()> { + let mut writer: Box = + if let Some(path) = &args.emails_file { + let file = File::create(path).await.with_context(|| { + format!("failed to create output file at '{}'", path.display()) + })?; + Box::new(file) + } else { + Box::new(io::stdout()) + }; // CSV header writer @@ -382,9 +455,8 @@ pub(crate)async fn sync_users_to_csv(client: &CognitoC total_users += 1; } - pagination_token = response - .pagination_token() - .map(|token| token.to_owned()); + pagination_token = + response.pagination_token().map(|token| token.to_owned()); if pagination_token.is_none() { break; @@ -412,3 +484,205 @@ fn extract_username_and_email(user: &UserType) -> (Str (username, email) } +/// Read a Cognito sync CSV file ("username,email") and return a +/// HashMap. +/// +/// Expected CSV format: +/// ```text +/// username,email +/// johndoe,john@example.com +/// alice,alice@example.com +/// ... +/// ``` +/// +/// Notes: +/// - The first line is treated as a header and skipped. +/// - Lines missing either username or email are ignored. +/// - Email is lowercased and trimmed to allow normalized lookups. +pub async fn read_sync_file_to_map>( + path: P, +) -> Result> { + let file = File::open(&path).await.with_context(|| { + format!("failed to open sync file '{}'", path.as_ref().display()) + })?; + + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + let mut map = HashMap::::new(); + + // Skip header: "username,email" + if let Some(line) = lines.next_line().await? { + let _header = line.trim(); + // We can ignore header validation for now. + } + + while let Some(line) = lines.next_line().await? { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let parts: Vec<&str> = trimmed.split(',').collect(); + if parts.len() < 2 { + // malformed line, skip + continue; + } + + let username = parts[0].trim(); + let email = parts[1].trim().to_lowercase(); + + if !email.is_empty() && !username.is_empty() { + map.insert(email, username.to_string()); + } + } + + Ok(map) +} + +/// Load a plain-text file containing one e-mail per line. +/// +/// Each line is trimmed and lowercased. Empty lines are skipped. +pub async fn load_email_list>(path: P) -> Result> { + let file = File::open(&path).await.with_context(|| { + format!( + "failed to open e-mail list file '{}'", + path.as_ref().display() + ) + })?; + + let reader = BufReader::new(file); + let mut lines = reader.lines(); + let mut emails = Vec::new(); + + while let Some(line) = lines.next_line().await? { + let email = line.trim().to_lowercase(); + if email.is_empty() { + continue; + } + emails.push(email); + } + + Ok(emails) +} + +/// Add a Cognito user to one or more groups using the Admin API. +/// +/// This function assumes AWS credentials and permissions allow admin operations. +pub async fn admin_add_user_to_groups( + client: &CognitoClient, + pool_id: &str, + username: &str, + groups: &[String], +) -> Result<()> { + for group in groups { + info!(%username, %group, %pool_id, "adding user to Cognito group"); + + client + .admin_add_user_to_group() + .user_pool_id(pool_id) + .username(username) + .group_name(group) + .send() + .await + .with_context(|| { + format!( + "failed to add user '{}' to group '{}'", + username, group + ) + })?; + + info!(%username, %group, "user successfully added to group"); + } + + Ok(()) +} + +/// High-level flow: +/// 1. Load sync CSV into HashMap. +/// 2. Load target e-mails (one per line). +/// 3. For each email, find username in HashMap. +/// 4. Call `admin_add_user_to_groups` with concurrency limit. +/// 5. Log success or failure for each email. +/// +/// `sync_csv_path`: CSV generated by `sync` (username,email). +/// `emails_list_path`: TXT file with one e-mail per line (users to be added). +pub async fn add_users_to_groups_from_files( + client: &CognitoClient, + pool_id: &str, + sync_csv_path: &Path, + emails_list_path: &Path, + groups: &[String], + concurrency: usize, +) -> Result<()> { + let concurrency = std::cmp::max(1, concurrency); + + // 1. Load sync CSV into map: email -> username + let email_to_username = read_sync_file_to_map(sync_csv_path).await?; + info!( + total_entries = email_to_username.len(), + "loaded sync map (email -> username)" + ); + + // 2. Load target e-mails + let emails = load_email_list(emails_list_path).await?; + info!(total_emails = emails.len(), "loaded target e-mail list"); + + let semaphore = Arc::new(Semaphore::new(concurrency)); + let mut handles: Vec> = Vec::with_capacity(emails.len()); + + for email in emails { + let username = match email_to_username.get(&email) { + Some(u) => u.clone(), + None => { + error!(%email, "e-mail not found in sync map, skipping"); + continue; + } + }; + + let permit = semaphore.clone().acquire_owned().await?; + let client_clone = client.clone(); + let pool_id = pool_id.to_string(); + let groups = groups.to_vec(); + + let handle = tokio::spawn(async move { + let _permit = permit; // keep permit alive for the duration of this task + + if let Err(err) = admin_add_user_to_groups( + &client_clone, + &pool_id, + &username, + &groups, + ) + .await + { + error!( + %email, + %username, + error = ?err, + "failed to add user to one or more groups" + ); + } else { + info!( + %email, + %username, + groups = ?groups, + "user successfully processed for all groups" + ); + } + }); + + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + // Ignore panics here, just surface as error logs. + if let Err(join_err) = handle.await { + error!(error = ?join_err, "join error while processing a user"); + } + } + + info!("finished processing all users for add-groups operation"); + Ok(()) +}