#862 much faster version of aggregate_15_min_mvt#863
Conversation
|
Here is an example just for a single day (only testing select, not update): ran in 8s:
WITH temp AS (
-- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
SELECT
im.intersection_uid,
dt.datetime_bin,
im.classification_uid,
im.leg,
im.movement_uid,
0 AS volume
FROM miovision_api.intersection_movements AS im
CROSS JOIN generate_series(
'2024-02-07'::date,
'2024-02-08'::date - interval '15 minutes',
interval '15 minutes'
) AS dt(datetime_bin)
WHERE
--0 padding for certain modes (padding)
im.classification_uid IN (1,2,6,10)
--AND im.intersection_uid = ANY(target_intersections)
UNION ALL
--real volumes
SELECT
v.intersection_uid,
datetime_bin_15(v.datetime_bin) AS datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
SUM(volume)
FROM miovision_api.volumes AS v
--only aggregate common movements
JOIN miovision_api.intersection_movements USING (
intersection_uid, classification_uid, leg, movement_uid
)
WHERE
v.datetime_bin >= '2024-02-07'::date
AND v.datetime_bin < '2024-02-08'::date
--AND v.intersection_uid = ANY(target_intersections)
GROUP BY
v.intersection_uid,
datetime_bin_15(v.datetime_bin),
v.classification_uid,
v.leg,
v.movement_uid
),
aggregate_insert AS (
/*INSERT INTO miovision_api.volumes_15min_mvt(
intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
)*/
SELECT DISTINCT ON (
v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
)
v.intersection_uid,
v.datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
CASE
--set unacceptable gaps as nulls
WHEN un.datetime_bin IS NOT NULL THEN NULL
--gap fill with zeros (restricted to certain modes in temp CTE)
ELSE v.volume
END AS volume
FROM temp AS v
JOIN miovision_api.intersections AS i USING (intersection_uid)
--set unacceptable gaps as null
LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
intersection_uid, datetime_bin
)
WHERE
-- Only include dates during which intersection is active
-- (excludes entire day it was added/removed)
v.datetime_bin >= i.date_installed + interval '1 day'
AND (
i.date_decommissioned IS NULL
OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
)
ORDER BY
v.intersection_uid,
v.datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
--select real value instead of padding value if available
v.volume DESC
)
SELECT COUNT(*), SUM(volume), 'new' AS source FROM aggregate_insert
UNION
SELECT COUNT(*), SUM(volume), 'old' AS source FROM miovision_api.volumes_15min_mvt WHERE datetime_bin >= '2024-02-07'::date AND datetime_bin < '2024-02-08'::date And this select for 1 month, 2 intersections ran in 6s!!!
WITH temp AS (
-- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
SELECT
im.intersection_uid,
dt.datetime_bin,
im.classification_uid,
im.leg,
im.movement_uid,
0 AS volume
FROM miovision_api.intersection_movements AS im
CROSS JOIN generate_series(
'2024-01-07'::date,
'2024-02-08'::date - interval '15 minutes',
interval '15 minutes'
) AS dt(datetime_bin)
WHERE
--0 padding for certain modes (padding)
im.classification_uid IN (1,2,6,10)
AND im.intersection_uid = ANY(ARRAY[12,25]::integer [])
UNION ALL
--real volumes
SELECT
v.intersection_uid,
datetime_bin_15(v.datetime_bin) AS datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
SUM(volume)
FROM miovision_api.volumes AS v
--only aggregate common movements
JOIN miovision_api.intersection_movements USING (
intersection_uid, classification_uid, leg, movement_uid
)
WHERE
v.datetime_bin >= '2024-01-07'::date
AND v.datetime_bin < '2024-02-08'::date
AND v.intersection_uid = ANY(ARRAY[12,25]::integer [])
GROUP BY
v.intersection_uid,
datetime_bin_15(v.datetime_bin),
v.classification_uid,
v.leg,
v.movement_uid
),
aggregate_insert AS (
/*INSERT INTO miovision_api.volumes_15min_mvt(
intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
)*/
SELECT DISTINCT ON (
v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
)
v.intersection_uid,
v.datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
CASE
--set unacceptable gaps as nulls
WHEN un.datetime_bin IS NOT NULL THEN NULL
--gap fill with zeros (restricted to certain modes in temp CTE)
ELSE v.volume
END AS volume
FROM temp AS v
JOIN miovision_api.intersections AS i USING (intersection_uid)
--set unacceptable gaps as null
LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
intersection_uid, datetime_bin
)
WHERE
-- Only include dates during which intersection is active
-- (excludes entire day it was added/removed)
v.datetime_bin >= i.date_installed + interval '1 day'
AND (
i.date_decommissioned IS NULL
OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
)
ORDER BY
v.intersection_uid,
v.datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
--select real value instead of padding value if available
v.volume DESC
)
SELECT COUNT(*), SUM(volume), 'new' AS source FROM aggregate_insert
UNION
SELECT COUNT(*), SUM(volume), 'old' AS source FROM miovision_api.volumes_15min_mvt
WHERE datetime_bin >= '2024-01-07'::date AND datetime_bin < '2024-02-08'::date
AND intersection_uid = ANY(ARRAY[12,25]::integer []) |
| OR (v.datetime_bin < i.date_decommissioned - interval '1 day') | ||
| ) | ||
| --exclude movements already aggregated | ||
| AND v.volume_15min_mvt_uid IS NULL |
There was a problem hiding this comment.
is this excluded cause we have the clearing function?
There was a problem hiding this comment.
yes exactly! it is redundant - we always run clear + aggregate together whether through run-api or airflow.
|
I tested it on a day with unacceptable gaps and the result is different comparing to the data in 15_mvt. I am wondering if the following worked previously with the old function as intended cause I dont see Nulls for unacceptable gaps in the 15_mvt table. I tested with the date |
|
Here's the test you ran last week @chmnata which gave suspicious results along with my investigation ✅
--create a temp table to store and compare testing results
create table gwolofs.miovision_Test_null as
WITH temp AS (
-- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
SELECT
im.intersection_uid,
dt.datetime_bin,
im.classification_uid,
im.leg,
im.movement_uid,
0 AS volume
FROM miovision_api.intersection_movements AS im
CROSS JOIN generate_series(
'2024-01-02'::date,
'2024-01-03'::date - interval '15 minutes',
interval '15 minutes'
) AS dt(datetime_bin)
WHERE
--0 padding for certain modes (padding)
im.classification_uid IN (1,2,6,10)
--AND im.intersection_uid = ANY(target_intersections)
UNION ALL
--real volumes
SELECT
v.intersection_uid,
datetime_bin_15(v.datetime_bin) AS datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
SUM(volume)
FROM miovision_api.volumes AS v
--only aggregate common movements
JOIN miovision_api.intersection_movements USING (
intersection_uid, classification_uid, leg, movement_uid
)
WHERE
v.datetime_bin >= '2024-01-02'::date
AND v.datetime_bin < '2024-01-03'::date
--AND v.intersection_uid = ANY(target_intersections)
GROUP BY
v.intersection_uid,
datetime_bin_15(v.datetime_bin),
v.classification_uid,
v.leg,
v.movement_uid
)
/*INSERT INTO miovision_api.volumes_15min_mvt(
intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
)*/
SELECT DISTINCT ON (
v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
)
v.intersection_uid,
v.datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
CASE
--set unacceptable gaps as nulls
WHEN un.datetime_bin IS NOT NULL THEN NULL
--gap fill with zeros (restricted to certain modes in temp CTE)
ELSE v.volume
END AS volume
FROM temp AS v
JOIN miovision_api.intersections AS i USING (intersection_uid)
--set unacceptable gaps as null
LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
intersection_uid, datetime_bin
)
WHERE
-- Only include dates during which intersection is active
-- (excludes entire day it was added/removed)
v.datetime_bin >= i.date_installed + interval '1 day'
AND (
i.date_decommissioned IS NULL
OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
)
ORDER BY
v.intersection_uid,
v.datetime_bin,
v.classification_uid,
v.leg,
v.movement_uid,
--select real value instead of padding value if available
v.volume DESC NULLS LAST
GRANT ALL ON gwolofs.miovision_Test_null TO natalie, dbadmin;
--the 44 records that do not match are all for intersection_uid = 60 AND datetime_bin = '2024-01-02 23:45:00'.
--These records are zero in the old result and null in the new result. The record is in unacceptable_ranges, so it should be null.
SELECT intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
FROM miovision_api.volumes_15min_mvt as v
WHERE v.datetime_bin >= '2024-01-02'::date
AND v.datetime_bin < '2024-01-03'::date
EXCEPT
SELECT * from gwolofs.miovision_Test_null
WHERE intersection_uid NOT IN (67, 68) --these intersections were not aggregated in the old dag run from 01-02 because they were only added to intersection_movements later
--the 44 records that do not match are all for intersection_uid = 60 AND datetime_bin = '2024-01-02 23:45:00'.
--These records are null in the new result and zero in the old result. The record is in unacceptable_ranges, so it should be null.
SELECT * from gwolofs.miovision_Test_null
WHERE intersection_uid NOT IN (67, 68) --these intersections were not aggregated in the old dag run from 01-02 because they were only added to intersection_movements later
EXCEPT
SELECT intersection_uid, datetime_bin, classification_uid, leg, movement_uid,
volume
FROM miovision_api.volumes_15min_mvt as v
WHERE v.datetime_bin >= '2024-01-02'::date AND v.datetime_bin < '2024-01-03'::date
--correctly get nulls for this case when re-aggregating using old function:
SELECT
im.intersection_uid,
dt.datetime_bin,
im.classification_uid,
im.leg,
im.movement_uid,
CASE
--set unacceptable gaps as nulls
WHEN un.datetime_bin IS NOT NULL THEN NULL
--gap fill with zeros (restricted to certain modes in having clause)
ELSE (COALESCE(SUM(v.volume), 0))
END AS volume
-- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
FROM miovision_api.intersection_movements AS im
CROSS JOIN generate_series(
'2024-01-02 23:45:00'::timestamp,
'2024-01-03'::timestamp - interval '15 minutes',
interval '15 minutes'
) AS dt(datetime_bin)
JOIN miovision_api.intersections AS mai USING (intersection_uid)
--To avoid aggregating unacceptable gaps
LEFT JOIN miovision_api.unacceptable_gaps AS un ON
un.intersection_uid = im.intersection_uid
--remove the 15 minute bin containing any unacceptable gaps
AND dt.datetime_bin = un.datetime_bin
--To get 1min bins
LEFT JOIN miovision_api.volumes AS v ON
--help query choose correct partition
v.datetime_bin = '2024-01-02 23:45:00'::timestamp
AND v.datetime_bin >= dt.datetime_bin
AND v.datetime_bin < dt.datetime_bin + interval '15 minutes'
AND v.intersection_uid = im.intersection_uid
AND v.classification_uid = im.classification_uid
AND v.leg = im.leg
AND v.movement_uid = im.movement_uid
WHERE
-- Only include dates during which intersection is active
-- (excludes entire day it was added/removed)
dt.datetime_bin > mai.date_installed + interval '1 day'
AND (
mai.date_decommissioned IS NULL
OR (dt.datetime_bin < mai.date_decommissioned - interval '1 day')
)
--exclude movements already aggregated
AND v.volume_15min_mvt_uid IS NULL
AND im.intersection_uid = ANY(ARRAY[46]::integer[])
GROUP BY
im.intersection_uid,
dt.datetime_bin,
im.classification_uid,
im.leg,
im.movement_uid,
un.datetime_bin
HAVING
--retain 0s for certain modes (padding)
im.classification_uid IN (1,2,6,10)
OR SUM(v.volume) > 0
|
Thanks for the investigation! Do you know how much of the data currently in 15_mvt will need to be updated due to changed unacceptable gap? |
|
Here are 12 dates with records that should be nulls but are not. You were lucky to pick one of them at random! I will create a new issue for this investigation.
SELECT datetime_bin::date, COUNT(*)
FROM miovision_api.volumes_15min_mvt AS v
JOIN miovision_api.unacceptable_gaps AS un USING (intersection_uid, datetime_bin)
WHERE v.volume IS NOT NULL
GROUP BY 1
ORDER BY 1 DESC |
|
This new year is giving me the luck we needed!!!!! 🧧 |
We'll fix all the downstream tables as well by running |
|
updated in the database |
|
This aggregation today took 5 minutes instead of 6 hours like above! 🥳 only 70x faster in the end, not 3000. |
What this pull request accomplishes:
Issue(s) this solves:
What, in particular, needs to reviewed:
What needs to be done by a sysadmin after this PR is merged