From 2d228f95aa64b7fb88bb6acc64153af90b136656 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 19 Jun 2020 15:26:51 -0300 Subject: [PATCH 01/31] Atualizando gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 99e2cea..d0f623c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ /Manifest.toml /dev/ /docs/build/ -/docs/site/ +/docs/site/ \ No newline at end of file From 68d1500eb95bdabcc6452acebf0f143a57266f28 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 19 Jun 2020 15:27:25 -0300 Subject: [PATCH 02/31] Adicionando dependencias --- Project.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Project.toml b/Project.toml index cdc0ad9..885a0af 100644 --- a/Project.toml +++ b/Project.toml @@ -3,6 +3,13 @@ uuid = "ebb30991-6a3b-4324-962c-6bc29053301c" authors = ["Pedro Conrado"] version = "0.1.0" +[deps] +CSVFiles = "5d742f6a-9f54-50ce-8119-2520741973ca" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" +MLJ = "add582a8-e3ab-11e8-2d5e-e98b27df1bc7" +RDatasets = "ce6b1742-4840-55fa-b093-852dadbb1d8b" +Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" + [compat] julia = "0.1" From 40e5102317fb49c49e01e193c139c37d54521c06 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 19 Jun 2020 15:28:25 -0300 Subject: [PATCH 03/31] Adicionando primeiro prototipo --- src/EasyStream.jl | 6 +- src/datatypes/Stream.jl | 105 +++++++++++++++++++++++++++++++++ src/datatypes/StreamFeeder.jl | 108 ++++++++++++++++++++++++++++++++++ 3 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 src/datatypes/Stream.jl create mode 100644 src/datatypes/StreamFeeder.jl diff --git a/src/EasyStream.jl b/src/EasyStream.jl index 80568f2..e9663d3 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -1,5 +1,7 @@ +using Revise module EasyStream + using CSVFiles, DataFrames -greet() = print("Hello World!") - + include("./datatypes/StreamFeeder.jl") + include("./datatypes/Stream.jl") end # module diff --git a/src/datatypes/Stream.jl b/src/datatypes/Stream.jl new file mode 100644 index 0000000..b3b578a --- /dev/null +++ b/src/datatypes/Stream.jl @@ -0,0 +1,105 @@ +mutable struct Stream + samples + initial_sample_size::Int64 + total_samples::Int64 + labels::Int64 + pointer::Int64 + fluxDensity::Int64 + endInstance::Bool + + function Stream() + newStream = new() + newStream.pointer = 0 + newStream.samples = [] + return newStream + end + + function Stream(initial_sample_size, total_samples, fluxDensity) + newStream = Stream() + newStream.initial_sample_size = initial_sample_size + newStream.total_samples = total_samples + newStream.fluxDensity = fluxDensity + return newStream + end +end + +function Stream(st::StreamFeeder) + stream = Stream(st.initial_sample_size, st.total_samples, st.fluxDensity) + push!(stream.samples, initialsample!(st)) + sample = next!(st) + while sample != -1 + push!(stream.samples, sample) + sample = next!(st) + end + return stream +end + +function next(stream::Stream) + stream.pointer += 1 +end + +## Until functions + +function Base.length(stream::Stream) + return size(stream.samples[stream.pointer + 1], 1) +end + +function Base.lastindex(stream::Stream) + return length(stream) +end + +function Base.lastindex(stream::Stream, d::Int64) + if(d==1) + stream.endInstance = true + return -1 + elseif(d==2) + return length(stream[1]) + elseif(d==3) + return length(stream.samples) + end +end + +### Exploring the three dimensions +function Base.getindex(stream::Stream, instance::Int64, feature::Int64, sample::Int64) + if stream.endInstance == true && instance == -1 + instance = size(stream.samples[sample], 1) + stream.endInstance == false + end + return stream.samples[sample][instance, feature] +end + +function Base.getindex(stream::Stream, instance::Int64, c::Colon, sample::Int64) + if stream.endInstance == true && instance == -1 + instance = size(stream.samples[sample], 1) + stream.endInstance == false + end + return stream.samples[sample][instance, :] +end + +function Base.getindex(stream::Stream, c::Colon, feature::Int64, sample::Int64) + return stream.samples[sample][:, feature] +end + +function Base.getindex(stream::Stream, c::Colon, c1::Colon, sample::Int64) + return stream.samples[sample] +end + +### Exploring two dimensions + +function Base.getindex(stream::Stream, instance::Int64, feature::Int64) + return stream[instance, feature, stream.pointer + 1] +end + +function Base.getindex(stream::EasyStream.Stream, instance::Int64, c::Colon) + return stream[instance, :, stream.pointer + 1] +end + +### Exploring the one dimension + +function Base.getindex(stream::Stream, instance::Int64) + return stream[instance, :, stream.pointer + 1] +end + +function Base.getindex(stream::Stream, c::Colon) + return stream[:,:, stream.pointer + 1] +end diff --git a/src/datatypes/StreamFeeder.jl b/src/datatypes/StreamFeeder.jl new file mode 100644 index 0000000..6cba322 --- /dev/null +++ b/src/datatypes/StreamFeeder.jl @@ -0,0 +1,108 @@ +abstract type StreamFeeder end + +mutable struct FileStream <: StreamFeeder + initial_sample_size::Int64 + total_samples::Int64 + labels::Int64 + delimiter::String + ioStream + pointer::Int64 + fluxDensity::Int64 + + function FileStream() + return new() + end +end + +function FileStream(path::String; header = false, initial_sample_size = 150, fluxDensity = 1, delimiter = ",") + fileStream = FileStream() + fileStream.initial_sample_size = initial_sample_size + fileStream.pointer = 0 + fileStream.ioStream = open(path) + fileStream.delimiter = delimiter + fileStream.fluxDensity = fluxDensity + fileStream.total_samples = size(fileStream.samples, 1) + + return fileStream +end + +function readStream(stream::FileStream) + instance = split(readline(stream.ioStream), stream.delimiter) + if instance == [""] + return -1 + end + instance = convert(Array{Any, 1}, instance) + for i = 1:length(instance) + if '.' in instance[i] + x = tryparse(Float64, instance[i]) + if x != nothing + instance[i] = x + else + instance[i] = instance[i] + end + else + x = tryparse(Int64, instance[i]) + if x != nothing + instance[i] = x + else + instance[i] = instance[i] + end + end + end + return permutedims(instance) +end + +function next(stream::StreamFeeder) + old_position = position(stream.ioStream) + instance = readStream(stream) + seek(stream.ioStream, old_position) + return instance + #= + if stream.pointer == stream.total_samples + return -1 + end + sample = stream.samples[stream.pointer + 1:stream.pointer + stream.fluxDensity, :] + return sample + =# + +end + +function next!(stream::StreamFeeder) + return readStream(stream) + #= + if stream.pointer == stream.total_samples + return -1 + end + sample = stream.samples[stream.pointer + 1:stream.pointer + stream.fluxDensity, :] + stream.pointer += stream.fluxDensity + return sample + =# +end + +function rewind(stream::StreamFeeder) + seekstart(stream.ioStream) +end + +function initialsample(stream::StreamFeeder) + old_position = position(stream.ioStream) + rewind(stream) + sample = next!(stream) + for _ in 2:stream.initial_sample_size + sample = vcat(sample, next!(stream)) + end + seek(stream.ioStream, old_position) + return sample +end + +function initialsample!(stream::StreamFeeder) + rewind(stream) + sample = next!(stream) + for _ in 1:stream.initial_sample_size + sample = vcat(sample, next!(stream)) + end + return sample + #= + stream.pointer = stream.initial_sample_size + return stream.samples[1:stream.initial_sample_size, :] + =# +end From c8de19212637e11ec6162c7f587dff656789a2c9 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 16:13:47 -0300 Subject: [PATCH 04/31] atualizando gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index d0f623c..1f65508 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ /Manifest.toml /dev/ /docs/build/ -/docs/site/ \ No newline at end of file +/docs/site/ +datasets/ From fdf41198ba1ccbf69aeb10c950bb96dfc57bfd0c Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 16:14:00 -0300 Subject: [PATCH 05/31] removendo codigo antigo --- src/datatypes/Stream.jl | 105 --------------------------------- src/datatypes/StreamFeeder.jl | 108 ---------------------------------- 2 files changed, 213 deletions(-) delete mode 100644 src/datatypes/Stream.jl delete mode 100644 src/datatypes/StreamFeeder.jl diff --git a/src/datatypes/Stream.jl b/src/datatypes/Stream.jl deleted file mode 100644 index b3b578a..0000000 --- a/src/datatypes/Stream.jl +++ /dev/null @@ -1,105 +0,0 @@ -mutable struct Stream - samples - initial_sample_size::Int64 - total_samples::Int64 - labels::Int64 - pointer::Int64 - fluxDensity::Int64 - endInstance::Bool - - function Stream() - newStream = new() - newStream.pointer = 0 - newStream.samples = [] - return newStream - end - - function Stream(initial_sample_size, total_samples, fluxDensity) - newStream = Stream() - newStream.initial_sample_size = initial_sample_size - newStream.total_samples = total_samples - newStream.fluxDensity = fluxDensity - return newStream - end -end - -function Stream(st::StreamFeeder) - stream = Stream(st.initial_sample_size, st.total_samples, st.fluxDensity) - push!(stream.samples, initialsample!(st)) - sample = next!(st) - while sample != -1 - push!(stream.samples, sample) - sample = next!(st) - end - return stream -end - -function next(stream::Stream) - stream.pointer += 1 -end - -## Until functions - -function Base.length(stream::Stream) - return size(stream.samples[stream.pointer + 1], 1) -end - -function Base.lastindex(stream::Stream) - return length(stream) -end - -function Base.lastindex(stream::Stream, d::Int64) - if(d==1) - stream.endInstance = true - return -1 - elseif(d==2) - return length(stream[1]) - elseif(d==3) - return length(stream.samples) - end -end - -### Exploring the three dimensions -function Base.getindex(stream::Stream, instance::Int64, feature::Int64, sample::Int64) - if stream.endInstance == true && instance == -1 - instance = size(stream.samples[sample], 1) - stream.endInstance == false - end - return stream.samples[sample][instance, feature] -end - -function Base.getindex(stream::Stream, instance::Int64, c::Colon, sample::Int64) - if stream.endInstance == true && instance == -1 - instance = size(stream.samples[sample], 1) - stream.endInstance == false - end - return stream.samples[sample][instance, :] -end - -function Base.getindex(stream::Stream, c::Colon, feature::Int64, sample::Int64) - return stream.samples[sample][:, feature] -end - -function Base.getindex(stream::Stream, c::Colon, c1::Colon, sample::Int64) - return stream.samples[sample] -end - -### Exploring two dimensions - -function Base.getindex(stream::Stream, instance::Int64, feature::Int64) - return stream[instance, feature, stream.pointer + 1] -end - -function Base.getindex(stream::EasyStream.Stream, instance::Int64, c::Colon) - return stream[instance, :, stream.pointer + 1] -end - -### Exploring the one dimension - -function Base.getindex(stream::Stream, instance::Int64) - return stream[instance, :, stream.pointer + 1] -end - -function Base.getindex(stream::Stream, c::Colon) - return stream[:,:, stream.pointer + 1] -end diff --git a/src/datatypes/StreamFeeder.jl b/src/datatypes/StreamFeeder.jl deleted file mode 100644 index 6cba322..0000000 --- a/src/datatypes/StreamFeeder.jl +++ /dev/null @@ -1,108 +0,0 @@ -abstract type StreamFeeder end - -mutable struct FileStream <: StreamFeeder - initial_sample_size::Int64 - total_samples::Int64 - labels::Int64 - delimiter::String - ioStream - pointer::Int64 - fluxDensity::Int64 - - function FileStream() - return new() - end -end - -function FileStream(path::String; header = false, initial_sample_size = 150, fluxDensity = 1, delimiter = ",") - fileStream = FileStream() - fileStream.initial_sample_size = initial_sample_size - fileStream.pointer = 0 - fileStream.ioStream = open(path) - fileStream.delimiter = delimiter - fileStream.fluxDensity = fluxDensity - fileStream.total_samples = size(fileStream.samples, 1) - - return fileStream -end - -function readStream(stream::FileStream) - instance = split(readline(stream.ioStream), stream.delimiter) - if instance == [""] - return -1 - end - instance = convert(Array{Any, 1}, instance) - for i = 1:length(instance) - if '.' in instance[i] - x = tryparse(Float64, instance[i]) - if x != nothing - instance[i] = x - else - instance[i] = instance[i] - end - else - x = tryparse(Int64, instance[i]) - if x != nothing - instance[i] = x - else - instance[i] = instance[i] - end - end - end - return permutedims(instance) -end - -function next(stream::StreamFeeder) - old_position = position(stream.ioStream) - instance = readStream(stream) - seek(stream.ioStream, old_position) - return instance - #= - if stream.pointer == stream.total_samples - return -1 - end - sample = stream.samples[stream.pointer + 1:stream.pointer + stream.fluxDensity, :] - return sample - =# - -end - -function next!(stream::StreamFeeder) - return readStream(stream) - #= - if stream.pointer == stream.total_samples - return -1 - end - sample = stream.samples[stream.pointer + 1:stream.pointer + stream.fluxDensity, :] - stream.pointer += stream.fluxDensity - return sample - =# -end - -function rewind(stream::StreamFeeder) - seekstart(stream.ioStream) -end - -function initialsample(stream::StreamFeeder) - old_position = position(stream.ioStream) - rewind(stream) - sample = next!(stream) - for _ in 2:stream.initial_sample_size - sample = vcat(sample, next!(stream)) - end - seek(stream.ioStream, old_position) - return sample -end - -function initialsample!(stream::StreamFeeder) - rewind(stream) - sample = next!(stream) - for _ in 1:stream.initial_sample_size - sample = vcat(sample, next!(stream)) - end - return sample - #= - stream.pointer = stream.initial_sample_size - return stream.samples[1:stream.initial_sample_size, :] - =# -end From 04d2076ce80673da4a68857f335fb45199fcc08c Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 16:14:21 -0300 Subject: [PATCH 06/31] adicionando primeiro iniciativa de buffer --- Project.toml | 3 ++- src/EasyStream.jl | 7 ++----- src/buffer.jl | 11 +++++++++++ src/stream.jl | 0 4 files changed, 15 insertions(+), 6 deletions(-) create mode 100644 src/buffer.jl create mode 100644 src/stream.jl diff --git a/Project.toml b/Project.toml index 885a0af..9d144ea 100644 --- a/Project.toml +++ b/Project.toml @@ -4,6 +4,7 @@ authors = ["Pedro Conrado"] version = "0.1.0" [deps] +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" CSVFiles = "5d742f6a-9f54-50ce-8119-2520741973ca" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" MLJ = "add582a8-e3ab-11e8-2d5e-e98b27df1bc7" @@ -11,7 +12,7 @@ RDatasets = "ce6b1742-4840-55fa-b093-852dadbb1d8b" Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" [compat] -julia = "0.1" +julia = "1.0" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/src/EasyStream.jl b/src/EasyStream.jl index e9663d3..83238d4 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -1,7 +1,4 @@ -using Revise module EasyStream - using CSVFiles, DataFrames - - include("./datatypes/StreamFeeder.jl") - include("./datatypes/Stream.jl") + include("src/buffer.jl") + include("src/stream.jl") end # module diff --git a/src/buffer.jl b/src/buffer.jl new file mode 100644 index 0000000..5195518 --- /dev/null +++ b/src/buffer.jl @@ -0,0 +1,11 @@ +using CSV +using DataFrames + +abstract type Buffer end + +struct MemoryBuffer <: Buffer + data::DataFrame +end + +MemoryBuffer(path::String) = MemoryBuffer(CSV.read(path)) +Dataset1CDT() = MemoryBuffer("datasets/1CDT.csv") diff --git a/src/stream.jl b/src/stream.jl new file mode 100644 index 0000000..e69de29 From 2bb7e4ade58bc02cfd7fc459f6ac4cbe80de7fe4 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:01:47 -0300 Subject: [PATCH 07/31] primeiro prototipo de buffer e stream com um indice --- src/EasyStream.jl | 6 ++++-- src/buffer.jl | 29 +++++++++++++++++++++++++---- src/stream.jl | 17 +++++++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/EasyStream.jl b/src/EasyStream.jl index 83238d4..f4fa36a 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -1,4 +1,6 @@ module EasyStream - include("src/buffer.jl") - include("src/stream.jl") + using DataFrames + + include("buffer.jl") + include("stream.jl") end # module diff --git a/src/buffer.jl b/src/buffer.jl index 5195518..c3589c0 100644 --- a/src/buffer.jl +++ b/src/buffer.jl @@ -1,11 +1,32 @@ using CSV -using DataFrames abstract type Buffer end -struct MemoryBuffer <: Buffer +mutable struct MemoryBuffer <: Buffer data::DataFrame + position::Int + initial_size::Int + flux_size::Int end -MemoryBuffer(path::String) = MemoryBuffer(CSV.read(path)) -Dataset1CDT() = MemoryBuffer("datasets/1CDT.csv") +MemoryBuffer(path::String, initial_size::Int, flux_size::Int) = MemoryBuffer(CSV.read(path), 0, initial_size, flux_size) + +Dataset1CDT() = Dataset1CDT(150, 1) +Dataset1CDT(initial_size::Int, flux_size::Int) = MemoryBuffer("datasets/1CDT.csv", initial_size, flux_size) + +next!(buffer::Buffer) = nothing + +function next!(buffer::MemoryBuffer) + if(buffer.position >= size(buffer.data, 1)) + return nothing + end + + if(buffer.position < buffer.initial_size) + buffer.position = buffer.initial_size + return buffer.data[1:buffer.initial_size, :] + else + index = (buffer.position + 1):(buffer.position + buffer.flux_size) + buffer.position = buffer.position + buffer.flux_size + return buffer.data[index, :] + end +end diff --git a/src/stream.jl b/src/stream.jl index e69de29..9bd0ae2 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -0,0 +1,17 @@ +struct Stream + buffer::Buffer + data::Vector{DataFrame} +end + +function Stream(buffer::Buffer) + data = Vector{DataFrame}() + push!(data, next!(buffer)) + + return Stream(buffer, data) +end + +Base.getindex(stream::Stream, sample::Int64) = stream.data[length(stream.data)][sample, :] + +function next!(stream::Stream) + push!(stream.data, next!(stream.buffer)) +end From 159568a6ac43bf95a3f1d3ae4210c34799371639 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:28:26 -0300 Subject: [PATCH 08/31] adicionado uns testes de limite --- src/buffer.jl | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/buffer.jl b/src/buffer.jl index c3589c0..6c4764b 100644 --- a/src/buffer.jl +++ b/src/buffer.jl @@ -9,7 +9,24 @@ mutable struct MemoryBuffer <: Buffer flux_size::Int end -MemoryBuffer(path::String, initial_size::Int, flux_size::Int) = MemoryBuffer(CSV.read(path), 0, initial_size, flux_size) +function MemoryBuffer(path::String, initial_size::Int, flux_size::Int) + data = CSV.read(path; header = false) + + if(initial_size > size(data, 1)) + initial_size = size(data, 1) + @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" + end + + if initial_size == 0 + @warn "initial size é zero" + end + + if flux_size == 0 + @warn "flux size é zero" + end + + return MemoryBuffer(data, 0, initial_size, flux_size) +end Dataset1CDT() = Dataset1CDT(150, 1) Dataset1CDT(initial_size::Int, flux_size::Int) = MemoryBuffer("datasets/1CDT.csv", initial_size, flux_size) From 82f9a771715fa36afc16b9ef63cc057ef346632d Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:28:33 -0300 Subject: [PATCH 09/31] adicioando teste para buffer --- test/runtests.jl | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index c7de039..c699c03 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,32 @@ using EasyStream using Test -@testset "EasyStream.jl" begin - # Write your own tests here. +@testset "Buffer Test" begin + buffer = EasyStream.Dataset1CDT() + @test size(EasyStream.next!(buffer), 1) == 150 + @test size(EasyStream.next!(buffer), 1) == 1 + @test size(EasyStream.next!(buffer), 1) == 1 + + initial_size = 200 + flux_size = 5 + buffer = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(buffer), 1) == initial_size + @test size(EasyStream.next!(buffer), 1) == flux_size + @test size(EasyStream.next!(buffer), 1) == flux_size + + initial_size = 16000 + flux_size = 1 + buffer = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(buffer), 1) == initial_size + @test EasyStream.next!(buffer) == nothing + + initial_size = 16001 + flux_size = 1 + buffer = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(buffer), 1) == initial_size - 1 + @test EasyStream.next!(buffer) == nothing + + + @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) + @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) end From 5976579e1ef997cf16abdc374a599aabe6a27c60 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:36:10 -0300 Subject: [PATCH 10/31] refatorando --- src/buffer.jl | 7 ++++--- src/stream.jl | 1 + test/runtests.jl | 1 - 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/buffer.jl b/src/buffer.jl index 6c4764b..8a4ef38 100644 --- a/src/buffer.jl +++ b/src/buffer.jl @@ -10,9 +10,10 @@ mutable struct MemoryBuffer <: Buffer end function MemoryBuffer(path::String, initial_size::Int, flux_size::Int) + #TODO: Buffer está sabendo a memoria e se fosse api? data = CSV.read(path; header = false) - if(initial_size > size(data, 1)) + if initial_size > size(data, 1) initial_size = size(data, 1) @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" end @@ -34,11 +35,11 @@ Dataset1CDT(initial_size::Int, flux_size::Int) = MemoryBuffer("datasets/1CDT.csv next!(buffer::Buffer) = nothing function next!(buffer::MemoryBuffer) - if(buffer.position >= size(buffer.data, 1)) + if buffer.position >= size(buffer.data, 1) return nothing end - if(buffer.position < buffer.initial_size) + if buffer.position < buffer.initial_size buffer.position = buffer.initial_size return buffer.data[1:buffer.initial_size, :] else diff --git a/src/stream.jl b/src/stream.jl index 9bd0ae2..9fa3556 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -10,6 +10,7 @@ function Stream(buffer::Buffer) return Stream(buffer, data) end +#TODO: Casos limites? Base.getindex(stream::Stream, sample::Int64) = stream.data[length(stream.data)][sample, :] function next!(stream::Stream) diff --git a/test/runtests.jl b/test/runtests.jl index c699c03..b311467 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -26,7 +26,6 @@ using Test @test size(EasyStream.next!(buffer), 1) == initial_size - 1 @test EasyStream.next!(buffer) == nothing - @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) end From 72f6d3aff3a20cdb28e0df5c8d144ab05ffcec9a Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:43:53 -0300 Subject: [PATCH 11/31] reorganizando --- src/buffer.jl | 10 +--------- src/datasets.jl | 8 ++++++++ test/playgroud.jl | 3 +++ 3 files changed, 12 insertions(+), 9 deletions(-) create mode 100644 src/datasets.jl create mode 100644 test/playgroud.jl diff --git a/src/buffer.jl b/src/buffer.jl index 8a4ef38..13750ad 100644 --- a/src/buffer.jl +++ b/src/buffer.jl @@ -1,5 +1,3 @@ -using CSV - abstract type Buffer end mutable struct MemoryBuffer <: Buffer @@ -9,10 +7,7 @@ mutable struct MemoryBuffer <: Buffer flux_size::Int end -function MemoryBuffer(path::String, initial_size::Int, flux_size::Int) - #TODO: Buffer está sabendo a memoria e se fosse api? - data = CSV.read(path; header = false) - +function MemoryBuffer(data::DataFrame, initial_size::Int, flux_size::Int) if initial_size > size(data, 1) initial_size = size(data, 1) @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" @@ -29,9 +24,6 @@ function MemoryBuffer(path::String, initial_size::Int, flux_size::Int) return MemoryBuffer(data, 0, initial_size, flux_size) end -Dataset1CDT() = Dataset1CDT(150, 1) -Dataset1CDT(initial_size::Int, flux_size::Int) = MemoryBuffer("datasets/1CDT.csv", initial_size, flux_size) - next!(buffer::Buffer) = nothing function next!(buffer::MemoryBuffer) diff --git a/src/datasets.jl b/src/datasets.jl new file mode 100644 index 0000000..6d12248 --- /dev/null +++ b/src/datasets.jl @@ -0,0 +1,8 @@ +using CSV + +Dataset1CDT() = Dataset1CDT(150, 1) + +function Dataset1CDT(initial_size::Int, flux_size::Int) + data = CSV.read(path; header = false) + return MemoryBuffer(data, initial_size, flux_size) +end diff --git a/test/playgroud.jl b/test/playgroud.jl new file mode 100644 index 0000000..b8f563f --- /dev/null +++ b/test/playgroud.jl @@ -0,0 +1,3 @@ +using Pkg +Pkg.activate(".") +using Revise #TODO: remover dps From 4219809173f28cdcd7d771a35d7c14ed427c3935 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:44:26 -0300 Subject: [PATCH 12/31] adicionado o datasets no modeulo --- src/EasyStream.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/EasyStream.jl b/src/EasyStream.jl index f4fa36a..217bf31 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -3,4 +3,5 @@ module EasyStream include("buffer.jl") include("stream.jl") + include("datasets.jl") end # module From 6f21bda6cfaede5d984b52d7ec35802c811911be Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 17:45:06 -0300 Subject: [PATCH 13/31] removendo arquivo desnecessario --- test/playgroud.jl | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 test/playgroud.jl diff --git a/test/playgroud.jl b/test/playgroud.jl deleted file mode 100644 index b8f563f..0000000 --- a/test/playgroud.jl +++ /dev/null @@ -1,3 +0,0 @@ -using Pkg -Pkg.activate(".") -using Revise #TODO: remover dps From 365b02458ccfed305b4a5301da1a2b145aaee152 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 18:28:08 -0300 Subject: [PATCH 14/31] corrigindo bug --- src/datasets.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datasets.jl b/src/datasets.jl index 6d12248..b847b81 100644 --- a/src/datasets.jl +++ b/src/datasets.jl @@ -3,6 +3,6 @@ using CSV Dataset1CDT() = Dataset1CDT(150, 1) function Dataset1CDT(initial_size::Int, flux_size::Int) - data = CSV.read(path; header = false) + data = CSV.read("datasets/1CDT.csv"; header = false) return MemoryBuffer(data, initial_size, flux_size) end From 08ca69927ab26f205de8e41057218a8d13bc074e Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 18:28:22 -0300 Subject: [PATCH 15/31] refactoring do metodo next --- src/buffer.jl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/buffer.jl b/src/buffer.jl index 13750ad..fac2aa9 100644 --- a/src/buffer.jl +++ b/src/buffer.jl @@ -33,10 +33,11 @@ function next!(buffer::MemoryBuffer) if buffer.position < buffer.initial_size buffer.position = buffer.initial_size - return buffer.data[1:buffer.initial_size, :] + index = 1:buffer.initial_size else index = (buffer.position + 1):(buffer.position + buffer.flux_size) buffer.position = buffer.position + buffer.flux_size - return buffer.data[index, :] end + + return buffer.data[index, :] end From 5dbfaa090d191205503d0a8304f70ebb7e113dfb Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Fri, 19 Jun 2020 18:29:50 -0300 Subject: [PATCH 16/31] renomeando a variavel --- test/runtests.jl | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index b311467..d5f4f2b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,30 +1,30 @@ using EasyStream using Test -@testset "Buffer Test" begin - buffer = EasyStream.Dataset1CDT() - @test size(EasyStream.next!(buffer), 1) == 150 - @test size(EasyStream.next!(buffer), 1) == 1 - @test size(EasyStream.next!(buffer), 1) == 1 +@testset "Stream Test" begin + stream = EasyStream.Dataset1CDT() + @test size(EasyStream.next!(stream), 1) == 150 + @test size(EasyStream.next!(stream), 1) == 1 + @test size(EasyStream.next!(stream), 1) == 1 initial_size = 200 flux_size = 5 - buffer = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(buffer), 1) == initial_size - @test size(EasyStream.next!(buffer), 1) == flux_size - @test size(EasyStream.next!(buffer), 1) == flux_size + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(stream), 1) == initial_size + @test size(EasyStream.next!(stream), 1) == flux_size + @test size(EasyStream.next!(stream), 1) == flux_size initial_size = 16000 flux_size = 1 - buffer = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(buffer), 1) == initial_size - @test EasyStream.next!(buffer) == nothing + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(stream), 1) == initial_size + @test EasyStream.next!(stream) == nothing initial_size = 16001 flux_size = 1 - buffer = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(buffer), 1) == initial_size - 1 - @test EasyStream.next!(buffer) == nothing + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(stream), 1) == initial_size - 1 + @test EasyStream.next!(stream) == nothing @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) From a10c6745dd465a6a63a20bc21c7a143ccabe176d Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Mon, 22 Jun 2020 14:48:19 -0300 Subject: [PATCH 17/31] Criando sistema de download para os datastreams --- src/EasyStream.jl | 4 ++- src/datasets.jl | 8 ------ src/datastreams.jl | 65 ++++++++++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 12 ++++++++- 4 files changed, 79 insertions(+), 10 deletions(-) delete mode 100644 src/datasets.jl create mode 100644 src/datastreams.jl diff --git a/src/EasyStream.jl b/src/EasyStream.jl index 217bf31..e5a3bee 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -3,5 +3,7 @@ module EasyStream include("buffer.jl") include("stream.jl") - include("datasets.jl") + include("datastreams.jl") + + using .DataStreams end # module diff --git a/src/datasets.jl b/src/datasets.jl deleted file mode 100644 index b847b81..0000000 --- a/src/datasets.jl +++ /dev/null @@ -1,8 +0,0 @@ -using CSV - -Dataset1CDT() = Dataset1CDT(150, 1) - -function Dataset1CDT(initial_size::Int, flux_size::Int) - data = CSV.read("datasets/1CDT.csv"; header = false) - return MemoryBuffer(data, initial_size, flux_size) -end diff --git a/src/datastreams.jl b/src/datastreams.jl new file mode 100644 index 0000000..ee48140 --- /dev/null +++ b/src/datastreams.jl @@ -0,0 +1,65 @@ +module DataStreams + using CSV, EasyStream + + export Dataset1CDT, DatasetUG_2C_5D + + tmp_path = tempdir() + directory_name = "EasyDataStream" + local_path = tmp_path * '/' * directory_name + + mk_tmp_dir() = mkdir(directory_name) + + function check(name) + tmp_directories = readdir(tmp_path) + if directory_name in tmp_directories + downloaded_datastreams = readdir(local_path) + + if(name in downloaded_datastreams) + return 1 + else + return 0 + end + else + cd(mk_tmp_dir, tmp_path) + return 0 + end + end + + function download(url, name, path) + current_path = pwd() + cd(path) + try + Base.download(url, name) + catch + error("Erro durante o download do DataStream") + cd(current_path) + return 0 + end + cd(current_path) + return 1 + end + + download(url, name) = download(url, name, local_path) + + #TODO: O retorno dessa função não pode ser o dataStream já na memoria, pois haverá buffers que irão querer somente o path + function get_datastream(url, name) + if check(name) == 1 + return CSV.read(local_path * '/' * name; header = false) + else + download(url, name) + return CSV.read(local_path * '/' * name; header = false) + end + end + + Dataset1CDT() = Dataset1CDT(150, 1) + function Dataset1CDT(initial_size::Int, flux_size::Int) + data = get_datastream("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/1CDT.csv", "1CDT.csv") + return EasyStream.MemoryBuffer(data, initial_size, flux_size) + end + + function DatasetUG_2C_5D(initial_size::Int, flux_size::Int) + data = get_datastream("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/UG_2C_5D.csv", "UG_2C_5D.csv") + return EasyStream.MemoryBuffer(data, initial_size, flux_size) + end + DatasetUG_2C_5D() = DatasetUG_2C_5D(150, 1) +end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index d5f4f2b..835f4c6 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,15 @@ -using EasyStream using Test +using EasyStream + + +@testset "Module DataStreams" begin + test_file = "moduletest.csv" + @test EasyStream.DataStreams.check(test_file) == 0 + @test EasyStream.DataStreams.download("https://github.com/Conradox/datastreams/blob/master/sinthetic/moduletest.csv", "moduletest.csv") == 1 + @test EasyStream.DataStreams.check(test_file) == 1 + rm(EasyStream.DataStreams.DataStreams.local_path * '/' * test_file) +end + @testset "Stream Test" begin stream = EasyStream.Dataset1CDT() From e42c4518c4790e964e93e3e98a9b5f11712a944f Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Mon, 22 Jun 2020 15:07:00 -0300 Subject: [PATCH 18/31] Adicionando throw para o getindex do stream --- src/stream.jl | 12 +++++++++++- test/runtests.jl | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/stream.jl b/src/stream.jl index 9fa3556..32ca32e 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -11,7 +11,17 @@ function Stream(buffer::Buffer) end #TODO: Casos limites? -Base.getindex(stream::Stream, sample::Int64) = stream.data[length(stream.data)][sample, :] +function Base.getindex(stream::Stream, instance::Int64) + if instance <= size(stream.data[length(stream.data)], 1) + return stream.data[length(stream.data)][instance, :] + else + throw("attempt to access a sample with $(size(stream.data[length(stream.data)], 1)) elements at index [$instance]") + end +end + +function Base.getindex(stream::Stream, c::Colon) + return stream.data[length(stream.data)][:, :] +end function next!(stream::Stream) push!(stream.data, next!(stream.buffer)) diff --git a/test/runtests.jl b/test/runtests.jl index 835f4c6..effcecc 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,7 +7,7 @@ using EasyStream @test EasyStream.DataStreams.check(test_file) == 0 @test EasyStream.DataStreams.download("https://github.com/Conradox/datastreams/blob/master/sinthetic/moduletest.csv", "moduletest.csv") == 1 @test EasyStream.DataStreams.check(test_file) == 1 - rm(EasyStream.DataStreams.DataStreams.local_path * '/' * test_file) + rm(EasyStream.DataStreams.local_path * '/' * test_file) end From aef07a2336fb48fe335d25385df6d9be7f3e3b3c Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Tue, 30 Jun 2020 16:07:37 -0300 Subject: [PATCH 19/31] =?UTF-8?q?mudan=C3=A7a=20nas=20variaveis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/datastreams.jl | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/datastreams.jl b/src/datastreams.jl index ee48140..45e87ea 100644 --- a/src/datastreams.jl +++ b/src/datastreams.jl @@ -3,16 +3,16 @@ module DataStreams export Dataset1CDT, DatasetUG_2C_5D - tmp_path = tempdir() - directory_name = "EasyDataStream" - local_path = tmp_path * '/' * directory_name - - mk_tmp_dir() = mkdir(directory_name) + const TMP_DIR = tempdir() + const DIR_NAME = "EasyDataStream" + const DATASTREANS_DIR = joinpath(TMP_DIR, DIR_NAME) + + mk_tmp_dir() = mkdir(DIR_NAME) function check(name) - tmp_directories = readdir(tmp_path) - if directory_name in tmp_directories - downloaded_datastreams = readdir(local_path) + tmp_directories = readdir(TMP_DIR) + if DIR_NAME in tmp_directories + downloaded_datastreams = readdir(DATASTREANS_DIR) if(name in downloaded_datastreams) return 1 @@ -20,7 +20,7 @@ module DataStreams return 0 end else - cd(mk_tmp_dir, tmp_path) + cd(mk_tmp_dir, TMP_DIR) return 0 end end @@ -39,15 +39,15 @@ module DataStreams return 1 end - download(url, name) = download(url, name, local_path) + download(url, name) = download(url, name, DATASTREANS_DIR) #TODO: O retorno dessa função não pode ser o dataStream já na memoria, pois haverá buffers que irão querer somente o path function get_datastream(url, name) if check(name) == 1 - return CSV.read(local_path * '/' * name; header = false) + return CSV.read(DATASTREANS_DIR * '/' * name; header = false) else download(url, name) - return CSV.read(local_path * '/' * name; header = false) + return CSV.read(DATASTREANS_DIR * '/' * name; header = false) end end From 7d398c0f59d2a3d3da582cf33fee148a3e7ef018 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Tue, 30 Jun 2020 18:43:11 -0300 Subject: [PATCH 20/31] =?UTF-8?q?Adicionando=20index=20para=20as=20inst?= =?UTF-8?q?=C3=A2ncias?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/datasets.jl | 2 +- src/stream.jl | 20 +++++++++++++++++++- test/runtests.jl | 24 ++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/datasets.jl b/src/datasets.jl index b847b81..9669b12 100644 --- a/src/datasets.jl +++ b/src/datasets.jl @@ -3,6 +3,6 @@ using CSV Dataset1CDT() = Dataset1CDT(150, 1) function Dataset1CDT(initial_size::Int, flux_size::Int) - data = CSV.read("datasets/1CDT.csv"; header = false) + data = CSV.read("../datasets/1CDT.csv"; header = false) return MemoryBuffer(data, initial_size, flux_size) end diff --git a/src/stream.jl b/src/stream.jl index 9fa3556..0f76dfb 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -11,7 +11,25 @@ function Stream(buffer::Buffer) end #TODO: Casos limites? -Base.getindex(stream::Stream, sample::Int64) = stream.data[length(stream.data)][sample, :] +function Base.getindex(stream::Stream, instance::Int64) + if instance > 0 && instance <= size(stream.data[length(stream.data)], 1) + return stream.data[length(stream.data)][instance, :] + else + throw(BoundsError(stream.data, instance)) + end +end + +function Base.getindex(stream::Stream, c::Colon) + return stream.data[length(stream.data)][:, :] +end + +function Base.getindex(stream::Stream, index::UnitRange{Int}) + sample = DataFrame() + for i=1:length(index) + push!(sample, stream[index[i]]) + end + return sample +end function next!(stream::Stream) push!(stream.data, next!(stream.buffer)) diff --git a/test/runtests.jl b/test/runtests.jl index d5f4f2b..87361c2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -29,3 +29,27 @@ using Test @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) end + +@testset "Stream Indexing" begin + @testset "First Dimension" begin + buffer = EasyStream.Dataset1CDT() + stream = EasyStream.Dataset1CDT() + stream = EasyStream.Stream(stream) + + test_data = stream.data[1] + data_size = size(test_data, 1) + for i=1:data_size + @test stream[i] == test_data[i,:] + end + + @test_throws BoundsError stream[-1] + + @test_throws BoundsError stream[data_size + 1] + + @test stream[:] == test_data[:, :] + + for i=1:data_size + @test stream[1:i] == test_data[1:i,:] + end + end +end \ No newline at end of file From 920796de0cfcf24429b623c21a7e839a44e5dd39 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Wed, 1 Jul 2020 05:02:34 -0300 Subject: [PATCH 21/31] =?UTF-8?q?Adicionando=20acesso=20atrav=C3=A9s=20de?= =?UTF-8?q?=20dois=20index?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/stream.jl | 60 ++++++++++++++++++++++++++++++++++++++++++++++-- test/runtests.jl | 34 ++++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/src/stream.jl b/src/stream.jl index 0f76dfb..bac5fb1 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -6,11 +6,10 @@ end function Stream(buffer::Buffer) data = Vector{DataFrame}() push!(data, next!(buffer)) - return Stream(buffer, data) end -#TODO: Casos limites? +#Using one index to explore the data function Base.getindex(stream::Stream, instance::Int64) if instance > 0 && instance <= size(stream.data[length(stream.data)], 1) return stream.data[length(stream.data)][instance, :] @@ -31,6 +30,63 @@ function Base.getindex(stream::Stream, index::UnitRange{Int}) return sample end +#Using two index to explore the data +function Base.getindex(stream::Stream, instance::Int64, feature::Int64) + if feature > 0 && feature <= size(stream[instance], 1) + return stream[instance][feature] + else + throw(BoundsError(stream.data, feature)) + end +end + +function Base.getindex(stream::Stream, c::Colon, feature::Int64) + if feature > 0 && feature <= size(stream[:], 2) + return stream[:][:, feature] + else + throw(BoundsError(stream.data, feature)) + end +end + +function Base.getindex(stream::Stream, index::UnitRange{Int}, feature::Int64) + if feature > 0 && feature <= size(stream[:], 2) + return stream[index][:, feature] + else + throw(BoundsError(stream.data, feature)) + end +end + +function Base.getindex(stream::Stream, index::UnitRange{Int}, c::Colon) + return stream[index] +end + +function Base.getindex(stream::Stream, instance::Int64, c::Colon) + return stream[instance] +end + +function Base.getindex(stream::Stream, c::Colon, c2::Colon) + return stream[:] +end + +function Base.getindex(stream::Stream, instance::Int64, index::UnitRange{Int}) + return [stream[instance, index[i]] for i = 1:length(index)] +end + +function Base.getindex(stream::Stream, c::Colon, index::UnitRange{Int}) + sample = DataFrame() + for i = 1:size(stream[:], 1) + append!(sample, DataFrame(permutedims(stream[i, index]))) + end + return sample +end + +function Base.getindex(stream::Stream, indexI::UnitRange{Int}, indexF::UnitRange{Int}) + sample = DataFrame() + for i = 1:length(indexI) + append!(sample, DataFrame(permutedims(stream[indexI[i], indexF]))) + end + return sample +end + function next!(stream::Stream) push!(stream.data, next!(stream.buffer)) end diff --git a/test/runtests.jl b/test/runtests.jl index 87361c2..9a92938 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -31,7 +31,7 @@ using Test end @testset "Stream Indexing" begin - @testset "First Dimension" begin + @testset "Test using one index" begin buffer = EasyStream.Dataset1CDT() stream = EasyStream.Dataset1CDT() stream = EasyStream.Stream(stream) @@ -52,4 +52,36 @@ end @test stream[1:i] == test_data[1:i,:] end end + + @testset "Test using two index " begin + buffer = EasyStream.Dataset1CDT() + stream = EasyStream.Dataset1CDT() + stream = EasyStream.Stream(stream) + + + @test stream[1, :]== stream[1] + + for i=1:length(stream[1]) + @test stream[1, i] == stream[1][i] + end + + for i=1:length(stream[1]) + @test stream[:, i] == stream[:][:, i] + end + + @test stream[:] == stream[:, :] + + #TODO Criação de testes unitários para o acesso ao stream usando range + + N_INSTANCES = size(stream[:], 1) + N_FEATURES = length(stream[1]) + + @test_throws BoundsError stream[1, N_FEATURES + 1] + @test_throws BoundsError stream[:, N_FEATURES + 1] + @test_throws BoundsError stream[N_INSTANCES + 1, :] + + @test_throws BoundsError stream[1, -1] + @test_throws BoundsError stream[:, -1] + @test_throws BoundsError stream[-1, :] + end end \ No newline at end of file From 552f3ce3f650d562f53e8ed5ad9c76f4d2dd758c Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Mon, 22 Jun 2020 15:07:00 -0300 Subject: [PATCH 22/31] Revert "Adicionando throw para o getindex do stream" This reverts commit e42c4518c4790e964e93e3e98a9b5f11712a944f. --- src/stream.jl | 12 +----------- test/runtests.jl | 2 +- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/stream.jl b/src/stream.jl index 32ca32e..9fa3556 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -11,17 +11,7 @@ function Stream(buffer::Buffer) end #TODO: Casos limites? -function Base.getindex(stream::Stream, instance::Int64) - if instance <= size(stream.data[length(stream.data)], 1) - return stream.data[length(stream.data)][instance, :] - else - throw("attempt to access a sample with $(size(stream.data[length(stream.data)], 1)) elements at index [$instance]") - end -end - -function Base.getindex(stream::Stream, c::Colon) - return stream.data[length(stream.data)][:, :] -end +Base.getindex(stream::Stream, sample::Int64) = stream.data[length(stream.data)][sample, :] function next!(stream::Stream) push!(stream.data, next!(stream.buffer)) diff --git a/test/runtests.jl b/test/runtests.jl index effcecc..835f4c6 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,7 +7,7 @@ using EasyStream @test EasyStream.DataStreams.check(test_file) == 0 @test EasyStream.DataStreams.download("https://github.com/Conradox/datastreams/blob/master/sinthetic/moduletest.csv", "moduletest.csv") == 1 @test EasyStream.DataStreams.check(test_file) == 1 - rm(EasyStream.DataStreams.local_path * '/' * test_file) + rm(EasyStream.DataStreams.DataStreams.local_path * '/' * test_file) end From f71abda65f729584945653b7385e391eebc91dda Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Thu, 2 Jul 2020 15:37:53 -0300 Subject: [PATCH 23/31] refatorando o codigo --- src/EasyStream.jl | 6 +-- src/datasets.jl | 47 +++++++++++++++++++++++ src/{stream.jl => datastream.jl} | 38 +++++++++---------- src/datastreams.jl | 65 -------------------------------- test/runtests.jl | 39 ++++++++++--------- test/test.jl | 2 + 6 files changed, 92 insertions(+), 105 deletions(-) create mode 100644 src/datasets.jl rename src/{stream.jl => datastream.jl} (60%) delete mode 100644 src/datastreams.jl create mode 100644 test/test.jl diff --git a/src/EasyStream.jl b/src/EasyStream.jl index e5a3bee..3db7602 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -2,8 +2,8 @@ module EasyStream using DataFrames include("buffer.jl") - include("stream.jl") - include("datastreams.jl") + include("datasets.jl") + include("datastream.jl") - using .DataStreams + using .DatasetsStreams end # module diff --git a/src/datasets.jl b/src/datasets.jl new file mode 100644 index 0000000..c0ab559 --- /dev/null +++ b/src/datasets.jl @@ -0,0 +1,47 @@ +module DatasetsStreams + using CSV, EasyStream + + export Dataset1CDT, DatasetUG_2C_5D + + const defdir = joinpath(dirname(@__FILE__), "..", "datasets") + + function get1cdtdata(dir) + mkpath(joinpath(defdir, "synthetic")) + path = download("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/1CDT.csv") + mv(path, joinpath(defdir, "synthetic/1CDT.csv")) + end + + function getug2c5ddata(dir) + mkpath(joinpath(defdir, "synthetic")) + path = download("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/UG_2C_5D.csv") + mv(path, joinpath(defdir, "synthetic/UG_2C_5D.csv")) + end + + function Dataset1CDT(initial_size::Int, flux_size::Int) + filename = "$(defdir)/synthetic/1CDT.csv" + + isfile(filename) || get1cdtdata(defdir) + + data = CSV.read(filename; header = false) + + buffer = EasyStream.MemoryBuffer(data, initial_size, flux_size) + + return EasyStream.DataStream(buffer) + end + + Dataset1CDT() = Dataset1CDT(150, 1) + + function DatasetUG_2C_5D(initial_size::Int, flux_size::Int) + filename = "$(defdir)/synthetic/UG_2C_5D.csv" + + isfile(filename) || getug2c5ddata(defdir) + + data = CSV.read(filename; header = false) + + buffer = EasyStream.MemoryBuffer(data, initial_size, flux_size) + + return EasyStream.DataStream(buffer) + end + + DatasetUG_2C_5D() = DatasetUG_2C_5D(150, 1) +end diff --git a/src/stream.jl b/src/datastream.jl similarity index 60% rename from src/stream.jl rename to src/datastream.jl index bac5fb1..f60c02d 100644 --- a/src/stream.jl +++ b/src/datastream.jl @@ -1,16 +1,16 @@ -struct Stream +struct DataStream buffer::Buffer data::Vector{DataFrame} end -function Stream(buffer::Buffer) +function DataStream(buffer::Buffer) data = Vector{DataFrame}() push!(data, next!(buffer)) - return Stream(buffer, data) + return DataStream(buffer, data) end #Using one index to explore the data -function Base.getindex(stream::Stream, instance::Int64) +function Base.getindex(stream::DataStream, instance::Int64) if instance > 0 && instance <= size(stream.data[length(stream.data)], 1) return stream.data[length(stream.data)][instance, :] else @@ -18,11 +18,11 @@ function Base.getindex(stream::Stream, instance::Int64) end end -function Base.getindex(stream::Stream, c::Colon) +function Base.getindex(stream::DataStream, c::Colon) return stream.data[length(stream.data)][:, :] end -function Base.getindex(stream::Stream, index::UnitRange{Int}) +function Base.getindex(stream::DataStream, index::UnitRange{Int}) sample = DataFrame() for i=1:length(index) push!(sample, stream[index[i]]) @@ -31,47 +31,47 @@ function Base.getindex(stream::Stream, index::UnitRange{Int}) end #Using two index to explore the data -function Base.getindex(stream::Stream, instance::Int64, feature::Int64) +function Base.getindex(stream::DataStream, instance::Int64, feature::Int64) if feature > 0 && feature <= size(stream[instance], 1) return stream[instance][feature] - else + else throw(BoundsError(stream.data, feature)) end end -function Base.getindex(stream::Stream, c::Colon, feature::Int64) +function Base.getindex(stream::DataStream, c::Colon, feature::Int64) if feature > 0 && feature <= size(stream[:], 2) return stream[:][:, feature] - else + else throw(BoundsError(stream.data, feature)) end end -function Base.getindex(stream::Stream, index::UnitRange{Int}, feature::Int64) +function Base.getindex(stream::DataStream, index::UnitRange{Int}, feature::Int64) if feature > 0 && feature <= size(stream[:], 2) return stream[index][:, feature] - else + else throw(BoundsError(stream.data, feature)) end end -function Base.getindex(stream::Stream, index::UnitRange{Int}, c::Colon) +function Base.getindex(stream::DataStream, index::UnitRange{Int}, c::Colon) return stream[index] end -function Base.getindex(stream::Stream, instance::Int64, c::Colon) +function Base.getindex(stream::DataStream, instance::Int64, c::Colon) return stream[instance] end -function Base.getindex(stream::Stream, c::Colon, c2::Colon) +function Base.getindex(stream::DataStream, c::Colon, c2::Colon) return stream[:] end -function Base.getindex(stream::Stream, instance::Int64, index::UnitRange{Int}) +function Base.getindex(stream::DataStream, instance::Int64, index::UnitRange{Int}) return [stream[instance, index[i]] for i = 1:length(index)] end -function Base.getindex(stream::Stream, c::Colon, index::UnitRange{Int}) +function Base.getindex(stream::DataStream, c::Colon, index::UnitRange{Int}) sample = DataFrame() for i = 1:size(stream[:], 1) append!(sample, DataFrame(permutedims(stream[i, index]))) @@ -79,7 +79,7 @@ function Base.getindex(stream::Stream, c::Colon, index::UnitRange{Int}) return sample end -function Base.getindex(stream::Stream, indexI::UnitRange{Int}, indexF::UnitRange{Int}) +function Base.getindex(stream::DataStream, indexI::UnitRange{Int}, indexF::UnitRange{Int}) sample = DataFrame() for i = 1:length(indexI) append!(sample, DataFrame(permutedims(stream[indexI[i], indexF]))) @@ -87,6 +87,6 @@ function Base.getindex(stream::Stream, indexI::UnitRange{Int}, indexF::UnitRange return sample end -function next!(stream::Stream) +function next!(stream::DataStream) push!(stream.data, next!(stream.buffer)) end diff --git a/src/datastreams.jl b/src/datastreams.jl deleted file mode 100644 index 45e87ea..0000000 --- a/src/datastreams.jl +++ /dev/null @@ -1,65 +0,0 @@ -module DataStreams - using CSV, EasyStream - - export Dataset1CDT, DatasetUG_2C_5D - - const TMP_DIR = tempdir() - const DIR_NAME = "EasyDataStream" - const DATASTREANS_DIR = joinpath(TMP_DIR, DIR_NAME) - - mk_tmp_dir() = mkdir(DIR_NAME) - - function check(name) - tmp_directories = readdir(TMP_DIR) - if DIR_NAME in tmp_directories - downloaded_datastreams = readdir(DATASTREANS_DIR) - - if(name in downloaded_datastreams) - return 1 - else - return 0 - end - else - cd(mk_tmp_dir, TMP_DIR) - return 0 - end - end - - function download(url, name, path) - current_path = pwd() - cd(path) - try - Base.download(url, name) - catch - error("Erro durante o download do DataStream") - cd(current_path) - return 0 - end - cd(current_path) - return 1 - end - - download(url, name) = download(url, name, DATASTREANS_DIR) - - #TODO: O retorno dessa função não pode ser o dataStream já na memoria, pois haverá buffers que irão querer somente o path - function get_datastream(url, name) - if check(name) == 1 - return CSV.read(DATASTREANS_DIR * '/' * name; header = false) - else - download(url, name) - return CSV.read(DATASTREANS_DIR * '/' * name; header = false) - end - end - - Dataset1CDT() = Dataset1CDT(150, 1) - function Dataset1CDT(initial_size::Int, flux_size::Int) - data = get_datastream("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/1CDT.csv", "1CDT.csv") - return EasyStream.MemoryBuffer(data, initial_size, flux_size) - end - - function DatasetUG_2C_5D(initial_size::Int, flux_size::Int) - data = get_datastream("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/UG_2C_5D.csv", "UG_2C_5D.csv") - return EasyStream.MemoryBuffer(data, initial_size, flux_size) - end - DatasetUG_2C_5D() = DatasetUG_2C_5D(150, 1) -end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 6eba72c..b7a83bd 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,19 +1,27 @@ using Test using EasyStream +@testset "Dataset Test" begin + stream = EasyStream.Dataset1CDT() + @test size(stream[:], 1) == 150 -@testset "Module DataStreams" begin - test_file = "moduletest.csv" - @test EasyStream.DataStreams.check(test_file) == 0 - @test EasyStream.DataStreams.download("https://github.com/Conradox/datastreams/blob/master/sinthetic/moduletest.csv", "moduletest.csv") == 1 - @test EasyStream.DataStreams.check(test_file) == 1 - rm(EasyStream.DataStreams.DataStreams.local_path * '/' * test_file) -end + initial_size = 200 + flux_size = 5 + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(stream[:], 1) == 200 + + stream = EasyStream.DatasetUG_2C_5D() + @test size(stream[:], 1) == 150 + initial_size = 200 + flux_size = 5 + stream = EasyStream.DatasetUG_2C_5D(initial_size, flux_size) + @test size(stream[:], 1) == 200 +end @testset "Stream Test" begin stream = EasyStream.Dataset1CDT() - @test size(EasyStream.next!(stream), 1) == 150 + @test size(EasyStream.next!stream), 1) == 150 @test size(EasyStream.next!(stream), 1) == 1 @test size(EasyStream.next!(stream), 1) == 1 @@ -42,9 +50,7 @@ end @testset "Stream Indexing" begin @testset "Test using one index" begin - buffer = EasyStream.Dataset1CDT() stream = EasyStream.Dataset1CDT() - stream = EasyStream.Stream(stream) test_data = stream.data[1] data_size = size(test_data, 1) @@ -57,20 +63,17 @@ end @test_throws BoundsError stream[data_size + 1] @test stream[:] == test_data[:, :] - + for i=1:data_size @test stream[1:i] == test_data[1:i,:] end end @testset "Test using two index " begin - buffer = EasyStream.Dataset1CDT() stream = EasyStream.Dataset1CDT() - stream = EasyStream.Stream(stream) - @test stream[1, :]== stream[1] - + for i=1:length(stream[1]) @test stream[1, i] == stream[1][i] end @@ -82,16 +85,16 @@ end @test stream[:] == stream[:, :] #TODO Criação de testes unitários para o acesso ao stream usando range - + N_INSTANCES = size(stream[:], 1) N_FEATURES = length(stream[1]) @test_throws BoundsError stream[1, N_FEATURES + 1] @test_throws BoundsError stream[:, N_FEATURES + 1] @test_throws BoundsError stream[N_INSTANCES + 1, :] - + @test_throws BoundsError stream[1, -1] @test_throws BoundsError stream[:, -1] @test_throws BoundsError stream[-1, :] end -end \ No newline at end of file +end diff --git a/test/test.jl b/test/test.jl new file mode 100644 index 0000000..579d48e --- /dev/null +++ b/test/test.jl @@ -0,0 +1,2 @@ +using Pkg +Pkg.activate(".") From 915c1aa248f7b191f438366cafd5f074fd7929a1 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Thu, 2 Jul 2020 17:24:02 -0300 Subject: [PATCH 24/31] reorganizando para os novos nomes pool e stream --- src/EasyStream.jl | 5 +-- src/buffer.jl | 43 ---------------------- src/datasets.jl | 12 +++---- src/datastream.jl | 92 ----------------------------------------------- src/pool.jl | 92 +++++++++++++++++++++++++++++++++++++++++++++++ src/stream.jl | 43 ++++++++++++++++++++++ test/runtests.jl | 90 +++++++++++++++++++++++----------------------- test/test.jl | 13 ++++++- 8 files changed, 201 insertions(+), 189 deletions(-) delete mode 100644 src/buffer.jl delete mode 100644 src/datastream.jl create mode 100644 src/pool.jl create mode 100644 src/stream.jl diff --git a/src/EasyStream.jl b/src/EasyStream.jl index 3db7602..c07e6b9 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -1,9 +1,10 @@ module EasyStream using DataFrames - include("buffer.jl") + include("stream.jl") + include("pool.jl") include("datasets.jl") - include("datastream.jl") + using .DatasetsStreams end # module diff --git a/src/buffer.jl b/src/buffer.jl deleted file mode 100644 index fac2aa9..0000000 --- a/src/buffer.jl +++ /dev/null @@ -1,43 +0,0 @@ -abstract type Buffer end - -mutable struct MemoryBuffer <: Buffer - data::DataFrame - position::Int - initial_size::Int - flux_size::Int -end - -function MemoryBuffer(data::DataFrame, initial_size::Int, flux_size::Int) - if initial_size > size(data, 1) - initial_size = size(data, 1) - @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" - end - - if initial_size == 0 - @warn "initial size é zero" - end - - if flux_size == 0 - @warn "flux size é zero" - end - - return MemoryBuffer(data, 0, initial_size, flux_size) -end - -next!(buffer::Buffer) = nothing - -function next!(buffer::MemoryBuffer) - if buffer.position >= size(buffer.data, 1) - return nothing - end - - if buffer.position < buffer.initial_size - buffer.position = buffer.initial_size - index = 1:buffer.initial_size - else - index = (buffer.position + 1):(buffer.position + buffer.flux_size) - buffer.position = buffer.position + buffer.flux_size - end - - return buffer.data[index, :] -end diff --git a/src/datasets.jl b/src/datasets.jl index c0ab559..4eb2896 100644 --- a/src/datasets.jl +++ b/src/datasets.jl @@ -17,30 +17,30 @@ module DatasetsStreams mv(path, joinpath(defdir, "synthetic/UG_2C_5D.csv")) end - function Dataset1CDT(initial_size::Int, flux_size::Int) + function Dataset1CDT(initial_size::Int, flux_size::Int)::EasyStream.Pool filename = "$(defdir)/synthetic/1CDT.csv" isfile(filename) || get1cdtdata(defdir) data = CSV.read(filename; header = false) - buffer = EasyStream.MemoryBuffer(data, initial_size, flux_size) + stream = EasyStream.MemoryStream(data, initial_size, flux_size) - return EasyStream.DataStream(buffer) + return EasyStream.Pool(stream) end Dataset1CDT() = Dataset1CDT(150, 1) - function DatasetUG_2C_5D(initial_size::Int, flux_size::Int) + function DatasetUG_2C_5D(initial_size::Int, flux_size::Int)::EasyStream.Pool filename = "$(defdir)/synthetic/UG_2C_5D.csv" isfile(filename) || getug2c5ddata(defdir) data = CSV.read(filename; header = false) - buffer = EasyStream.MemoryBuffer(data, initial_size, flux_size) + stream = EasyStream.MemoryStream(data, initial_size, flux_size) - return EasyStream.DataStream(buffer) + return EasyStream.Pool(stream) end DatasetUG_2C_5D() = DatasetUG_2C_5D(150, 1) diff --git a/src/datastream.jl b/src/datastream.jl deleted file mode 100644 index f60c02d..0000000 --- a/src/datastream.jl +++ /dev/null @@ -1,92 +0,0 @@ -struct DataStream - buffer::Buffer - data::Vector{DataFrame} -end - -function DataStream(buffer::Buffer) - data = Vector{DataFrame}() - push!(data, next!(buffer)) - return DataStream(buffer, data) -end - -#Using one index to explore the data -function Base.getindex(stream::DataStream, instance::Int64) - if instance > 0 && instance <= size(stream.data[length(stream.data)], 1) - return stream.data[length(stream.data)][instance, :] - else - throw(BoundsError(stream.data, instance)) - end -end - -function Base.getindex(stream::DataStream, c::Colon) - return stream.data[length(stream.data)][:, :] -end - -function Base.getindex(stream::DataStream, index::UnitRange{Int}) - sample = DataFrame() - for i=1:length(index) - push!(sample, stream[index[i]]) - end - return sample -end - -#Using two index to explore the data -function Base.getindex(stream::DataStream, instance::Int64, feature::Int64) - if feature > 0 && feature <= size(stream[instance], 1) - return stream[instance][feature] - else - throw(BoundsError(stream.data, feature)) - end -end - -function Base.getindex(stream::DataStream, c::Colon, feature::Int64) - if feature > 0 && feature <= size(stream[:], 2) - return stream[:][:, feature] - else - throw(BoundsError(stream.data, feature)) - end -end - -function Base.getindex(stream::DataStream, index::UnitRange{Int}, feature::Int64) - if feature > 0 && feature <= size(stream[:], 2) - return stream[index][:, feature] - else - throw(BoundsError(stream.data, feature)) - end -end - -function Base.getindex(stream::DataStream, index::UnitRange{Int}, c::Colon) - return stream[index] -end - -function Base.getindex(stream::DataStream, instance::Int64, c::Colon) - return stream[instance] -end - -function Base.getindex(stream::DataStream, c::Colon, c2::Colon) - return stream[:] -end - -function Base.getindex(stream::DataStream, instance::Int64, index::UnitRange{Int}) - return [stream[instance, index[i]] for i = 1:length(index)] -end - -function Base.getindex(stream::DataStream, c::Colon, index::UnitRange{Int}) - sample = DataFrame() - for i = 1:size(stream[:], 1) - append!(sample, DataFrame(permutedims(stream[i, index]))) - end - return sample -end - -function Base.getindex(stream::DataStream, indexI::UnitRange{Int}, indexF::UnitRange{Int}) - sample = DataFrame() - for i = 1:length(indexI) - append!(sample, DataFrame(permutedims(stream[indexI[i], indexF]))) - end - return sample -end - -function next!(stream::DataStream) - push!(stream.data, next!(stream.buffer)) -end diff --git a/src/pool.jl b/src/pool.jl new file mode 100644 index 0000000..3391bca --- /dev/null +++ b/src/pool.jl @@ -0,0 +1,92 @@ +struct Pool{T <: Stream} + stream::T + data::Vector{DataFrame} +end + +function Pool(stream::Stream) + data = Vector{DataFrame}() + push!(data, next!(stream)) + return Pool(stream, data) +end + +function next!(pool::Pool) + push!(pool.data, next!(pool.buffer)) +end + +#Using one index to explore the data +function Base.getindex(pool::Pool, instance::Int64) + if instance > 0 && instance <= size(pool.data[length(pool.data)], 1) + return pool.data[length(pool.data)][instance, :] + else + throw(BoundsError(pool.data, instance)) + end +end + +function Base.getindex(pool::Pool, c::Colon) + return pool.data[length(pool.data)][:, :] +end + +function Base.getindex(pool::Pool, index::UnitRange{Int}) + sample = DataFrame() + for i=1:length(index) + push!(sample, pool[index[i]]) + end + return sample +end + +#Using two index to explore the data +function Base.getindex(pool::Pool, instance::Int64, feature::Int64) + if feature > 0 && feature <= size(pool[instance], 1) + return pool[instance][feature] + else + throw(BoundsError(pool.data, feature)) + end +end + +function Base.getindex(pool::Pool, c::Colon, feature::Int64) + if feature > 0 && feature <= size(pool[:], 2) + return pool[:][:, feature] + else + throw(BoundsError(pool.data, feature)) + end +end + +function Base.getindex(pool::Pool, index::UnitRange{Int}, feature::Int64) + if feature > 0 && feature <= size(pool[:], 2) + return pool[index][:, feature] + else + throw(BoundsError(pool.data, feature)) + end +end + +function Base.getindex(pool::Pool, index::UnitRange{Int}, c::Colon) + return pool[index] +end + +function Base.getindex(pool::Pool, instance::Int64, c::Colon) + return pool[instance] +end + +function Base.getindex(pool::Pool, c::Colon, c2::Colon) + return pool[:] +end + +function Base.getindex(pool::Pool, instance::Int64, index::UnitRange{Int}) + return [pool[instance, index[i]] for i = 1:length(index)] +end + +function Base.getindex(pool::Pool, c::Colon, index::UnitRange{Int}) + sample = DataFrame() + for i = 1:size(pool[:], 1) + append!(sample, DataFrame(permutedims(pool[i, index]))) + end + return sample +end + +function Base.getindex(pool::Pool, indexI::UnitRange{Int}, indexF::UnitRange{Int}) + sample = DataFrame() + for i = 1:length(indexI) + append!(sample, DataFrame(permutedims(pool[indexI[i], indexF]))) + end + return sample +end diff --git a/src/stream.jl b/src/stream.jl new file mode 100644 index 0000000..4c99c59 --- /dev/null +++ b/src/stream.jl @@ -0,0 +1,43 @@ +abstract type Stream end + +mutable struct MemoryStream{T} <: Stream{T} + data::T + position::Int + initial_size::Int + batch::Int +end + +function MemoryStream(data::T, initial_size::Int, batch::Int) + if initial_size > size(data, 1) + initial_size = size(data, 1) + @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" + end + + if initial_size == 0 + @warn "initial size é zero" + end + + if batch == 0 + @warn "flux size é zero" + end + + return MemoryStream(data, 0, initial_size, batch) +end + +next!(buffer::Stream) = nothing + +function next!(stream::MemoryStream) + if stream.position >= size(stream.data, 1) + return nothing + end + + if stream.position < stream.initial_size + stream.position = stream.initial_size + index = 1:stream.initial_size + else + index = (stream.position + 1):(stream.position + stream.batch) + stream.position = stream.position + stream.batch + end + + return stream.data[index, :] +end diff --git a/test/runtests.jl b/test/runtests.jl index b7a83bd..23d3036 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,47 +2,47 @@ using Test using EasyStream @testset "Dataset Test" begin - stream = EasyStream.Dataset1CDT() - @test size(stream[:], 1) == 150 + pool = EasyStream.Dataset1CDT() + @test size(pool[:], 1) == 150 initial_size = 200 flux_size = 5 - stream = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(stream[:], 1) == 200 + pool = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(pool[:], 1) == 200 - stream = EasyStream.DatasetUG_2C_5D() - @test size(stream[:], 1) == 150 + pool = EasyStream.DatasetUG_2C_5D() + @test size(pool[:], 1) == 150 initial_size = 200 flux_size = 5 - stream = EasyStream.DatasetUG_2C_5D(initial_size, flux_size) - @test size(stream[:], 1) == 200 + pool = EasyStream.DatasetUG_2C_5D(initial_size, flux_size) + @test size(pool[:], 1) == 200 end @testset "Stream Test" begin - stream = EasyStream.Dataset1CDT() - @test size(EasyStream.next!stream), 1) == 150 - @test size(EasyStream.next!(stream), 1) == 1 - @test size(EasyStream.next!(stream), 1) == 1 + pool = EasyStream.Dataset1CDT() + @test size(EasyStream.next!pool), 1) == 150 + @test size(EasyStream.next!(pool), 1) == 1 + @test size(EasyStream.next!(pool), 1) == 1 initial_size = 200 flux_size = 5 - stream = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(stream), 1) == initial_size - @test size(EasyStream.next!(stream), 1) == flux_size - @test size(EasyStream.next!(stream), 1) == flux_size + pool = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(pool), 1) == initial_size + @test size(EasyStream.next!(pool), 1) == flux_size + @test size(EasyStream.next!(pool), 1) == flux_size initial_size = 16000 flux_size = 1 - stream = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(stream), 1) == initial_size - @test EasyStream.next!(stream) == nothing + pool = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(pool), 1) == initial_size + @test EasyStream.next!(pool) == nothing initial_size = 16001 flux_size = 1 - stream = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(stream), 1) == initial_size - 1 - @test EasyStream.next!(stream) == nothing + pool = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(pool), 1) == initial_size - 1 + @test EasyStream.next!(pool) == nothing @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) @@ -50,51 +50,51 @@ end @testset "Stream Indexing" begin @testset "Test using one index" begin - stream = EasyStream.Dataset1CDT() + pool = EasyStream.Dataset1CDT() - test_data = stream.data[1] + test_data = pool.data[1] data_size = size(test_data, 1) for i=1:data_size - @test stream[i] == test_data[i,:] + @test pool[i] == test_data[i,:] end - @test_throws BoundsError stream[-1] + @test_throws BoundsError pool[-1] - @test_throws BoundsError stream[data_size + 1] + @test_throws BoundsError pool[data_size + 1] - @test stream[:] == test_data[:, :] + @test pool[:] == test_data[:, :] for i=1:data_size - @test stream[1:i] == test_data[1:i,:] + @test pool[1:i] == test_data[1:i,:] end end @testset "Test using two index " begin - stream = EasyStream.Dataset1CDT() + pool = EasyStream.Dataset1CDT() - @test stream[1, :]== stream[1] + @test pool[1, :]== pool[1] - for i=1:length(stream[1]) - @test stream[1, i] == stream[1][i] + for i=1:length(pool[1]) + @test pool[1, i] == pool[1][i] end - for i=1:length(stream[1]) - @test stream[:, i] == stream[:][:, i] + for i=1:length(pool[1]) + @test pool[:, i] == pool[:][:, i] end - @test stream[:] == stream[:, :] + @test pool[:] == pool[:, :] - #TODO Criação de testes unitários para o acesso ao stream usando range + #TODO Criação de testes unitários para o acesso ao pool usando range - N_INSTANCES = size(stream[:], 1) - N_FEATURES = length(stream[1]) + N_INSTANCES = size(pool[:], 1) + N_FEATURES = length(pool[1]) - @test_throws BoundsError stream[1, N_FEATURES + 1] - @test_throws BoundsError stream[:, N_FEATURES + 1] - @test_throws BoundsError stream[N_INSTANCES + 1, :] + @test_throws BoundsError pool[1, N_FEATURES + 1] + @test_throws BoundsError pool[:, N_FEATURES + 1] + @test_throws BoundsError pool[N_INSTANCES + 1, :] - @test_throws BoundsError stream[1, -1] - @test_throws BoundsError stream[:, -1] - @test_throws BoundsError stream[-1, :] + @test_throws BoundsError pool[1, -1] + @test_throws BoundsError pool[:, -1] + @test_throws BoundsError pool[-1, :] end end diff --git a/test/test.jl b/test/test.jl index 579d48e..da72374 100644 --- a/test/test.jl +++ b/test/test.jl @@ -1,2 +1,13 @@ -using Pkg +using Revise + Pkg.activate(".") + +using EasyStream + +pool = EasyStream.Dataset1CDT() + +pool[150,:] + +EasyStream.next!(stream) + +stream[1,:] From 014bfc1b9e87e26954506e14e94dbefc51ff9802 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Thu, 2 Jul 2020 18:07:55 -0300 Subject: [PATCH 25/31] reorganizando o codigo --- src/EasyStream.jl | 1 - src/pool.jl | 87 ++++++----------------------------------------- src/stream.jl | 4 +-- 3 files changed, 13 insertions(+), 79 deletions(-) diff --git a/src/EasyStream.jl b/src/EasyStream.jl index c07e6b9..619d22f 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -5,6 +5,5 @@ module EasyStream include("pool.jl") include("datasets.jl") - using .DatasetsStreams end # module diff --git a/src/pool.jl b/src/pool.jl index 3391bca..d237c99 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -1,92 +1,27 @@ struct Pool{T <: Stream} stream::T data::Vector{DataFrame} + mapping::Vector{Vector{Bool}} end function Pool(stream::Stream) data = Vector{DataFrame}() push!(data, next!(stream)) - return Pool(stream, data) -end - -function next!(pool::Pool) - push!(pool.data, next!(pool.buffer)) -end - -#Using one index to explore the data -function Base.getindex(pool::Pool, instance::Int64) - if instance > 0 && instance <= size(pool.data[length(pool.data)], 1) - return pool.data[length(pool.data)][instance, :] - else - throw(BoundsError(pool.data, instance)) - end -end -function Base.getindex(pool::Pool, c::Colon) - return pool.data[length(pool.data)][:, :] + mapping = Vector{Vector{Bool}}() + push!(mapping, ones(Bool, size(data, 1))) + return Pool(stream, data, mapping) end -function Base.getindex(pool::Pool, index::UnitRange{Int}) - sample = DataFrame() - for i=1:length(index) - push!(sample, pool[index[i]]) - end - return sample -end - -#Using two index to explore the data -function Base.getindex(pool::Pool, instance::Int64, feature::Int64) - if feature > 0 && feature <= size(pool[instance], 1) - return pool[instance][feature] - else - throw(BoundsError(pool.data, feature)) - end -end - -function Base.getindex(pool::Pool, c::Colon, feature::Int64) - if feature > 0 && feature <= size(pool[:], 2) - return pool[:][:, feature] - else - throw(BoundsError(pool.data, feature)) - end -end - -function Base.getindex(pool::Pool, index::UnitRange{Int}, feature::Int64) - if feature > 0 && feature <= size(pool[:], 2) - return pool[index][:, feature] - else - throw(BoundsError(pool.data, feature)) - end -end - -function Base.getindex(pool::Pool, index::UnitRange{Int}, c::Colon) - return pool[index] -end - -function Base.getindex(pool::Pool, instance::Int64, c::Colon) - return pool[instance] -end - -function Base.getindex(pool::Pool, c::Colon, c2::Colon) - return pool[:] -end +function next!(pool::Pool) + streamdata = next!(pool.buffer) + push!(pool.data, streamdata) -function Base.getindex(pool::Pool, instance::Int64, index::UnitRange{Int}) - return [pool[instance, index[i]] for i = 1:length(index)] + push!(pool.mapping, rand(Bool, size(streamdata, 1))) end -function Base.getindex(pool::Pool, c::Colon, index::UnitRange{Int}) - sample = DataFrame() - for i = 1:size(pool[:], 1) - append!(sample, DataFrame(permutedims(pool[i, index]))) - end - return sample +function Base.getindex(pool::Pool, i::Int) + return pool.data[1][1, :] end -function Base.getindex(pool::Pool, indexI::UnitRange{Int}, indexF::UnitRange{Int}) - sample = DataFrame() - for i = 1:length(indexI) - append!(sample, DataFrame(permutedims(pool[indexI[i], indexF]))) - end - return sample -end +Base.getindex(pool::Pool, i::Int, j::Int) = pool[i][j] diff --git a/src/stream.jl b/src/stream.jl index 4c99c59..7b092f6 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -1,4 +1,4 @@ -abstract type Stream end +abstract type Stream{T} end mutable struct MemoryStream{T} <: Stream{T} data::T @@ -7,7 +7,7 @@ mutable struct MemoryStream{T} <: Stream{T} batch::Int end -function MemoryStream(data::T, initial_size::Int, batch::Int) +function MemoryStream(data::T, initial_size::Int, batch::Int) where T if initial_size > size(data, 1) initial_size = size(data, 1) @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" From a5f7bdd4a57279eafee9d94e2cb77c760b9047ff Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 3 Jul 2020 04:09:00 -0300 Subject: [PATCH 26/31] =?UTF-8?q?Adicionando=20indexa=C3=A7=C3=A3o=20a=20p?= =?UTF-8?q?artir=20de=20um=20=C3=ADndice=20ao=20pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pool.jl | 73 +++++++++++++++++++++++++++++++++++++++++------- test/runtests.jl | 45 +++++++++++++++++++---------- 2 files changed, 93 insertions(+), 25 deletions(-) diff --git a/src/pool.jl b/src/pool.jl index d237c99..aa6ba1d 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -1,27 +1,80 @@ -struct Pool{T <: Stream} +mutable struct Pool{T <: Stream} stream::T data::Vector{DataFrame} mapping::Vector{Vector{Bool}} + size::Int64 end function Pool(stream::Stream) data = Vector{DataFrame}() - push!(data, next!(stream)) - + streamdata = next!(stream) + push!(data, streamdata) mapping = Vector{Vector{Bool}}() - push!(mapping, ones(Bool, size(data, 1))) - return Pool(stream, data, mapping) + push!(mapping, ones(Bool, size(streamdata, 1))) + + return Pool(stream, data, mapping, size(streamdata, 1)) end function next!(pool::Pool) - streamdata = next!(pool.buffer) + streamdata = next!(pool.stream) + pool.size += size(streamdata, 1) push!(pool.data, streamdata) - push!(pool.mapping, rand(Bool, size(streamdata, 1))) + return streamdata +end + + +##Utils +function useble_length(pool) + count = 0 + for i=1:size(pool.data, 1) + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + count += 1 + end + end + end + return count +end + +##Indexing - Using one index, moving throght of the instances +function Base.getindex(pool::Pool, index::Int) + count = 1 + for i=1:size(pool.data, 1) + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + if count == index + return pool.data[i][j, :] + end + count += 1 + end + end + end +end + +function Base.getindex(pool::Pool, i::Colon) + data = DataFrame() + for i=1:useble_length(pool) + push!(data, pool[i]) + end + #= + data = DataFrame() + for i=1:size(pool.data, 1) + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + push!(data, pool.data[i][j, :]) + end + end + end + =# + return data end -function Base.getindex(pool::Pool, i::Int) - return pool.data[1][1, :] +function Base.getindex(pool::Pool, range::UnitRange{Int64}) + data = DataFrame() + for i in range + push!(data, pool[i]) + end + return data end -Base.getindex(pool::Pool, i::Int, j::Int) = pool[i][j] diff --git a/test/runtests.jl b/test/runtests.jl index 23d3036..45a5bad 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -21,52 +21,65 @@ end @testset "Stream Test" begin pool = EasyStream.Dataset1CDT() - @test size(EasyStream.next!pool), 1) == 150 + @test size(pool.data[1], 1) == 150 + x = EasyStream.next!(pool) + println(x) + println(size(x, 1)) @test size(EasyStream.next!(pool), 1) == 1 @test size(EasyStream.next!(pool), 1) == 1 initial_size = 200 flux_size = 5 pool = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(pool), 1) == initial_size + @test size(pool.data[1], 1) == initial_size @test size(EasyStream.next!(pool), 1) == flux_size @test size(EasyStream.next!(pool), 1) == flux_size initial_size = 16000 flux_size = 1 pool = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(pool), 1) == initial_size - @test EasyStream.next!(pool) == nothing + @test size(pool.data[1], 1) == initial_size + #@test EasyStream.next!(pool) == nothing - initial_size = 16001 - flux_size = 1 - pool = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(EasyStream.next!(pool), 1) == initial_size - 1 - @test EasyStream.next!(pool) == nothing + #initial_size = 16001 + #flux_size = 1 + #pool = EasyStream.Dataset1CDT(initial_size, flux_size) + #@test size(EasyStream.next!(pool), 1) == initial_size - 1 + #@test EasyStream.next!(pool) == nothing - @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) - @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) + #@test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) + #@test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) end @testset "Stream Indexing" begin @testset "Test using one index" begin pool = EasyStream.Dataset1CDT() - test_data = pool.data[1] + data_size = size(test_data, 1) for i=1:data_size @test pool[i] == test_data[i,:] end - @test_throws BoundsError pool[-1] + #@test_throws BoundsError pool[-1] - @test_throws BoundsError pool[data_size + 1] + #@test_throws BoundsError pool[data_size + 1] @test pool[:] == test_data[:, :] for i=1:data_size @test pool[1:i] == test_data[1:i,:] end + + ##Testing mapper + counter = pool.stream.initial_size + for i=1:20 + EasyStream.next!(pool) + if pool.mapping[i+1][1] + counter += 1 + end + @test size(pool[:], 1) == counter + end end @testset "Test using two index " begin @@ -85,7 +98,7 @@ end @test pool[:] == pool[:, :] #TODO Criação de testes unitários para o acesso ao pool usando range - +#= N_INSTANCES = size(pool[:], 1) N_FEATURES = length(pool[1]) @@ -96,5 +109,7 @@ end @test_throws BoundsError pool[1, -1] @test_throws BoundsError pool[:, -1] @test_throws BoundsError pool[-1, :] +=# end + end From 303fe3e4aeb2dea08fd76e7df733a44dfbc9ec7b Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 3 Jul 2020 04:22:35 -0300 Subject: [PATCH 27/31] =?UTF-8?q?Adicionando=20indexa=C3=A7=C3=A3o=20a=20p?= =?UTF-8?q?artir=20de=20dois=20=C3=ADndices=20ao=20pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pool.jl | 25 ++++++++++++++++++++++++- test/runtests.jl | 15 +++++++++------ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/pool.jl b/src/pool.jl index aa6ba1d..186c731 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -37,7 +37,7 @@ function useble_length(pool) return count end -##Indexing - Using one index, moving throght of the instances +##Indexing - Using one index, moving through of the instances function Base.getindex(pool::Pool, index::Int) count = 1 for i=1:size(pool.data, 1) @@ -78,3 +78,26 @@ function Base.getindex(pool::Pool, range::UnitRange{Int64}) return data end +##Indexing - Using one index, moving through of the instances and features + +Base.getindex(pool::Pool, instance::Int, feature::Int) = pool[instance][feature] + +Base.getindex(pool::Pool, instance::Int, c::Colon) = pool[instance] + +Base.getindex(pool::Pool, instance::Int, range::UnitRange{Int64}) = pool[instance][range] + +Base.getindex(pool::Pool, c::Colon, feature::Int) = pool[:][:, feature] + +Base.getindex(pool::Pool, c::Colon, range::UnitRange{Int64}) = pool[:][:, range] + +Base.getindex(pool::Pool, c1::Colon, c2::Colon) = pool[:] + +Base.getindex(pool::Pool, range::UnitRange{Int64}, feature::Int) = pool[range][:, feature] + +Base.getindex(pool::Pool, range::UnitRange{Int64}, c::Colon) = pool[range] + +Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = pool[range][:, range2] + +##Indexing - Using one index, moving through of the instances, features, samples + + diff --git a/test/runtests.jl b/test/runtests.jl index 45a5bad..bc799ab 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -61,9 +61,7 @@ end @test pool[i] == test_data[i,:] end - #@test_throws BoundsError pool[-1] - - #@test_throws BoundsError pool[data_size + 1] + @test pool[:] == test_data[:, :] @@ -71,7 +69,7 @@ end @test pool[1:i] == test_data[1:i,:] end - ##Testing mapper + ## Testing mapper counter = pool.stream.initial_size for i=1:20 EasyStream.next!(pool) @@ -80,6 +78,11 @@ end end @test size(pool[:], 1) == counter end + + + #@test_throws BoundsError pool[-1] + + #@test_throws BoundsError pool[data_size + 1] end @testset "Test using two index " begin @@ -98,7 +101,7 @@ end @test pool[:] == pool[:, :] #TODO Criação de testes unitários para o acesso ao pool usando range -#= + #= N_INSTANCES = size(pool[:], 1) N_FEATURES = length(pool[1]) @@ -109,7 +112,7 @@ end @test_throws BoundsError pool[1, -1] @test_throws BoundsError pool[:, -1] @test_throws BoundsError pool[-1, :] -=# + =# end end From d92a3cb6c9699885075fe410e6fb7a677918230f Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 3 Jul 2020 18:53:30 -0300 Subject: [PATCH 28/31] =?UTF-8?q?Modifica=C3=A7=C3=A3o=20para=20que=20os?= =?UTF-8?q?=20datasets=20sejam=20retornados=20como=20streams?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pool.jl | 9 +++---- test/runtests.jl | 61 +++++++++++++++++++++++------------------------- 2 files changed, 34 insertions(+), 36 deletions(-) diff --git a/src/pool.jl b/src/pool.jl index 186c731..91c0c26 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -18,8 +18,9 @@ end function next!(pool::Pool) streamdata = next!(pool.stream) pool.size += size(streamdata, 1) - push!(pool.data, streamdata) + push!(pool.mapping, rand(Bool, size(streamdata, 1))) + push!(pool.data, streamdata) return streamdata end @@ -37,7 +38,7 @@ function useble_length(pool) return count end -##Indexing - Using one index, moving through of the instances +##Indexing - Using three indexes to move in data through the instances function Base.getindex(pool::Pool, index::Int) count = 1 for i=1:size(pool.data, 1) @@ -78,7 +79,7 @@ function Base.getindex(pool::Pool, range::UnitRange{Int64}) return data end -##Indexing - Using one index, moving through of the instances and features +##Indexing - Using two indexes to move in data through the instances and features Base.getindex(pool::Pool, instance::Int, feature::Int) = pool[instance][feature] @@ -98,6 +99,6 @@ Base.getindex(pool::Pool, range::UnitRange{Int64}, c::Colon) = pool[range] Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = pool[range][:, range2] -##Indexing - Using one index, moving through of the instances, features, samples +##Indexing - Using three indexes to move in data through the instances, features, samples diff --git a/test/runtests.jl b/test/runtests.jl index bc799ab..f6f3d03 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,58 +2,57 @@ using Test using EasyStream @testset "Dataset Test" begin - pool = EasyStream.Dataset1CDT() + pool = EasyStream.Dataset1CDT() |> EasyStream.Pool @test size(pool[:], 1) == 150 initial_size = 200 flux_size = 5 - pool = EasyStream.Dataset1CDT(initial_size, flux_size) + pool = EasyStream.Dataset1CDT(initial_size, flux_size) |> EasyStream.Pool @test size(pool[:], 1) == 200 - pool = EasyStream.DatasetUG_2C_5D() + pool = EasyStream.DatasetUG_2C_5D() |> EasyStream.Pool @test size(pool[:], 1) == 150 initial_size = 200 flux_size = 5 - pool = EasyStream.DatasetUG_2C_5D(initial_size, flux_size) + pool = EasyStream.DatasetUG_2C_5D(initial_size, flux_size) |> EasyStream.Pool @test size(pool[:], 1) == 200 end @testset "Stream Test" begin - pool = EasyStream.Dataset1CDT() - @test size(pool.data[1], 1) == 150 - x = EasyStream.next!(pool) - println(x) - println(size(x, 1)) - @test size(EasyStream.next!(pool), 1) == 1 - @test size(EasyStream.next!(pool), 1) == 1 + stream = EasyStream.Dataset1CDT() + @test size(EasyStream.next!(stream), 1) == 150 + x = EasyStream.next!(stream) + + @test size(EasyStream.next!(stream), 1) == 1 + @test size(EasyStream.next!(stream), 1) == 1 initial_size = 200 flux_size = 5 - pool = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(pool.data[1], 1) == initial_size - @test size(EasyStream.next!(pool), 1) == flux_size - @test size(EasyStream.next!(pool), 1) == flux_size + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(stream), 1) == initial_size + @test size(EasyStream.next!(stream), 1) == flux_size + @test size(EasyStream.next!(stream), 1) == flux_size initial_size = 16000 flux_size = 1 - pool = EasyStream.Dataset1CDT(initial_size, flux_size) - @test size(pool.data[1], 1) == initial_size - #@test EasyStream.next!(pool) == nothing - - #initial_size = 16001 - #flux_size = 1 - #pool = EasyStream.Dataset1CDT(initial_size, flux_size) - #@test size(EasyStream.next!(pool), 1) == initial_size - 1 - #@test EasyStream.next!(pool) == nothing - - #@test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) - #@test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(stream), 1) == initial_size + @test EasyStream.next!(stream) == nothing + + initial_size = 16001 + flux_size = 1 + stream = EasyStream.Dataset1CDT(initial_size, flux_size) + @test size(EasyStream.next!(stream), 1) == initial_size - 1 + @test EasyStream.next!(stream) == nothing + + @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1) + @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0) end -@testset "Stream Indexing" begin +@testset "Pool Indexing" begin @testset "Test using one index" begin - pool = EasyStream.Dataset1CDT() + pool = EasyStream.Dataset1CDT() |> EasyStream.Pool test_data = pool.data[1] data_size = size(test_data, 1) @@ -61,8 +60,6 @@ end @test pool[i] == test_data[i,:] end - - @test pool[:] == test_data[:, :] for i=1:data_size @@ -86,7 +83,7 @@ end end @testset "Test using two index " begin - pool = EasyStream.Dataset1CDT() + pool = EasyStream.Dataset1CDT() |> EasyStream.Pool @test pool[1, :]== pool[1] From 5bcaadc8087b4ccc5a923858c037f114f5647071 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Tue, 7 Jul 2020 13:03:16 -0300 Subject: [PATCH 29/31] =?UTF-8?q?Adicionando=20indexa=C3=A7=C3=A3o=20a=20p?= =?UTF-8?q?artir=20de=203=20=C3=ADndices?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/datasets.jl | 8 ++++---- src/pool.jl | 40 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/datasets.jl b/src/datasets.jl index 4eb2896..66a3de6 100644 --- a/src/datasets.jl +++ b/src/datasets.jl @@ -17,7 +17,7 @@ module DatasetsStreams mv(path, joinpath(defdir, "synthetic/UG_2C_5D.csv")) end - function Dataset1CDT(initial_size::Int, flux_size::Int)::EasyStream.Pool + function Dataset1CDT(initial_size::Int, flux_size::Int)::EasyStream.MemoryStream filename = "$(defdir)/synthetic/1CDT.csv" isfile(filename) || get1cdtdata(defdir) @@ -26,12 +26,12 @@ module DatasetsStreams stream = EasyStream.MemoryStream(data, initial_size, flux_size) - return EasyStream.Pool(stream) + return stream end Dataset1CDT() = Dataset1CDT(150, 1) - function DatasetUG_2C_5D(initial_size::Int, flux_size::Int)::EasyStream.Pool + function DatasetUG_2C_5D(initial_size::Int, flux_size::Int)::EasyStream.MemoryStream filename = "$(defdir)/synthetic/UG_2C_5D.csv" isfile(filename) || getug2c5ddata(defdir) @@ -40,7 +40,7 @@ module DatasetsStreams stream = EasyStream.MemoryStream(data, initial_size, flux_size) - return EasyStream.Pool(stream) + return stream end DatasetUG_2C_5D() = DatasetUG_2C_5D(150, 1) diff --git a/src/pool.jl b/src/pool.jl index 91c0c26..ae5eb17 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -39,12 +39,12 @@ function useble_length(pool) end ##Indexing - Using three indexes to move in data through the instances -function Base.getindex(pool::Pool, index::Int) +function Base.getindex(pool::Pool, instance::Int) count = 1 for i=1:size(pool.data, 1) for j=1:size(pool.data[i], 1) if pool.mapping[i][j] - if count == index + if count == instance return pool.data[i][j, :] end count += 1 @@ -101,4 +101,40 @@ Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = p ##Indexing - Using three indexes to move in data through the instances, features, samples +#= +if sample > size(pool.data, 1) + throw(BoundsError(pool.data, sample)) + else if instance > size(pool.data[sample], 1) + throw(BoundsError(pool.data[sample], instance)) + else if feature > size(pool.data[sample], 2) + throw(BoundsError(pool.data[sample][instance], feature)) + end +=# +function Base.getindex(pool::Pool, instance::Int, feature::Colon, sample::Int) + count = 1 + for j=1:size(pool.data[sample], 1) + if pool.mapping[sample][j] + if count == instance + return pool.data[sample][j, :] + end + count += 1 + end + end +end + +#Base.getindex(pool::Pool, instance::Int, feature::Int, sample::Int) = pool[instance, :, sample][feature] + + +function Base.getindex(pool::Pool, instance::Int, feature::Int, sample::Int) + count = 1 + for j=1:size(pool.data[sample], 1) + if pool.mapping[sample][j] + if count == instance + return pool.data[sample][j, feature] + end + count += 1 + end + end +end +#Base.getindex(pool::Pool, instance::Int, feature::Int, sample::Int) = pool[instance, :, sample] \ No newline at end of file From b613837fba945e409025e3b4e7438e4907f22517 Mon Sep 17 00:00:00 2001 From: Pedro Conrado Date: Fri, 10 Jul 2020 04:14:44 -0300 Subject: [PATCH 30/31] =?UTF-8?q?Finaliza=C3=A7=C3=A3o=20da=20indexa=C3=A7?= =?UTF-8?q?=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pool.jl | 65 +++++++++++++++++++++-------------------------------- 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/src/pool.jl b/src/pool.jl index ae5eb17..3dc2d97 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -1,8 +1,9 @@ -mutable struct Pool{T <: Stream} +mutable struct Pool{T <: Stream, N} stream::T - data::Vector{DataFrame} + data::Vector{N} mapping::Vector{Vector{Bool}} size::Int64 + N end function Pool(stream::Stream) @@ -12,14 +13,15 @@ function Pool(stream::Stream) mapping = Vector{Vector{Bool}}() push!(mapping, ones(Bool, size(streamdata, 1))) - return Pool(stream, data, mapping, size(streamdata, 1)) + return Pool(stream, data, mapping, size(streamdata, 1), DataFrame) end function next!(pool::Pool) streamdata = next!(pool.stream) pool.size += size(streamdata, 1) - push!(pool.mapping, rand(Bool, size(streamdata, 1))) + #push!(pool.mapping, rand(Bool, size(streamdata, 1))) + push!(pool.mapping, ones(Bool, size(streamdata, 1))) push!(pool.data, streamdata) return streamdata end @@ -54,25 +56,15 @@ function Base.getindex(pool::Pool, instance::Int) end function Base.getindex(pool::Pool, i::Colon) - data = DataFrame() + data = pool.N() for i=1:useble_length(pool) push!(data, pool[i]) end - #= - data = DataFrame() - for i=1:size(pool.data, 1) - for j=1:size(pool.data[i], 1) - if pool.mapping[i][j] - push!(data, pool.data[i][j, :]) - end - end - end - =# return data end function Base.getindex(pool::Pool, range::UnitRange{Int64}) - data = DataFrame() + data = pool.N() for i in range push!(data, pool[i]) end @@ -101,40 +93,35 @@ Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = p ##Indexing - Using three indexes to move in data through the instances, features, samples -#= -if sample > size(pool.data, 1) - throw(BoundsError(pool.data, sample)) - else if instance > size(pool.data[sample], 1) - throw(BoundsError(pool.data[sample], instance)) - else if feature > size(pool.data[sample], 2) - throw(BoundsError(pool.data[sample][instance], feature)) - end -=# -function Base.getindex(pool::Pool, instance::Int, feature::Colon, sample::Int) + +function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Int) count = 1 + data = pool.N() for j=1:size(pool.data[sample], 1) if pool.mapping[sample][j] - if count == instance - return pool.data[sample][j, :] - end + push!(data, pool.data[sample][j, :]) count += 1 end end + return data end -#Base.getindex(pool::Pool, instance::Int, feature::Int, sample::Int) = pool[instance, :, sample][feature] - - -function Base.getindex(pool::Pool, instance::Int, feature::Int, sample::Int) +function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::UnitRange{Int64}) count = 1 - for j=1:size(pool.data[sample], 1) - if pool.mapping[sample][j] - if count == instance - return pool.data[sample][j, feature] + data = pool.N() + for i=range + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + if count == instance + push!(data, pool.data[i][j, :]) + end + count += 1 end - count += 1 end end + return data end -#Base.getindex(pool::Pool, instance::Int, feature::Int, sample::Int) = pool[instance, :, sample] \ No newline at end of file +Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Colon) = pool[:] + +Base.getindex(pool::Pool, instance, feature, sample) = pool[:, :, sample][instance, feature] From b70c3a99e559ee37b8d4cb7cf6b2e96c1b2ebb22 Mon Sep 17 00:00:00 2001 From: Filipe Braida Date: Wed, 22 Jul 2020 17:18:58 -0300 Subject: [PATCH 31/31] =?UTF-8?q?Desenho=20de=20um=20prot=C3=B3tipo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Pedro Conrado --- Project.toml | 1 + src/EasyStream.jl | 1 + src/pool.jl | 130 ++-------------------------------------------- src/pool_bk.jl | 127 ++++++++++++++++++++++++++++++++++++++++++++ src/source.jl | 52 +++++++++++++++++++ src/stream.jl | 47 +++++------------ test/test.jl | 51 +++++++++++++++++- 7 files changed, 248 insertions(+), 161 deletions(-) create mode 100644 src/pool_bk.jl create mode 100644 src/source.jl diff --git a/Project.toml b/Project.toml index 9d144ea..548e8ee 100644 --- a/Project.toml +++ b/Project.toml @@ -10,6 +10,7 @@ DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" MLJ = "add582a8-e3ab-11e8-2d5e-e98b27df1bc7" RDatasets = "ce6b1742-4840-55fa-b093-852dadbb1d8b" Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" +Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" [compat] julia = "1.0" diff --git a/src/EasyStream.jl b/src/EasyStream.jl index 619d22f..eb2834b 100644 --- a/src/EasyStream.jl +++ b/src/EasyStream.jl @@ -1,6 +1,7 @@ module EasyStream using DataFrames + include("source.jl") include("stream.jl") include("pool.jl") include("datasets.jl") diff --git a/src/pool.jl b/src/pool.jl index 3dc2d97..553bd26 100644 --- a/src/pool.jl +++ b/src/pool.jl @@ -1,127 +1,3 @@ -mutable struct Pool{T <: Stream, N} - stream::T - data::Vector{N} - mapping::Vector{Vector{Bool}} - size::Int64 - N -end - -function Pool(stream::Stream) - data = Vector{DataFrame}() - streamdata = next!(stream) - push!(data, streamdata) - mapping = Vector{Vector{Bool}}() - push!(mapping, ones(Bool, size(streamdata, 1))) - - return Pool(stream, data, mapping, size(streamdata, 1), DataFrame) -end - -function next!(pool::Pool) - streamdata = next!(pool.stream) - pool.size += size(streamdata, 1) - - #push!(pool.mapping, rand(Bool, size(streamdata, 1))) - push!(pool.mapping, ones(Bool, size(streamdata, 1))) - push!(pool.data, streamdata) - return streamdata -end - - -##Utils -function useble_length(pool) - count = 0 - for i=1:size(pool.data, 1) - for j=1:size(pool.data[i], 1) - if pool.mapping[i][j] - count += 1 - end - end - end - return count -end - -##Indexing - Using three indexes to move in data through the instances -function Base.getindex(pool::Pool, instance::Int) - count = 1 - for i=1:size(pool.data, 1) - for j=1:size(pool.data[i], 1) - if pool.mapping[i][j] - if count == instance - return pool.data[i][j, :] - end - count += 1 - end - end - end -end - -function Base.getindex(pool::Pool, i::Colon) - data = pool.N() - for i=1:useble_length(pool) - push!(data, pool[i]) - end - return data -end - -function Base.getindex(pool::Pool, range::UnitRange{Int64}) - data = pool.N() - for i in range - push!(data, pool[i]) - end - return data -end - -##Indexing - Using two indexes to move in data through the instances and features - -Base.getindex(pool::Pool, instance::Int, feature::Int) = pool[instance][feature] - -Base.getindex(pool::Pool, instance::Int, c::Colon) = pool[instance] - -Base.getindex(pool::Pool, instance::Int, range::UnitRange{Int64}) = pool[instance][range] - -Base.getindex(pool::Pool, c::Colon, feature::Int) = pool[:][:, feature] - -Base.getindex(pool::Pool, c::Colon, range::UnitRange{Int64}) = pool[:][:, range] - -Base.getindex(pool::Pool, c1::Colon, c2::Colon) = pool[:] - -Base.getindex(pool::Pool, range::UnitRange{Int64}, feature::Int) = pool[range][:, feature] - -Base.getindex(pool::Pool, range::UnitRange{Int64}, c::Colon) = pool[range] - -Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = pool[range][:, range2] - -##Indexing - Using three indexes to move in data through the instances, features, samples - - -function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Int) - count = 1 - data = pool.N() - for j=1:size(pool.data[sample], 1) - if pool.mapping[sample][j] - push!(data, pool.data[sample][j, :]) - count += 1 - end - end - return data -end - -function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::UnitRange{Int64}) - count = 1 - data = pool.N() - for i=range - for j=1:size(pool.data[i], 1) - if pool.mapping[i][j] - if count == instance - push!(data, pool.data[i][j, :]) - end - count += 1 - end - end - end - return data -end - -Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Colon) = pool[:] - -Base.getindex(pool::Pool, instance, feature, sample) = pool[:, :, sample][instance, feature] +mutable struct Pool + data::Vector +end \ No newline at end of file diff --git a/src/pool_bk.jl b/src/pool_bk.jl new file mode 100644 index 0000000..3dc2d97 --- /dev/null +++ b/src/pool_bk.jl @@ -0,0 +1,127 @@ +mutable struct Pool{T <: Stream, N} + stream::T + data::Vector{N} + mapping::Vector{Vector{Bool}} + size::Int64 + N +end + +function Pool(stream::Stream) + data = Vector{DataFrame}() + streamdata = next!(stream) + push!(data, streamdata) + mapping = Vector{Vector{Bool}}() + push!(mapping, ones(Bool, size(streamdata, 1))) + + return Pool(stream, data, mapping, size(streamdata, 1), DataFrame) +end + +function next!(pool::Pool) + streamdata = next!(pool.stream) + pool.size += size(streamdata, 1) + + #push!(pool.mapping, rand(Bool, size(streamdata, 1))) + push!(pool.mapping, ones(Bool, size(streamdata, 1))) + push!(pool.data, streamdata) + return streamdata +end + + +##Utils +function useble_length(pool) + count = 0 + for i=1:size(pool.data, 1) + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + count += 1 + end + end + end + return count +end + +##Indexing - Using three indexes to move in data through the instances +function Base.getindex(pool::Pool, instance::Int) + count = 1 + for i=1:size(pool.data, 1) + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + if count == instance + return pool.data[i][j, :] + end + count += 1 + end + end + end +end + +function Base.getindex(pool::Pool, i::Colon) + data = pool.N() + for i=1:useble_length(pool) + push!(data, pool[i]) + end + return data +end + +function Base.getindex(pool::Pool, range::UnitRange{Int64}) + data = pool.N() + for i in range + push!(data, pool[i]) + end + return data +end + +##Indexing - Using two indexes to move in data through the instances and features + +Base.getindex(pool::Pool, instance::Int, feature::Int) = pool[instance][feature] + +Base.getindex(pool::Pool, instance::Int, c::Colon) = pool[instance] + +Base.getindex(pool::Pool, instance::Int, range::UnitRange{Int64}) = pool[instance][range] + +Base.getindex(pool::Pool, c::Colon, feature::Int) = pool[:][:, feature] + +Base.getindex(pool::Pool, c::Colon, range::UnitRange{Int64}) = pool[:][:, range] + +Base.getindex(pool::Pool, c1::Colon, c2::Colon) = pool[:] + +Base.getindex(pool::Pool, range::UnitRange{Int64}, feature::Int) = pool[range][:, feature] + +Base.getindex(pool::Pool, range::UnitRange{Int64}, c::Colon) = pool[range] + +Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = pool[range][:, range2] + +##Indexing - Using three indexes to move in data through the instances, features, samples + + +function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Int) + count = 1 + data = pool.N() + for j=1:size(pool.data[sample], 1) + if pool.mapping[sample][j] + push!(data, pool.data[sample][j, :]) + count += 1 + end + end + return data +end + +function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::UnitRange{Int64}) + count = 1 + data = pool.N() + for i=range + for j=1:size(pool.data[i], 1) + if pool.mapping[i][j] + if count == instance + push!(data, pool.data[i][j, :]) + end + count += 1 + end + end + end + return data +end + +Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Colon) = pool[:] + +Base.getindex(pool::Pool, instance, feature, sample) = pool[:, :, sample][instance, feature] diff --git a/src/source.jl b/src/source.jl new file mode 100644 index 0000000..151e1cf --- /dev/null +++ b/src/source.jl @@ -0,0 +1,52 @@ +using Tables + +abstract type AbstractSource end + +mutable struct Source <: AbstractSource + table + position::Int + initial_size::Int + batch::Int +end + +function Source(table, initial_size::Int, batch::Int) + if !Tables.istable(table) + @error "não é um tipo table" + end + + if initial_size > size(table, 1) + initial_size = size(table, 1) + @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" + end + + if initial_size == 0 + @warn "initial size é zero" + end + + if batch == 0 + @warn "flux size é zero" + end + + return Source(table, 0, initial_size, batch) +end + +function next(source::Source) + if source.position < source.initial_size + source.position = source.initial_size + return source.table[1:source.initial_size, :] + end + + if source.position >= size(source.table, 1) + return nothing + end + + if source.position < source.initial_size + source.position = source.initial_size + index = 1:source.initial_size + else + index = (source.position + 1):(source.position + source.batch) + source.position = source.position + source.batch + end + + return source.table[index, :] +end \ No newline at end of file diff --git a/src/stream.jl b/src/stream.jl index 7b092f6..9576e92 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -1,43 +1,24 @@ -abstract type Stream{T} end - -mutable struct MemoryStream{T} <: Stream{T} - data::T - position::Int - initial_size::Int - batch::Int +struct Stream + source::AbstractSource + data_tables::Vector end -function MemoryStream(data::T, initial_size::Int, batch::Int) where T - if initial_size > size(data, 1) - initial_size = size(data, 1) - @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo" - end +Stream(source::AbstractSource) = Stream(source, Vector{Any}()) - if initial_size == 0 - @warn "initial size é zero" - end +function next(stream::Stream; f::Function = copyall) + data = next(stream.source) - if batch == 0 - @warn "flux size é zero" - end + elements = f(size(data)[1], length(stream.data_tables)) - return MemoryStream(data, 0, initial_size, batch) + for i=1:length(stream.data_tables) + append!(stream.data_tables[i], data[elements[:, i], :]) + end end -next!(buffer::Stream) = nothing - -function next!(stream::MemoryStream) - if stream.position >= size(stream.data, 1) - return nothing - end +copyall(qnt_elements, qnt_tables) = ones(Bool, qnt_elements, qnt_tables) - if stream.position < stream.initial_size - stream.position = stream.initial_size - index = 1:stream.initial_size - else - index = (stream.position + 1):(stream.position + stream.batch) - stream.position = stream.position + stream.batch +function publish(stream::Stream, data_tables...) + for data_table in data_tables + push!(stream.data_tables, data_table) end - - return stream.data[index, :] end diff --git a/test/test.jl b/test/test.jl index da72374..646cca0 100644 --- a/test/test.jl +++ b/test/test.jl @@ -4,10 +4,59 @@ Pkg.activate(".") using EasyStream -pool = EasyStream.Dataset1CDT() +stream = EasyStream.Dataset1CDT() + +stream pool[150,:] EasyStream.next!(stream) stream[1,:] + + +using DataFrames + +function createdummydatasetone() + df = DataFrame() + df[:user] = [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 7, 7, 7] + df[:item] = [1, 1, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 1, 2, 4, 5, 6, 2, 4, 5] + df[:rating] = [2.5, 3.5, 3.0, 3.5, 2.5, 3.0, 3, 3.5, 1.5, 5, 3, 3.5, 2.5, 3.0, 3.5, 4.0, 3.5, 3.0, 4.0, 2.5, 4.5, 3.0, 4.0, 2.0, 3.0, 2.0, 3.0, 3.0, 4.0, 5.0, 3.5, 3.0, 4.5, 4.0, 1.0] + return df +end + +df = createdummydatasetone() +EasyStream.Source(df, 100, 10) + +using CSV + +data = CSV.read("datasets/synthetic/1CDT.csv"; header = true) + +data = CSV.File("datasets/synthetic/1CDT.csv"; header = false) + +data = CSV.Rows("datasets/synthetic/1CDT.csv"; header = true, ignoreemptylines = true) +data +for row in CSV.Rows("datasets/synthetic/1CDT.csv") + println("$row") + break +end + +using Revise +using EasyStream +using CSV +using DataFrames + +data = CSV.read("datasets/synthetic/1CDT.csv"; header = false) +source = EasyStream.Source(data, 10, 5) + +stream = EasyStream.Stream(source) + +table1 = Vector() + +EasyStream.publish(stream, table1) +stream.data_tables + +EasyStream.next(stream) +length(table1) + +table1[1] \ No newline at end of file