Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion R/create_lkups.R
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ create_lkups <- function(data_dir, versions) {

# --- END inclusion of CMD data.

refy_lkup <- refy_lkup[reporting_year %in% lineup_years$lineup_years, ]
refy_lkup <- refy_lkup[
reporting_year %in% lineup_years$lineup_years,
][
# Filter out rows with missing or zero population
!is.na(reporting_pop) & reporting_pop > 0
]

gv(
refy_lkup,
Expand Down
224 changes: 125 additions & 99 deletions R/fgt_cumsum.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,22 @@
#' @return List with elements: DT (data.table of all surveys, with id_rl) and g (GRP object for grouping by id_rl).
#' @keywords internal
format_lfst <- \(lfst, dict) {
  # Stack the list of survey tables into one data.table; fill = TRUE pads
  # columns missing from some surveys with NA.
  DT <- rbindlist(lfst, fill = TRUE)

  # Ensure `index` is integer so downstream grouped joins/lookups are exact.
  if (!is.integer(DT$index)) {
    DT[, index := as.integer(index)]
  }

  # Encode each (id, reporting_level) pair into the single integer key
  # `id_rl`; drop_labels = TRUE removes the original label columns.
  out <- encode_pairs(DT = DT, dict = dict, drop_labels = TRUE)

  ## Grouping ----------
  # GRP object for fast collapse-style grouped operations by id_rl.
  g <- GRP(out, ~id_rl, sort = FALSE)

  list(DT = out, g = g)
}




#' Compute total population by survey and reporting level
#'
#' Sums the weights for each (id, reporting_level) group in the combined survey data.
Expand All @@ -43,11 +37,12 @@ format_lfst <- \(lfst, dict) {
get_total_pop <- \(LDTg) {
  # Unpack the list elements (DT, g) into the local environment and drop
  # the container to avoid keeping two references alive.
  list2env(LDTg, envir = environment())
  rm(LDTg)

  # Sum `weight` within each group defined by g, attach the group keys,
  # and rename the summed column to W (total population per group).
  add_vars(
    g[["groups"]],
    get_vars(DT, c("weight")) |>
      fsum(g)
  ) |>
    setnames(old = "weight", new = "W")
}

#' Compute FGT and Watts indices for all groups and poverty lines
Expand All @@ -60,67 +55,74 @@ get_total_pop <- \(LDTg) {
#' @param drop_vars Logical, if TRUE returns only summary columns.
#' @return data.table with FGT and Watts measures by group and poverty line.
#' @keywords internal
fgt_cumsum <- \(LDTg, tpop, povline, drop_vars = TRUE) {
  # Unpack DT and g from the list into the local environment.
  list2env(LDTg, envir = environment())
  rm(LDTg)

  # Temporal values to be added to the data.table.
  # Clamp z away from zero so log(z) and divisions below are finite.
  tz    <- pmax(povline, 1e-12)
  tz2   <- pmax(povline^2, 1e-16)
  tlogz <- log(tz)

  # 1) Compute cutpoint index for each z, using ONLY non-zero rows for welfare
  # -> findInterval(povline, welfare) returns values in 0..N (never N+1)
  ID <- DT[
    index > 0L,
    {
      idx <- findInterval(povline, welfare, left.open = TRUE)
      # 2) Attach z, z2, logz in-group (no replication/copies)
      data.table(index = idx, z = tz, z2 = tz2, logz = tlogz)
    },
    by = id_rl
  ]

  # 3) Minimal cumulative view (shallow column subset; avoids copying DT)
  DT_min <- get_vars(DT, c("id_rl", "index", "cw", "cwy", "cwy2", "cwylog"))

  # 4) join cutpoints to cumulatives (index==0 hits the already-present zero row)
  CS <- join(
    x = ID,
    y = DT_min,
    on = c("id_rl", "index"),
    how = "left",
    validate = "m:1", # many cutpoints -> 1 cumulative row
    drop.dup.cols = "y",
    verbose = 0
  ) |>
    # 5) Bring total population W
    join(
      tpop,
      on = "id_rl",
      how = "left",
      validate = "m:1",
      drop.dup.cols = "y",
      verbose = 0
    ) |>
    setorder(id_rl, index)

  # 6) Compute measures (vectorized). Small clamps for numerical safety.
  CS[, `:=`(
    headcount        = cw / W,
    poverty_gap      = (z * cw - cwy) / (z * W),
    poverty_severity = (z2 * cw - 2 * z * cwy + cwy2) / (z2 * W),
    watts            = (logz * cw - cwylog) / W
  )]

  setnames(CS, "z", "povline")
  if (!drop_vars) {
    return(CS)
  }

  # Keep only the summary columns when drop_vars = TRUE.
  get_vars(
    CS,
    c(
      "id_rl",
      "povline",
      "headcount",
      "poverty_gap",
      "poverty_severity",
      "watts"
    )
  )
}


Expand All @@ -140,25 +142,27 @@ fgt_cumsum <- \(LDTg, tpop, povline,
#' @return data.table with columns id, reporting_level, and code.
#' @keywords internal
build_pair_dict <- function(lkup, fill_gaps = TRUE) {
  # Country/year/level triplets, taken from the reference-year (gap-filled)
  # lookup or the survey-year lookup, then de-duplicated.
  # NOTE(review): `|> funique()` binds to the else branch of the `if`
  # expression, so the fill_gaps = TRUE branch is not de-duplicated here --
  # TODO confirm intent; the later unique() on (id, reporting_level) makes
  # the final dictionary unique either way.
  FT <- if (fill_gaps) {
    lkup$refy_lkup[, .(country_code, reporting_year, reporting_level)]
  } else {
    lkup$svy_lkup[, .(country_code, reporting_year, reporting_level)]
  } |>
    funique()

  # Collapse country/year into the single id "CODE_YYYY" and drop the
  # raw columns (in-place, data.table reference semantics).
  FT[, id := paste0(country_code, "_", reporting_year)][,
    c("country_code", "reporting_year") := NULL
  ]

  cols <- c("id", "reporting_level")
  dict <- unique(FT[, ..cols])

  # deterministic code order
  setorderv(dict, cols, order = 1L) # radix by default
  dict[, code := as.integer(.I)] # fast in DT
  setkeyv(dict, cols) # fast key lookups when needed
  setindexv(dict, "code") # index on code
  dict
}

Expand All @@ -181,18 +185,20 @@ build_pair_dict <- function(lkup, fill_gaps = TRUE) {
#' @param verbose Integer, verbosity level.
#' @return data.table with code column added.
#' @keywords internal
encode_pairs <- function(DT, dict,
id_col = "id", level_col = "reporting_level",
code_col = "id_rl",
drop_labels = FALSE,
strict = TRUE,
verbose = 0L) {

encode_pairs <- function(
DT,
dict,
id_col = "id",
level_col = "reporting_level",
code_col = "id_rl",
drop_labels = FALSE,
strict = TRUE,
verbose = 0L
) {
stopifnot(is.data.table(DT), is.data.table(dict))
cols <- c(id_col, level_col)
stopifnot(all(cols %in% names(DT)), all(c(cols, "code") %in% names(dict)))


out <- join(
x = DT,
y = dict,
Expand All @@ -203,7 +209,9 @@ encode_pairs <- function(DT, dict,
verbose = verbose
)
# Ensure it's a data.table (join usually preserves)
if (!is.data.table(out)) setDT(out)
if (!is.data.table(out)) {
setDT(out)
}

# Rename 'code' -> code_col if needed
if (code_col != "code" && "code" %in% names(out)) {
Expand All @@ -223,7 +231,9 @@ encode_pairs <- function(DT, dict,
}
}

if (drop_labels) out[, (cols) := NULL]
if (drop_labels) {
out[, (cols) := NULL]
}
out
}

Expand All @@ -245,26 +255,29 @@ encode_pairs <- function(DT, dict,
#' @param verbose Integer, verbosity level.
#' @return data.table with id and reporting_level columns added.
#' @keywords internal
decode_pairs <- function(DT, dict,
code_col = "id_rl",
id_col = "id",
level_col = "reporting_level",
keep_code = FALSE,
add_true_vars = TRUE,
verbose = 0L) {
decode_pairs <- function(
DT,
dict,
code_col = "id_rl",
id_col = "id",
level_col = "reporting_level",
keep_code = FALSE,
add_true_vars = TRUE,
verbose = 0L
) {
stopifnot(exprs = {
is.data.table(DT)
is.data.table(dict)
})
})
stopifnot(exprs = {
code_col %in% names(DT)
all(c("code", id_col, level_col) %in% names(dict))
})
})

out <- join(
x = DT,
y = dict,
on = setNames("code", code_col), # map DT[code_col] to dict$code
on = setNames("code", code_col), # map DT[code_col] to dict$code
how = "left",
drop.dup.cols = "y",
validate = "m:1",
Expand All @@ -274,13 +287,16 @@ decode_pairs <- function(DT, dict,

if (add_true_vars) {
out[, `:=`(
country_code = gsub("(.+)(_.+)", "\\1", id),
reporting_year = as.integer(gsub("(.+_)(.+)", "\\2", id))
)][,
id := NULL]
country_code = gsub("(.+)(_.+)", "\\1", id),
reporting_year = as.integer(gsub("(.+_)(.+)", "\\2", id))
)][,
id := NULL
]
}

if (!keep_code) out[, (code_col) := NULL]
if (!keep_code) {
out[, (code_col) := NULL]
}
out
}

Expand All @@ -298,8 +314,12 @@ decode_pairs <- function(DT, dict,
#' @param level_col Name of reporting level column.
#' @return Updated data.table dictionary.
#' @keywords internal
update_pair_dict <- function(dict, DT,
id_col = "id", level_col = "reporting_level") {
update_pair_dict <- function(
dict,
DT,
id_col = "id",
level_col = "reporting_level"
) {
stopifnot(is.data.table(dict), is.data.table(DT))
cols <- c(id_col, level_col)
stopifnot(all(c(cols, "code") %in% names(dict)), all(cols %in% names(DT)))
Expand All @@ -317,7 +337,6 @@ update_pair_dict <- function(dict, DT,
}



#' Load survey data from file list
#'
#' Reads a list of survey files (e.g., .fst) and returns a named list of data.tables, each with an id column.
Expand All @@ -326,8 +345,7 @@ update_pair_dict <- function(dict, DT,
#' @param input_list Character vector of file paths (from create_full_list()).
#' @return Named list of data.tables, each with an id column.
#' @keywords internal
load_list_refy <- \(input_list){

load_list_refy <- \(input_list) {
id_names <- input_list |>
fs::path_file() |>
fs::path_ext_remove()
Expand All @@ -338,17 +356,25 @@ load_list_refy <- \(input_list){
base::seq_along
}


lfst <- lapply(seq_flex(input_list),
\(i) {
x <- input_list[i]
idn <- fs::path_file(x) |>
fs::path_ext_remove()
fst::read_fst(x, as.data.table = TRUE) |>
_[, id := idn]
}) |>
lfst <- lapply(seq_flex(input_list), \(i) {
x <- input_list[i]
idn <- fs::path_file(x) |>
fs::path_ext_remove()
tryCatch(
fst::read_fst(x, as.data.table = TRUE) |>
_[, id := idn],
error = \(e) {
cli::cli_abort(
c(
"Failed to read file at index {.val {i}}: {.file {x}}",
"x" = conditionMessage(e)
),
parent = e
)
}
)
}) |>
setNames(id_names)

lfst
}

Loading