Aarav Joshi
Rust Actor Model: Building Resilient Distributed Systems with Memory Safety and Performance


Distributed computing has transformed how we build modern applications. After years of working with traditional threading models and shared memory paradigms, I've found that Rust's approach to distributed systems offers something genuinely different. The combination of memory safety, performance, and the actor pattern creates a foundation for building systems that are both resilient and efficient.

The actor model fundamentally changes how we think about distributed computation. Instead of worrying about locks, mutexes, and shared state, actors provide isolated computational units that communicate exclusively through messages. This paradigm shift eliminates entire categories of bugs that plague traditional distributed systems.

In Rust, the actor pattern gains additional strength from the language's ownership model. Each actor owns its state completely, and the type system prevents data races at compile time. This means we can build distributed systems with confidence, knowing that many runtime errors simply cannot occur.
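
To make the idea concrete before bringing in a framework, here is a minimal sketch of an actor built only from the standard library: one thread owns the state, and a channel serves as its mailbox. The names `CounterMsg`, `spawn_counter`, and `counter_demo` are invented for this illustration and are not part of Actix.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Messages understood by the counter actor.
enum CounterMsg {
    Add(u64),
    /// Ask for the current total; the reply goes back on the enclosed channel.
    Get(Sender<u64>),
}

/// Spawns the actor and returns its mailbox address plus a handle to join on.
fn spawn_counter() -> (Sender<CounterMsg>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<CounterMsg>();
    let handle = thread::spawn(move || {
        // The state lives on this thread only; no other thread can reach it.
        let mut total: u64 = 0;
        // Messages are handled strictly one at a time.
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => total += n,
                CounterMsg::Get(reply) => {
                    // Ignore the error if the asker has already gone away.
                    let _ = reply.send(total);
                }
            }
        }
        // The loop ends once every Sender has been dropped.
    });
    (tx, handle)
}

/// Sends messages from several threads and returns the final total.
fn counter_demo() -> u64 {
    let (addr, handle) = spawn_counter();
    let senders: Vec<_> = (1..=4u64)
        .map(|n| {
            let addr = addr.clone();
            thread::spawn(move || addr.send(CounterMsg::Add(n)).unwrap())
        })
        .collect();
    for s in senders {
        s.join().unwrap();
    }
    let (reply_tx, reply_rx) = mpsc::channel();
    addr.send(CounterMsg::Get(reply_tx)).unwrap();
    let total = reply_rx.recv().unwrap();
    drop(addr); // closing the last sender lets the actor thread exit
    handle.join().unwrap();
    total
}
```

Four threads send to the same mailbox without a lock in sight, because `total` is only ever touched by the thread that owns it.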

Building Your First Actor System

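Creating an actor system starts with defining the messages actors will exchange. In Actix, each message type implements the Message trait, which fixes the type of the reply and makes communication type-safe. Deriving serde's Serialize and Deserialize as well means the same types can later be sent across the network.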

use actix::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// The uuid crate needs its "v4" and "serde" features enabled for this listing
use uuid::Uuid;

#[derive(Message, Serialize, Deserialize, Debug, Clone)]
#[rtype(result = "UserResponse")]
pub struct GetUser {
    pub user_id: Uuid,
}

#[derive(Message, Serialize, Deserialize, Debug, Clone)]
#[rtype(result = "UserResponse")]
pub struct CreateUser {
    pub name: String,
    pub email: String,
}

#[derive(Message, Serialize, Deserialize, Debug, Clone)]
#[rtype(result = "UserResponse")]
pub struct UpdateUser {
    pub user_id: Uuid,
    pub name: Option<String>,
    pub email: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct User {
    pub id: Uuid,
    pub name: String,
    pub email: String,
    pub created_at: u64,
}

// MessageResponse lets handlers return this type directly as their Result
#[derive(MessageResponse, Serialize, Deserialize, Debug)]
pub enum UserResponse {
    User(User),
    Users(Vec<User>),
    Created(Uuid),
    Updated,
    NotFound,
    Error(String),
}
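
The way a message type pins down its reply type can be sketched without any framework. The two traits below are simplified local stand-ins that imitate the pattern; they are not Actix's actual definitions, and `UserStore`, `deliver`, and the `u32` ids are assumptions made to keep the example dependency-free.

```rust
use std::collections::HashMap;

/// A message declares the type of its reply.
trait Message {
    type Reply;
}

/// An actor implements `Handler<M>` once per message type it accepts.
trait Handler<M: Message> {
    fn handle(&mut self, msg: M) -> M::Reply;
}

struct CreateUser {
    name: String,
}

struct GetUser {
    id: u32,
}

impl Message for CreateUser {
    type Reply = u32;
}

impl Message for GetUser {
    type Reply = Option<String>;
}

#[derive(Default)]
struct UserStore {
    users: HashMap<u32, String>,
    next_id: u32,
}

impl Handler<CreateUser> for UserStore {
    fn handle(&mut self, msg: CreateUser) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        self.users.insert(id, msg.name);
        id
    }
}

impl Handler<GetUser> for UserStore {
    fn handle(&mut self, msg: GetUser) -> Option<String> {
        self.users.get(&msg.id).cloned()
    }
}

/// Generic dispatch: the compiler selects the handler and the reply type
/// from the message type alone.
fn deliver<A, M>(actor: &mut A, msg: M) -> M::Reply
where
    M: Message,
    A: Handler<M>,
{
    actor.handle(msg)
}
```

Sending a message the store has no handler for, or treating the reply to `GetUser` as a `u32`, fails at compile time rather than at run time.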

The actor itself owns its state and processes messages one at a time. Because only one handler runs against that state at any moment, the handlers need no locks; concurrency comes from running many actors side by side.

pub struct UserActor {
    users: HashMap<Uuid, User>,
    node_id: String,
}

impl UserActor {
    pub fn new(node_id: String) -> Self {
        Self {
            users: HashMap::new(),
            node_id,
        }
    }

    fn current_timestamp(&self) -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }
}

impl Actor for UserActor {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("UserActor started on node: {}", self.node_id);
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("UserActor stopped on node: {}", self.node_id);
    }
}

impl Handler<GetUser> for UserActor {
    type Result = UserResponse;

    fn handle(&mut self, msg: GetUser, _ctx: &mut Self::Context) -> Self::Result {
        match self.users.get(&msg.user_id) {
            Some(user) => UserResponse::User(user.clone()),
            None => UserResponse::NotFound,
        }
    }
}

impl Handler<CreateUser> for UserActor {
    type Result = UserResponse;

    fn handle(&mut self, msg: CreateUser, _ctx: &mut Self::Context) -> Self::Result {
        let user_id = Uuid::new_v4();
        let user = User {
            id: user_id,
            name: msg.name,
            email: msg.email,
            created_at: self.current_timestamp(),
        };

        self.users.insert(user_id, user);
        UserResponse::Created(user_id)
    }
}

impl Handler<UpdateUser> for UserActor {
    type Result = UserResponse;

    fn handle(&mut self, msg: UpdateUser, _ctx: &mut Self::Context) -> Self::Result {
        match self.users.get_mut(&msg.user_id) {
            Some(user) => {
                if let Some(name) = msg.name {
                    user.name = name;
                }
                if let Some(email) = msg.email {
                    user.email = email;
                }
                UserResponse::Updated
            },
            None => UserResponse::NotFound,
        }
    }
}

Implementing Supervision and Fault Tolerance

Supervision hierarchies provide the backbone of resilient distributed systems. Supervisors monitor child actors and implement recovery strategies when failures occur. This approach isolates failures and prevents cascading system collapse.

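use actix::prelude::*;
use std::collections::{HashMap, HashSet};
use std::time::Duration;

// Note: WorkerActor and the HealthCheck message are not defined in this listing.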

#[derive(Message)]
#[rtype(result = "()")]
pub struct StartWorker {
    pub worker_id: String,
}

#[derive(Message)]
#[rtype(result = "SupervisorStatus")]
pub struct GetStatus;

// MessageResponse lets the GetStatus handler return this type directly
#[derive(Debug, MessageResponse)]
pub struct SupervisorStatus {
    pub active_workers: usize,
    pub failed_workers: usize,
    pub restart_count: usize,
}

pub struct WorkerSupervisor {
    workers: HashMap<String, Addr<WorkerActor>>,
    failed_workers: HashSet<String>,
    restart_count: usize,
    max_restarts: usize,
}

impl WorkerSupervisor {
    pub fn new(max_restarts: usize) -> Self {
        Self {
            workers: HashMap::new(),
            failed_workers: HashSet::new(),
            restart_count: 0,
            max_restarts,
        }
    }

    fn restart_worker(&mut self, worker_id: &str, ctx: &mut Context<Self>) {
        if self.restart_count >= self.max_restarts {
            println!("Maximum restart limit reached for worker: {}", worker_id);
            return;
        }

        println!("Restarting worker: {}", worker_id);

        let worker = WorkerActor::new(worker_id.to_string()).start();
        self.workers.insert(worker_id.to_string(), worker);
        self.restart_count += 1;

        // Monitor the new worker; the delayed closure must own its copy of the id
        let worker_id = worker_id.to_string();
        ctx.run_later(Duration::from_secs(5), move |act, ctx| {
            act.check_worker_health(&worker_id, ctx);
        });
    }

    fn check_worker_health(&mut self, worker_id: &str, ctx: &mut Context<Self>) {
        if let Some(worker) = self.workers.get(worker_id) {
            let health_check = worker.send(HealthCheck);

            let worker_id = worker_id.to_string();
            ctx.spawn(
                health_check
                    .into_actor(self)
                    .map(move |result, act, ctx| {
                        match result {
                            Ok(_) => {
                                // Worker is healthy
                                act.failed_workers.remove(&worker_id);
                            },
                            Err(_) => {
                                // Worker failed
                                println!("Worker {} failed health check", worker_id);
                                act.failed_workers.insert(worker_id.clone());
                                act.workers.remove(&worker_id);
                                act.restart_worker(&worker_id, ctx);
                            }
                        }
                    })
            );
        }
    }
}

impl Actor for WorkerSupervisor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        println!("WorkerSupervisor started");

        // Periodic health checks
        ctx.run_interval(Duration::from_secs(30), |act, ctx| {
            for worker_id in act.workers.keys().cloned().collect::<Vec<_>>() {
                act.check_worker_health(&worker_id, ctx);
            }
        });
    }
}

impl Handler<StartWorker> for WorkerSupervisor {
    type Result = ();

    fn handle(&mut self, msg: StartWorker, ctx: &mut Self::Context) {
        let worker = WorkerActor::new(msg.worker_id.clone()).start();
        self.workers.insert(msg.worker_id.clone(), worker);

        // Start monitoring this worker
        let worker_id = msg.worker_id;
        ctx.run_later(Duration::from_secs(5), move |act, ctx| {
            act.check_worker_health(&worker_id, ctx);
        });
    }
}

impl Handler<GetStatus> for WorkerSupervisor {
    type Result = SupervisorStatus;

    fn handle(&mut self, _msg: GetStatus, _ctx: &mut Self::Context) -> Self::Result {
        SupervisorStatus {
            active_workers: self.workers.len(),
            failed_workers: self.failed_workers.len(),
            restart_count: self.restart_count,
        }
    }
}
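
The restart-with-a-limit policy at the heart of the supervisor can be isolated in a few lines of standard-library Rust. In this sketch a panicking thread stands in for a failed actor; `supervise`, `Supervised`, and `flaky_worker` are hypothetical names, and the policy is deliberately simpler than the health-check loop above.

```rust
use std::thread;

/// Outcome reported once the worker finishes or the supervisor gives up.
#[derive(Debug, PartialEq)]
enum Supervised<T> {
    Finished { value: T, restarts: usize },
    GaveUp { restarts: usize },
}

/// Runs `make_worker(attempt)` on its own thread and restarts it if it panics,
/// up to `max_restarts` times.
fn supervise<T, F>(max_restarts: usize, make_worker: F) -> Supervised<T>
where
    T: Send + 'static,
    F: Fn(usize) -> T + Send + Clone + 'static,
{
    let mut restarts = 0;
    loop {
        let worker = make_worker.clone();
        let attempt = restarts;
        // A panic unwinds only the worker thread; `join` reports it as Err.
        let outcome = thread::spawn(move || worker(attempt)).join();
        match outcome {
            Ok(value) => return Supervised::Finished { value, restarts },
            Err(_) if restarts < max_restarts => restarts += 1,
            Err(_) => return Supervised::GaveUp { restarts },
        }
    }
}

/// A made-up flaky worker: it panics on its first two attempts.
fn flaky_worker(attempt: usize) -> &'static str {
    if attempt < 2 {
        panic!("simulated failure on attempt {}", attempt);
    }
    "done"
}
```

Calling `supervise(3, flaky_worker)` recovers after two restarts, while `supervise(1, flaky_worker)` gives up. The failure never reaches the supervising thread, which is the isolation property the section describes.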

Network Communication and Location Transparency

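Location transparency allows actors to communicate without knowing whether their targets are local or remote. This abstraction simplifies distributed system design and lets a system scale across multiple nodes. Actix addresses only reach actors inside the current process, so the example below adds a small registry and an HTTP transport to forward messages between nodes.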

use actix::prelude::*;
use actix_web::{web, App, HttpResponse, HttpServer, Result as WebResult};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct DistributedActorRegistry {
    local_actors: HashMap<String, Recipient<RemoteMessage>>,
    remote_nodes: HashMap<String, String>, // node_id -> endpoint
}

impl DistributedActorRegistry {
    pub fn new() -> Self {
        Self {
            local_actors: HashMap::new(),
            remote_nodes: HashMap::new(),
        }
    }

    pub fn register_local_actor(&mut self, actor_id: String, recipient: Recipient<RemoteMessage>) {
        self.local_actors.insert(actor_id, recipient);
    }

    pub fn register_remote_node(&mut self, node_id: String, endpoint: String) {
        self.remote_nodes.insert(node_id, endpoint);
    }

    pub async fn send_message(&self, target_actor: &str, message: RemoteMessage) -> Result<(), String> {
        // Check if actor is local first
        if let Some(recipient) = self.local_actors.get(target_actor) {
            recipient.try_send(message)
                .map_err(|e| format!("Local send failed: {}", e))?;
            return Ok(());
        }

        // Otherwise offer the message to each remote node until one accepts it
        for endpoint in self.remote_nodes.values() {
            match self.send_remote_message(endpoint, target_actor, &message).await {
                Ok(_) => return Ok(()),
                Err(_) => continue, // Try next node
            }
        }

        Err(format!("Actor {} not found on any node", target_actor))
    }

    async fn send_remote_message(
        &self, 
        endpoint: &str, 
        target_actor: &str, 
        message: &RemoteMessage
    ) -> Result<(), Box<dyn std::error::Error>> {
        let client = reqwest::Client::new();
        let url = format!("{}/actor/{}/message", endpoint, target_actor);

        let response = client
            .post(&url)
            .json(message)
            .send()
            .await?;

        if response.status().is_success() {
            Ok(())
        } else {
            Err(format!("Remote call failed: {}", response.status()).into())
        }
    }
}

#[derive(Message, Serialize, Deserialize, Debug, Clone)]
#[rtype(result = "()")]
pub struct RemoteMessage {
    pub from: String,
    pub to: String,
    pub payload: String,
    pub message_type: String,
    pub timestamp: u64,
}

pub struct NetworkActor {
    registry: Arc<RwLock<DistributedActorRegistry>>,
    node_id: String,
}

impl NetworkActor {
    pub fn new(node_id: String) -> Self {
        Self {
            registry: Arc::new(RwLock::new(DistributedActorRegistry::new())),
            node_id,
        }
    }

    pub fn get_registry(&self) -> Arc<RwLock<DistributedActorRegistry>> {
        self.registry.clone()
    }
}

impl Actor for NetworkActor {
    type Context = Context<Self>;
}

impl Handler<RemoteMessage> for NetworkActor {
    type Result = ();

    fn handle(&mut self, msg: RemoteMessage, _ctx: &mut Self::Context) {
        println!("Received message from {} to {} on node {}", 
                 msg.from, msg.to, self.node_id);

        // Process the message locally or forward to appropriate handler
        // This is where you'd implement your distributed processing logic
    }
}

// HTTP endpoint for receiving remote messages
async fn receive_message(
    path: web::Path<String>,
    message: web::Json<RemoteMessage>,
    registry: web::Data<Arc<RwLock<DistributedActorRegistry>>>,
) -> WebResult<HttpResponse> {
    let actor_id = path.into_inner();
    let registry = registry.get_ref();

    match registry.read().await.local_actors.get(&actor_id) {
        Some(recipient) => {
            match recipient.try_send(message.into_inner()) {
                Ok(_) => Ok(HttpResponse::Ok().json("Message delivered")),
                Err(e) => Ok(HttpResponse::InternalServerError().json(format!("Delivery failed: {}", e))),
            }
        },
        None => Ok(HttpResponse::NotFound().json("Actor not found")),
    }
}

#[actix_web::main]
async fn start_node(node_id: String, port: u16) -> std::io::Result<()> {
    let network_actor = NetworkActor::new(node_id.clone());
    // Take a handle to the shared registry before the actor is started
    let registry = network_actor.get_registry();
    let _network_addr = network_actor.start();

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(registry.clone()))
            .route("/actor/{actor_id}/message", web::post().to(receive_message))
    })
    .bind(format!("127.0.0.1:{}", port))?
    .run()
    .await
}
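
Stripped of HTTP and async, the routing decision behind location transparency fits in a small standard-library sketch. It assumes a registry that maps each actor id to exactly one location, unlike the listing above, which probes every remote node. Remote sends are recorded in an `outbox` instead of going over the network, and `ActorRef`, `Delivery`, and `Registry` are names invented for this illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

/// Where a message ended up; returned so callers can observe the routing.
#[derive(Debug, PartialEq)]
enum Delivery {
    Local,
    Remote { endpoint: String },
}

/// Address of an actor: an in-process mailbox or a remote endpoint.
enum ActorRef {
    Local(Sender<String>),
    Remote(String),
}

#[derive(Default)]
struct Registry {
    actors: HashMap<String, ActorRef>,
    /// Stand-in for the network: (endpoint, payload) pairs that a real
    /// implementation would post to the remote node.
    outbox: Vec<(String, String)>,
}

impl Registry {
    /// Registers a local actor and returns the receiving end of its mailbox.
    fn register_local(&mut self, id: &str) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.actors.insert(id.to_string(), ActorRef::Local(tx));
        rx
    }

    fn register_remote(&mut self, id: &str, endpoint: &str) {
        self.actors
            .insert(id.to_string(), ActorRef::Remote(endpoint.to_string()));
    }

    /// Callers use the same call for local and remote targets.
    fn send(&mut self, id: &str, payload: &str) -> Result<Delivery, String> {
        match self.actors.get(id) {
            Some(ActorRef::Local(tx)) => tx
                .send(payload.to_string())
                .map(|_| Delivery::Local)
                .map_err(|e| format!("local send failed: {}", e)),
            Some(ActorRef::Remote(endpoint)) => {
                let endpoint = endpoint.clone();
                self.outbox.push((endpoint.clone(), payload.to_string()));
                Ok(Delivery::Remote { endpoint })
            }
            None => Err(format!("actor {} not registered", id)),
        }
    }
}
```

The caller of `send` never branches on where the target lives; only the registry knows, so an actor can move to another node by changing one registration.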

Load Balancing and Backpressure Management

Effective load balancing prevents system overload and ensures smooth operation under varying loads. Backpressure mechanisms control message flow and prevent memory exhaustion when actors cannot keep up with incoming requests.

use actix::prelude::*;
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Note: WorkerActor is not defined in this listing; it must implement Handler<WorkRequest>.

#[derive(Message)]
#[rtype(result = "LoadBalancerStats")]
pub struct GetStats;

// MessageResponse lets the GetStats handler return this type directly
#[derive(Debug, MessageResponse)]
pub struct LoadBalancerStats {
    pub total_requests: u64,
    pub active_workers: usize,
    pub queue_size: usize,
    pub average_response_time: Duration,
}

#[derive(Message)]
#[rtype(result = "Result<String, String>")]
pub struct WorkRequest {
    pub id: String,
    pub payload: Vec<u8>,
    pub priority: u8,
}

pub struct LoadBalancer {
    workers: Vec<Addr<WorkerActor>>,
    request_queue: VecDeque<(WorkRequest, Instant)>,
    current_worker: usize,
    total_requests: u64,
    response_times: VecDeque<Duration>,
    max_queue_size: usize,
    backpressure_threshold: usize,
}

impl LoadBalancer {
    pub fn new(max_queue_size: usize, backpressure_threshold: usize) -> Self {
        Self {
            workers: Vec::new(),
            request_queue: VecDeque::new(),
            current_worker: 0,
            total_requests: 0,
            response_times: VecDeque::new(),
            max_queue_size,
            backpressure_threshold,
        }
    }

    pub fn add_worker(&mut self, worker: Addr<WorkerActor>) {
        self.workers.push(worker);
    }

    fn next_worker(&mut self) -> Option<&Addr<WorkerActor>> {
        if self.workers.is_empty() {
            return None;
        }

        let worker = &self.workers[self.current_worker];
        self.current_worker = (self.current_worker + 1) % self.workers.len();
        Some(worker)
    }

    fn should_apply_backpressure(&self) -> bool {
        self.request_queue.len() >= self.backpressure_threshold
    }

    fn process_queue(&mut self, ctx: &mut Context<Self>) {
        while let Some((request, start_time)) = self.request_queue.pop_front() {
            if let Some(worker) = self.next_worker() {
                let worker_addr = worker.clone();

                ctx.spawn(
                    async move {
                        worker_addr.send(request).await
                    }
                    .into_actor(self)
                    .map(move |result, act, _ctx| {
                        let response_time = start_time.elapsed();
                        act.response_times.push_back(response_time);

                        // Keep only recent response times for accurate averages
                        if act.response_times.len() > 100 {
                            act.response_times.pop_front();
                        }

                        match result {
                            Ok(_) => {
                                // Request completed successfully
                            },
                            Err(e) => {
                                println!("Worker request failed: {}", e);
                                // Could implement retry logic here
                            }
                        }
                    })
                );

                // While the backlog is at or above the threshold, dispatch only
                // one request per pass so the workers are not flooded
                if self.should_apply_backpressure() {
                    break;
                }
            } else {
                // No workers available, put request back
                self.request_queue.push_front((request, start_time));
                break;
            }
        }
    }

    fn calculate_average_response_time(&self) -> Duration {
        if self.response_times.is_empty() {
            return Duration::from_millis(0);
        }

        let total: Duration = self.response_times.iter().sum();
        total / self.response_times.len() as u32
    }
}

impl Actor for LoadBalancer {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        println!("LoadBalancer started with {} workers", self.workers.len());

        // Process queue periodically
        ctx.run_interval(Duration::from_millis(10), |act, ctx| {
            act.process_queue(ctx);
        });

        // Log statistics periodically
        ctx.run_interval(Duration::from_secs(30), |act, _ctx| {
            println!("LoadBalancer stats: {} requests, {} queued, avg response: {:?}",
                     act.total_requests,
                     act.request_queue.len(),
                     act.calculate_average_response_time());
        });
    }
}

impl Handler<WorkRequest> for LoadBalancer {
    type Result = Result<String, String>;

    fn handle(&mut self, msg: WorkRequest, ctx: &mut Self::Context) -> Self::Result {
        self.total_requests += 1;

        // Check if queue is full
        if self.request_queue.len() >= self.max_queue_size {
            return Err("System overloaded, try again later".to_string());
        }

        // The backlog has reached the threshold: schedule an extra drain pass.
        // This does not slow senders down; only the max_queue_size check
        // above rejects work.
        if self.should_apply_backpressure() {
            ctx.run_later(Duration::from_millis(100), |act, ctx| {
                act.process_queue(ctx);
            });
        }

        self.request_queue.push_back((msg, Instant::now()));
        Ok("Request queued".to_string())
    }
}

impl Handler<GetStats> for LoadBalancer {
    type Result = LoadBalancerStats;

    fn handle(&mut self, _msg: GetStats, _ctx: &mut Self::Context) -> Self::Result {
        LoadBalancerStats {
            total_requests: self.total_requests,
            active_workers: self.workers.len(),
            queue_size: self.request_queue.len(),
            average_response_time: self.calculate_average_response_time(),
        }
    }
}

// Usage example
#[actix::main]
async fn main() {
    let mut load_balancer = LoadBalancer::new(1000, 800);

    // Add workers
    for i in 0..4 {
        let worker = WorkerActor::new(format!("worker-{}", i)).start();
        load_balancer.add_worker(worker);
    }

    let lb_addr = load_balancer.start();

    // Send some test requests
    for i in 0..10 {
        let request = WorkRequest {
            id: format!("req-{}", i),
            payload: vec![1, 2, 3, 4, 5],
            priority: (i % 3) as u8,
        };

        match lb_addr.send(request).await {
            Ok(Ok(response)) => println!("Request {}: {}", i, response),
            Ok(Err(error)) => println!("Request {} failed: {}", i, error),
            Err(e) => println!("Communication error: {}", e),
        }
    }

    // Get statistics
    if let Ok(stats) = lb_addr.send(GetStats).await {
        println!("Final stats: {:?}", stats);
    }
}
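
Backpressure can also be pushed down to the mailboxes themselves. The following standard-library sketch combines round-robin selection with bounded channels, so a full worker refuses new jobs instead of queueing them without limit. `Dispatcher` and `Rejected` are hypothetical names, and jobs are plain `u32` values to keep the example short.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Round-robin dispatcher over bounded worker mailboxes.
struct Dispatcher {
    workers: Vec<SyncSender<u32>>,
    next: usize,
}

#[derive(Debug, PartialEq)]
enum Rejected {
    /// Every worker mailbox is full; the caller should slow down or retry.
    Overloaded,
    NoWorkers,
}

impl Dispatcher {
    /// Creates `count` mailboxes holding at most `capacity` pending jobs each.
    fn new(count: usize, capacity: usize) -> (Self, Vec<Receiver<u32>>) {
        let (workers, inboxes): (Vec<_>, Vec<_>) =
            (0..count).map(|_| sync_channel::<u32>(capacity)).unzip();
        (Self { workers, next: 0 }, inboxes)
    }

    /// Tries each worker once, starting with the next one in rotation.
    /// Returns the index of the worker that accepted the job.
    fn dispatch(&mut self, job: u32) -> Result<usize, Rejected> {
        if self.workers.is_empty() {
            return Err(Rejected::NoWorkers);
        }
        let mut job = job;
        for _ in 0..self.workers.len() {
            let idx = self.next;
            self.next = (self.next + 1) % self.workers.len();
            match self.workers[idx].try_send(job) {
                Ok(()) => return Ok(idx),
                // Full or disconnected: take the job back and try the next worker.
                Err(TrySendError::Full(j)) | Err(TrySendError::Disconnected(j)) => job = j,
            }
        }
        Err(Rejected::Overloaded)
    }
}
```

With two workers of capacity one, the third job is rejected with `Rejected::Overloaded` until one of the receivers drains its inbox. The rejection is the backpressure signal: the producer learns immediately that it is outrunning the workers.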

State Management and Persistence

Distributed systems must handle state persistence and recovery gracefully. Actors can implement checkpointing and state recovery mechanisms to ensure data durability across system failures.

use actix::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Duration;

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ActorState {
    pub version: u64,
    pub data: HashMap<String, String>,
    pub metadata: StateMetadata,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct StateMetadata {
    pub last_checkpoint: u64,
    pub total_operations: u64,
    pub node_id: String,
}

#[derive(Message)]
#[rtype(result = "Result<(), String>")]
pub struct SaveCheckpoint;

#[derive(Message)]
#[rtype(result = "Result<(), String>")]
pub struct LoadCheckpoint {
    pub checkpoint_path: String,
}

#[derive(Message, Serialize, Deserialize)]
#[rtype(result = "Result<(), String>")]
pub struct UpdateState {
    pub key: String,
    pub value: String,
}

#[derive(Message)]
#[rtype(result = "Option<String>")]
pub struct GetState {
    pub key: String,
}

pub struct PersistentActor {
    state: ActorState,
    checkpoint_path: String,
    operations_since_checkpoint: u64,
    checkpoint_interval: u64,
}

impl PersistentActor {
    pub fn new(node_id: String, checkpoint_path: String) -> Self {
        let state = ActorState {
            version: 1,
            data: HashMap::new(),
            metadata: StateMetadata {
                last_checkpoint: 0,
                total_operations: 0,
                node_id,
            },
        };

        Self {
            state,
            checkpoint_path,
            operations_since_checkpoint: 0,
            checkpoint_interval: 100, // Checkpoint every 100 operations
        }
    }

    fn should_checkpoint(&self) -> bool {
        self.operations_since_checkpoint >= self.checkpoint_interval
    }

    fn save_checkpoint_to_disk(&self) -> Result<(), String> {
        let json = serde_json::to_string_pretty(&self.state)
            .map_err(|e| format!("Serialization failed: {}", e))?;

        fs::write(&self.checkpoint_path, json)
            .map_err(|e| format!("Write failed: {}", e))?;

        println!("Checkpoint saved: version {}", self.state.version);
        Ok(())
    }

    fn load_checkpoint_from_disk(&mut self, path: &str) -> Result<(), String> {
        if !Path::new(path).exists() {
            return Err("Checkpoint file not found".to_string());
        }

        let content = fs::read_to_string(path)
            .map_err(|e| format!("Read failed: {}", e))?;

        self.state = serde_json::from_str(&content)
            .map_err(|e| format!("Deserialization failed: {}", e))?;

        self.operations_since_checkpoint = 0;
        println!("Checkpoint loaded: version {}", self.state.version);
        Ok(())
    }

    fn increment_operation_count(&mut self) {
        self.state.metadata.total_operations += 1;
        self.operations_since_checkpoint += 1;
    }

    fn update_checkpoint_metadata(&mut self) {
        self.state.metadata.last_checkpoint = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        self.state.version += 1;
        self.operations_since_checkpoint = 0;
    }
}

impl Actor for PersistentActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        println!("PersistentActor started on node: {}", self.state.metadata.node_id);

        // Try to load an existing checkpoint. The path is cloned first because
        // the loader borrows self mutably.
        let path = self.checkpoint_path.clone();
        if let Err(e) = self.load_checkpoint_from_disk(&path) {
            println!("No checkpoint loaded: {}", e);
        }

        // Periodic checkpointing
        ctx.run_interval(Duration::from_secs(60), |act, _ctx| {
            if act.should_checkpoint() {
                if let Err(e) = act.save_checkpoint_to_disk() {
                    println!("Checkpoint save failed: {}", e);
                } else {
                    act.update_checkpoint_metadata();
                }
            }
        });
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        // Save final checkpoint before stopping
        if let Err(e) = self.save_checkpoint_to_disk() {
            println!("Final checkpoint save failed: {}", e);
        } else {
            self.update_checkpoint_metadata();
            println!("Final checkpoint saved");
        }
        Running::Stop
    }
}

impl Handler<UpdateState> for PersistentActor {
    type Result = Result<(), String>;

    fn handle(&mut self, msg: UpdateState, ctx: &mut Self::Context) -> Self::Result {
        self.state.data.insert(msg.key.clone(), msg.value.clone());
        self.increment_operation_count();

        println!("State updated: {} = {}", msg.key, msg.value);

        // Check if we should checkpoint
        if self.should_checkpoint() {
            ctx.notify(SaveCheckpoint);
        }

        Ok(())
    }
}

impl Handler<GetState> for PersistentActor {
    type Result = Option<String>;

    fn handle(&mut self, msg: GetState, _ctx: &mut Self::Context) -> Self::Result {
        self.state.data.get(&msg.key).cloned()
    }
}

impl Handler<SaveCheckpoint> for PersistentActor {
    type Result = Result<(), String>;

    fn handle(&mut self, _msg: SaveCheckpoint, _ctx: &mut Self::Context) -> Self::Result {
        match self.save_checkpoint_to_disk() {
            Ok(_) => {
                self.update_checkpoint_metadata();
                Ok(())
            },
            Err(e) => Err(e),
        }
    }
}

impl Handler<LoadCheckpoint> for PersistentActor {
    type Result = Result<(), String>;

    fn handle(&mut self, msg: LoadCheckpoint, _ctx: &mut Self::Context) -> Self::Result {
        self.load_checkpoint_from_disk(&msg.checkpoint_path)
    }
}

// Example usage with recovery testing
#[actix::main]
async fn main() {
    let checkpoint_path = "./actor_checkpoint.json".to_string();
    let actor = PersistentActor::new("node-001".to_string(), checkpoint_path.clone()).start();

    // Perform some operations
    for i in 0..150 {
        let update = UpdateState {
            key: format!("key-{}", i),
            value: format!("value-{}", i),
        };

        if let Err(e) = actor.send(update).await {
            println!("Update failed: {}", e);
        }

        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Force a checkpoint
    match actor.send(SaveCheckpoint).await {
        Ok(Ok(_)) => println!("Manual checkpoint completed"),
        Ok(Err(e)) => println!("Manual checkpoint failed: {}", e),
        Err(e) => println!("Communication error: {}", e),
    }

    // Test state retrieval
    for i in 0..5 {
        let get_msg = GetState {
            key: format!("key-{}", i * 30),
        };

        match actor.send(get_msg).await {
            Ok(Some(value)) => println!("Retrieved: key-{} = {}", i * 30, value),
            Ok(None) => println!("Key not found: key-{}", i * 30),
            Err(e) => println!("Retrieval error: {}", e),
        }
    }

    tokio::time::sleep(Duration::from_secs(2)).await;
}
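
One refinement worth considering is making the checkpoint write crash-safe: write to a temporary file first, then rename it over the old checkpoint, so a crash mid-write cannot leave a truncated file in place. The sketch below uses only the standard library and a plain `key=value` text format instead of JSON. The function names and the `.tmp` suffix are assumptions, and it does not call fsync, so it guards against partial writes rather than guaranteeing durability.

```rust
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::path::Path;

/// Serialises the map as one `key=value` line per entry. Keys are assumed
/// to contain no '=' and neither keys nor values may contain newlines.
fn encode(state: &BTreeMap<String, String>) -> String {
    state.iter().map(|(k, v)| format!("{}={}\n", k, v)).collect()
}

/// Parses the format produced by `encode`, skipping malformed lines.
fn decode(text: &str) -> BTreeMap<String, String> {
    text.lines()
        .filter_map(|line| line.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Writes the checkpoint to a temporary sibling file, then renames it into place.
fn save_checkpoint(path: &Path, state: &BTreeMap<String, String>) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, encode(state))?;
    fs::rename(&tmp, path)
}

/// Loads a checkpoint, returning an empty state when none exists yet.
fn load_checkpoint(path: &Path) -> io::Result<BTreeMap<String, String>> {
    match fs::read_to_string(path) {
        Ok(text) => Ok(decode(&text)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(BTreeMap::new()),
        Err(e) => Err(e),
    }
}
```

Treating a missing file as an empty state also lets a freshly started actor and a recovering one share the same startup path.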
