1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// This file is part of Gear.
//
// Copyright (C) 2021-2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Subscription implementation.

use crate::{config::GearConfig, metadata::Event};
use anyhow::Result;
use futures::{Stream, StreamExt};
use sp_core::H256;
use std::{marker::Unpin, ops::Deref, pin::Pin, task::Poll};
use subxt::{backend::StreamOfResults, blocks::Block, events::Events as SubxtEvents, OnlineClient};

type SubxtBlock = Block<GearConfig, OnlineClient<GearConfig>>;
type BlockSubscription = StreamOfResults<SubxtBlock>;

/// Subscription of finalized blocks.
pub struct Blocks(BlockSubscription);

impl Unpin for Blocks {}

impl Stream for Blocks {
    type Item = Result<SubxtBlock>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let res = futures::ready!(self.0.poll_next_unpin(cx));

        Poll::Ready(res.map(|inner| inner.map_err(Into::into)))
    }
}

impl Blocks {
    /// Wait for the next block from the subscription.
    pub async fn next_events(&mut self) -> Result<Option<BlockEvents>> {
        let Some(next) = StreamExt::next(self).await else {
            return Ok(None);
        };

        Ok(Some(BlockEvents::new(next?).await?))
    }
}

impl From<BlockSubscription> for Blocks {
    fn from(sub: BlockSubscription) -> Self {
        Self(sub)
    }
}

/// Subscription of events.
pub struct Events(Blocks);

impl Events {
    /// Wait for the next events from the subscription.
    pub async fn next(&mut self) -> Result<Vec<Event>> {
        if let Some(es) = self.0.next_events().await? {
            es.events()
        } else {
            Ok(Default::default())
        }
    }
}

impl From<BlockSubscription> for Events {
    fn from(sub: BlockSubscription) -> Self {
        Self(sub.into())
    }
}

/// Subxt events wrapper with block info
#[derive(Clone, Debug)]
pub struct BlockEvents {
    /// Block hash of the provided events
    block_hash: H256,
    /// subxt events
    events: SubxtEvents<GearConfig>,
}

impl BlockEvents {
    /// Wrap subxt events with block info
    pub async fn new(block: Block<GearConfig, OnlineClient<GearConfig>>) -> Result<Self> {
        Ok(Self {
            block_hash: block.hash(),
            events: block.events().await?,
        })
    }

    /// Get the block hash of the holding events
    pub fn block_hash(&self) -> H256 {
        self.block_hash
    }

    /// Get gear events
    pub fn events(&self) -> Result<Vec<Event>> {
        self.events
            .iter()
            .map(|ev| {
                ev.and_then(|e| e.as_root_event::<Event>())
                    .map_err(Into::into)
            })
            .collect::<Result<Vec<_>>>()
    }
}

impl Deref for BlockEvents {
    type Target = SubxtEvents<GearConfig>;

    fn deref(&self) -> &Self::Target {
        &self.events
    }
}