use crate::{
collections::BTreeMap,
config::WaitType,
errors::{Error, Result, UsageError},
exec,
sync::MutexId,
BlockCount, BlockNumber, Config, MessageId,
};
use core::cmp::Ordering;
use hashbrown::HashMap;
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum LockType {
WaitFor(BlockCount),
WaitUpTo(BlockCount),
}
#[derive(Debug, PartialEq, Eq)]
pub struct Lock {
pub at: BlockNumber,
ty: LockType,
}
impl Lock {
pub fn exactly(b: BlockCount) -> Result<Self> {
if b == 0 {
return Err(Error::Gstd(UsageError::EmptyWaitDuration));
}
Ok(Self {
at: exec::block_height(),
ty: LockType::WaitFor(b),
})
}
pub fn up_to(b: BlockCount) -> Result<Self> {
if b == 0 {
return Err(Error::Gstd(UsageError::EmptyWaitDuration));
}
Ok(Self {
at: exec::block_height(),
ty: LockType::WaitUpTo(b),
})
}
pub fn wait(&self) {
if let Some(blocks) = self.deadline().checked_sub(exec::block_height()) {
if blocks == 0 {
unreachable!(
"Checked in `crate::msg::async::poll`, will trigger the timeout error automatically."
);
}
match self.ty {
LockType::WaitFor(_) => exec::wait_for(blocks),
LockType::WaitUpTo(_) => exec::wait_up_to(blocks),
}
} else {
unreachable!(
"Checked in `crate::msg::async::poll`, will trigger the timeout error automatically."
);
}
}
pub fn deadline(&self) -> BlockNumber {
match &self.ty {
LockType::WaitFor(d) | LockType::WaitUpTo(d) => self.at.saturating_add(*d),
}
}
pub fn timeout(&self) -> Option<(BlockNumber, BlockNumber)> {
let current = exec::block_height();
let expected = self.deadline();
if current >= expected {
Some((expected, current))
} else {
None
}
}
}
impl PartialOrd for Lock {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Lock {
fn cmp(&self, other: &Self) -> Ordering {
self.deadline().cmp(&other.deadline())
}
}
impl Default for Lock {
fn default() -> Self {
Lock::up_to(Config::wait_up_to()).expect("Checked zero case in config.")
}
}
impl Default for LockType {
fn default() -> Self {
match Config::wait_type() {
WaitType::WaitFor => LockType::WaitFor(Config::wait_for()),
WaitType::WaitUpTo => LockType::WaitUpTo(Config::wait_up_to()),
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum LockContext {
ReplyTo(MessageId),
Sleep(BlockNumber),
MxLockMonitor(MutexId),
}
#[derive(Default, Debug)]
pub struct LocksMap(HashMap<MessageId, BTreeMap<LockContext, Lock>>);
impl LocksMap {
pub fn wait(&mut self, message_id: MessageId) {
let map = self.message_locks(message_id);
if map.is_empty() {
map.insert(LockContext::ReplyTo(message_id), Default::default());
}
let now = exec::block_height();
map.iter()
.filter_map(|(_, lock)| (lock.deadline() > now).then_some(lock))
.min_by(|lock1, lock2| lock1.cmp(lock2))
.expect("Cannot find lock to be waited")
.wait();
}
pub fn lock(&mut self, message_id: MessageId, waiting_reply_to: MessageId, lock: Lock) {
self.message_locks(message_id)
.insert(LockContext::ReplyTo(waiting_reply_to), lock);
}
pub fn remove(&mut self, message_id: MessageId, waiting_reply_to: MessageId) {
self.message_locks(message_id)
.remove(&LockContext::ReplyTo(waiting_reply_to));
}
pub fn insert_sleep(&mut self, message_id: MessageId, wake_up_at: BlockNumber) {
let locks = self.message_locks(message_id);
let current_block = exec::block_height();
if current_block < wake_up_at {
locks.insert(
LockContext::Sleep(wake_up_at),
Lock::exactly(wake_up_at - current_block)
.expect("Never fails with block count > 0"),
);
} else {
locks.remove(&LockContext::Sleep(wake_up_at));
}
}
pub fn remove_sleep(&mut self, message_id: MessageId, wake_up_at: BlockNumber) {
self.message_locks(message_id)
.remove(&LockContext::Sleep(wake_up_at));
}
pub(crate) fn insert_mx_lock_monitor(
&mut self,
message_id: MessageId,
mutex_id: MutexId,
wake_up_at: BlockNumber,
) {
let locks = self.message_locks(message_id);
locks.insert(
LockContext::MxLockMonitor(mutex_id),
Lock::exactly(
wake_up_at
.checked_sub(exec::block_height())
.expect("Value of after_block must be greater than current block"),
)
.expect("Never fails with block count > 0"),
);
}
pub(crate) fn remove_mx_lock_monitor(&mut self, message_id: MessageId, mutex_id: MutexId) {
self.message_locks(message_id)
.remove(&LockContext::MxLockMonitor(mutex_id));
}
pub fn remove_message_entry(&mut self, message_id: MessageId) {
self.0.remove(&message_id);
}
pub fn is_timeout(
&mut self,
message_id: MessageId,
waiting_reply_to: MessageId,
) -> Option<(BlockNumber, BlockNumber)> {
self.0.get(&message_id).and_then(|locks| {
locks
.get(&LockContext::ReplyTo(waiting_reply_to))
.and_then(|l| l.timeout())
})
}
fn message_locks(&mut self, message_id: MessageId) -> &mut BTreeMap<LockContext, Lock> {
self.0.entry(message_id).or_default()
}
}