Progress?

master
nixo 2020-04-04 22:15:24 +02:00
parent 3dfe01a86e
commit bad6390430
5 changed files with 344 additions and 133 deletions

View File

@ -1,67 +1,20 @@
module InMemoryLoggedDB
export Buffer, flush!, restore
export Action, Transaction,
create, modify, delete,
write, restore, replay,
DataBase, DataBaseTable, Table,
ArrayTable, MappedTable
export Table
using MsgPack
"""
    Buffer{T}

Fixed-capacity in-memory buffer of transactions of type `T`.  Elements are
appended with `push!` and written to `io` by `flush!` once the buffer fills.
"""
mutable struct Buffer{T}
    size::UInt32 # Save transactions when buffer is full
    delay_ns::UInt64 # Force save transactions after this period
    io::IOStream # File handle (or other stream where data is saved)
    transactions::Array{T} # Circular buffer
    position::UInt32 # Position of next element in the buffer
    last_ns::UInt64 # time_ns() of the last flush (updated by flush!)
    state # NOTE(review): untyped and unused in the visible code — confirm purpose
end
include("actions.jl")
include("structure.jl")
"""
    Buffer(size::Int, filename::String, sttype::DataType, delay_ns = 10^9)

Create a `Buffer` holding up to `size` elements of type `sttype`, appending
flushed transactions to `filename`.  `delay_ns` is the period (in
nanoseconds) after which a flush may be forced.
"""
function Buffer(size::Int, filename::String, sttype::DataType, delay_ns = 10^9)
    log = Array{sttype}(undef, size)
    # Open in append mode so restarting a process extends the existing log.
    # (Was `jlopen`, which is not defined anywhere in this package.)
    Buffer{sttype}(size, delay_ns, open(filename, "a"),
                   log, 0, time_ns(), [])
end
"""
    push!(buffer::Buffer, element)

Store `element` in the buffer, flushing the buffer to its stream once it
reaches capacity.  Returns `nothing`.
"""
function Base.push!(buffer::Buffer, element)
    buffer.position += 1
    buffer.transactions[buffer.position] = element
    @debug "Stored element" e=element pos=buffer.position
    # Flush only when the buffer is full.
    # (Time-based flushing via delay_ns is currently disabled.)
    if buffer.position >= buffer.size
        flush!(buffer)
    end
    nothing
end
"""
    flush!(buffer::Buffer)

Write every buffered transaction to `buffer.io` as MsgPack, reset the
buffer position, and return the number of bytes written.
"""
function flush!(buffer::Buffer)
    @debug "Flush!: " pos = buffer.position
    bytes = 0
    for idx in 1:buffer.position
        @debug "writing" p=idx el=buffer.transactions[idx]
        # TODO: This conversion to string must be removed
        bytes += write(buffer.io, pack(buffer.transactions[idx]))
    end
    buffer.last_ns = time_ns()
    buffer.position = 0
    bytes
end
# Drain the buffer, then flush the underlying stream.
Base.flush(buffer::Buffer) = (flush!(buffer); flush(buffer.io))

"""
    close(buffer::Buffer)

Flush all pending transactions, reset the buffer state, and close the
underlying stream.
"""
function Base.close(buffer::Buffer)
    flush(buffer)
    buffer.position = 0
    empty!(buffer.transactions)
    close(buffer.io)
end
"""
    restore(file, T)

Read back every `T` previously MsgPack-serialized into `file`, returning
them as a vector.
"""
function restore(file, T)
    out = []
    open(file) do io
        while !eof(io)
            push!(out, unpack(io, T))
        end
    end
    out
end
# NOTE: the closing `end` of `restore` was missing in the source (diff
# rendering artifact); restored here so the module parses.

# Log `object` to `stream` as a `create` action and flush immediately.
function Base.push!(stream::IOStream, object::T) where T
    write(stream, Action(create, object))
    flush(stream)
end
end # module

12
src/actions.jl Normal file
View File

@ -0,0 +1,12 @@
# The three kinds of logged operations.
@enum Transaction create=1 modify=2 delete=3

