diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml
index 41717226..d7173efa 100644
--- a/.github/workflows/regression.yml
+++ b/.github/workflows/regression.yml
@@ -17,7 +17,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        solution: [data.table, collapse, dplyr, pandas, spark, polars, R-arrow, duckdb, datafusion, dask, clickhouse, chdb]
+        solution: [data.table, collapse, dplyr, pandas, spark, polars, R-arrow, duckdb, datafusion, dask, clickhouse, chdb, haskell]
     name: Solo solutions
     runs-on: ubuntu-latest
     env:
diff --git a/.gitignore b/.gitignore
index 78694453..8ab87bb6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,5 @@ workdir/
 timeout-exit-codes.out
 */target
 *.lock
+dist-newstyle
+.stack-work
diff --git a/README.md b/README.md
index 9bbe2c6b..d3b8e601 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ Contribution and feedback are very welcome!
 - [x] [DataFrames.jl](https://github.com/JuliaData/DataFrames.jl)
 - [x] [In Memory DataSets](https://github.com/sl-solution/InMemoryDatasets.jl)
 - [x] [Datafusion](https://github.com/apache/arrow-datafusion)
+- [x] [(haskell)dataframe](https://github.com/mchav/dataframe)
 
 If you would like your solution to be included, feel free to file a PR with the necessary setup-_solution_/ver-_solution_/groupby-_solution_/join-_solution_ scripts. If the team at DuckDB Labs approves the PR it will be merged. In the interest of transparency and fairness, only results from open-source data-science tools will be merged.
diff --git a/_benchplot/benchplot-dict.R b/_benchplot/benchplot-dict.R
index fedc4aac..c944579a 100644
--- a/_benchplot/benchplot-dict.R
+++ b/_benchplot/benchplot-dict.R
@@ -46,7 +46,8 @@ solution.dict = {list(
   "duckdb" = list(name=c(short="duckdb", long="DuckDB"), color=c(strong="#ddcd07", light="#fff100")),
   "duckdb-latest" = list(name=c(short="duckdb-latest", long="duckdb-latest"), color=c(strong="#ddcd07", light="#fff100")),
   "datafusion" = list(name=c(short="datafusion", long="Datafusion"), color=c(strong="deepskyblue4", light="deepskyblue3")),
-  "chdb" = list(name=c(short="chdb", long="chDB"), color=c(strong="hotpink4", light="hotpink1"))
+  "chdb" = list(name=c(short="chdb", long="chDB"), color=c(strong="hotpink4", light="hotpink1")),
+  "haskell" = list(name=c(short="haskell", long="Haskell"), color=c(strong="#3d0569ff", light="#61298bff"))
 )}
 #barplot(rep(c(0L,1L,1L), length(solution.dict)),
 #        col=rev(c(rbind(sapply(solution.dict, `[[`, "color"), "black"))),
@@ -259,7 +260,19 @@ groupby.syntax.dict = {list(
     "largest two v3 by id6" = "SELECT id6, arrayJoin(arraySlice(arrayReverseSort(groupArray(v3)), 1, 2)) AS v3 FROM (SELECT id6, v3 FROM db_benchmark.x WHERE v3 IS NOT NULL) AS subq GROUP BY id6",
     "regression v1 v2 by id2 id4" = "SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM db_benchmark.x GROUP BY id2, id4",
     "sum v3 count by id1:id6" = "SELECT id1, id2, id3, id4, id5, id6, sum(v3) AS v3, count() AS cnt FROM db_benchmark.x GROUP BY id1, id2, id3, id4, id5, id6"
-  )}
+  )},
+  "haskell" = {c(
+    "sum v1 by id1" = "df |> D.groupBy [\"id1\"] |> D.aggregate [F.sum (F.col @Int \"v1\") `F.as` \"v1_sum\"]",
+    "sum v1 by id1:id2" = "df |> D.groupBy [\"id1\", \"id2\"] |> D.aggregate [F.sum (F.col @Int \"v1\") `F.as` \"v1_sum\"]",
+    "sum v1 mean v3 by id3" = "df |> D.groupBy [\"id3\"] |> D.aggregate [F.sum (F.col @Int \"v1\") `F.as` \"v1_sum\", F.mean (F.col @Double \"v3\") `F.as` \"v3_mean\"]",
+    "mean v1:v3 by id4" = "df |> D.groupBy [\"id4\"] |> D.aggregate [F.mean (F.col @Int \"v1\") `F.as` \"v1_mean\", F.mean (F.col @Int \"v2\") `F.as` \"v2_mean\", F.mean (F.col @Double \"v3\") `F.as` \"v3_mean\"]",
(F.col @Int \"v2\"), \"v3_mean\" .= F.mean (F.col @Double \"v3\")]", + "sum v1:v3 by id6" = "df |> D.groupby [\"id6\"] |> D.aggregate [\"v1_sum\" .= F.sum (F.col @Int \"v1\"), \"v2_sum\" .= F.sum (F.col @Int \"v2\"), \"v3_sum\" .= F.sum (F.col @Double \"v3\")]", + "median v3 sd v3 by id4 id5" = "df |> D.groupby [\"id4\", \"id5\"] |> D.aggregate [\"v3_median\" .= F.median (F.col @Doublee \"v3\"), \"v3_sd\" .= F.stddev (F.col @Double \"v3\")]", + "max v1 - min v2 by id3" = "df |> D.groupby [\"id3\"] |> D.aggregate [\"diff\" .= F.maximum (F.col @Int \"v1\") - F.minimum (F.col @Int \"v2\")]", + "largest two v3 by id6" = "", + "regression v1 v2 by id2 id4" = "", + "sum v3 count by id1:id6" = "df |> D.groupBy [\"id1\",\"id2\",\"id3\",\"id4\",\"id5\",\"id6\"]).agg([F.sum (F.col @Double \"v3\") `F.as` \"v3\", F..count (F.col @Int \"v1\") `F.as` \"count\"]" + )}, )} groupby.query.exceptions = {list( "collapse" = list(), @@ -277,7 +290,8 @@ groupby.syntax.dict = {list( "duckdb" = list(), "duckdb-latest" = list(), "datafusion" = list(), - "chdb" = list() + "chdb" = list(), + "haskell" = list() )} groupby.data.exceptions = {list( # exceptions as of run 1575727624 "collapse" = {list( @@ -348,6 +362,8 @@ groupby.data.exceptions = {list( "Not Tested" = c("G1_1e9_1e2_0_0") )}, "chdb" = {list( + )}, + "haskell" = {list( )} )} groupby.exceptions = task.exceptions(groupby.query.exceptions, groupby.data.exceptions) @@ -472,7 +488,14 @@ join.syntax.dict = {list( "medium outer on int" = "SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 as medium_id5, v2 FROM db_benchmark.x AS x LEFT JOIN db_benchmark.medium AS medium USING (id2)", "medium inner on factor" = "SELECT x.*, medium.id1 AS medium_id1, medium.id2 AS medium_id2, medium.id4 as medium_id4, v2 FROM db_benchmark.x AS x INNER JOIN db_benchmark.medium AS medium USING (id5)", "big inner on int" = "SELECT x.*, big.id1 AS big_id1, big.id2 AS big_id2, big.id4 as big_id4, big.id5 AS big_id5, big.id6 AS big_id6, v2 FROM db_benchmark.x AS x INNER JOIN db_benchmark.big AS big USING (id3)" - )} + )}, + "haskell" = {c( + "small inner on int" = "D.innerJoin [\"id1\"] small small", + "medium inner on int" = "D.innerJoin [\"id2\"] medium medium", + "medium outer on int" = "D.leftJoin [\"id2\"] medium medium", + "medium inner on factor" = "D.innerJoin [\"id5\"] medium medium", + "big inner on int" = "D.innerJoin [\"id3\"] big big" + )}, )} join.query.exceptions = {list( "collapse" = list(), @@ -490,7 +513,8 @@ join.query.exceptions = {list( "duckdb" = list(), "duckdb-latest" = list(), "datafusion" = list(), - "chdb" = list() + "chdb" = list(), + "haskell" = list() )} join.data.exceptions = {list( # exceptions as of run 1575727624 "collapse" = {list( @@ -550,6 +574,8 @@ join.data.exceptions = {list( "Not tested" = c("J1_1e9_NA_0_0") )}, "chdb" = {list( + )}, + "haskell" = {list( )} )} join.exceptions = task.exceptions(join.query.exceptions, join.data.exceptions) diff --git a/_control/solutions.csv b/_control/solutions.csv index 2888dd13..3c12ae32 100644 --- a/_control/solutions.csv +++ b/_control/solutions.csv @@ -33,3 +33,5 @@ datafusion,groupby datafusion,join chdb,groupby chdb,join +haskell,groupby +haskell,join diff --git a/_launcher/launcher.R b/_launcher/launcher.R index be0e4b2b..ae92dbdf 100644 --- a/_launcher/launcher.R +++ b/_launcher/launcher.R @@ -16,7 +16,7 @@ file.ext = function(x) { x, "collapse"=, "data.table"=, "dplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R", "pandas"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, 
"datafusion"=, "polars"="py", - "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py" + "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py", "haskell"="hs", ) if (is.null(ans)) stop(sprintf("solution %s does not have file extension defined in file.ext helper function", x)) ans diff --git a/_launcher/solution.R b/_launcher/solution.R index 98c4298e..48bf86b6 100755 --- a/_launcher/solution.R +++ b/_launcher/solution.R @@ -112,7 +112,7 @@ file.ext = function(x) { x, "collapse"=, "data.table"=, "dplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R", "pandas"="py", "spark"=, "pydatatable"=, "modin"=, "dask"=, "datafusion"=, "polars"="py", - "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py" + "clickhouse"="sh", "juliadf"="jl", "juliads"="jl", "chdb"="py", "haskell"="hs", ) if (is.null(ans)) stop(sprintf("solution %s does not have file extension defined in file.ext helper function", x)) ans @@ -153,7 +153,7 @@ setenv("SRC_DATANAME", d) ns = solution.path(s) ext = file.ext(s) -localcmd = if (s %in% c("clickhouse","h2o","juliadf", "juliads")) { # custom launcher bash script, for clickhouse h2o juliadf +localcmd = if (s %in% c("clickhouse","h2o","juliadf", "juliads", "haskell")) { # custom launcher bash script, for clickhouse h2o juliadf sprintf("exec.sh %s", t) } else if (s %in% c("dask")) { sprintf("%s_%s.%s", t, ns, ext) diff --git a/_report/report.R b/_report/report.R index a726b628..6cd5a483 100644 --- a/_report/report.R +++ b/_report/report.R @@ -6,7 +6,7 @@ get_report_status_file = function(path=getwd()) { file.path(path, "report-done") } get_report_solutions = function() { - c("duckdb-latest", "collapse", "data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "juliads", "clickhouse", "cudf", "polars", "duckdb", "datafusion", "arrow", "R-arrow", "chdb") + c("duckdb-latest", "collapse", "data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "juliads", "clickhouse", "cudf", "polars", "duckdb", "datafusion", "arrow", "R-arrow", "chdb", "haskell") } get_data_levels = function() { ## groupby diff --git a/haskell/README.md b/haskell/README.md new file mode 100644 index 00000000..d8f9abe2 --- /dev/null +++ b/haskell/README.md @@ -0,0 +1,85 @@ +# Haskell DataFrame Benchmark + +This benchmark entry uses Haskell with the `mchav/dataframe` library to implement dataframe operations. + +## Implementation Details + +- **Language**: Haskell (GHC) +- **DataFrame Library**: [mchav/dataframe](https://github.com/mchav/dataframe) +- **Build Tool**: Stack + +## About mchav/dataframe + +The `dataframe` library is a fast, safe, and intuitive DataFrame library for Haskell that provides: +- Type-safe column operations +- Familiar operations for users coming from pandas, dplyr, or polars +- Concise, declarative, and composable data pipelines +- Static typing that catches many bugs at compile time + +Resources: +- GitHub: https://github.com/mchav/dataframe +- Hackage: https://hackage.haskell.org/package/dataframe +- Documentation: https://dataframe.readthedocs.io/ + +## Implemented Benchmarks + +### Groupby (`groupby-haskell.hs`) +Implements 5 out of 10 groupby questions: +1. sum v1 by id1 +2. sum v1 by id1:id2 +3. sum v1 mean v3 by id3 +4. mean v1:v3 by id4 +5. sum v1:v3 by id6 + +Uses `D.groupBy` and `D.aggregate` with expression DSL (`F.sum`, `F.mean`). + +Note: Questions 6-10 would require additional statistical functions (median, standard deviation, regression, top-n selection). 
+
+### Join (`join-haskell.hs`)
+Implements all 5 join questions:
+1. small inner on int
+2. medium inner on int
+3. medium outer on int (using leftJoin)
+4. medium inner on factor
+5. big inner on int
+
+Uses `DJ.innerJoin` and `DJ.leftJoin` from `DataFrame.Operations.Join`.
+
+## Setup
+
+Run the setup script to install dependencies:
+```bash
+./haskell/setup-haskell.sh
+```
+
+This will:
+1. Install GHCup, GHC and cabal (if not present)
+2. Update the package index with `cabal update`
+3. Build the benchmark executables and their dependencies with `cabal build -O2`
+
+## API Usage Examples
+
+```haskell
+-- Read CSV
+df <- D.readCsv "data/file.csv"
+
+-- GroupBy with aggregation
+let grouped = D.groupBy ["id1"] df
+let result = D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] grouped
+
+-- Inner Join
+let joined = DJ.innerJoin ["id1"] df1 df2
+
+-- Get dimensions
+let (rows, cols) = D.dimensions df
+```
+
+## Performance Notes
+
+The implementation uses:
+- Type-safe column operations with `TypeApplications`
+- An expression DSL for concise aggregation syntax
+- The grouping and joining operations provided by the dataframe library
+
+This benchmark demonstrates Haskell's suitability for dataframe workloads, with the added benefits of static typing and functional composition.
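+
+To build and run a task outside the harness (a sketch: `SRC_DATANAME` and `MACHINE_TYPE` are normally exported by the benchmark launcher, and `local` is just a placeholder value):
+
+```bash
+./haskell/setup-haskell.sh          # install toolchain, cabal build -O2
+export SRC_DATANAME=G1_1e7_1e2_0_0  # a dataset under ./data/
+export MACHINE_TYPE=local
+./haskell/exec.sh groupby           # or: ./haskell/exec.sh join
+```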
diff --git a/haskell/VERSION b/haskell/VERSION
new file mode 100644
index 00000000..1d45831f
--- /dev/null
+++ b/haskell/VERSION
@@ -0,0 +1 @@
+0.3.3.9
diff --git a/haskell/exec.sh b/haskell/exec.sh
new file mode 100755
index 00000000..4c8fb635
--- /dev/null
+++ b/haskell/exec.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+set -e
+
+cd ./haskell
+
+cabal run -O2 "$1-haskell"
diff --git a/haskell/groupby-haskell.hs b/haskell/groupby-haskell.hs
new file mode 100755
index 00000000..13947686
--- /dev/null
+++ b/haskell/groupby-haskell.hs
@@ -0,0 +1,253 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+import Control.Monad (forM_)
+import Data.List (intercalate)
+import Data.Maybe (fromMaybe)
+import qualified Data.Text as T
+import Data.Time.Clock.POSIX (getPOSIXTime)
+import qualified Data.Vector.Unboxed as VU
+import qualified DataFrame as D
+import qualified DataFrame.Functions as F
+import GHC.Stats (getRTSStats, max_live_bytes, getRTSStatsEnabled)
+import System.Directory (doesFileExist)
+import System.Environment (getEnv, lookupEnv)
+import System.IO (hFlush, hPutStrLn, stderr, stdout, withFile, IOMode(..), hSetBuffering, BufferMode(..), hPutStr)
+import System.Posix.Process (getProcessID)
+import System.Process (readProcess)
+import Text.Read (readMaybe)
+
+-- | Configuration context for the benchmark.
+data BenchConfig = BenchConfig
+  { cfgTask        :: String
+  , cfgDataName    :: String
+  , cfgMachineType :: String
+  , cfgSolution    :: String
+  , cfgVer         :: String
+  , cfgGit         :: String
+  , cfgFun         :: String
+  , cfgCache       :: String
+  , cfgOnDisk      :: String
+  , cfgInRows      :: Int
+  }
+
+main :: IO ()
+main = do
+  hSetBuffering stdout NoBuffering
+  putStrLn "# groupby-haskell.hs"
+
+  dataName <- getEnv "SRC_DATANAME"
+  machineType <- getEnv "MACHINE_TYPE"
+  let srcFile = "../data/" ++ dataName ++ ".csv"
+
+  -- the NA percentage is the fourth field of the dataset name, e.g. G1_1e7_1e2_0_0
+  let parts = T.splitOn "_" (T.pack dataName)
+  let naFlag = if length parts > 3 then read (T.unpack $ parts !! 3) :: Int else 0
+
+  if naFlag > 0
+    then hPutStrLn stderr "skip due to na_flag>0"
+    else runBenchmark srcFile dataName machineType
+
+runBenchmark :: String -> String -> String -> IO ()
+runBenchmark srcFile dataName machineType = do
+  putStrLn $ "loading dataset " ++ dataName
+
+  df <- D.readCsv srcFile
+  let (inRows, _) = D.dimensions df
+  print inRows
+  print df
+
+  let config = BenchConfig
+        { cfgTask = "groupby"
+        , cfgDataName = dataName
+        , cfgMachineType = machineType
+        , cfgSolution = "haskell"
+        , cfgVer = "0.3.3.9"
+        , cfgGit = "NA"
+        , cfgFun = "groupBy"
+        , cfgCache = "TRUE"
+        , cfgOnDisk = "FALSE"
+        , cfgInRows = inRows
+        }
+
+  putStrLn "grouping..."
+
+  -- Q1: sum v1 by id1
+  runQuestion config df "sum v1 by id1"
+    (\d -> D.groupBy ["id1"] d)
+    (\g -> D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] g)
+    (\res -> [chkSumInt "v1_sum" res])
+
+  -- Q2: sum v1 by id1:id2
+  runQuestion config df "sum v1 by id1:id2"
+    (\d -> D.groupBy ["id1", "id2"] d)
+    (\g -> D.aggregate [F.sum (F.col @Int "v1") `F.as` "v1_sum"] g)
+    (\res -> [chkSumInt "v1_sum" res])
+
+  -- Q3: sum v1, mean v3 by id3
+  runQuestion config df "sum v1 mean v3 by id3"
+    (\d -> D.groupBy ["id3"] d)
+    (\g -> D.aggregate
+      [ F.sum (F.col @Int "v1") `F.as` "v1_sum"
+      , F.mean (F.col @Double "v3") `F.as` "v3_mean"
+      ] g)
+    (\res -> [chkSumInt "v1_sum" res, chkSumDbl "v3_mean" res])
+
+  -- Q4: mean v1, v2, v3 by id4
+  runQuestion config df "mean v1:v3 by id4"
+    (\d -> D.groupBy ["id4"] d)
+    (\g -> D.aggregate
+      [ F.mean (F.col @Int "v1") `F.as` "v1_mean"
+      , F.mean (F.col @Int "v2") `F.as` "v2_mean"
+      , F.mean (F.col @Double "v3") `F.as` "v3_mean"
+      ] g)
+    (\res -> [chkSumDbl "v1_mean" res, chkSumDbl "v2_mean" res, chkSumDbl "v3_mean" res])
+
+  -- Q5: sum v1, v2, v3 by id6
+  runQuestion config df "sum v1:v3 by id6"
+    (\d -> D.groupBy ["id6"] d)
+    (\g -> D.aggregate
+      [ F.sum (F.col @Int "v1") `F.as` "v1_sum"
+      , F.sum (F.col @Int "v2") `F.as` "v2_sum"
+      , F.sum (F.col @Double "v3") `F.as` "v3_sum"
+      ] g)
+    (\res -> [chkSumInt "v1_sum" res, chkSumInt "v2_sum" res, chkSumDbl "v3_sum" res])
+
+  -- Q6: median v3, sd v3 by id4, id5
+  runQuestion config df "median v3 sd v3 by id4 id5"
+    (\d -> D.groupBy ["id4", "id5"] d)
+    (\g -> D.aggregate
+      [ F.median (F.col @Double "v3") `F.as` "v3_median"
+      , F.stddev (F.col @Double "v3") `F.as` "v3_sd"
+      ] g)
+    (\res -> [chkSumDbl "v3_median" res, chkSumDbl "v3_sd" res])
+
+  -- Q7: max v1 - min v2 by id3
+  runQuestion config df "max v1 - min v2 by id3"
+    (\d -> D.groupBy ["id3"] d)
+    (\g -> D.aggregate
+      [ (F.maximum (F.col @Int "v1") - F.minimum (F.col @Int "v2")) `F.as` "diff"
+      ] g)
+    (\res -> [chkSumInt "diff" res])
+
+  -- Q8 (largest two v3 by id6) and Q9 (regression v1 v2 by id2 id4) are not implemented.
+
+  -- Q10: sum v3 count by id1:id6 (the count aggregate is currently omitted)
+  runQuestion config df "sum v3 count by id1:id6"
+    (\d -> D.groupBy (map (\n -> "id" <> T.pack (show n)) [1 .. 6 :: Int]) d)
+    (\g -> D.aggregate [F.sum (F.col @Double "v3") `F.as` "v3_sum"] g)
+    (\res -> [chkSumDbl "v3_sum" res])
+
+  putStrLn "Haskell dataframe groupby benchmark completed!"
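+
+-- Per db-benchmark convention each question is executed twice: run 1 includes
+-- any one-off costs (first touch of the data, caches being populated) and
+-- run 2 measures a warm repeat; both runs are appended to time.csv.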
+runQuestion :: BenchConfig
+            -> D.DataFrame
+            -> String                              -- ^ Question label
+            -> (D.DataFrame -> D.GroupedDataFrame) -- ^ Grouping logic
+            -> (D.GroupedDataFrame -> D.DataFrame) -- ^ Aggregation logic
+            -> (D.DataFrame -> [Double])           -- ^ Checksum extractor
+            -> IO ()
+runQuestion cfg inputDF qLabel groupFn aggFn chkFn = do
+  forM_ [1, 2] $ \runNum -> do
+
+    (resultDF, calcTime) <- timeIt $ do
+      let grouped = groupFn inputDF
+      let aggregated = aggFn grouped
+      -- printing the result forces evaluation inside the timed section
+      print aggregated
+      return aggregated
+
+    memUsage <- getMemoryUsage
+
+    let (outRows, outCols) = D.dimensions resultDF
+
+    (chkValues, chkTime) <- timeIt $ do
+      let vals = chkFn resultDF
+      print vals
+      return vals
+
+    writeLog cfg qLabel outRows outCols runNum calcTime memUsage chkValues chkTime
+
+  putStrLn $ qLabel ++ " completed."
+
+-- Checksums follow the db-benchmark convention: sum each result column and
+-- log the totals so answers can be compared across solutions.
+chkSumInt :: String -> D.DataFrame -> Double
+chkSumInt col df =
+  case D.columnAsIntVector (T.pack col) df of
+    Right vec -> fromIntegral $ VU.sum vec
+    Left _    -> 0.0
+
+chkSumDbl :: String -> D.DataFrame -> Double
+chkSumDbl col df =
+  case D.columnAsDoubleVector (T.pack col) df of
+    Right vec -> VU.sum vec
+    Left _    -> 0.0
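+
+-- Memory is taken from GHC's RTS statistics when the binary runs with
+-- +RTS -T (enabled via -with-rtsopts in the cabal file); otherwise it falls
+-- back to the resident set size reported by ps.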
+getMemoryUsage :: IO Double
+getMemoryUsage = do
+  isStats <- getRTSStatsEnabled
+  if isStats then do
+    s <- getRTSStats
+    -- max live bytes, converted to GiB (the time.csv column is mem_gb)
+    return $ fromIntegral (max_live_bytes s) / (1024 * 1024 * 1024)
+  else do
+    pid <- getProcessID
+    raw <- readProcess "ps" ["-p", show pid, "-o", "rss="] ""
+    -- ps reports RSS in KiB
+    case readMaybe @Double (filter (/= '\n') raw) of
+      Just kb -> return (kb / (1024 * 1024))
+      Nothing -> return 0.0
+
+timeIt :: IO a -> IO (a, Double)
+timeIt action = do
+  start <- getPOSIXTime
+  result <- action
+  end <- getPOSIXTime
+  return (result, realToFrac (end - start))
+
+writeLog :: BenchConfig -> String -> Int -> Int -> Int -> Double -> Double -> [Double] -> Double -> IO ()
+writeLog BenchConfig{..} question outRows outCols run timeSec memGb chkValues chkTime = do
+  batch <- fromMaybe "" <$> lookupEnv "BATCH"
+  timestamp <- getPOSIXTime
+  csvFile <- fromMaybe "time.csv" <$> lookupEnv "CSV_TIME_FILE"
+  nodename <- takeWhile (/= '\n') <$> readProcess "hostname" [] ""
+
+  let formatNum x = show (roundTo 3 x)
+  let chkStr = intercalate ";" $ map (map (\c -> if c == ',' then '_' else c) . formatNum) chkValues
+
+  let logRow = intercalate ","
+        [ nodename
+        , batch
+        , show (round timestamp :: Integer) -- epoch seconds, as logged by the other solutions
+        , cfgTask
+        , cfgDataName
+        , show cfgInRows
+        , question
+        , show outRows
+        , show outCols
+        , cfgSolution
+        , cfgVer
+        , cfgGit
+        , cfgFun
+        , show run
+        , formatNum timeSec
+        , formatNum memGb
+        , cfgCache
+        , chkStr
+        , formatNum chkTime
+        , "" -- comment
+        , cfgOnDisk
+        , cfgMachineType
+        ]
+
+  exists <- doesFileExist csvFile
+  let header = "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n"
+
+  forceAppend csvFile $ (if not exists then header else "") ++ logRow ++ "\n"
+
+roundTo :: Int -> Double -> Double
+roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n)
+
+forceAppend :: FilePath -> String -> IO ()
+forceAppend path content =
+  withFile path AppendMode $ \h -> do
+    hSetBuffering h NoBuffering
+    hPutStr h content
+    hFlush h
diff --git a/haskell/haskell-benchmark.cabal b/haskell/haskell-benchmark.cabal
new file mode 100644
index 00000000..5c0a06d9
--- /dev/null
+++ b/haskell/haskell-benchmark.cabal
@@ -0,0 +1,30 @@
+name:                haskell-benchmark
+version:             0.1.0.0
+build-type:          Simple
+cabal-version:       >=1.10
+
+executable groupby-haskell
+  main-is:           groupby-haskell.hs
+  build-depends:     base >= 4.7 && < 5
+                   , dataframe >= 0.3
+                   , text >= 1.2
+                   , vector >= 0.12
+                   , time >= 1.9
+                   , process >= 1.6
+                   , directory >= 1.3
+                   , unix
+  default-language:  Haskell2010
+  -- -T exposes GHC.Stats at run time so getMemoryUsage can use RTS figures
+  ghc-options:       -O2 -threaded -rtsopts "-with-rtsopts=-N -T"
+
+executable join-haskell
+  main-is:           join-haskell.hs
+  build-depends:     base >= 4.7 && < 5
+                   , dataframe >= 0.3
+                   , text >= 1.2
+                   , vector >= 0.12
+                   , time >= 1.9
+                   , process >= 1.6
+                   , directory >= 1.3
+                   , unix
+  default-language:  Haskell2010
+  ghc-options:       -O2 -threaded -rtsopts "-with-rtsopts=-N -T"
", " auxTableNames + + dfX <- D.readCsv srcMain + dfSmall <- D.readCsv (srcAux !! 0) + dfMedium <- D.readCsv (srcAux !! 1) + dfBig <- D.readCsv (srcAux !! 2) + + let (rowsX, _) = D.dimensions dfX + print (rowsX, fst (D.dimensions dfSmall), fst (D.dimensions dfMedium), fst (D.dimensions dfBig)) + + let config = BenchConfig + { cfgTask = "join" + , cfgDataName = dataName + , cfgMachineType = machineType + , cfgSolution = "haskell" + , cfgVer = "0.3.3" + , cfgGit = "dataframe" + , cfgFun = "innerJoin" + , cfgCache = "TRUE" + , cfgOnDisk = "FALSE" + , cfgInRows = rowsX + } + + putStrLn "joining..." + + -- Q1: small inner on int + runJoin config dfX dfSmall "small inner on int" + (\l r -> DJ.innerJoin ["id1"] l r) + + -- Q2: medium inner on int + runJoin config dfX dfMedium "medium inner on int" + (\l r -> DJ.innerJoin ["id1"] l r) + + -- Q3: medium outer on int + -- Note: We update the function name in the config for logging accuracy + runJoin config{cfgFun="leftJoin"} dfX dfMedium "medium outer on int" + (\l r -> DJ.leftJoin ["id1"] l r) + + -- Q4: medium inner on factor (id4) + runJoin config dfX dfMedium "medium inner on factor" + (\l r -> DJ.innerJoin ["id4"] l r) + + -- Q5: big inner on int + runJoin config dfX dfBig "big inner on int" + (\l r -> DJ.innerJoin ["id1"] l r) + + putStrLn "Haskell dataframe join benchmark completed!" + +runJoin :: BenchConfig + -> D.DataFrame -- ^ Left Table + -> D.DataFrame -- ^ Right Table + -> String -- ^ Question Label + -> (D.DataFrame -> D.DataFrame -> D.DataFrame) -- ^ Join Function + -> IO () +runJoin cfg leftDF rightDF qLabel joinFn = do + forM_ [1, 2] $ \runNum -> do + + (resultDF, calcTime) <- timeIt $ do + let res = joinFn leftDF rightDF + _ <- evaluate res + return res + + memUsage <- getMemoryUsage + let (outRows, outCols) = D.dimensions resultDF + + (chkValues, chkTime) <- timeIt $ do + let sumV1 = sumCol "v1" resultDF + let sumV2 = sumCol "v2" resultDF + let res = (sumV1, sumV2) + _ <- evaluate res + return [sumV1, sumV2] + + writeLog cfg qLabel outRows outCols runNum calcTime memUsage chkValues chkTime + + putStrLn $ qLabel ++ " completed." + +sumCol :: String -> D.DataFrame -> Double +sumCol name df = + case D.columnAsDoubleVector (T.pack name) df of + Right vec -> VU.sum vec + Left _ -> 0.0 + +determineAuxTables :: String -> [String] +determineAuxTables dataName = + let parts = T.splitOn "_" (T.pack dataName) + xnStr = if length parts > 1 then T.unpack (parts !! 
1) else "1e7" + xn = read xnStr :: Double + + yn1 = show (floor (xn / 1e6) :: Int) ++ "e4" + yn2 = show (floor (xn / 1e3) :: Int) ++ "e3" + yn3 = show (floor xn :: Int) + + replaceNA s = T.unpack $ T.replace "NA" (T.pack s) (T.pack dataName) + in [ replaceNA yn1, replaceNA yn2, replaceNA yn3 ] + +getMemoryUsage :: IO Double +getMemoryUsage = do + isStats <- getRTSStatsEnabled + if isStats then do + s <- getRTSStats + return $ fromIntegral (max_live_bytes s) / (1024 * 1024) + else do + pid <- getProcessID + raw <- readProcess "ps" ["-p", show pid, "-o", "rss="] "" + case readMaybe @Double (filter (/= '\n') raw) of + Just kb -> return (kb / 1024) + Nothing -> return 0.0 + +timeIt :: IO a -> IO (a, Double) +timeIt action = do + start <- getPOSIXTime + result <- action + _ <- evaluate result + end <- getPOSIXTime + return (result, realToFrac (end - start)) + +writeLog :: BenchConfig -> String -> Int -> Int -> Int -> Double -> Double -> [Double] -> Double -> IO () +writeLog BenchConfig{..} question outRows outCols run timeSec memGb chkValues chkTime = do + batch <- lookupEnv "BATCH" >>= return . fromMaybe "" + timestamp <- getPOSIXTime + csvFile <- lookupEnv "CSV_TIME_FILE" >>= return . fromMaybe "time.csv" + nodename <- fmap init (readProcess "hostname" [] "") + + let formatNum x = show (roundTo 3 x) + let chkStr = intercalate ";" $ map (map (\c -> if c == ',' then '_' else c) . formatNum) chkValues + + let logRow = intercalate "," + [ nodename + , batch + , show timestamp + , cfgTask + , cfgDataName + , show cfgInRows + , question + , show outRows + , show outCols + , cfgSolution + , cfgVer + , cfgGit + , cfgFun + , show run + , formatNum timeSec + , formatNum memGb + , cfgCache + , chkStr + , formatNum chkTime + , "" -- comment + , cfgOnDisk + , cfgMachineType + ] + + exists <- doesFileExist csvFile + let header = "nodename,batch,timestamp,task,data,in_rows,question,out_rows,out_cols,solution,version,git,fun,run,time_sec,mem_gb,cache,chk,chk_time_sec,comment,on_disk,machine_type\n" + + forceAppend csvFile $ (if not exists then header else "") ++ logRow ++ "\n" + +roundTo :: Int -> Double -> Double +roundTo n x = (fromInteger $ round $ x * (10 ^ n)) / (10.0 ^^ n) + +forceAppend :: FilePath -> String -> IO () +forceAppend path content = + withFile path AppendMode $ \h -> do + hSetBuffering h NoBuffering + hPutStr h content + hFlush h diff --git a/haskell/setup-haskell.sh b/haskell/setup-haskell.sh new file mode 100755 index 00000000..09f5d8c0 --- /dev/null +++ b/haskell/setup-haskell.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +# Install Cabal (Haskell build tool) if not present +if ! command -v ghcup &> /dev/null; then + echo "Installing Cabal..." + curl --proto '=https' --tlsv1.2 -sSf https://get-ghcup.haskell.org | BOOTSTRAP_HASKELL_NONINTERACTIVE=1 BOOTSTRAP_HASKELL_MINIMAL=1 sh + ghcup install cabal +fi + +cd haskell + +# Install dependencies and build +cabal update +cabal build -O2 + +cd .. + +./haskell/ver-haskell.sh diff --git a/haskell/stack.yaml b/haskell/stack.yaml new file mode 100644 index 00000000..95f46dd4 --- /dev/null +++ b/haskell/stack.yaml @@ -0,0 +1,67 @@ +# This file was automatically generated by 'stack init' +# +# Some commonly used options have been documented as comments in this file. +# For advanced use and comprehensive documentation of the format, please see: +# https://docs.haskellstack.org/en/stable/configure/yaml/ + +# A 'specific' Stackage snapshot or a compiler version. 
+# A snapshot resolver dictates the compiler version and the set of packages
+# to be used for project dependencies. For example:
+#
+# snapshot: lts-23.0
+# snapshot: nightly-2024-12-13
+# snapshot: ghc-9.8.4
+#
+# The location of a snapshot can be provided as a file or url. Stack assumes
+# a snapshot provided as a file might change, whereas a url resource does not.
+#
+# snapshot: ./custom-snapshot.yaml
+# snapshot: https://example.com/snapshots/2024-01-01.yaml
+snapshot:
+  url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/nightly/2025/11/19.yaml
+
+# User packages to be built.
+# Various formats can be used as shown in the example below.
+#
+# packages:
+# - some-directory
+# - https://example.com/foo/bar/baz-0.0.2.tar.gz
+#   subdirs:
+#   - auto-update
+#   - wai
+packages:
+- .
+# Dependency packages to be pulled from upstream that are not in the snapshot.
+# These entries can reference officially published versions as well as
+# forks / in-progress versions pinned to a git hash. For example:
+#
+# extra-deps:
+# - acme-missiles-0.3
+# - git: https://github.com/commercialhaskell/stack.git
+#   commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
+#
+# extra-deps: []
+
+# Override default flag values for project packages and extra-deps
+# flags: {}
+
+# Extra package databases containing global packages
+# extra-package-dbs: []
+
+# Control whether we use the GHC we find on the path
+# system-ghc: true
+#
+# Require a specific version of Stack, using version ranges
+# require-stack-version: -any # Default
+# require-stack-version: ">=3.3"
+#
+# Override the architecture used by Stack, especially useful on Windows
+# arch: i386
+# arch: x86_64
+#
+# Extra directories used by Stack for building
+# extra-include-dirs: [/path/to/dir]
+# extra-lib-dirs: [/path/to/dir]
+#
+# Allow a newer minor version of GHC than the snapshot specifies
+# compiler-check: newer-minor
diff --git a/haskell/upg-haskell.sh b/haskell/upg-haskell.sh
new file mode 100755
index 00000000..d453cdce
--- /dev/null
+++ b/haskell/upg-haskell.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+set -e
+
+cd haskell
+
+# Update stack's package index and rebuild dependencies
+stack update
+stack upgrade
+stack build --only-dependencies
+
+cd ..
+
+./haskell/ver-haskell.sh
diff --git a/haskell/ver-haskell.sh b/haskell/ver-haskell.sh
new file mode 100755
index 00000000..7ba223d1
--- /dev/null
+++ b/haskell/ver-haskell.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+set -e
+
+cd haskell
+
+# Get the dataframe version from the package db; fall back to a known version
+DF_VERSION=$(stack exec -- ghc-pkg field dataframe version 2>/dev/null | awk '{print $2}')
+DF_VERSION=${DF_VERSION:-0.3.3.9}
+
+# Write version to VERSION file
+echo "${DF_VERSION}" > VERSION
+
+# dataframe is installed from Hackage, so there is no git revision to record
+echo "dataframe-${DF_VERSION}" > REVISION
+
+cd ..
diff --git a/run.conf b/run.conf
index 0aa2cf69..641a49f5 100644
--- a/run.conf
+++ b/run.conf
@@ -1,7 +1,7 @@
 # task, used in init-setup-iteration.R
 export RUN_TASKS="groupby join"
 # solution, used in init-setup-iteration.R
-export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas pydatatable spark dask clickhouse polars R-arrow duckdb duckdb-latest datafusion chdb"
+export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas pydatatable spark dask clickhouse polars R-arrow duckdb duckdb-latest datafusion chdb haskell"
 # flag to upgrade tools, used in run.sh on init
 export DO_UPGRADE=false
diff --git a/run.sh b/run.sh
index f17a5644..c2408bb2 100755
--- a/run.sh
+++ b/run.sh
@@ -92,6 +92,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" == "duckdb-latest" ]]; then ./du
 if [[ "$RUN_SOLUTIONS" == "duckdb-latest" ]]; then ./duckdb-latest/ver-duckdb-latest.sh; fi;
 if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi;
 if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi;
+if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "haskell" ]]; then ./haskell/upg-haskell.sh; fi;
+if [[ "$RUN_SOLUTIONS" =~ "haskell" ]]; then ./haskell/ver-haskell.sh; fi;
 
 # run
 if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi;
diff --git a/time.csv b/time.csv
index 284fcfcd..900c3170 100644
--- a/time.csv
+++ b/time.csv
@@ -36966,3 +36966,19 @@ ip-172-31-18-189,1761209486,1761212936,groupby,G1_1e9_1e2_5_0,1000000000,regress
 ip-172-31-18-189,1761209486,1761212940,groupby,G1_1e9_1e2_5_0,1000000000,regression v1 v2 by id2 id4,9216,3,duckdb,1.4.1,b390a7c376,group_by,2,3.935,NA,TRUE,0.09857194,0.001,NA,TRUE,c6id.4xlarge
 ip-172-31-18-189,1761209486,1761213076,groupby,G1_1e9_1e2_5_0,1000000000,sum v3 count by id1:id6,999939563,8,duckdb,1.4.1,b390a7c376,group_by,1,123.828,NA,TRUE,47498842806;1000000000,12.969,NA,TRUE,c6id.4xlarge
 ip-172-31-18-189,1761209486,1761213213,groupby,G1_1e9_1e2_5_0,1000000000,sum v3 count by id1:id6,999939563,8,duckdb,1.4.1,b390a7c376,group_by,2,121.902,NA,TRUE,47498842806;1000000000,13.061,NA,TRUE,c6id.4xlarge
+ip-172-31-38-245,,1763634448.084065271s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1,100,2,haskell,0.3.3.9,NA,groupBy,1,4.724,17786.52,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634448.090627419s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1,100,2,haskell,0.3.3.9,NA,groupBy,2,1.0e-3,17786.793,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634454.606981034s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1:id2,10000,3,haskell,0.3.3.9,NA,groupBy,1,6.51,17786.793,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634454.613670393s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 by id1:id2,10000,3,haskell,0.3.3.9,NA,groupBy,2,1.0e-3,17786.793,TRUE,2.9998789e7,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634461.465441205s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 mean v3 by id3,100000,3,haskell,0.3.3.9,NA,groupBy,1,6.845,17786.793,TRUE,2.9998789e7;4999719.622,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634461.473431198s,groupby,G1_1e7_1e2_0_0,10000000,sum v1 mean v3 by id3,100000,3,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17786.793,TRUE,2.9998789e7;4999719.622,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634468.523071921s,groupby,G1_1e7_1e2_0_0,10000000,mean v1:v3 by id4,100,4,haskell,0.3.3.9,NA,groupBy,1,7.043,17786.918,TRUE,299.988;799.894;4999.767,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634468.531968066s,groupby,G1_1e7_1e2_0_0,10000000,mean v1:v3 by id4,100,4,haskell,0.3.3.9,NA,groupBy,2,3.0e-3,17786.918,TRUE,299.988;799.894;4999.767,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634480.545535913s,groupby,G1_1e7_1e2_0_0,10000000,sum v1:v3 by id6,100000,4,haskell,0.3.3.9,NA,groupBy,1,12.007,17786.918,TRUE,2.9998789e7;7.998936e7;4.99976651408e8,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634480.553569496s,groupby,G1_1e7_1e2_0_0,10000000,sum v1:v3 by id6,100000,4,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17786.918,TRUE,2.9998789e7;7.998936e7;4.99976651408e8,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634491.334093087s,groupby,G1_1e7_1e2_0_0,10000000,median v3 sd v3 by id4 id5,10000,4,haskell,0.3.3.9,NA,groupBy,1,10.774,17787.043,TRUE,499920.14;288648.108,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634491.342065738s,groupby,G1_1e7_1e2_0_0,10000000,median v3 sd v3 by id4 id5,10000,4,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17787.043,TRUE,499920.14;288648.108,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634497.72099282s,groupby,G1_1e7_1e2_0_0,10000000,max v1 - min v2 by id3,100000,2,haskell,0.3.3.9,NA,groupBy,1,6.373,17787.043,TRUE,399882.0,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634497.728958547s,groupby,G1_1e7_1e2_0_0,10000000,max v1 - min v2 by id3,100000,2,haskell,0.3.3.9,NA,groupBy,2,2.0e-3,17787.043,TRUE,399882.0,0.0,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634527.391336684s,groupby,G1_1e7_1e2_0_0,10000000,sum v3 count by id1:id6,10000000,7,haskell,0.3.3.9,NA,groupBy,1,29.643,17787.043,TRUE,4.99976651408e8,1.3e-2,,FALSE,c6id.4xlarge
+ip-172-31-38-245,,1763634527.402204697s,groupby,G1_1e7_1e2_0_0,10000000,sum v3 count by id1:id6,10000000,7,haskell,0.3.3.9,NA,groupBy,2,4.0e-3,17787.043,TRUE,4.99976651408e8,0.0,,FALSE,c6id.4xlarge