using MsgPack

# Storage layout of a table: `ArrayTable` holds a `Vector` of values, `MappedTable`
# holds a key => value `Dict` (see `Memory` below).
@enum TableTypes ArrayTable=1 MappedTable=2

# `TableTypes` is serialized by MsgPack as an integer; convert it back on load.
Base.convert(::Type{TableTypes}, i::Integer) = TableTypes(i)
MsgPack.msgpack_type(::Type{TableTypes}) = MsgPack.IntegerType()
Base.isless(r::TableTypes, i::Int64) = isless(Int(r), i)

# Extension of every table log file; files are named `<table name><version><DBExt>`.
const DBExt = ".msgpacks"

# In-memory state of one table together with the open stream its log is appended to.
abstract type Memory end

struct ArrayMemory{T} <: Memory
    io::IOStream
    data::Vector{T}
end

struct MappedMemory{K,T} <: Memory
    io::IOStream
    data::Dict{K,T}
end

# Schema entry for one table: its name, layout, element types (one type for an
# `ArrayTable`, key and value types for a `MappedTable`) and the unsigned integer type
# used as the length prefix of every record in the table's log (see `Base.write`).
struct Table
    name::String
    Type::TableTypes
    types::Array{DataType}
    size::DataType
end

# A table description paired with its loaded memory.
struct DataBaseTable
    table::Table
    memory::Memory
end

MsgPack.msgpack_type(::Type{Table}) = MsgPack.StructType()

struct DataBase
    path::String
    schema::DataBaseTable
    # Link from schema tables (stored in schema and serialized) to tables memory
    memory::Dict{Table,DataBaseTable}
    # Value of the constructor's `S` keyword. Schema records are written with
    # `schema.table.size`, the type they are read back with.
    size
end

DataBaseTable(table::Table, path; verbose = false) =
    DataBaseTable(table, Memory(table, path, verbose = verbose))

const defaultSchema = Table("schema", MappedTable,
                           # Key, Value (File), Value (Memory)
                           [Symbol, Table],
                           UInt32)

"""
    DataBase(dir; S = UInt32, verbose = false)

Open (or create) the database stored in directory `dir`. Load the schema table and every
table it lists. Throws `ArgumentError` if `dir` is an existing file.
"""
function DataBase(dir; S = UInt32, verbose = false)
    ispath(dir) || mkpath(dir)
    isfile(dir) && throw(ArgumentError("path ($dir) is a file"))
    # Store an absolute path so later table files do not depend on the working directory.
    dir = realpath(dir)
    tables = DataBaseTable(defaultSchema, dir, verbose = verbose)
    tbldata = Dict{Table,DataBaseTable}()
    for tbl in values(tables.memory.data)
        tbldata[tbl] = DataBaseTable(tbl, dir, verbose = verbose)
    end
    return DataBase(dir, tables, tbldata, S)
end

# NOTE(review): these methods pirate MsgPack/Base behaviour for `DataType`, and
# `from_msgpack` evaluates the stored text as code: only load trusted database files.
MsgPack.msgpack_type(::Type{DataType}) = MsgPack.StringType()
MsgPack.to_msgpack(::MsgPack.StringType, d::DataType) = string(d)
MsgPack.from_msgpack(::Type{DataType}, d::AbstractString) = eval(Meta.parse(d))

"""
    write(io::IOStream, object::Action; S = UInt32)

Append `object` to `io` as one log record: the MsgPack payload length encoded as an `S`,
followed by the payload itself. Return the payload length in bytes.

Throws `InexactError` when the payload is longer than `typemax(S)`, and an error on a
short write.
"""
function Base.write(io::IOStream, object::Action; S = UInt32)
    payload = MsgPack.pack(object)
    len = length(payload)
    # The writes are kept outside `@assert`, which may be disabled at some optimization levels.
    nhead = write(io, S(len))
    nhead == sizeof(S) ||
        error("short write of record header: $nhead of $(sizeof(S)) bytes")
    nbody = write(io, payload)
    nbody == len || error("short write of record payload: $nbody of $len bytes")
    return len
end

# Raw in-memory container of a table (a `Vector` for array tables).
Base.Dict(dbt::DataBaseTable) = dbt.memory.data

Base.getindex(tbl::DataBaseTable, key) = tbl.memory.data[key]

Base.get(tbl::DataBaseTable, key, els) = get(tbl.memory.data, key, els)

Base.getindex(db::DataBase, name::Symbol) = db.memory[db.schema.memory.data[name]]