"""
    Action{T}

A single logged operation: a `Transaction` kind `a` applied to an object
`o` of type `T`.  Field names are short since actions are serialized a lot.
"""
struct Action{T}
    a::Transaction
    o::T
end

# Actions serialize as MsgPack maps; transactions as plain integers.
MsgPack.msgpack_type(::Type{Action{T}}) where T = MsgPack.StructType()
MsgPack.msgpack_type(::Type{Transaction}) = MsgPack.IntegerType()

# Compare against any Integer (was over-constrained to Int64).
Base.isless(r::Transaction, i::Integer) = isless(Int(r), i)
Base.convert(::Type{Transaction}, i::Integer) = Transaction(i)

7
src/dist.jl Normal file
View File

@ -0,0 +1,7 @@
"""
    update!(table::Table, transactions::RemoteChannel)

Consume transactions from the remote channel forever, applying each one to
`table` via `push!`.  Blocks on `take!`; terminates only when the channel
is closed (by the `take!` throwing).
"""
function update!(table::Table, transactions::RemoteChannel)
    while true
        t = take!(transactions)
        @info "Writing $t"
        push!(table, t)
    end
end

286
src/structure.jl Normal file
View File

@ -0,0 +1,286 @@
using MsgPack
# Storage layouts a `Table` can use: plain vector or key => value map.
@enum TableTypes ArrayTable=1 MappedTable=2
Base.convert(::Type{TableTypes}, i::Integer) = TableTypes(i)
# Serialized as a plain integer.
MsgPack.msgpack_type(::Type{TableTypes}) = MsgPack.IntegerType()
# Compare against any Integer (was over-constrained to Int64).
Base.isless(r::TableTypes, i::Integer) = isless(Int(r), i)
# File extension used by every on-disk table log.
const DBExt = ".msgpacks"

# A table's in-memory state plus the stream its log is appended to.
abstract type Memory end

# Vector-backed storage for `ArrayTable` tables.
struct ArrayMemory{T} <: Memory
    io::IOStream # Append-only log file
    data::Vector{T} # Current state, rebuilt from the log on load
end

# Dict-backed storage for `MappedTable` tables.
struct MappedMemory{K,T} <: Memory
    io::IOStream # Append-only log file
    data::Dict{K,T} # Current state, rebuilt from the log on load
end
"""
    Table

Schema entry describing one table: its name, storage layout, element (or
key/value) types, and the integer type used as the length prefix when
records are serialized (see `Base.write` / `restore`).
"""
struct Table
    name::String
    Type::TableTypes # Storage layout (ArrayTable or MappedTable)
    types::Array{DataType} # [eltype] for arrays; [keytype, valtype] for maps
    size::DataType # Integer type of the record length prefix (e.g. UInt32)
end

# A schema `Table` paired with its live in-memory storage.
struct DataBaseTable
    table::Table
    memory::Memory
end
# Tables serialize as MsgPack maps (field name => value).
MsgPack.msgpack_type(::Type{Table}) = MsgPack.StructType()

"""
    DataBase

On-disk database rooted at `path`.  `schema` is the special table mapping
names (Symbols) to `Table` descriptors; `memory` links each schema `Table`
to its live storage.
"""
struct DataBase
    path::String
    schema::DataBaseTable
    # Link from schema tables (stored in schema and serialized) to tables memory
    memory::Dict{Table,DataBaseTable}
    size # NOTE(review): untyped; used as the schema log's length-prefix type — consider ::DataType
end
# Build a DataBaseTable by loading (or creating) `table`'s storage under `path`.
DataBaseTable(table::Table, path) =
    DataBaseTable(table, Memory(table, path))

# Bootstrap schema table: maps Symbol names to Table descriptors.
const defaultSchema = Table("schema", MappedTable,
                            # Key, Value (File), Value (Memory)
                            [Symbol, Table], UInt32)
