# InMemoryLoggedDB.jl/src/structure.jl (Julia source)

using MsgPack
# Storage layout of a table: a plain vector (`ArrayTable`) or a keyed
# dictionary (`MappedTable`). The integer codes are what MsgPack stores, so
# they must stay stable.
@enum TableTypes begin
    ArrayTable = 1
    MappedTable = 2
end

# Accept raw integer codes (e.g. decoded from MsgPack) wherever a `TableTypes`
# is expected, such as the `Type` field of `Table`.
function Base.convert(::Type{TableTypes}, code::Integer)
    return TableTypes(code)
end
# Serialize `TableTypes` values as their integer codes.
function MsgPack.msgpack_type(::Type{TableTypes})
    return MsgPack.IntegerType()
end

# Order a table type against a raw integer code by its numeric value.
function Base.isless(kind::TableTypes, code::Int64)
    return isless(Int(kind), code)
end
# File extension of every table log; files are named `<table name><version><DBExt>`.
const DBExt = ".msgpacks"
# In-memory state of one table together with the open handle of its log file.
abstract type Memory end
# Memory backing an `ArrayTable`: entries kept in a vector, in push order.
struct ArrayMemory{T} <: Memory
io::IOStream # log file that new actions are written to
data::Vector{T} # current contents, as replayed from the log
end
# Memory backing a `MappedTable`: a key => value dictionary.
struct MappedMemory{K,T} <: Memory
io::IOStream # log file that new actions are written to
data::Dict{K,T} # current contents, as replayed from the log
end
# Schema entry describing one table.
# NOTE(review): `types` is a mutable array inside an immutable struct, so unless
# `==`/`hash` are overloaded elsewhere, two `Table`s are equal (and hash equally
# as `Dict` keys) only when they share the very same `types` array object.
struct Table
name::String # base name of the table's log files
Type::TableTypes # ArrayTable or MappedTable
types::Array{DataType} # [element type] for ArrayTable; [key type, value type] for MappedTable
size::DataType # integer type of each log record's length prefix (e.g. UInt32)
end
# A table's schema description paired with its loaded in-memory state.
struct DataBaseTable
table::Table
memory::Memory # ArrayMemory or MappedMemory, depending on `table.Type`
end
# Let MsgPack (de)serialize `Table` field by field.
function MsgPack.msgpack_type(::Type{Table})
    return MsgPack.StructType()
end
# A database directory: the schema table (table name => `Table`) plus the
# loaded state of every table registered in it.
struct DataBase
path::String # directory holding all table log files
schema::DataBaseTable
# Maps each `Table` stored in the schema (and serialized in its log) to that table's loaded state
memory::Dict{Table,DataBaseTable}
size # integer type passed as `S` when writing schema log records (e.g. UInt32)
end
"""
    DataBaseTable(table::Table, path; verbose = false)

Load (or create) the log files of `table` under directory `path` and pair the
resulting memory with the table description.
"""
function DataBaseTable(table::Table, path; verbose = false)
    mem = Memory(table, path; verbose = verbose)
    return DataBaseTable(table, mem)
end
# Description of the built-in schema table: a MappedTable from table name
# (Symbol) to that table's `Table` description, with UInt32 length prefixes.
const defaultSchema = Table("schema", MappedTable,
# Key type (table name), value type (table description)
[Symbol, Table], UInt32)
"""
    DataBase(dir; S = UInt32, verbose = false)

Open the database stored in directory `dir`, creating the directory if needed.
Load the schema table and every table registered in it.

`S` is the integer type of the length prefix of schema log records.
`verbose` enables `@info` messages while each table is loaded.

Throws `ArgumentError` if `dir` is an existing file.
"""
function DataBase(dir; S = UInt32, verbose = false)
    # Do not rely on `mkpath`'s return value (older Julia versions do not
    # return the path); canonicalize explicitly so the stored path is always
    # absolute, whether or not the directory already existed.
    ispath(dir) || mkpath(dir)
    dir = realpath(dir)
    isfile(dir) && throw(ArgumentError("path ($dir) is a file"))
    # Schema records are written with `S` (stored as `db.size`), so they must
    # be read back with the same prefix type.
    schema = S == defaultSchema.size ? defaultSchema :
        Table(defaultSchema.name, defaultSchema.Type, defaultSchema.types, S)
    tables = DataBaseTable(schema, dir, verbose = verbose)
    tbldata = Dict{Table,DataBaseTable}()
    for tbl in values(tables.memory.data)
        tbldata[tbl] = DataBaseTable(tbl, dir, verbose = verbose)
    end
    return DataBase(dir, tables, tbldata, S)