"""
    db[name] = table

Register (`create`) or replace (`modify`) the table called `name`. Log the change to the
schema file and load the table's memory from `db.path`.
"""
function Base.setindex!(db::DataBase, table::Table, name::Symbol; verbose = false)
    dbtable = DataBaseTable(table, Memory(table, db.path, verbose = verbose))
    schema = db.schema
    old = get(schema.memory.data, name, nothing)
    action = old === nothing ? create : modify
    # File: schema records must use the prefix type the schema log is restored with.
    write(schema.memory.io, Action(action, (name, table)), S = schema.table.size)
    # In memory
    # TODO: if action == modify, convert memory to new table
    old === nothing || delete!(db.memory, old)  # drop the stale table => memory link
    schema.memory.data[name] = table
    db.memory[table] = dbtable
    # On disk
    flush(schema.memory.io)
end

"""
    delete!(db::DataBase, name::Symbol)

Remove table `name` from the schema (logged on disk). Does nothing if `name` is unknown.
"""
function Base.delete!(db::DataBase, name::Symbol)
    schema = db.schema
    table = get(schema.memory.data, name, nothing)
    # Nothing to remove: do not append a record for a name that was never created.
    table === nothing && return nothing
    # File: log the full (name, table) pair, because the schema log is restored as
    # `Action{Tuple{Symbol,Table}}` and a bare Symbol could not be read back.
    write(schema.memory.io, Action(delete, (name, table)), S = schema.table.size)
    # In memory
    delete!(schema.memory.data, name)
    delete!(db.memory, table)
    # On disk
    flush(schema.memory.io)
end

# Assign `object` at `index`, logging a `create` or `modify` record.
# NOTE(review): the `(index, object)` record shape matches mapped tables only.
function Base.setindex!(tbl::DataBaseTable, object, index)
    data = tbl.memory.data
    action = haskey(data, index) ? modify : create
    write(tbl.memory.io, Action(action, (index, object)), S = tbl.table.size)
    # In memory
    data[index] = object
    # Flush
    flush(tbl.memory.io)
end

# Remove the entry at `index`, logging a `delete` record with the removed pair.
# Throws `BoundsError` if `index` is absent.
# NOTE(review): uses `haskey`/`delete!`, so this assumes a mapped table.
function Base.delete!(tbl::DataBaseTable, index)
    data = tbl.memory.data
    haskey(data, index) || throw(BoundsError(data, [index]))
    write(tbl.memory.io, Action(delete, (index, data[index])), S = tbl.table.size)
    # In memory
    delete!(data, index)
    # Flush
    flush(tbl.memory.io)
end

# `delete` records undoing every entry of a table, in the shape `replay` expects:
# `(key, value)` tuples for mapped tables, bare elements for array tables.
_deleteactions(data::AbstractDict) = [Action(delete, (k, v)) for (k, v) in data]
_deleteactions(data::AbstractVector) = [Action(delete, e) for e in data]

# Remove every entry of the table, logging one `delete` record per entry.
function Base.empty!(tbl::DataBaseTable)
    io = tbl.memory.io
    for action in _deleteactions(tbl.memory.data)
        write(io, action, S = tbl.table.size)
    end
    # In memory
    empty!(tbl.memory.data)
    # Flush
    flush(io)
end

# Append `object` (array tables) with a `create` record; return the in-memory data.
function Base.push!(tbl::DataBaseTable, object)
    write(tbl.memory.io, Action(create, object), S = tbl.table.size)
    # In memory
    push!(tbl.memory.data, object)
    # Flush
    flush(tbl.memory.io)
    return tbl.memory.data
end

# Read every length-prefixed record (prefix type `S`) of `file` as an `Action{T}`.
function restore(file, T, S)
    output = Action{T}[]
    open(file, "r") do io
        while !eof(io)
            len = read(io, S)
            push!(output, MsgPack.unpack(read(io, len), Action{T}))
        end
    end
    return output
end

function replay(state, t::Transaction, u::Table)
    if t == create
        push!(state, u)
    elseif t == delete
        # `Table` has no `id` field; tables are matched by `name` (TODO: confirm intent).
        idx = findfirst(x -> x.name == u.name, state)
        # Stop after logging: `deleteat!(state, nothing)` would throw.
        idx === nothing && return @error "Cannot delete non-existing entry"
        deleteat!(state, idx)
    else
        @error "Invalid transaction"
    end
end

function replay(state::Array{T}, t::Transaction, e::T) where T
    if t == create
        push!(state, e)
    elseif t == delete
        idx = findfirst(==(e), state)
        idx === nothing && return @error "Cannot delete non-existing entry"
        deleteat!(state, idx)
    elseif t == modify
        # NOTE(review): the match is by equality, so this replaces an element with an
        # equal one, which leaves `state` unchanged.
        idx = findfirst(==(e), state)
        idx === nothing && return @error "Cannot modify non-existing entry"
        state[idx] = e
    else
        @error "Invalid transaction"
    end
end