"""
    DataBase(dir; S = UInt32)

Open (or create) a database in directory `dir`: load the schema table, then
every table it describes.  `S` is the length-prefix integer type for the
schema log.  Throws `ArgumentError` when `dir` is a regular file.
"""
function DataBase(dir; S = UInt32)
    # Validate before touching the filesystem (the original called mkpath/
    # realpath first and only then rejected files).
    isfile(dir) && throw(ArgumentError("path ($dir) is a file"))
    dir = ispath(dir) ? realpath(dir) : mkpath(dir)
    tables = DataBaseTable(defaultSchema, dir)
    tbldata = Dict{Table,DataBaseTable}()
    # The schema maps names to Table descriptors; load storage for each.
    for (_name, tbl) in tables.memory.data
        tbldata[tbl] = DataBaseTable(tbl, dir)
    end
    DataBase(dir, tables, tbldata, S)
end
# Serialize DataTypes by their printed name, e.g. UInt32 -> "UInt32".
MsgPack.msgpack_type(::Type{DataType}) = MsgPack.StringType()
MsgPack.to_msgpack(::MsgPack.StringType, d::DataType) = string(d)
# NOTE(review): eval of a parsed string executes arbitrary code if the log
# file is untrusted — consider a whitelist of allowed type names.
MsgPack.from_msgpack(::Type{DataType}, d::AbstractString) = eval(Meta.parse(d))
"""
    write(io::IOStream, object::Action; S = UInt32)

Serialize `object` with MsgPack and write it to `io`, preceded by its byte
length encoded as an `S`.  Returns the payload length (excluding the
prefix).  Errors on a short write.
"""
function Base.write(io::IOStream, object::Action; S = UInt32)
    mobj = MsgPack.pack(object)
    wl = length(mobj)
    # Explicit checks instead of @assert: assertions may be compiled out,
    # which would let a short write silently corrupt the log.
    write(io, S(wl)) == sizeof(S) || error("short write of length prefix")
    write(io, mobj) == wl || error("short write of payload")
    wl
end
# Index into the table's in-memory store.
Base.getindex(tbl::DataBaseTable, key::T) where T = tbl.memory.data[key]

# Like `getindex`, but return `els` when `key` is absent.
Base.get(tbl::DataBaseTable, key::T, els::U) where {T, U} =
    get(tbl.memory.data, key, els)
# Look a table up by name: resolve the Symbol through the schema, then map
# the resulting `Table` descriptor to its live storage.
Base.getindex(db::DataBase, name::Symbol) =
    db.memory[db.schema.memory.data[name]]
"""
    setindex!(db::DataBase, table::Table, name::Symbol)

Register (or replace) `table` under `name`: append the action to the
schema log on disk, then update the in-memory schema and table storage.
"""
function Base.setindex!(db::DataBase, table::Table, name::Symbol)
    # Load/create the table's storage before mutating anything.
    dbtable = DataBaseTable(table, Memory(table, db.path))
    # File
    action = haskey(db.schema.memory.data, name) ? modify : create
    write(db.schema.memory.io, Action(action, (name,table)), S = db.size)
    # In memory
    # If action == modify, convert memory to new table
    db.schema.memory.data[name] = table
    db.memory[table] = dbtable
    # On disk
    flush(db.schema.memory.io)
end
"""
    setindex!(tbl::DataBaseTable, object, index)

Insert or replace `object` at `index`: log the action, update the
in-memory map, and flush the log.
"""
function Base.setindex!(tbl::DataBaseTable, object::T, index) where T
    action = haskey(tbl.memory.data, index) ? modify : create
    # Use the table's own length-prefix type, matching `push!` and `restore`
    # (the UInt32 default would corrupt tables declared with another size).
    write(tbl.memory.io, Action(action, (index, object)), S = tbl.table.size)
    # In memory
    tbl.memory.data[index] = object
    # Flush
    flush(tbl.memory.io)
