From 75f608c9b54546a3666bebc7a2a9cb70abc16153 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Wed, 19 Nov 2025 13:03:19 -0800 Subject: [PATCH 1/7] Add Haskell implementation (#1) * Add Haskell dataframe benchmark entry --------- Co-authored-by: Claude --- .github/workflows/regression.yml | 2 +- .gitignore | 2 + README.md | 1 + _benchplot/benchplot-dict.R | 36 +- _control/solutions.csv | 2 + _launcher/launcher.R | 2 +- _launcher/solution.R | 4 +- _report/report.R | 2 +- haskell/README.md | 85 ++++ haskell/VERSION | 1 + haskell/exec.sh | 6 + haskell/groupby-haskell.hs | 765 +++++++++++++++++++++++++++++++ haskell/haskell-benchmark.cabal | 30 ++ haskell/join-haskell.hs | 537 ++++++++++++++++++++++ haskell/setup-haskell.sh | 24 + haskell/stack.yaml | 67 +++ haskell/upg-haskell.sh | 13 + haskell/ver-haskell.sh | 20 + run.conf | 2 +- run.sh | 2 + 20 files changed, 1592 insertions(+), 11 deletions(-) create mode 100644 haskell/README.md create mode 100644 haskell/VERSION create mode 100755 haskell/exec.sh create mode 100755 haskell/groupby-haskell.hs create mode 100644 haskell/haskell-benchmark.cabal create mode 100644 haskell/join-haskell.hs create mode 100755 haskell/setup-haskell.sh create mode 100644 haskell/stack.yaml create mode 100755 haskell/upg-haskell.sh create mode 100755 haskell/ver-haskell.sh diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index 41717226..d7173efa 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - solution: [data.table, collapse, dplyr, pandas, spark, polars, R-arrow, duckdb, datafusion, dask, clickhouse, chdb] + solution: [data.table, collapse, dplyr, pandas, spark, polars, R-arrow, duckdb, datafusion, dask, clickhouse, chdb, haskell] name: Solo solutions runs-on: ubuntu-latest env: diff --git a/.gitignore b/.gitignore index 78694453..8ab87bb6 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ workdir/ timeout-exit-codes.out */target *.lock +dist-newstyle +.stack-work diff --git a/README.md b/README.md index 9bbe2c6b..d3b8e601 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ Contribution and feedback are very welcome! - [x] [DataFrames.jl](https://github.com/JuliaData/DataFrames.jl) - [x] [In Memory DataSets](https://github.com/sl-solution/InMemoryDatasets.jl) - [x] [Datafusion](https://github.com/apache/arrow-datafusion) + - [x] [(haskell)dataframe](https://github.com/mchav/dataframe) If you would like your solution to be included, feel free to file a PR with the necessary setup-_solution_/ver-_solution_/groupby-_solution_/join-_solution_ scripts. If the team at DuckDB Labs approves the PR it will be merged. In the interest of transparency and fairness, only results from open-source data-science tools will be merged. diff --git a/_benchplot/benchplot-dict.R b/_benchplot/benchplot-dict.R index fedc4aac..c944579a 100644 --- a/_benchplot/benchplot-dict.R +++ b/_benchplot/benchplot-dict.R @@ -46,7 +46,8 @@ solution.dict = {list( "duckdb" = list(name=c(short="duckdb", long="DuckDB"), color=c(strong="#ddcd07", light="#fff100")), "duckdb-latest" = list(name=c(short="duckdb-latest", long="duckdb-latest"), color=c(strong="#ddcd07", light="#fff100")), "datafusion" = list(name=c(short="datafusion", long="Datafusion"), color=c(strong="deepskyblue4", light="deepskyblue3")), - "chdb" = list(name=c(short="chdb", long="chDB"), color=c(strong="hotpink4", light="hotpink1")) + "chdb" = list(name=c(short="chdb", long="chDB"), color=c(strong="hotpink4", light="hotpink1")), + "haskell" = list(name=c(short="haskell", long="Haskell"), color=c(strong="#3d0569ff", light="#61298bff")), )} #barplot(rep(c(0L,1L,1L), length(solution.dict)), # col=rev(c(rbind(sapply(solution.dict, `[[`, "color"), "black"))), @@ -259,7 +260,19 @@ groupby.syntax.dict = {list( "largest two v3 by id6" = "SELECT id6, arrayJoin(arraySlice(arrayReverseSort(groupArray(v3)), 1, 2)) AS v3 FROM (SELECT id6, v3 FROM db_benchmark.x WHERE v3 IS NOT NULL) AS subq GROUP BY id6", "regression v1 v2 by id2 id4" = "SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM db_benchmark.x GROUP BY id2, id4", "sum v3 count by id1:id6" = "SELECT id1, id2, id3, id4, id5, id6, sum(v3) AS v3, count() AS cnt FROM db_benchmark.x GROUP BY id1, id2, id3, id4, id5, id6" - )} + )}, + "haskell" = {c( + "sum v1 by id1" = "df |> D.groupby [\"id1\"] |> D.aggregate [\"v1_sum\" .= F.sum (F.col @Int \"v1\")]", + "sum v1 by id1:id2" = "df |> D.groupby [\"id1\", \"id2\"] |> D.aggregate [\"v1_sum\" .= F.sum (F.col @Int \"v1\")]", + "sum v1 mean v3 by id3" = "df |> D.groupby [\"id3\"] |> D.aggregate [\"v1_sum\" .= F.sum (F.col @Int \"v1\"), \"v3_mean\" .= F.mean (F.col @Double \"v3\")]", + "mean v1:v3 by id4" = "df |> D.groupby [\"id4\"] |> D.aggregate [\"v1_mean\" .= F.mean (F.col @Int \"v1\"), \"v2_mean\" .= F.mean (F.col @Int \"v2\"), \"v3_mean\" .= F.mean (F.col @Double \"v3\")]", + "sum v1:v3 by id6" = "df |> D.groupby [\"id6\"] |> D.aggregate [\"v1_sum\" .= F.sum (F.col @Int \"v1\"), \"v2_sum\" .= F.sum (F.col @Int \"v2\"), \"v3_sum\" .= F.sum (F.col @Double \"v3\")]", + "median v3 sd v3 by id4 id5" = "df |> D.groupby [\"id4\", \"id5\"] |> D.aggregate [\"v3_median\" .= F.median (F.col @Doublee \"v3\"), \"v3_sd\" .= F.stddev (F.col @Double \"v3\")]", + "max v1 - min v2 by id3" = "df |> D.groupby [\"id3\"] |> D.aggregate [\"diff\" .= F.maximum (F.col @Int \"v1\") - F.minimum (F.col @Int \"v2\")]", + "largest two v3 by id6" = "", + "regression v1 v2 by id2 id4" = "", + "sum v3 count by id1:id6" = "df |> D.groupBy [\"id1\",\"id2\",\"id3\",\"id4\",\"id5\",\"id6\"]).agg([F.sum (F.col @Double \"v3\") `F.as` \"v3\", F..count (F.col @Int \"v1\") `F.as` \"count\"]" + )}, )} groupby.query.exceptions = {list( "collapse" = list(), @@ -277,7 +290,8 @@ groupby.syntax.dict = {list( "duckdb" = list(), "duckdb-latest" = list(), "datafusion" = list(), - "chdb" = list() + "chdb" = list(), + "haskell" = list() )} groupby.data.exceptions = {list( # exceptions as of run 1575727624 "collapse" = {list( @@ -348,6 +362,8 @@ groupby.data.exceptions = {list( "Not Tested" = c("G1_1e9_1e2_0_0") )}, "chdb" = {list( + )}, + "haskell" = {list( )} )} groupby.exceptions = task.exceptions(groupby.query.exceptions, groupby.data.exceptions) @@ -472,7 +488,14 @@ join.syntax.dict = {list( "medium outer on int" = "SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 as medium_id5, v2 FROM db_benchmark.x AS x LEFT JOIN db_benchmark.medium AS medium USING (id2)", "medium inner on factor" = "SELECT x.*, medium.id1 AS medium_id1, medium.id2 AS medium_id2, medium.id4 as medium_id4, v2 FROM db_benchmark.x AS x INNER JOIN db_benchmark.medium AS medium USING (id5)", "big inner on int" = "SELECT x.*, big.id1 AS big_id1, big.id2 AS big_id2, big.id4 as big_id4, big.id5 AS big_id5, big.id6 AS big_id6, v2 FROM db_benchmark.x AS x INNER JOIN db_benchmark.big AS big USING (id3)" - )} + )}, + "haskell" = {c( + "small inner on int" = "D.innerJoin [\"id1\"] small small", + "medium inner on int" = "D.innerJoin [\"id2\"] medium medium", + "medium outer on int" = "D.leftJoin [\"id2\"] medium medium", + "medium inner on factor" = "D.innerJoin [\"id5\"] medium medium", + "big inner on int" = "D.innerJoin [\"id3\"] big big" + )}, )} join.query.exceptions = {list( "collapse" = list(), @@ -490,7 +513,8 @@ join.query.exceptions = {list( "duckdb" = list(), "duckdb-latest" = list(), "datafusion" = list(), - "chdb" = list() + "chdb" = list(), + "haskell" = list() )} join.data.exceptions = {list( # exceptions as of run 1575727624 "collapse" = {list( @@ -550,6 +574,8 @@ join.data.exceptions = {list( "Not tested" = c("J1_1e9_NA_0_0") )}, "chdb" = {list( + )}, + "haskell" = {list( )} )} join.exceptions = task.exceptions(join.query.exceptions, join.data.exceptions) diff --git a/_control/solutions.csv b/_control/solutions.csv index 2888dd13..3c12ae32 100644 --- a/_control/solutions.csv +++ b/_control/solutions.csv @@ -33,3 +33,5 @@ datafusion,groupby datafusion,join chdb,groupby chdb,join +haskell,groupby +haskell,join diff --git a/_launcher/launcher.R b/_launcher/launcher.R index be0e4b2b..ae92dbdf 100644 --- a/_launcher/launcher.R +++ b/_launcher/launcher.R @@ -16,7 +16,7 @@ file.ext = function(x) { x, "collapse"=, "data.table"=, "dplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R", "pandas"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "datafusion"=, "polars"="py", - "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py" + "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py", "haskell"="hs", ) if (is.null(ans)) stop(sprintf("solution %s does not have file extension defined in file.ext helper function", x)) ans diff --git a/_launcher/solution.R b/_launcher/solution.R index 98c4298e..48bf86b6 100755 --- a/_launcher/solution.R +++ b/_launcher/solution.R @@ -112,7 +112,7 @@ file.ext = function(x) { x, "collapse"=, "data.table"=, "dplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R", "pandas"="py", "spark"=, "pydatatable"=, "modin"=, "dask"=, "datafusion"=, "polars"="py", - "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py" + "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py", "haskell"="hs", ) if (is.null(ans)) stop(sprintf("solution %s does not have file extension defined in file.ext helper function", x)) ans @@ -153,7 +153,7 @@ setenv("SRC_DATANAME", d) ns = solution.path(s) ext = file.ext(s) -localcmd = if (s %in% c("clickhouse","h2o","juliadf", "juliads")) { # custom launcher bash script, for clickhouse h2o juliadf +localcmd = if (s %in% c("clickhouse","h2o","juliadf", "juliads", "haskell")) { # custom launcher bash script, for clickhouse h2o juliadf sprintf("exec.sh %s", t) } else if (s %in% c("dask")) { sprintf("%s_%s.%s", t, ns, ext) diff --git a/_report/report.R b/_report/report.R index a726b628..6cd5a483 100644 --- a/_report/report.R +++ b/_report/report.R @@ -6,7 +6,7 @@ get_report_status_file = function(path=getwd()) { file.path(path, "report-done") } get_report_solutions = function() { - c("duckdb-latest", "collapse", "data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "juliads", "clickhouse", "cudf", "polars", "duckdb", "datafusion", "arrow", "R-arrow", "chdb") + c("duckdb-latest", "collapse", "data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "juliads", "clickhouse", "cudf", "polars", "duckdb", "datafusion", "arrow", "R-arrow", "chdb", "haskell") } get_data_levels = function() { ## groupby diff --git a/haskell/README.md b/haskell/README.md new file mode 100644 index 00000000..d8f9abe2 --- /dev/null +++ b/haskell/README.md @@ -0,0 +1,85 @@ +# Haskell DataFrame Benchmark + +This benchmark entry uses Haskell with the `mchav/dataframe` library to implement dataframe operations. + +## Implementation Details + +- **Language**: Haskell (GHC) +- **DataFrame Library**: [mchav/dataframe](https://github.com/mchav/dataframe) +- **Build Tool**: Stack + +## About mchav/dataframe + +The `dataframe` library is a fast, safe, and intuitive DataFrame library for Haskell that provides: +- Type-safe column operations +- Familiar operations for users coming from pandas, dplyr, or polars +- Concise, declarative, and composable data pipelines +- Static typing that catches many bugs at compile time + +Resources: +- GitHub: https://github.com/mchav/dataframe +- Hackage: https://hackage.haskell.org/package/dataframe +- Documentation: https://dataframe.readthedocs.io/ + +## Implemented Benchmarks + +### Groupby (`groupby-haskell.hs`) +Implements 5 out of 10 groupby questions: +1. sum v1 by id1 +2. sum v1 by id1:id2 +3. sum v1 mean v3 by id3 +4. mean v1:v3 by id4 +5. sum v1:v3 by id6 + +Uses `D.groupBy` and `D.aggregate` with expression DSL (`F.sum`, `F.mean`). + +Note: Questions 6-10 would require additional statistical functions (median, standard deviation, regression, top-n selection). + +### Join (`join-haskell.hs`) +Implements all 5 join questions: +1. small inner on int +2. medium inner on int +3. medium outer on int (using leftJoin) +4. medium inner on factor +5. big inner on int + +Uses `DJ.innerJoin` and `DJ.leftJoin` from `DataFrame.Operations.Join`. + +## Setup + +Run the setup script to install dependencies: +```bash +./haskell/setup-haskell.sh +``` + +This will: +1. Install Stack (if not present) +2. Initialize the Stack project +3. Build all dependencies +4. Compile the benchmark executables + +## API Usage Examples + +```haskell +-- Read CSV +df <- D.readCsv "data/file.csv" + +-- GroupBy with aggregation +let grouped = D.groupBy ["id1"] df +let result = D.aggregate [F.sum (F.col @Double "v1") `F.as` "v1_sum"] grouped + +-- Inner Join +let joined = DJ.innerJoin ["id1"] df1 df2 + +-- Get dimensions +let (rows, cols) = D.dimensions df +``` + +## Performance Notes + +The implementation uses: +- Type-safe column operations with `TypeApplications` +- Expression DSL for clean aggregation syntax +- Efficient grouping and joining operations from the dataframe library + +This benchmark demonstrates Haskell's capabilities for high-performance dataframe operations with the additional benefits of static typing and functional programming. diff --git a/haskell/VERSION b/haskell/VERSION new file mode 100644 index 00000000..1d45831f --- /dev/null +++ b/haskell/VERSION @@ -0,0 +1 @@ +0.3.3.7 diff --git a/haskell/exec.sh b/haskell/exec.sh new file mode 100755 index 00000000..65e04cc8 --- /dev/null +++ b/haskell/exec.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e + +cd ./haskell + +stack run "$1-haskell" diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs new file mode 100755 index 00000000..5be66398 --- /dev/null +++ b/haskell/groupby-haskell.hs @@ -0,0 +1,765 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} + +import Control.Exception (evaluate) +import Data.List (intercalate) +import Data.Maybe +import qualified Data.Text as T +import Data.Time.Clock.POSIX (getPOSIXTime) +import qualified Data.Vector as V +import qualified Data.Vector.Unboxed as VU +import qualified DataFrame as D +import qualified DataFrame.Functions as F +import GHC.Stats +import System.Directory (doesFileExist) +import System.Environment (getEnv, lookupEnv) +import System.IO (hFlush, hPutStrLn, stderr, stdout) +import System.Posix.Process (getProcessID) +import System.Process (readProcess) +import Text.Read + + +main :: IO () +main = do + putStrLn "# groupby-haskell.hs" + hFlush stdout + + let ver = "0.3.3" + let git = "dataframe" + let task = "groupby" + let solution = "haskell" + let fun = "groupBy" + let cache = "TRUE" + let onDisk = "FALSE" + + dataName <- getEnv "SRC_DATANAME" + machineType <- getEnv "MACHINE_TYPE" + let srcFile = "../data/" ++ dataName ++ ".csv" + + putStrLn $ "loading dataset " ++ dataName + hFlush stdout + + -- Check if data has NAs + let parts = T.splitOn "_" (T.pack dataName) + let naFlag = if length parts > 3 then read (T.unpack $ parts !! 3) :: Int else 0 + + if naFlag > 0 + then do + hPutStrLn stderr "skip due to na_flag>0" + return () + else do + -- Load CSV data using dataframe + x <- D.readCsv srcFile + + let (inRows, _) = D.dimensions x + putStrLn $ show inRows + hFlush stdout + + putStrLn "grouping..." + hFlush stdout + + -- Question 1: sum v1 by id1 + let question1 = "sum v1 by id1" + (ans1, t1_1) <- timeIt $ do + let grouped = D.groupBy ["id1"] x + let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped + return result + m1_1 <- getMemoryUsage + let (outRows1, outCols1) = D.dimensions ans1 + (chk1, chkt1_1) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans1 + print sumV1 + return sumV1 + writeLog + task + dataName + inRows + question1 + outRows1 + outCols1 + solution + ver + git + fun + 1 + t1_1 + m1_1 + cache + (makeChk [fromIntegral chk1]) + chkt1_1 + onDisk + machineType + + -- Run 2 + (ans1_2, t1_2) <- timeIt $ do + let grouped = D.groupBy ["id1"] x + let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped + return result + m1_2 <- getMemoryUsage + let (outRows1_2, outCols1_2) = D.dimensions ans1_2 + (chk1_2, chkt1_2) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans1_2 + print sumV1 + return sumV1 + writeLog + task + dataName + inRows + question1 + outRows1_2 + outCols1_2 + solution + ver + git + fun + 2 + t1_2 + m1_2 + cache + (makeChk [fromIntegral chk1_2]) + chkt1_2 + onDisk + machineType + putStrLn $ "Question 1 completed: " ++ show outRows1_2 ++ " groups" + + -- Question 2: sum v1 by id1:id2 + let question2 = "sum v1 by id1:id2" + (ans2, t2_1) <- timeIt $ do + let grouped = D.groupBy ["id1", "id2"] x + let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped + return result + m2_1 <- getMemoryUsage + let (outRows2, outCols2) = D.dimensions ans2 + (chk2, chkt2_1) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans2 + print sumV1 + return sumV1 + writeLog + task + dataName + inRows + question2 + outRows2 + outCols2 + solution + ver + git + fun + 1 + t2_1 + m2_1 + cache + (makeChk [fromIntegral chk2]) + chkt2_1 + onDisk + machineType + + -- Run 2 + (ans2_2, t2_2) <- timeIt $ do + let grouped = D.groupBy ["id1", "id2"] x + let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped + return result + m2_2 <- getMemoryUsage + let (outRows2_2, outCols2_2) = D.dimensions ans2_2 + (chk2_2, chkt2_2) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans2_2 + print sumV1 + return sumV1 + writeLog + task + dataName + inRows + question2 + outRows2_2 + outCols2_2 + solution + ver + git + fun + 2 + t2_2 + m2_2 + cache + (makeChk [fromIntegral chk2_2]) + chkt2_2 + onDisk + machineType + putStrLn $ "Question 2 completed: " ++ show outRows2_2 ++ " groups" + + -- Question 3: sum v1 mean v3 by id3 + let question3 = "sum v1 mean v3 by id3" + (ans3, t3_1) <- timeIt $ do + let grouped = D.groupBy ["id3"] x + let result = + D.aggregate + [ F.sum (F.col @Int "v1") `F.as` "v1_sum" + , F.mean (F.col @Double "v3") `F.as` "v3_mean" + ] + grouped + return result + m3_1 <- getMemoryUsage + let (outRows3, outCols3) = D.dimensions ans3 + (chk3, chkt3_1) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans3 + let sumV3 = D.sum (F.col @Double "v3_mean") ans3 + print (sumV1, sumV3) + return (sumV1, sumV3) + writeLog + task + dataName + inRows + question3 + outRows3 + outCols3 + solution + ver + git + fun + 1 + t3_1 + m3_1 + cache + (makeChk [fromIntegral (fst chk3), snd chk3]) + chkt3_1 + onDisk + machineType + + -- Run 2 + (ans3_2, t3_2) <- timeIt $ do + let grouped = D.groupBy ["id3"] x + let result = + D.aggregate + [ F.sum (F.col @Int "v1") `F.as` "v1_sum" + , F.mean (F.col @Double "v3") `F.as` "v3_mean" + ] + grouped + return result + m3_2 <- getMemoryUsage + let (outRows3_2, outCols3_2) = D.dimensions ans3_2 + (chk3_2, chkt3_2) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans3_2 + let sumV3 = D.sum (F.col @Double "v3_mean") ans3_2 + print (sumV1, sumV3) + return (sumV1, sumV3) + writeLog + task + dataName + inRows + question3 + outRows3_2 + outCols3_2 + solution + ver + git + fun + 2 + t3_2 + m3_2 + cache + (makeChk [fromIntegral (fst chk3_2), snd chk3_2]) + chkt3_2 + onDisk + machineType + putStrLn $ "Question 3 completed: " ++ show outRows3_2 ++ " groups" + + -- Question 4: mean v1:v3 by id4 + let question4 = "mean v1:v3 by id4" + (ans4, t4_1) <- timeIt $ do + let grouped = D.groupBy ["id4"] x + let result = + D.aggregate + [ F.mean (F.col @Int "v1") `F.as` "v1_mean" + , F.mean (F.col @Int "v2") `F.as` "v2_mean" + , F.mean (F.col @Double "v3") `F.as` "v3_mean" + ] + grouped + return result + m4_1 <- getMemoryUsage + let (outRows4, outCols4) = D.dimensions ans4 + (chk4, chkt4_1) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1_mean" ans4 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2_mean" ans4 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV3 = case D.columnAsDoubleVector "v3_mean" ans4 of + Right vec -> VU.sum vec + Left _ -> 0 + print (sumV1, sumV2, sumV3) + return (sumV1, sumV2, sumV3) + writeLog + task + dataName + inRows + question4 + outRows4 + outCols4 + solution + ver + git + fun + 1 + t4_1 + m4_1 + cache + (makeChk [(\(a, _, _) -> a) chk4, (\(_, b, _) -> b) chk4, (\(_, _, c) -> c) chk4]) + chkt4_1 + onDisk + machineType + + -- Run 2 + (ans4_2, t4_2) <- timeIt $ do + let grouped = D.groupBy ["id4"] x + let result = + D.aggregate + [ F.mean (F.col @Int "v1") `F.as` "v1_mean" + , F.mean (F.col @Int "v2") `F.as` "v2_mean" + , F.mean (F.col @Double "v3") `F.as` "v3_mean" + ] + grouped + return result + m4_2 <- getMemoryUsage + let (outRows4_2, outCols4_2) = D.dimensions ans4_2 + (chk4_2, chkt4_2) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1_mean" ans4_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2_mean" ans4_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV3 = case D.columnAsDoubleVector "v3_mean" ans4_2 of + Right vec -> VU.sum vec + Left _ -> 0 + print (sumV1, sumV2, sumV3) + return (sumV1, sumV2, sumV3) + writeLog + task + dataName + inRows + question4 + outRows4_2 + outCols4_2 + solution + ver + git + fun + 2 + t4_2 + m4_2 + cache + ( makeChk + [(\(a, _, _) -> a) chk4_2, (\(_, b, _) -> b) chk4_2, (\(_, _, c) -> c) chk4_2] + ) + chkt4_2 + onDisk + machineType + putStrLn $ "Question 4 completed: " ++ show outRows4_2 ++ " groups" + + -- Question 6: sum v1:v3 by id6 + let question5 = "sum v1:v3 by id6" + (ans5, t5_1) <- timeIt $ do + let grouped = D.groupBy ["id6"] x + let result = + D.aggregate + [ F.sum (F.col @Int "v1") `F.as` "v1_sum" + , F.sum (F.col @Int "v2") `F.as` "v2_sum" + , F.sum (F.col @Double "v3") `F.as` "v3_sum" + ] + grouped + return result + m5_1 <- getMemoryUsage + let (outRows5, outCols5) = D.dimensions ans5 + (chk5, chkt5_1) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans5 + let sumV2 = D.sum (F.col @Int "v2_sum") ans5 + let sumV3 = D.sum (F.col @Double "v3_sum") ans5 + print (sumV1, sumV2, sumV3) + return (sumV1, sumV2, sumV3) + writeLog + task + dataName + inRows + question5 + outRows5 + outCols5 + solution + ver + git + fun + 1 + t5_1 + m5_1 + cache + ( makeChk + [ (\(a, _, _) -> fromIntegral a) chk5 + , (\(_, b, _) -> fromIntegral b) chk5 + , (\(_, _, c) -> c) chk5 + ] + ) + chkt5_1 + onDisk + machineType + + -- Run 2 + (ans5_2, t5_2) <- timeIt $ do + let grouped = D.groupBy ["id6"] x + let result = + D.aggregate + [ F.sum (F.col @Int "v1") `F.as` "v1_sum" + , F.sum (F.col @Int "v2") `F.as` "v2_sum" + , F.sum (F.col @Double "v3") `F.as` "v3_sum" + ] + grouped + return result + m5_2 <- getMemoryUsage + let (outRows5_2, outCols5_2) = D.dimensions ans5_2 + (chk5_2, chkt5_2) <- timeIt $ do + let sumV1 = D.sum (F.col @Int "v1_sum") ans5_2 + let sumV2 = D.sum (F.col @Int "v2_sum") ans5_2 + let sumV3 = D.sum (F.col @Double "v3_sum") ans5_2 + print (sumV1, sumV2, sumV3) + return (sumV1, sumV2, sumV3) + writeLog + task + dataName + inRows + question5 + outRows5_2 + outCols5_2 + solution + ver + git + fun + 2 + t5_2 + m5_2 + cache + ( makeChk + [ (\(a, _, _) -> fromIntegral a) chk5_2 + , (\(_, b, _) -> fromIntegral b) chk5_2 + , (\(_, _, c) -> c) chk5_2 + ] + ) + chkt5_2 + onDisk + machineType + putStrLn $ "Question 5 completed: " ++ show outRows5_2 ++ " groups" + + -- Question 6: median v3 sd v3 by id4 id5 + let question6 = "median v3 sd v3 by id4 id5" + (ans6, t6_1) <- timeIt $ do + let grouped = D.groupBy ["id4", "id5"] x + let result = + D.aggregate + [ F.median (F.col @Double "v3") `F.as` "v3_median" + , F.stddev (F.col @Double "v3") `F.as` "v3_sd" + ] + grouped + return result + m6_1 <- getMemoryUsage + let (outRows6, outCols6) = D.dimensions ans6 + (chk6, chkt6_1) <- timeIt $ do + let sumMedianV3 = D.sum (F.col @Double "v3_median") ans6 + let sumSdV3 = D.sum (F.col @Double "v3_sd") ans6 + print (sumMedianV3, sumSdV3) + return (sumMedianV3, sumSdV3) + writeLog + task + dataName + inRows + question6 + outRows6 + outCols6 + solution + ver + git + fun + 1 + t6_1 + m6_1 + cache + (makeChk [fst chk6, snd chk6]) + chkt6_1 + onDisk + machineType + + -- Run 2 + (ans6_2, t6_2) <- timeIt $ do + let grouped = D.groupBy ["id4", "id5"] x + let result = + D.aggregate + [ F.median (F.col @Double "v3") `F.as` "v3_median" + , F.stddev (F.col @Double "v3") `F.as` "v3_sd" + ] + grouped + return result + m6_2 <- getMemoryUsage + let (outRows6_2, outCols6_2) = D.dimensions ans6_2 + (chk6_2, chkt6_2) <- timeIt $ do + let sumMedianV3 = D.sum (F.col @Double "v3_median") ans6 + let sumSdV3 = D.sum (F.col @Double "v3_sd") ans6 + print (sumMedianV3, sumSdV3) + return (sumMedianV3, sumSdV3) + writeLog + task + dataName + inRows + question6 + outRows6_2 + outCols6_2 + solution + ver + git + fun + 2 + t6_2 + m6_2 + cache + (makeChk [fst chk6_2, snd chk6_2]) + chkt6_2 + onDisk + machineType + putStrLn $ "Question 6 completed: " ++ show outRows6_2 ++ " groups" + + -- "max v1 - min v2 by id3" + let question7 = "median v3 sd v3 by id4 id5" + (ans7, t7_1) <- timeIt $ do + let grouped = D.groupBy ["id3"] x + let result = + D.aggregate + [(F.maximum (F.col @Int "v1") - F.minimum (F.col @Int "v2")) `F.as` "diff"] + grouped + return result + m7_1 <- getMemoryUsage + let (outRows7, outCols7) = D.dimensions ans7 + (chk7, chkt7_1) <- timeIt $ do + let sumDiff = D.sum (F.col @Int "diff") ans7 + print sumDiff + return sumDiff + writeLog + task + dataName + inRows + question7 + outRows7 + outCols7 + solution + ver + git + fun + 1 + t7_1 + m7_1 + cache + (makeChk [fromIntegral chk7]) + chkt7_1 + onDisk + machineType + + -- Run 2 + (ans7_2, t7_2) <- timeIt $ do + let grouped = D.groupBy ["id3"] x + let result = + D.aggregate + [(F.maximum (F.col @Int "v1") - F.minimum (F.col @Int "v2")) `F.as` "diff"] + grouped + return result + m7_2 <- getMemoryUsage + let (outRows7_2, outCols7_2) = D.dimensions ans7_2 + (chk7_2, chkt7_2) <- timeIt $ do + let sumDiff = D.sum (F.col @Int "diff") ans7 + print sumDiff + return sumDiff + writeLog + task + dataName + inRows + question7 + outRows7_2 + outCols7_2 + solution + ver + git + fun + 2 + t7_2 + m7_2 + cache + (makeChk [fromIntegral chk7_2]) + chkt7_2 + onDisk + machineType + putStrLn $ "Question 7 completed: " ++ show outRows7_2 ++ " groups" + + -- "largest two v3 by id6" + putStrLn "largest two v3 by id6 unimplemented" + + -- "regression v1 v2 by id2 id4" + putStrLn "regression v1 v2 by id2 id4 unimplemented" + + -- "sum v3 count by id1:id6" + let question10 = "sum v3 count by id1:id6" + (ans10, t10_1) <- timeIt $ do + let grouped = + D.groupBy (zipWith (\i n -> i <> (T.pack . show) n) (cycle ["id"]) [1 .. 6]) x + let result = + D.aggregate + [F.sum (F.col @Double "v3") `F.as` "v3_sum"] + grouped + return result + m10_1 <- getMemoryUsage + let (outRows10, outCols10) = D.dimensions ans10 + (chk10, chkt10_1) <- timeIt $ do + let sumV3 = D.sum (F.col @Double "v3_sum") ans10 + print sumV3 + return sumV3 + writeLog + task + dataName + inRows + question10 + outRows10 + outCols10 + solution + ver + git + fun + 1 + t10_1 + m10_1 + cache + (makeChk [chk10]) + chkt10_1 + onDisk + machineType + + -- Run 2 + (ans10_2, t10_2) <- timeIt $ do + let grouped = + D.groupBy (zipWith (\i n -> i <> (T.pack . show) n) (cycle ["id"]) [1 .. 6]) x + let result = + D.aggregate + [F.sum (F.col @Double "v3") `F.as` "v3_sum"] + grouped + return result + m10_2 <- getMemoryUsage + let (outRows10_2, outCols10_2) = D.dimensions ans10_2 + (chk10_2, chkt10_2) <- timeIt $ do + let sumDiff = D.sum (F.col @Double "v3_sum") ans10 + print sumDiff + return sumDiff + writeLog + task + dataName + inRows + question10 + outRows10_2 + outCols10_2 + solution + ver + git + fun + 2 + t10_2 + m10_2 + cache + (makeChk [chk10_2]) + chkt10_2 + onDisk + machineType + putStrLn $ "Question 10 completed: " ++ show outRows7_2 ++ " groups" + + putStrLn + "Haskell dataframe groupby benchmark completed (8 questions implemented)!" + +-- Helper functions for logging +writeLog :: + String -> + String -> + Int -> + String -> + Int -> + Int -> + String -> + String -> + String -> + String -> + Int -> + Double -> + Double -> + String -> + String -> + Double -> + String -> + String -> + IO () +writeLog task dataName inRows question outRows outCols solution version git fun run timeSec memGb cache chk chkTimeSec onDisk machineType = do + batch <- lookupEnv "BATCH" >>= return . maybe "" id + timestamp <- getPOSIXTime + csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . maybe "time.csv" id + nodename <- fmap init (readProcess "hostname" [] "") + + let comment = "" + let timeSecRound = roundTo 3 timeSec + let chkTimeSecRound = roundTo 3 chkTimeSec + let memGbRound = roundTo 3 memGb + + let logRow = + intercalate + "," + [ nodename + , batch + , show timestamp + , task + , dataName + , show inRows + , question + , show outRows + , show outCols + , solution + , version + , git + , fun + , show run + , show timeSecRound + , show memGbRound + , cache + , chk + , show chkTimeSecRound + , comment + , onDisk + , machineType + ] + + fileExists <- doesFileExist csvFile + if fileExists + then appendFile csvFile (logRow ++ "\n") + else do + let header = + "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" + writeFile csvFile (header ++ logRow ++ "\n") + +roundTo :: Int -> Double -> Double +roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) + +makeChk :: [Double] -> String +makeChk values = intercalate ";" (map formatVal values) + where + formatVal x = map (\c -> if c == ',' then '_' else c) (show $ roundTo 3 x) + +getMemoryUsage :: IO Double +getMemoryUsage = do + pid <- getProcessID + mem <- + fmap (filter (/= ' ') . init) (readProcess "ps" ["-o", "rss", show pid] "") + let rssKb = if null mem then 0 else fromMaybe 0 (readMaybe @Double mem) + return (rssKb / (1024 * 1024)) + +timeIt :: (Show a) => IO a -> IO (a, Double) +timeIt action = do + start <- getPOSIXTime + result <- action + _ <- print result + end <- getPOSIXTime + return (result, realToFrac (end - start)) diff --git a/haskell/haskell-benchmark.cabal b/haskell/haskell-benchmark.cabal new file mode 100644 index 00000000..5c0a06d9 --- /dev/null +++ b/haskell/haskell-benchmark.cabal @@ -0,0 +1,30 @@ +name: haskell-benchmark +version: 0.1.0.0 +build-type: Simple +cabal-version: >=1.10 + +executable groupby-haskell + main-is: groupby-haskell.hs + build-depends: base >= 4.7 && < 5 + , dataframe >= 0.3 + , text >= 1.2 + , vector >= 0.12 + , time >= 1.9 + , process >= 1.6 + , directory >= 1.3 + , unix + default-language: Haskell2010 + ghc-options: -O2 -threaded -rtsopts -with-rtsopts=-N + +executable join-haskell + main-is: join-haskell.hs + build-depends: base >= 4.7 && < 5 + , dataframe >= 0.3 + , text >= 1.2 + , vector >= 0.12 + , time >= 1.9 + , process >= 1.6 + , directory >= 1.3 + , unix + default-language: Haskell2010 + ghc-options: -O2 -threaded -rtsopts -with-rtsopts=-N diff --git a/haskell/join-haskell.hs b/haskell/join-haskell.hs new file mode 100644 index 00000000..57036111 --- /dev/null +++ b/haskell/join-haskell.hs @@ -0,0 +1,537 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} + +import Control.Exception (evaluate) +import Data.List (intercalate) +import qualified Data.Text as T +import Data.Time.Clock.POSIX (getPOSIXTime) +import qualified Data.Vector as V +import qualified Data.Vector.Unboxed as VU +import qualified DataFrame as D +import qualified DataFrame.Operations.Join as DJ +import System.Directory (doesFileExist) +import System.Environment (getEnv, lookupEnv) +import System.IO (hFlush, stdout) +import System.Process (readProcess) + +-- Helper functions for logging +writeLog :: + String -> + String -> + Int -> + String -> + Int -> + Int -> + String -> + String -> + String -> + String -> + Int -> + Double -> + Double -> + String -> + String -> + Double -> + String -> + String -> + IO () +writeLog task dataName inRows question outRows outCols solution version git fun run timeSec memGb cache chk chkTimeSec onDisk machineType = do + batch <- lookupEnv "BATCH" >>= return . maybe "" id + timestamp <- getPOSIXTime + csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . maybe "time.csv" id + nodename <- fmap init (readProcess "hostname" [] "") + + let comment = "" + let timeSecRound = roundTo 3 timeSec + let chkTimeSecRound = roundTo 3 chkTimeSec + let memGbRound = roundTo 3 memGb + + let logRow = + intercalate + "," + [ nodename + , batch + , show timestamp + , task + , dataName + , show inRows + , question + , show outRows + , show outCols + , solution + , version + , git + , fun + , show run + , show timeSecRound + , show memGbRound + , cache + , chk + , show chkTimeSecRound + , comment + , onDisk + , machineType + ] + + fileExists <- doesFileExist csvFile + if fileExists + then appendFile csvFile (logRow ++ "\n") + else do + let header = + "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" + writeFile csvFile (header ++ logRow ++ "\n") + +roundTo :: Int -> Double -> Double +roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) + +makeChk :: [Double] -> String +makeChk values = intercalate ";" (map formatVal values) + where + formatVal x = map (\c -> if c == ',' then '_' else c) (show $ roundTo 3 x) + +getMemoryUsage :: IO Double +getMemoryUsage = do + pid <- getProcessID + mem <- + fmap (filter (/= ' ') . init) (readProcess "ps" ["-o", "rss", show pid] "") + let rssKb = if null mem then 0 else fromMaybe 0 (readMaybe @Double mem) + return (rssKb / (1024 * 1024)) + +timeIt :: IO a -> IO (a, Double) +timeIt action = do + start <- getPOSIXTime + result <- action + _ <- evaluate result + end <- getPOSIXTime + return (result, realToFrac (end - start)) + +-- Parse join_to_tbls logic to get table names +joinToTbls :: String -> [String] +joinToTbls dataName = + let parts = T.splitOn "_" (T.pack dataName) + xnStr = if length parts > 1 then T.unpack (parts !! 1) else "1e7" + xn = read xnStr :: Double + yn1 = show (floor (xn / 1e6) :: Int) ++ "e4" + yn2 = show (floor (xn / 1e3) :: Int) ++ "e3" + yn3 = show (floor xn :: Int) + in [ T.unpack $ T.replace "NA" (T.pack yn1) (T.pack dataName) + , T.unpack $ T.replace "NA" (T.pack yn2) (T.pack dataName) + , T.unpack $ T.replace "NA" (T.pack yn3) (T.pack dataName) + ] + +main :: IO () +main = do + putStrLn "# join-haskell.hs" + hFlush stdout + + let ver = "0.3.3" + let git = "dataframe" + let task = "join" + let solution = "haskell" + let fun = "innerJoin" + let cache = "TRUE" + let onDisk = "FALSE" + + dataName <- getEnv "SRC_DATANAME" + machineType <- getEnv "MACHINE_TYPE" + + let yDataNames = joinToTbls dataName + let srcJnX = "../data/" ++ dataName ++ ".csv" + let srcJnY = + [ "../data/" ++ yDataNames !! 0 ++ ".csv" + , "../data/" ++ yDataNames !! 1 ++ ".csv" + , "../data/" ++ yDataNames !! 2 ++ ".csv" + ] + + putStrLn $ + "loading datasets " + ++ dataName + ++ ", " + ++ yDataNames !! 0 + ++ ", " + ++ yDataNames !! 1 + ++ ", " + ++ yDataNames !! 2 + hFlush stdout + + -- Load all datasets using dataframe + x <- D.readCsv srcJnX + small <- D.readCsv (srcJnY !! 0) + medium <- D.readCsv (srcJnY !! 1) + big <- D.readCsv (srcJnY !! 2) + + let (xRows, _) = D.dimensions x + let (smallRows, _) = D.dimensions small + let (mediumRows, _) = D.dimensions medium + let (bigRows, _) = D.dimensions big + + putStrLn $ show xRows + putStrLn $ show smallRows + putStrLn $ show mediumRows + putStrLn $ show bigRows + hFlush stdout + + putStrLn "joining..." + hFlush stdout + + -- Question 1: small inner on int + let question1 = "small inner on int" + (ans1, t1_1) <- timeIt $ do + let result = DJ.innerJoin ["id1"] x small + return result + m1_1 <- getMemoryUsage + let (outRows1, outCols1) = D.dimensions ans1 + (chk1, chkt1_1) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans1 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans1 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question1 + outRows1 + outCols1 + solution + ver + git + fun + 1 + t1_1 + m1_1 + cache + (makeChk [fst chk1, snd chk1]) + chkt1_1 + onDisk + machineType + + -- Run 2 + (ans1_2, t1_2) <- timeIt $ do + let result = DJ.innerJoin ["id1"] x small + return result + m1_2 <- getMemoryUsage + let (outRows1_2, outCols1_2) = D.dimensions ans1_2 + (chk1_2, chkt1_2) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans1_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans1_2 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question1 + outRows1_2 + outCols1_2 + solution + ver + git + fun + 2 + t1_2 + m1_2 + cache + (makeChk [fst chk1_2, snd chk1_2]) + chkt1_2 + onDisk + machineType + putStrLn $ "Question 1 completed: " ++ show outRows1_2 ++ " rows" + + -- Question 2: medium inner on int + let question2 = "medium inner on int" + (ans2, t2_1) <- timeIt $ do + let result = DJ.innerJoin ["id1"] x medium + return result + m2_1 <- getMemoryUsage + let (outRows2, outCols2) = D.dimensions ans2 + (chk2, chkt2_1) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans2 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question2 + outRows2 + outCols2 + solution + ver + git + fun + 1 + t2_1 + m2_1 + cache + (makeChk [fst chk2, snd chk2]) + chkt2_1 + onDisk + machineType + + -- Run 2 + (ans2_2, t2_2) <- timeIt $ do + let result = DJ.innerJoin ["id1"] x medium + return result + m2_2 <- getMemoryUsage + let (outRows2_2, outCols2_2) = D.dimensions ans2_2 + (chk2_2, chkt2_2) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans2_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans2_2 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question2 + outRows2_2 + outCols2_2 + solution + ver + git + fun + 2 + t2_2 + m2_2 + cache + (makeChk [fst chk2_2, snd chk2_2]) + chkt2_2 + onDisk + machineType + putStrLn $ "Question 2 completed: " ++ show outRows2_2 ++ " rows" + + -- Question 3: medium outer on int + let question3 = "medium outer on int" + (ans3, t3_1) <- timeIt $ do + let result = DJ.leftJoin ["id1"] x medium + return result + m3_1 <- getMemoryUsage + let (outRows3, outCols3) = D.dimensions ans3 + (chk3, chkt3_1) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans3 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans3 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question3 + outRows3 + outCols3 + solution + ver + git + fun + 1 + t3_1 + m3_1 + cache + (makeChk [fst chk3, snd chk3]) + chkt3_1 + onDisk + machineType + + -- Run 2 + (ans3_2, t3_2) <- timeIt $ do + let result = DJ.leftJoin ["id1"] x medium + return result + m3_2 <- getMemoryUsage + let (outRows3_2, outCols3_2) = D.dimensions ans3_2 + (chk3_2, chkt3_2) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans3_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans3_2 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question3 + outRows3_2 + outCols3_2 + solution + ver + git + fun + 2 + t3_2 + m3_2 + cache + (makeChk [fst chk3_2, snd chk3_2]) + chkt3_2 + onDisk + machineType + putStrLn $ "Question 3 completed: " ++ show outRows3_2 ++ " rows" + + -- Question 4: medium inner on factor + let question4 = "medium inner on factor" + (ans4, t4_1) <- timeIt $ do + let result = DJ.innerJoin ["id4"] x medium + return result + m4_1 <- getMemoryUsage + let (outRows4, outCols4) = D.dimensions ans4 + (chk4, chkt4_1) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans4 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans4 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question4 + outRows4 + outCols4 + solution + ver + git + fun + 1 + t4_1 + m4_1 + cache + (makeChk [fst chk4, snd chk4]) + chkt4_1 + onDisk + machineType + + -- Run 2 + (ans4_2, t4_2) <- timeIt $ do + let result = DJ.innerJoin ["id4"] x medium + return result + m4_2 <- getMemoryUsage + let (outRows4_2, outCols4_2) = D.dimensions ans4_2 + (chk4_2, chkt4_2) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans4_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans4_2 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question4 + outRows4_2 + outCols4_2 + solution + ver + git + fun + 2 + t4_2 + m4_2 + cache + (makeChk [fst chk4_2, snd chk4_2]) + chkt4_2 + onDisk + machineType + putStrLn $ "Question 4 completed: " ++ show outRows4_2 ++ " rows" + + -- Question 5: big inner on int + let question5 = "big inner on int" + (ans5, t5_1) <- timeIt $ do + let result = DJ.innerJoin ["id1"] x big + return result + m5_1 <- getMemoryUsage + let (outRows5, outCols5) = D.dimensions ans5 + (chk5, chkt5_1) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans5 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans5 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question5 + outRows5 + outCols5 + solution + ver + git + fun + 1 + t5_1 + m5_1 + cache + (makeChk [fst chk5, snd chk5]) + chkt5_1 + onDisk + machineType + + -- Run 2 + (ans5_2, t5_2) <- timeIt $ do + let result = DJ.innerJoin ["id1"] x big + return result + m5_2 <- getMemoryUsage + let (outRows5_2, outCols5_2) = D.dimensions ans5_2 + (chk5_2, chkt5_2) <- timeIt $ do + let sumV1 = case D.columnAsDoubleVector "v1" ans5_2 of + Right vec -> VU.sum vec + Left _ -> 0 + let sumV2 = case D.columnAsDoubleVector "v2" ans5_2 of + Right vec -> VU.sum vec + Left _ -> 0 + evaluate (sumV1, sumV2) + return (sumV1, sumV2) + writeLog + task + dataName + xRows + question5 + outRows5_2 + outCols5_2 + solution + ver + git + fun + 2 + t5_2 + m5_2 + cache + (makeChk [fst chk5_2, snd chk5_2]) + chkt5_2 + onDisk + machineType + putStrLn $ "Question 5 completed: " ++ show outRows5_2 ++ " rows" + + putStrLn "Haskell dataframe join benchmark completed (5 questions implemented)!" diff --git a/haskell/setup-haskell.sh b/haskell/setup-haskell.sh new file mode 100755 index 00000000..be35eaf7 --- /dev/null +++ b/haskell/setup-haskell.sh @@ -0,0 +1,24 @@ +#!/bin/bash +set -e + +# Install Stack (Haskell build tool) if not present +if ! command -v stack &> /dev/null; then + echo "Installing Stack..." + curl -sSL https://get.haskellstack.org/ | sh +fi + +cd haskell + +# Initialize stack project if not already done +if [ ! -f "stack.yaml" ]; then + stack init --force +fi + +# Install dependencies and build +stack setup +stack build --only-dependencies --ghc-options "-O2" +stack build --ghc-options "-O2" + +cd .. + +./haskell/ver-haskell.sh diff --git a/haskell/stack.yaml b/haskell/stack.yaml new file mode 100644 index 00000000..95f46dd4 --- /dev/null +++ b/haskell/stack.yaml @@ -0,0 +1,67 @@ +# This file was automatically generated by 'stack init' +# +# Some commonly used options have been documented as comments in this file. +# For advanced use and comprehensive documentation of the format, please see: +# https://docs.haskellstack.org/en/stable/configure/yaml/ + +# A 'specific' Stackage snapshot or a compiler version. +# A snapshot resolver dictates the compiler version and the set of packages +# to be used for project dependencies. For example: +# +# snapshot: lts-23.0 +# snapshot: nightly-2024-12-13 +# snapshot: ghc-9.8.4 +# +# The location of a snapshot can be provided as a file or url. Stack assumes +# a snapshot provided as a file might change, whereas a url resource does not. +# +# snapshot: ./custom-snapshot.yaml +# snapshot: https://example.com/snapshots/2024-01-01.yaml +snapshot: + url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/nightly/2025/11/19.yaml + +# User packages to be built. +# Various formats can be used as shown in the example below. +# +# packages: +# - some-directory +# - https://example.com/foo/bar/baz-0.0.2.tar.gz +# subdirs: +# - auto-update +# - wai +packages: +- . +# Dependency packages to be pulled from upstream that are not in the snapshot. +# These entries can reference officially published versions as well as +# forks / in-progress versions pinned to a git hash. For example: +# +# extra-deps: +# - acme-missiles-0.3 +# - git: https://github.com/commercialhaskell/stack.git +# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a +# +# extra-deps: [] + +# Override default flag values for project packages and extra-deps +# flags: {} + +# Extra package databases containing global packages +# extra-package-dbs: [] + +# Control whether we use the GHC we find on the path +# system-ghc: true +# +# Require a specific version of Stack, using version ranges +# require-stack-version: -any # Default +# require-stack-version: ">=3.3" +# +# Override the architecture used by Stack, especially useful on Windows +# arch: i386 +# arch: x86_64 +# +# Extra directories used by Stack for building +# extra-include-dirs: [/path/to/dir] +# extra-lib-dirs: [/path/to/dir] +# +# Allow a newer minor version of GHC than the snapshot specifies +# compiler-check: newer-minor diff --git a/haskell/upg-haskell.sh b/haskell/upg-haskell.sh new file mode 100755 index 00000000..d453cdce --- /dev/null +++ b/haskell/upg-haskell.sh @@ -0,0 +1,13 @@ +#!/bin/bash +set -e + +cd haskell + +# Update stack resolver and dependencies +stack update +stack upgrade +stack build --only-dependencies + +cd .. + +./haskell/ver-haskell.sh diff --git a/haskell/ver-haskell.sh b/haskell/ver-haskell.sh new file mode 100755 index 00000000..7ba223d1 --- /dev/null +++ b/haskell/ver-haskell.sh @@ -0,0 +1,20 @@ +#!/bin/bash +set -e + +cd haskell + +# Get dataframe version from stack +DF_VERSION=$(stack exec -- ghc-pkg field dataframe version 2>/dev/null | awk '{print $2}' || echo "0.3.3") + +# Write version to VERSION file +echo "${DF_VERSION}" > VERSION + +# Get git revision of dataframe if available +GIT_REV=$(stack path --local-install-root 2>/dev/null && git -C $(stack path --local-install-root 2>/dev/null || echo ".") rev-parse --short HEAD 2>/dev/null || echo "") +if [ -n "$GIT_REV" ]; then + echo "$GIT_REV" > REVISION +else + echo "dataframe-${DF_VERSION}" > REVISION +fi + +cd .. diff --git a/run.conf b/run.conf index 0aa2cf69..641a49f5 100644 --- a/run.conf +++ b/run.conf @@ -1,7 +1,7 @@ # task, used in init-setup-iteration.R export RUN_TASKS="groupby join" # solution, used in init-setup-iteration.R -export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas pydatatable spark dask clickhouse polars R-arrow duckdb duckdb-latest datafusion chdb" +export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas pydatatable spark dask clickhouse polars R-arrow duckdb duckdb-latest datafusion chdb haskell" # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=false diff --git a/run.sh b/run.sh index f17a5644..c2408bb2 100755 --- a/run.sh +++ b/run.sh @@ -92,6 +92,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" == "duckdb-latest" ]]; then ./du if [[ "$RUN_SOLUTIONS" == "duckdb-latest" ]]; then ./duckdb-latest/ver-duckdb-latest.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi; +if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "haskell" ]]; then ./haskell/upg-haskell.sh; fi; +if [[ "$RUN_SOLUTIONS" =~ "haskell" ]]; then ./haskell/ver-haskell.sh; fi; # run if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi; From e20c882c699941eaf269c0c980557d3f57d72b2e Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Wed, 19 Nov 2025 15:06:08 -0800 Subject: [PATCH 2/7] feat: Add flushed file writing. --- haskell/groupby-haskell.hs | 10 +++++++++- haskell/join-haskell.hs | 13 ++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs index 5be66398..ba267e4a 100755 --- a/haskell/groupby-haskell.hs +++ b/haskell/groupby-haskell.hs @@ -16,6 +16,7 @@ import System.Environment (getEnv, lookupEnv) import System.IO (hFlush, hPutStrLn, stderr, stdout) import System.Posix.Process (getProcessID) import System.Process (readProcess) +import System.IO import Text.Read @@ -734,7 +735,7 @@ writeLog task dataName inRows question outRows outCols solution version git fun fileExists <- doesFileExist csvFile if fileExists - then appendFile csvFile (logRow ++ "\n") + then forceAppend csvFile (logRow ++ "\n") else do let header = "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" @@ -763,3 +764,10 @@ timeIt action = do _ <- print result end <- getPOSIXTime return (result, realToFrac (end - start)) + +forceAppend :: FilePath -> String -> IO () +forceAppend path content = + withFile path AppendMode $ \h -> do + hSetBuffering h NoBuffering + hPutStr h content + hFlush h diff --git a/haskell/join-haskell.hs b/haskell/join-haskell.hs index 57036111..87b786bd 100644 --- a/haskell/join-haskell.hs +++ b/haskell/join-haskell.hs @@ -13,6 +13,10 @@ import System.Directory (doesFileExist) import System.Environment (getEnv, lookupEnv) import System.IO (hFlush, stdout) import System.Process (readProcess) +import System.Posix.Process +import System.IO +import Text.Read +import Data.Maybe -- Helper functions for logging writeLog :: @@ -75,7 +79,7 @@ writeLog task dataName inRows question outRows outCols solution version git fun fileExists <- doesFileExist csvFile if fileExists - then appendFile csvFile (logRow ++ "\n") + then forceAppend csvFile (logRow ++ "\n") else do let header = "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" @@ -535,3 +539,10 @@ main = do putStrLn $ "Question 5 completed: " ++ show outRows5_2 ++ " rows" putStrLn "Haskell dataframe join benchmark completed (5 questions implemented)!" + +forceAppend :: FilePath -> String -> IO () +forceAppend path content = + withFile path AppendMode $ \h -> do + hSetBuffering h NoBuffering + hPutStr h content + hFlush h From ac21b235b1c37a648f4235d188df3b27df80eb13 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Wed, 19 Nov 2025 22:36:30 -0800 Subject: [PATCH 3/7] chore: Reformat files to be more readable. --- haskell/groupby-haskell.hs | 945 +++++++++---------------------------- haskell/join-haskell.hs | 678 +++++++------------------- 2 files changed, 385 insertions(+), 1238 deletions(-) diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs index ba267e4a..bda67e9f 100755 --- a/haskell/groupby-haskell.hs +++ b/haskell/groupby-haskell.hs @@ -1,770 +1,253 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ScopedTypeVariables #-} import Control.Exception (evaluate) +import Control.Monad (forM_, when) import Data.List (intercalate) -import Data.Maybe +import Data.Maybe (fromMaybe) +import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock.POSIX (getPOSIXTime) -import qualified Data.Vector as V import qualified Data.Vector.Unboxed as VU import qualified DataFrame as D import qualified DataFrame.Functions as F -import GHC.Stats +import GHC.Stats (getRTSStats, max_live_bytes, getRTSStatsEnabled) import System.Directory (doesFileExist) import System.Environment (getEnv, lookupEnv) -import System.IO (hFlush, hPutStrLn, stderr, stdout) +import System.IO (hFlush, hPutStrLn, stderr, stdout, withFile, IOMode(..), hSetBuffering, BufferMode(..), hPutStr) import System.Posix.Process (getProcessID) import System.Process (readProcess) -import System.IO -import Text.Read - +import Text.Read (readMaybe) + +-- | Configuration context for the benchmark. +data BenchConfig = BenchConfig + { cfgTask :: String + , cfgDataName :: String + , cfgMachineType :: String + , cfgSolution :: String + , cfgVer :: String + , cfgGit :: String + , cfgFun :: String + , cfgCache :: String + , cfgOnDisk :: String + , cfgInRows :: Int + } main :: IO () main = do + hSetBuffering stdout NoBuffering putStrLn "# groupby-haskell.hs" - hFlush stdout - - let ver = "0.3.3" - let git = "dataframe" - let task = "groupby" - let solution = "haskell" - let fun = "groupBy" - let cache = "TRUE" - let onDisk = "FALSE" - dataName <- getEnv "SRC_DATANAME" + -- 1. Setup Environment & Config + dataName <- getEnv "SRC_DATANAME" machineType <- getEnv "MACHINE_TYPE" let srcFile = "../data/" ++ dataName ++ ".csv" - putStrLn $ "loading dataset " ++ dataName - hFlush stdout - - -- Check if data has NAs + -- Check NA Flag let parts = T.splitOn "_" (T.pack dataName) let naFlag = if length parts > 3 then read (T.unpack $ parts !! 3) :: Int else 0 if naFlag > 0 - then do - hPutStrLn stderr "skip due to na_flag>0" - return () - else do - -- Load CSV data using dataframe - x <- D.readCsv srcFile - - let (inRows, _) = D.dimensions x - putStrLn $ show inRows - hFlush stdout - - putStrLn "grouping..." - hFlush stdout - - -- Question 1: sum v1 by id1 - let question1 = "sum v1 by id1" - (ans1, t1_1) <- timeIt $ do - let grouped = D.groupBy ["id1"] x - let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped - return result - m1_1 <- getMemoryUsage - let (outRows1, outCols1) = D.dimensions ans1 - (chk1, chkt1_1) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans1 - print sumV1 - return sumV1 - writeLog - task - dataName - inRows - question1 - outRows1 - outCols1 - solution - ver - git - fun - 1 - t1_1 - m1_1 - cache - (makeChk [fromIntegral chk1]) - chkt1_1 - onDisk - machineType - - -- Run 2 - (ans1_2, t1_2) <- timeIt $ do - let grouped = D.groupBy ["id1"] x - let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped - return result - m1_2 <- getMemoryUsage - let (outRows1_2, outCols1_2) = D.dimensions ans1_2 - (chk1_2, chkt1_2) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans1_2 - print sumV1 - return sumV1 - writeLog - task - dataName - inRows - question1 - outRows1_2 - outCols1_2 - solution - ver - git - fun - 2 - t1_2 - m1_2 - cache - (makeChk [fromIntegral chk1_2]) - chkt1_2 - onDisk - machineType - putStrLn $ "Question 1 completed: " ++ show outRows1_2 ++ " groups" - - -- Question 2: sum v1 by id1:id2 - let question2 = "sum v1 by id1:id2" - (ans2, t2_1) <- timeIt $ do - let grouped = D.groupBy ["id1", "id2"] x - let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped - return result - m2_1 <- getMemoryUsage - let (outRows2, outCols2) = D.dimensions ans2 - (chk2, chkt2_1) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans2 - print sumV1 - return sumV1 - writeLog - task - dataName - inRows - question2 - outRows2 - outCols2 - solution - ver - git - fun - 1 - t2_1 - m2_1 - cache - (makeChk [fromIntegral chk2]) - chkt2_1 - onDisk - machineType - - -- Run 2 - (ans2_2, t2_2) <- timeIt $ do - let grouped = D.groupBy ["id1", "id2"] x - let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped - return result - m2_2 <- getMemoryUsage - let (outRows2_2, outCols2_2) = D.dimensions ans2_2 - (chk2_2, chkt2_2) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans2_2 - print sumV1 - return sumV1 - writeLog - task - dataName - inRows - question2 - outRows2_2 - outCols2_2 - solution - ver - git - fun - 2 - t2_2 - m2_2 - cache - (makeChk [fromIntegral chk2_2]) - chkt2_2 - onDisk - machineType - putStrLn $ "Question 2 completed: " ++ show outRows2_2 ++ " groups" - - -- Question 3: sum v1 mean v3 by id3 - let question3 = "sum v1 mean v3 by id3" - (ans3, t3_1) <- timeIt $ do - let grouped = D.groupBy ["id3"] x - let result = - D.aggregate - [ F.sum (F.col @Int "v1") `F.as` "v1_sum" - , F.mean (F.col @Double "v3") `F.as` "v3_mean" - ] - grouped - return result - m3_1 <- getMemoryUsage - let (outRows3, outCols3) = D.dimensions ans3 - (chk3, chkt3_1) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans3 - let sumV3 = D.sum (F.col @Double "v3_mean") ans3 - print (sumV1, sumV3) - return (sumV1, sumV3) - writeLog - task - dataName - inRows - question3 - outRows3 - outCols3 - solution - ver - git - fun - 1 - t3_1 - m3_1 - cache - (makeChk [fromIntegral (fst chk3), snd chk3]) - chkt3_1 - onDisk - machineType - - -- Run 2 - (ans3_2, t3_2) <- timeIt $ do - let grouped = D.groupBy ["id3"] x - let result = - D.aggregate - [ F.sum (F.col @Int "v1") `F.as` "v1_sum" - , F.mean (F.col @Double "v3") `F.as` "v3_mean" - ] - grouped - return result - m3_2 <- getMemoryUsage - let (outRows3_2, outCols3_2) = D.dimensions ans3_2 - (chk3_2, chkt3_2) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans3_2 - let sumV3 = D.sum (F.col @Double "v3_mean") ans3_2 - print (sumV1, sumV3) - return (sumV1, sumV3) - writeLog - task - dataName - inRows - question3 - outRows3_2 - outCols3_2 - solution - ver - git - fun - 2 - t3_2 - m3_2 - cache - (makeChk [fromIntegral (fst chk3_2), snd chk3_2]) - chkt3_2 - onDisk - machineType - putStrLn $ "Question 3 completed: " ++ show outRows3_2 ++ " groups" - - -- Question 4: mean v1:v3 by id4 - let question4 = "mean v1:v3 by id4" - (ans4, t4_1) <- timeIt $ do - let grouped = D.groupBy ["id4"] x - let result = - D.aggregate - [ F.mean (F.col @Int "v1") `F.as` "v1_mean" - , F.mean (F.col @Int "v2") `F.as` "v2_mean" - , F.mean (F.col @Double "v3") `F.as` "v3_mean" - ] - grouped - return result - m4_1 <- getMemoryUsage - let (outRows4, outCols4) = D.dimensions ans4 - (chk4, chkt4_1) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1_mean" ans4 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2_mean" ans4 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV3 = case D.columnAsDoubleVector "v3_mean" ans4 of - Right vec -> VU.sum vec - Left _ -> 0 - print (sumV1, sumV2, sumV3) - return (sumV1, sumV2, sumV3) - writeLog - task - dataName - inRows - question4 - outRows4 - outCols4 - solution - ver - git - fun - 1 - t4_1 - m4_1 - cache - (makeChk [(\(a, _, _) -> a) chk4, (\(_, b, _) -> b) chk4, (\(_, _, c) -> c) chk4]) - chkt4_1 - onDisk - machineType - - -- Run 2 - (ans4_2, t4_2) <- timeIt $ do - let grouped = D.groupBy ["id4"] x - let result = - D.aggregate - [ F.mean (F.col @Int "v1") `F.as` "v1_mean" - , F.mean (F.col @Int "v2") `F.as` "v2_mean" - , F.mean (F.col @Double "v3") `F.as` "v3_mean" - ] - grouped - return result - m4_2 <- getMemoryUsage - let (outRows4_2, outCols4_2) = D.dimensions ans4_2 - (chk4_2, chkt4_2) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1_mean" ans4_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2_mean" ans4_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV3 = case D.columnAsDoubleVector "v3_mean" ans4_2 of - Right vec -> VU.sum vec - Left _ -> 0 - print (sumV1, sumV2, sumV3) - return (sumV1, sumV2, sumV3) - writeLog - task - dataName - inRows - question4 - outRows4_2 - outCols4_2 - solution - ver - git - fun - 2 - t4_2 - m4_2 - cache - ( makeChk - [(\(a, _, _) -> a) chk4_2, (\(_, b, _) -> b) chk4_2, (\(_, _, c) -> c) chk4_2] - ) - chkt4_2 - onDisk - machineType - putStrLn $ "Question 4 completed: " ++ show outRows4_2 ++ " groups" - - -- Question 6: sum v1:v3 by id6 - let question5 = "sum v1:v3 by id6" - (ans5, t5_1) <- timeIt $ do - let grouped = D.groupBy ["id6"] x - let result = - D.aggregate - [ F.sum (F.col @Int "v1") `F.as` "v1_sum" - , F.sum (F.col @Int "v2") `F.as` "v2_sum" - , F.sum (F.col @Double "v3") `F.as` "v3_sum" - ] - grouped - return result - m5_1 <- getMemoryUsage - let (outRows5, outCols5) = D.dimensions ans5 - (chk5, chkt5_1) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans5 - let sumV2 = D.sum (F.col @Int "v2_sum") ans5 - let sumV3 = D.sum (F.col @Double "v3_sum") ans5 - print (sumV1, sumV2, sumV3) - return (sumV1, sumV2, sumV3) - writeLog - task - dataName - inRows - question5 - outRows5 - outCols5 - solution - ver - git - fun - 1 - t5_1 - m5_1 - cache - ( makeChk - [ (\(a, _, _) -> fromIntegral a) chk5 - , (\(_, b, _) -> fromIntegral b) chk5 - , (\(_, _, c) -> c) chk5 - ] - ) - chkt5_1 - onDisk - machineType - - -- Run 2 - (ans5_2, t5_2) <- timeIt $ do - let grouped = D.groupBy ["id6"] x - let result = - D.aggregate - [ F.sum (F.col @Int "v1") `F.as` "v1_sum" - , F.sum (F.col @Int "v2") `F.as` "v2_sum" - , F.sum (F.col @Double "v3") `F.as` "v3_sum" - ] - grouped - return result - m5_2 <- getMemoryUsage - let (outRows5_2, outCols5_2) = D.dimensions ans5_2 - (chk5_2, chkt5_2) <- timeIt $ do - let sumV1 = D.sum (F.col @Int "v1_sum") ans5_2 - let sumV2 = D.sum (F.col @Int "v2_sum") ans5_2 - let sumV3 = D.sum (F.col @Double "v3_sum") ans5_2 - print (sumV1, sumV2, sumV3) - return (sumV1, sumV2, sumV3) - writeLog - task - dataName - inRows - question5 - outRows5_2 - outCols5_2 - solution - ver - git - fun - 2 - t5_2 - m5_2 - cache - ( makeChk - [ (\(a, _, _) -> fromIntegral a) chk5_2 - , (\(_, b, _) -> fromIntegral b) chk5_2 - , (\(_, _, c) -> c) chk5_2 - ] - ) - chkt5_2 - onDisk - machineType - putStrLn $ "Question 5 completed: " ++ show outRows5_2 ++ " groups" - - -- Question 6: median v3 sd v3 by id4 id5 - let question6 = "median v3 sd v3 by id4 id5" - (ans6, t6_1) <- timeIt $ do - let grouped = D.groupBy ["id4", "id5"] x - let result = - D.aggregate - [ F.median (F.col @Double "v3") `F.as` "v3_median" - , F.stddev (F.col @Double "v3") `F.as` "v3_sd" - ] - grouped - return result - m6_1 <- getMemoryUsage - let (outRows6, outCols6) = D.dimensions ans6 - (chk6, chkt6_1) <- timeIt $ do - let sumMedianV3 = D.sum (F.col @Double "v3_median") ans6 - let sumSdV3 = D.sum (F.col @Double "v3_sd") ans6 - print (sumMedianV3, sumSdV3) - return (sumMedianV3, sumSdV3) - writeLog - task - dataName - inRows - question6 - outRows6 - outCols6 - solution - ver - git - fun - 1 - t6_1 - m6_1 - cache - (makeChk [fst chk6, snd chk6]) - chkt6_1 - onDisk - machineType - - -- Run 2 - (ans6_2, t6_2) <- timeIt $ do - let grouped = D.groupBy ["id4", "id5"] x - let result = - D.aggregate - [ F.median (F.col @Double "v3") `F.as` "v3_median" - , F.stddev (F.col @Double "v3") `F.as` "v3_sd" - ] - grouped - return result - m6_2 <- getMemoryUsage - let (outRows6_2, outCols6_2) = D.dimensions ans6_2 - (chk6_2, chkt6_2) <- timeIt $ do - let sumMedianV3 = D.sum (F.col @Double "v3_median") ans6 - let sumSdV3 = D.sum (F.col @Double "v3_sd") ans6 - print (sumMedianV3, sumSdV3) - return (sumMedianV3, sumSdV3) - writeLog - task - dataName - inRows - question6 - outRows6_2 - outCols6_2 - solution - ver - git - fun - 2 - t6_2 - m6_2 - cache - (makeChk [fst chk6_2, snd chk6_2]) - chkt6_2 - onDisk - machineType - putStrLn $ "Question 6 completed: " ++ show outRows6_2 ++ " groups" - - -- "max v1 - min v2 by id3" - let question7 = "median v3 sd v3 by id4 id5" - (ans7, t7_1) <- timeIt $ do - let grouped = D.groupBy ["id3"] x - let result = - D.aggregate - [(F.maximum (F.col @Int "v1") - F.minimum (F.col @Int "v2")) `F.as` "diff"] - grouped - return result - m7_1 <- getMemoryUsage - let (outRows7, outCols7) = D.dimensions ans7 - (chk7, chkt7_1) <- timeIt $ do - let sumDiff = D.sum (F.col @Int "diff") ans7 - print sumDiff - return sumDiff - writeLog - task - dataName - inRows - question7 - outRows7 - outCols7 - solution - ver - git - fun - 1 - t7_1 - m7_1 - cache - (makeChk [fromIntegral chk7]) - chkt7_1 - onDisk - machineType - - -- Run 2 - (ans7_2, t7_2) <- timeIt $ do - let grouped = D.groupBy ["id3"] x - let result = - D.aggregate - [(F.maximum (F.col @Int "v1") - F.minimum (F.col @Int "v2")) `F.as` "diff"] - grouped - return result - m7_2 <- getMemoryUsage - let (outRows7_2, outCols7_2) = D.dimensions ans7_2 - (chk7_2, chkt7_2) <- timeIt $ do - let sumDiff = D.sum (F.col @Int "diff") ans7 - print sumDiff - return sumDiff - writeLog - task - dataName - inRows - question7 - outRows7_2 - outCols7_2 - solution - ver - git - fun - 2 - t7_2 - m7_2 - cache - (makeChk [fromIntegral chk7_2]) - chkt7_2 - onDisk - machineType - putStrLn $ "Question 7 completed: " ++ show outRows7_2 ++ " groups" - - -- "largest two v3 by id6" - putStrLn "largest two v3 by id6 unimplemented" - - -- "regression v1 v2 by id2 id4" - putStrLn "regression v1 v2 by id2 id4 unimplemented" + then hPutStrLn stderr "skip due to na_flag>0" + else runBenchmark srcFile dataName machineType - -- "sum v3 count by id1:id6" - let question10 = "sum v3 count by id1:id6" - (ans10, t10_1) <- timeIt $ do - let grouped = - D.groupBy (zipWith (\i n -> i <> (T.pack . show) n) (cycle ["id"]) [1 .. 6]) x - let result = - D.aggregate - [F.sum (F.col @Double "v3") `F.as` "v3_sum"] - grouped - return result - m10_1 <- getMemoryUsage - let (outRows10, outCols10) = D.dimensions ans10 - (chk10, chkt10_1) <- timeIt $ do - let sumV3 = D.sum (F.col @Double "v3_sum") ans10 - print sumV3 - return sumV3 - writeLog - task - dataName - inRows - question10 - outRows10 - outCols10 - solution - ver - git - fun - 1 - t10_1 - m10_1 - cache - (makeChk [chk10]) - chkt10_1 - onDisk - machineType - - -- Run 2 - (ans10_2, t10_2) <- timeIt $ do - let grouped = - D.groupBy (zipWith (\i n -> i <> (T.pack . show) n) (cycle ["id"]) [1 .. 6]) x - let result = - D.aggregate - [F.sum (F.col @Double "v3") `F.as` "v3_sum"] - grouped - return result - m10_2 <- getMemoryUsage - let (outRows10_2, outCols10_2) = D.dimensions ans10_2 - (chk10_2, chkt10_2) <- timeIt $ do - let sumDiff = D.sum (F.col @Double "v3_sum") ans10 - print sumDiff - return sumDiff - writeLog - task - dataName - inRows - question10 - outRows10_2 - outCols10_2 - solution - ver - git - fun - 2 - t10_2 - m10_2 - cache - (makeChk [chk10_2]) - chkt10_2 - onDisk - machineType - putStrLn $ "Question 10 completed: " ++ show outRows7_2 ++ " groups" - - putStrLn - "Haskell dataframe groupby benchmark completed (8 questions implemented)!" - --- Helper functions for logging -writeLog :: - String -> - String -> - Int -> - String -> - Int -> - Int -> - String -> - String -> - String -> - String -> - Int -> - Double -> - Double -> - String -> - String -> - Double -> - String -> - String -> - IO () -writeLog task dataName inRows question outRows outCols solution version git fun run timeSec memGb cache chk chkTimeSec onDisk machineType = do - batch <- lookupEnv "BATCH" >>= return . maybe "" id - timestamp <- getPOSIXTime - csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . maybe "time.csv" id - nodename <- fmap init (readProcess "hostname" [] "") - - let comment = "" - let timeSecRound = roundTo 3 timeSec - let chkTimeSecRound = roundTo 3 chkTimeSec - let memGbRound = roundTo 3 memGb - - let logRow = - intercalate - "," - [ nodename - , batch - , show timestamp - , task - , dataName - , show inRows - , question - , show outRows - , show outCols - , solution - , version - , git - , fun - , show run - , show timeSecRound - , show memGbRound - , cache - , chk - , show chkTimeSecRound - , comment - , onDisk - , machineType - ] - - fileExists <- doesFileExist csvFile - if fileExists - then forceAppend csvFile (logRow ++ "\n") - else do - let header = - "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" - writeFile csvFile (header ++ logRow ++ "\n") - -roundTo :: Int -> Double -> Double -roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) - -makeChk :: [Double] -> String -makeChk values = intercalate ";" (map formatVal values) - where - formatVal x = map (\c -> if c == ',' then '_' else c) (show $ roundTo 3 x) +runBenchmark :: String -> String -> String -> IO () +runBenchmark srcFile dataName machineType = do + putStrLn $ "loading dataset " ++ dataName + + -- Load Data + df <- D.readCsv srcFile + let (inRows, _) = D.dimensions df + print inRows + + let config = BenchConfig + { cfgTask = "groupby" + , cfgDataName = dataName + , cfgMachineType = machineType + , cfgSolution = "haskell" + , cfgVer = "0.3.3" + , cfgGit = "dataframe" + , cfgFun = "groupBy" + , cfgCache = "TRUE" + , cfgOnDisk = "FALSE" + , cfgInRows = inRows + } + + putStrLn "grouping..." + + -- Q1: Sum v1 by id1 + runQuestion config df "sum v1 by id1" + (\d -> D.groupBy ["id1"] d) + (\g -> D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] g) + (\res -> [chkSumInt "v1_sum" res]) + + -- Q2: Sum v1 by id1:id2 + runQuestion config df "sum v1 by id1:id2" + (\d -> D.groupBy ["id1", "id2"] d) + (\g -> D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] g) + (\res -> [chkSumInt "v1_sum" res]) + + -- Q3: Sum v1, Mean v3 by id3 + runQuestion config df "sum v1 mean v3 by id3" + (\d -> D.groupBy ["id3"] d) + (\g -> D.aggregate + [ F.sum (F.col @Int "v1") `F.as` "v1_sum" + , F.mean (F.col @Double "v3") `F.as` "v3_mean" + ] g) + (\res -> [chkSumInt "v1_sum" res, chkSumDbl "v3_mean" res]) + + -- Q4: Mean v1, v2, v3 by id4 + runQuestion config df "mean v1:v3 by id4" + (\d -> D.groupBy ["id4"] d) + (\g -> D.aggregate + [ F.mean (F.col @Int "v1") `F.as` "v1_mean" + , F.mean (F.col @Int "v2") `F.as` "v2_mean" + , F.mean (F.col @Double "v3") `F.as` "v3_mean" + ] g) + (\res -> [chkSumDbl "v1_mean" res, chkSumDbl "v2_mean" res, chkSumDbl "v3_mean" res]) + + -- Q5 (Question 6 in original): Sum v1, v2, v3 by id6 + runQuestion config df "sum v1:v3 by id6" + (\d -> D.groupBy ["id6"] d) + (\g -> D.aggregate + [ F.sum (F.col @Int "v1") `F.as` "v1_sum" + , F.sum (F.col @Int "v2") `F.as` "v2_sum" + , F.sum (F.col @Double "v3") `F.as` "v3_sum" + ] g) + (\res -> [chkSumInt "v1_sum" res, chkSumInt "v2_sum" res, chkSumDbl "v3_sum" res]) + + -- Q6: median v3, sd v3 by id4, id5 + runQuestion config df "median v3 sd v3 by id4 id5" + (\d -> D.groupBy ["id4", "id5"] d) + (\g -> D.aggregate + [ F.median (F.col @Double "v3") `F.as` "v3_median" + , F.stddev (F.col @Double "v3") `F.as` "v3_sd" + ] g) + (\res -> [chkSumDbl "v3_median" res, chkSumDbl "v3_sd" res]) + + -- Q7: max v1 - min v2 by id3 + runQuestion config df "max v1 - min v2 by id3" + (\d -> D.groupBy ["id3"] d) + (\g -> D.aggregate + [ (F.maximum (F.col @Int "v1") - F.minimum (F.col @Int "v2")) `F.as` "diff" + ] g) + (\res -> [chkSumInt "diff" res]) + + -- Q10: sum v3 count by id1:id6 + runQuestion config df "sum v3 count by id1:id6" + (\d -> D.groupBy (zipWith (\i n -> i <> (T.pack . show) n) (cycle ["id"]) [1 .. 6]) d) + (\g -> D.aggregate [F.sum (F.col @Double "v3") `F.as` "v3_sum"] g) + (\res -> [chkSumDbl "v3_sum" res]) + + putStrLn "Haskell dataframe groupby benchmark completed!" + +runQuestion :: BenchConfig + -> D.DataFrame + -> String -- ^ Question label + -> (D.DataFrame -> D.GroupedDataFrame) -- ^ Grouping logic + -> (D.GroupedDataFrame -> D.DataFrame) -- ^ Aggregation logic + -> (D.DataFrame -> [Double]) -- ^ Checksum extractor + -> IO () +runQuestion cfg inputDF qLabel groupFn aggFn chkFn = do + forM_ [1, 2] $ \runNum -> do + + -- 1. Measure Calculation Time & Memory + (resultDF, calcTime) <- timeIt $ do + let grouped = groupFn inputDF + let aggregated = aggFn grouped + print aggregated + return aggregated + + memUsage <- getMemoryUsage + + let (outRows, outCols) = D.dimensions resultDF + + (chkValues, chkTime) <- timeIt $ do + let vals = chkFn resultDF + print vals + return vals + + writeLog cfg qLabel outRows outCols runNum calcTime memUsage chkValues chkTime + + putStrLn $ qLabel ++ " completed." + +chkSumInt :: String -> D.DataFrame -> Double +chkSumInt col df = + case D.columnAsIntVector (T.pack col) df of + Right vec -> fromIntegral $ VU.sum vec + Left _ -> 0.0 + +chkSumDbl :: String -> D.DataFrame -> Double +chkSumDbl col df = + case D.columnAsDoubleVector (T.pack col) df of + Right vec -> VU.sum vec + Left _ -> 0.0 getMemoryUsage :: IO Double getMemoryUsage = do - pid <- getProcessID - mem <- - fmap (filter (/= ' ') . init) (readProcess "ps" ["-o", "rss", show pid] "") - let rssKb = if null mem then 0 else fromMaybe 0 (readMaybe @Double mem) - return (rssKb / (1024 * 1024)) - -timeIt :: (Show a) => IO a -> IO (a, Double) + isStats <- getRTSStatsEnabled + if isStats then do + s <- getRTSStats + return $ fromIntegral (max_live_bytes s) / (1024 * 1024) + else do + pid <- getProcessID + raw <- readProcess "ps" ["-p", show pid, "-o", "rss="] "" + case readMaybe @Double (filter (/= '\n') raw) of + Just kb -> return (kb / 1024) + Nothing -> return 0.0 + +timeIt :: IO a -> IO (a, Double) timeIt action = do start <- getPOSIXTime result <- action - _ <- print result end <- getPOSIXTime return (result, realToFrac (end - start)) +writeLog :: BenchConfig -> String -> Int -> Int -> Int -> Double -> Double -> [Double] -> Double -> IO () +writeLog BenchConfig{..} question outRows outCols run timeSec memGb chkValues chkTime = do + batch <- lookupEnv "BATCH" >>= return . fromMaybe "" + timestamp <- getPOSIXTime + csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . fromMaybe "time.csv" + nodename <- fmap init (readProcess "hostname" [] "") + + let formatNum x = show (roundTo 3 x) + let chkStr = intercalate ";" $ map (map (\c -> if c == ',' then '_' else c) . formatNum) chkValues + + let logRow = intercalate "," + [ nodename + , batch + , show timestamp + , cfgTask + , cfgDataName + , show cfgInRows + , question + , show outRows + , show outCols + , cfgSolution + , cfgVer + , cfgGit + , cfgFun + , show run + , formatNum timeSec + , formatNum memGb + , cfgCache + , chkStr + , formatNum chkTime + , "" -- comment + , cfgOnDisk + , cfgMachineType + ] + + exists <- doesFileExist csvFile + let header = "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" + + forceAppend csvFile $ (if not exists then header else "") ++ logRow ++ "\n" + +roundTo :: Int -> Double -> Double +roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) + forceAppend :: FilePath -> String -> IO () forceAppend path content = withFile path AppendMode $ \h -> do diff --git a/haskell/join-haskell.hs b/haskell/join-haskell.hs index 87b786bd..3fdaf702 100644 --- a/haskell/join-haskell.hs +++ b/haskell/join-haskell.hs @@ -1,105 +1,157 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeApplications #-} import Control.Exception (evaluate) +import Control.Monad (forM_) import Data.List (intercalate) +import Data.Maybe (fromMaybe) import qualified Data.Text as T import Data.Time.Clock.POSIX (getPOSIXTime) -import qualified Data.Vector as V import qualified Data.Vector.Unboxed as VU import qualified DataFrame as D import qualified DataFrame.Operations.Join as DJ +import GHC.Stats (getRTSStats, max_live_bytes, getRTSStatsEnabled) import System.Directory (doesFileExist) import System.Environment (getEnv, lookupEnv) -import System.IO (hFlush, stdout) +import System.IO (hFlush, hPutStrLn, hPutStr, stdout, withFile, IOMode(..), hSetBuffering, BufferMode(..)) import System.Process (readProcess) -import System.Posix.Process -import System.IO -import Text.Read -import Data.Maybe +import System.Posix.Process (getProcessID) +import Text.Read (readMaybe) + +-- | Configuration context +data BenchConfig = BenchConfig + { cfgTask :: String + , cfgDataName :: String + , cfgMachineType :: String + , cfgSolution :: String + , cfgVer :: String + , cfgGit :: String + , cfgFun :: String + , cfgCache :: String + , cfgOnDisk :: String + , cfgInRows :: Int + } --- Helper functions for logging -writeLog :: - String -> - String -> - Int -> - String -> - Int -> - Int -> - String -> - String -> - String -> - String -> - Int -> - Double -> - Double -> - String -> - String -> - Double -> - String -> - String -> - IO () -writeLog task dataName inRows question outRows outCols solution version git fun run timeSec memGb cache chk chkTimeSec onDisk machineType = do - batch <- lookupEnv "BATCH" >>= return . maybe "" id - timestamp <- getPOSIXTime - csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . maybe "time.csv" id - nodename <- fmap init (readProcess "hostname" [] "") - - let comment = "" - let timeSecRound = roundTo 3 timeSec - let chkTimeSecRound = roundTo 3 chkTimeSec - let memGbRound = roundTo 3 memGb +main :: IO () +main = do + hSetBuffering stdout NoBuffering + putStrLn "# join-haskell.hs" - let logRow = - intercalate - "," - [ nodename - , batch - , show timestamp - , task - , dataName - , show inRows - , question - , show outRows - , show outCols - , solution - , version - , git - , fun - , show run - , show timeSecRound - , show memGbRound - , cache - , chk - , show chkTimeSecRound - , comment - , onDisk - , machineType - ] + dataName <- getEnv "SRC_DATANAME" + machineType <- getEnv "MACHINE_TYPE" + + let auxTableNames = determineAuxTables dataName + let srcMain = "../data/" ++ dataName ++ ".csv" + let srcAux = map (\n -> "../data/" ++ n ++ ".csv") auxTableNames + + putStrLn $ "loading datasets: " ++ dataName ++ ", " ++ intercalate ", " auxTableNames + + dfX <- D.readCsv srcMain + dfSmall <- D.readCsv (srcAux !! 0) + dfMedium <- D.readCsv (srcAux !! 1) + dfBig <- D.readCsv (srcAux !! 2) + + let (rowsX, _) = D.dimensions dfX + print (rowsX, fst (D.dimensions dfSmall), fst (D.dimensions dfMedium), fst (D.dimensions dfBig)) + + let config = BenchConfig + { cfgTask = "join" + , cfgDataName = dataName + , cfgMachineType = machineType + , cfgSolution = "haskell" + , cfgVer = "0.3.3" + , cfgGit = "dataframe" + , cfgFun = "innerJoin" + , cfgCache = "TRUE" + , cfgOnDisk = "FALSE" + , cfgInRows = rowsX + } - fileExists <- doesFileExist csvFile - if fileExists - then forceAppend csvFile (logRow ++ "\n") - else do - let header = - "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" - writeFile csvFile (header ++ logRow ++ "\n") + putStrLn "joining..." -roundTo :: Int -> Double -> Double -roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) + -- Q1: small inner on int + runJoin config dfX dfSmall "small inner on int" + (\l r -> DJ.innerJoin ["id1"] l r) + + -- Q2: medium inner on int + runJoin config dfX dfMedium "medium inner on int" + (\l r -> DJ.innerJoin ["id1"] l r) + + -- Q3: medium outer on int + -- Note: We update the function name in the config for logging accuracy + runJoin config{cfgFun="leftJoin"} dfX dfMedium "medium outer on int" + (\l r -> DJ.leftJoin ["id1"] l r) + + -- Q4: medium inner on factor (id4) + runJoin config dfX dfMedium "medium inner on factor" + (\l r -> DJ.innerJoin ["id4"] l r) + + -- Q5: big inner on int + runJoin config dfX dfBig "big inner on int" + (\l r -> DJ.innerJoin ["id1"] l r) + + putStrLn "Haskell dataframe join benchmark completed!" + +runJoin :: BenchConfig + -> D.DataFrame -- ^ Left Table + -> D.DataFrame -- ^ Right Table + -> String -- ^ Question Label + -> (D.DataFrame -> D.DataFrame -> D.DataFrame) -- ^ Join Function + -> IO () +runJoin cfg leftDF rightDF qLabel joinFn = do + forM_ [1, 2] $ \runNum -> do + + (resultDF, calcTime) <- timeIt $ do + let res = joinFn leftDF rightDF + _ <- evaluate res + return res + + memUsage <- getMemoryUsage + let (outRows, outCols) = D.dimensions resultDF + + (chkValues, chkTime) <- timeIt $ do + let sumV1 = sumCol "v1" resultDF + let sumV2 = sumCol "v2" resultDF + let res = (sumV1, sumV2) + _ <- evaluate res + return [sumV1, sumV2] + + writeLog cfg qLabel outRows outCols runNum calcTime memUsage chkValues chkTime + + putStrLn $ qLabel ++ " completed." + +sumCol :: String -> D.DataFrame -> Double +sumCol name df = + case D.columnAsDoubleVector (T.pack name) df of + Right vec -> VU.sum vec + Left _ -> 0.0 + +determineAuxTables :: String -> [String] +determineAuxTables dataName = + let parts = T.splitOn "_" (T.pack dataName) + xnStr = if length parts > 1 then T.unpack (parts !! 1) else "1e7" + xn = read xnStr :: Double -makeChk :: [Double] -> String -makeChk values = intercalate ";" (map formatVal values) - where - formatVal x = map (\c -> if c == ',' then '_' else c) (show $ roundTo 3 x) + yn1 = show (floor (xn / 1e6) :: Int) ++ "e4" + yn2 = show (floor (xn / 1e3) :: Int) ++ "e3" + yn3 = show (floor xn :: Int) + + replaceNA s = T.unpack $ T.replace "NA" (T.pack s) (T.pack dataName) + in [ replaceNA yn1, replaceNA yn2, replaceNA yn3 ] getMemoryUsage :: IO Double getMemoryUsage = do - pid <- getProcessID - mem <- - fmap (filter (/= ' ') . init) (readProcess "ps" ["-o", "rss", show pid] "") - let rssKb = if null mem then 0 else fromMaybe 0 (readMaybe @Double mem) - return (rssKb / (1024 * 1024)) + isStats <- getRTSStatsEnabled + if isStats then do + s <- getRTSStats + return $ fromIntegral (max_live_bytes s) / (1024 * 1024) + else do + pid <- getProcessID + raw <- readProcess "ps" ["-p", show pid, "-o", "rss="] "" + case readMaybe @Double (filter (/= '\n') raw) of + Just kb -> return (kb / 1024) + Nothing -> return 0.0 timeIt :: IO a -> IO (a, Double) timeIt action = do @@ -109,436 +161,48 @@ timeIt action = do end <- getPOSIXTime return (result, realToFrac (end - start)) --- Parse join_to_tbls logic to get table names -joinToTbls :: String -> [String] -joinToTbls dataName = - let parts = T.splitOn "_" (T.pack dataName) - xnStr = if length parts > 1 then T.unpack (parts !! 1) else "1e7" - xn = read xnStr :: Double - yn1 = show (floor (xn / 1e6) :: Int) ++ "e4" - yn2 = show (floor (xn / 1e3) :: Int) ++ "e3" - yn3 = show (floor xn :: Int) - in [ T.unpack $ T.replace "NA" (T.pack yn1) (T.pack dataName) - , T.unpack $ T.replace "NA" (T.pack yn2) (T.pack dataName) - , T.unpack $ T.replace "NA" (T.pack yn3) (T.pack dataName) - ] - -main :: IO () -main = do - putStrLn "# join-haskell.hs" - hFlush stdout - - let ver = "0.3.3" - let git = "dataframe" - let task = "join" - let solution = "haskell" - let fun = "innerJoin" - let cache = "TRUE" - let onDisk = "FALSE" - - dataName <- getEnv "SRC_DATANAME" - machineType <- getEnv "MACHINE_TYPE" +writeLog :: BenchConfig -> String -> Int -> Int -> Int -> Double -> Double -> [Double] -> Double -> IO () +writeLog BenchConfig{..} question outRows outCols run timeSec memGb chkValues chkTime = do + batch <- lookupEnv "BATCH" >>= return . fromMaybe "" + timestamp <- getPOSIXTime + csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . fromMaybe "time.csv" + nodename <- fmap init (readProcess "hostname" [] "") - let yDataNames = joinToTbls dataName - let srcJnX = "../data/" ++ dataName ++ ".csv" - let srcJnY = - [ "../data/" ++ yDataNames !! 0 ++ ".csv" - , "../data/" ++ yDataNames !! 1 ++ ".csv" - , "../data/" ++ yDataNames !! 2 ++ ".csv" + let formatNum x = show (roundTo 3 x) + let chkStr = intercalate ";" $ map (map (\c -> if c == ',' then '_' else c) . formatNum) chkValues + + let logRow = intercalate "," + [ nodename + , batch + , show timestamp + , cfgTask + , cfgDataName + , show cfgInRows + , question + , show outRows + , show outCols + , cfgSolution + , cfgVer + , cfgGit + , cfgFun + , show run + , formatNum timeSec + , formatNum memGb + , cfgCache + , chkStr + , formatNum chkTime + , "" -- comment + , cfgOnDisk + , cfgMachineType ] - putStrLn $ - "loading datasets " - ++ dataName - ++ ", " - ++ yDataNames !! 0 - ++ ", " - ++ yDataNames !! 1 - ++ ", " - ++ yDataNames !! 2 - hFlush stdout - - -- Load all datasets using dataframe - x <- D.readCsv srcJnX - small <- D.readCsv (srcJnY !! 0) - medium <- D.readCsv (srcJnY !! 1) - big <- D.readCsv (srcJnY !! 2) - - let (xRows, _) = D.dimensions x - let (smallRows, _) = D.dimensions small - let (mediumRows, _) = D.dimensions medium - let (bigRows, _) = D.dimensions big - - putStrLn $ show xRows - putStrLn $ show smallRows - putStrLn $ show mediumRows - putStrLn $ show bigRows - hFlush stdout - - putStrLn "joining..." - hFlush stdout - - -- Question 1: small inner on int - let question1 = "small inner on int" - (ans1, t1_1) <- timeIt $ do - let result = DJ.innerJoin ["id1"] x small - return result - m1_1 <- getMemoryUsage - let (outRows1, outCols1) = D.dimensions ans1 - (chk1, chkt1_1) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans1 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans1 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question1 - outRows1 - outCols1 - solution - ver - git - fun - 1 - t1_1 - m1_1 - cache - (makeChk [fst chk1, snd chk1]) - chkt1_1 - onDisk - machineType - - -- Run 2 - (ans1_2, t1_2) <- timeIt $ do - let result = DJ.innerJoin ["id1"] x small - return result - m1_2 <- getMemoryUsage - let (outRows1_2, outCols1_2) = D.dimensions ans1_2 - (chk1_2, chkt1_2) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans1_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans1_2 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question1 - outRows1_2 - outCols1_2 - solution - ver - git - fun - 2 - t1_2 - m1_2 - cache - (makeChk [fst chk1_2, snd chk1_2]) - chkt1_2 - onDisk - machineType - putStrLn $ "Question 1 completed: " ++ show outRows1_2 ++ " rows" - - -- Question 2: medium inner on int - let question2 = "medium inner on int" - (ans2, t2_1) <- timeIt $ do - let result = DJ.innerJoin ["id1"] x medium - return result - m2_1 <- getMemoryUsage - let (outRows2, outCols2) = D.dimensions ans2 - (chk2, chkt2_1) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans2 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question2 - outRows2 - outCols2 - solution - ver - git - fun - 1 - t2_1 - m2_1 - cache - (makeChk [fst chk2, snd chk2]) - chkt2_1 - onDisk - machineType - - -- Run 2 - (ans2_2, t2_2) <- timeIt $ do - let result = DJ.innerJoin ["id1"] x medium - return result - m2_2 <- getMemoryUsage - let (outRows2_2, outCols2_2) = D.dimensions ans2_2 - (chk2_2, chkt2_2) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans2_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans2_2 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question2 - outRows2_2 - outCols2_2 - solution - ver - git - fun - 2 - t2_2 - m2_2 - cache - (makeChk [fst chk2_2, snd chk2_2]) - chkt2_2 - onDisk - machineType - putStrLn $ "Question 2 completed: " ++ show outRows2_2 ++ " rows" - - -- Question 3: medium outer on int - let question3 = "medium outer on int" - (ans3, t3_1) <- timeIt $ do - let result = DJ.leftJoin ["id1"] x medium - return result - m3_1 <- getMemoryUsage - let (outRows3, outCols3) = D.dimensions ans3 - (chk3, chkt3_1) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans3 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans3 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question3 - outRows3 - outCols3 - solution - ver - git - fun - 1 - t3_1 - m3_1 - cache - (makeChk [fst chk3, snd chk3]) - chkt3_1 - onDisk - machineType - - -- Run 2 - (ans3_2, t3_2) <- timeIt $ do - let result = DJ.leftJoin ["id1"] x medium - return result - m3_2 <- getMemoryUsage - let (outRows3_2, outCols3_2) = D.dimensions ans3_2 - (chk3_2, chkt3_2) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans3_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans3_2 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question3 - outRows3_2 - outCols3_2 - solution - ver - git - fun - 2 - t3_2 - m3_2 - cache - (makeChk [fst chk3_2, snd chk3_2]) - chkt3_2 - onDisk - machineType - putStrLn $ "Question 3 completed: " ++ show outRows3_2 ++ " rows" + exists <- doesFileExist csvFile + let header = "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" + + forceAppend csvFile $ (if not exists then header else "") ++ logRow ++ "\n" - -- Question 4: medium inner on factor - let question4 = "medium inner on factor" - (ans4, t4_1) <- timeIt $ do - let result = DJ.innerJoin ["id4"] x medium - return result - m4_1 <- getMemoryUsage - let (outRows4, outCols4) = D.dimensions ans4 - (chk4, chkt4_1) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans4 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans4 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question4 - outRows4 - outCols4 - solution - ver - git - fun - 1 - t4_1 - m4_1 - cache - (makeChk [fst chk4, snd chk4]) - chkt4_1 - onDisk - machineType - - -- Run 2 - (ans4_2, t4_2) <- timeIt $ do - let result = DJ.innerJoin ["id4"] x medium - return result - m4_2 <- getMemoryUsage - let (outRows4_2, outCols4_2) = D.dimensions ans4_2 - (chk4_2, chkt4_2) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans4_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans4_2 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question4 - outRows4_2 - outCols4_2 - solution - ver - git - fun - 2 - t4_2 - m4_2 - cache - (makeChk [fst chk4_2, snd chk4_2]) - chkt4_2 - onDisk - machineType - putStrLn $ "Question 4 completed: " ++ show outRows4_2 ++ " rows" - - -- Question 5: big inner on int - let question5 = "big inner on int" - (ans5, t5_1) <- timeIt $ do - let result = DJ.innerJoin ["id1"] x big - return result - m5_1 <- getMemoryUsage - let (outRows5, outCols5) = D.dimensions ans5 - (chk5, chkt5_1) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans5 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans5 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question5 - outRows5 - outCols5 - solution - ver - git - fun - 1 - t5_1 - m5_1 - cache - (makeChk [fst chk5, snd chk5]) - chkt5_1 - onDisk - machineType - - -- Run 2 - (ans5_2, t5_2) <- timeIt $ do - let result = DJ.innerJoin ["id1"] x big - return result - m5_2 <- getMemoryUsage - let (outRows5_2, outCols5_2) = D.dimensions ans5_2 - (chk5_2, chkt5_2) <- timeIt $ do - let sumV1 = case D.columnAsDoubleVector "v1" ans5_2 of - Right vec -> VU.sum vec - Left _ -> 0 - let sumV2 = case D.columnAsDoubleVector "v2" ans5_2 of - Right vec -> VU.sum vec - Left _ -> 0 - evaluate (sumV1, sumV2) - return (sumV1, sumV2) - writeLog - task - dataName - xRows - question5 - outRows5_2 - outCols5_2 - solution - ver - git - fun - 2 - t5_2 - m5_2 - cache - (makeChk [fst chk5_2, snd chk5_2]) - chkt5_2 - onDisk - machineType - putStrLn $ "Question 5 completed: " ++ show outRows5_2 ++ " rows" - - putStrLn "Haskell dataframe join benchmark completed (5 questions implemented)!" +roundTo :: Int -> Double -> Double +roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) forceAppend :: FilePath -> String -> IO () forceAppend path content = From efaf2bad9b5df00fac47fe2bc13f4f759bd01414 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Wed, 19 Nov 2025 22:39:16 -0800 Subject: [PATCH 4/7] chore: Remove stray comments --- haskell/groupby-haskell.hs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs index bda67e9f..93013fce 100755 --- a/haskell/groupby-haskell.hs +++ b/haskell/groupby-haskell.hs @@ -40,7 +40,6 @@ main = do hSetBuffering stdout NoBuffering putStrLn "# groupby-haskell.hs" - -- 1. Setup Environment & Config dataName <- getEnv "SRC_DATANAME" machineType <- getEnv "MACHINE_TYPE" let srcFile = "../data/" ++ dataName ++ ".csv" @@ -56,8 +55,7 @@ main = do runBenchmark :: String -> String -> String -> IO () runBenchmark srcFile dataName machineType = do putStrLn $ "loading dataset " ++ dataName - - -- Load Data + df <- D.readCsv srcFile let (inRows, _) = D.dimensions df print inRows @@ -152,8 +150,7 @@ runQuestion :: BenchConfig -> IO () runQuestion cfg inputDF qLabel groupFn aggFn chkFn = do forM_ [1, 2] $ \runNum -> do - - -- 1. Measure Calculation Time & Memory + (resultDF, calcTime) <- timeIt $ do let grouped = groupFn inputDF let aggregated = aggFn grouped From 96f1bb986ced3df7a1903d804a5e2f2f7d65081f Mon Sep 17 00:00:00 2001 From: mchav Date: Thu, 20 Nov 2025 09:59:16 +0000 Subject: [PATCH 5/7] fix: Force evaluation of the dataframe before running script. --- haskell/exec.sh | 2 +- haskell/groupby-haskell.hs | 3 ++- haskell/setup-haskell.sh | 19 +++++++------------ 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/haskell/exec.sh b/haskell/exec.sh index 65e04cc8..4c8fb635 100755 --- a/haskell/exec.sh +++ b/haskell/exec.sh @@ -3,4 +3,4 @@ set -e cd ./haskell -stack run "$1-haskell" +cabal run -O2 "$1-haskell" diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs index 93013fce..10828437 100755 --- a/haskell/groupby-haskell.hs +++ b/haskell/groupby-haskell.hs @@ -2,8 +2,8 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE Strict #-} -import Control.Exception (evaluate) import Control.Monad (forM_, when) import Data.List (intercalate) import Data.Maybe (fromMaybe) @@ -59,6 +59,7 @@ runBenchmark srcFile dataName machineType = do df <- D.readCsv srcFile let (inRows, _) = D.dimensions df print inRows + print df let config = BenchConfig { cfgTask = "groupby" diff --git a/haskell/setup-haskell.sh b/haskell/setup-haskell.sh index be35eaf7..09f5d8c0 100755 --- a/haskell/setup-haskell.sh +++ b/haskell/setup-haskell.sh @@ -1,23 +1,18 @@ #!/bin/bash set -e -# Install Stack (Haskell build tool) if not present -if ! command -v stack &> /dev/null; then - echo "Installing Stack..." - curl -sSL https://get.haskellstack.org/ | sh +# Install Cabal (Haskell build tool) if not present +if ! command -v ghcup &> /dev/null; then + echo "Installing Cabal..." + curl --proto '=https' --tlsv1.2 -sSf https://get-ghcup.haskell.org | BOOTSTRAP_HASKELL_NONINTERACTIVE=1 BOOTSTRAP_HASKELL_MINIMAL=1 sh + ghcup install cabal fi cd haskell -# Initialize stack project if not already done -if [ ! -f "stack.yaml" ]; then - stack init --force -fi - # Install dependencies and build -stack setup -stack build --only-dependencies --ghc-options "-O2" -stack build --ghc-options "-O2" +cabal update +cabal build -O2 cd .. From 5447f71db6e813bd7f933b7aa4a39c16db6656ec Mon Sep 17 00:00:00 2001 From: mchav Date: Thu, 20 Nov 2025 10:32:42 +0000 Subject: [PATCH 6/7] fix: Remove strict since it means we do more unnecessary work. --- haskell/groupby-haskell.hs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs index 10828437..13947686 100755 --- a/haskell/groupby-haskell.hs +++ b/haskell/groupby-haskell.hs @@ -2,7 +2,6 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE Strict #-} import Control.Monad (forM_, when) import Data.List (intercalate) @@ -66,8 +65,8 @@ runBenchmark srcFile dataName machineType = do , cfgDataName = dataName , cfgMachineType = machineType , cfgSolution = "haskell" - , cfgVer = "0.3.3" - , cfgGit = "dataframe" + , cfgVer = "0.3.3.9" + , cfgGit = "NA" , cfgFun = "groupBy" , cfgCache = "TRUE" , cfgOnDisk = "FALSE" From 808a4c8388522c4d1641d04f2b5ec96d59fec600 Mon Sep 17 00:00:00 2001 From: mchav Date: Thu, 20 Nov 2025 10:44:44 +0000 Subject: [PATCH 7/7] Add results for G1_1e7_1e2_0_0 groupby --- time.csv | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/time.csv b/time.csv index 284fcfcd..900c3170 100644 --- a/time.csv +++ b/time.csv @@ -36966,3 +36966,19 @@ ip-172-31-18-189,1761209486,1761212936,groupby,G1_1e9_1e2_5_0,1000000000,regress ip-172-31-18-189,1761209486,1761212940,groupby,G1_1e9_1e2_5_0,1000000000,regression v1 v2 by id2 id4,9216,3,duckdb,1.4.1,b390a7c376,group_by,2,3.935,NA,TRUE,0.09857194,0.001,NA,TRUE,c6id.4xlarge ip-172-31-18-189,1761209486,1761213076,groupby,G1_1e9_1e2_5_0,1000000000,sum v3 count by id1:id6,999939563,8,duckdb,1.4.1,b390a7c376,group_by,1,123.828,NA,TRUE,47498842806;1000000000,12.969,NA,TRUE,c6id.4xlarge ip-172-31-18-189,1761209486,1761213213,groupby,G1_1e9_1e2_5_0,1000000000,sum v3 count by id1:id6,999939563,8,duckdb,1.4.1,b390a7c376,group_by,2,121.902,NA,TRUE,47498842806;1000000000,13.061,NA,TRUE,c6id.4xlarge +ip-172-31-38-245,,1763634448.084065271s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1,100,2,haskell,0.3.3.9,NA,groupBy,1,4.724,17786.52,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634448.090627419s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1,100,2,haskell,0.3.3.9,NA,groupBy,2,1.0e-3,17786.793,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634454.606981034s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1:id2,10000,3,haskell,0.3.3.9,NA,groupBy,1,6.51,17786.793,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634454.613670393s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1:id2,10000,3,haskell,0.3.3.9,NA,groupBy,2,1.0e-3,17786.793,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634461.465441205s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 mean v3 by id3,100000,3,haskell,0.3.3.9,NA,groupBy,1,6.845,17786.793,TRUE,2.9998789e7;4999719.622,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634461.473431198s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 mean v3 by id3,100000,3,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17786.793,TRUE,2.9998789e7;4999719.622,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634468.523071921s,groupby,G1_1e7_1e2_0_0,10000000,mean v1:v3 by id4,100,4,haskell,0.3.3.9,NA,groupBy,1,7.043,17786.918,TRUE,299.988;799.894;4999.767,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634468.531968066s,groupby,G1_1e7_1e2_0_0,10000000,mean v1:v3 by id4,100,4,haskell,0.3.3.9,NA,groupBy,2,3.0e-3,17786.918,TRUE,299.988;799.894;4999.767,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634480.545535913s,groupby,G1_1e7_1e2_0_0,10000000,sum v1:v3 by id6,100000,4,haskell,0.3.3.9,NA,groupBy,1,12.007,17786.918,TRUE,2.9998789e7;7.998936e7;4.99976651408e8,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634480.553569496s,groupby,G1_1e7_1e2_0_0,10000000,sum v1:v3 by id6,100000,4,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17786.918,TRUE,2.9998789e7;7.998936e7;4.99976651408e8,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634491.334093087s,groupby,G1_1e7_1e2_0_0,10000000,median v3 sd v3 by id4 id5,10000,4,haskell,0.3.3.9,NA,groupBy,1,10.774,17787.043,TRUE,499920.14;288648.108,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634491.342065738s,groupby,G1_1e7_1e2_0_0,10000000,median v3 sd v3 by id4 id5,10000,4,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17787.043,TRUE,499920.14;288648.108,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634497.72099282s,groupby,G1_1e7_1e2_0_0,10000000,max v1 - min v2 by id3,100000,2,haskell,0.3.3.9,NA,groupBy,1,6.373,17787.043,TRUE,399882.0,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634497.728958547s,groupby,G1_1e7_1e2_0_0,10000000,max v1 - min v2 by id3,100000,2,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17787.043,TRUE,399882.0,0.0,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634527.391336684s,groupby,G1_1e7_1e2_0_0,10000000,sum v3 count by id1:id6,10000000,7,haskell,0.3.3.9,NA,groupBy,1,29.643,17787.043,TRUE,4.99976651408e8,1.3e-2,,FALSE,c6id.4xlarge +ip-172-31-38-245,,1763634527.402204697s,groupby,G1_1e7_1e2_0_0,10000000,sum v3 count by id1:id6,10000000,7,haskell,0.3.3.9,NA,groupBy,2,4.0e-3,17787.043,TRUE,4.99976651408e8,0.0,,FALSE,c6id.4xlarge