1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
// This file is part of Gear.
//
// Copyright (C) 2021-2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Subscription implementation.
use crate::{config::GearConfig, metadata::Event};
use anyhow::Result;
use futures::{Stream, StreamExt};
use sp_core::H256;
use std::{marker::Unpin, ops::Deref, pin::Pin, task::Poll};
use subxt::{backend::StreamOfResults, blocks::Block, events::Events as SubxtEvents, OnlineClient};
/// A subxt block bound to the Gear runtime configuration and an online client.
type SubxtBlock = Block<GearConfig, OnlineClient<GearConfig>>;
/// Stream of block results as produced by the subxt backend.
type BlockSubscription = StreamOfResults<SubxtBlock>;
/// Subscription of finalized blocks.
///
/// Thin newtype around [`BlockSubscription`]. It implements [`Stream`],
/// yielding each block as an `anyhow` result, and offers
/// [`Blocks::next_events`] to fetch the events of the next block directly.
pub struct Blocks(BlockSubscription);
// Explicitly mark the wrapper as `Unpin`: `poll_next` mutably accesses the
// inner stream through `Pin<&mut Self>`, and `next_events` calls
// `StreamExt::next` on `&mut self`.
impl Unpin for Blocks {}
impl Stream for Blocks {
    type Item = Result<SubxtBlock>;

    /// Poll the wrapped subscription for its next block.
    ///
    /// The readiness state of the inner stream is forwarded unchanged; the
    /// only transformation is converting the inner error of a yielded item
    /// into the error type of [`anyhow::Result`].
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        match self.0.poll_next_unpin(cx) {
            // Nothing available yet; the inner stream has registered the waker.
            Poll::Pending => Poll::Pending,
            // The inner stream is exhausted.
            Poll::Ready(None) => Poll::Ready(None),
            // An item arrived: pass it on with the error type converted.
            Poll::Ready(Some(block)) => Poll::Ready(Some(block.map_err(Into::into))),
        }
    }
}
impl Blocks {
    /// Wait for the next block from the subscription and load its events.
    ///
    /// Returns `Ok(None)` once the subscription yields no more blocks.
    ///
    /// # Errors
    ///
    /// Fails if the subscription yields an error instead of a block, or if
    /// building the [`BlockEvents`] for the received block fails.
    pub async fn next_events(&mut self) -> Result<Option<BlockEvents>> {
        match StreamExt::next(self).await {
            Some(block) => BlockEvents::new(block?).await.map(Some),
            None => Ok(None),
        }
    }
}
impl From<BlockSubscription> for Blocks {
fn from(sub: BlockSubscription) -> Self {
Self(sub)
}
}
/// Subscription of events.
///
/// Wraps a [`Blocks`] subscription and exposes only the decoded events of
/// each block via [`Events::next`].
pub struct Events(Blocks);
impl Events {
    /// Wait for the next events from the subscription.
    ///
    /// Returns the decoded events of the next block. When the underlying
    /// block subscription has no more blocks, an empty vector is returned,
    /// so callers cannot tell an ended subscription from a block that
    /// produced no events through this method alone.
    ///
    /// # Errors
    ///
    /// Propagates any error from fetching the next block's events or from
    /// decoding them.
    pub async fn next(&mut self) -> Result<Vec<Event>> {
        match self.0.next_events().await? {
            Some(block_events) => block_events.events(),
            None => Ok(Vec::new()),
        }
    }
}
impl From<BlockSubscription> for Events {
fn from(sub: BlockSubscription) -> Self {
Self(sub.into())
}
}
/// Subxt events wrapper with block info.
///
/// Pairs the raw subxt events of a block with the hash of that block.
/// Dereferences to the underlying subxt events.
#[derive(Clone, Debug)]
pub struct BlockEvents {
/// Hash of the block the events were taken from.
block_hash: H256,
/// Raw (not yet decoded) subxt events of that block.
events: SubxtEvents<GearConfig>,
}
impl BlockEvents {
/// Wrap subxt events with block info
pub async fn new(block: Block<GearConfig, OnlineClient<GearConfig>>) -> Result<Self> {
Ok(Self {
block_hash: block.hash(),
events: block.events().await?,
})
}
/// Get the block hash of the holding events
pub fn block_hash(&self) -> H256 {
self.block_hash
}
/// Get gear events
pub fn events(&self) -> Result<Vec<Event>> {
self.events
.iter()
.map(|ev| {
ev.and_then(|e| e.as_root_event::<Event>())
.map_err(Into::into)
})
.collect::<Result<Vec<_>>>()
}
}
impl Deref for BlockEvents {
    type Target = SubxtEvents<GearConfig>;

    /// Expose the wrapped subxt events so their methods can be called
    /// directly on a [`BlockEvents`] value.
    fn deref(&self) -> &SubxtEvents<GearConfig> {
        &self.events
    }
}