end
"""
    delete!(tbl::DataBaseTable, index)

Remove the entry at `index`, logging a `delete` action first.  Throws
`BoundsError` when `index` is absent.
"""
function Base.delete!(tbl::DataBaseTable, index)
    haskey(tbl.memory.data, index) ||
        throw(BoundsError(tbl.memory.data, [index]))
    # Use the table's length-prefix type for consistency with `restore`.
    write(tbl.memory.io, Action(delete, (index, tbl.memory.data[index])),
          S = tbl.table.size)
    # In memory
    delete!(tbl.memory.data, index)
    # Flush
    flush(tbl.memory.io)
end
"""
    empty!(tbl::DataBaseTable)

Log a `delete` action for every entry, then clear the in-memory store.
"""
function Base.empty!(tbl::DataBaseTable)
    # Dict entries must be logged as (key, value) tuples so they round-trip
    # through `restore` (which expects Tuple{K,V}); broadcasting over the
    # Dict directly produced Pair-typed actions (the old FIXME).  Arrays
    # log their elements as-is.
    entries = tbl.memory.data isa AbstractDict ?
        [Tuple(kv) for kv in tbl.memory.data] : tbl.memory.data
    # Use the table's length-prefix type for consistency with `restore`.
    write.(Ref(tbl.memory.io), Action.(Ref(delete), entries),
           S = tbl.table.size)
    # In memory
    empty!(tbl.memory.data)
    # Flush
    flush(tbl.memory.io)
end
"""
    push!(tbl::DataBaseTable, object)

Append `object` to an array-backed table: log a `create` action, mirror it
in memory, flush the log, and return the in-memory data.
"""
function Base.push!(tbl::DataBaseTable, object::T) where T
    # Disk first, then memory, then flush.
    write(tbl.memory.io, Action(create, object), S = tbl.table.size)
    push!(tbl.memory.data, object)
    flush(tbl.memory.io)
    return tbl.memory.data
end
"""
    restore(file, T, S)

Read a log of length-prefixed MsgPack records from `file`: each record is
an `Action{T}` preceded by its byte length encoded as an `S`.
"""
function restore(file, T, S)
    actions = Action{T}[]
    open(file, "r") do io
        while !eof(io)
            len = read(io, S)
            payload = read(io, len)
            push!(actions, MsgPack.unpack(payload, Action{T}))
        end
    end
    return actions
end
"""
    replay(state, t::Transaction, u::Table)

Apply one logged transaction about a `Table` to `state`.
"""
function replay(state, t::Transaction, u::Table)
    if t == create
        push!(state, u)
    elseif t == delete
        # NOTE(review): matches by `.id`, a field `Table` does not declare —
        # confirm which element type this method is really meant for.
        idx = findfirst(x -> x.id == u.id, state)
        if idx === nothing
            # Return instead of falling through: deleteat!(state, nothing)
            # would throw a MethodError right after logging the error.
            @error "Cannot delete non-existing entry"
            return state
        end
        deleteat!(state, idx)
    else
        @error "Invalid transaction"
    end
end
"""
    replay(state::Array{T}, t::Transaction, e::T)

Apply one logged transaction to an array-backed `state`.
"""
function replay(state::Array{T}, t::Transaction, e::T) where T
    if t == create
        push!(state, e)
    elseif t == delete
        idx = findfirst(x -> x == e, state)
        if idx === nothing
            # Bail out: deleteat!(state, nothing) would throw a MethodError
            # right after the error was logged.
            @error "Cannot delete non-existing entry"
            return state
        end
        deleteat!(state, idx)
    elseif t == modify
        idx = findfirst(x -> x == e, state)
        if idx === nothing
            @error "Cannot modify non-existing entry"
            return state
        end
        # NOTE(review): finds the entry equal to `e` and overwrites it with
        # `e` — a no-op unless `==` compares only part of the object.
        state[idx] = e
    else
        @error "Invalid transaction"
    end
end
"""
    replay(state::Dict{K,V}, t::Transaction, (s, u)::Tuple{K,V})

Apply one logged transaction to a dict-backed `state`.
"""
function replay(state::Dict{K,V}, t::Transaction, (s,u)::Tuple{K,V}) where {K,V}
    if t == create
        state[s] = u
    elseif t == delete
        # Return early: the original only logged the error and deleted anyway.
        haskey(state, s) || (@error "Cannot delete non-existing entry"; return state)
        delete!(state, s)
    elseif t == modify
        # Return early: falling through would create the missing key.
        haskey(state, s) || (@error "Cannot modify non-existing entry"; return state)
        state[s] = u
    else
        @error "Invalid transaction"
    end
