use crate::{
config::GearConfig,
result::{Error, Result},
};
use futures_util::{StreamExt, TryStreamExt};
use jsonrpsee::{
core::{
client::{ClientT, Subscription, SubscriptionClientT, SubscriptionKind},
traits::ToRpcParams,
},
http_client::{HttpClient, HttpClientBuilder},
types::SubscriptionId,
ws_client::{WsClient, WsClientBuilder},
};
use sp_runtime::DeserializeOwned;
use std::{ops::Deref, result::Result as StdResult, time::Duration};
use subxt::{
backend::{
legacy::LegacyRpcMethods,
rpc::{
RawRpcFuture as RpcFuture, RawRpcSubscription as RpcSubscription, RawValue,
RpcClient as SubxtRpcClient, RpcClientT, RpcParams,
},
},
error::RpcError,
};
/// 100 MiB expressed in bytes; used as the maximum request size when
/// building the WebSocket client.
const ONE_HUNDRED_MEGA_BYTES: u32 = 100 * 1024 * 1024;
/// Newtype around optional, already-serialized JSON parameters so that they
/// can be passed to jsonrpsee via its `ToRpcParams` trait.
struct Params(Option<Box<RawValue>>);
impl ToRpcParams for Params {
fn to_rpc_params(self) -> StdResult<Option<Box<RawValue>>, serde_json::Error> {
Ok(self.0)
}
}
/// RPC transport, backed by either a jsonrpsee WebSocket client or a
/// jsonrpsee HTTP client.
pub enum RpcClient {
/// WebSocket transport.
Ws(WsClient),
/// HTTP transport.
Http(HttpClient),
}
impl RpcClient {
pub async fn new(uri: &str, timeout: u64) -> Result<Self> {
log::info!("Connecting to {uri} ...");
if uri.starts_with("ws") {
Ok(Self::Ws(
WsClientBuilder::default()
.max_request_size(ONE_HUNDRED_MEGA_BYTES)
.connection_timeout(Duration::from_millis(timeout))
.request_timeout(Duration::from_millis(timeout))
.build(uri)
.await
.map_err(Error::SubxtRpc)?,
))
} else if uri.starts_with("http") {
Ok(Self::Http(
HttpClientBuilder::default()
.request_timeout(Duration::from_millis(timeout))
.build(uri)
.map_err(Error::SubxtRpc)?,
))
} else {
Err(Error::InvalidUrl)
}
}
}
impl RpcClientT for RpcClient {
    /// Sends a single request for `method` with the given raw `params` over
    /// whichever transport this client wraps and resolves to the raw JSON
    /// response. Any transport error is boxed into [`RpcError::ClientError`].
    fn request_raw<'a>(
        &'a self,
        method: &'a str,
        params: Option<Box<RawValue>>,
    ) -> RpcFuture<'a, Box<RawValue>> {
        Box::pin(async move {
            // Both arms produce the same result type, so the error is wrapped
            // once after the dispatch.
            let response = match self {
                RpcClient::Http(c) => ClientT::request(c, method, Params(params)).await,
                RpcClient::Ws(c) => ClientT::request(c, method, Params(params)).await,
            };
            response.map_err(|e| RpcError::ClientError(Box::new(e)))
        })
    }

    /// Opens a subscription via `sub` (with `unsub` as the matching
    /// unsubscribe method) and adapts it into subxt's raw subscription type.
    ///
    /// The resulting `id` is `Some` only when the underlying subscription
    /// reports a string subscription id; in every other case it is `None`.
    /// Errors from opening the subscription and from individual stream items
    /// are boxed into [`RpcError::ClientError`].
    fn subscribe_raw<'a>(
        &'a self,
        sub: &'a str,
        params: Option<Box<RawValue>>,
        unsub: &'a str,
    ) -> RpcFuture<'a, RpcSubscription> {
        Box::pin(async move {
            let subscription = match self {
                RpcClient::Http(c) => subscription_stream(c, sub, params, unsub).await,
                RpcClient::Ws(c) => subscription_stream(c, sub, params, unsub).await,
            }?;

            // The id has to be copied out before the subscription is consumed
            // by the stream adaptors below.
            let id = if let SubscriptionKind::Subscription(SubscriptionId::Str(id)) =
                subscription.kind()
            {
                Some(id.clone().into_owned())
            } else {
                None
            };

            Ok(RpcSubscription {
                stream: subscription
                    .map_err(|e| RpcError::ClientError(Box::new(e)))
                    .boxed(),
                id,
            })
        })
    }
}
/// Opens a subscription on `client` that yields raw JSON values.
///
/// `sub` is the subscribe method, `unsub` the matching unsubscribe method and
/// `params` the optional raw parameters sent with the subscribe call. A
/// failure to subscribe is boxed into [`RpcError::ClientError`].
async fn subscription_stream<C: SubscriptionClientT>(
    client: &C,
    sub: &str,
    params: Option<Box<RawValue>>,
    unsub: &str,
) -> StdResult<Subscription<Box<RawValue>>, RpcError> {
    let outcome = client
        .subscribe::<Box<RawValue>, _>(sub, Params(params), unsub)
        .await;

    match outcome {
        Ok(subscription) => Ok(subscription),
        Err(e) => Err(RpcError::ClientError(Box::new(e))),
    }
}
/// Cloneable RPC handle bundling a subxt RPC client, the legacy RPC methods
/// built on top of it, and a retry budget used by [`Rpc::request`].
#[derive(Clone)]
pub struct Rpc {
// Underlying subxt RPC client.
rpc: SubxtRpcClient,
// Legacy RPC methods created from a clone of `rpc`; exposed through `Deref`.
methods: LegacyRpcMethods<GearConfig>,
// How many times a failed request is re-sent before the error is returned.
retries: u8,
}
impl Rpc {
    /// Connects to `uri` and builds the RPC handle.
    ///
    /// `timeout` is forwarded to [`RpcClient::new`]; `retries` is the number
    /// of times [`Rpc::request`] re-sends a failed request.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`RpcClient::new`].
    pub async fn new(uri: &str, timeout: u64, retries: u8) -> Result<Self> {
        let transport = RpcClient::new(uri, timeout).await?;
        let rpc = SubxtRpcClient::new(transport);

        Ok(Self {
            methods: LegacyRpcMethods::new(rpc.clone()),
            rpc,
            retries,
        })
    }

