14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 2 additions & 5 deletions Cargo.toml
@@ -81,11 +81,8 @@ serde_derive = "1.0.125"
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
serde_regex = "1.1.0"
serde_yaml = "0.9.21"
slog = { version = "2.7.0", features = [
"release_max_level_trace",
"max_level_trace",
] }
sqlparser = "0.46.0"
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
sqlparser = { version = "0.46.0", features = ["visitor"] }

Review comment (Member): The latest version (0.59.0) has an additional variant
for `SetExpr` for DELETE statements. It's probably worth updating and covering
the new variant in the validator.

strum = { version = "0.26", features = ["derive"] }
syn = { version = "2.0.106", features = ["full"] }
test-store = { path = "./store/test-store" }
3 changes: 3 additions & 0 deletions docs/environment-variables.md
@@ -284,3 +284,6 @@ those.
graph-node bugs, but since it is hard to work around them, setting this
variable to something like 10 makes it possible to work around such a bug
while it is being fixed (default: 0)
- `GRAPH_ENABLE_SQL_QUERIES`: Enable the experimental [SQL query
interface](implementation/sql-interface.md).
(default: false)
89 changes: 89 additions & 0 deletions docs/implementation/sql-interface.md
@@ -0,0 +1,89 @@
# SQL Queries

**This interface is extremely experimental. There is no guarantee that it
will ever be brought to production use; it exists solely to help evaluate
the utility of such an interface.**

**The interface is only available if the environment variable `GRAPH_ENABLE_SQL_QUERIES` is set to `true`**

SQL queries can be issued by posting a JSON document to
`/subgraphs/sql`. The server responds with a JSON document containing the
records that match the query.

The body of the request must contain the following keys:

* `deployment`: the hash of the deployment against which the query should
  be run
* `query`: the SQL query
* `mode`: either `info` or `data`. When the mode is `info`, only some
  information about the result is reported; when it is `data`, the query
  result itself is sent in the response

The SQL query can use all the tables of the given subgraph. Table and
attribute names for normal `@entity` types are snake-cased from their form
in the GraphQL schema, so that data for `SomeDailyStuff` is stored in a
table `some_daily_stuff`. For `@aggregation` types, the table can be
accessed as `<aggregation>(<interval>)`, for example, `my_stats('hour')` for
`type MyStats @aggregation(..) { .. }`.

The query can use fairly arbitrary SQL, including aggregations and most
functions built into PostgreSQL.

## Example

For a subgraph whose schema defines an entity `Block`, the following query
```json
{
  "query": "select number, hash, parent_hash, timestamp from block order by number desc limit 2",
  "deployment": "QmSoMeThInG",
  "mode": "data"
}
```

might result in this response
```json
{
  "data": [
    {
      "hash": "\\x5f91e535ee4d328725b869dd96f4c42059e3f2728dfc452c32e5597b28ce68d6",
      "number": 5000,
      "parent_hash": "\\x82e95c1ee3a98cd0646225b5ae6afc0b0229367b992df97aeb669c898657a4bb",
      "timestamp": "2015-07-30T20:07:44+00:00"
    },
    {
      "hash": "\\x82e95c1ee3a98cd0646225b5ae6afc0b0229367b992df97aeb669c898657a4bb",
      "number": 4999,
      "parent_hash": "\\x875c9a0f8215258c3b17fd5af5127541121cca1f594515aae4fbe5a7fbef8389",
      "timestamp": "2015-07-30T20:07:36+00:00"
    }
  ]
}
```
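
For completeness, here is one way to send the request above programmatically.
This is an illustrative sketch, not part of graph-node: it assumes a node
whose HTTP query port is reachable at `http://localhost:8000` and a client
built with the `reqwest` (blocking and json features) and `serde_json` crates.

```rust
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The request body described above: SQL text, deployment hash, and mode.
    let body = json!({
        "query": "select number, hash, parent_hash, timestamp from block order by number desc limit 2",
        "deployment": "QmSoMeThInG",
        "mode": "data"
    });

    // POST the JSON document to the SQL endpoint and print the raw response.
    // The base URL is an assumption; use the address your node serves queries on.
    let resp = reqwest::blocking::Client::new()
        .post("http://localhost:8000/subgraphs/sql")
        .json(&body)
        .send()?
        .error_for_status()?;
    println!("{}", resp.text()?);
    Ok(())
}
```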

## Limitations/Ideas/Disclaimers

Most of these are fairly easy to address:

* bind variables/query parameters are not supported, only literal SQL
  queries
* queries must finish within `GRAPH_SQL_STATEMENT_TIMEOUT` (unlimited by
  default)
* queries are always executed at the subgraph head. It would be easy to add
  a way to specify a block at which the query should be executed
* the interface right now pretty much exposes the raw SQL schema for a
  subgraph, though system columns like `vid` or `block_range` are made
  inaccessible.
* it is not possible to join across subgraphs, though it would be possible
  to add that. Implementing that would require some additional plumbing that
  hides the effects of sharding.
* JSON as the response format is pretty inefficient, and we should change it
  to something more compact
* the response contains fairly raw data; as the example shows, binary data
  uses Postgres' notation for hex strings
* because the supported SQL is so broad, it is easy to issue queries that
  take a very long time. It will therefore not be hard to take down a
  `graph-node`, especially when no query timeout is set

Most importantly: while quite a bit of effort has been put into making this
interface safe, in particular into making sure that it is not possible to
write to the database through it, there is no guarantee that this works
without bugs.
10 changes: 8 additions & 2 deletions graph/src/components/graphql.rs
@@ -1,6 +1,7 @@
use crate::data::query::QueryResults;
use crate::data::query::{Query, QueryTarget};
use crate::prelude::DeploymentHash;
use crate::data::query::{QueryResults, SqlQueryReq};
use crate::data::store::SqlQueryObject;
use crate::prelude::{DeploymentHash, QueryExecutionError};

use async_trait::async_trait;
use std::sync::Arc;
@@ -28,6 +29,11 @@ pub trait GraphQlRunner: Send + Sync + 'static {
) -> QueryResults;

fn metrics(&self) -> Arc<dyn GraphQLMetrics>;

async fn run_sql_query(
self: Arc<Self>,
req: SqlQueryReq,
) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;
}

pub trait GraphQLMetrics: Send + Sync + 'static {
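
A rough sketch (not code from this PR) of how an HTTP handler might use the
new trait method: deserialize the POST body into `SqlQueryReq`, refuse the
request unless SQL queries are enabled, run the query, and wrap the rows in
the `{"data": [...]}` shape shown in the docs. The `serve_sql` name, the
import paths, and the reliance on the `ENV_VARS` static are assumptions.

```rust
use std::sync::Arc;

use graph::data::query::SqlQueryReq;
use graph::env::ENV_VARS;
use graph::prelude::{GraphQlRunner, QueryExecutionError};

// Hypothetical handler; the real routing and error mapping live in the server crates.
async fn serve_sql<R: GraphQlRunner>(
    runner: Arc<R>,
    body: &[u8],
) -> Result<serde_json::Value, QueryExecutionError> {
    if !ENV_VARS.sql_queries_enabled() {
        return Err(QueryExecutionError::SqlError(
            "SQL queries are not enabled".to_string(),
        ));
    }
    let req: SqlQueryReq = serde_json::from_slice(body)
        .map_err(|e| QueryExecutionError::SqlError(e.to_string()))?;
    let rows = runner.run_sql_query(req).await?;
    // `SqlQueryObject` derives `Serialize`, so the rows can be embedded directly.
    Ok(serde_json::json!({ "data": rows }))
}
```
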
4 changes: 3 additions & 1 deletion graph/src/components/store/traits.rs
@@ -16,7 +16,7 @@ use crate::components::transaction_receipt;
use crate::components::versions::ApiVersion;
use crate::data::query::Trace;
use crate::data::store::ethereum::call;
use crate::data::store::QueryObject;
use crate::data::store::{QueryObject, SqlQueryObject};
use crate::data::subgraph::{status, DeploymentFeatures};
use crate::data::{query::QueryTarget, subgraph::schema::*};
use crate::prelude::{DeploymentState, NodeId, QueryExecutionError, SubgraphName};
@@ -652,6 +652,8 @@ pub trait QueryStore: Send + Sync {
query: EntityQuery,
) -> Result<(Vec<QueryObject>, Trace), QueryExecutionError>;

fn execute_sql(&self, sql: &str) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;

async fn is_deployment_synced(&self) -> Result<bool, Error>;

async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
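
Illustrative only: once a `QueryStore` for the right deployment is in hand,
running the SQL is a single synchronous call to the new `execute_sql` method.
How graph-node obtains the store and validates the SQL beforehand is not shown
here, and the import paths are indicative rather than confirmed.

```rust
use graph::components::store::QueryStore;
use graph::data::query::{QueryExecutionError, SqlQueryReq};
use graph::data::store::SqlQueryObject;

// Hypothetical helper: run the request's SQL text against a store that is
// already scoped to the request's deployment.
fn run_on_store(
    store: &dyn QueryStore,
    req: &SqlQueryReq,
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
    store.execute_sql(&req.query)
}
```
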
5 changes: 4 additions & 1 deletion graph/src/data/query/error.rs
@@ -73,6 +73,7 @@ pub enum QueryExecutionError {
InvalidSubgraphManifest,
ResultTooBig(usize, usize),
DeploymentNotFound(String),
SqlError(String),
IdMissing,
IdNotString,
InternalError(String),
@@ -135,6 +136,7 @@ impl QueryExecutionError {
| IdMissing
| IdNotString
| InternalError(_) => false,
SqlError(_) => false,
}
}
}
@@ -213,7 +215,7 @@ impl fmt::Display for QueryExecutionError {
}
InvalidFilterError => write!(f, "Filter must by an object"),
InvalidOrFilterStructure(fields, example) => {
write!(f, "Cannot mix column filters with 'or' operator at the same level. Found column filter(s) {} alongside 'or' operator.\n\n{}",
write!(f, "Cannot mix column filters with 'or' operator at the same level. Found column filter(s) {} alongside 'or' operator.\n\n{}",
fields.join(", "), example)
}
EntityFieldError(e, a) => {
@@ -281,6 +283,7 @@ impl fmt::Display for QueryExecutionError {
IdMissing => write!(f, "entity is missing an `id` attribute"),
IdNotString => write!(f, "entity `id` attribute is not a string"),
InternalError(msg) => write!(f, "internal error: {}", msg),
SqlError(e) => write!(f, "sql error: {}", e),
}
}
}
2 changes: 1 addition & 1 deletion graph/src/data/query/mod.rs
@@ -6,6 +6,6 @@ mod trace;

pub use self::cache_status::CacheStatus;
pub use self::error::{QueryError, QueryExecutionError};
pub use self::query::{Query, QueryTarget, QueryVariables};
pub use self::query::{Query, QueryTarget, QueryVariables, SqlQueryMode, SqlQueryReq};
pub use self::result::{LatestBlockInfo, QueryResult, QueryResults};
pub use self::trace::Trace;
26 changes: 25 additions & 1 deletion graph/src/data/query/query.rs
@@ -1,7 +1,8 @@
use serde::de::Deserializer;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::hash::{DefaultHasher, Hash as _, Hasher as _};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

@@ -165,3 +166,26 @@ impl Query {
}
}
}

#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SqlQueryMode {
Data,
Info,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SqlQueryReq {
pub deployment: DeploymentHash,
pub query: String,
pub mode: SqlQueryMode,
}

impl SqlQueryReq {
pub fn query_hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
self.deployment.hash(&mut hasher);
self.query.hash(&mut hasher);
hasher.finish()
}
}
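
As a quick illustration of how the documented request body maps onto these
types (assuming `serde_json` is available and that the placeholder deployment
hash is accepted by `DeploymentHash`'s deserializer):

```rust
use graph::data::query::{SqlQueryMode, SqlQueryReq};

fn parse_request() -> Result<(), serde_json::Error> {
    // Same shape as the documented POST body; `mode` is lowercase because of
    // the `#[serde(rename_all = "snake_case")]` attribute on `SqlQueryMode`.
    let body = r#"{
        "deployment": "QmSoMeThInG",
        "query": "select number from block order by number desc limit 2",
        "mode": "data"
    }"#;
    let req: SqlQueryReq = serde_json::from_str(body)?;
    assert!(matches!(req.mode, SqlQueryMode::Data));

    // `query_hash` combines the deployment and the query text, e.g. for
    // caching or logging (its exact use in graph-node is not shown here).
    let _key: u64 = req.query_hash();
    Ok(())
}
```
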
4 changes: 4 additions & 0 deletions graph/src/data/store/mod.rs
@@ -1102,6 +1102,10 @@ pub struct QueryObject {
pub entity: r::Object,
}

/// An object that is returned from a SQL query. It wraps an `r::Value`
#[derive(CacheWeight, Serialize)]
pub struct SqlQueryObject(pub r::Value);

impl CacheWeight for QueryObject {
fn indirect_weight(&self) -> usize {
self.parent.indirect_weight() + self.entity.indirect_weight()
29 changes: 29 additions & 0 deletions graph/src/env/mod.rs
@@ -24,6 +24,7 @@ lazy_static! {
#[cfg(debug_assertions)]
lazy_static! {
pub static ref TEST_WITH_NO_REORG: Mutex<bool> = Mutex::new(false);
pub static ref TEST_SQL_QUERIES_ENABLED: Mutex<bool> = Mutex::new(false);
}

/// Panics if:
@@ -189,6 +190,10 @@ pub struct EnvVars {
/// Set by the environment variable `ETHEREUM_REORG_THRESHOLD`. The default
/// value is 250 blocks.
reorg_threshold: BlockNumber,
/// Enable SQL query interface. SQL queries are disabled by default
/// because they are still experimental. Set by the environment variable
/// `GRAPH_ENABLE_SQL_QUERIES`. Off by default.
enable_sql_queries: bool,
/// The time to wait between polls when using polling block ingestor.
/// The value is set by `ETHERUM_POLLING_INTERVAL` in millis and the
/// default is 1000.
@@ -341,6 +346,7 @@ impl EnvVars {
external_ws_base_url: inner.external_ws_base_url,
static_filters_threshold: inner.static_filters_threshold,
reorg_threshold: inner.reorg_threshold,
enable_sql_queries: inner.enable_sql_queries.0,
ingestor_polling_interval: Duration::from_millis(inner.ingestor_polling_interval),
subgraph_settings: inner.subgraph_settings,
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
@@ -414,6 +420,27 @@ impl EnvVars {
pub fn reorg_threshold(&self) -> i32 {
self.reorg_threshold
}

#[cfg(debug_assertions)]
pub fn sql_queries_enabled(&self) -> bool {
// SQL queries are disabled by default for security.
// For testing purposes, we allow tests to enable SQL queries via TEST_SQL_QUERIES_ENABLED.
if *TEST_SQL_QUERIES_ENABLED.lock().unwrap() {
true
} else {
self.enable_sql_queries
}
}
#[cfg(not(debug_assertions))]
pub fn sql_queries_enabled(&self) -> bool {
self.enable_sql_queries
}

#[cfg(debug_assertions)]
pub fn enable_sql_queries_for_tests(&self, enable: bool) {
let mut lock = TEST_SQL_QUERIES_ENABLED.lock().unwrap();
*lock = enable;
}
}

impl Default for EnvVars {
@@ -514,6 +541,8 @@ struct Inner {
// JSON-RPC specific.
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD", default = "250")]
reorg_threshold: BlockNumber,
#[envconfig(from = "GRAPH_ENABLE_SQL_QUERIES", default = "false")]
enable_sql_queries: EnvVarBoolean,
#[envconfig(from = "ETHEREUM_POLLING_INTERVAL", default = "1000")]
ingestor_polling_interval: u64,
#[envconfig(from = "GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS")]
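
Hypothetical test usage of the two debug-only helpers above, assuming the
usual `ENV_VARS: EnvVars` lazy static exported from `graph::env` (tests are
compiled with `debug_assertions`, so both functions are available):

```rust
use graph::env::ENV_VARS;

#[test]
fn sql_queries_can_be_toggled_in_tests() {
    // Flip the global toggle so a test can exercise the SQL endpoint without
    // setting GRAPH_ENABLE_SQL_QUERIES in the environment.
    ENV_VARS.enable_sql_queries_for_tests(true);
    assert!(ENV_VARS.sql_queries_enabled());

    // Reset to the default so other tests see SQL queries disabled again.
    ENV_VARS.enable_sql_queries_for_tests(false);
}
```
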
8 changes: 8 additions & 0 deletions graph/src/schema/input/mod.rs
@@ -1383,6 +1383,14 @@ impl InputSchema {
.any(|ti| matches!(ti, TypeInfo::Aggregation(_)))
}

pub fn aggregation_names(&self) -> impl Iterator<Item = &str> {
self.inner
.type_infos
.iter()
.filter_map(TypeInfo::aggregation)
.map(|agg_type| self.inner.pool.get(agg_type.name).unwrap())
}

pub fn entity_fulltext_definitions(
&self,
entity: &str,
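
A guess at how this accessor could be used by the SQL validator (the actual
use in this PR is not shown): collect the aggregation names so that table
references of the form `<aggregation>('<interval>')`, such as
`my_stats('hour')`, can be recognized. The helper below is hypothetical.

```rust
use std::collections::HashSet;

use graph::schema::InputSchema;

// Hypothetical helper: the set of names that may appear as `name('interval')`
// table functions in a SQL query against this subgraph.
fn known_aggregations(schema: &InputSchema) -> HashSet<&str> {
    schema.aggregation_names().collect()
}
```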