end
# Store `DataType`s (such as `Table.types` / `Table.size`) as their printed names.
# NOTE(review): these methods are type piracy (neither MsgPack's functions nor
# `DataType` are owned here) and affect every MsgPack user in the session.
function MsgPack.msgpack_type(::Type{DataType})
    return MsgPack.StringType()
end

function MsgPack.to_msgpack(::MsgPack.StringType, T::DataType)
    return string(T)
end

# SECURITY(review): evaluates arbitrary code read from the database files;
# only open databases from trusted sources.
function MsgPack.from_msgpack(::Type{DataType}, str::AbstractString)
    expr = Meta.parse(str)
    return eval(expr)
end
"""
    write(io::IOStream, object::Action; S = UInt32) -> Int

Append `object` to the log `io` as one record. The record is its MsgPack
encoding, prefixed by the encoding's byte length stored as an `S`. Returns
the length of the encoding, excluding the prefix.

Throws `InexactError` if the encoding is too long to fit in `S`; in that case
nothing is written.
"""
function Base.write(io::IOStream, object::Action; S = UInt32)
    payload = MsgPack.pack(object)
    len = length(payload)
    prefix = S(len)
    # Explicit checks instead of `@assert`, which may be compiled out.
    write(io, prefix) == sizeof(S) ||
        error("short write of log record length prefix")
    write(io, payload) == len ||
        error("short write of log record payload")
    return len
end
# The table's underlying contents, returned as-is (not copied). This is a
# `Dict` for mapped tables but a `Vector` for array tables.
function Base.Dict(dbt::DataBaseTable)
    return dbt.memory.data
end
# Look `key` up directly in the table's in-memory data.
Base.getindex(tbl::DataBaseTable, key) = tbl.memory.data[key]
# `get` on the table's in-memory data: the value at `key`, or `els` if absent.
Base.get(tbl::DataBaseTable, key, els) = get(tbl.memory.data, key, els)
# Loaded table registered in the schema under `name` (`KeyError` if unknown).
function Base.getindex(db::DataBase, name::Symbol)
    table = db.schema.memory.data[name]
    return db.memory[table]
end
"""
    db[name] = table

Register `table` in the schema under `name`, or replace the table already
registered there. Appends a `create` or `modify` action to the schema log and
loads the table's memory from its log files in `db.path`.

When replacing, the previous table's log handle is closed and its entry is
removed from `db.memory`.
TODO: data is not converted to the new description; the new memory is
whatever `Memory` loads for `table`.
"""
function Base.setindex!(db::DataBase, table::Table, name::Symbol; verbose = false)
    schema = db.schema.memory
    dbtable = DataBaseTable(table, Memory(table, db.path, verbose = verbose))
    previous = get(schema.data, name, nothing)
    # File
    action = previous === nothing ? create : modify
    write(schema.io, Action(action, (name, table)), S = db.size)
    # In memory
    schema.data[name] = table
    if previous !== nothing
        # Close the replaced table's log so it does not stay open alongside
        # the handle `Memory` just opened (possibly on the same files).
        old = pop!(db.memory, previous, nothing)
        old === nothing || close(old.memory.io)
    end
    db.memory[table] = dbtable
    # On disk
    flush(schema.io)