    /// Returns a clone of the underlying subxt RPC client.
    pub fn client(&self) -> SubxtRpcClient {
        SubxtRpcClient::clone(&self.rpc)
    }

    /// Sends `method` with `params` and deserializes the response into `Res`.
    ///
    /// A failed request is re-sent immediately until the configured number of
    /// retries is used up; every retried failure is logged at warn level. The
    /// first success is returned, otherwise the error of the final attempt.
    pub async fn request<Res: DeserializeOwned>(
        &self,
        method: &str,
        params: RpcParams,
    ) -> Result<Res> {
        let mut retries = 0;
        loop {
            let outcome: Result<Res> = self
                .rpc
                .request(method, params.clone())
                .await
                .map_err(Into::into);

            match outcome {
                // Retry budget not yet exhausted: log the failure and go again.
                Err(e) if retries != self.retries => {
                    retries += 1;
                    log::warn!("Failed to send request: {:?}, retries: {retries}", Some(e));
                }
                // Either a success or the error of the last permitted attempt.
                finished => return finished,
            }
        }
    }
}
/// Lets the legacy RPC methods be called directly on an [`Rpc`] value.
impl Deref for Rpc {
    type Target = LegacyRpcMethods<GearConfig>;

    /// Borrows the wrapped legacy RPC methods.
    fn deref(&self) -> &Self::Target {
        let Self { methods, .. } = self;
        methods
    }
}