1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// This file is part of Gear.

// Copyright (C) 2021-2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Module for signal-management and waking concrete message based on reply
//! received.

use crate::{prelude::Vec, MessageId};
use core::task::{Context, Waker};
use gear_core_errors::ReplyCode;
use hashbrown::HashMap;

pub type Payload = Vec<u8>;

#[derive(Debug)]
pub(crate) enum ReplyPoll {
    None,
    Pending,
    Some((Payload, ReplyCode)),
}

struct WakeSignal {
    message_id: MessageId,
    payload: Option<(Payload, ReplyCode)>,
    waker: Option<Waker>,
}

pub(crate) struct WakeSignals {
    signals: HashMap<MessageId, WakeSignal>,
}

impl WakeSignals {
    pub fn new() -> Self {
        Self {
            signals: HashMap::new(),
        }
    }

    pub fn register_signal(&mut self, waiting_reply_to: MessageId) {
        let message_id = crate::msg::id();

        self.signals.insert(
            waiting_reply_to,
            WakeSignal {
                message_id,
                payload: None,
                waker: None,
            },
        );

        crate::async_runtime::locks().lock(message_id, waiting_reply_to, Default::default());
    }

    pub fn record_reply(&mut self) {
        if let Some(signal) = self
            .signals
            .get_mut(&crate::msg::reply_to().expect("Shouldn't be called with incorrect context"))
        {
            signal.payload = Some((
                crate::msg::load_bytes().expect("Failed to load bytes"),
                crate::msg::reply_code().expect("Shouldn't be called with incorrect context"),
            ));

            if let Some(waker) = &signal.waker {
                waker.wake_by_ref();
            }

            crate::exec::wake(signal.message_id).expect("Failed to wake the message")
        } else {
            crate::debug!("A message has received a reply though it wasn't to receive one, or a processed message has received a reply");
        }
    }

    pub fn waits_for(&self, reply_to: MessageId) -> bool {
        self.signals.contains_key(&reply_to)
    }

    pub fn poll(&mut self, reply_to: MessageId, cx: &mut Context<'_>) -> ReplyPoll {
        match self.signals.remove(&reply_to) {
            None => ReplyPoll::None,
            Some(mut signal @ WakeSignal { payload: None, .. }) => {
                signal.waker = Some(cx.waker().clone());
                self.signals.insert(reply_to, signal);
                ReplyPoll::Pending
            }
            Some(WakeSignal {
                payload: Some(payload),
                ..
            }) => ReplyPoll::Some(payload),
        }
    }
}