end
"""
    delete!(db::DataBase, name::Symbol)

Remove the table registered as `name` from the schema. This appends a `delete`
action to the schema log, drops the table from memory, closes its log handle
and flushes. Deleting an unknown name is a no-op, like `delete!` on a `Dict`.

The table's own log files are left on disk.
"""
function Base.delete!(db::DataBase, name::Symbol)
    schema = db.schema.memory
    haskey(schema.data, name) || return nothing
    table = schema.data[name]
    # File: schema records must be `(name, table)` tuples, the record shape
    # the schema (a MappedTable) is restored with. A bare name would make the
    # schema log unreadable on the next load.
    write(schema.io, Action(delete, (name, table)), S = db.size)
    # In memory
    delete!(schema.data, name)
    dbtable = pop!(db.memory, table, nothing)
    dbtable === nothing || close(dbtable.memory.io)
    # On disk
    flush(schema.io)
end
"""
    tbl[index] = object

Store `object` under `index`. Appends a `create` action (new index) or a
`modify` action (existing index) to the table log, then flushes it.

Intended for mapped tables: the logged record is an `(index, object)` tuple,
which is the record shape `Memory` restores for a `MappedTable`.
"""
function Base.setindex!(tbl::DataBaseTable, object::T, index) where T
    data = tbl.memory.data
    action = haskey(data, index) ? modify : create
    # Use the table's own length-prefix type: `restore` reads records back
    # with `table.size`, so the default `S` corrupts non-UInt32 logs.
    write(tbl.memory.io, Action(action, (index, object)), S = tbl.table.size)
    # In memory
    data[index] = object
    # Flush
    flush(tbl.memory.io)
end
"""
    delete!(tbl::DataBaseTable, index)

Remove the entry at `index`. Appends a `delete` action with the
`(index, value)` record to the table log, then flushes it.
Throws `BoundsError` if `index` is not present.
"""
function Base.delete!(tbl::DataBaseTable, index)
    data = tbl.memory.data
    haskey(data, index) || throw(BoundsError(data, [index]))
    # Length prefix must match `table.size`, which `restore` reads with.
    write(tbl.memory.io, Action(delete, (index, data[index])), S = tbl.table.size)
    # In memory
    delete!(data, index)
    # Flush
    flush(tbl.memory.io)
end
# Log records describing every entry of a table's data, in the shape `replay`
# expects: `(key, value)` tuples for dictionaries, elements for vectors.
_logentries(data::AbstractDict) = ((k, v) for (k, v) in data)
_logentries(data::AbstractVector) = data

"""
    empty!(tbl::DataBaseTable)

Remove every entry of `tbl`. Appends one `delete` action per entry to the
table log (length-prefixed with `tbl.table.size`), then flushes it.
Works for both array and mapped tables.
"""
function Base.empty!(tbl::DataBaseTable)
    io = tbl.memory.io
    # Broadcasting over a Dict is not allowed, so iterate explicitly.
    for entry in _logentries(tbl.memory.data)
        write(io, Action(delete, entry), S = tbl.table.size)
    end
    # In memory
    empty!(tbl.memory.data)
    # Flush
    flush(io)
end
"""
    push!(tbl::DataBaseTable, object)

Append `object` to an array table. Logs a `create` action (length-prefixed
with `tbl.table.size`), pushes the object in memory, flushes the log, and
returns the table's data vector.
"""
function Base.push!(tbl::DataBaseTable, object::T) where T
    mem = tbl.memory
    write(mem.io, Action(create, object), S = tbl.table.size)
    push!(mem.data, object)
    flush(mem.io)
    return mem.data
end
"""
    restore(file, T, S) -> Vector{Action{T}}

Read every record of the log `file`. Each record is an `S`-typed byte-length
prefix followed by that many bytes of MsgPack-encoded `Action{T}`.
NOTE(review): a truncated final record (e.g. after a crash) is not detected
here; it surfaces as a read/unpack error.
"""
function restore(file, T, S)
    actions = Action{T}[]
    open(file, "r") do stream
        while !eof(stream)
            nbytes = read(stream, S)
            record = read(stream, nbytes)
            push!(actions, MsgPack.unpack(record, Action{T}))
        end
    end
    return actions