end
"""
    replay(actions::Vector{Action{Tuple{K,V}}})

Rebuild a `Dict{K,V}` by replaying a log of (key, value) actions in order.
"""
function replay(actions::Vector{Action{Tuple{K,V}}}) where {K, V}
    state = Dict{K, V}()
    foreach(a -> replay(state, a.a, a.o), actions)
    return state
end

"""
    replay(actions::Vector{Action{T}})

Rebuild a `Vector{T}` by replaying a log of element actions in order.
"""
function replay(actions::Vector{Action{T}}) where T
    state = T[]
    foreach(a -> replay(state, a.a, a.o), actions)
    return state
end
"""
    Memory(table::Table, path)::Memory

Load (or create) the on-disk log for `table` under directory `path` and
return the matching `Memory` (array- or dict-backed).  Logs are versioned
as `<name><version><DBExt>`; on load, the newest log is consolidated into
a fresh create-only log with a bumped version number when it differs.
"""
function Memory(table::Table, path)::Memory
    # File name for a given log version.
    vname(version) = joinpath(path, string(table.name, version, DBExt))
    rx = Regex("$(table.name)([0-9]+)$(DBExt)")
    # Collect all existing versions of this table's log.
    files = filter!(!isnothing, map(x -> match(rx, x), readdir(path)))
    matches = isempty(files) ? Int[] : parse.(Int, getindex.(files, 1))
    exists = length(matches) > 0
    version = exists ? maximum(matches) : 0
    fname = vname(version)
    @info(exists ? "Loading table from $(fname)" :
          "Creating new table in $(fname)")
    if table.Type == ArrayTable
        data = if exists
            # Replay the newest log into the current in-memory state.
            log = restore(fname, first(table.types), table.size)
            dt = replay(log)
            # Incremental backups
            consolidated = Action.(Ref(create), dt)
            # If new file is exactly the same of the old one,
            # do not write, do not replace and do not increment version number!
            if consolidated != log
                io = open(vname(version+1), "w")
                write.(Ref(io), consolidated; S = table.size)
                flush(io)
            else
                io = open(vname(version), "a")
            end
            ArrayMemory{table.types[1]}(io, dt)
        else
            mkpath(path)
            io = open(vname(0), "w")
            # `first(table.types)[]` constructs an empty Vector of the element type.
            ArrayMemory{table.types[1]}(io, first(table.types)[])
        end
    elseif table.Type == MappedTable
        # TODO: Simplify the differences between mapped and array
        data = if exists
            log = restore(fname, Tuple{table.types...}, table.size)
            dt = replay(log)
            # Incremental backups
            consolidated = Action.(Ref(create),
                                   [(first(i), last(i)) for i in collect(dt)])
            if consolidated != log
                io = open(vname(version+1), "w")
                write.(Ref(io), consolidated; S = table.size)
                flush(io)
            else
                io = open(vname(version), "a")
            end
            dt
        else
            mkpath(path)
            io = open(vname(0), "w")
            Dict{table.types...}()
        end
        # `io` is function-scoped, so the assignment inside the `if` above
        # is visible here.
        MappedMemory{table.types...}(io, data)
    else
        @error "Unknown type"
    end
end
# NOTE(review): type piracy — this defines Base.convert for two Base-owned
# types (Symbol, String), which can change behavior of unrelated code
# loaded in the same session.
Base.convert(::Type{Symbol}, s::String) = Symbol(s)

# Rebuild a `Table` from its deserialized MsgPack map.
# NOTE(review): `eval(Meta.parse(...))` on stored strings executes
# arbitrary code if the schema file is untrusted.
function Base.convert(::Type{Table}, s::Dict{Any,Any})
    Table(s["name"], s["Type"],
          Main.eval.(Meta.parse.(s["types"])),
          Main.eval(Meta.parse(s["size"])))
