From bad6390430f506bf339886a2f752abd03e485a11 Mon Sep 17 00:00:00 2001
From: nixo
Date: Sat, 4 Apr 2020 22:15:24 +0200
Subject: [PATCH] Progress?

---
 src/InMemoryLoggedDB.jl | 69 ++--------
 src/actions.jl | 12 ++
 src/dist.jl | 7 +
 src/structure.jl | 286 ++++++++++++++++++++++++++++++++++++++++
 test/store.jl | 103 ++++-----------
 5 files changed, 344 insertions(+), 133 deletions(-)
 create mode 100644 src/actions.jl
 create mode 100644 src/dist.jl
 create mode 100644 src/structure.jl

diff --git a/src/InMemoryLoggedDB.jl b/src/InMemoryLoggedDB.jl
index 78fae75..c1a3789 100644
--- a/src/InMemoryLoggedDB.jl
+++ b/src/InMemoryLoggedDB.jl
@@ -1,67 +1,20 @@
 module InMemoryLoggedDB
-export Buffer, flush!, restore
+export Action, Transaction,
+    create, modify, delete,
+    write, restore, replay,
+    DataBase, DataBaseTable, Table,
+    ArrayTable, MappedTable
+export Table

 using MsgPack

-mutable struct Buffer{T}
-    size::UInt32           # Save transactions when buffer is full
-    delay_ns::UInt64       # Force save transactions after this period
-    io::IOStream           # File handle (or other stream where data is saved)
-    transactions::Array{T} # Circular buffer
-    position::UInt32       # Position of next element in the buffer
-    last_ns::UInt64
-    state
-end
+include("actions.jl")
+include("structure.jl")

-function Buffer(size::Int, filename::String, sttype::DataType, delay_ns = 10^9)
-    log = Array{sttype}(undef, size)
-    Buffer{sttype}(size, delay_ns, jlopen(filename, "a"),
-                   log, 0, time_ns(), [])
-end
-
-function Base.push!(buffer::Buffer, element)
-    # Store element in the buffer, taking care of saving it to file if needed
-    buffer.transactions[buffer.position+=1] = element
-    @debug "Stored element" e=element pos=buffer.position
-    flush = false
-    # If buffer if full, go back and save
-    buffer.position >= buffer.size && (flush = true)
-    # If enough time passed, we save the data
-    # flush || (((time_ns() - buffer.last_ns) > buffer.delay_ns) && (flush = true))
-    flush && flush!(buffer)
-    nothing
-end
-
-function flush!(buffer::Buffer)
-    wrote = 0
-    @debug "Flush!: " pos = buffer.position
-    for p in 1:buffer.position
-        @debug "writing" p=p el=buffer.transactions[p]
-        # TODO: This convertion to string must be removed
-        wrote += write(buffer.io, pack(buffer.transactions[p]))
-    end
-    buffer.last_ns = time_ns()
-    buffer.position = 0
-    wrote
-end
-
-Base.flush(buffer::Buffer) = (flush!(buffer); flush(buffer.io))
-function Base.close(buffer::Buffer)
-    flush(buffer)
-    close(buffer.io)
-    empty!(buffer.transactions)
-    buffer.position = 0
-end
-
-function restore(file, T)
-    out = []
-    open(file) do io
-        while !eof(io)
-            push!(out, unpack(io, T))
-        end
-    end
-    out
+function Base.push!(stream::IOStream, object::T) where T
+    write(stream, Action(create, object))
+    flush(stream)
 end

 end # module
diff --git a/src/actions.jl b/src/actions.jl
new file mode 100644
index 0000000..c3f89c0
--- /dev/null
+++ b/src/actions.jl
@@ -0,0 +1,12 @@
+@enum Transaction create=1 modify=2 delete=3
+
+struct Action{T}
+    # Short names since they are serialized a lot
+    a::Transaction
+    o::T
+end
+
+MsgPack.msgpack_type(::Type{Action{T}}) where T = MsgPack.StructType()
+MsgPack.msgpack_type(::Type{Transaction}) = MsgPack.IntegerType()
+Base.isless(r::Transaction, i::Int64) = isless(Int(r), i)
+Base.convert(::Type{Transaction}, i::Integer) = Transaction(i)
diff --git a/src/dist.jl b/src/dist.jl
new file mode 100644
index 0000000..4e4553b
--- /dev/null
+++ b/src/dist.jl
@@ -0,0 +1,7 @@
+function update!(table::Table, transactions::RemoteChannel)
+    while true
+        t = take!(transactions)
+        @info "Writing $t"
+        push!(table, t)
+    end
+end
diff --git a/src/structure.jl b/src/structure.jl
new file mode 100644
index 0000000..9f621d1
--- /dev/null
+++ b/src/structure.jl
@@ -0,0 +1,286 @@
+using MsgPack
+
+@enum TableTypes ArrayTable=1 MappedTable=2
+
+Base.convert(::Type{TableTypes}, i::Integer) = TableTypes(i)
+MsgPack.msgpack_type(::Type{TableTypes}) = MsgPack.IntegerType()
+Base.isless(r::TableTypes, i::Int64) = isless(Int(r), i)
+const DBExt = ".msgpacks"
+
+abstract type Memory end
+
+struct ArrayMemory{T} <: Memory
+    io::IOStream
+    data::Vector{T}
+end
+
+struct MappedMemory{K,T} <: Memory
+    io::IOStream
+    data::Dict{K,T}
+end
+
+struct Table
+    name::String
+    Type::TableTypes
+    types::Array{DataType}
+    size::DataType
+end
+
+struct DataBaseTable
+    table::Table
+    memory::Memory
+end
+
+MsgPack.msgpack_type(::Type{Table}) = MsgPack.StructType()
+
+struct DataBase
+    path::String
+    schema::DataBaseTable
+    # Link from schema tables (stored in schema and serialized) to tables memory
+    memory::Dict{Table,DataBaseTable}
+    size
+end
+
+DataBaseTable(table::Table, path) =
+    DataBaseTable(table, Memory(table, path))
+
+const defaultSchema = Table("schema", MappedTable,
+                            # Key, Value (File), Value (Memory)
+                            [Symbol, Table], UInt32)
+
+function DataBase(dir; S = UInt32)
+    dir = ispath(dir) ? realpath(dir) : mkpath(dir)
+    isfile(dir) && throw(ArgumentError("path ($dir) is a file"))
+    tables = DataBaseTable(defaultSchema, dir)
+    tbldata = Dict{Table,DataBaseTable}()
+    for (idx,tbl) in tables.memory.data
+        tbldata[tbl] = DataBaseTable(tbl, dir)
+    end
+    DataBase(dir, tables, tbldata, S)
+end
+
+MsgPack.msgpack_type(::Type{DataType}) = MsgPack.StringType()
+MsgPack.to_msgpack(::MsgPack.StringType, d::DataType) = string(d)
+MsgPack.from_msgpack(::Type{DataType}, d::AbstractString) = eval(Meta.parse(d))
+
+"""Write an `OBJECT' (of type Action) to a file referenced `TABLE'."""
+function Base.write(io::IOStream, object::Action; S = UInt32)
+    mobj = MsgPack.pack(object)
+    objtype = typeof(object)
+    wl = length(mobj)
+    @assert(write(io, S(wl)) == sizeof(S))
+    @assert(write(io, mobj) == wl)
+    wl
+end
+
+function Base.getindex(tbl::DataBaseTable, key::T) where T
+    tbl.memory.data[key]
+end
+
+function Base.get(tbl::DataBaseTable, key::T, els::U) where {T, U}
+    get(tbl.memory.data, key, els)
+end
+
+function Base.getindex(db::DataBase, name::Symbol)
+    db.memory[db.schema.memory.data[name]]
+end
+
+function Base.setindex!(db::DataBase, table::Table, name::Symbol)
+    dbtable = DataBaseTable(table, Memory(table, db.path))
+    # File
+    action = haskey(db.schema.memory.data, name) ? modify : create
+    write(db.schema.memory.io, Action(action, (name,table)), S = db.size)
+    # In memory
+    # If action == modify, convert memory to new table
+    db.schema.memory.data[name] = table
+    db.memory[table] = dbtable
+    # On disk
+    flush(db.schema.memory.io)
+end
+
+function Base.setindex!(tbl::DataBaseTable, object::T, index) where T
+    action = haskey(tbl.memory.data, index) ? modify : create
+    write(tbl.memory.io, Action(action, (index, object)))
+    # In memory
+    tbl.memory.data[index] = object
+    # Flush
+    flush(tbl.memory.io)
+end
+
+function Base.delete!(tbl::DataBaseTable, index)
+    haskey(tbl.memory.data, index) ||
+        throw(BoundsError(tbl.memory.data, [index]))
+    write(tbl.memory.io, Action(delete, (index, tbl.memory.data[index])))
+    # In memory
+    delete!(tbl.memory.data, index)
+    # Flush
+    flush(tbl.memory.io)
+end
+
+function Base.empty!(tbl::DataBaseTable)
+    # emptyfile!(tbl.memory.data)
+    # FIXME: make it work for dictionaries
+    write.(Ref(tbl.memory.io), Action.(Ref(delete), tbl.memory.data))
+    # In memory
+    empty!(tbl.memory.data)
+    # Flush
+    flush(tbl.memory.io)
+end
+
+
+function Base.push!(tbl::DataBaseTable, object::T) where T
+    write(tbl.memory.io, Action(create, object), S = tbl.table.size)
+    # In memory
+    push!(tbl.memory.data, object)
+    # Flush
+    flush(tbl.memory.io)
+    tbl.memory.data
+end
+
+function restore(file, T, S)
+    output = Action{T}[]
+    open(file, "r") do io
+        while !eof(io)
+            obj = read(io, read(io, S))
+            push!(output, MsgPack.unpack(obj, Action{T}))
+        end
+    end
+    output
+end
+
+function replay(state, t::Transaction, u::Table)
+    if t == create
+        push!(state, u)
+    elseif t == delete
+        idx = findfirst(x -> x.id == u.id, state)
+        idx === nothing && @error "Cannot delete non-existing entry"
+        deleteat!(state, idx)
+    else
+        @error "Invalid transaction"
+    end
+end
+
+function replay(state::Array{T}, t::Transaction, e::T) where T
+    if t == create
+        push!(state, e)
+    elseif t == delete
+        idx = findfirst(x -> x == e, state)
+        idx === nothing && @error "Cannot delete non-existing entry"
+        deleteat!(state, idx)
+    elseif t == modify
+        idx = findfirst(x -> x == e, state)
+        idx === nothing && @error "Cannot modify non-existing entry"
+        state[idx] = e
+    else
+        @error "Invalid transaction"
+    end
+end
+
+function replay(state::Dict{K,V}, t::Transaction, (s,u)::Tuple{K,V}) where {K,V}
+    if t == create
+        state[s] = u
+    elseif t == delete
+        haskey(state, s) || @error "Cannot delete non-existing entry"
+        delete!(state, s)
+    elseif t == modify
+        # Check exists, override
+        haskey(state, s) || @error "Cannot modify non-existing entry"
+        state[s] = u
+    else
+        @error "Invalid transaction"
+    end
+end
+
+function replay(actions::Vector{Action{Tuple{K,V}}}) where {K, V}
+    state = Dict{K, V}()
+    for action in actions
+        replay(state, action.a, action.o)
+    end
+    state
+end
+
+function replay(actions::Vector{Action{T}}) where T
+    state = T[]
+    for action in actions
+        replay(state, action.a, action.o)
+    end
+    state
+end
+
+function Memory(table::Table, path)::Memory
+    vname(version) = joinpath(path, string(table.name, version, DBExt))
+    rx = Regex("$(table.name)([0-9]+)$(DBExt)")
+    files = filter!(!isnothing, map(x -> match(rx, x), readdir(path)))
+    matches = isempty(files) ? Int[] : parse.(Int, getindex.(files, 1))
+    exists = length(matches) > 0
+    version = exists ? maximum(matches) : 0
+    fname = vname(version)
+    @info(exists ? "Loading table from $(fname)" :
+          "Creating new table in $(fname)")
+    if table.Type == ArrayTable
+        data = if exists
+            log = restore(fname, first(table.types), table.size)
+            dt = replay(log)
+            # Incremental backups
+            consolidated = Action.(Ref(create), dt)
+            # If new file is exactly the same of the old one,
+            # do not write, do not replace and do not increment version number!
+            if consolidated != log
+                io = open(vname(version+1), "w")
+                write.(Ref(io), consolidated; S = table.size)
+                flush(io)
+            else
+                io = open(vname(version), "a")
+            end
+            ArrayMemory{table.types[1]}(io, dt)
+        else
+            mkpath(path)
+            io = open(vname(0), "w")
+            ArrayMemory{table.types[1]}(io, first(table.types)[])
+        end
+    elseif table.Type == MappedTable
+        # TODO: Simplify the differences between mapped and array
+        data = if exists
+            log = restore(fname, Tuple{table.types...}, table.size)
+            dt = replay(log)
+            # Incremental backups
+            consolidated = Action.(Ref(create),
+                                   [(first(i), last(i)) for i in collect(dt)])
+            if consolidated != log
+                io = open(vname(version+1), "w")
+                write.(Ref(io), consolidated; S = table.size)
+                flush(io)
+            else
+                io = open(vname(version), "a")
+            end
+            dt
+        else
+            mkpath(path)
+            io = open(vname(0), "w")
+            Dict{table.types...}()
+        end
+        MappedMemory{table.types...}(io, data)
+    else
+        @error "Unknown type"
+    end
+end
+
+Base.convert(::Type{Symbol}, s::String) = Symbol(s)
+function Base.convert(::Type{Table}, s::Dict{Any,Any})
+    Table(s["name"], s["Type"],
+          Main.eval.(Meta.parse.(s["types"])),
+          Main.eval(Meta.parse(s["size"])))
+end
+
+
+# TEST
+# database = DataBase("/tmp/db")
+# database[:tokens] = Table("tokens", MappedTable, [UUID, String], UInt32)
+# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(), Dict{UUID,String}()))), UInt32)
+# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict{Table,DataBaseTable}(), UInt32)
+# database[:tokens][UUID(0)] = "primo token"
+# # database -> DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(), Dict(UUID("00000000-0000-0000-0000-000000000000") => "primo token")))), UInt32)
+
+# database[:OTP] = Table("OTP", ArrayTable, [String], UInt8)
+
+# push!.(Ref(database[:OTP]), string.(1:10))
diff --git a/test/store.jl b/test/store.jl
index 6145b30..b1855ac 100644
--- a/test/store.jl
+++ b/test/store.jl
@@ -1,80 +1,33 @@
-using MsgPack
+using Random
+## CREATE
+FDBNAME = "test1"

-struct Timing
-    label::String
-    time::Float64
+FILEDB = open(FDBNAME, "w")
+# create 100 users
+unum = 100
+users = []
+uuids = UUID.(1:unum)
+for uuid in uuids
+    u = User(provider, "Nicolò", "Balzarotti", "3935027690", nothing,
+             "\$argon2id\$v=19\$m=65536,t=2,p=1\$c1FCl+2ur/YcMwcfNCsBkw\$xqCwQ/VeJNTkrEduNO7SyhQSDSW5hJMxjBaBFpIr26Q",
+             false, false, false, uuid, 0)
+    push!(users, u)
 end
-MsgPack.msgpack_type(::Type{Timing}) = MsgPack.StructType()
-
-const timings = Buffer(1_000_000, "timings.msgpack", Timing)
-
-struct X
-    el::Bool
-    N::Int
-    c::String
+for user in users
+    write(FILEDB, Action(create, user))
 end
-MsgPack.msgpack_type(::Type{X}) = MsgPack.StructType()
-
-mkpath("results")
-
-function microtest(t, ns)
-    label = "$(t)_$(ns)"
-    buf = Buffer(t, "results/$(label).msgpack", X, ns)
-    res = @timed begin
-        for i in 1:10000
-            push!(buf, X(1,i,string("c is: ", i)))
-        end
-        flush(buf)
-    end
-    close(buf)
-    (label, res[2])
+# Randomly modify those users
+for i in 1:100_000
+    u = rand(users)
+    u.online = rand(Bool)
+    u.version += 1
+    write(FILEDB, Action(modify, u))
 end
-function test(range)
-    for ns in range
-        for t in 1:1000
-            push!(timings,
-                  Timing(microtest(t, ns)...))
-        end
-    end
+# Delete half of them
+todelete = users[randperm(length(users))][1:(length(users) ÷ 2)]
+for u in todelete
+    write(FILEDB, Action(delete, u))
 end
-
-test(1:5)
-test(6:15:31)
-test(10_000_000:10_000_000)
-
-close(timing)
-
-# Compare with the dumbest Array implementation
-
-const dumbtimings = Buffer(1_000_000, "dumbtimings.msgpack", Timing)
-
-function dumb(t, ns)
-    label = "dumb_$(t)_$(ns)"
-    name = "results/$(label).msgpack"
-    res = open(name, "a+") do out
-        buf = X[]
-        @timed begin
-            for i in 1:10000
-                push!(buf, X(1,i,string("c is: ", i)))
-            end
-            write.(Ref(out), pack.(buf))
-            empty!(buf)
-        end
-    end
-    (label, res[2])
-end
-
-
-function test_dumb(range)
-    for ns in range
-        for t in 1:1000
-            push!(dumbtimings,
-                  Timing(dumb(t, ns)...))
-        end
-    end
-end
-
-test_dumb(1:5)
-test_dumb(6:15:31)
-test_dumb(10_000_000:10_000_000)
+close(FILEDB)
+x = restore(FDBNAME, User, UInt32)
+InMemoryLoggedDB.replay(x)
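
Note (not part of the patch): a minimal round-trip sketch of how the pieces introduced above fit together, assuming the package is on the load path. The `Point` struct, the temporary log path, and the values are made up for illustration; `UInt32` is the default length-prefix type used by `write(io, ::Action; S)` and must match what `restore` is given.

using MsgPack
using InMemoryLoggedDB

# Throwaway record type, registered with MsgPack the same way the patch
# registers its own structs; it is not defined anywhere in this patch.
struct Point
    x::Int
    y::Int
end
MsgPack.msgpack_type(::Type{Point}) = MsgPack.StructType()

# Hypothetical log file path.
logfile = joinpath(mktempdir(), "points.msgpacks")

# Append length-prefixed Action records, as write(io, ::Action; S) does.
open(logfile, "w") do io
    write(io, Action(create, Point(1, 2)))
    write(io, Action(create, Point(3, 4)))
    write(io, Action(delete, Point(1, 2)))
end

# Read the log back and fold it into the current state.
log = restore(logfile, Point, UInt32)   # Vector{Action{Point}}
state = replay(log)                     # expected to contain only Point(3, 4)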