end
"""
    replay(state, t::Transaction, u::Table)

Apply one logged action on table description `u` to the vector-like `state`:
- `create` appends `u`;
- `delete` removes the first entry whose `name` equals `u.name`.

Any other transaction, or deleting an absent entry, is reported with
`@error` and skipped.
"""
function replay(state, t::Transaction, u::Table)
    if t == create
        push!(state, u)
    elseif t == delete
        # `Table` has no `id` field; tables are identified by their name.
        idx = findfirst(x -> x.name == u.name, state)
        if idx === nothing
            @error "Cannot delete non-existing entry"
        else
            deleteat!(state, idx)
        end
    else
        @error "Invalid transaction"
    end
end
"""
    replay(state::Array{T}, t::Transaction, e::T)

Apply one logged action on element `e` to the array `state`:
- `create` appends `e`;
- `delete` removes the first element equal to `e`;
- `modify` overwrites the first element equal to `e`.

A missing element or an unknown transaction is reported with `@error`, and
that action is skipped instead of failing on a `nothing` index.
"""
function replay(state::Array{T}, t::Transaction, e::T) where T
    if t == create
        push!(state, e)
    elseif t == delete
        idx = findfirst(x -> x == e, state)
        if idx === nothing
            @error "Cannot delete non-existing entry"
        else
            deleteat!(state, idx)
        end
    elseif t == modify
        # NOTE(review): the target is located by equality with the new value,
        # so this can only overwrite an entry with an equal one.
        idx = findfirst(x -> x == e, state)
        if idx === nothing
            @error "Cannot modify non-existing entry"
        else
            state[idx] = e
        end
    else
        @error "Invalid transaction"
    end
end
"""
    replay(state::Dict{K,V}, t::Transaction, (s, u)::Tuple{K,V})

Apply one logged `(key, value)` action to the dictionary `state`:
- `create` and `modify` store `u` under `s`;
- `delete` removes `s`.

Modifying or deleting a missing key is reported with `@error` but still
applied. An unknown transaction is reported with `@error`.
"""
function replay(state::Dict{K,V}, t::Transaction, (s, u)::Tuple{K,V}) where {K,V}
    if t == create || t == modify
        if t == modify && !haskey(state, s)
            @error "Cannot modify non-existing entry"
        end
        state[s] = u
    elseif t == delete
        if !haskey(state, s)
            @error "Cannot delete non-existing entry"
        end
        delete!(state, s)
    else
        @error "Invalid transaction"
    end
end
"""
    replay(actions::Vector{Action{Tuple{K,V}}}) -> Dict{K,V}

Rebuild a mapped table by applying `(key, value)` actions, in log order, to
an empty `Dict{K,V}`.
"""
function replay(actions::Vector{Action{Tuple{K,V}}}) where {K, V}
    state = Dict{K, V}()
    foreach(act -> replay(state, act.a, act.o), actions)
    return state
end
"""
    replay(actions::Vector{Action{T}}) -> Vector{T}

Rebuild an array table by applying `actions`, in log order, to an empty
`Vector{T}`.
"""
function replay(actions::Vector{Action{T}}) where T
    elements = Vector{T}()
    for i in eachindex(actions)
        replay(elements, actions[i].a, actions[i].o)
    end
    return elements
end
# Version numbers `v` of the log files named exactly `<name><v><DBExt>` in
# directory `path`. Names are compared literally rather than as a regex, so a
# table whose name merely ends with `name` is not mistaken for this one, and
# `.` in `DBExt` matches only a dot.
# NOTE(review): the naming scheme stays ambiguous when one table name is
# another followed by digits (e.g. "t" and "t1"); that cannot be fixed here.
function _table_versions(name::AbstractString, path)
    versions = Int[]
    for file in readdir(path)
        startswith(file, name) && endswith(file, DBExt) || continue
        length(file) > length(name) + length(DBExt) || continue
        middle = chop(file; head = length(name), tail = length(DBExt))
        all(isdigit, middle) && push!(versions, parse(Int, middle))
    end
    return versions
