From 3dfe01a86ee8cd4a80d646ca9c523654588d6daa Mon Sep 17 00:00:00 2001
From: nixo
Date: Fri, 28 Feb 2020 09:55:03 +0100
Subject: [PATCH] First try

---
 Manifest.toml           | 146 ++++++++++++++++++++++++++++++++++++++++
 Project.toml            |  11 +++
 src/InMemoryLoggedDB.jl | 103 ++++++++++++++++++++++++++++
 test/store.jl           |  86 +++++++++++++++++++++++
 4 files changed, 346 insertions(+)
 create mode 100644 Manifest.toml
 create mode 100644 Project.toml
 create mode 100644 src/InMemoryLoggedDB.jl
 create mode 100644 test/store.jl

diff --git a/Manifest.toml b/Manifest.toml
new file mode 100644
index 0000000..92c948d
--- /dev/null
+++ b/Manifest.toml
@@ -0,0 +1,146 @@
+# This file is machine-generated - editing it directly is not advised
+
+[[Base64]]
+uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
+
+[[BinaryProvider]]
+deps = ["Libdl", "SHA"]
+git-tree-sha1 = "5b08ed6036d9d3f0ee6369410b830f8873d4024c"
+uuid = "b99e7846-7c00-51b0-8f62-c81ae34c0232"
+version = "0.5.8"
+
+[[CodecZlib]]
+deps = ["BinaryProvider", "Libdl", "TranscodingStreams"]
+git-tree-sha1 = "05916673a2627dd91b4969ff8ba6941bc85a960e"
+uuid = "944b1d66-785c-5afd-91f1-9de20f533193"
+version = "0.6.0"
+
+[[DataStructures]]
+deps = ["InteractiveUtils", "OrderedCollections"]
+git-tree-sha1 = "5a431d46abf2ef2a4d5d00bd0ae61f651cf854c8"
+uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
+version = "0.17.10"
+
+[[Dates]]
+deps = ["Printf"]
+uuid = "ade2ca70-3891-5945-98fb-dc099432e06a"
+
+[[Distributed]]
+deps = ["Random", "Serialization", "Sockets"]
+uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"
+
+[[FileIO]]
+deps = ["Pkg"]
+git-tree-sha1 = "2c84c57aced468fa21763c66d3bef33adcd09ec7"
+uuid = "5789e2e9-d7fb-5bc7-8068-2c6fae9b9549"
+version = "1.2.2"
+
+[[InteractiveUtils]]
+deps = ["Markdown"]
+uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
+
+[[JLD2]]
+deps = ["CodecZlib", "DataStructures", "FileIO", "Mmap", "Pkg", "Printf", "UUIDs"]
+git-tree-sha1 = "d6cfa7c24e27d7eaa2290372739c8298257dae16"
+uuid = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
+version = "0.1.12"
+
+[[JSON2]]
+deps = ["Dates", "Parsers", "Test"]
+git-tree-sha1 = "6cbbbab27d9411946725f5d5c91e8b8fb5f7d5db"
+uuid = "2535ab7d-5cd8-5a07-80ac-9b1792aadce3"
+version = "0.3.1"
+
+[[JSON3]]
+deps = ["Dates", "Mmap", "Parsers", "StructTypes", "Test", "UUIDs"]
+git-tree-sha1 = "68cf28e37c70fd9a161dcaa13dc9e9209b415014"
+uuid = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
+version = "1.0.1"
+
+[[LibGit2]]
+uuid = "76f85450-5226-5b5a-8eaa-529ad045b433"
+
+[[Libdl]]
+uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
+
+[[Logging]]
+uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
+
+[[Markdown]]
+deps = ["Base64"]
+uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
+
+[[MemPool]]
+deps = ["DataStructures", "Distributed", "Mmap", "Random", "Serialization", "Sockets", "Test"]
+git-tree-sha1 = "d52799152697059353a8eac1000d32ba8d92aa25"
+uuid = "f9f48841-c794-520a-933b-121f7ba6ed94"
+version = "0.2.0"
+
+[[Mmap]]
+uuid = "a63ad114-7e13-5084-954f-fe012c677804"
+
+[[MsgPack]]
+deps = ["Serialization"]
+git-tree-sha1 = "a8cbf066b54d793b9a48c5daa5d586cf2b5bd43d"
+uuid = "99f44e22-a591-53d1-9472-aa23ef4bd671"
+version = "1.1.0"
+
+[[OrderedCollections]]
+deps = ["Random", "Serialization", "Test"]
+git-tree-sha1 = "c4c13474d23c60d20a67b217f1d7f22a40edf8f1"
+uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
+version = "1.1.0"
+
+[[Parsers]]
+deps = ["Dates", "Test"]
+git-tree-sha1 = "d112c19ccca00924d5d3a38b11ae2b4b268dda39"
+uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0"
+version = "0.3.11"
+
+[[Pkg]]
+deps = ["Dates", "LibGit2", "Libdl", "Logging", "Markdown", "Printf", "REPL", "Random", "SHA", "Test", "UUIDs"]
+uuid = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
+
+[[Printf]]
+deps = ["Unicode"]
+uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7"
+
+[[REPL]]
+deps = ["InteractiveUtils", "Markdown", "Sockets"]
+uuid = "3fa0cd96-eef1-5676-8a61-b3b8758bbffb"
+
+[[Random]]
+deps = ["Serialization"]
+uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
+
+[[SHA]]
+uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce"
+
+[[Serialization]]
+uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
+
+[[Sockets]]
+uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
+
+[[StructTypes]]
+deps = ["Dates", "UUIDs"]
+git-tree-sha1 = "1ed04f622a39d2e5a6747c3a70be040c00333933"
+uuid = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"
+version = "1.1.0"
+
+[[Test]]
+deps = ["Distributed", "InteractiveUtils", "Logging", "Random"]
+uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+
+[[TranscodingStreams]]
+deps = ["Random", "Test"]
+git-tree-sha1 = "7c53c35547de1c5b9d46a4797cf6d8253807108c"
+uuid = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
+version = "0.9.5"
+
+[[UUIDs]]
+deps = ["Random", "SHA"]
+uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
+
+[[Unicode]]
+uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5"
diff --git a/Project.toml b/Project.toml
new file mode 100644
index 0000000..f58a22a
--- /dev/null
+++ b/Project.toml
@@ -0,0 +1,11 @@
+name = "InMemoryLoggedDB"
+uuid = "0b408f8b-f764-4985-8a89-4195658babf6"
+authors = ["nixo "]
+version = "0.1.0"
+
+[deps]
+JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
+JSON2 = "2535ab7d-5cd8-5a07-80ac-9b1792aadce3"
+JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
+MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
+MsgPack = "99f44e22-a591-53d1-9472-aa23ef4bd671"
diff --git a/src/InMemoryLoggedDB.jl b/src/InMemoryLoggedDB.jl
new file mode 100644
index 0000000..78fae75
--- /dev/null
+++ b/src/InMemoryLoggedDB.jl
@@ -0,0 +1,103 @@
+module InMemoryLoggedDB
+
+export Buffer, flush!, restore
+
+using MsgPack
+
+"""
+    Buffer{T}
+
+Fixed-capacity in-memory log of `T` transactions, appended MsgPack-encoded to a
+file every time it fills up.
+"""
+mutable struct Buffer{T}
+    size::UInt32             # Save transactions when buffer is full
+    delay_ns::UInt64         # Intended max age of unsaved data (not enforced yet)
+    io::IOStream             # File handle where data is saved
+    transactions::Vector{T}  # Pending elements live in 1:position
+    position::UInt32         # Number of pending elements (index of the last one)
+    last_ns::UInt64          # time_ns() at creation or at the last flush!
+    state                    # Currently unused
+end
+
+"""
+    Buffer(size, filename, sttype, delay_ns = 10^9)
+
+Create a buffer holding up to `size` elements of type `sttype` and open
+`filename` in append mode. Throw an `ArgumentError` if `size` is not positive.
+"""
+function Buffer(size::Int, filename::String, sttype::DataType, delay_ns = 10^9)
+    size > 0 || throw(ArgumentError("size must be positive, got $size"))
+    log = Vector{sttype}(undef, size)
+    # Append mode: an existing log file is extended, never truncated
+    Buffer{sttype}(size, delay_ns, open(filename, "a"),
+                   log, 0, time_ns(), [])
+end
+
+"""
+    push!(buffer::Buffer, element)
+
+Store `element` in `buffer`, saving the whole buffer to file with `flush!` as
+soon as it is full. Return `nothing`.
+"""
+function Base.push!(buffer::Buffer, element)
+    buffer.transactions[buffer.position += 1] = element
+    @debug "Stored element" e=element pos=buffer.position
+    # TODO: also flush once time_ns() - buffer.last_ns exceeds buffer.delay_ns
+    buffer.position >= buffer.size && flush!(buffer)
+    nothing
+end
+
+"""
+    flush!(buffer::Buffer)
+
+Write every pending element to `buffer.io`, mark the buffer as empty and return
+the number of bytes written. The stream itself is not flushed: use
+`flush(buffer)` for that.
+"""
+function flush!(buffer::Buffer)
+    wrote = 0
+    @debug "Flush!: " pos = buffer.position
+    for p in 1:buffer.position
+        @debug "writing" p=p el=buffer.transactions[p]
+        # TODO: This conversion to an intermediate buffer must be removed
+        wrote += write(buffer.io, pack(buffer.transactions[p]))
+    end
+    buffer.last_ns = time_ns()
+    buffer.position = 0
+    wrote
+end
+
+# Save pending elements, then flush the underlying stream
+Base.flush(buffer::Buffer) = (flush!(buffer); flush(buffer.io))
+
+"""
+    close(buffer::Buffer)
+
+Save pending elements, close the file and drop the in-memory storage. The
+buffer must not be used afterwards.
+"""
+function Base.close(buffer::Buffer)
+    flush(buffer)
+    close(buffer.io)
+    empty!(buffer.transactions)
+    buffer.position = 0
+end
+
+"""
+    restore(file, T)
+
+Read back every element of type `T` stored in `file`, in the order they were
+written, and return them as a `Vector{T}`.
+"""
+function restore(file, T)
+    out = T[]
+    open(file) do io
+        while !eof(io)
+            push!(out, unpack(io, T))
+        end
+    end
+    out
+end
+
+end # module
diff --git a/test/store.jl b/test/store.jl
new file mode 100644
index 0000000..6145b30
--- /dev/null
+++ b/test/store.jl
@@ -0,0 +1,86 @@
+using MsgPack
+using InMemoryLoggedDB
+
+struct Timing
+    label::String
+    time::Float64
+end
+MsgPack.msgpack_type(::Type{Timing}) = MsgPack.StructType()
+
+const timings = Buffer(1_000_000, "timings.msgpack", Timing)
+
+struct X
+    el::Bool
+    N::Int
+    c::String
+end
+MsgPack.msgpack_type(::Type{X}) = MsgPack.StructType()
+
+mkpath("results")
+
+# Time 10000 pushes into a Buffer of capacity `t`; return (label, seconds)
+function microtest(t, ns)
+    label = "$(t)_$(ns)"
+    buf = Buffer(t, "results/$(label).msgpack", X, ns)
+    res = @timed begin
+        for i in 1:10000
+            push!(buf, X(1,i,string("c is: ", i)))
+        end
+        flush(buf)
+    end
+    close(buf)
+    (label, res[2])
+end
+
+function test(range)
+    for ns in range
+        for t in 1:1000
+            push!(timings,
+                  Timing(microtest(t, ns)...))
+        end
+    end
+end
+
+test(1:5)
+test(6:15:31)
+test(10_000_000:10_000_000)
+
+close(timings)
+
+# Compare with the dumbest Array implementation
+
+const dumbtimings = Buffer(1_000_000, "dumbtimings.msgpack", Timing)
+
+# Same workload as `microtest`, using a plain Vector written out in one go
+function dumb(t, ns)
+    label = "dumb_$(t)_$(ns)"
+    name = "results/$(label).msgpack"
+    res = open(name, "a+") do out
+        buf = X[]
+        @timed begin
+            for i in 1:10000
+                push!(buf, X(1,i,string("c is: ", i)))
+            end
+            write.(Ref(out), pack.(buf))
+            empty!(buf)
+        end
+    end
+    (label, res[2])
+end
+
+
+function test_dumb(range)
+    for ns in range
+        for t in 1:1000
+            push!(dumbtimings,
+                  Timing(dumb(t, ns)...))
+        end
+    end
+end
+
+test_dumb(1:5)
+test_dumb(6:15:31)
+test_dumb(10_000_000:10_000_000)
+
+# Without this the buffered timings (fewer than the buffer size) are never saved
+close(dumbtimings)