Skip to content

Commit 6df4992

Browse files
committed
graph, store: Require function syntax for aggregations
1 parent 1125d3a commit 6df4992

File tree

6 files changed

+153
-20
lines changed

6 files changed

+153
-20
lines changed

docs/implementation/sql-interface.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ The body of the request must contain the following keys:
2020
result is sent in the response
2121

2222
The SQL query can use all the tables of the given subgraph. Table and
23-
attribute names are snake-cased from their form in the GraphQL schema, so
24-
that data for `SomeDailyStuff` is stored in a table `some_daily_stuff`.
23+
attribute names for normal `@entity` types are snake-cased from their form
24+
in the GraphQL schema, so that data for `SomeDailyStuff` is stored in a
25+
table `some_daily_stuff`. For `@aggregation` types, the table can be
26+
accessed as `<aggregation>(<interval>)`, for example, `my_stats('hour')` for
27+
`type MyStats @aggregation(..) { .. }`
2528

2629
The query can use fairly arbitrary SQL, including aggregations and most
2730
functions built into PostgreSQL.

graph/src/schema/input/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,14 @@ impl InputSchema {
13831383
.any(|ti| matches!(ti, TypeInfo::Aggregation(_)))
13841384
}
13851385

1386+
pub fn aggregation_names(&self) -> impl Iterator<Item = &str> {
1387+
self.inner
1388+
.type_infos
1389+
.iter()
1390+
.filter_map(TypeInfo::aggregation)
1391+
.map(|agg_type| self.inner.pool.get(agg_type.name).unwrap())
1392+
}
1393+
13861394
pub fn entity_fulltext_definitions(
13871395
&self,
13881396
entity: &str,

store/postgres/src/relational.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ use graph::data_source::CausalityRegion;
3939
use graph::internal_error;
4040
use graph::prelude::{q, EntityQuery, StopwatchMetrics, ENV_VARS};
4141
use graph::schema::{
42-
EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema,
42+
AggregationInterval, EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition,
43+
InputSchema,
4344
};
4445
use graph::slog::warn;
4546
use index::IndexList;
@@ -1156,6 +1157,27 @@ impl Layout {
11561157
Ok(rollups)
11571158
}
11581159

1160+
/// Given an aggregation name that is already snake-cased like `stats`
1161+
/// (for an an aggregation `type Stats @aggregation(..)`) and an
1162+
/// interval, return the table that holds the aggregated data, like
1163+
/// `stats_hour`.
1164+
pub fn aggregation_table(
1165+
&self,
1166+
aggregation: &str,
1167+
interval: AggregationInterval,
1168+
) -> Option<&Table> {
1169+
let sql_name = format!("{}_{interval}", aggregation);
1170+
self.table(&sql_name)
1171+
}
1172+
1173+
/// Return true if the layout has an aggregation with the given name
1174+
/// like `stats` (already snake_cased)
1175+
pub fn has_aggregation(&self, aggregation: &str) -> bool {
1176+
self.input_schema
1177+
.aggregation_names()
1178+
.any(|agg_name| SqlName::from(agg_name).as_str() == aggregation)
1179+
}
1180+
11591181
/// Roll up all timeseries for each entry in `block_times`. The overall
11601182
/// effect is that all buckets that end after `last_rollup` and before
11611183
/// the last entry in `block_times` are filled. This will fill all

store/postgres/src/sql/parser.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ mod test {
4343

4444
use super::Parser;
4545

46-
const TEST_GQL: &str = "
46+
const TEST_GQL: &str = r#"
4747
type Swap @entity(immutable: true) {
4848
id: Bytes!
4949
timestamp: BigInt!
@@ -68,7 +68,19 @@ mod test {
6868
name: String!
6969
decimals: Int!
7070
}
71-
";
71+
72+
type Data @entity(timeseries: true) {
73+
id: Int8!
74+
timestamp: Timestamp!
75+
price: Int!
76+
}
77+
78+
type Stats @aggregation(intervals: ["hour", "day"], source: "Data") {
79+
id: Int8!
80+
timestamp: Timestamp!
81+
sum: BigDecimal! @aggregate(fn: "sum", arg: "price")
82+
}
83+
"#;
7284

7385
fn parse_and_validate(sql: &str) -> Result<String, anyhow::Error> {
7486
let parser = Parser::new(Arc::new(make_layout(TEST_GQL)), BLOCK_NUMBER_MAX);

store/postgres/src/sql/parser_tests.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,30 @@
9898
- name: unknown tables forbidden
9999
sql: select * from unknown_table
100100
err: Unknown table unknown_table
101+
- name: qualified tables are forbidden
102+
sql: select * from pg_catalog.pg_class
103+
err: "Qualified table names are not supported: pg_catalog.pg_class"
104+
- name: aggregation tables are hidden
105+
sql: select * from stats_hour
106+
err: Unknown table stats_hour
107+
- name: CTEs take precedence
108+
sql: with stats_hour as (select 1) select * from stats_hour
109+
ok: WITH stats_hour AS (SELECT 1) SELECT * FROM stats_hour
110+
- name: aggregation tables use function syntax
111+
sql: select * from stats('hour')
112+
ok: SELECT * FROM (SELECT "id", "timestamp", "sum" FROM "sgd0815"."stats_hour" WHERE block$ <= 2147483647) AS stats_hour
113+
- name: unknown aggregation interval
114+
sql: select * from stats('fortnight')
115+
err: Unknown aggregation interval `fortnight` for table stats
116+
- name: aggregation tables with empty arg
117+
sql: select * from stats('')
118+
err: Unknown aggregation interval `` for table stats
119+
- name: aggregation tables with no args
120+
sql: select * from stats()
121+
err: Invalid syntax for aggregation stats
122+
- name: aggregation tables with multiple args
123+
sql: select * from stats('hour', 'day')
124+
err: Invalid syntax for aggregation stats
125+
- name: aggregation tables with alias
126+
sql: select * from stats('hour') as sh
127+
ok: SELECT * FROM (SELECT "id", "timestamp", "sum" FROM "sgd0815"."stats_hour" WHERE block$ <= 2147483647) AS sh

store/postgres/src/sql/validation.rs

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use graph::prelude::BlockNumber;
2+
use graph::schema::AggregationInterval;
23
use sqlparser::ast::{
3-
Expr, Ident, ObjectName, Offset, Query, SetExpr, Statement, TableAlias, TableFactor, Value,
4-
VisitMut, VisitorMut,
4+
Expr, FunctionArg, FunctionArgExpr, Ident, ObjectName, Offset, Query, SetExpr, Statement,
5+
TableAlias, TableFactor, Value, VisitMut, VisitorMut,
56
};
67
use sqlparser::parser::Parser;
78
use std::result::Result;
@@ -22,12 +23,18 @@ pub enum Error {
2223
NotSelectQuery,
2324
#[error("Unknown table {0}")]
2425
UnknownTable(String),
26+
#[error("Unknown aggregation interval `{1}` for table {0}")]
27+
UnknownAggregationInterval(String, String),
28+
#[error("Invalid syntax for aggregation {0}")]
29+
InvalidAggregationSyntax(String),
2530
#[error("Only constant numbers are supported for LIMIT and OFFSET.")]
2631
UnsupportedLimitOffset,
2732
#[error("The limit of {0} is greater than the maximum allowed limit of {1}.")]
2833
UnsupportedLimit(u32, u32),
2934
#[error("The offset of {0} is greater than the maximum allowed offset of {1}.")]
3035
UnsupportedOffset(u32, u32),
36+
#[error("Qualified table names are not supported: {0}")]
37+
NoQualifiedTables(String),
3138
}
3239

3340
pub struct Validator<'a> {
@@ -151,25 +158,79 @@ impl VisitorMut for Validator<'_> {
151158
&mut self,
152159
table_factor: &mut TableFactor,
153160
) -> ControlFlow<Self::Break> {
161+
/// Check whether `args` is a single string argument and return that
162+
/// string
163+
fn extract_string_arg(args: &Vec<FunctionArg>) -> Option<String> {
164+
if args.len() != 1 {
165+
return None;
166+
}
167+
match &args[0] {
168+
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
169+
Value::SingleQuotedString(s),
170+
))) => Some(s.clone()),
171+
_ => None,
172+
}
173+
}
174+
154175
if let TableFactor::Table {
155176
name, args, alias, ..
156177
} = table_factor
157178
{
158-
if args.is_some() {
159-
return self.validate_function_name(name);
179+
if name.0.len() != 1 {
180+
// We do not support schema qualified table names
181+
return ControlFlow::Break(Error::NoQualifiedTables(name.to_string()));
160182
}
161-
let table = if let Some(table_name) = name.0.last() {
162-
let name = &table_name.value;
163-
let Some(table) = self.layout.table(name) else {
164-
if self.ctes.contains(name) {
165-
return ControlFlow::Continue(());
166-
} else {
167-
return ControlFlow::Break(Error::UnknownTable(name.to_string()));
168-
}
169-
};
170-
table
171-
} else {
183+
let table_name = &name.0[0].value;
184+
185+
// CTES override subgraph tables
186+
if self.ctes.contains(&table_name.to_lowercase()) && args.is_none() {
172187
return ControlFlow::Continue(());
188+
}
189+
190+
let table = match (self.layout.table(table_name), args) {
191+
(None, None) => {
192+
return ControlFlow::Break(Error::UnknownTable(table_name.clone()));
193+
}
194+
(Some(_), Some(_)) => {
195+
// Table exists but has args, must be a function
196+
return self.validate_function_name(&name);
197+
}
198+
(None, Some(args)) => {
199+
// Table does not exist but has args, is either an
200+
// aggregation table in the form <name>(<interval>) or
201+
// must be a function
202+
203+
if !self.layout.has_aggregation(table_name) {
204+
// Not an aggregation, must be a function
205+
return self.validate_function_name(&name);
206+
}
207+
208+
let Some(intv) = extract_string_arg(args) else {
209+
// Looks like an aggregation, but argument is not a single string
210+
return ControlFlow::Break(Error::InvalidAggregationSyntax(
211+
table_name.clone(),
212+
));
213+
};
214+
let Some(intv) = intv.parse::<AggregationInterval>().ok() else {
215+
return ControlFlow::Break(Error::UnknownAggregationInterval(
216+
table_name.clone(),
217+
intv,
218+
));
219+
};
220+
221+
let Some(table) = self.layout.aggregation_table(table_name, intv) else {
222+
return self.validate_function_name(&name);
223+
};
224+
table
225+
}
226+
(Some(table), None) => {
227+
if !table.object.is_object_type() {
228+
// Interfaces and aggregations can not be queried
229+
// with the table name directly
230+
return ControlFlow::Break(Error::UnknownTable(table_name.clone()));
231+
}
232+
table
233+
}
173234
};
174235

175236
// Change 'from table [as alias]' to 'from (select {columns} from table) as alias'

0 commit comments

Comments
 (0)