diff --git a/.gitignore b/.gitignore index 18dfef8f..2ae10116 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ /target *.vscode /chain/assets -/types/pkg \ No newline at end of file +/types/pkg +/storage/rocksdb +.idea diff --git a/Cargo.lock b/Cargo.lock index a3d67ca7..995cdfe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,10 +41,13 @@ name = "alto-chain" version = "0.0.4" dependencies = [ "alto-client", + "alto-storage", "alto-types", + "alto-vm", "axum", "bytes", "clap", + "commonware-codec", "commonware-consensus", "commonware-cryptography", "commonware-deployer", @@ -63,6 +66,7 @@ dependencies = [ "serde_yaml", "sysinfo", "thiserror 2.0.12", + "tokio", "tracing", "tracing-subscriber", "uuid", @@ -102,21 +106,50 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "alto-storage" +version = "0.1.0" +dependencies = [ + "alto-types", + "bytes", + "commonware-codec", + "commonware-cryptography", + "futures", + "rand", + "rocksdb", + "tempfile", + "tracing", +] + [[package]] name = "alto-types" version = "0.0.4" dependencies = [ "bytes", + "commonware-codec", "commonware-cryptography", "commonware-utils", "getrandom 0.2.15", + "more-asserts", "rand", "serde", "serde-wasm-bindgen", + "sha3", "thiserror 2.0.12", "wasm-bindgen", ] +[[package]] +name = "alto-vm" +version = "0.1.0" +dependencies = [ + "alto-storage", + "alto-types", + "commonware-codec", + "tracing", + "tracing-subscriber", +] + [[package]] name = "anstream" version = "0.6.18" @@ -644,6 +677,26 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "lazy_static", + "lazycell", + "proc-macro2", + 
"quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.99", +] + [[package]] name = "bitflags" version = "2.9.0" @@ -720,6 +773,16 @@ dependencies = [ "either", ] +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "cc" version = "1.2.16" @@ -731,6 +794,15 @@ dependencies = [ "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -772,6 +844,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.31" @@ -805,6 +888,17 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "commonware-codec" +version = "0.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ea2a8daeca3c1346a30a4c92af7791b1c37e669fad9e1907203e2cc6f92eec7" +dependencies = [ + "bytes", + "paste", + "thiserror 2.0.12", +] + [[package]] name = "commonware-consensus" version = "0.0.40" @@ -1953,6 +2047,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1987,24 +2090,75 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "keccak" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +dependencies = [ + "cpufeatures", +] + [[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if", + "windows-targets", +] + [[package]] name = "libm" version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" +[[package]] +name = "librocksdb-sys" +version = "0.17.1+9.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b7869a512ae9982f4d46ba482c2a304f1efd80c6412a3d4bf57bb79a619679f" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" +dependencies = 
[ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.9.2" @@ -2033,6 +2187,16 @@ version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "matchit" version = "0.8.4" @@ -2051,6 +2215,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.5" @@ -2071,6 +2241,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "more-asserts" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fafa6961cabd9c63bcd77a45d7e3b7f3b552b70417831fb0f56db717e72407e" + [[package]] name = "multimap" version = "0.10.0" @@ -2100,6 +2276,16 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nonzero_ext" version = "0.3.0" @@ -2272,6 +2458,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -2425,7 +2617,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck", - "itertools", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -2445,7 +2637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.99", @@ -2677,12 +2869,28 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rocksdb" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26ec73b20525cb235bad420f911473b69f9fe27cc856c5461bccd7e4af037f43" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2980,6 +3188,16 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha3" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" +dependencies = [ + "digest 0.10.7", + "keccak", +] + [[package]] name = "sharded-slab" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index bfa926c9..b05eb5ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,15 +1,19 @@ [workspace] -members = [ +members = [ "chain", "client", "inspector", + "storage", "types", + "vm", ] 
resolver = "2" [workspace.dependencies] alto-client = { version = "0.0.4", path = "client" } alto-types = { version = "0.0.4", path = "types" } +alto-storage = { version = "0.1.0", path = "storage"} +alto-vm = { version = "0.1.0", path = "vm"} commonware-consensus = { version = "0.0.40" } commonware-cryptography = { version = "0.0.40" } commonware-deployer = { version = "0.0.40" } @@ -20,6 +24,7 @@ commonware-runtime = { version = "0.0.40" } commonware-storage = { version = "0.0.40" } commonware-stream = { version = "0.0.40" } commonware-utils = { version = "0.0.40" } +commonware-codec = { version = "0.0.40"} thiserror = "2.0.12" bytes = "1.7.1" rand = "0.8.5" @@ -32,6 +37,7 @@ tracing-subscriber = "0.3.19" governor = "0.6.3" prometheus-client = "0.22.3" clap = "4.5.18" +more-asserts = "0.3.1" [profile.bench] # Because we enable overflow checks in "release," we should benchmark with them. diff --git a/LICENSE-APACHE b/LICENSE-APACHE deleted file mode 100644 index e5268c58..00000000 --- a/LICENSE-APACHE +++ /dev/null @@ -1,178 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - Copyright (c) 2025 Commonware, Inc. - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. 
- - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. 
If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. 
You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. 
Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/LICENSE-MIT b/LICENSE-MIT deleted file mode 100644 index 21092130..00000000 --- a/LICENSE-MIT +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2025 Commonware, Inc. 
- -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index 007664d6..f8bda3bc 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,6 @@ _Components are designed for deployment in adversarial environments. If you find * [inspector](./inspector/README.md): Inspect `alto` activity. * [types](./types/README.md): Common types used throughout `alto`. -## Licensing - -This repository is dual-licensed under both the [Apache 2.0](./LICENSE-APACHE) and [MIT](./LICENSE-MIT) licenses. You may choose either license when employing this code. - ## Support If you have any questions about `alto`, we encourage you to post in [GitHub Discussions](https://github.com/commonwarexyz/monorepo/discussions). We're happy to help! 
\ No newline at end of file diff --git a/chain/Cargo.toml b/chain/Cargo.toml index 81ee85ea..d91af01e 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -13,7 +13,10 @@ documentation = "https://docs.rs/alto-chain" [dependencies] alto-types = { workspace = true } alto-client = { workspace = true } +alto-storage = { workspace = true} +alto-vm = { workspace = true} commonware-consensus = { workspace = true } +commonware-codec = {workspace = true} commonware-cryptography = { workspace = true } commonware-deployer = { workspace = true } commonware-macros = { workspace = true } @@ -37,6 +40,7 @@ axum = "0.8.1" uuid = "1.15.1" serde = { version = "1.0.218", features = ["derive"] } serde_yaml = "0.9.34" +tokio = "1.44.0" [[bin]] name = "validator" diff --git a/chain/src/actors/application/actor.rs b/chain/src/actors/application/actor.rs index 89094f93..d0f0053b 100644 --- a/chain/src/actors/application/actor.rs +++ b/chain/src/actors/application/actor.rs @@ -3,8 +3,19 @@ use super::{ supervisor::Supervisor, Config, }; -use crate::actors::syncer; -use alto_types::{Block, Finalization, Notarization, Seed}; +use crate::{actors::syncer, GenesisAllocations}; +use alto_storage::state_db::DB_WRITE_BUFFER_CAPACITY; +use alto_storage::{ + database::Database, + state_db::StateViewDb, + transactional_db::{Key, Op, OpAction}, +}; +use alto_types::{ + account::Account, address::Address, signed_tx::unpack_signed_txs, Block, Finalization, + Notarization, Seed, +}; +use alto_vm::vm::VM; +use commonware_codec::{Codec, WriteBuffer}; use commonware_consensus::threshold_simplex::Prover; use commonware_cryptography::{sha256::Digest, Hasher, Sha256}; use commonware_macros::select; @@ -19,6 +30,7 @@ use futures::{ }; use rand::Rng; use std::{ + collections::HashMap, pin::Pin, sync::{Arc, Mutex}, }; @@ -58,6 +70,17 @@ pub struct Actor { prover: Prover, hasher: Sha256, mailbox: mpsc::Receiver, + // chain id. + chain_id: u64, + // genesis allocations. 
+ genesis: GenesisAllocations, + // State cache. + state_cache: Arc>>, + // Unfinalized State. + // hashmap of block number -> touched keys. + unfinalized_state: Arc>>>, + // State database. + state_db: Arc>, } impl Actor { @@ -70,6 +93,11 @@ impl Actor { prover: config.prover, hasher: Sha256::new(), mailbox, + chain_id: config.chain_id, + genesis: config.genesis, + state_cache: config.state_cache, + unfinalized_state: config.unfinalized_state, + state_db: config.state_db, }, Supervisor::new(config.identity, config.participants, config.share), Mailbox::new(sender), @@ -85,17 +113,50 @@ impl Actor { // Compute genesis digest self.hasher.update(GENESIS); let genesis_parent = self.hasher.finalize(); - let genesis = Block::new(genesis_parent, 0, 0); + let genesis_state_root = [0u8; 32]; + let genesis = Block::new(genesis_parent, 0, 0, Vec::new(), genesis_state_root.into()); let genesis_digest = genesis.digest(); + // --> prepared the genesis digest. + + // there are no blocks built, while genesis. let built: Option = None; let built = Arc::new(Mutex::new(built)); + // @todo initiate fee manager here. + // @todo commit to database. while let Some(message) = self.mailbox.next().await { match message { + // return the genesis digest Message::Genesis { response } => { // Use the digest of the genesis message as the initial // payload. + + // @todo check formating of genesis in validator.rs + let address = self.genesis.address.clone(); + let allocations = self.genesis.value.clone(); + // lock state database. + let mut s_db = self.state_db.lock().unwrap(); + // zip address with allocation. + address + .iter() + .zip(allocations.iter()) + .for_each(|(addr, allo)| { + // create key from address + let adrs = Address(*addr); + let key = StateViewDb::key_accounts(&adrs); + let mut write_buf = WriteBuffer::new(DB_WRITE_BUFFER_CAPACITY); + let acc = Account { + address: adrs, + balance: *allo, + }; + // write the account to the buffer. 
+ acc.write(&mut write_buf); + // store the account in the state database. + let _ = s_db.put(&key, write_buf.as_ref()); + }); let _ = response.send(genesis_digest.clone()); } + // its this validators turn to propose the block. + // So, it should check for available blocks and propose a block. Message::Propose { view, parent, @@ -112,6 +173,10 @@ impl Actor { // continue processing other messages) self.context.with_label("propose").spawn({ let built = built.clone(); + let state_cache = Arc::clone(&self.state_cache); + let unfinalized_state = Arc::clone(&self.unfinalized_state); + let state_db = Arc::clone(&self.state_db); + let mut syncer_clone = syncer.clone(); move |context| async move { let response_closed = oneshot_closed_future(&mut response); select! { @@ -124,16 +189,35 @@ impl Actor { if current <= parent.timestamp { current = parent.timestamp + 1; } - let block = Block::new(parent.digest(), parent.height+1, current); + // fetch transactions from mempool. + // serialize the transactions fetched from mempool into a vec. + // execute the transactions and get the result? + let txs = Vec::new(); // @todo get txs from mempool. + // all the touched keys by the block will be added to unfinalized state. + let mut executor_vm = VM::new( + parent.height + 1, + current, + self.chain_id, + state_cache, + unfinalized_state, + state_db + ); + let results = executor_vm.apply(txs.clone()); // @todo store results seperately. + let dummy_state_root = [0u8;32]; //@todo + // touched keys per block are stored in unfinalized state. + // when a block finalises, touched keys are removed from unfinalized state and moved to cache afte writing to db. + // @todo post finalisation moving to cache and db, removing unfinalized state. 
+ let block = Block::new(parent.digest(), parent.height+1, current, txs, dummy_state_root.into()); let digest = block.digest(); { let mut built = built.lock().unwrap(); *built = Some(block); } - // Send the digest to the consensus let result = response.send(digest.clone()); info!(view, ?digest, success=result.is_ok(), "proposed new block"); + // send the result to syncer. + syncer_clone.store_results(digest, results).await; }, _ = response_closed => { // The response was cancelled @@ -166,14 +250,16 @@ impl Actor { } else { Either::Right(syncer.get(Some(parent.0), parent.1).await) }; - + let state_cache = Arc::clone(&self.state_cache); + let unfinalized_state = Arc::clone(&self.unfinalized_state); + let state_db = Arc::clone(&self.state_db); // Wait for the blocks to be available or the request to be cancelled in a separate task (to // continue processing other messages) self.context.with_label("verify").spawn({ let mut syncer = syncer.clone(); move |context| async move { let requester = - try_join(parent_request, syncer.get(None, payload).await); + try_join(parent_request, syncer.get(None, payload.clone()).await); let response_closed = oneshot_closed_future(&mut response); select! { result = requester => { @@ -199,9 +285,31 @@ impl Actor { return; } + // State transition checks. + + // unpack transactions from the block. + let stxs = unpack_signed_txs(block.raw_txs.clone()); + // create a new vm instance. + let mut executor_vm = VM::new( + block.height, + block.timestamp, + self.chain_id, + state_cache, + unfinalized_state, + state_db + ); + // apply transactions. + let results = executor_vm.apply(stxs.clone()); + // state root generation. + let dummy_state_root = [0u8;32]; //@todo + // verify state root equivalence. 
+ if block.state_root != dummy_state_root.into() { + let _ = response.send(false); + return; + } // Persist the verified block syncer.verified(view, block).await; - + syncer.store_results(payload, results).await; // Send the verification result to the consensus let _ = response.send(true); }, @@ -224,12 +332,38 @@ impl Actor { syncer.notarized(notarization, seed).await; } Message::Finalized { proof, payload } => { + // @todo let state changes be restricted to application. // Parse the proof let (view, parent, _, signature, seed) = self.prover.deserialize_finalization(proof.clone()).unwrap(); let finalization = Finalization::new(view, parent, payload, signature.into()); let seed = Seed::new(view, seed.into()); + // @todo syncer does the heavy lifting of post finalization processing. + if let Some(at_view_touched) = + self.unfinalized_state.lock().unwrap().remove(&view) + { + if at_view_touched.is_empty() { + info!(view, "finalized block with no touched keys"); + } else { + // lock state database. + let mut s_db = self.state_db.lock().unwrap(); + // iterate over the touched keys and write to the state database. + for (key, op) in at_view_touched.iter() { + match op.action { + OpAction::Update => { + let _ = s_db.put(key, &op.value); + } + OpAction::Delete => { + let _ = s_db.delete(key); + } + _ => { /*nothing to do with the database. 
*/ } + } + } + info!(view, "finalized block with touched keys"); + } + } + // Send the finalization to the syncer syncer.finalized(finalization, seed).await; } diff --git a/chain/src/actors/application/mod.rs b/chain/src/actors/application/mod.rs index 70f2dcbc..5eef80c5 100644 --- a/chain/src/actors/application/mod.rs +++ b/chain/src/actors/application/mod.rs @@ -1,17 +1,28 @@ +use alto_storage::{ + database::Database, + transactional_db::{Key, Op}, +}; use commonware_consensus::threshold_simplex::Prover; use commonware_cryptography::{ bls12381::primitives::{group, poly::Poly}, ed25519::PublicKey, sha256::Digest, }; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; mod actor; pub use actor::Actor; mod ingress; pub use ingress::Mailbox; +mod router; mod supervisor; pub use supervisor::Supervisor; +use crate::GenesisAllocations; + /// Configuration for the application. pub struct Config { /// Prover used to decode opaque proofs from consensus. @@ -27,4 +38,11 @@ pub struct Config { /// Number of messages from consensus to hold in our backlog /// before blocking. 
pub mailbox_size: usize, + + pub chain_id: u64, + /// State + pub state_cache: Arc>>, + pub unfinalized_state: Arc>>>, + pub state_db: Arc>, + pub genesis: GenesisAllocations, } diff --git a/chain/src/actors/application/router.rs b/chain/src/actors/application/router.rs new file mode 100644 index 00000000..28e7bc4a --- /dev/null +++ b/chain/src/actors/application/router.rs @@ -0,0 +1,107 @@ +use axum::response::IntoResponse; +use commonware_runtime::{Clock, Handle, Metrics, Spawner}; +use rand::Rng; +use std::io; +use tokio::net::TcpListener; +use tracing::{event, Level}; + +pub struct RouterConfig { + port: i32, +} + +impl RouterConfig { + pub fn default_config() -> RouterConfig { + RouterConfig { port: 7844 } + } +} + +pub struct Router { + context: R, + cfg: RouterConfig, + listener: Option, + router: Option, + is_active: bool, +} + +impl Router { + const PATH_SUBMIT_BLOCK: &'static str = "/builder/submit"; + + pub fn new(context: R, cfg: RouterConfig) -> Self { + if cfg.port == 0 { + panic!("Invalid port number"); + } + + Router { + context, + cfg, + listener: None, + router: None, + is_active: false, + } + } + + pub async fn start(mut self) -> Handle<()> { + self.context.spawn_ref()(self.run()) + } + + pub fn stop(&self) { + if !self.is_active { + return; + } + + event!(Level::INFO, "stopped router service"); + } + + async fn init_listener(&mut self) -> io::Result { + let listener = TcpListener::bind(format!("127.0.0.1:{}", self.cfg.port)).await?; + Ok(listener) + } + + async fn handle_default() -> impl IntoResponse { + "Hello world!" 
+ } + + async fn handle_submit_block() -> impl IntoResponse { + "Submit block" + } + + fn init_router(&mut self) { + // let router = axum::Router::new() + // .route("/", get(Router::handle_default)) + // .route(Router::PATH_SUBMIT_BLOCK, get(Router::handle_submit_block())); + // + // self.router = Some(router) + } + + async fn serve(&mut self) -> Result<(), Box> { + let listener = self + .listener + .take() + .ok_or("serve failed because listener is None"); + let router = self + .router + .take() + .ok_or("serve failed because router is None"); + axum::serve(listener.unwrap(), router.unwrap()).await?; + Ok(()) + } + + async fn run(mut self) { + event!(Level::INFO, "starting router service"); + + let listener_res = self.init_listener(); + match listener_res.await { + Ok(value) => self.listener = Some(value), + Err(error) => { + println!("Error during listener init: {}", error); + return; + } + } + + self.init_router(); + self.serve().await.unwrap(); + self.is_active = true; + + event!(Level::INFO, "finished starting router service"); + } +} diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 0b2d85f5..b9c9dc0b 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -13,7 +13,11 @@ use crate::{ }, Indexer, }; -use alto_types::{Block, Finalization, Finalized, Notarized}; +use alto_storage::{ + database::Database, + transactional_db::{Key, Op}, +}; +use alto_types::{tx::TxResult, Block, Finalization, Finalized, Notarized}; use bytes::Bytes; use commonware_cryptography::{bls12381, ed25519::PublicKey, sha256::Digest}; use commonware_macros::select; @@ -57,6 +61,9 @@ pub struct Actor, + // @todo this should be improvised. results should be streamed as soon as the block is finalised. + // and should be stored in a seperate db and maintained by rpc services. 
+ results: Arc>>>, // Blocks verified stored by view<>digest verified: Archive, // Blocks notarized stored by view<>digest @@ -76,6 +83,14 @@ pub struct Actor>>, + // Unfinalized State. + // hashmap of block number -> touched keys. + unfinalized_state: Arc>>>, + // State database. + state_db: Arc>, } impl, I: Indexer> Actor { @@ -211,6 +226,8 @@ impl, I: Index activity_timeout: config.activity_timeout, indexer: config.indexer, + results: config.results, + verified: verified_archive, notarized: notarized_archive, @@ -221,6 +238,10 @@ impl, I: Index finalized_height, contiguous_height, + + state_cache: config.state_cache, + unfinalized_state: config.unfinalized_state, + state_db: config.state_db, }, Mailbox::new(sender), ) @@ -276,12 +297,14 @@ impl, I: Index ); resolver_engine.start(backfill_network); + // @todo block syncing process is happening here. // Process all finalized blocks in order (fetching any that are missing) let last_view_processed = Arc::new(Mutex::new(0)); let verified = Wrapped::new(self.verified); let notarized = Wrapped::new(self.notarized); let finalized = Wrapped::new(self.finalized); let blocks = Wrapped::new(self.blocks); + let results = Arc::clone(&self.results); let (mut finalizer_sender, mut finalizer_receiver) = mpsc::channel::<()>(1); self.context.with_label("finalizer").spawn({ let mut resolver = resolver.clone(); @@ -464,6 +487,9 @@ impl, I: Index mailbox_message = self.mailbox.next() => { let message = mailbox_message.expect("Mailbox closed"); match message { + Message::StoreResults {payload, result} => { + results.lock().unwrap().insert(payload, result); + } Message::Broadcast { payload } => { broadcast_network .0 @@ -591,6 +617,7 @@ impl, I: Index let view = proof.view; let digest = proof.payload.clone(); let height = block.height; + // if we have block either in verified or notarized, it means we have the block verified and unfinalized state map. 
finalized .put(height, proof.payload.clone(), proof.serialize().into()) .await @@ -693,6 +720,7 @@ impl, I: Index debug!(view, ?payload, "registering waiter"); waiters.entry(payload).or_default().push(response); } + } }, // Handle incoming broadcasts @@ -830,7 +858,7 @@ impl, I: Index debug!(height, "received finalization"); let _ = response.send(true); - // Persist the finalization + // Persist the finalization @todo finalized .put(height, finalization.block.digest(), finalization.proof.serialize().into()) .await diff --git a/chain/src/actors/syncer/ingress.rs b/chain/src/actors/syncer/ingress.rs index 7cc0cc24..8ef4ab16 100644 --- a/chain/src/actors/syncer/ingress.rs +++ b/chain/src/actors/syncer/ingress.rs @@ -1,4 +1,4 @@ -use alto_types::{Block, Finalization, Notarization, Seed}; +use alto_types::{tx::TxResult, Block, Finalization, Notarization, Seed}; use commonware_cryptography::sha256::Digest; use futures::{ channel::{mpsc, oneshot}, @@ -27,6 +27,10 @@ pub enum Message { proof: Finalization, seed: Seed, }, + StoreResults { + payload: Digest, + result: Vec, + }, } /// Mailbox for the application. @@ -83,4 +87,13 @@ impl Mailbox { .await .expect("Failed to send lock"); } + + /// store results, stores block execution results in in mem cache. + /// @todo find a better way to store results. 
+ pub async fn store_results(&mut self, payload: Digest, result: Vec) { + self.sender + .send(Message::StoreResults { payload, result }) + .await + .expect("Failed to send store results"); + } } diff --git a/chain/src/actors/syncer/mod.rs b/chain/src/actors/syncer/mod.rs index 6e568253..8e804d6b 100644 --- a/chain/src/actors/syncer/mod.rs +++ b/chain/src/actors/syncer/mod.rs @@ -1,5 +1,14 @@ -use commonware_cryptography::{bls12381::primitives::group, ed25519::PublicKey}; +use alto_storage::{ + database::Database, + transactional_db::{Key, Op}, +}; +use alto_types::tx::TxResult; +use commonware_cryptography::{bls12381::primitives::group, ed25519::PublicKey, sha256::Digest}; use governor::Quota; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; mod actor; mod archive; @@ -33,4 +42,11 @@ pub struct Config { pub activity_timeout: u64, pub indexer: Option, + + pub results: Arc>>>, + + // State + pub state_cache: Arc>>, + pub unfinalized_state: Arc>>>, + pub state_db: Arc>, } diff --git a/chain/src/bin/setup.rs b/chain/src/bin/setup.rs index a68f4629..97bee6e1 100644 --- a/chain/src/bin/setup.rs +++ b/chain/src/bin/setup.rs @@ -1,4 +1,5 @@ -use alto_chain::Config; +use alto_chain::{Config, GenesisAllocations}; +use alto_types::address::Address; use clap::{value_parser, Arg, ArgMatches, Command}; use commonware_cryptography::{ bls12381::{ @@ -244,6 +245,7 @@ fn generate(sub_matches: &ArgMatches) { message_backlog, mailbox_size, + state_db_directory: "/home/ubuntu/alto/state".to_string(), indexer: None, }; peer_configs.push((peer_config_file.clone(), peer_config)); @@ -297,6 +299,18 @@ fn generate(sub_matches: &ArgMatches) { let file = fs::File::create(&path).unwrap(); serde_yaml::to_writer(file, &config).unwrap(); info!(path = "config.yaml", "wrote configuration file"); + + // genesis @todo this is a simple genesis implementation. + // addr should be considered along with the values. 
+ let addr1 = Address::create_random_address(); + let genesis = GenesisAllocations { + address: vec![*addr1.as_bytes()], + value: vec![1000000000000], + }; + let genesis_path = format!("{}/genesis.yaml", output); + let file = fs::File::create(&genesis_path).unwrap(); + serde_yaml::to_writer(file, &genesis).unwrap(); + info!(path = "genesis.yaml", "wrote genesis file"); } fn indexer(sub_matches: &ArgMatches) { diff --git a/chain/src/bin/validator.rs b/chain/src/bin/validator.rs index 8ed27842..4821521e 100644 --- a/chain/src/bin/validator.rs +++ b/chain/src/bin/validator.rs @@ -1,5 +1,6 @@ -use alto_chain::{engine, Config}; +use alto_chain::{engine, Config, GenesisAllocations}; use alto_client::Client; +use alto_storage::rocks_db::RocksDbDatabase; use alto_types::P2P_NAMESPACE; use axum::{routing::get, serve, Extension, Router}; use clap::{Arg, Command}; @@ -18,6 +19,7 @@ use commonware_utils::{from_hex_formatted, hex, quorum}; use futures::future::try_join_all; use governor::Quota; use prometheus_client::metrics::gauge::Gauge; +use std::sync::{Arc, Mutex}; use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -54,6 +56,7 @@ fn main() { .about("Validator for an alto chain.") .arg(Arg::new("peers").long("peers").required(true)) .arg(Arg::new("config").long("config").required(true)) + .arg(Arg::new("genesis").long("genesis").required(true)) .get_matches(); // Create logger @@ -79,7 +82,11 @@ fn main() { .collect(); info!(peers = peers.len(), "loaded peers"); let peers_u32 = peers.len() as u32; - + // @todo load genesis and use to initiate the application. 
+ let genesis_file = matches.get_one::("genesis").unwrap(); + let genesis_file = std::fs::read_to_string(genesis_file).expect("Could not read genesis file"); + let genesis: GenesisAllocations = + serde_yaml::from_str(&genesis_file).expect("Could not parse genesis file"); // Load config let config_file = matches.get_one::("config").unwrap(); let config_file = std::fs::read_to_string(config_file).expect("Could not read config file"); @@ -185,6 +192,11 @@ fn main() { indexer = Some(Client::new(&uri, identity_public.into())); } + // create state db: rocks db for now. + let state_db = RocksDbDatabase::new_with_path(&config.state_db_directory) + .expect("Could not create state db"); + let wrapped_state_db = Arc::new(Mutex::new(state_db)); + // Create engine let config = engine::Config { partition_prefix: "engine".to_string(), @@ -204,6 +216,9 @@ fn main() { fetch_concurrent: FETCH_CONCURRENT, fetch_rate_per_peer: resolver_limit, indexer, + chain_id: 10, + state_db: wrapped_state_db, + genesis, }; let engine = engine::Engine::new(context.with_label("engine"), config).await; diff --git a/chain/src/engine.rs b/chain/src/engine.rs index 2bb8dabc..99de46ed 100644 --- a/chain/src/engine.rs +++ b/chain/src/engine.rs @@ -1,6 +1,10 @@ use crate::{ actors::{application, syncer}, - Indexer, + GenesisAllocations, Indexer, +}; +use alto_storage::{ + database::Database, + transactional_db::{Key, Op}, }; use alto_types::NAMESPACE; use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover}; @@ -17,6 +21,8 @@ use futures::future::try_join_all; use governor::clock::Clock as GClock; use governor::Quota; use rand::{CryptoRng, Rng}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{error, warn}; @@ -40,6 +46,10 @@ pub struct Config { pub fetch_rate_per_peer: Quota, pub indexer: Option, + + pub chain_id: u64, + pub state_db: Arc>, + pub genesis: GenesisAllocations, } pub struct Engine< @@ -62,12 +72,24 @@ pub struct 
Engine< application::Mailbox, application::Supervisor, >, + + // state + state_cache: Arc>>, + unfinalized_state: Arc>>>, + state_db: Arc>, } impl + Metrics, I: Indexer> Engine { pub async fn new(context: E, cfg: Config) -> Self { + // @todo initalizing state cache and unfinalized state. + // if it is necessary pass state_cache, unfinalized_state and state_db to both application and syncer. + let results = Arc::new(Mutex::new(HashMap::new())); + let state_cache: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let unfinalized_state: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + // Create the application let public = public(&cfg.identity); let (application, supervisor, application_mailbox) = application::Actor::new( @@ -78,6 +100,11 @@ impl + Metri identity: cfg.identity.clone(), share: cfg.share, mailbox_size: cfg.mailbox_size, + chain_id: cfg.chain_id, + state_cache: Arc::clone(&state_cache), + unfinalized_state: Arc::clone(&unfinalized_state), + state_db: Arc::clone(&cfg.state_db), + genesis: cfg.genesis.clone(), }, ); @@ -93,6 +120,10 @@ impl + Metri backfill_quota: cfg.backfill_quota, activity_timeout: cfg.activity_timeout, indexer: cfg.indexer, + results: Arc::clone(&results), + state_cache: Arc::clone(&state_cache), + unfinalized_state: Arc::clone(&unfinalized_state), + state_db: Arc::clone(&cfg.state_db), }, ) .await; @@ -129,7 +160,6 @@ impl + Metri fetch_rate_per_peer: cfg.fetch_rate_per_peer, }, ); - // Return the engine Self { context, @@ -138,6 +168,9 @@ impl + Metri syncer, syncer_mailbox, consensus, + state_cache, + unfinalized_state, + state_db: cfg.state_db, } } @@ -201,6 +234,9 @@ impl + Metri // Start consensus let consensus_handle = self.consensus.start(voter_network, resolver_network); + // Start the router + // let router_config = RouterConfig::default_config(); + // Wait for any actor to finish if let Err(e) = try_join_all(vec![application_handle, syncer_handle, consensus_handle]).await diff --git a/chain/src/lib.rs b/chain/src/lib.rs 
index dd73bb76..0a73518e 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -75,12 +75,21 @@ pub struct Config { pub mailbox_size: usize, pub indexer: Option, + + pub state_db_directory: String, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct GenesisAllocations { + pub address: Vec<[u8; 32]>, + pub value: Vec, } #[cfg(test)] mod tests { use super::*; - use alto_types::{Finalized, Notarized, Seed}; + use alto_storage::rocks_db::RocksDbDatabase; + use alto_types::{address::Address, Finalized, Notarized, Seed}; use bls12381::primitives::poly; use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme}; use commonware_macros::test_traced; @@ -93,6 +102,7 @@ mod tests { use engine::{Config, Engine}; use governor::Quota; use rand::{rngs::StdRng, Rng, SeedableRng}; + use std::{ collections::{HashMap, HashSet}, num::NonZeroU32, @@ -227,6 +237,12 @@ mod tests { }; let (executor, mut context, auditor) = Executor::init(cfg); executor.start(async move { + let addr1 = Address::create_random_address(); + let addr2 = Address::create_random_address(); + let genesis = GenesisAllocations { + address: vec![*addr1.as_bytes(), *addr2.as_bytes()], + value: vec![1000000000000, 1000000000000], + }; // Create simulated network let (network, mut oracle) = Network::new( context.with_label("network"), @@ -263,7 +279,11 @@ mod tests { // Create scheme context let public_key = scheme.public_key(); public_keys.insert(public_key.clone()); - + let state_db = RocksDbDatabase::new_with_path( + format!("/home/ubuntu/state_db/{}", idx).as_str(), + ) + .expect("Could not create state db"); + let wrapped_state_db = Arc::new(Mutex::new(state_db)); // Configure engine let uid = format!("validator-{}", public_key); let config: Config = engine::Config { @@ -284,6 +304,9 @@ mod tests { fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), indexer: None, + chain_id: 10, + state_db: wrapped_state_db, + genesis: 
genesis.clone(), }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -373,6 +396,12 @@ mod tests { let final_container_required = 20; let (executor, mut context, _) = Executor::timed(Duration::from_secs(30)); executor.start(async move { + let addr1 = Address::create_random_address(); + let addr2 = Address::create_random_address(); + let genesis = GenesisAllocations { + address: vec![*addr1.as_bytes(), *addr2.as_bytes()], + value: vec![1000000000000, 1000000000000], + }; // Create simulated network let (network, mut oracle) = Network::new( context.with_label("network"), @@ -424,6 +453,11 @@ mod tests { // Configure engine let public_key = scheme.public_key(); let uid = format!("validator-{}", public_key); + let state_db = RocksDbDatabase::new_with_path( + format!("/home/ubuntu/state_db/{}", idx).as_str(), + ) + .expect("Could not create state db"); + let wrapped_state_db = Arc::new(Mutex::new(state_db)); let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme.clone(), @@ -442,6 +476,9 @@ mod tests { fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), indexer: None, + chain_id: 10, + state_db: wrapped_state_db, + genesis: genesis.clone(), }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -507,6 +544,10 @@ mod tests { let share = shares[0]; let public_key = scheme.public_key(); let uid = format!("validator-{}", public_key); + let state_db = + RocksDbDatabase::new_with_path(format!("/home/ubuntu/state_db/{}", uid).as_str()) + .expect("Could not create state db"); + let wrapped_state_db = Arc::new(Mutex::new(state_db)); let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme.clone(), @@ -525,6 +566,9 @@ mod tests { fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), indexer: None, + chain_id: 10, + state_db: wrapped_state_db, + genesis: genesis.clone(), }; let engine = 
Engine::new(context.with_label(&uid), config).await; @@ -594,6 +638,12 @@ mod tests { while !*done.lock().unwrap() { runs += 1; executor.start({ + let addr1 = Address::create_random_address(); + let addr2 = Address::create_random_address(); + let genesis = GenesisAllocations { + address: vec![*addr1.as_bytes(), *addr2.as_bytes()], + value: vec![1000000000000, 1000000000000], + }; let mut context = context.clone(); let public = public.clone(); let shares = shares.clone(); @@ -637,7 +687,11 @@ mod tests { // Create scheme context let public_key = scheme.public_key(); public_keys.insert(public_key.clone()); - + let state_db = RocksDbDatabase::new_with_path( + format!("/home/ubuntu/state_db/{}", idx).as_str(), + ) + .expect("Could not create state db"); + let wrapped_state_db = Arc::new(Mutex::new(state_db)); // Configure engine let uid = format!("validator-{}", public_key); let config: Config = engine::Config { @@ -658,6 +712,9 @@ mod tests { fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), indexer: None, + chain_id: 10, + state_db: wrapped_state_db, + genesis: genesis.clone(), }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -736,6 +793,12 @@ mod tests { let required_container = 10; let (executor, mut context, _) = Executor::timed(Duration::from_secs(30)); executor.start(async move { + let addr1 = Address::create_random_address(); + let addr2 = Address::create_random_address(); + let genesis = GenesisAllocations { + address: vec![*addr1.as_bytes(), *addr2.as_bytes()], + value: vec![1000000000000, 1000000000000], + }; // Create simulated network let (network, mut oracle) = Network::new( context.with_label("network"), @@ -783,6 +846,11 @@ mod tests { // Configure engine let uid = format!("validator-{}", public_key); + let state_db = RocksDbDatabase::new_with_path( + format!("/home/ubuntu/state_db/{}", idx).as_str(), + ) + .expect("Could not create state db"); + let wrapped_state_db = 
Arc::new(Mutex::new(state_db)); let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme, @@ -801,6 +869,9 @@ mod tests { fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), indexer: Some(indexer.clone()), + chain_id: 10, + state_db: wrapped_state_db, + genesis: genesis.clone(), }; let engine = Engine::new(context.with_label(&uid), config).await; diff --git a/storage/Cargo.toml b/storage/Cargo.toml new file mode 100644 index 00000000..a78dbb1f --- /dev/null +++ b/storage/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "alto-storage" +version = "0.1.0" +edition = "2021" + +[dependencies] +alto-types = { workspace = true } +commonware-cryptography = { workspace = true } +tracing = {workspace = true} +futures = {workspace = true} +rand = "0.8.5" +rocksdb = "0.23.0" +commonware-codec = { workspace = true} +bytes = "1.7.1" +tempfile = "3.18.0" \ No newline at end of file diff --git a/storage/src/database.rs b/storage/src/database.rs new file mode 100644 index 00000000..0964eef3 --- /dev/null +++ b/storage/src/database.rs @@ -0,0 +1,10 @@ +use std::error::Error; + +// Define database interface that will be used for all impls +pub trait Database { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), Box>; + + fn get(&mut self, key: &[u8]) -> Result>, Box>; + + fn delete(&mut self, key: &[u8]) -> Result<(), Box>; +} diff --git a/storage/src/hashmap_db.rs b/storage/src/hashmap_db.rs new file mode 100644 index 00000000..89195625 --- /dev/null +++ b/storage/src/hashmap_db.rs @@ -0,0 +1,51 @@ +use crate::database::Database; +use std::collections::HashMap; +use std::error::Error; + +pub struct HashmapDatabase { + data: HashMap, Vec>, +} + +impl Default for HashmapDatabase { + fn default() -> Self { + Self::new() + } +} + +impl HashmapDatabase { + pub fn new() -> Self { + Self { + data: HashMap::new(), + } + } +} + +impl Database for HashmapDatabase { + fn put(&mut self, key: &[u8], value: &[u8]) -> 
Result<(), Box> { + self.data.insert(key.into(), value.into()); + Ok(()) + } + + fn get(&mut self, key: &[u8]) -> Result>, Box> { + self.data.get(key).map_or(Ok(None), |v| Ok(Some(v.clone()))) + } + + fn delete(&mut self, key: &[u8]) -> Result<(), Box> { + self.data.remove(key); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_hashmap_db() { + let mut db = HashmapDatabase::new(); + let key = b"key1"; + let value = b"value1"; + db.put(key, value).unwrap(); + let retrieved = db.get(key).unwrap().unwrap(); + assert_eq!(retrieved.as_slice(), value); + } +} diff --git a/storage/src/lib.rs b/storage/src/lib.rs new file mode 100644 index 00000000..0734abad --- /dev/null +++ b/storage/src/lib.rs @@ -0,0 +1,5 @@ +pub mod database; +pub mod hashmap_db; +pub mod rocks_db; +pub mod state_db; +pub mod transactional_db; diff --git a/storage/src/rocks_db.rs b/storage/src/rocks_db.rs new file mode 100644 index 00000000..67af4397 --- /dev/null +++ b/storage/src/rocks_db.rs @@ -0,0 +1,63 @@ +use crate::database::Database; +use rocksdb::{Options, DB}; +use std::error::Error; +use std::path::Path; +use tempfile::TempDir; + +const SAL_ROCKS_DB_PATH: &str = "rocksdb"; + +pub struct RocksDbDatabase { + db: DB, +} + +impl RocksDbDatabase { + pub fn new() -> Result> { + Self::new_with_path(SAL_ROCKS_DB_PATH) + } + + pub fn new_with_path(path: &str) -> Result> { + let mut opts = Options::default(); + opts.create_if_missing(true); + + let db_path = Path::new(path); + let db = DB::open(&opts, db_path)?; + Ok(RocksDbDatabase { db }) + } + + pub fn new_tmp_db() -> Result> { + let temp_dir = TempDir::new()?; + let db_path = temp_dir.path().join("testdb"); + Self::new_with_path(db_path.to_str().unwrap()) + } +} + +impl Database for RocksDbDatabase { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), Box> { + self.db.put(key, value)?; + Ok(()) + } + + fn get(&mut self, key: &[u8]) -> Result>, Box> { + let result = self.db.get(key)?; + Ok(result) + } + + fn 
delete(&mut self, key: &[u8]) -> Result<(), Box> { + self.db.delete(key)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_rocks_db_basic() { + let mut db = RocksDbDatabase::new().expect("db could not be created"); + let key = b"key1"; + let value = b"value1"; + db.put(key, value).unwrap(); + let retrieved = db.get(key).unwrap().unwrap(); + assert_eq!(retrieved.as_slice(), value); + } +} diff --git a/storage/src/state_db.rs b/storage/src/state_db.rs new file mode 100644 index 00000000..676dd407 --- /dev/null +++ b/storage/src/state_db.rs @@ -0,0 +1,148 @@ +use crate::transactional_db::TransactionalDb; +use alto_types::account::{Account, Balance}; +use alto_types::address::Address; +use alto_types::state_view::StateView; +use bytes::Bytes; +use commonware_codec::{Codec, ReadBuffer, WriteBuffer}; +use std::error::Error; +use tracing::{info, warn}; +const ACCOUNTS_PREFIX: u8 = 0x0; +pub const DB_WRITE_BUFFER_CAPACITY: usize = 500; + +/// StateViewDb is a wrapper around TransactionalDb that provides StateViews for block execution. +/// StateViewDb simplifies the interactions with state by providing methods that abstract away the underlying database operations. +/// It allows for easy retrieval and modification of account states, such as balances. 
+pub struct StateViewDb<'a> { + db: &'a mut dyn TransactionalDb, +} + +impl StateView for StateViewDb<'_> { + fn get_account(&mut self, address: &Address) -> Result, Box> { + let key = Self::key_accounts(address); + self.db.get(&key).and_then(|v| { + if let Some(value) = v { + let bytes = Bytes::from(value); + let mut read_buf = ReadBuffer::new(bytes); + Account::read(&mut read_buf) + .map(Some) + .map_err(|e| Box::new(e) as Box) + } else { + Err("Account not found".into()) + } + }) + } + + fn set_account(&mut self, acc: &Account) -> Result<(), Box> { + let key = Self::key_accounts(&acc.address); + let mut write_buf = WriteBuffer::new(DB_WRITE_BUFFER_CAPACITY); + acc.write(&mut write_buf); + self.db.insert(&key, write_buf.as_ref().to_vec()) + } + + fn get_balance(&mut self, address: &Address) -> Option { + info!("Getting balance for address: {}", address); + match self.get_account(address) { + // return balance if account exists + Ok(Some(acc)) => Some(acc.balance), + // return 0 if no account + Ok(None) => { + info!("Account not found, returning 0 balance"); + Some(0) + } + // return none if an err occurred + Err(e) => { + warn!("Error getting account: {}", e); + None + } + } + } + + fn set_balance(&mut self, address: &Address, amt: Balance) -> bool { + info!("Setting balance for address: {}", address); + match self.get_account(address) { + Ok(Some(mut acc)) => { + acc.balance = amt; + self.set_account(&acc).is_ok() + } + Err(e) => { + warn!("Error getting account: {}", e); + let acc = Account { + address: address.clone(), + balance: amt, + }; + self.set_account(&acc).is_ok() + } + _ => false, + } + } +} + +impl<'a> StateViewDb<'a> { + pub fn new(db: &'a mut dyn TransactionalDb) -> Self { + StateViewDb { db } + } + + pub fn key_accounts(addr: &Address) -> [u8; 33] { + Self::make_multi_key(ACCOUNTS_PREFIX, addr.as_slice()) + } + + fn make_multi_key(prefix: u8, sub_id: &[u8]) -> [u8; 33] { + assert_eq!(sub_id.len(), 32, "Sub_id must be exactly 32 bytes"); + + let 
mut key = [0u8; 33]; + key[0] = prefix; + key[1..33].copy_from_slice(sub_id); + key + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::hashmap_db::HashmapDatabase; + use crate::transactional_db::{InMemoryCachingTransactionalDb, Key, Op}; + use alto_types::address::Address; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + #[test] + fn test_it_works() { + // setup state db + let cache: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let unfinalized: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let db = Arc::new(Mutex::new(HashmapDatabase::new())); + let mut in_mem = + InMemoryCachingTransactionalDb::new(Arc::clone(&cache), Arc::clone(&unfinalized), db); + + let address = Address::create_random_address(); + let account = Account { + address: address.clone(), + balance: 1000, + }; + let mut state_db = StateViewDb::new(&mut in_mem); + state_db.set_account(&account).unwrap(); + let _ = in_mem.commit_last_tx(); + let _ = in_mem.commit(); + assert_eq!(unfinalized.lock().unwrap().len(), 1); + let mut state_db2 = StateViewDb::new(&mut in_mem); + let retrieved = state_db2.get_account(&address).unwrap().unwrap(); + assert_eq!(retrieved, account); + } + + #[test] + #[should_panic] + fn test_no_account_earlier() { + // setup state db + let cache: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let unfinalized: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let db = Arc::new(Mutex::new(HashmapDatabase::new())); + let mut in_mem = + InMemoryCachingTransactionalDb::new(Arc::clone(&cache), Arc::clone(&unfinalized), db); + + let address = Address::create_random_address(); + + let mut state_db = StateViewDb::new(&mut in_mem); + + let _ = state_db.get_account(&address).unwrap(); + } +} diff --git a/storage/src/transactional_db.rs b/storage/src/transactional_db.rs new file mode 100644 index 00000000..dfd4e926 --- /dev/null +++ b/storage/src/transactional_db.rs @@ -0,0 +1,445 @@ +use std::collections::HashMap; +use std::error::Error; + +use 
crate::database::Database; +use std::sync::{Arc, Mutex}; + +pub type Key = [u8; 33]; + +// i. should track every operation, that a tx does. +// ii. should be able to rollback if a tx reverts. +// iii. should be able to rollback if a block forks. +// iv. should be able to commit to all the state changes once a block has been accepted. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum OpAction { + // key is read + Read, + // key is created + Create, + // key is updated + Update, + // key got deleted + Delete, +} + +/// Op contains action performed and value stored over a key. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Op { + // Action performed + pub action: OpAction, + // Resulting value after perfromed action. + pub value: Vec, +} + +/// Implements finalization to database out of TransactionalDb trait. +pub trait TransactionalDb { + /// initialize the cache with an already available hashmap of key-value pairs. + fn init_cache(&mut self, cache: Arc>>); + /// get the value corresponding to the key. + /// use this method for querying state. + fn get(&mut self, key: &Key) -> Result>, Box>; + /// insert a key-pair. could be a create or delete action. + /// underlying struct should handle the OpAction part. + fn insert(&mut self, key: &Key, value: Vec) -> Result<(), Box>; + /// delete a key-pair + fn delete(&mut self, key: &Key) -> Result<(), Box>; + /// get a key from cache. do not call this method directly, instead call get. + fn get_from_cache(&self, key: &Key) -> Result>, Box>; + /// get a key from the underlying storage. do not call this method directly, instead call get. If the key is not in the storage, it will return an error. + fn get_from_db(&mut self, key: &Key) -> Result>, Box>; + /// commit last tx changes within the cache + fn commit_last_tx(&mut self) -> Result<(), Box>; + /// commit changes to the unfinalized map. + fn commit(&mut self) -> Result<(), Box>; + /// rollback last tx changes within the cache. 
+ fn rollback_last_tx(&mut self) -> Result<(), Box>; + /// rollback entirely. + fn rollback(&mut self) -> Result<(), Box>; +} + +pub struct InMemoryCachingTransactionalDb { + /// cache is init'ed at the start. + pub cache: Arc>>, + /// unfinalized changes from previous block(s). + pub unfinalized: Arc>>, + /// set of all key value changes from last init. + pub touched: HashMap, + /// set of all key value changes from last commit_last_tx + pub touched_tx: HashMap, + /// underlying database. + pub db: Arc>, +} + +impl InMemoryCachingTransactionalDb { + pub fn new( + cache: Arc>>, + unfinalized: Arc>>, + db: Arc>, + ) -> Self { + Self { + cache, + unfinalized, + touched: HashMap::new(), + touched_tx: HashMap::new(), + db, + } + } +} + +impl TransactionalDb for InMemoryCachingTransactionalDb { + fn init_cache(&mut self, cache: Arc>>) { + self.cache = cache; + } + + fn get(&mut self, key: &Key) -> Result>, Box> { + match self.touched_tx.get(key) { + // the key is used in the current transaction. + Some(op) => { + // No need to update the op.action as create, insert or delete are superior to read. + Ok(Some(op.value.clone())) + } + // key is not used in the current transaction. + None => { + match self.touched.get(key) { + // key is used in the current block. + Some(t_op) => { + let v = Op { + action: OpAction::Read, + value: t_op.value.clone(), + }; + self.touched_tx.insert(*key, v); + Ok(Some(t_op.value.clone())) + } + // key is not used in the current block. + None => { + match self.unfinalized.lock().unwrap().get(key) { + // the key is used in the previous block(s). but the blocks did nt finalze yet. + Some(u_op) => { + let v = Op { + action: OpAction::Read, + value: u_op.value.clone(), + }; + self.touched_tx.insert(*key, v); + Ok(Some(u_op.value.clone())) + } + // the key is not used in the previous block(s). + None => { + // check if the key is in the cache. + // if it is, return the value. + // if it is not, check if the key is in the underlying db. 
+ // if it is, return the value. + // if it is not, return an error. + if let Some(f_c) = self + .get_from_cache(key) + .ok() + .flatten() + .or_else(|| self.db.lock().ok()?.get(key).ok().flatten()) + { + let v = Op { + action: OpAction::Read, + value: f_c.clone(), + }; + self.touched_tx.insert(*key, v); + Ok(Some(f_c)) + } else { + Err("Key does not exist.".into()) + } + } + } + } + } + } + } + } + + fn insert(&mut self, key: &Key, value: Vec) -> Result<(), Box> { + let op = Op { + action: OpAction::Update, // @todo change this to OpAction::Update or OpAction::Create? based on if the key-pair actually exists. + value, + }; + self.touched_tx.insert(*key, op); + Ok(()) + } + + fn delete(&mut self, key: &Key) -> Result<(), Box> { + let op = Op { + action: OpAction::Delete, + value: vec![], + }; + self.touched_tx.insert(*key, op); + Ok(()) + } + + fn get_from_cache(&self, key: &Key) -> Result>, Box> { + self.cache + .lock() + .unwrap() + .get(key) + .map(|op| Some(op.value.clone())) + .ok_or_else(|| "Key not found in cache.".into()) + } + + fn get_from_db(&mut self, key: &Key) -> Result>, Box> { + self.db + .lock() + .unwrap() + .get(key) + .map(|v| Some(v.clone()))? + .ok_or_else(|| "Key not found in db.".into()) + } + + fn commit_last_tx(&mut self) -> Result<(), Box> { + merge_maps_nl(&mut self.touched, &self.touched_tx); + self.touched_tx.clear(); + Ok(()) + } + + fn commit(&mut self) -> Result<(), Box> { + merge_maps(Arc::clone(&self.unfinalized), &mut self.touched); + self.touched.clear(); + Ok(()) + } + + fn rollback_last_tx(&mut self) -> Result<(), Box> { + self.touched_tx.clear(); + Ok(()) + } + + fn rollback(&mut self) -> Result<(), Box> { + self.touched.clear(); + Ok(()) + } +} + +pub fn merge_maps_l( + map1: Arc>>, + map2: Arc>>, +) -> Arc>> { + let mut map1_g = map1.lock().unwrap(); + let map2_g = map2.lock().unwrap(); + for (key, op) in map2_g.iter() { + if let Some(existing_op) = map1_g.get(key) { + // there is a key existing in both maps. 
+ if op.action == OpAction::Delete { + map1_g.insert(*key, op.clone()); + } else if op.action == OpAction::Update { + // if op.action is update, then update the map1. as update superseeds everything. + map1_g.insert(*key, op.clone()); + } else if op.action == OpAction::Read + && (existing_op.action == OpAction::Update + || existing_op.action == OpAction::Create) + { + // reading on a delete will return a nill value with existing op set to delete. this should not be an issue. + let new_op = Op { + action: existing_op.action, + value: op.value.clone(), + }; + map1_g.insert(*key, new_op.clone()); + } + } else { + // there is a key not existing in the first map. + map1_g.insert(*key, op.clone()); + } + } + Arc::clone(&map1) +} + +pub fn merge_maps( + map1: Arc>>, + map2: &mut HashMap, +) -> Arc>> { + let mut map1_g = map1.lock().unwrap(); + for (key, op) in map2.iter() { + if let Some(existing_op) = map1_g.get(key) { + // there is a key existing in both maps. + if op.action == OpAction::Delete { + map1_g.insert(*key, op.clone()); + } else if op.action == OpAction::Update { + // if op.action is update, then update the map1. as update superseeds everything. + map1_g.insert(*key, op.clone()); + } else if op.action == OpAction::Read + && (existing_op.action == OpAction::Update + || existing_op.action == OpAction::Create) + { + // reading on a delete will return a nill value with existing op set to delete. this should not be an issue. + let new_op = Op { + action: existing_op.action, + value: op.value.clone(), + }; + map1_g.insert(*key, new_op.clone()); + } + } else { + // there is a key not existing in the first map. + map1_g.insert(*key, op.clone()); + } + } + Arc::clone(&map1) +} + +pub fn merge_maps_nl<'a>( + map1: &'a mut HashMap, + map2: &HashMap, +) -> &'a mut HashMap { + for (key, op) in map2.iter() { + if let Some(existing_op) = map1.get(key) { + // there is a key existing in both maps. 
+ if op.action == OpAction::Delete { + map1.insert(*key, op.clone()); + } else if op.action == OpAction::Update { + // if op.action is update, then update the map1. as update superseeds everything. + map1.insert(*key, op.clone()); + } else if op.action == OpAction::Read + && (existing_op.action == OpAction::Update + || existing_op.action == OpAction::Create) + { + // reading on a delete will return a nill value with existing op set to delete. this should not be an issue. + let new_op = Op { + action: existing_op.action, + value: op.value.clone(), + }; + map1.insert(*key, new_op.clone()); + } + } else { + // there is a key not existing in the first map. + map1.insert(*key, op.clone()); + } + } + map1 +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::hashmap_db::HashmapDatabase; + use std::sync::Mutex; + #[test] + fn test_it_works() { + let cache: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let unfinalized: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let db = Arc::new(Mutex::new(HashmapDatabase::new())); + let key1 = [1; 33]; + let value1 = [1; 33]; + let key2 = [2; 33]; + let value2 = [2; 33]; + db.lock().unwrap().put(&key1, &value1).unwrap(); + { + let mut in_mem_db = InMemoryCachingTransactionalDb::new( + Arc::clone(&cache), + Arc::clone(&unfinalized), + db.clone(), + ); + + // start a tx + assert_eq!(in_mem_db.get(&key1).unwrap(), Some(value1.to_vec())); + // end a tx + assert_eq!(in_mem_db.touched_tx.len(), 1); + assert_eq!(in_mem_db.touched.len(), 0); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 0); + assert_eq!( + in_mem_db.touched_tx.get(&key1).unwrap(), + &Op { + action: OpAction::Read, + value: value1.to_vec(), + } + ); + // commit the tx + let _ = in_mem_db.commit_last_tx(); + assert_eq!(in_mem_db.touched_tx.len(), 0); + assert_eq!(in_mem_db.touched.len(), 1); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 0); + assert_eq!( + in_mem_db.touched.get(&key1).unwrap(), + &Op { + action: OpAction::Read, + value: value1.to_vec(), + 
} + ); + // start a new tx + in_mem_db.insert(&key2, value2.to_vec()).unwrap(); + assert_eq!(in_mem_db.touched_tx.len(), 1); + assert_eq!(in_mem_db.touched.len(), 1); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 0); + assert_eq!( + in_mem_db.touched_tx.get(&key2).unwrap(), + &Op { + action: OpAction::Update, + value: value2.to_vec(), + } + ); + assert_eq!( + in_mem_db.touched.get(&key1).unwrap(), + &Op { + action: OpAction::Read, + value: value1.to_vec(), + } + ); + // end tx + assert_eq!(in_mem_db.get(&key2).unwrap(), Some(value2.to_vec())); + assert_eq!(in_mem_db.touched_tx.len(), 1); + assert_eq!(in_mem_db.touched.len(), 1); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 0); + // commit tx + in_mem_db.commit_last_tx().unwrap(); + assert_eq!(in_mem_db.touched_tx.len(), 0); + assert_eq!(in_mem_db.touched.len(), 2); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 0); + // commit block + in_mem_db.commit().unwrap(); + assert_eq!(in_mem_db.touched_tx.len(), 0); + assert_eq!(in_mem_db.touched.len(), 0); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 0); + assert_eq!(in_mem_db.unfinalized.lock().unwrap().len(), 2); + } + // implement finalize out of db trait. + assert_eq!(unfinalized.lock().unwrap().len(), 2); + assert_eq!(cache.lock().unwrap().len(), 0); + for (key, op) in unfinalized.lock().unwrap().iter() { + if op.action == OpAction::Delete { + db.lock().unwrap().delete(key).unwrap(); + } else if op.action == OpAction::Update { + db.lock().unwrap().put(key, &op.value).unwrap(); + } + } + merge_maps_l(Arc::clone(&cache), Arc::clone(&unfinalized)); + assert_eq!(cache.lock().unwrap().len(), 2); + unfinalized.lock().unwrap().clear(); + assert_eq!(unfinalized.lock().unwrap().len(), 0); + // try fetching key2 from db. this should pass. + assert_eq!( + db.lock().unwrap().get(&key2).unwrap(), + Some(value2.to_vec()) + ); + // try fetching key1 from db. this should pass. 
+ assert_eq!( + db.lock().unwrap().get(&key1).unwrap(), + Some(value1.to_vec()) + ); + // new block + { + let mut in_mem_db = InMemoryCachingTransactionalDb::new( + Arc::clone(&cache), + Arc::clone(&unfinalized), + db.clone(), + ); + assert_eq!( + db.lock().unwrap().get(&key1).unwrap(), + Some(value1.to_vec()) + ); + assert_eq!(in_mem_db.get(&key1).unwrap(), Some(value1.to_vec())); + assert_eq!(in_mem_db.get(&key2).unwrap(), Some(value2.to_vec())); + // delete key1 + in_mem_db.delete(&key1).unwrap(); + assert_eq!(in_mem_db.touched_tx.len(), 2); + assert_eq!(in_mem_db.touched.len(), 0); + assert_eq!(in_mem_db.cache.lock().unwrap().len(), 2); + assert_eq!(in_mem_db.unfinalized.lock().unwrap().len(), 0); + assert_eq!( + in_mem_db.touched_tx.get(&key1).unwrap(), + &Op { + action: OpAction::Delete, + value: vec![], + } + ); + } + } +} diff --git a/types/Cargo.toml b/types/Cargo.toml index 34cc41ed..8d7f10fe 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -16,12 +16,15 @@ crate-type = ["rlib", "cdylib"] [dependencies] commonware-cryptography = { workspace = true } commonware-utils = { workspace = true } +commonware-codec = { workspace = true} bytes = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } wasm-bindgen = "0.2.100" serde = { version = "1.0.219", features = ["derive"] } serde-wasm-bindgen = "0.6.5" +more-asserts = "0.3.1" +sha3 = "0.10" # Enable "js" feature when WASM is target [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] diff --git a/types/src/account.rs b/types/src/account.rs new file mode 100644 index 00000000..a69d2be4 --- /dev/null +++ b/types/src/account.rs @@ -0,0 +1,58 @@ +use crate::address::Address; +use crate::pub_key_to_address; +use commonware_codec::{Codec, Error, Reader, Writer}; +use commonware_cryptography::ed25519::PublicKey; + +pub type Balance = u64; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Account { + pub address: Address, + pub balance: Balance, +} + +impl Default for 
Account { + fn default() -> Self { + Self::new() + } +} + +impl Account { + pub fn new() -> Self { + Self { + address: Address::empty(), + balance: 0, + } + } + + pub fn from_pubkey(pk: PublicKey) -> Self { + Self::from_address(pub_key_to_address(&pk)) + } + + pub fn from_address(address: Address) -> Self { + Self { + address, + balance: 0, + } + } +} + +impl Codec for Account { + fn write(&self, writer: &mut impl Writer) { + // @rikoeldon I think we don't need to write account address into the state. + // account address is part of the key. + writer.write_bytes(self.address.0.as_slice()); + self.balance.write(writer); + } + + fn read(reader: &mut impl Reader) -> Result { + let addr_bytes = <[u8; 33]>::read(reader)?; + let address = Address::from_bytes(&addr_bytes[1..]).unwrap(); + let balance = ::read(reader)?; + Ok(Self { address, balance }) + } + + fn len_encoded(&self) -> usize { + Codec::len_encoded(&self.address.0) + Codec::len_encoded(&self.balance) + } +} diff --git a/types/src/address.rs b/types/src/address.rs new file mode 100644 index 00000000..df6391c1 --- /dev/null +++ b/types/src/address.rs @@ -0,0 +1,63 @@ +use std::{ + fmt::{Display, Formatter}, + io::copy, +}; + +use crate::{pub_key_to_address, PublicKey, ADDRESSLEN}; +use commonware_cryptography::sha256::hash; +use commonware_utils::hex; +use more_asserts::assert_le; +use rand::Rng; +#[derive(Hash, Eq, PartialEq, Clone, Debug)] +pub struct Address(pub [u8; ADDRESSLEN]); + +impl Address { + pub fn new(slice: &[u8]) -> Self { + assert_le!(slice.len(), ADDRESSLEN, "address slice is too large"); + let mut arr = [0u8; ADDRESSLEN]; + arr[..slice.len()].copy_from_slice(slice); + Address(arr) + } + + pub fn create_random_address() -> Self { + let mut arr = [0u8; ADDRESSLEN]; + rand::thread_rng().fill(&mut arr); + Address(arr) + } + + pub fn from_pub_key(pub_key: &PublicKey) -> Self { + pub_key_to_address(pub_key) + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + if bytes.len() != 32 { + return 
Err("Address must be 32 bytes."); + } + + Ok(Address(<[u8; 32]>::try_from(bytes).unwrap())) + } + + pub fn empty() -> Self { + Self([0; ADDRESSLEN]) + } + + pub fn is_empty(&self) -> bool { + self.0 == Self::empty().0 + } + + pub fn as_slice(&self) -> &[u8] { + &self.0 + } + + pub fn as_bytes(&self) -> &[u8; ADDRESSLEN] { + &self.0 + } +} + +impl Display for Address { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex(&self.0)) + } +} + +// @todo write tests for all the methods. diff --git a/types/src/block.rs b/types/src/block.rs index f86afe30..49ff4fe1 100644 --- a/types/src/block.rs +++ b/types/src/block.rs @@ -1,9 +1,12 @@ +use crate::signed_tx::{pack_signed_txs, unpack_signed_txs, SignedTx, SignedTxChars}; use crate::{Finalization, Notarization}; use bytes::{Buf, BufMut}; -use commonware_cryptography::{bls12381::PublicKey, sha256::Digest, Hasher, Sha256}; +use commonware_cryptography::{bls12381::PublicKey, sha256, sha256::Digest, Hasher, Sha256}; use commonware_utils::{Array, SizedSerialize}; -#[derive(Clone, Debug, PartialEq, Eq)] +// @todo add state root, fee manager and results to the block struct. +// what method of state root generation should be used? +#[derive(Clone, Debug)] pub struct Block { /// The parent block's digest. pub parent: Digest, @@ -14,25 +17,52 @@ pub struct Block { /// The timestamp of the block (in milliseconds since the Unix epoch). pub timestamp: u64, + /// The raw transactions in the block. + pub raw_txs: Vec, + + /// The state root of the block. + pub state_root: Digest, + + txs: Vec, /// Pre-computed digest of the block. 
digest: Digest, } impl Block { - fn compute_digest(parent: &Digest, height: u64, timestamp: u64) -> Digest { + fn compute_digest( + parent: &Digest, + height: u64, + timestamp: u64, + raw_txs: Vec, + state_root: &Digest, + ) -> Digest { let mut hasher = Sha256::new(); hasher.update(parent); hasher.update(&height.to_be_bytes()); hasher.update(×tamp.to_be_bytes()); + hasher.update(&raw_txs); + hasher.update(state_root); hasher.finalize() } - pub fn new(parent: Digest, height: u64, timestamp: u64) -> Self { - let digest = Self::compute_digest(&parent, height, timestamp); + pub fn new( + parent: Digest, + height: u64, + timestamp: u64, + txs: Vec, + state_root: Digest, + ) -> Self { + // let mut txs = txs; + // @todo this is packing txs in a block. + let raw_txs = pack_signed_txs(txs.clone()); + let digest = Self::compute_digest(&parent, height, timestamp, raw_txs.clone(), &state_root); Self { parent, height, timestamp, + raw_txs, + state_root, + txs, digest, } } @@ -42,24 +72,32 @@ impl Block { bytes.extend_from_slice(&self.parent); bytes.put_u64(self.height); bytes.put_u64(self.timestamp); + bytes.extend_from_slice(&self.state_root); + bytes.extend_from_slice(self.raw_txs.as_slice()); bytes } pub fn deserialize(mut bytes: &[u8]) -> Option { // Parse the block - if bytes.len() != Self::SERIALIZED_LEN { - return None; - } + // if bytes.len() != Self::SERIALIZED_LEN { + // return None; + // } let parent = Digest::read_from(&mut bytes).ok()?; let height = bytes.get_u64(); let timestamp = bytes.get_u64(); + let state_root = Digest::read_from(&mut bytes).ok()?; + let raw_txs = bytes.to_vec(); + let digest = Self::compute_digest(&parent, height, timestamp, raw_txs.clone(), &state_root); + let txs = unpack_signed_txs(raw_txs.clone()); // Return block - let digest = Self::compute_digest(&parent, height, timestamp); Some(Self { parent, height, timestamp, + raw_txs, + state_root, + txs, digest, }) } @@ -67,6 +105,23 @@ impl Block { pub fn digest(&self) -> Digest { 
self.digest.clone() } + //todo check logic below + pub fn encode(&mut self) -> Vec { + let mut bytes: Vec = Vec::new(); + bytes.extend_from_slice(&self.parent); + bytes.extend_from_slice(&(self.height.to_be_bytes())); + bytes.extend_from_slice(&(self.timestamp.to_be_bytes())); + bytes.extend_from_slice(self.raw_txs.as_slice()); + bytes.extend_from_slice(&self.state_root); + // encoding signed txs + for tx in self.txs.iter_mut() { + bytes.extend_from_slice(&tx.encode()); + } + + // return encoded digest. + self.digest = sha256::hash(&bytes); + bytes + } } impl SizedSerialize for Block { diff --git a/types/src/lib.rs b/types/src/lib.rs index 9ffac637..d290dcd0 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -1,19 +1,68 @@ //! Common types used throughout `alto`. mod block; + pub use block::{Block, Finalized, Notarized}; +use commonware_cryptography::{Ed25519, Scheme}; +use commonware_utils::SystemTimeExt; +use sha3::{Digest, Keccak256}; +use std::time::SystemTime; + mod consensus; pub use consensus::{leader_index, Finalization, Kind, Notarization, Nullification, Seed}; +pub mod account; +pub mod address; +pub mod null_error; +pub mod signed_tx; +pub mod state_view; +pub mod tx; +pub mod units; +pub mod wallet; pub mod wasm; +use crate::address::Address; +use rand::rngs::OsRng; + // We don't use functions here to guard against silent changes. 
pub const NAMESPACE: &[u8] = b"_ALTO"; +pub const TX_NAMESPACE: &[u8] = b"_tx_namespace_"; pub const P2P_NAMESPACE: &[u8] = b"_ALTO_P2P"; pub const SEED_NAMESPACE: &[u8] = b"_ALTO_SEED"; pub const NOTARIZE_NAMESPACE: &[u8] = b"_ALTO_NOTARIZE"; pub const NULLIFY_NAMESPACE: &[u8] = b"_ALTO_NULLIFY"; pub const FINALIZE_NAMESPACE: &[u8] = b"_ALTO_FINALIZE"; +const ADDRESSLEN: usize = 32; + +type PublicKey = commonware_cryptography::ed25519::PublicKey; +type PrivateKey = commonware_cryptography::ed25519::PrivateKey; +type Signature = commonware_cryptography::ed25519::Signature; + +pub fn create_test_keypair() -> (PublicKey, PrivateKey) { + let mut rng = OsRng; + // generates keypair using random number generator + let keypair = Ed25519::new(&mut rng); + + let public_key = keypair.public_key(); + let private_key = keypair.private_key(); + + (public_key, private_key) +} + +pub fn curr_timestamp() -> u64 { + SystemTime::now().epoch_millis() +} + +// returns an address from pubkey provided +fn pub_key_to_address(pk: &PublicKey) -> Address { + let pk_hash = Keccak256::digest(pk.as_ref()); + + // Return the full 32-byte Keccak256 hash as the address + let mut address = [0u8; 32]; + address.copy_from_slice(&pk_hash); + Address(address) +} + #[cfg(test)] mod tests { use super::*; @@ -117,7 +166,7 @@ mod tests { let parent_digest = hash(&[0; 32]); let height = 0; let timestamp = 1; - let block = Block::new(parent_digest, height, timestamp); + let block = Block::new(parent_digest, height, timestamp, Vec::new(), [0; 32].into()); let block_digest = block.digest(); // Check block serialization @@ -127,6 +176,7 @@ mod tests { assert_eq!(block.parent, deserialized.parent); assert_eq!(block.height, deserialized.height); assert_eq!(block.timestamp, deserialized.timestamp); + // @todo add deserialization checks for signed transactions. 
// Create notarization let view = 0; @@ -168,7 +218,7 @@ mod tests { let parent_digest = hash(&[0; 32]); let height = 0; let timestamp = 1; - let block = Block::new(parent_digest, height, timestamp); + let block = Block::new(parent_digest, height, timestamp, Vec::new(), [0; 32].into()); // Create notarization let view = 0; diff --git a/types/src/null_error.rs b/types/src/null_error.rs new file mode 100644 index 00000000..15ed0238 --- /dev/null +++ b/types/src/null_error.rs @@ -0,0 +1,15 @@ +use std::{ + error::Error, + fmt::{Display, Formatter, Result}, +}; + +#[derive(Debug)] +pub struct NullError; + +impl Display for NullError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + write!(f, "NoError") + } +} + +impl Error for NullError {} diff --git a/types/src/signed_tx.rs b/types/src/signed_tx.rs new file mode 100644 index 00000000..89d945ac --- /dev/null +++ b/types/src/signed_tx.rs @@ -0,0 +1,184 @@ +use crate::address::Address; +use crate::tx::{Tx, TxMethods}; +use crate::wallet::{Wallet, WalletMethods}; +use crate::{PublicKey, Signature, TX_NAMESPACE}; +use commonware_cryptography::{Ed25519, Scheme}; +// this is sent by the user to the validators. +#[derive(Clone, Debug)] +pub struct SignedTx { + pub tx: Tx, + + pub_key: PublicKey, + address: Address, + signature: Vec, +} + +// function names are self explanatory. +pub trait SignedTxChars: Sized { + fn new(tx: Tx, pub_key: PublicKey, signature: Vec) -> Self; + fn verify(&mut self) -> bool; + fn signature(&self) -> Vec; + fn public_key(&self) -> Vec; + fn address(&self) -> Address; + fn encode(&mut self) -> Vec; + fn decode(bytes: &[u8]) -> Result; +} + +impl SignedTxChars for SignedTx { + // @todo either have all fields initialized or none. 
+ fn new(tx: Tx, pub_key: PublicKey, signature: Vec) -> Self { + Self { + tx, + pub_key: pub_key.clone(), + address: Address::from_pub_key(&pub_key), + signature: signature.clone(), + } + } + + fn verify(&mut self) -> bool { + let tx_data = self.tx.encode(); + let signature = Signature::try_from(self.signature.as_slice()); + if signature.is_err() { + return false; + } + let signature = signature.unwrap(); + Ed25519::verify(Some(TX_NAMESPACE), &tx_data, &self.pub_key, &signature) + } + + fn signature(&self) -> Vec { + self.signature.clone() + } + + fn public_key(&self) -> Vec { + self.pub_key.to_vec() + } + + fn address(&self) -> Address { + self.address.clone() + } + + // @todo add syntactic checks. + fn encode(&mut self) -> Vec { + let mut bytes = Vec::new(); + + let raw_tx = self.tx.encode(); + let raw_tx_len = raw_tx.len() as u64; + bytes.extend(raw_tx_len.to_be_bytes()); + bytes.extend_from_slice(&raw_tx); + bytes.extend_from_slice(&self.pub_key); + bytes.extend_from_slice(&self.signature); + bytes + } + + // @todo add syntactic checks and use methods consume. 
+ fn decode(bytes: &[u8]) -> Result { + let raw_tx_len = u64::from_be_bytes(bytes[0..8].try_into().unwrap()); + let raw_tx = &bytes[8..8 + raw_tx_len as usize]; + let pub_key = &bytes[8 + raw_tx_len as usize..8 + raw_tx_len as usize + 32]; + let signature = &bytes[8 + raw_tx_len as usize + 32..]; + let public_key = PublicKey::try_from(pub_key); + if public_key.is_err() { + return Err(public_key.unwrap_err().to_string()); + } + let public_key = public_key.unwrap(); + let tx = Tx::decode(raw_tx); + if tx.is_err() { + return Err(tx.unwrap_err()); + } + + Ok(SignedTx { + tx: tx.unwrap(), + pub_key: public_key.clone(), + address: Address::from_pub_key(&public_key), + signature: signature.to_vec(), + }) + } +} + +impl SignedTx { + pub fn sign(mut tx: Tx, mut wallet: Wallet) -> SignedTx { + let tx_data = tx.encode(); + SignedTx { + tx: tx.clone(), + signature: wallet.sign(&tx_data), + address: wallet.address(), + pub_key: wallet.public_key(), + } + } +} + +pub fn pack_signed_txs(signed_txs: Vec) -> Vec { + let mut bytes = Vec::new(); + bytes.extend((signed_txs.len() as u64).to_be_bytes()); + for signed_tx in signed_txs { + // @todo improvise + let mut signed_tx = signed_tx; + let signed_tx_bytes = signed_tx.encode(); + bytes.extend((signed_tx_bytes.len() as u64).to_be_bytes()); + bytes.extend_from_slice(&signed_tx_bytes); + } + bytes +} + +pub fn unpack_signed_txs(bytes: Vec) -> Vec { + let signed_txs_len = u64::from_be_bytes(bytes[0..8].try_into().unwrap()); + let mut signed_txs = Vec::with_capacity(signed_txs_len as usize); + let mut offset = 8; + for _ in 0..signed_txs_len { + let signed_tx_len = u64::from_be_bytes(bytes[offset..offset + 8].try_into().unwrap()); + offset += 8; + let signed_tx_bytes = &bytes[offset..offset + signed_tx_len as usize]; + offset += signed_tx_len as usize; + let signed_tx = SignedTx::decode(signed_tx_bytes); + if signed_tx.is_err() { + panic!("Failed to unpack signed tx: {}", signed_tx.unwrap_err()); + } + 
signed_txs.push(signed_tx.unwrap()); + } + signed_txs +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::curr_timestamp; + use crate::tx::Unit; + use crate::units::transfer::Transfer; + use crate::wallet::Wallet; + use commonware_cryptography::sha256::Digest; + use more_asserts::assert_gt; + + #[test] + fn test_encode_decode() { + let timestamp = curr_timestamp(); + let max_fee = 100; + let priority_fee = 75; + let chain_id = 45205; + let transfer = Transfer::new(); + let units: Vec> = vec![Box::new(transfer)]; + let digest: [u8; 32] = [0; 32]; + let id = Digest::from(digest.clone()); + + let tx = Tx { + timestamp, + max_fee, + priority_fee, + chain_id, + units: units.clone(), + id, + digest: vec![], + actor: Address::empty(), + }; + let mut rng = rand::rngs::OsRng; + let wallet = Wallet::generate(&mut rng); + let mut signed_tx = SignedTx::sign(tx, wallet); + let encoded_bytes = signed_tx.encode(); + assert_gt!(encoded_bytes.len(), 0); + let decoded_msg = SignedTx::decode(&encoded_bytes).unwrap(); + assert_eq!(signed_tx.pub_key, decoded_msg.pub_key); + assert_eq!(signed_tx.address, decoded_msg.address); + assert_eq!(signed_tx.signature, decoded_msg.signature); + // @todo make helper to compare fields in tx and units. same issue when testing in tx.rs file. 
+ } +} diff --git a/types/src/state_view.rs b/types/src/state_view.rs new file mode 100644 index 00000000..e61fed16 --- /dev/null +++ b/types/src/state_view.rs @@ -0,0 +1,10 @@ +use crate::account::{Account, Balance}; +use crate::address::Address; +use std::error::Error; + +pub trait StateView { + fn get_account(&mut self, address: &Address) -> Result, Box>; + fn set_account(&mut self, acc: &Account) -> Result<(), Box>; + fn get_balance(&mut self, address: &Address) -> Option; + fn set_balance(&mut self, address: &Address, amt: Balance) -> bool; +} diff --git a/types/src/tx.rs b/types/src/tx.rs new file mode 100644 index 00000000..b75ac57d --- /dev/null +++ b/types/src/tx.rs @@ -0,0 +1,389 @@ +use commonware_cryptography::sha256; +use commonware_cryptography::sha256::Digest; +use std::any::Any; + +use crate::address::Address; +use crate::signed_tx::SignedTx; +use crate::state_view::StateView; +use crate::units; +use crate::wallet::Wallet; +use commonware_utils::SystemTimeExt; +use std::{error::Error, time::SystemTime}; + +#[derive(Debug)] +pub enum UnitType { + Transfer, + SequencerMsg, +} + +impl TryFrom for UnitType { + type Error = String; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(UnitType::Transfer), + 1 => Ok(UnitType::SequencerMsg), + _ => Err(format!("unknown unit type: {}", value)), + } + } +} + +pub struct UnitContext { + // timestamp of the tx. + pub timestamp: u64, + // chain id of the tx. + pub chain_id: u64, + // sender of the tx. + pub sender: Address, +} + +pub trait UnitClone { + fn clone_box(&self) -> Box; +} + +impl UnitClone for T +where + T: 'static + Unit + Clone, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +// unit need to be simple and easy to be packed in the tx and executed by the vm. 
+pub trait Unit: UnitClone + Send + Sync + std::fmt::Debug { + fn unit_type(&self) -> UnitType; + fn encode(&self) -> Vec; + fn decode(&mut self, bytes: &[u8]); + + fn apply( + &self, + context: &UnitContext, + state: &mut Box<&mut dyn StateView>, + ) -> Result>, Box>; + + fn as_any(&self) -> &dyn Any; +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +#[derive(Clone, Debug)] +pub struct Tx { + /// timestamp of the tx creation. set by the user. + /// will be verified if the tx is in the valid window once received by validators. + /// if the timestamp is not in the valid window, the tx will be rejected. + /// if tx is in a valid window it is added to mempool. + /// timestamp is used to prevent replay attacks. and counter infinite spam attacks as Tx does not have nonce. + pub timestamp: u64, + /// max fee is the maximum fee the user is willing to pay for the tx. + pub max_fee: u64, + /// priority fee is the fee the user is willing to pay for the tx to be included in the next block. + pub priority_fee: u64, + /// chain id is the id of the chain the tx is intended for. + pub chain_id: u64, + /// units are fundamental unit of a tx. similar to actions. + pub units: Vec>, + + /// id is the transaction id. It is the hash of digest. + pub id: Digest, + /// digest is encoded tx. + pub digest: Vec, + /// address of the tx sender. wrap this in a better way. + pub actor: Address, +} + +#[derive(Debug)] +pub struct TxResult { + pub status: bool, + pub error: String, + pub output: Vec>, + pub exec_logs: String, +} + +pub trait TxMethods: Sized { + /// new is used to create a new instance of Tx with given units and chain id. + fn new(units: Vec>, chain_id: u64) -> Self; + /// set_fee is used to set the max fee and priority fee of the tx. + fn set_fee(&mut self, max_fee: u64, priority_fee: u64); + /// sign is used to sign the tx with the given wallet. 
+ fn sign(&mut self, wallet: Wallet) -> SignedTx; + fn from( + timestamp: u64, + units: Vec>, + priority_fee: u64, + max_fee: u64, + chain_id: u64, + actor: Address, + ) -> Self; + + /// returns tx id. + fn id(&mut self) -> Digest; + /// returns digest of the tx. + fn digest(&self) -> Vec; + /// encodes the tx, writes to digest and returns the digest. + /// ensure all fields are properly set before calling this function. + fn encode(&mut self) -> Vec; + + fn decode(bytes: &[u8]) -> Result; + + fn set_actor(&mut self, actor: Address); + + fn actor(&self) -> Address; +} + +impl Default for Tx { + fn default() -> Self { + Self { + timestamp: 0, + units: vec![], + max_fee: 0, + priority_fee: 0, + chain_id: 19517, + id: [0; 32].into(), + digest: vec![], + actor: Address::empty(), + } + } +} + +impl TxMethods for Tx { + fn new(units: Vec>, chain_id: u64) -> Self { + let mut tx = Self::default(); + tx.timestamp = SystemTime::now().epoch_millis(); + tx.units = units; + tx.chain_id = chain_id; + + // do not encode and generate tx_id as Tx::new doesnot yet have priority fee and max fee. + tx + } + + fn set_fee(&mut self, max_fee: u64, priority_fee: u64) { + self.max_fee = max_fee; + self.priority_fee = priority_fee; + } + + fn sign(&mut self, wallet: Wallet) -> SignedTx { + SignedTx::sign(self.clone(), wallet) + } + + fn from( + timestamp: u64, + units: Vec>, + priority_fee: u64, + max_fee: u64, + chain_id: u64, + actor: Address, + ) -> Self { + let mut tx = Self::default(); + tx.timestamp = timestamp; + tx.units = units; + tx.max_fee = max_fee; + tx.priority_fee = priority_fee; + tx.chain_id = chain_id; + tx.actor = actor; + tx.encode(); + tx + } + + fn id(&mut self) -> Digest { + if self.digest.len() == 0 { + self.encode(); + } + self.id.clone() + } + + fn digest(&self) -> Vec { + self.digest.clone() + } + + fn encode(&mut self) -> Vec { + if !self.digest.is_empty() { + return self.digest.clone(); + } + // pack tx timestamp. 
+ self.digest.extend(self.timestamp.to_be_bytes()); + // pack max fee + self.digest.extend(self.max_fee.to_be_bytes()); + // pack priority fee + self.digest.extend(self.priority_fee.to_be_bytes()); + // pack chain id + self.digest.extend(self.chain_id.to_be_bytes()); + // pack # of units. + self.digest.extend((self.units.len() as u64).to_be_bytes()); + // pack individual units + self.units.iter().for_each(|unit| { + let unit_bytes = unit.encode(); + // pack the unit type info. + self.digest.extend((unit.unit_type() as u8).to_be_bytes()); + // pack len of inidividual unit. + self.digest.extend((unit_bytes.len() as u64).to_be_bytes()); + // pack individual unit. + self.digest.extend(&unit_bytes); + }); + + // generate tx id. + self.id = sha256::hash(&self.digest); + + // return encoded digest. + self.digest.clone() + } + + fn decode(bytes: &[u8]) -> Result { + if bytes.is_empty() { + return Err("Empty bytes".to_string()); + } + let mut tx = Self::default(); + tx.timestamp = u64::from_be_bytes(bytes[0..8].try_into().unwrap()); + tx.max_fee = u64::from_be_bytes(bytes[8..16].try_into().unwrap()); + tx.priority_fee = u64::from_be_bytes(bytes[16..24].try_into().unwrap()); + tx.chain_id = u64::from_be_bytes(bytes[24..32].try_into().unwrap()); + let units = unpack_units(&bytes[32..]); + if units.is_err() { + return Err(format!("Failed to unpack units: {}", units.unwrap_err())); + } + tx.units = units?; + // generate tx id. + tx.id = sha256::hash(bytes); + tx.digest = bytes.to_vec(); + // return transaction. 
+ Ok(tx) + } + + fn set_actor(&mut self, actor: Address) { + self.actor = actor; + } + + fn actor(&self) -> Address { + self.actor.clone() + } +} + +fn unpack_units(digest: &[u8]) -> Result>, String> { + let mut offset = 0; + + fn read_u8(input: &[u8], offset: &mut usize) -> Result { + if input.len() < *offset + 1 { + return Err("Unexpected end of input when reading u8".into()); + } + let val = input[*offset]; + *offset += 1; + Ok(val) + } + + fn read_u64(input: &[u8], offset: &mut usize) -> Result { + if input.len() < *offset + 8 { + return Err("Unexpected end of input when reading u64".into()); + } + let val = u64::from_be_bytes(input[*offset..*offset + 8].try_into().unwrap()); + *offset += 8; + Ok(val) + } + + fn read_bytes<'a>( + input: &'a [u8], + offset: &'a mut usize, + len: usize, + ) -> Result<&'a [u8], String> { + if input.len() < *offset + len { + return Err("Unexpected end of input when reading bytes".into()); + } + let bytes = &input[*offset..*offset + len]; + *offset += len; + Ok(bytes) + } + + let unit_count = read_u64(digest, &mut offset)?; + + let mut units: Vec> = Vec::with_capacity(unit_count as usize); + + for _ in 0..unit_count { + let unit_type = read_u8(digest, &mut offset)?; + let unit_len = read_u64(digest, &mut offset)?; + let unit_bytes = read_bytes(digest, &mut offset, unit_len as usize)?.to_vec(); + let unit_type = UnitType::try_from(unit_type); + if unit_type.is_err() { + return Err(format!("Invalid unit type: {}", unit_type.unwrap_err())); + } + let unit_type = unit_type?; + let unit: Box = match unit_type { + UnitType::Transfer => { + let mut transfer = units::transfer::Transfer::default(); + transfer.decode(&unit_bytes); + Box::new(transfer) + } + UnitType::SequencerMsg => { + let mut msg = units::msg::SequencerMsg::default(); + msg.decode(&unit_bytes); + Box::new(msg) + } + }; + units.push(unit); + } + + Ok(units) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::curr_timestamp; + use crate::units::transfer::Transfer; + 
use more_asserts::assert_gt; + use std::error::Error; + use std::vec; + + #[test] + fn test_encode_decode() -> Result<(), Box> { + let timestamp = curr_timestamp(); + let max_fee = 100; + let priority_fee = 75; + let chain_id = 45205; + let transfer = Transfer::new(); + let units: Vec> = vec![Box::new(transfer)]; + let digest: [u8; 32] = [0; 32]; + let id = Digest::from(digest.clone()); + // TODO: the .encode call on next line gave error and said origin_msg needed to be mut? but why? + // shouldn't encode be able to encode without changing the msg? + let mut origin_msg = Tx { + timestamp, + max_fee, + priority_fee, + chain_id, + units: units.clone(), + id, + actor: Address::empty(), + digest: vec![], + }; + let encoded_bytes = origin_msg.encode(); + assert_gt!(encoded_bytes.len(), 0); + let decoded_msg = Tx::decode(&encoded_bytes)?; + let origin_transfer = origin_msg.units[0] + .as_ref() + .as_any() + .downcast_ref::() + .expect("Failed to downcast to Transfer"); + + let decode_transfer = decoded_msg.units[0] + .as_ref() + .as_any() + .downcast_ref::() + .expect("Failed to downcast to Transfer"); + + assert_eq!(origin_msg.timestamp, decoded_msg.timestamp); + assert_eq!(origin_msg.max_fee, decoded_msg.max_fee); + assert_eq!(origin_msg.priority_fee, decoded_msg.priority_fee); + assert_eq!(origin_msg.chain_id, decoded_msg.chain_id); + assert_eq!(origin_msg.id, decoded_msg.id); + assert_eq!(origin_msg.digest, decoded_msg.digest); + + // units + assert_eq!(origin_transfer.to_address, decode_transfer.to_address); + assert_eq!(origin_transfer.value, decode_transfer.value); + assert_eq!(origin_transfer.memo, decode_transfer.memo); + Ok(()) + } +} diff --git a/types/src/units/mod.rs b/types/src/units/mod.rs new file mode 100644 index 00000000..faef4436 --- /dev/null +++ b/types/src/units/mod.rs @@ -0,0 +1,2 @@ +pub mod msg; +pub mod transfer; diff --git a/types/src/units/msg.rs b/types/src/units/msg.rs new file mode 100644 index 00000000..aa071286 --- /dev/null +++ 
b/types/src/units/msg.rs @@ -0,0 +1,105 @@ +use crate::{ + address::Address, + state_view::StateView, + tx::{Unit, UnitContext, UnitType}, +}; +use std::any::Any; + +// @todo couple SequencerMsg with DA. +// and skip execution no-op. +#[derive(Clone, Debug)] +pub struct SequencerMsg { + pub chain_id: u64, + pub data: Vec, + pub from_address: Address, +} + +impl SequencerMsg { + pub fn new() -> SequencerMsg { + Self { + chain_id: 0, + data: Vec::new(), + from_address: Address::empty(), + } + } +} + +impl Unit for SequencerMsg { + fn unit_type(&self) -> UnitType { + UnitType::SequencerMsg + } + + fn encode(&self) -> Vec { + let mut bytes: Vec = Vec::new(); + // data length is 8 bytes. + let data_len = self.data.len() as u64; + // chain id length is 8 bytes.n store chain id. + bytes.extend(&self.chain_id.to_be_bytes()); + // address length is 32. store address. + bytes.extend_from_slice(self.from_address.as_slice()); + // store data length. + bytes.extend(data_len.to_be_bytes()); + // store data. + bytes.extend_from_slice(&self.data); + + bytes + } + + // @todo introduce syntactic checks. 
+ fn decode(&mut self, bytes: &[u8]) { + self.chain_id = u64::from_be_bytes(bytes[0..8].try_into().unwrap()); + self.from_address = Address::from_bytes(&bytes[8..40]).unwrap(); + let data_len = u64::from_be_bytes(bytes[40..48].try_into().unwrap()); + self.data = bytes[48..(48 + data_len as usize)].to_vec(); + } + + fn apply( + &self, + _: &UnitContext, + _: &mut Box<&mut dyn StateView>, + ) -> Result>, Box> { + Ok(None) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl Default for SequencerMsg { + fn default() -> Self { + Self { + chain_id: 0, + data: vec![], + from_address: Address::empty(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use more_asserts::assert_gt; + use std::error::Error; + + #[test] + fn test_encode_decode() -> Result<(), Box> { + let chain_id = 4502; + let data = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let from_address = Address::create_random_address(); + let origin_msg = SequencerMsg { + chain_id, + data, + from_address, + }; + let encoded_bytes = origin_msg.encode(); + assert_gt!(encoded_bytes.len(), 0); + let mut decoded_msg = SequencerMsg::new(); + decoded_msg.decode(&encoded_bytes); + assert_eq!(origin_msg.chain_id, decoded_msg.chain_id); + assert_eq!(origin_msg.data.len(), decoded_msg.data.len()); + assert_eq!(origin_msg.data, decoded_msg.data); + assert_eq!(origin_msg.from_address, decoded_msg.from_address); + Ok(()) + } +} diff --git a/types/src/units/transfer.rs b/types/src/units/transfer.rs new file mode 100644 index 00000000..2bd61667 --- /dev/null +++ b/types/src/units/transfer.rs @@ -0,0 +1,141 @@ +use crate::address::Address; +use crate::state_view::StateView; +use crate::tx::{Unit, UnitContext, UnitType}; +use std::{any::Any, error::Error, fmt::Display}; + +const MAX_MEMO_SIZE: usize = 256; + +#[derive(Debug, Clone)] +pub struct Transfer { + pub to_address: Address, + pub value: u64, + pub memo: Vec, +} + +impl Transfer { + pub fn new() -> Transfer { + Self { + to_address: Address::empty(), + value: 0, + memo: 
Vec::new(),
        }
    }
}

/// Failure modes of applying a Transfer unit.
#[derive(Debug)]
pub enum TransferError {
    SenderAccountNotFound,
    InsufficientFunds,
    InvalidMemoSize,
    StorageError,
}

impl Display for TransferError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TransferError::SenderAccountNotFound => write!(f, "Sender account not found"),
            TransferError::InsufficientFunds => write!(f, "Insufficient funds"),
            TransferError::InvalidMemoSize => write!(f, "Invalid memo size"),
            TransferError::StorageError => write!(f, "Storage error"),
        }
    }
}

impl Error for TransferError {}

impl Unit for Transfer {
    fn unit_type(&self) -> UnitType {
        UnitType::Transfer
    }

    /// Wire layout (big-endian): 32-byte recipient address, 8-byte value,
    /// 8-byte memo length, then the memo bytes.
    fn encode(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        let memo_len = self.memo.len() as u64;
        bytes.extend_from_slice(self.to_address.as_slice());
        bytes.extend(self.value.to_be_bytes());
        bytes.extend(memo_len.to_be_bytes());
        // Extending with an empty slice is a no-op, so the original
        // `if memo_len > 0` guard was redundant.
        bytes.extend_from_slice(&self.memo);

        bytes
    }

    // @todo introduce syntactic checks.
+ fn decode(&mut self, bytes: &[u8]) { + self.to_address = Address::from_bytes(&bytes[0..32]).unwrap(); + self.value = u64::from_be_bytes(bytes[32..40].try_into().unwrap()); + let memo_len = u64::from_be_bytes(bytes[40..48].try_into().unwrap()); + if memo_len > 0 { + self.memo = bytes[48..(48 + memo_len as usize)].to_vec(); + } + } + + fn apply( + &self, + context: &UnitContext, + state: &mut Box<&mut dyn StateView>, + ) -> Result>, Box> { + if self.memo.len() > MAX_MEMO_SIZE { + return Err(TransferError::InvalidMemoSize.into()); + } + + if let Some(bal) = state.get_balance(&context.sender) { + if bal < self.value { + return Err(TransferError::InsufficientFunds.into()); + } + let receiver_bal = state.get_balance(&self.to_address).unwrap_or(0); + + if !state.set_balance(&context.sender, bal - self.value) + || !state.set_balance(&self.to_address, receiver_bal + self.value) + { + return Err(TransferError::StorageError.into()); + } + } else { + return Err(TransferError::SenderAccountNotFound.into()); + } + + Ok(None) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl Default for Transfer { + fn default() -> Self { + Self { + to_address: Address::empty(), + value: 0, + memo: vec![], + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use more_asserts::assert_gt; + use std::error::Error; + + #[test] + fn test_encode_decode() -> Result<(), Box> { + let to_address = Address::create_random_address(); + let value = 5; + let memo = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let origin_msg = Transfer { + to_address, + value, + memo, + }; + let encoded_bytes = origin_msg.encode(); + assert_gt!(encoded_bytes.len(), 0); + let mut decoded_msg = Transfer::new(); + decoded_msg.decode(&encoded_bytes); + assert_eq!(origin_msg.to_address, decoded_msg.to_address); + assert_eq!(origin_msg.value, decoded_msg.value); + assert_eq!(origin_msg.memo, decoded_msg.memo); + Ok(()) + } +} diff --git a/types/src/wallet.rs b/types/src/wallet.rs new file mode 100644 index 00000000..e9e17908 --- 
/dev/null +++ b/types/src/wallet.rs @@ -0,0 +1,129 @@ +use crate::address::Address; +use crate::{PrivateKey, PublicKey, Signature, TX_NAMESPACE}; +use commonware_cryptography::ed25519::Ed25519; +use commonware_cryptography::Scheme; +use rand::{CryptoRng, Rng}; +use std::fmt::Error; + +#[derive(Clone, Debug)] +pub enum AuthTypes { + ED25519, +} + +/// auth should have a method to verify signatures. +/// also batch signature verification. +pub trait Auth { + // returns the public key of the signer. + fn public_key(&self) -> PublicKey; + // returns the account address of the signer. + fn address(&self) -> Address; + // verifys the signature. + fn verify(&self, data: &[u8], signature: &[u8]) -> bool; + // batch verify signatures. returns false if batch verification fails. + fn batch_verify(&self, data: &[u8], signatures: Vec<&[u8]>) -> bool; +} + +/// Wallet is the module used by the user to sign transactions. Wallet uses Ed25519 signature scheme. +#[derive(Clone)] +pub struct Wallet { + // Private key + priv_key: PrivateKey, + // Public key + pub_key: PublicKey, + // Account Address, is derived from the public key. + address: Address, + // Signer + signer: Ed25519, +} + +// wallet generation, management, and signing should be functions of the wallet. +pub trait WalletMethods { + // create a new wallet using the given randomness. + fn generate(r: &mut R) -> Self; + // load signer from bytes rep of a private key and initialize the wallet. + fn load(&self, priv_key: &[u8]) -> Self; + // sign the given arbitary data with the private key of the wallet. + fn sign(&mut self, data: &[u8]) -> Vec; + // verify the signature of the given data with the public key of the wallet. + fn verify(&self, data: &[u8], signature: &[u8]) + -> Result; + // return corresponding wallet's address. + fn address(&self) -> Address; + // return corresponding wallet's public key. + fn public_key(&self) -> PublicKey; + // return corresponding wallet's private key. 
+ fn private_key(&self) -> Vec; + // store the private key at the given path. + fn store_private_key(&self, path: &str) -> Result<(), Error>; + // @todo remove this? + fn init_address(&mut self); +} + +impl WalletMethods for Wallet { + fn generate(r: &mut R) -> Self { + let signer = Ed25519::new(r); + let pub_key = signer.public_key(); + let address = Address::from_pub_key(&pub_key); + Self { + priv_key: signer.private_key(), + pub_key: signer.public_key(), + address, + signer, + } + } + + fn load(&self, priv_key: &[u8]) -> Self { + let private_key = PrivateKey::try_from(priv_key).expect("Invalid private key"); + let signer = ::from(private_key).unwrap(); + Self { + priv_key: signer.private_key(), + pub_key: signer.public_key(), + address: Address::from_pub_key(&signer.public_key()), + signer, + } + } + + fn sign(&mut self, data: &[u8]) -> Vec { + self.signer.sign(Some(TX_NAMESPACE), data).as_ref().to_vec() + } + + fn verify( + &self, + data: &[u8], + signature: &[u8], + ) -> Result { + let signature = Signature::try_from(signature); + if let Err(e) = signature { + return Err(e); + } + + let signature = signature.unwrap(); + let pub_key = self.signer.public_key(); + Ok(Ed25519::verify( + Some(TX_NAMESPACE), + data, + &pub_key, + &signature, + )) + } + + fn address(&self) -> Address { + self.address.clone() + } + + fn public_key(&self) -> PublicKey { + self.pub_key.clone() + } + + fn private_key(&self) -> Vec { + self.priv_key.as_ref().to_vec() + } + + fn store_private_key(&self, _path: &str) -> Result<(), Error> { + todo!() + } + + fn init_address(&mut self) { + todo!() + } +} diff --git a/vm/Cargo.toml b/vm/Cargo.toml new file mode 100644 index 00000000..0884eda3 --- /dev/null +++ b/vm/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "alto-vm" +version = "0.1.0" +edition = "2021" + +[dependencies] +alto-storage = { workspace = true } +alto-types = { workspace = true } +tracing ={ workspace = true} +tracing-subscriber = { workspace = true } +commonware-codec = 
{ workspace = true }
\ No newline at end of file
diff --git a/vm/src/capture_logs.rs b/vm/src/capture_logs.rs
new file mode 100644
index 00000000..15969e72
--- /dev/null
+++ b/vm/src/capture_logs.rs
@@ -0,0 +1,49 @@
use tracing::dispatcher::with_default;
use tracing_subscriber::{fmt, layer::SubscriberExt, registry::Registry};

use std::io::{self, Write};
use std::sync::{Arc, Mutex};

/// A `Write` impl that appends into a shared, mutex-guarded buffer so the
/// tracing layer and the capturing caller both see the same bytes.
struct SharedWriter {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl Write for SharedWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffer.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Captures all logs emitted inside the given closure and returns them as a string.
pub fn capture_logs<F, R>(func: F) -> (R, String)
where
    F: FnOnce() -> R,
{
    let buffer = Arc::new(Mutex::new(Vec::new()));
    // MakeWriter closure: a fresh SharedWriter per event, all sharing `buffer`.
    let writer = {
        let buf = Arc::clone(&buffer);
        move || SharedWriter {
            buffer: Arc::clone(&buf),
        }
    };

    let layer = fmt::layer()
        .with_writer(writer)
        .with_ansi(false)
        .without_time();

    let subscriber = Registry::default().with(layer);

    // Install the subscriber only for the duration of `func`.
    let dispatch = tracing::Dispatch::new(subscriber);
    let result = with_default(&dispatch, func);

    let logs = buffer.lock().unwrap();
    // BUG FIX: from_utf8_lossy keeps partially valid output instead of
    // silently discarding ALL captured logs when a single invalid UTF-8 byte
    // appears (the old `String::from_utf8(logs.clone()).unwrap_or_default()`),
    // and it avoids cloning the whole buffer.
    let log_str = String::from_utf8_lossy(&logs).into_owned();

    (result, log_str)
}
diff --git a/vm/src/lib.rs b/vm/src/lib.rs
new file mode 100644
index 00000000..6264646b
--- /dev/null
+++ b/vm/src/lib.rs
@@ -0,0 +1,2 @@
+pub mod capture_logs;
+pub mod vm;
diff --git a/vm/src/vm.rs b/vm/src/vm.rs
new file mode 100644
index 00000000..5ce8af70
--- /dev/null
+++ b/vm/src/vm.rs
@@ -0,0 +1,234 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::vec;

use crate::capture_logs::capture_logs;
use alto_storage::state_db::StateViewDb;
use alto_storage::transactional_db::TransactionalDb;
use alto_storage::{
    database::Database,
    transactional_db::{InMemoryCachingTransactionalDb, Key, Op},
};
use 
alto_types::null_error::NullError;
use alto_types::state_view::StateView;
use alto_types::{
    signed_tx::{SignedTx, SignedTxChars},
    tx::{Tx, TxMethods, TxResult, UnitContext},
};

/// Executes signed transactions for one block against a layered view of
/// state: finalized cache, per-block unfinalized state, and the backing DB.
// NOTE(review): generic parameters below were reconstructed from the tests —
// confirm against alto_storage::transactional_db.
pub struct VM {
    pub block_number: u64,
    pub timestamp: u64,
    pub chain_id: u64,
    // Finalized-state cache shared with the storage layer.
    pub state_cache: Arc<Mutex<HashMap<Key, Op>>>,
    // State touched by executed-but-unfinalized blocks, keyed by block number.
    pub unfinalized_state: Arc<Mutex<HashMap<u64, HashMap<Key, Op>>>>,
    // Persistent backing store.
    pub state_db: Arc<Mutex<dyn Database>>,
}

impl VM {
    pub fn new(
        block_number: u64,
        timestamp: u64,
        chain_id: u64,
        state_cache: Arc<Mutex<HashMap<Key, Op>>>,
        unfinalized_state: Arc<Mutex<HashMap<u64, HashMap<Key, Op>>>>,
        state_db: Arc<Mutex<dyn Database>>,
    ) -> Self {
        Self {
            block_number,
            timestamp,
            chain_id,
            state_cache,
            unfinalized_state,
            state_db,
        }
    }

    // Applies a new set of txs on the given state.
    // apply assumes it is equivalent to executing all the txs in a block,
    // and moves all the state touched by the txs into unfinalized state.
    pub fn apply(&mut self, stxs: Vec<SignedTx>) -> Vec<TxResult> {
        // Flatten every unfinalized block's ops (ascending block order) into
        // the overlay the in-memory db reads through.
        let unfinalized_state_for_in_mem =
            merge_maps(self.unfinalized_state.lock().unwrap().clone());
        let mut in_mem_db = InMemoryCachingTransactionalDb::new(
            Arc::clone(&self.state_cache),
            Arc::new(Mutex::new(unfinalized_state_for_in_mem)),
            Arc::clone(&self.state_db),
        );
        let mut results = Vec::new();
        for stx in stxs {
            let mut state_view = StateViewDb::new(&mut in_mem_db);
            let mut tx = stx.tx.clone();
            tx.set_actor(stx.address());
            let result = self.apply_tx(tx, &mut state_view);
            // Per-tx transactionality: keep the writes of a successful tx,
            // drop the writes of a failed one.
            if result.status {
                let _ = in_mem_db.commit_last_tx();
            } else {
                let _ = in_mem_db.rollback_last_tx();
            }
            results.push(result);
        }
        self.unfinalized_state
            .lock()
            .unwrap()
            .insert(self.block_number, in_mem_db.touched);
        // @todo commit is intentionally not called here: touched state is
        // staged as unfinalized until the block is finalized.
        results
    }

    // Applies a single tx on the given state.
+ fn apply_tx<'a, T: StateView>( + &mut self, + tx: Tx, + state_view: &mut T, + // exec_logs: &'a mut Vec, + ) -> TxResult { + let tx_context = UnitContext { + timestamp: self.timestamp, + chain_id: self.chain_id, + sender: tx.actor(), + }; + let mut sv_boxed: Box<&mut dyn StateView> = Box::new(state_view); + let mut outputs: Vec> = Vec::new(); + // apply units one by one. + // stop and revert if any unit fails. + let (result, log) = capture_logs(|| { + for unit in tx.units { + let res = unit.apply(&tx_context, &mut sv_boxed); + match res { + Ok(output) => { + if let Some(output) = output { + outputs.push(output); + } else { + // if output is None, unit execution does not return anything. + // push empty vec. + outputs.push(vec![]); + } + } + Err(e) => { + // return the error. + return Err(e); + } + } + } + Ok(()) + }); + if result.is_err() { + TxResult { + status: false, + error: result.err().unwrap().to_string(), + exec_logs: log, + output: outputs, + } + } else { + TxResult { + status: true, + error: NullError.to_string(), + exec_logs: log, + output: outputs, + } + } + } +} + +fn merge_maps( + input: HashMap>, +) -> HashMap { + let mut merged = HashMap::new(); + + // Sort the keys in ascending order so we can override with higher keys last + let mut keys: Vec<_> = input.keys().cloned().collect(); + keys.sort(); + + for k in keys { + if let Some(inner_map) = input.get(&k) { + for (inner_key, val) in inner_map { + // Insert or override + merged.insert(inner_key.clone(), val.clone()); + } + } + } + + merged +} + +#[cfg(test)] +mod tests { + use alto_storage::database::Database; + use alto_storage::hashmap_db::HashmapDatabase; + use alto_storage::state_db::StateViewDb; + use alto_storage::transactional_db::{Key, Op}; + use alto_types::account::Account; + use alto_types::address::Address; + use alto_types::create_test_keypair; + use alto_types::curr_timestamp; + use alto_types::signed_tx::{SignedTx, SignedTxChars}; + use alto_types::tx::{Tx, TxMethods, Unit}; + use 
alto_types::units::msg::SequencerMsg;
    use alto_types::units::transfer::Transfer;
    use commonware_codec::{Codec, WriteBuffer};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::vec;

    use super::VM;

    const DB_WRITE_BUFFER_CAPACITY: usize = 500;

    /// End-to-end: fund a sender account, apply one signed tx containing a
    /// Transfer and a SequencerMsg, and check the execution result.
    #[test]
    fn test_single_tx() {
        let state_db = Arc::new(Mutex::new(HashmapDatabase::new()));
        let cache: Arc<Mutex<HashMap<Key, Op>>> = Arc::new(Mutex::new(HashMap::new()));
        let unfinalized: Arc<Mutex<HashMap<u64, HashMap<Key, Op>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        let address = Address::create_random_address();
        let account = Account {
            address: address.clone(),
            balance: 1000,
        };

        // Seed the backing store with the funded sender account.
        let key = StateViewDb::key_accounts(&account.address);
        let mut write_buf = WriteBuffer::new(DB_WRITE_BUFFER_CAPACITY);
        account.write(&mut write_buf);
        state_db
            .lock()
            .unwrap()
            .put(&key, write_buf.as_ref())
            .unwrap();

        let block_number = 10;
        let timestamp = curr_timestamp();
        let chain_id = 1;
        let mut vm = VM::new(
            block_number,
            timestamp,
            chain_id,
            cache,
            unfinalized,
            state_db,
        );
        let tfer_unit = Transfer {
            to_address: Address::create_random_address(),
            value: 100,
            memo: vec![],
        };
        let msg_unit = SequencerMsg {
            chain_id: 10,
            data: vec![0, 0, 0, 0],
            from_address: Address::create_random_address(),
        };
        let units: Vec<Box<dyn Unit>> = vec![Box::new(tfer_unit), Box::new(msg_unit)];
        let tx = <Tx as TxMethods>::from(timestamp, units, 10, 5, 1, address);
        let (pk, _sk) = create_test_keypair();
        let stx = SignedTx::new(tx, pk, vec![]);
        let results = vm.apply(vec![stx]);
        assert_eq!(results.len(), 1);
        // assert! over assert_eq!(.., true) for a bare bool.
        assert!(results[0].status);
        assert_eq!(results[0].output.len(), 2);
        assert_eq!(results[0].output[0].len(), 0);
        // `error` is already a String; compare directly, no `.to_string()`.
        assert_eq!(results[0].error, "NoError");
        println!("{}", results[0].exec_logs);
    }
}