1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
// This file is part of Gear.
// Copyright (C) 2021-2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Module for future-management.
#[cfg(not(feature = "ethexe"))]
use crate::critical;
use crate::{prelude::Box, MessageId};
use core::{
future::Future,
pin::Pin,
task::{Context, Waker},
};
use futures::FutureExt;
use hashbrown::HashMap;
pub(crate) type FuturesMap = HashMap<MessageId, Task>;
type PinnedFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
/// Matches a task to a some message in order to avoid duplicate execution
/// of code that was running before the program was interrupted by `wait`.
pub struct Task {
waker: Waker,
future: PinnedFuture,
lock_exceeded: bool,
}
impl Task {
fn new<F>(future: F) -> Self
where
F: Future<Output = ()> + 'static,
{
Self {
waker: waker_fn::waker_fn(|| {}),
future: future.boxed_local(),
lock_exceeded: false,
}
}
pub(crate) fn set_lock_exceeded(&mut self) {
self.lock_exceeded = true;
}
}
/// The main asynchronous message handling loop.
///
/// Gear allows user and program interaction via
/// messages. This function is the entry point to run the asynchronous message
/// processing.
pub fn message_loop<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
let msg_id = crate::msg::id();
let task = super::futures().entry(msg_id).or_insert_with(|| {
#[cfg(not(feature = "ethexe"))]
{
let system_reserve_amount = crate::Config::system_reserve();
crate::exec::system_reserve_gas(system_reserve_amount)
.expect("Failed to reserve gas for system signal");
}
Task::new(future)
});
if task.lock_exceeded {
// Futures and locks for the message will be cleaned up by
// the async_runtime::handle_signal function
panic!(
"Message 0x{} has exceeded lock ownership time",
hex::encode(msg_id)
);
}
let mut cx = Context::from_waker(&task.waker);
if Pin::new(&mut task.future).poll(&mut cx).is_ready() {
super::futures().remove(&msg_id);
super::locks().remove_message_entry(msg_id);
#[cfg(not(feature = "ethexe"))]
let _ = critical::take_hook();
} else {
super::locks().wait(msg_id);
}
}