function replay(state::Dict{K,V}, t::Transaction, (s,u)::Tuple{K,V}) where {K,V}
    if t == create
        state[s] = u
    elseif t == delete
        haskey(state, s) || @error "Cannot delete non-existing entry"
        delete!(state, s)
    elseif t == modify
        # Check exists, override
        haskey(state, s) || @error "Cannot modify non-existing entry"
        state[s] = u
    else
        @error "Invalid transaction"
    end
end

# Rebuild the `Dict` state of a mapped table from its log.
function replay(actions::Vector{Action{Tuple{K,V}}}) where {K, V}
    state = Dict{K, V}()
    for action in actions
        replay(state, action.a, action.o)
    end
    return state
end

# Rebuild the `Vector` state of an array table from its log.
function replay(actions::Vector{Action{T}}) where T
    state = T[]
    for action in actions
        replay(state, action.a, action.o)
    end
    return state
end

# Version numbers of the existing log files of table `name` in directory `path`.
# NOTE(review): a name followed by digits is ambiguous ("a" v12 vs "a1" v2).
function _tableversions(path, name)
    # \Q…\E matches name and extension literally (the '.' in DBExt is no wildcard); the
    # anchors stop files such as "xtokens0.msgpacks" matching table "tokens".
    rx = Regex("^\\Q$(name)\\E([0-9]+)\\Q$(DBExt)\\E\$")
    versions = Int[]
    for file in readdir(path)
        m = match(rx, file)
        m === nothing || push!(versions, parse(Int, m[1]))
    end
    return versions
end

# Open the stream new records are appended to. When the compacted log `consolidated`
# differs from the replayed `oldlog`, write it to a new version file (incremental
# backup); otherwise keep appending to the current version.
function _openlog(vname, version, oldlog, consolidated, S)
    # If new file is exactly the same of the old one,
    # do not write, do not replace and do not increment version number!
    consolidated == oldlog && return open(vname(version), "a")
    io = open(vname(version + 1), "w")
    write.(Ref(io), consolidated; S = S)
    flush(io)
    return io
end

"""
    Memory(table::Table, path; verbose = false)

Load `table` from the newest log file in `path` (creating `path` and an empty log if
none exists). Return its `ArrayMemory`/`MappedMemory`, whose stream is open for appending.
Throws `ArgumentError` for an unknown table type.
"""
function Memory(table::Table, path; verbose = false)::Memory
    vname(version) = joinpath(path, string(table.name, version, DBExt))
    mkpath(path)  # `readdir` in `_tableversions` needs the directory to exist
    versions = _tableversions(path, table.name)
    exists = !isempty(versions)
    version = exists ? maximum(versions) : 0
    fname = vname(version)
    verbose && @info(exists ? "Loading table from $(fname)" : "Creating new table in $(fname)")
    if table.Type == ArrayTable
        T = first(table.types)
        if exists
            entries = restore(fname, T, table.size)
            data = replay(entries)
            consolidated = Action.(Ref(create), data)
            io = _openlog(vname, version, entries, consolidated, table.size)
        else
            io = open(vname(0), "w")
            data = T[]
        end
        return ArrayMemory{T}(io, data)
    elseif table.Type == MappedTable
        if exists
            entries = restore(fname, Tuple{table.types...}, table.size)
            data = replay(entries)
            consolidated = [Action(create, (k, v)) for (k, v) in data]
            io = _openlog(vname, version, entries, consolidated, table.size)
        else
            io = open(vname(0), "w")
            data = Dict{table.types...}()
        end
        return MappedMemory{table.types...}(io, data)
    else
        throw(ArgumentError("Unknown table type $(table.Type)"))
    end
end

# NOTE(review): type piracy on Base; presumably needed when MsgPack rebuilds
# `Dict{Symbol,Table}` schema keys from strings (confirm before removing).
Base.convert(::Type{Symbol}, s::String) = Symbol(s)

# Rebuild a `Table` from its decoded MsgPack map. Type names are evaluated in `Main`,
# so an untrusted database file can execute arbitrary code.
function Base.convert(::Type{Table}, s::Dict{Any,Any})
    Table(s["name"], s["Type"],
          Main.eval.(Meta.parse.(s["types"])),
          Main.eval(Meta.parse(s["size"])))
end

# TEST
# database = DataBase("/tmp/db")
# database[:tokens] = Table("tokens", MappedTable, [UUID, String], UInt32)
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(), Dict{UUID,String}()))), UInt32)
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict{Table,DataBaseTable}(), UInt32)
# database[:tokens][UUID(0)] = "primo token"
# # database -> DataBase("/tmp/db",
# #     DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(), Dict(UUID("00000000-0000-0000-0000-000000000000") => "primo token")))), UInt32)
# database[:OTP] = Table("OTP", ArrayTable, [String], UInt8)
# push!.(Ref(database[:OTP]), string.(1:10))