end
# TEST
# database = DataBase("/tmp/db")
# database[:tokens] = Table("tokens", MappedTable, [UUID, String], UInt32)
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(<file /tmp/db/tokens.msgpacks>), Dict{UUID,String}()))), UInt32)
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict{Table,DataBaseTable}(), UInt32)
# database[:tokens][UUID(0)] = "primo token"
# # database -> DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(<file /tmp/db/tokens.msgpacks>), Dict(UUID("00000000-0000-0000-0000-000000000000") => "primo token")))), UInt32)
# database[:OTP] = Table("OTP", ArrayTable, [String], UInt8)
# push!.(Ref(database[:OTP]), string.(1:10))

View File

@ -1,80 +1,33 @@
# NOTE(review): this section is an interleaved diff render — lines from the
# old benchmark script (Timing/Buffer micro-benchmarks) and the new user-DB
# test script (FILEDB/User actions) are mixed together without +/- markers.
# It is NOT valid Julia as it stands (e.g. `struct Timing` and `struct X`
# are cut off mid-definition by lines from the other version); reconstruct
# the two file versions from the original commit before editing.
using MsgPack
using Random
## CREATE
FDBNAME = "test1"
struct Timing
label::String
time::Float64
FILEDB = open(FDBNAME, "w")
# create 100 users
unum = 100
users = []
uuids = UUID.(1:unum)
for uuid in uuids
u = User(provider, "Nicolò", "Balzarotti", "3935027690", nothing,
"\$argon2id\$v=19\$m=65536,t=2,p=1\$c1FCl+2ur/YcMwcfNCsBkw\$xqCwQ/VeJNTkrEduNO7SyhQSDSW5hJMxjBaBFpIr26Q",
false, false, false, uuid, 0)
push!(users, u)
end
MsgPack.msgpack_type(::Type{Timing}) = MsgPack.StructType()
const timings = Buffer(1_000_000, "timings.msgpack", Timing)
struct X
el::Bool
N::Int
c::String
for user in users
write(FILEDB, Action(create, user))
end
MsgPack.msgpack_type(::Type{X}) = MsgPack.StructType()
mkpath("results")
function microtest(t, ns)
label = "$(t)_$(ns)"
buf = Buffer(t, "results/$(label).msgpack", X, ns)
res = @timed begin
for i in 1:10000
push!(buf, X(1,i,string("c is: ", i)))
end
flush(buf)
end
close(buf)
(label, res[2])
# Randomly modify those users
for i in 1:100_000
u = rand(users)
u.online = rand(Bool)
u.version += 1
write(FILEDB, Action(modify, u))
end
function test(range)
for ns in range
for t in 1:1000
push!(timings,
Timing(microtest(t, ns)...))
end
end
# Delete half of them
todelete = users[randperm(length(users))][1:(length(users) ÷ 2)]
for u in todelete
write(FILEDB, Action(delete, u))
end
test(1:5)
test(6:15:31)
test(10_000_000:10_000_000)
# NOTE(review): `timing` is undefined — presumably `timings`; confirm.
close(timing)
# Compare with the dumbest Array implementation
const dumbtimings = Buffer(1_000_000, "dumbtimings.msgpack", Timing)
function dumb(t, ns)
label = "dumb_$(t)_$(ns)"
name = "results/$(label).msgpack"
res = open(name, "a+") do out
buf = X[]
@timed begin
for i in 1:10000
push!(buf, X(1,i,string("c is: ", i)))
end
write.(Ref(out), pack.(buf))
empty!(buf)
end
end
(label, res[2])
end
function test_dumb(range)
for ns in range
for t in 1:1000
push!(dumbtimings,
Timing(dumb(t, ns)...))
end
end
end
test_dumb(1:5)
test_dumb(6:15:31)
test_dumb(10_000_000:10_000_000)
close(FILEDB)
x = restore(FDBNAME, User)
InMemoryLoggedDB.replay(x)