1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
// This file is part of Gear.
// Copyright (C) 2022-2024 Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Module for scheduler implementation.
//!
//! Scheduler provides API for all available regular or time-dependent actions.
use crate::storage::{
CountedByKey, DoubleMapStorage, EmptyCallback, KeyIterableByKeyMap, ValueStorage,
};
use core::{fmt::Debug, marker::PhantomData};
/// Represents scheduler's logic of centralized delayed tasks management logic.
pub trait Scheduler {
/// Block number type of the messenger.
type BlockNumber;
/// Task type.
type Task;
/// Cost type.
type Cost;
/// Inner error type generated by gear's storage types.
type Error: TaskPoolError;
/// Output error of each storage algorithm.
///
/// Implements `From<Self::Error>` to be able to return
/// any required error type.
type OutputError: From<Self::Error> + Debug;
/// Storing costs per block.
type CostsPerBlock: SchedulingCostsPerBlock<BlockNumber = Self::BlockNumber, Cost = Self::Cost>;
/// The first block of incomplete tasks, which have already passed,
/// but still contain tasks to deal with.
///
/// Used for checking if scheduler is able to process
/// current block aimed tasks, or there are some
/// incomplete job from previous blocks.
type FirstIncompleteTasksBlock: ValueStorage<Value = Self::BlockNumber>;
/// Gear task pool.
///
/// Task pool contains tasks with block number when they should be done.
type TaskPool: TaskPool<
BlockNumber = Self::BlockNumber,
Task = Self::Task,
Error = Self::Error,
OutputError = Self::OutputError,
> + CountedByKey<Key = Self::BlockNumber, Length = usize>
+ KeyIterableByKeyMap<Key1 = Self::BlockNumber, Key2 = Self::Task>;
/// Resets all related to messenger storages.
///
/// It's a temporary production solution to avoid DB migrations
/// and would be available for test purposes only in the future.
fn reset() {
Self::FirstIncompleteTasksBlock::kill();
Self::TaskPool::clear();
}
}
/// Storing costs getter trait.
pub trait SchedulingCostsPerBlock {
/// Block number type.
type BlockNumber;
/// Cost type.
type Cost;
/// Extra reserve for being able to pay for blocks with incomplete tasks.
fn reserve_for() -> Self::BlockNumber;
/// Cost for storing code per block.
fn code() -> Self::Cost;
/// Cost for storing message in mailbox per block.
fn mailbox() -> Self::Cost;
/// Cost for storing program per block.
fn program() -> Self::Cost;
/// Cost for storing message in waitlist per block.
fn waitlist() -> Self::Cost;
/// Cost for reservation holding.
fn reservation() -> Self::Cost;
/// Cost for storing message in dispatch stash.
/// Everything sent delayed goes into dispatch stash.
fn dispatch_stash() -> Self::Cost;
/// Derives the cost per block based on the lock identifier
fn by_storage_type(storage: StorageType) -> Self::Cost;
}
/// The type whose variants correspond to various storages used in Gear,
/// including waitlist, mailbox, delayed messages stash etc.
/// Used as a parameter in functions performing some common actions on storages
/// like, for instance, holding cost calculation, to signal a concrete storage kind.
#[derive(Debug, Clone, Copy)]
pub enum StorageType {
Code,
Mailbox,
Program,
Waitlist,
Reservation,
DispatchStash,
}
/// Represents tasks managing logic.
pub trait TaskPool {
/// Block number type.
type BlockNumber;
/// Task type.
type Task;
/// Inner error type of queue storing algorithm.
type Error: TaskPoolError;
/// Output error type of the queue.
type OutputError: From<Self::Error>;
/// Inserts given task in task pool.
fn add(bn: Self::BlockNumber, task: Self::Task) -> Result<(), Self::OutputError>;
/// Removes all tasks from task pool.
fn clear();
/// Returns bool, defining does task exist in task pool.
fn contains(bn: &Self::BlockNumber, task: &Self::Task) -> bool;
/// Removes task from task pool by given keys,
/// if present, else returns error.
fn delete(bn: Self::BlockNumber, task: Self::Task) -> Result<(), Self::OutputError>;
}
/// Represents store of task pool's action callbacks.
pub trait TaskPoolCallbacks {
/// Callback on success `add`.
type OnAdd: EmptyCallback;
/// Callback on success `delete`.
type OnDelete: EmptyCallback;
}
/// Represents task pool error type.
///
/// Contains constructors for all existing errors.
pub trait TaskPoolError {
/// Occurs when given task already exists in task pool.
fn duplicate_task() -> Self;
/// Occurs when task wasn't found in storage.
fn task_not_found() -> Self;
}
/// `TaskPool` implementation based on `DoubleMapStorage`.
///
/// Generic parameter `Error` requires `TaskPoolError` implementation.
/// Generic parameter `Callbacks` presents actions for success operations
/// over task pool.
pub struct TaskPoolImpl<T, Task, Error, OutputError, Callbacks>(
PhantomData<(T, Task, Error, OutputError, Callbacks)>,
)
where
T: DoubleMapStorage<Key2 = Task, Value = ()>,
Error: TaskPoolError,
OutputError: From<Error>,
Callbacks: TaskPoolCallbacks;
// Implementation of `TaskPool` for `TaskPoolImpl`.
impl<T, Task, Error, OutputError, Callbacks> TaskPool
for TaskPoolImpl<T, Task, Error, OutputError, Callbacks>
where
T: DoubleMapStorage<Key2 = Task, Value = ()>,
Error: TaskPoolError,
OutputError: From<Error>,
Callbacks: TaskPoolCallbacks,
{
type BlockNumber = T::Key1;
type Task = T::Key2;
type Error = Error;
type OutputError = OutputError;
fn add(bn: Self::BlockNumber, task: Self::Task) -> Result<(), Self::OutputError> {
if !Self::contains(&bn, &task) {
T::insert(bn, task, ());
Callbacks::OnAdd::call();
Ok(())
} else {
Err(Self::Error::duplicate_task().into())
}
}
fn clear() {
T::clear()
}
fn contains(bn: &Self::BlockNumber, task: &Self::Task) -> bool {
T::contains_keys(bn, task)
}
fn delete(bn: Self::BlockNumber, task: Self::Task) -> Result<(), Self::OutputError> {
if T::contains_keys(&bn, &task) {
T::remove(bn, task);
Callbacks::OnDelete::call();
Ok(())
} else {
Err(Self::Error::task_not_found().into())
}
}
}
// Implementation of `CountedByKey` trait for `TaskPoolImpl` in case,
// when inner `DoubleMapStorage` implements `CountedByKey`.
impl<T, Task, Error, OutputError, Callbacks> CountedByKey
for TaskPoolImpl<T, Task, Error, OutputError, Callbacks>
where
T: DoubleMapStorage<Key2 = Task, Value = ()> + CountedByKey<Key = T::Key1>,
Error: TaskPoolError,
OutputError: From<Error>,
Callbacks: TaskPoolCallbacks,
{
type Key = T::Key1;
type Length = T::Length;
fn len(key: &Self::Key) -> Self::Length {
T::len(key)
}
}
// Implementation of `KeyIterableByKeyMap` trait for `TaskPoolImpl` in case,
// when inner `DoubleMapStorage` implements `KeyIterableByKeyMap`.
impl<T, Task, Error, OutputError, Callbacks> KeyIterableByKeyMap
for TaskPoolImpl<T, Task, Error, OutputError, Callbacks>
where
T: DoubleMapStorage<Key2 = Task, Value = ()> + KeyIterableByKeyMap,
Error: TaskPoolError,
OutputError: From<Error>,
Callbacks: TaskPoolCallbacks,
{
type Key1 = <T as KeyIterableByKeyMap>::Key1;
type Key2 = <T as KeyIterableByKeyMap>::Key2;
type DrainIter = T::DrainIter;
type Iter = T::Iter;
fn drain_prefix_keys(bn: Self::Key1) -> Self::DrainIter {
T::drain_prefix_keys(bn)
}
fn iter_prefix_keys(bn: Self::Key1) -> Self::Iter {
T::iter_prefix_keys(bn)
}
}