298 lines
10 KiB
Julia
298 lines
10 KiB
Julia
using MsgPack
|
|
|
|
# Storage layouts a table can use.
@enum TableTypes begin
    ArrayTable = 1
    MappedTable = 2
end

# Build a `TableTypes` value from its integer code.
function Base.convert(::Type{TableTypes}, i::Integer)
    return TableTypes(i)
end

# `TableTypes` values are (de)serialized by MsgPack as integers.
function MsgPack.msgpack_type(::Type{TableTypes})
    return MsgPack.IntegerType()
end

# Compare a `TableTypes` value with an `Int64` through its integer code.
function Base.isless(r::TableTypes, i::Int64)
    return isless(Int(r), i)
end

# Extension appended to every table file name.
const DBExt = ".msgpacks"

# Supertype of the in-memory representations of a table.
abstract type Memory end
|
|
|
|
"""
    ArrayMemory{T}

In-memory side of an array table: the open stream of its backing file together
with the current elements.
"""
struct ArrayMemory{T} <: Memory
    # Stream that new records are appended to
    io::IOStream
    # Current elements of the table
    data::Vector{T}
end
|
|
|
|
"""
    MappedMemory{K,T}

In-memory side of a mapped table: the open stream of its backing file together
with the current key => value entries.
"""
struct MappedMemory{K,T} <: Memory
    # Stream that new records are appended to
    io::IOStream
    # Current entries of the table
    data::Dict{K,T}
end
|
|
|
|
"""
    Table

Description of a table: its name, its layout and the types it stores.
"""
struct Table
    # Table name; also the prefix of its file names
    name::String
    # Layout of the table (`ArrayTable` or `MappedTable`)
    Type::TableTypes
    # Element type for array tables; key and value types for mapped tables
    types::Array{DataType}
    # Integer type used for the length prefix of each record in the file
    size::DataType
end
|
|
|
|
"""
    DataBaseTable

A table description paired with its loaded in-memory state.
"""
struct DataBaseTable
    # Description of the table
    table::Table
    # Open file stream and current contents
    memory::Memory
end
|
|
|
|
# `Table` values are (de)serialized by MsgPack field by field, as a struct.
function MsgPack.msgpack_type(::Type{Table})
    return MsgPack.StructType()
end
|
|
|
|
"""
    DataBase

A directory of tables, indexed by a schema table that is itself stored in the
same directory.
"""
struct DataBase
    # Directory holding the table files
    path::String
    # Schema table: maps table names to their `Table` descriptions
    schema::DataBaseTable
    # Link from schema tables (stored in schema and serialized) to tables memory
    memory::Dict{Table,DataBaseTable}
    # Length-prefix type received by the `DataBase(dir; S)` constructor
    size
end
|
|
|
|
"""
    DataBaseTable(table::Table, path; verbose = false)

Load (or create) the storage of `table` under `path` and pair it with `table`.
"""
function DataBaseTable(table::Table, path; verbose = false)
    storage = Memory(table, path, verbose = verbose)
    return DataBaseTable(table, storage)
end
|
|
|
|
# Description of the schema table itself: a mapped table from table name
# (`Symbol`) to table description (`Table`), with `UInt32` record prefixes.
const defaultSchema = Table(
    "schema",
    MappedTable,
    DataType[Symbol, Table],
    UInt32,
)
|
|
|
|
"""
    DataBase(dir; S = UInt32, verbose = false)

Open the database stored in directory `dir`, creating the directory when it
does not exist yet.

The schema table is loaded first, then every table it lists is loaded from the
same directory.

# Keywords
- `S`: stored in the `size` field of the returned `DataBase`.
- `verbose`: when `true`, log which file each table (schema included) is
  loaded from or created in.

# Throws
- `ArgumentError` if `dir` is an existing file.
"""
function DataBase(dir; S = UInt32, verbose = false)
    dir = ispath(dir) ? realpath(dir) : mkpath(dir)
    isfile(dir) && throw(ArgumentError("path ($dir) is a file"))
    schema = DataBaseTable(defaultSchema, dir, verbose = verbose)
    tbldata = Dict{Table,DataBaseTable}()
    # Only the table descriptions are needed here, not their names.
    for tbl in values(schema.memory.data)
        # Forward `verbose` so that the per-table loads are reported too.
        tbldata[tbl] = DataBaseTable(tbl, dir, verbose = verbose)
    end
    return DataBase(dir, schema, tbldata, S)
end
|
|
|
|
# `DataType` values are stored in MsgPack as strings holding the type's name.
function MsgPack.msgpack_type(::Type{DataType})
    return MsgPack.StringType()
end

function MsgPack.to_msgpack(::MsgPack.StringType, d::DataType)
    return string(d)
end

# NOTE(review): the stored string is parsed and evaluated as Julia code, so a
# tampered file can run arbitrary code when it is loaded. Confirm that table
# files are always trusted.
function MsgPack.from_msgpack(::Type{DataType}, d::AbstractString)
    expr = Meta.parse(d)
    return eval(expr)
end
|
|
|
|
"""
    write(io::IOStream, object::Action; S = UInt32)

Append `object` to `io` as one length-prefixed record: the MsgPack encoding of
`object`, preceded by its byte length stored as a value of type `S`.

Return the number of payload bytes written (the prefix is not counted).

# Throws
- `InexactError` if the payload length does not fit in `S`.
- `ErrorException` if fewer bytes than expected were written.
"""
function Base.write(io::IOStream, object::Action; S = UInt32)
    payload = MsgPack.pack(object)
    nbytes = length(payload)
    # The writes must not live inside `@assert`: assertions may be disabled,
    # which would silently skip the writes themselves.
    written = write(io, S(nbytes))
    written == sizeof(S) ||
        error("short write of record prefix: $written of $(sizeof(S)) bytes")
    written = write(io, payload)
    written == nbytes ||
        error("short write of record payload: $written of $nbytes bytes")
    return nbytes
end
|
|
|
|
# Expose the in-memory contents of a table (the stored container itself, not a copy).
function Base.Dict(dbt::DataBaseTable)
    return dbt.memory.data
end
|
|
|
|
# Look up `key` in the in-memory contents of the table.
Base.getindex(tbl::DataBaseTable, key::T) where {T} = getindex(tbl.memory.data, key)
|
|
|
|
# Look up `key` in the in-memory contents of the table, falling back to `els`.
Base.get(tbl::DataBaseTable, key::T, els::U) where {T, U} = get(tbl.memory.data, key, els)
|
|
|
|
# Fetch the loaded table registered under `name`: resolve the name to its
# description through the schema, then the description to its loaded table.
function Base.getindex(db::DataBase, name::Symbol)
    description = db.schema.memory.data[name]
    return db.memory[description]
end
|
|
|
|
"""
    setindex!(db::DataBase, table::Table, name::Symbol; verbose = false)

Register `table` under `name` in the database: load (or create) its storage,
append a `create`/`modify` record to the schema log, update the in-memory
schema and flush the schema file.
"""
function Base.setindex!(db::DataBase, table::Table, name::Symbol; verbose = false)
    schema = db.schema
    # Load the table first, so nothing is logged when loading fails.
    dbtable = DataBaseTable(table, Memory(table, db.path, verbose = verbose))
    # File
    action = haskey(schema.memory.data, name) ? modify : create
    # The schema log is read back with the schema table's own prefix type, so
    # the record must be written with that type rather than with `db.size`.
    write(schema.memory.io, Action(action, (name, table)), S = schema.table.size)
    # In memory
    # NOTE(review): on `modify` the entry of the previous description stays in
    # `db.memory`; converting it to the new table is not implemented.
    schema.memory.data[name] = table
    db.memory[table] = dbtable
    # On disk
    flush(schema.memory.io)
end
|
|
|
|
"""
    delete!(db::DataBase, name::Symbol)

Remove the table registered under `name` from the schema: append a `delete`
record to the schema log, drop the name from the in-memory schema and flush the
schema file. Does nothing when `name` is not registered.
"""
function Base.delete!(db::DataBase, name::Symbol)
    schema = db.schema
    # Nothing to delete: do not log a record that could not be replayed.
    haskey(schema.memory.data, name) || return nothing
    # File
    # Schema records are (name, table) tuples, the same shape used when a table
    # is registered, and are framed with the schema table's prefix type.
    record = (name, schema.memory.data[name])
    write(schema.memory.io, Action(delete, record), S = schema.table.size)
    # In memory
    # NOTE(review): the loaded table stays in `db.memory`; confirm whether it
    # should be dropped as well.
    delete!(schema.memory.data, name)
    # On disk
    flush(schema.memory.io)
end
|
|
|
|
"""
    setindex!(tbl::DataBaseTable, object, index)

Store `object` under `index`: append a `create` record (or `modify` when
`index` already exists) to the table file, update the in-memory contents and
flush the file.
"""
function Base.setindex!(tbl::DataBaseTable, object::T, index) where T
    action = haskey(tbl.memory.data, index) ? modify : create
    # Frame the record with the table's own prefix type: the file is read back
    # with `tbl.table.size`, so the default prefix would corrupt tables that
    # use another type.
    write(tbl.memory.io, Action(action, (index, object)), S = tbl.table.size)
    # In memory
    tbl.memory.data[index] = object
    # Flush
    flush(tbl.memory.io)
end
|
|
|
|
"""
    delete!(tbl::DataBaseTable, index)

Remove the entry stored under `index`: append a `delete` record to the table
file, drop the entry from the in-memory contents and flush the file.

# Throws
- `BoundsError` if `index` is not present.
"""
function Base.delete!(tbl::DataBaseTable, index)
    data = tbl.memory.data
    haskey(data, index) || throw(BoundsError(data, [index]))
    # Frame the record with the table's own prefix type: the file is read back
    # with `tbl.table.size`.
    write(tbl.memory.io, Action(delete, (index, data[index])), S = tbl.table.size)
    # In memory
    delete!(data, index)
    # Flush
    flush(tbl.memory.io)
end
|
|
|
|
# Entries of a table in the shape they are logged: mapped tables log
# (key, value) tuples, array tables log the elements themselves.
_logentries(data::AbstractDict) = [(k, v) for (k, v) in data]
_logentries(data) = data

"""
    empty!(tbl::DataBaseTable)

Remove every entry of the table: append one `delete` record per entry to the
table file, clear the in-memory contents and flush the file.
"""
function Base.empty!(tbl::DataBaseTable)
    # An explicit loop is used because dictionaries cannot be broadcast over.
    for entry in _logentries(tbl.memory.data)
        # Frame each record with the table's own prefix type: the file is read
        # back with `tbl.table.size`.
        write(tbl.memory.io, Action(delete, entry), S = tbl.table.size)
    end
    # In memory
    empty!(tbl.memory.data)
    # Flush
    flush(tbl.memory.io)
end
|
|
|
|
|
|
"""
    push!(tbl::DataBaseTable, object)

Append `object` to the table: log a `create` record to the table file, push the
object onto the in-memory contents and flush the file. Return the in-memory
contents.
"""
function Base.push!(tbl::DataBaseTable, object::T) where T
    mem = tbl.memory
    # On file first, then in memory
    write(mem.io, Action(create, object); S = tbl.table.size)
    push!(mem.data, object)
    flush(mem.io)
    return mem.data
end
|
|
|
|
"""
    restore(file, T, S)

Read every record of `file` and return them as a `Vector{Action{T}}`.

Each record is a length of type `S` followed by that many bytes of MsgPack
data, decoded as an `Action{T}`.
"""
function restore(file, T, S)
    actions = Action{T}[]
    open(file, "r") do stream
        while !eof(stream)
            nbytes = read(stream, S)
            payload = read(stream, nbytes)
            push!(actions, MsgPack.unpack(payload, Action{T}))
        end
    end
    return actions
end
|
|
|
|
"""
    replay(state, t::Transaction, u::Table)

Apply transaction `t` for table description `u` to `state` and return `state`.

`create` pushes `u`; `delete` removes the first entry whose `id` matches. A
`delete` of a missing entry, or any other transaction, is logged as an error
and leaves `state` unchanged.
"""
function replay(state, t::Transaction, u::Table)
    if t == create
        push!(state, u)
    elseif t == delete
        # NOTE(review): `Table` as declared in this file has no `id` field;
        # confirm which field this comparison is meant to use.
        idx = findfirst(x -> x.id == u.id, state)
        if idx === nothing
            # Skip the record: `deleteat!(state, nothing)` would throw.
            @error "Cannot delete non-existing entry"
        else
            deleteat!(state, idx)
        end
    else
        @error "Invalid transaction"
    end
    return state
end
|
|
|
|
"""
    replay(state::Array{T}, t::Transaction, e::T) where T

Apply transaction `t` for element `e` to `state` and return `state`.

`create` pushes `e`; `delete` removes the first element equal to `e`; `modify`
overwrites the first element equal to `e`. A `delete`/`modify` of a missing
element, or any other transaction, is logged as an error and leaves `state`
unchanged.
"""
function replay(state::Array{T}, t::Transaction, e::T) where T
    if t == create
        push!(state, e)
    elseif t == delete
        idx = findfirst(==(e), state)
        if idx === nothing
            # Skip the record: `deleteat!(state, nothing)` would throw.
            @error "Cannot delete non-existing entry"
        else
            deleteat!(state, idx)
        end
    elseif t == modify
        idx = findfirst(==(e), state)
        if idx === nothing
            # Skip the record: indexing with `nothing` would throw.
            @error "Cannot modify non-existing entry"
        else
            state[idx] = e
        end
    else
        @error "Invalid transaction"
    end
    return state
end
|
|
|
|
"""
    replay(state::Dict{K,V}, t::Transaction, (s, u)::Tuple{K,V}) where {K,V}

Apply transaction `t` for key `s` and value `u` to `state`.

`create` and `modify` store `u` under `s`; `delete` removes `s`. A `delete` or
`modify` of a missing key is logged as an error but still carried out; any
other transaction is only logged as an error.
"""
function replay(state::Dict{K,V}, t::Transaction, (s,u)::Tuple{K,V}) where {K,V}
    if t == create
        return (state[s] = u)
    end
    if t == delete
        !haskey(state, s) && @error "Cannot delete non-existing entry"
        return delete!(state, s)
    end
    if t == modify
        # Check exists, override
        !haskey(state, s) && @error "Cannot modify non-existing entry"
        return (state[s] = u)
    end
    @error "Invalid transaction"
end
|
|
|
|
"""
    replay(actions::Vector{Action{Tuple{K,V}}}) where {K, V}

Rebuild the contents of a mapped table by applying `actions` in order to an
empty `Dict{K,V}`.
"""
function replay(actions::Vector{Action{Tuple{K,V}}}) where {K, V}
    state = Dict{K, V}()
    foreach(act -> replay(state, act.a, act.o), actions)
    return state
end
|
|
|
|
"""
    replay(actions::Vector{Action{T}}) where T

Rebuild the contents of an array table by applying `actions` in order to an
empty `Vector{T}`.
"""
function replay(actions::Vector{Action{T}}) where T
    state = Vector{T}()
    foreach(act -> replay(state, act.a, act.o), actions)
    return state
end
|
|
|
|
# Open the stream that further records are appended to. When the consolidated
# log differs from the log that was read, it is written to the next version of
# the file and that new file is used. When both are identical nothing is
# written and the version number is not incremented: the current file is
# reopened for appending.
function _openlog(vname, version, log, consolidated, S)
    if consolidated != log
        io = open(vname(version + 1), "w")
        for action in consolidated
            write(io, action; S = S)
        end
        flush(io)
        return io
    end
    return open(vname(version), "a")
end

"""
    Memory(table::Table, path; verbose = false) -> Memory

Load the contents of `table` from directory `path`, or create an empty table
file when none exists, and return the matching `ArrayMemory` or `MappedMemory`
holding the open file stream and the contents.

Table files are named `<name><version><DBExt>`; the highest version found is
loaded. Its records are replayed, and the resulting contents are written as
`create` records to the next version unless that would reproduce the loaded
file exactly.

# Keywords
- `verbose`: when `true`, log which file is loaded or created.

# Throws
- `ArgumentError` if `table.Type` is neither `ArrayTable` nor `MappedTable`.
"""
function Memory(table::Table, path; verbose = false)::Memory
    vname(version) = joinpath(path, string(table.name, version, DBExt))
    # Match whole file names only and quote the literal parts, so that files of
    # another table whose name merely ends with this table's name are ignored.
    rx = Regex("^\\Q$(table.name)\\E([0-9]+)\\Q$(DBExt)\\E\$")
    versions = Int[]
    for file in readdir(path)
        m = match(rx, file)
        m === nothing || push!(versions, parse(Int, m[1]))
    end
    exists = !isempty(versions)
    version = exists ? maximum(versions) : 0
    fname = vname(version)
    verbose && @info(exists ? "Loading table from $(fname)" :
                              "Creating new table in $(fname)")
    if table.Type == ArrayTable
        T = first(table.types)
        if exists
            log = restore(fname, T, table.size)
            dt = replay(log)
            # Incremental backups
            consolidated = Action.(Ref(create), dt)
            io = _openlog(vname, version, log, consolidated, table.size)
            return ArrayMemory{T}(io, dt)
        end
        mkpath(path)
        return ArrayMemory{T}(open(vname(0), "w"), T[])
    elseif table.Type == MappedTable
        # TODO: Simplify the differences between mapped and array
        if exists
            log = restore(fname, Tuple{table.types...}, table.size)
            dt = replay(log)
            # Incremental backups
            consolidated = Action.(Ref(create),
                                   [(first(i), last(i)) for i in collect(dt)])
            io = _openlog(vname, version, log, consolidated, table.size)
            return MappedMemory{table.types...}(io, dt)
        end
        mkpath(path)
        io = open(vname(0), "w")
        return MappedMemory{table.types...}(io, Dict{table.types...}())
    else
        # Logging alone would fall through and fail on the `::Memory` return
        # conversion with an unrelated error.
        throw(ArgumentError("Unknown type"))
    end
end
|
|
|
|
# Allow a `String` to be converted implicitly to a `Symbol`.
function Base.convert(::Type{Symbol}, s::String)
    return Symbol(s)
end
|
|
# Rebuild a `Table` from a dictionary with the keys "name", "Type", "types"
# and "size".
# NOTE(review): the "types" and "size" strings are parsed and evaluated in
# `Main`, so a tampered file can run arbitrary code when it is loaded. Confirm
# that table files are always trusted.
function Base.convert(::Type{Table}, s::Dict{Any,Any})
    types = Main.eval.(Meta.parse.(s["types"]))
    prefix = Main.eval(Meta.parse(s["size"]))
    return Table(s["name"], s["Type"], types, prefix)
end
|
|
|
|
|
|
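# Manual usage example, kept commented out. The `# #` lines appear to be the printed value of `database` after each step.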
|
|
# database = DataBase("/tmp/db")
|
|
# database[:tokens] = Table("tokens", MappedTable, [UUID, String], UInt32)
|
|
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(<file /tmp/db/tokens.msgpacks>), Dict{UUID,String}()))), UInt32)
|
|
# # DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict{Table,DataBaseTable}(), UInt32)
|
|
# database[:tokens][UUID(0)] = "primo token"
|
|
# # database -> DataBase("/tmp/db", DataBaseTable(Table("schema", MappedTable, DataType[Symbol, Table], UInt32), MappedMemory{Symbol,Table}(IOStream(<file /tmp/db/schema.msgpacks>), Dict(:tokens => Table("tokens", MappedTable, DataType[UUID, String], UInt32)))), Dict(Table("tokens", MappedTable, DataType[UUID, String], UInt32) => DataBaseTable(Table("tokens", MappedTable, DataType[UUID, String], UInt32), MappedMemory{UUID,String}(IOStream(<file /tmp/db/tokens.msgpacks>), Dict(UUID("00000000-0000-0000-0000-000000000000") => "primo token")))), UInt32)
|
|
|
|
# database[:OTP] = Table("OTP", ArrayTable, [String], UInt8)
|
|
|
|
# push!.(Ref(database[:OTP]), string.(1:10))
|