From 2cb0d1d8b0433d9279e34d6ca0dc7df09630df0e Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Tue, 21 Mar 2023 00:52:19 -0700 Subject: [PATCH 1/9] [WIP] Start Process (#75) - [x] Add endpoint for an app to start processes - [x] Add endpoint for an app to stop process - [x] Add endpoint for polling process health --- backend/internal/process/process.go | 12 ++- backend/internal/pubsub/pubsub.go | 4 + backend/internal/server/process_rpc.go | 108 +++++++++++++++++++++ backend/internal/server/pubsub_rpc.go | 1 - backend/internal/server/server.go | 4 + backend/internal/server/stream.go | 56 ++++++----- frontend/pages/index.tsx | 125 +++++++++---------------- toolkit/react/stream.tsx | 77 +++++++++++++++ toolkit/stream.ts | 41 ++++++-- 9 files changed, 314 insertions(+), 114 deletions(-) create mode 100644 backend/internal/server/process_rpc.go create mode 100644 toolkit/react/stream.tsx diff --git a/backend/internal/process/process.go b/backend/internal/process/process.go index 6c58ca90..67573334 100644 --- a/backend/internal/process/process.go +++ b/backend/internal/process/process.go @@ -2,6 +2,7 @@ package process import ( "context" + "encoding/json" "fmt" "net/url" "os" @@ -343,9 +344,18 @@ func (m *ProcessManager) pipeTailIntoTopic(process *Process, topic *pubsub.Topic logger.Err("got error in tail line", log.Ctx{ "err": line.Err.Error(), }) + continue } - topic.Publish(line.Text) + bytes, err := json.Marshal(map[string]any{"line": line.Text}) + if err != nil { + logger.Err("got error in JSON encoding", log.Ctx{ + "err": line.Err.Error(), + }) + continue + } + + topic.Publish(string(bytes)) } } } diff --git a/backend/internal/pubsub/pubsub.go b/backend/internal/pubsub/pubsub.go index 8a758413..9ffcc454 100644 --- a/backend/internal/pubsub/pubsub.go +++ b/backend/internal/pubsub/pubsub.go @@ -24,6 +24,10 @@ if narrowedTopic, castSucceeded := topic.(Topic[T]); castSucceeded { subscribe to the topic using the narrowedTopic variable... } ``` + +It may also be useful or even necessary to include a "state" field for each topic, so for example, +a subscription can get the list of log statements that happened before it existed. ~Something something monad.~ +I don't quite want to implement all that hoopla right this second, but it's something to be aware of. */ import ( diff --git a/backend/internal/server/process_rpc.go b/backend/internal/server/process_rpc.go new file mode 100644 index 00000000..226582cd --- /dev/null +++ b/backend/internal/server/process_rpc.go @@ -0,0 +1,108 @@ +package server + +import ( + "net/http" + + "robinplatform.dev/internal/process" + "robinplatform.dev/internal/pubsub" +) + +type StartProcessForAppInput struct { + AppId string `json:"appId"` + ProcessKey string `json:"processKey"` + Command string `json:"command"` + Args []string `json:"args"` +} + +var StartProcessForApp = AppsRpcMethod[StartProcessForAppInput, map[string]any]{ + Name: "StartProcess", + Run: func(req RpcRequest[StartProcessForAppInput]) (map[string]any, *HttpError) { + id := process.ProjectAppId(req.Data.AppId, req.Data.ProcessKey) + + processConfig := process.ProcessConfig{ + Command: req.Data.Command, + Args: req.Data.Args, + Id: id, + } + + proc, err := process.Manager.Spawn(processConfig) + if err != nil { + return nil, Errorf(http.StatusInternalServerError, "Failed to spawn new process %s: %s", req.Data.AppId, err) + } + + return map[string]any{ + "processKey": proc.Id, + "pid": proc.Pid, + }, nil + }, +} + +type StopProcessForAppInput struct { + AppId string `json:"appId"` + ProcessKey string `json:"processKey"` +} + +var StopProcessForApp = AppsRpcMethod[StartProcessForAppInput, map[string]any]{ + Name: "StopProcess", + Run: func(req RpcRequest[StartProcessForAppInput]) (map[string]any, *HttpError) { + id := process.ProjectAppId(req.Data.AppId, req.Data.ProcessKey) + + if err := process.Manager.Remove(id); err != nil { + return nil, Errorf(http.StatusInternalServerError, "Failed to kill process %s: %s", req.Data.AppId, err) + } + + return map[string]any{}, nil + }, +} + +type CheckProcessHealthInput struct { + AppId string `json:"appId"` + ProcessKey string `json:"processKey"` +} + +var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ + Name: "CheckProcessHealth", + Run: func(req RpcRequest[CheckProcessHealthInput]) (map[string]any, *HttpError) { + id := process.ProjectAppId(req.Data.AppId, req.Data.ProcessKey) + + isAlive := process.Manager.IsAlive(id) + + return map[string]any{ + "processKey": id, + "isAlive": isAlive, + }, nil + }, +} + +var ReadAppProcessLogs = Stream[CheckProcessHealthInput, string]{ + Name: "ReadAppProcessLogs", + Run: func(req StreamRequest[CheckProcessHealthInput, string]) error { + input, err := req.ParseInput() + if err != nil { + return err + } + + id := pubsub.AppProcessLogs(input.AppId, input.ProcessKey) + + subscription := make(chan string) + if err := pubsub.Topics.Subscribe(id, subscription); err != nil { + return err + } + defer pubsub.Topics.Unsubscribe(id, subscription) + + for { + select { + case s, ok := <-subscription: + if !ok { + // Channel is closed + return nil + } + + req.Send(s) + + case <-req.Context.Done(): + return nil + } + } + }, +} diff --git a/backend/internal/server/pubsub_rpc.go b/backend/internal/server/pubsub_rpc.go index 6bc7d305..8687aa3c 100644 --- a/backend/internal/server/pubsub_rpc.go +++ b/backend/internal/server/pubsub_rpc.go @@ -46,7 +46,6 @@ var SubscribeTopic = Stream[SubscribeTopicInput, string]{ case <-req.Context.Done(): return nil } - } }, } diff --git a/backend/internal/server/server.go b/backend/internal/server/server.go index d5fae72e..e661b855 100644 --- a/backend/internal/server/server.go +++ b/backend/internal/server/server.go @@ -74,6 +74,10 @@ func (server *Server) loadRpcMethods() { UpdateAppSettings.Register(server) GetTopics.Register(server) + StartProcessForApp.Register(server) + StopProcessForApp.Register(server) + CheckProcessHealth.Register(server) + // Streaming methods wsHandler := &RpcWebsocket{} diff --git a/backend/internal/server/stream.go b/backend/internal/server/stream.go index ec69d51b..b9621ec2 100644 --- a/backend/internal/server/stream.go +++ b/backend/internal/server/stream.go @@ -39,6 +39,28 @@ type streamRequest struct { cancel func() } +func (req *streamRequest) SendRaw(kind string, data any) { + req.output <- socketMessageOut{ + Method: req.Method, + Id: req.Id, + Kind: kind, + Data: data, + } +} + +func (s *StreamRequest[_, _]) SendRaw(kind string, data any) { + (*streamRequest)(s).SendRaw(kind, data) +} + +// The idea behind using `ParseInput` instead of something with generics is twofold: +// 1. It reduces the amount of generic code necessary to implement certain things in +// this file. - Specifically, some of the hooks for handling code are now very very short, +// and don't need to be duplicated for each generic instantiation +// 2. It allows more flexibility - This is a bit weak, but it does technically allow the user +// to make some custom parsing error handling, or to parse the input in a custom way. +// +// Using ParseInput also comes with downsides, mostly in usability. It's unclear if the tradeoff is +// worth it yet, but we will see. func (s *StreamRequest[Input, _]) ParseInput() (Input, error) { var input Input err := json.Unmarshal(s.RawInput, &input) @@ -53,13 +75,16 @@ func (s *StreamRequest[Input, _]) ParseInput() (Input, error) { return input, err } +// This code uses `Send` instead of a channel to try to reduce the number +// of channels/goroutines that need to run at any one time. Otherwise there'd +// need to at least be one goroutine per-stream, and often pubsub uses channels +// with *very slight* caveats, which require an additional goroutine/thread intercepting +// the subscription channel and piping into the stream's channel. +// +// With a send function, we at least eliminate some complexity in the implementation, and also allow +// the user to decide themselves what parallelism paradigms they would like to use. func (s *StreamRequest[_, Output]) Send(o Output) { - s.output <- socketMessageOut{ - Method: s.Method, - Id: s.Id, - Kind: "methodOutput", - Data: o, - } + s.SendRaw("methodOutput", o) } type socketMessageIn struct { @@ -234,28 +259,15 @@ func runMethod(method handler, rawReq streamRequest) { "id": rawReq.Id, }) - rawReq.output <- socketMessageOut{ - Id: rawReq.Id, - Method: rawReq.Method, - Kind: "methodStarted", - } + rawReq.SendRaw("methodStarted", nil) if err := method(rawReq); err != nil { - rawReq.output <- socketMessageOut{ - Id: rawReq.Id, - Method: rawReq.Method, - Kind: "error", - Err: err.Error(), - } + rawReq.SendRaw("error", err.Error()) return } - rawReq.output <- socketMessageOut{ - Id: rawReq.Id, - Method: rawReq.Method, - Kind: "methodDone", - } + rawReq.SendRaw("methodDone", nil) } func (method *Stream[Input, Output]) handler(rawReq streamRequest) error { diff --git a/frontend/pages/index.tsx b/frontend/pages/index.tsx index d53c0804..bba4cd3e 100644 --- a/frontend/pages/index.tsx +++ b/frontend/pages/index.tsx @@ -4,6 +4,7 @@ import { z } from 'zod'; import { useRpcQuery } from '../hooks/useRpcQuery'; import toast from 'react-hot-toast'; import { Stream } from '@robinplatform/toolkit/stream'; +import { useStreamMethod } from '@robinplatform/toolkit/react/stream'; // This is a temporary bit of code to just display what's in the processes DB // to make writing other features easier @@ -103,104 +104,66 @@ function Topics() { const [selectedTopic, setSelectedTopic] = React.useState< TopicInfo & { key: string } >(); - const [topicMessages, setTopicMessages] = React.useState< - Record - >({}); - const [topics, setTopics] = React.useState>({}); - const { error } = useRpcQuery({ + const { data: initialTopics, error } = useRpcQuery({ method: 'GetTopics', data: {}, result: z.record(z.string(), TopicInfo), pathPrefix: '/api/apps/rpc', - onSuccess: setTopics, }); - React.useEffect(() => { - const id = `${Math.random()} topic-info`; - const stream = new Stream('SubscribeTopic', id); - stream.onmessage = (message) => { - const { kind, data } = message; - if (kind !== 'methodOutput') { - console.log('message from topic subscription', message); - return; - } - - const res = MetaTopicInfo.safeParse(JSON.parse(data)); - if (!res.success) { - console.log( - 'parse failure from from topic subscription', - message, - res.error, - ); - return; - } - - const packet = res.data; + const { state: topics } = useStreamMethod({ + methodName: 'SubscribeTopic', + resultType: MetaTopicInfo, + skip: !initialTopics, + data: { + id: { + category: '@robin/topics', + name: 'meta', + }, + }, + initialState: initialTopics ?? {}, + reducer: (prev, packet) => { switch (packet.kind) { case 'update': - setTopics((prev) => ({ + return { ...prev, [`${packet.data.id.category}/${packet.data.id.name}`]: packet.data, - })); - break; + }; case 'close': - setTopics((prev) => { - const a = { ...prev }; - // rome-ignore lint/performance/noDelete: I'm deleting a key from a record... also the docs say this rule shouldn't even apply here. - delete a[`${packet.data.category}/${packet.data.name}`]; - return a; - }); - break; + const a: Record = { ...prev }; + // rome-ignore lint/performance/noDelete: I'm deleting a key from a record... also the docs say this rule shouldn't even apply here. + delete a[`${packet.data.category}/${packet.data.name}`]; + return a; } - }; - stream.onerror = (err) => { - console.log('error', err); - }; - - stream.start({ - id: { - category: '@robin/topics', - name: 'meta', - }, - }); - - return () => { - stream.close(); - }; - }, []); - - React.useEffect(() => { - if (selectedTopic === undefined) { - return; - } + }, + }); - const id = `${Math.random()} topic-contents`; - const stream = new Stream('SubscribeTopic', id); - stream.onmessage = (message) => { - const { kind, data } = message; - if (kind !== 'methodOutput') { - console.log('message from topic subscription', message); - return; + const { state: topicMessages } = useStreamMethod< + Record, + any + >({ + methodName: 'SubscribeTopic', + resultType: z.any(), + skip: !selectedTopic?.id, + data: { + id: selectedTopic?.id, + }, + initialState: {}, + reducer: (prev, packet) => { + if (!selectedTopic) { + return prev; } - setTopicMessages((prev) => ({ + return { ...prev, - [selectedTopic.key]: [...(prev[selectedTopic.key] ?? []), String(data)], - })); - }; - stream.onerror = (err) => { - console.log('error', err); - }; - - stream.start({ - id: selectedTopic.id, - }); - - return () => { - stream.close(); - }; - }, [selectedTopic]); + [selectedTopic.key]: [ + ...(prev[selectedTopic.key] ?? []), + JSON.stringify(packet), + ], + }; + }, + }); return (
({ + methodName, + data: initialData, + initialState, + reducer, + resultType, + skip, +}: { + resultType: z.Schema; + methodName: string; + data: object; + initialState: State; + reducer: (s: State, o: Output) => State; + skip?: boolean; +}) { + // enforce reducer stability + const reducerRef = React.useRef(reducer); + reducerRef.current = reducer; + + const cb = React.useCallback( + (s: State, o: Output) => reducerRef.current(s, o), + [], + ); + + const [state, dispatch] = React.useReducer(cb, initialState); + + const id = React.useMemo( + () => `${methodName}-${Math.random()}`, + [methodName, JSON.stringify(initialData), skip], + ); + + const stream = React.useMemo( + () => new Stream(methodName, id), + [methodName, id], + ); + + // TODO: use stable stringify for the dep check + React.useEffect(() => { + if (skip) { + return; + } + + stream.onmessage = (message) => { + const { kind, data } = message as { kind: string; data: string }; + if (kind !== 'methodOutput') { + return; + } + + const res = resultType.safeParse(JSON.parse(data)); + if (!res.success) { + // TODO: handle the error + console.log('Robin stream parse error', res.error); + return; + } + + dispatch(res.data); + }; + + stream.start(initialData); + + return () => { + stream.close(); + }; + }, [skip, stream, JSON.stringify(initialData)]); + + return { state, dispatch }; +} diff --git a/toolkit/stream.ts b/toolkit/stream.ts index 98555d97..ca7a44cc 100644 --- a/toolkit/stream.ts +++ b/toolkit/stream.ts @@ -20,35 +20,58 @@ async function getWs(): Promise { return; } - if (data.kind === 'error') { - stream.onerror(data.error); - } else { - stream.onmessage(data); + switch (data.kind) { + case 'error': + stream.onerror(data.error); + break; + + case 'methodDone': + stream.onclose(); + break; + + default: + stream.onmessage(data); } }; ws.onerror = (evt) => { - console.log('error', evt); + console.error('Robin WS error', evt); }; ws.onopen = () => { - console.log('opened'); resolve(ws); }; return _ws; } +// This is a low-level primitive that can be used to implement higher-level +// streaming requests. export class Stream { - onmessage: (a: any) => void = () => {}; - onerror: (a: any) => void = () => {}; + onmessage: (a: unknown) => void = () => {}; + onerror: (a: unknown) => void = () => {}; private started = false; private closed = false; + private closeHandler: () => void = () => { + this.closed = true; + }; + constructor(readonly method: string, readonly id: string) {} - async start(data: any) { + set onclose(f: () => void) { + this.closeHandler = () => { + f(); + this.closed = true; + }; + } + + get onclose(): () => void { + return this.closeHandler; + } + + async start(data: unknown) { if (this.started) { throw new Error('Already started'); } From 1be863476dcc034bcc340a74369b619d154cbb5b Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Tue, 21 Mar 2023 14:44:49 -0700 Subject: [PATCH 2/9] meh --- backend/internal/server/process_rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/internal/server/process_rpc.go b/backend/internal/server/process_rpc.go index 226582cd..9d6a3b16 100644 --- a/backend/internal/server/process_rpc.go +++ b/backend/internal/server/process_rpc.go @@ -76,7 +76,7 @@ var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ var ReadAppProcessLogs = Stream[CheckProcessHealthInput, string]{ Name: "ReadAppProcessLogs", - Run: func(req StreamRequest[CheckProcessHealthInput, string]) error { + Run: func(req *StreamRequest[CheckProcessHealthInput, string]) error { input, err := req.ParseInput() if err != nil { return err From 499adeee12fd0a5bb3ccf1045fc7bcec028703a2 Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Fri, 14 Apr 2023 15:17:32 -0700 Subject: [PATCH 3/9] mhe --- backend/internal/identity/id.go | 4 +++ backend/internal/server/process_rpc.go | 49 +++++++------------------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/backend/internal/identity/id.go b/backend/internal/identity/id.go index a964a264..cb8d2022 100644 --- a/backend/internal/identity/id.go +++ b/backend/internal/identity/id.go @@ -35,6 +35,10 @@ func (id Id) String() string { // Cleans inputs and then creates a category from them. If you have a valid category already, // ust path.Join to combine it with another category. +// +// Example: +// identity.Category("app", "appId") -> "/app/appId" +// identity.Category("my", "app", "badString/../") -> "/my/app/badString%2F%2E%2E%2F func Category(ids ...string) string { parts := make([]string, 0, len(ids)) for _, id := range ids { diff --git a/backend/internal/server/process_rpc.go b/backend/internal/server/process_rpc.go index 58fec38a..11541155 100644 --- a/backend/internal/server/process_rpc.go +++ b/backend/internal/server/process_rpc.go @@ -3,6 +3,7 @@ package server import ( "net/http" + "robinplatform.dev/internal/identity" "robinplatform.dev/internal/process" "robinplatform.dev/internal/pubsub" ) @@ -17,7 +18,10 @@ type StartProcessForAppInput struct { var StartProcessForApp = AppsRpcMethod[StartProcessForAppInput, map[string]any]{ Name: "StartProcess", Run: func(req RpcRequest[StartProcessForAppInput]) (map[string]any, *HttpError) { - id := process.ProjectAppId(req.Data.AppId, req.Data.ProcessKey) + id := process.ProcessId{ + Category: identity.Category("app", req.Data.AppId), + Key: req.Data.ProcessKey, + } processConfig := process.ProcessConfig{ Command: req.Data.Command, @@ -45,7 +49,10 @@ type StopProcessForAppInput struct { var StopProcessForApp = AppsRpcMethod[StartProcessForAppInput, map[string]any]{ Name: "StopProcess", Run: func(req RpcRequest[StartProcessForAppInput]) (map[string]any, *HttpError) { - id := process.ProjectAppId(req.Data.AppId, req.Data.ProcessKey) + id := process.ProcessId{ + Category: identity.Category("app", req.Data.AppId), + Key: req.Data.ProcessKey, + } if err := process.Manager.Remove(id); err != nil { return nil, Errorf(http.StatusInternalServerError, "Failed to kill process %s: %s", req.Data.AppId, err) @@ -63,7 +70,10 @@ type CheckProcessHealthInput struct { var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ Name: "CheckProcessHealth", Run: func(req RpcRequest[CheckProcessHealthInput]) (map[string]any, *HttpError) { - id := process.ProjectAppId(req.Data.AppId, req.Data.ProcessKey) + id := process.ProcessId{ + Category: identity.Category("app", req.Data.AppId), + Key: req.Data.ProcessKey, + } isAlive := process.Manager.IsAlive(id) @@ -74,39 +84,6 @@ var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ }, } -var ReadAppProcessLogs = Stream[CheckProcessHealthInput, string]{ - Name: "ReadAppProcessLogs", - Run: func(req *StreamRequest[CheckProcessHealthInput, string]) error { - input, err := req.ParseInput() - if err != nil { - return err - } - - id := pubsub.AppProcessLogs(input.AppId, input.ProcessKey) - - subscription := make(chan string) - if err := pubsub.Topics.Subscribe(id, subscription); err != nil { - return err - } - defer pubsub.Topics.Unsubscribe(id, subscription) - - for { - select { - case s, ok := <-subscription: - if !ok { - // Channel is closed - return nil - } - - req.Send(s) - - case <-req.Context.Done(): - return nil - } - } - }, -} - func PipeTopic[T any](topicId pubsub.TopicId, req *StreamRequest[T, any]) error { sub, err := pubsub.SubscribeAny(&pubsub.Topics, topicId) if err != nil { From 96bcd888aa5f3b43b4df7cada3281052c228fcb6 Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Fri, 14 Apr 2023 19:02:07 -0700 Subject: [PATCH 4/9] meh --- backend/internal/process/handle.go | 7 +++++++ backend/internal/process/process.go | 8 ++++++++ backend/internal/server/process_rpc.go | 4 ++-- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/backend/internal/process/handle.go b/backend/internal/process/handle.go index f0991abb..3dc277bb 100644 --- a/backend/internal/process/handle.go +++ b/backend/internal/process/handle.go @@ -49,6 +49,13 @@ func (m *ProcessManager) IsAlive(id ProcessId) bool { return r.IsAlive(id) } +func (m *ProcessManager) IsHealthy(id ProcessId) bool { + r := m.ReadHandle() + defer r.Close() + + return r.IsHealthy(id) +} + func (m *ProcessManager) CopyOutData() []Process { r := m.ReadHandle() defer r.Close() diff --git a/backend/internal/process/process.go b/backend/internal/process/process.go index a5db6817..d3fe1a1d 100644 --- a/backend/internal/process/process.go +++ b/backend/internal/process/process.go @@ -264,6 +264,14 @@ func (r *RHandle) IsAlive(id ProcessId) bool { return process.IsAlive() } +func (r *RHandle) IsHealthy(id ProcessId) bool { + process, found := r.FindById(id) + if !found { + return false + } + return process.IsHealthy() +} + func (proc *Process) IsHealthy() bool { return proc.IsAlive() && proc.HealthCheck.Check(health.RunningProcessInfo{Pid: proc.Pid, Port: proc.Port}) } diff --git a/backend/internal/server/process_rpc.go b/backend/internal/server/process_rpc.go index 11541155..6b5dd258 100644 --- a/backend/internal/server/process_rpc.go +++ b/backend/internal/server/process_rpc.go @@ -75,11 +75,11 @@ var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ Key: req.Data.ProcessKey, } - isAlive := process.Manager.IsAlive(id) + isHealthy := process.Manager.IsHealthy(id) return map[string]any{ "processKey": id, - "isAlive": isAlive, + "isHealthy": isHealthy, }, nil }, } From 08fc2418dcbec6b21da6af17f26d7fc81cb41ec2 Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Wed, 19 Apr 2023 00:48:44 -0700 Subject: [PATCH 5/9] meh --- toolkit/react/stream.tsx | 44 +++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/toolkit/react/stream.tsx b/toolkit/react/stream.tsx index 1e55e70f..66b82df1 100644 --- a/toolkit/react/stream.tsx +++ b/toolkit/react/stream.tsx @@ -12,10 +12,40 @@ const PubsubData = z.object({ data: z.unknown(), }); +type ProcessInfo = z.infer; +const ProcessInfo = z.object({}); + +// Subscribe to app process information +export function useAppProcessInfo({ + category, + key, + reducer, + skip, +}: { + category?: string[]; + key?: string; + reducer: (s: State, o: ProcessInfo) => State; + skip?: boolean; +}) { + const appId = process.env.ROBIN_APP_ID; + + return useIndexedStream({ + methodName: 'SubscribeAppProcessInfo', + data: { + appId, + category, + key, + }, + skip: skip || !appId || !category || !key, + resultType: ProcessInfo, + reducer, + fetchState: () => undefined as any, + }); +} + // Subscribe to an app topic and track the messages received in relation // to state. export function useAppTopicQuery({ - appId = process.env.ROBIN_APP_ID, category, key, fetchState, @@ -23,7 +53,6 @@ export function useAppTopicQuery({ resultType, skip, }: { - appId?: string; resultType: z.Schema; category?: string[]; key?: string; @@ -31,7 +60,8 @@ export function useAppTopicQuery({ reducer: (s: State, o: Output) => State; skip?: boolean; }) { - return useTopicQueryInternal({ + const appId = process.env.ROBIN_APP_ID; + return useIndexedStream({ methodName: 'SubscribeAppTopic', data: { appId, @@ -60,7 +90,7 @@ export function useTopicQuery({ reducer: (s: State, o: Output) => State; skip?: boolean; }) { - return useTopicQueryInternal({ + return useIndexedStream({ methodName: 'SubscribeTopic', data: { id: topicId }, skip: skip || !topicId, @@ -70,9 +100,9 @@ export function useTopicQuery({ }); } -// Subscribe to a topic and track the messages received in relation -// to state. -function useTopicQueryInternal({ +// Read data from a stream and track the stream data in lock-step with +// state fetched from an external source. +export function useIndexedStream({ methodName, data, fetchState, From a1ba308b5af69c6bd82be77685a4fa4a2674edef Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Wed, 19 Apr 2023 00:49:30 -0700 Subject: [PATCH 6/9] rename useTopicQuery --- frontend/pages/index.tsx | 11 ++++------- toolkit/react/stream.tsx | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/frontend/pages/index.tsx b/frontend/pages/index.tsx index c8c2e440..2a5b6578 100644 --- a/frontend/pages/index.tsx +++ b/frontend/pages/index.tsx @@ -3,7 +3,7 @@ import React from 'react'; import { z } from 'zod'; import { runRpcQuery, useRpcQuery } from '../hooks/useRpcQuery'; import toast from 'react-hot-toast'; -import { useTopicQuery } from '../../toolkit/react/stream'; +import { useTopic } from '../../toolkit/react/stream'; import { ScrollWindow } from '../components/ScrollWindow'; type Process = z.infer; @@ -26,7 +26,7 @@ function Processes() { }); const [currentProcess, setCurrentProcess] = React.useState(); - const { state } = useTopicQuery({ + const { state } = useTopic({ topicId: currentProcess && { category: `/logs${currentProcess.id.category}`, key: currentProcess.id.key, @@ -131,7 +131,7 @@ function Topics() { TopicInfo & { key: string } >(); - const { state: topics } = useTopicQuery({ + const { state: topics } = useTopic({ resultType: MetaTopicInfo, topicId: { category: '/topics', @@ -166,10 +166,7 @@ function Topics() { }, }); - const { state: topicMessages } = useTopicQuery< - Record, - unknown - >({ + const { state: topicMessages } = useTopic, unknown>({ resultType: z.unknown(), skip: !selectedTopic?.id, topicId: selectedTopic?.id, diff --git a/toolkit/react/stream.tsx b/toolkit/react/stream.tsx index 66b82df1..16abdb2a 100644 --- a/toolkit/react/stream.tsx +++ b/toolkit/react/stream.tsx @@ -77,7 +77,7 @@ export function useAppTopicQuery({ // Subscribe to a topic and track the messages received in relation // to state. -export function useTopicQuery({ +export function useTopic({ topicId, fetchState, reducer, From 50795002d9fa9c934b8e446eefceab3842e90be2 Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Wed, 19 Apr 2023 01:12:02 -0700 Subject: [PATCH 7/9] refactor --- frontend/components/ProcessDebugger.tsx | 104 ++++++++++++++++++++++++ frontend/pages/index.tsx | 101 +---------------------- toolkit/react/stream.tsx | 31 ------- 3 files changed, 107 insertions(+), 129 deletions(-) create mode 100644 frontend/components/ProcessDebugger.tsx diff --git a/frontend/components/ProcessDebugger.tsx b/frontend/components/ProcessDebugger.tsx new file mode 100644 index 00000000..0ae30c9d --- /dev/null +++ b/frontend/components/ProcessDebugger.tsx @@ -0,0 +1,104 @@ +import { z } from 'zod'; +import React from 'react'; +import { runRpcQuery, useRpcQuery } from '../hooks/useRpcQuery'; +import { useTopic } from '../../toolkit/react/stream'; +import { ScrollWindow } from './ScrollWindow'; +import toast from 'react-hot-toast'; + +type ProcessInfo = z.infer; +const ProcessInfo = z.object({}); + +type Process = z.infer; +const Process = z.object({ + id: z.object({ + category: z.string(), + key: z.string(), + }), + command: z.string(), + args: z.array(z.string()), +}); + +// This is a temporary bit of code to just display what's in the processes DB +// to make writing other features easier +export function ProcessDebugger() { + const { data: processes = [], error } = useRpcQuery({ + method: 'ListProcesses', + data: {}, + result: z.array(Process), + }); + + const [currentProcess, setCurrentProcess] = React.useState(); + const { state } = useTopic({ + topicId: currentProcess && { + category: `/logs${currentProcess.id.category}`, + key: currentProcess.id.key, + }, + resultType: z.string(), + fetchState: () => + runRpcQuery({ + method: 'GetProcessLogs', + data: { processId: currentProcess?.id }, + result: z.object({ + counter: z.number(), + text: z.string(), + }), + }).then(({ counter, text }) => ({ counter, state: text })), + reducer: (prev, message) => { + return prev + '\n' + message; + }, + }); + + React.useEffect(() => { + if (error) { + toast.error(`${String(error)}`); + } + }, [error]); + + return ( +
+
Processes
+ + + {processes?.map((value) => { + const key = `${value.id.category} ${value.id.key}`; + return ( +
+ {key} + + + +
+								{JSON.stringify(value, null, 2)}
+							
+
+ ); + })} +
+ + +
+					{state}
+				
+
+
+ ); +} diff --git a/frontend/pages/index.tsx b/frontend/pages/index.tsx index 2a5b6578..8cd5cb72 100644 --- a/frontend/pages/index.tsx +++ b/frontend/pages/index.tsx @@ -1,105 +1,10 @@ import Head from 'next/head'; import React from 'react'; import { z } from 'zod'; -import { runRpcQuery, useRpcQuery } from '../hooks/useRpcQuery'; -import toast from 'react-hot-toast'; +import { runRpcQuery } from '../hooks/useRpcQuery'; import { useTopic } from '../../toolkit/react/stream'; import { ScrollWindow } from '../components/ScrollWindow'; - -type Process = z.infer; -const Process = z.object({ - id: z.object({ - category: z.string(), - key: z.string(), - }), - command: z.string(), - args: z.array(z.string()), -}); - -// This is a temporary bit of code to just display what's in the processes DB -// to make writing other features easier -function Processes() { - const { data: processes = [], error } = useRpcQuery({ - method: 'ListProcesses', - data: {}, - result: z.array(Process), - }); - - const [currentProcess, setCurrentProcess] = React.useState(); - const { state } = useTopic({ - topicId: currentProcess && { - category: `/logs${currentProcess.id.category}`, - key: currentProcess.id.key, - }, - resultType: z.string(), - fetchState: () => - runRpcQuery({ - method: 'GetProcessLogs', - data: { processId: currentProcess?.id }, - result: z.object({ - counter: z.number(), - text: z.string(), - }), - }).then(({ counter, text }) => ({ counter, state: text })), - reducer: (prev, message) => { - return prev + '\n' + message; - }, - }); - - React.useEffect(() => { - if (error) { - toast.error(`${String(error)}`); - } - }, [error]); - - return ( -
-
Processes
- - - {processes?.map((value) => { - const key = `${value.id.category} ${value.id.key}`; - return ( -
- {key} - - - -
-								{JSON.stringify(value, null, 2)}
-							
-
- ); - })} -
- - -
-					{state}
-				
-
-
- ); -} +import { ProcessDebugger } from '../components/ProcessDebugger'; type TopicId = z.infer; const TopicId = z.object({ @@ -264,7 +169,7 @@ export default function Home() {
- +
diff --git a/toolkit/react/stream.tsx b/toolkit/react/stream.tsx index 16abdb2a..f51eb0ae 100644 --- a/toolkit/react/stream.tsx +++ b/toolkit/react/stream.tsx @@ -12,37 +12,6 @@ const PubsubData = z.object({ data: z.unknown(), }); -type ProcessInfo = z.infer; -const ProcessInfo = z.object({}); - -// Subscribe to app process information -export function useAppProcessInfo({ - category, - key, - reducer, - skip, -}: { - category?: string[]; - key?: string; - reducer: (s: State, o: ProcessInfo) => State; - skip?: boolean; -}) { - const appId = process.env.ROBIN_APP_ID; - - return useIndexedStream({ - methodName: 'SubscribeAppProcessInfo', - data: { - appId, - category, - key, - }, - skip: skip || !appId || !category || !key, - resultType: ProcessInfo, - reducer, - fetchState: () => undefined as any, - }); -} - // Subscribe to an app topic and track the messages received in relation // to state. export function useAppTopicQuery({ From 554a90da51d45e4f205fcded8c01b58fe7147f62 Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Wed, 19 Apr 2023 01:15:49 -0700 Subject: [PATCH 8/9] meh --- backend/internal/compilerServer/daemon.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/backend/internal/compilerServer/daemon.go b/backend/internal/compilerServer/daemon.go index 5696437a..bfdd661e 100644 --- a/backend/internal/compilerServer/daemon.go +++ b/backend/internal/compilerServer/daemon.go @@ -30,16 +30,7 @@ func (app *CompiledApp) IsAlive() bool { return false } - if !process.IsAlive() { - return false - } - - // Send a ping to the process - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/api/health", process.Port)) - if resp != nil { - resp.Body.Close() - } - return err == nil && resp.StatusCode == http.StatusOK + return process.IsHealthy() } func (app *CompiledApp) keepAlive() { From 34f09c9c850e9271bcef37d5c02f473abb06bbc8 Mon Sep 17 00:00:00 2001 From: Albert Liu Date: Wed, 19 Apr 2023 01:17:50 -0700 Subject: [PATCH 9/9] rename --- backend/internal/compilerServer/daemon.go | 4 ++-- backend/internal/process/handle.go | 4 ++-- backend/internal/process/process.go | 8 ++++---- backend/internal/server/process_rpc.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/backend/internal/compilerServer/daemon.go b/backend/internal/compilerServer/daemon.go index bfdd661e..3e927db9 100644 --- a/backend/internal/compilerServer/daemon.go +++ b/backend/internal/compilerServer/daemon.go @@ -30,7 +30,7 @@ func (app *CompiledApp) IsAlive() bool { return false } - return process.IsHealthy() + return process.CheckHealth() } func (app *CompiledApp) keepAlive() { @@ -255,7 +255,7 @@ func (app *CompiledApp) StartServer() error { } // Send a ping to the process - if serverProcess.IsHealthy() { + if serverProcess.CheckHealth() { if atomic.CompareAndSwapInt64(app.keepAliveRunning, 0, 1) { go app.keepAlive() } diff --git a/backend/internal/process/handle.go b/backend/internal/process/handle.go index 3dc277bb..46d631bc 100644 --- a/backend/internal/process/handle.go +++ b/backend/internal/process/handle.go @@ -49,11 +49,11 @@ func (m *ProcessManager) IsAlive(id ProcessId) bool { return r.IsAlive(id) } -func (m *ProcessManager) IsHealthy(id ProcessId) bool { +func (m *ProcessManager) CheckHealth(id ProcessId) bool { r := m.ReadHandle() defer r.Close() - return r.IsHealthy(id) + return r.CheckHealth(id) } func (m *ProcessManager) CopyOutData() []Process { diff --git a/backend/internal/process/process.go b/backend/internal/process/process.go index d3fe1a1d..13fa25bd 100644 --- a/backend/internal/process/process.go +++ b/backend/internal/process/process.go @@ -264,16 +264,16 @@ func (r *RHandle) IsAlive(id ProcessId) bool { return process.IsAlive() } -func (r *RHandle) IsHealthy(id ProcessId) bool { +func (r *RHandle) CheckHealth(id ProcessId) bool { process, found := r.FindById(id) if !found { return false } - return process.IsHealthy() + return process.CheckHealth() } -func (proc *Process) IsHealthy() bool { - return proc.IsAlive() && proc.HealthCheck.Check(health.RunningProcessInfo{Pid: proc.Pid, Port: proc.Port}) +func (proc *Process) CheckHealth() bool { + return proc.HealthCheck.Check(health.RunningProcessInfo{Pid: proc.Pid, Port: proc.Port}) } // This reads the path variable to find the right executable. diff --git a/backend/internal/server/process_rpc.go b/backend/internal/server/process_rpc.go index 6b5dd258..c02cf78d 100644 --- a/backend/internal/server/process_rpc.go +++ b/backend/internal/server/process_rpc.go @@ -75,7 +75,7 @@ var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ Key: req.Data.ProcessKey, } - isHealthy := process.Manager.IsHealthy(id) + isHealthy := process.Manager.CheckHealth(id) return map[string]any{ "processKey": id,