Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 105 additions & 37 deletions contracts/stream_contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ impl StreamContract {

/// Top up an active stream with additional tokens.
///
/// Only the original sender may top up their own stream.
/// Only the original sender may top up their own stream. The top-up amount
/// is subject to protocol fees (if configured) before being added to the stream.
///
/// # Errors
/// - `InvalidAmount` — `amount` ≤ 0.
Expand All @@ -199,24 +200,25 @@ impl StreamContract {

let mut stream = load_stream(&env, stream_id)?;

if stream.sender != sender {
return Err(StreamError::Unauthorized);
}
if !stream.is_active {
return Err(StreamError::StreamInactive);
}
// Validate ownership and active status using helper functions
Self::validate_stream_ownership(&stream, &sender)?;
Self::validate_stream_active(&stream)?;

// Transfer tokens from sender to contract
let token_client = token::Client::new(&env, &stream.token_address);
let contract_address = env.current_contract_address();
token_client.transfer(&sender, &contract_address, &amount);

// Collect protocol fee and get net amount
let net_amount = Self::collect_fee(&env, &stream.token_address, amount, stream_id);

// Update stream state
stream.deposited_amount += net_amount;
stream.last_update_time = env.ledger().timestamp();

save_stream(&env, stream_id, &stream);

// Emit top-up event
env.events().publish(
(Symbol::new(&env, "stream_topped_up"), stream_id),
StreamToppedUpEvent {
Expand Down Expand Up @@ -244,6 +246,17 @@ impl StreamContract {
}
}

/// Calculate the claimable amount for a stream at a given timestamp.
///
/// This helper computes how many tokens have been streamed since the last
/// update, capped at the remaining balance to prevent over-withdrawal.
///
/// # Arguments
/// * `stream` - The stream to calculate claimable amount for
/// * `now` - Current ledger timestamp
///
/// # Returns
/// The amount of tokens that can be claimed, never exceeding remaining balance
fn calculate_claimable(stream: &Stream, now: u64) -> i128 {
let elapsed = now.saturating_sub(stream.last_update_time);

Expand All @@ -255,16 +268,63 @@ impl StreamContract {
.deposited_amount
.saturating_sub(stream.withdrawn_amount);

if streamed > remaining {
remaining
} else {
streamed
streamed.min(remaining)
}

/// Validate that a stream exists and is owned by the caller.
///
/// # Errors
/// - `StreamNotFound` — no stream exists with `stream_id`.
/// - `Unauthorized` — caller is not the stream's sender.
fn validate_stream_ownership(
stream: &Stream,
caller: &Address,
) -> Result<(), StreamError> {
if stream.sender != *caller {
return Err(StreamError::Unauthorized);
}
Ok(())
}

/// Validate that a stream is active.
///
/// # Errors
/// - `StreamInactive` — stream has been cancelled or fully withdrawn.
fn validate_stream_active(stream: &Stream) -> Result<(), StreamError> {
if !stream.is_active {
return Err(StreamError::StreamInactive);
}
Ok(())
}

/// Transfer tokens from contract to recipient and update stream state.
///
/// This helper consolidates the token transfer logic and stream state updates
/// to reduce code duplication across withdrawal operations.
fn transfer_and_update_stream(
env: &Env,
stream: &mut Stream,
recipient: &Address,
amount: i128,
now: u64,
) {
let token_client = token::Client::new(env, &stream.token_address);
let contract_address = env.current_contract_address();
token_client.transfer(&contract_address, recipient, &amount);

stream.withdrawn_amount += amount;
stream.last_update_time = now;

// Mark stream as inactive if fully drained
if stream.withdrawn_amount >= stream.deposited_amount {
stream.is_active = false;
}
}

/// Withdraw all currently claimable tokens from a stream.
///
/// Only the stream's recipient may call this. The stream is marked
/// Only the stream's recipient may call this. The amount withdrawn is calculated
/// based on elapsed time and the stream's rate. The stream is automatically marked
/// inactive once fully drained.
///
/// # Errors
Expand All @@ -277,12 +337,13 @@ impl StreamContract {

let mut stream = load_stream(&env, stream_id)?;

// Validate recipient authorization
if stream.recipient != recipient {
return Err(StreamError::Unauthorized);
}
if !stream.is_active {
return Err(StreamError::StreamInactive);
}

// Validate stream is active
Self::validate_stream_active(&stream)?;

let now = env.ledger().timestamp();
let claimable = Self::calculate_claimable(&stream, now);
Expand All @@ -291,20 +352,12 @@ impl StreamContract {
return Err(StreamError::InvalidAmount);
}

let token_client = token::Client::new(&env, &stream.token_address);
let contract_address = env.current_contract_address();
token_client.transfer(&contract_address, &recipient, &claimable);

stream.withdrawn_amount += claimable;
stream.last_update_time = now;

// Mark stream as inactive if all funds have been withdrawn
if stream.withdrawn_amount >= stream.deposited_amount {
stream.is_active = false;
}
// Use helper function to transfer tokens and update state
Self::transfer_and_update_stream(&env, &mut stream, &recipient, claimable, now);

save_stream(&env, stream_id, &stream);

// Emit withdrawal event
env.events().publish(
(Symbol::new(&env, "tokens_withdrawn"), stream_id),
TokensWithdrawnEvent {
Expand All @@ -320,8 +373,9 @@ impl StreamContract {

/// Cancel an active stream.
///
/// Only the stream's original sender may cancel. Any unspent balance
/// (deposited − withdrawn) is returned to the sender.
/// Only the stream's original sender may cancel. The recipient receives all
/// accrued tokens up to the cancellation moment, and any remaining unspent
/// balance is refunded to the sender.
///
/// # Errors
/// - `StreamNotFound` — no stream exists with `stream_id`.
Expand All @@ -332,27 +386,23 @@ impl StreamContract {

let mut stream = load_stream(&env, stream_id)?;

if stream.sender != sender {
return Err(StreamError::Unauthorized);
}
if !stream.is_active {
return Err(StreamError::StreamInactive);
}
// Validate ownership and active status
Self::validate_stream_ownership(&stream, &sender)?;
Self::validate_stream_active(&stream)?;

// Calculate accrued tokens that belong to the recipient
let now = env.ledger().timestamp();
let accrued_amount = Self::calculate_claimable(&stream, now);

let token_client = token::Client::new(&env, &stream.token_address);
let contract_address = env.current_contract_address();

// Settle recipient immediately with all final claimable amount at cancellation.
// Settle recipient with all accrued tokens at cancellation
if accrued_amount > 0 {
token_client.transfer(&contract_address, &stream.recipient, &accrued_amount);
stream.withdrawn_amount = stream.withdrawn_amount.saturating_add(accrued_amount);
}

// Refund remaining unspent balance after recipient settlement.
// Calculate and refund remaining balance to sender
let refunded_amount = stream
.deposited_amount
.saturating_sub(stream.withdrawn_amount);
Expand All @@ -361,6 +411,7 @@ impl StreamContract {
token_client.transfer(&contract_address, &sender, &refunded_amount);
}

// Mark stream as inactive
stream.is_active = false;
stream.last_update_time = now;

Expand All @@ -369,6 +420,7 @@ impl StreamContract {

save_stream(&env, stream_id, &stream);

// Emit cancellation event
env.events().publish(
(Symbol::new(&env, "stream_cancelled"), stream_id),
StreamCancelledEvent {
Expand All @@ -390,6 +442,22 @@ impl StreamContract {
try_load_stream(&env, stream_id)
}

/// Get the current claimable amount for a stream without modifying state.
///
/// This is a read-only query that calculates how many tokens the recipient
/// can currently withdraw based on elapsed time and stream rate.
///
/// Returns `None` if the stream doesn't exist, otherwise returns the claimable amount.
pub fn get_claimable_amount(env: Env, stream_id: u64) -> Option<i128> {
try_load_stream(&env, stream_id).map(|stream| {
if !stream.is_active {
return 0;
}
let now = env.ledger().timestamp();
Self::calculate_claimable(&stream, now)
})
}

// ─── Internal Helpers ─────────────────────────────────────────────────────

/// Deducts the protocol fee from `amount`, transfers it to the treasury,
Expand Down
Loading