InMemoryLoggedDB.jl/src/InMemoryLoggedDB.jl

68 lines
1.9 KiB
Julia

module InMemoryLoggedDB
export Buffer, flush!, restore
using MsgPack
"""
    Buffer{T}

In-memory staging area for transactions of type `T` that are periodically written to a
stream. Elements are stored in `transactions[1:position]`; `flush!` writes them out and
resets `position` to zero.
"""
mutable struct Buffer{T}
    size::UInt32              # Number of buffered transactions that triggers a flush
    delay_ns::UInt64          # Intended maximum time between flushes, in nanoseconds
    io::IOStream              # File handle the packed transactions are written to
    transactions::Vector{T}   # Linear staging buffer (concrete 1-D type, not `Array{T}`)
    position::UInt32          # Index of the last filled slot; 0 when nothing is pending
    last_ns::UInt64           # `time_ns()` timestamp of creation or of the last `flush!`
    state                     # Untyped user slot; not read by the buffer logic itself
end
"""
    Buffer(size::Int, filename::String, sttype::DataType, delay_ns = 10^9)

Create a `Buffer{sttype}` holding up to `size` transactions and writing them to
`filename`, which is opened in append mode.

# Arguments
- `size`: number of transactions kept in memory before they are written out.
- `filename`: path of the log file; existing content is preserved (append mode).
- `sttype`: element type of the buffered transactions.
- `delay_ns`: value stored in the `delay_ns` field (defaults to one second).

# Throws
- `ArgumentError` if `size` is not positive (such a buffer could never accept an element).
"""
function Buffer(size::Int, filename::String, sttype::DataType, delay_ns = 10^9)
    # Validate before opening the file so a bad size does not leave a handle behind.
    size > 0 || throw(ArgumentError("buffer size must be positive, got $size"))
    log = Vector{sttype}(undef, size)
    # `open(filename, "a")` returns the `IOStream` the struct requires.
    io = open(filename, "a")
    return Buffer{sttype}(size, delay_ns, io, log, 0, time_ns(), [])
end
"""
    push!(buffer::Buffer, element)

Store `element` in the next free slot of `buffer` and write the buffered elements out
with `flush!` once the buffer is full. Returns `nothing`.
"""
function Base.push!(buffer::Buffer, element)
    # Advance the cursor first, then fill the slot it now points to.
    slot = (buffer.position += 1)
    buffer.transactions[slot] = element
    @debug "Stored element" e=element pos=buffer.position
    # A full buffer is written out immediately. Time-based flushing (comparing
    # `time_ns() - buffer.last_ns` against `buffer.delay_ns`) is currently disabled.
    if buffer.position >= buffer.size
        flush!(buffer)
    end
    return nothing
end
"""
    flush!(buffer::Buffer)

Write every pending transaction (slots `1:buffer.position`) to `buffer.io` in MsgPack
form, record the flush time and mark the buffer as empty. Returns the total of the
values returned by `write`. The underlying stream itself is not flushed here.
"""
function flush!(buffer::Buffer)
    pending = buffer.position
    @debug "Flush!: " pos = pending
    nbytes = 0
    for p in 1:pending
        entry = buffer.transactions[p]
        @debug "writing" p=p el=entry
        nbytes += write(buffer.io, pack(entry))
    end
    # Reset the cursor and remember when this flush happened.
    buffer.position = 0
    buffer.last_ns = time_ns()
    return nbytes
end
"""
    flush(buffer::Buffer)

Write the pending transactions with `flush!`, then flush the underlying stream.
Returns whatever `flush(buffer.io)` returns.
"""
function Base.flush(buffer::Buffer)
    flush!(buffer)
    return flush(buffer.io)
end
"""
    close(buffer::Buffer)

Write the pending transactions, flush and close the underlying stream, then drop the
in-memory slots and reset the cursor.

The stream is closed even when flushing throws, so a failed write does not leak the file
handle; in that case the exception propagates and the in-memory state is left as it was.
Because the slot array is emptied, the buffer cannot accept new elements afterwards.
Returns `0`, the reset position.
"""
function Base.close(buffer::Buffer)
    try
        flush(buffer)
    finally
        # Always release the file handle, even if writing the pending data failed.
        close(buffer.io)
    end
    empty!(buffer.transactions)
    return buffer.position = 0
end
"""
    restore(file, T)

Read `file` from start to end, decoding consecutive values with `unpack(io, T)`, and
return them in order as a `Vector{Any}`. An empty file yields an empty vector.
"""
function restore(file, T)
    return open(file) do io
        records = Any[]
        while !eof(io)
            push!(records, unpack(io, T))
        end
        records
    end
end
end # module