end

# Open the log file that a table's new actions are appended to.
# If compacting the restored `actions` into `consolidated` changed anything,
# write the compacted log to version `version + 1`; older versions are kept as
# incremental backups. Otherwise, do not write, replace or bump the version:
# reopen the current file for appending.
function _open_log(vname, version, actions, consolidated, S)
    consolidated == actions && return open(vname(version), "a")
    io = open(vname(version + 1), "w")
    write.(Ref(io), consolidated; S = S)
    flush(io)
    return io
end

"""
    Memory(table::Table, path; verbose = false) -> Memory

Load the in-memory state of `table` from its newest log file in directory
`path`; log files are named `<table.name><version><DBExt>`. If no log exists,
create `path` if necessary and start an empty version-0 log.

The returned memory holds the replayed data and an open handle to the log
that new actions should be appended to.

Throws `ArgumentError` for an unsupported `table.Type`.
"""
function Memory(table::Table, path; verbose = false)::Memory
    vname(version) = joinpath(path, string(table.name, version, DBExt))
    # Create the directory before scanning it: `readdir` throws if it is missing.
    mkpath(path)
    versions = _table_versions(table.name, path)
    exists = !isempty(versions)
    version = exists ? maximum(versions) : 0
    fname = vname(version)
    verbose && @info(exists ? "Loading table from $(fname)" :
                              "Creating new table in $(fname)")
    if table.Type == ArrayTable
        T = first(table.types)
        exists || return ArrayMemory{T}(open(vname(0), "w"), T[])
        actions = restore(fname, T, table.size)
        dt = replay(actions)
        consolidated = Action.(Ref(create), dt)
        io = _open_log(vname, version, actions, consolidated, table.size)
        return ArrayMemory{T}(io, dt)
    elseif table.Type == MappedTable
        exists || return MappedMemory{table.types...}(open(vname(0), "w"),
                                                      Dict{table.types...}())
        actions = restore(fname, Tuple{table.types...}, table.size)
        dt = replay(actions)
        consolidated = Action.(Ref(create), [(k, v) for (k, v) in dt])
        io = _open_log(vname, version, actions, consolidated, table.size)
        return MappedMemory{table.types...}(io, dt)
    end
    # Previously `@error` returned `nothing`, which then failed the `::Memory`
    # conversion with an unrelated error.
    throw(ArgumentError("Unknown type: $(table.Type)"))
end
# Let decoded strings fill `Symbol`-typed slots (presumably the schema's
# Symbol keys when restored from MsgPack — confirm).
# NOTE(review): this is type piracy; it changes `convert(Symbol, ::String)`
# for every package loaded in the session.
function Base.convert(::Type{Symbol}, str::String)
    return Symbol(str)
end
# Rebuild a `Table` from an untyped dictionary holding its fields, with type
# names stored as strings. Presumably used when MsgPack decodes a schema entry
# as a generic Dict — confirm.
# SECURITY(review): type names are evaluated in `Main`, so a crafted database
# file can execute arbitrary code.
function Base.convert(::Type{Table}, s::Dict{Any,Any})
    parsetype(str) = Main.eval(Meta.parse(str))
    name = s["name"]
    kind = s["Type"]
    types = parsetype.(s["types"])
    sizetype = parsetype(s["size"])
    return Table(name, kind, types, sizetype)
end
# TEST
# database = DataBase("/tmp/db")
# database[:tokens] = Table("tokens", MappedTable, [UUID, String], UInt32)
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(<file /tmp/db/tokens.msgpacks>), Dict{UUID,String}()))), UInt32)
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict{Table,DataBaseTable}(), UInt32)
# database[:tokens][UUID(0)] = "primo token"
# # database -> DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(<file /tmp/db/tokens.msgpacks>), Dict(UUID("00000000-0000-0000-0000-000000000000") => "primo token")))), UInt32)
# database[:OTP] = Table("OTP", ArrayTable, [String], UInt8)
# push!.(Ref(database[:OTP]), string.(1:10))