Refactor sim service to include TDS integration (#24)
fivegrant authored May 4, 2023
1 parent c9b5bb8 commit 23ed271
Showing 17 changed files with 337 additions and 157 deletions.
20 changes: 19 additions & 1 deletion Manifest.toml
@@ -2,7 +2,7 @@

julia_version = "1.8.5"
manifest_format = "2.0"
project_hash = "196754b48acc19c4658624e737fd2884fea92eaf"
project_hash = "7f877b7a7cfda60e697e46744ae4a557a448a07c"

[[deps.AMQPClient]]
deps = ["Logging", "MbedTLS", "Sockets"]
@@ -1356,6 +1356,12 @@ version = "1.1.0"
[[deps.Mmap]]
uuid = "a63ad114-7e13-5084-954f-fe012c677804"

[[deps.Mocking]]
deps = ["Compat", "ExprTools"]
git-tree-sha1 = "782e258e80d68a73d8c916e55f8ced1de00c2cea"
uuid = "78c3b35d-d492-501b-9361-3d52fe80e533"
version = "0.7.6"

[[deps.ModelingToolkit]]
deps = ["AbstractTrees", "ArrayInterface", "Combinatorics", "Compat", "ConstructionBase", "DataStructures", "DiffEqBase", "DiffEqCallbacks", "DiffRules", "Distributed", "Distributions", "DocStringExtensions", "DomainSets", "ForwardDiff", "FunctionWrappersWrappers", "Graphs", "IfElse", "InteractiveUtils", "JuliaFormatter", "JumpProcesses", "LabelledArrays", "Latexify", "Libdl", "LinearAlgebra", "MacroTools", "NaNMath", "RecursiveArrayTools", "Reexport", "RuntimeGeneratedFunctions", "SciMLBase", "Serialization", "Setfield", "SimpleNonlinearSolve", "SparseArrays", "SpecialFunctions", "StaticArrays", "SymbolicIndexingInterface", "SymbolicUtils", "Symbolics", "UnPack", "Unitful"]
git-tree-sha1 = "de2daac4b0ca05c2cedfb4535dcee453e3f5fabd"
@@ -1474,6 +1480,12 @@ git-tree-sha1 = "887579a3eb005446d514ab7aeac5d1d027658b8f"
uuid = "e7412a2a-1a6e-54c0-be00-318e2571c051"
version = "1.3.5+1"

[[deps.OpenAPI]]
deps = ["Base64", "Dates", "Downloads", "HTTP", "JSON", "LibCURL", "MbedTLS", "TimeZones", "URIs"]
git-tree-sha1 = "d091c5b9feb951f0a674a9792cd5f948507eecaa"
uuid = "d5e62ea6-ddf3-4d43-8e4c-ad5e6c8bfd7d"
version = "0.1.9"

[[deps.OpenBLAS32_jll]]
deps = ["Artifacts", "CompilerSupportLibraries_jll", "JLLWrappers", "Libdl", "Pkg"]
git-tree-sha1 = "9c6c2ed4b7acd2137b878eb96c68e63b76199d0f"
@@ -2203,6 +2215,12 @@ git-tree-sha1 = "c97f60dd4f2331e1a495527f80d242501d2f9865"
uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5"
version = "0.5.1"

[[deps.TimeZones]]
deps = ["Dates", "Downloads", "InlineStrings", "LazyArtifacts", "Mocking", "Printf", "RecipesBase", "Scratch", "Unicode"]
git-tree-sha1 = "a92ec4466fc6e3dd704e2668b5e7f24add36d242"
uuid = "f269a46b-ccf7-5d73-abea-4c690281aa53"
version = "1.9.1"

[[deps.TimerOutputs]]
deps = ["ExprTools", "Printf"]
git-tree-sha1 = "f2fd3f288dfc6f507b0c3a2eb3bac009251e548b"
4 changes: 3 additions & 1 deletion Project.toml
@@ -1,4 +1,4 @@
name = "Scheduler"
name = "SimulationService"
uuid = "e66378d9-a322-4933-8764-0ce0bcab4993"
authors = ["Five Grant <[email protected]>"]
version = "0.2.0"
@@ -11,6 +11,7 @@ CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
Catlab = "134e5e36-593f-5add-ad60-77f754baafbe"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
DifferentialEquations = "0c46a032-eb83-5123-abaf-570d42b7fbaa"
Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
EasyModelAnalysis = "ef4b24a4-a090-4686-a932-e7e56a5a83bd"
ForwardDiff = "f6369f11-7733-5829-9624-2563aa707210"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
@@ -21,6 +22,7 @@ JobSchedulers = "eeff360b-c02d-44d3-ab26-4013c616a17e"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
ModelingToolkit = "961ee093-0014-501f-94e3-6117800e7a78"
NamedTupleTools = "d9ec5142-1e00-5aa0-9d6a-321866360f50"
OpenAPI = "d5e62ea6-ddf3-4d43-8e4c-ad5e6c8bfd7d"
OpenBLAS32_jll = "656ef2d0-ae68-5445-9ca0-591084a874a2"
OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
OrdinaryDiffEq = "1dea7af3-3e70-54e6-95c3-0bf5283fa5ed"
14 changes: 7 additions & 7 deletions docker/Dockerfile.api
@@ -1,16 +1,16 @@
FROM julia:1.8
WORKDIR /scheduler
WORKDIR /simulation-service

# Install dependencies
COPY Manifest.toml /scheduler/
COPY Project.toml /scheduler/
COPY Manifest.toml /simulation-service/
COPY Project.toml /simulation-service/
ENV JULIA_PROJECT=.
RUN julia -e 'using Pkg; Pkg.instantiate();'

# Install Scheduler source
COPY src/ /scheduler/src/
# Install simulation-service source
COPY src/ /simulation-service/src/
RUN julia -e 'using Pkg; Pkg.resolve();'

# Launch Scheduler
# Launch simulation-service
EXPOSE 8080
CMD [ "julia", "--threads", "4", "-e", "using Scheduler; Scheduler.run!();" ]
CMD [ "julia", "--threads", "4", "-e", "using SimulationService; SimulationService.run!();" ]
12 changes: 6 additions & 6 deletions docker/docker-bake.hcl
@@ -28,11 +28,11 @@ function "check_suffix" {
# ----------------------------------------------------------------------------------------------------------------------

group "prod" {
targets = ["simulation-scheduler"]
targets = ["simulation-service"]
}

group "default" {
targets = ["simulation-scheduler-base"]
targets = ["simulation-service-base"]
}

# ----------------------------------------------------------------------------------------------------------------------
@@ -42,12 +42,12 @@ target "_platforms" {
platforms = ["linux/amd64"]
}

target "simulation-scheduler-base" {
target "simulation-service-base" {
context = "."
tags = tag("simulation-scheduler", "", "")
tags = tag("simulation-service", "", "")
dockerfile = "docker/Dockerfile.api"
}

target "simulation-scheduler" {
inherits = ["_platforms", "simulation-scheduler-base"]
target "simulation-service" {
inherits = ["_platforms", "simulation-service-base"]
}
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
@@ -5,7 +5,7 @@ networks:
name: sim-execution
services:
api:
container_name: scheduler
container_name: simulation-service
build:
context: ../
dockerfile: docker/Dockerfile.api
24 changes: 0 additions & 24 deletions examples/request-calibrate-broken.json

This file was deleted.

18 changes: 11 additions & 7 deletions examples/request-calibrate.json

Large diffs are not rendered by default.

@@ -1,5 +1,5 @@
{
"petri": "{\"T\":[{\"tname\":\"exp\"},{\"tname\":\"conv\"},{\"tname\":\"rec\"},{\"tname\":\"death\"}],\"S\":[{\"sname\":\"S\"},{\"sname\":\"E\"},{\"sname\":\"I\"},{\"sname\":\"R\"},{\"sname\":\"D\"}],\"I\":[{\"it\":1,\"is\":1},{\"it\":1,\"is\":3},{\"it\":2,\"is\":2},{\"it\":3,\"is\":3},{\"it\":4,\"is\":3}],\"O\":[{\"ot\":1,\"os\":2},{\"ot\":1,\"os\":3},{\"ot\":2,\"os\":3},{\"ot\":3,\"os\":4},{\"ot\":4,\"os\":5}]}",
"model": "{\"T\":[{\"tname\":\"exp\"},{\"tname\":\"conv\"},{\"tname\":\"rec\"},{\"tname\":\"death\"}],\"S\":[{\"sname\":\"S\"},{\"sname\":\"E\"},{\"sname\":\"I\"},{\"sname\":\"R\"},{\"sname\":\"D\"}],\"I\":[{\"it\":1,\"is\":1},{\"it\":1,\"is\":3},{\"it\":2,\"is\":2},{\"it\":3,\"is\":3},{\"it\":4,\"is\":3}],\"O\":[{\"ot\":1,\"os\":2},{\"ot\":1,\"os\":3},{\"ot\":2,\"os\":3},{\"ot\":3,\"os\":4},{\"ot\":4,\"os\":5}]}",
"initials": {
"S": 0.49457800495224524,
"E": 0.26745259325403603,
63 changes: 63 additions & 0 deletions src/ArgIO.jl
@@ -0,0 +1,63 @@
"""
Provide external awareness / service-related side-effects to SciML operations
"""
module ArgIO

import Symbolics
import DataFrames: DataFrame, rename!
import CSV
import HTTP: Request
import Oxygen: serveparallel, serve, resetstate, json, setschema, @post, @get

include("./Settings.jl"); import .Settings: settings
include("./AssetManager.jl"); import .AssetManager: fetch_dataset, fetch_model, upload

export prepare_input, prepare_output


"""
Transform a request into the arguments used by an operation.
Optionally, IDs are hydrated with the corresponding entities from TDS.
"""
function prepare_input(req::Request)
args = json(req, Dict{Symbol,Any})
if settings["ENABLE_TDS"]
if in(:model, keys(args))
args[:model] = fetch_model(args[:model])
end
if in(:dataset, keys(args))
args[:dataset] = fetch_dataset(args[:dataset])
end
end
args
end

"""
Normalize the header of the resulting dataframe and return a CSV.
Optionally, the CSV is saved to TDS instead and the corresponding ID is returned.
"""
function prepare_output(dataframe::DataFrame)
stripped_names = names(dataframe) .=> (r -> replace(r, "(t)"=>"")).(names(dataframe))
rename!(dataframe, stripped_names)
rename!(dataframe, "timestamp" => "timestep")
if !settings["ENABLE_TDS"]
io = IOBuffer()
# TODO(five): Write to remote server
CSV.write(io, dataframe)
return String(take!(io))
else
return upload(dataframe)
end
end

"""
Coerces NaN values to nothing for each parameter.
"""
function prepare_output(params::Vector{Pair{Symbolics.Num, Float64}})
nan_to_nothing(value) = isnan(value) ? nothing : value
Dict(key => nan_to_nothing(value) for (key, value) in params)
end

end
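A rough usage sketch (not part of this commit): an Oxygen route threads a request through prepare_input and hands the resulting DataFrame to prepare_output. The "/simulate" path and the solve call below are illustrative placeholders.

# Hypothetical wiring of ArgIO into an Oxygen handler; assumes ArgIO.jl has been
# include()d in the enclosing module. "/simulate" and `solve` are placeholders.
import Oxygen: @post, serve
import .ArgIO: prepare_input, prepare_output

@post "/simulate" function(req)
    args = prepare_input(req)   # hydrates :model / :dataset IDs from TDS when ENABLE_TDS is set
    result = solve(args)        # stand-in for a SciML operation that returns a DataFrame
    prepare_output(result)      # CSV string, or the TDS dataset ID when ENABLE_TDS is set
end

serve(port=8080)
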
61 changes: 61 additions & 0 deletions src/AssetManager.jl
@@ -0,0 +1,61 @@
"""
Asset fetching from TDS
"""
module AssetManager

import DataFrames: DataFrame
import CSV, Downloads, HTTP
import OpenAPI.Clients: Client
import JSON3 as JSON

include("./Settings.jl"); import .Settings: settings

export fetch_dataset, fetch_model, upload

"""
Return model JSON as string from TDS by ID
"""
function fetch_model(model_id::Int64)
response = HTTP.get("$(settings["TDS_URL"])/models/$model_id", ["Content-Type" => "application/json"])
body = response.body |> String |> JSON.read
body.content
end

"""
Return csv from TDS by ID
"""
function fetch_dataset(dataset_id::Int64)
url = "$(settings["TDS_URL"])/datasets/$dataset_id/files"
io = IOBuffer()
Downloads.download(url, io)
seekstart(io)
CSV.read(io, DataFrame)
end

"""
Upload a CSV to TDS
"""
function upload(output::DataFrame)
# TODO(five): Stream so there isn't duplication
io = IOBuffer()
CSV.write(io, output)
seekstart(io)

payload = JSON.write(Dict(
"name" => "autogenerated sim run",
"description" => "autogenerated sim run",
"url" => "",
"simulation_run" => true
))
response = HTTP.post("$(settings["TDS_URL"])/datasets", ["Content-Type" => "application/json"], payload)
body = response.body |> String |> JSON.read


# TODO(five): Handle 4xx from TDS
url = "$(settings["TDS_URL"])/datasets/$(body["id"])/files"
HTTP.post(url, [], HTTP.Form(Dict("filename" => "generated.csv", "file" => HTTP.Multipart("file.csv", io, "text/plain"))))

body["id"]
end

end # module AssetManager
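A minimal usage sketch, assuming AssetManager.jl has been include()d, TDS_URL points at a reachable TDS instance, and that model 12 and dataset 7 exist (both IDs are illustrative):

import .AssetManager: fetch_model, fetch_dataset, upload

model_json = fetch_model(12)   # model content returned as a JSON string
df = fetch_dataset(7)          # the dataset's CSV files read into a DataFrame
new_id = upload(df)            # writes the DataFrame back to TDS and returns the new dataset ID
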
15 changes: 15 additions & 0 deletions src/Queuing.jl
@@ -1,3 +1,6 @@
"""
RabbitMQ Integration
"""
module Queuing

import Logging: AbstractLogger, LogLevel
@@ -7,6 +10,9 @@ import JSON3 as JSON

include("./Settings.jl"); import .Settings: settings

"""
Connect to channel
"""
function get_channel()
conn = connection(;
virtualhost="/",
@@ -18,6 +24,9 @@ function get_channel()
channel(conn, UNUSED_CHANNEL, true)
end

"""
Publish JSON to RabbitMQ
"""
function publish_to_rabbitmq(content)
chan = get_channel() # TODO(five): Don't recreate for each call
json = convert(Vector{UInt8}, codeunits(JSON.write(content)))
@@ -26,10 +35,16 @@ function publish_to_rabbitmq(content)
basic_publish(chan, message; exchange="", routing_key=settings["RABBITMQ_ROUTE"])
end

"""
Logger that calls an arbitrary hook on a message
"""
struct MQLogger <: AbstractLogger
publish_hook::Function
end

"""
MQLogger preloaded with RabbitMQ publishing
"""
MQLogger() = MQLogger(publish_to_rabbitmq)

Logging.shouldlog(::MQLogger, args...; kwargs...) = true
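A sketch of how MQLogger might be used to stream progress messages to RabbitMQ, assuming the RabbitMQ settings (host, credentials, RABBITMQ_ROUTE) are configured and that the remaining AbstractLogger methods are defined further down in this file (not shown here):

import Logging: with_logger

with_logger(MQLogger()) do
    @info "simulation started"    # each record is forwarded through publish_to_rabbitmq
    # ... run the long-running operation ...
    @info "simulation finished"
end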