Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions servers/mu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export const domainConfigSchema = z.object({
ENABLE_HB_WALLET_CHECK: z.boolean(),
HB_GRAPHQL_URL: z.string(),
RATE_LIMIT_FILE_URL: z.string().optional(),
HB_PROCESSES_URL: z.string().optional(),
PROCESS_WHITELIST_URL: z.string().optional(),
SKIP_REPUSH_CHECKS_TOKEN: z.string().optional()
})

Expand Down Expand Up @@ -159,7 +159,7 @@ const CONFIG_ENVS = {
ENABLE_HB_WALLET_CHECK: process.env.ENABLE_HB_WALLET_CHECK !== 'false',
HB_GRAPHQL_URL: process.env.HB_GRAPHQL_URL || 'https://cache.forward.computer',
RATE_LIMIT_FILE_URL: process.env.RATE_LIMIT_FILE_URL || '',
HB_PROCESSES_URL: process.env.HB_PROCESSES_URL || '',
PROCESS_WHITELIST_URL: process.env.PROCESS_WHITELIST_URL || '',
SKIP_REPUSH_CHECKS_TOKEN: process.env.SKIP_REPUSH_CHECKS_TOKEN || ''
},
production: {
Expand Down Expand Up @@ -201,7 +201,7 @@ const CONFIG_ENVS = {
ENABLE_HB_WALLET_CHECK: process.env.ENABLE_HB_WALLET_CHECK !== 'false',
HB_GRAPHQL_URL: process.env.HB_GRAPHQL_URL || 'https://cache.forward.computer',
RATE_LIMIT_FILE_URL: process.env.RATE_LIMIT_FILE_URL || '',
HB_PROCESSES_URL: process.env.HB_PROCESSES_URL || '',
PROCESS_WHITELIST_URL: process.env.PROCESS_WHITELIST_URL || '',
SKIP_REPUSH_CHECKS_TOKEN: process.env.SKIP_REPUSH_CHECKS_TOKEN || ''
}
}
Expand Down
14 changes: 12 additions & 2 deletions servers/mu/src/domain/api/monitorProcess.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
import { of } from 'hyper-async'
import { of, Rejected } from 'hyper-async'

import { parseDataItemWith } from '../lib/parse-data-item.js'
import { startWith } from '../lib/start-process.js'

export function monitorProcessWith ({
logger,
createDataItem,
startProcessMonitor
startProcessMonitor,
fetchProcessWhitelist
}) {
const parseDataItem = parseDataItemWith({ createDataItem, logger })
const start = startWith({ logger, startProcessMonitor })

return (ctx) => {
return of(ctx)
.chain(parseDataItem)
.chain((ctx) => {
const whitelist = fetchProcessWhitelist ? fetchProcessWhitelist() : {}
if (whitelist && Object.keys(whitelist).length > 0 && !whitelist[ctx.tx.processId]) {
const error = new Error('Forbidden, process not whitelisted')
error.code = 403
return Rejected(error)
}
return of(ctx)
})
.chain(start)
}
}
9 changes: 8 additions & 1 deletion servers/mu/src/domain/api/sendAssign.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export function sendAssignWith ({
locateProcess,
fetchResult,
crank,
logger
logger,
fetchProcessWhitelist
}) {
const getCuAddress = getCuAddressWith({ selectNode, logger })
const pullResult = pullResultWith({ fetchResult, logger })
Expand Down Expand Up @@ -62,6 +63,12 @@ export function sendAssignWith ({
}))

return (ctx) => {
const whitelist = fetchProcessWhitelist ? fetchProcessWhitelist() : {}
if (whitelist && Object.keys(whitelist).length > 0 && !whitelist[ctx.assign.processId]) {
const error = new Error('Forbidden, process not whitelisted')
error.code = 403
return Rejected(error)
}
return of(ctx)
.chain((ctx) =>
locateProcessLocal(ctx.assign.processId)
Expand Down
11 changes: 9 additions & 2 deletions servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ export function sendDataItemWith ({
GET_RESULT_MAX_RETRIES,
GET_RESULT_RETRY_DELAY,
ENABLE_MESSAGE_RECOVERY,
fetchHBProcesses
fetchHBProcesses,
fetchProcessWhitelist
}) {
const verifyParsedDataItem = verifyParsedDataItemWith()
const parseDataItem = parseDataItemWith({ createDataItem, logger })
const getCuAddress = getCuAddressWith({ selectNode, logger })
const writeMessage = writeMessageTxWith({ locateProcess, writeDataItem, logger, fetchSchedulerProcess, writeDataItemArweave })
const pullResult = pullResultWith({ fetchResult, fetchHyperBeamResult, logger, fetchHBProcesses})
const pullResult = pullResultWith({ fetchResult, fetchHyperBeamResult, logger, fetchHBProcesses })
const writeProcess = writeProcessTxWith({ locateScheduler, writeDataItem, logger })
const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY })
const insertMessage = insertMessageWith({ db })
Expand Down Expand Up @@ -319,6 +320,12 @@ export function sendDataItemWith ({
})
.chain(({ isMessage }) => {
if (isMessage) {
const whitelist = fetchProcessWhitelist ? fetchProcessWhitelist() : {}
if (whitelist && Object.keys(whitelist).length > 0 && !whitelist[ctx.dataItem.target]) {
const error = new Error('Forbidden, process not whitelisted')
error.code = 403
return Rejected(error)
}
/*
add schedLocation into the context if the
target is a process. if its a wallet dont add
Expand Down
30 changes: 17 additions & 13 deletions servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export const createApis = async (ctx) => {
const ENABLE_HB_WALLET_CHECK = ctx.ENABLE_HB_WALLET_CHECK
const HB_GRAPHQL_URL = ctx.HB_GRAPHQL_URL
const RATE_LIMIT_FILE_URL = ctx.RATE_LIMIT_FILE_URL
const HB_PROCESSES_URL = ctx.HB_PROCESSES_URL
const PROCESS_WHITELIST_URL = ctx.PROCESS_WHITELIST_URL

let rateLimitFile = {}
cron.schedule('*/10 * * * *', async () => {
Expand Down Expand Up @@ -189,19 +189,20 @@ export const createApis = async (ctx) => {

let processesFile = {}
cron.schedule('*/5 * * * *', async () => {
if (!HB_PROCESSES_URL || HB_PROCESSES_URL === '') return
console.log('Updating HB_PROCESSES file after 5 minutes', HB_PROCESSES_URL)
const fetchedProcessesFile = await fetch(HB_PROCESSES_URL)
if (!PROCESS_WHITELIST_URL || PROCESS_WHITELIST_URL === '') return
console.log('Updating process whitelist file after 5 minutes', PROCESS_WHITELIST_URL)
const json = await fetch(PROCESS_WHITELIST_URL)
.then((res) => res.json())
.catch(err => {
console.error('Error updating hb processes file', err)
return {}
})
processesFile = { HB_PROCESSES: fetchedProcessesFile }
processesFile = { HB_PROCESSES: json.hb_processes || {}, PROCESSES: json.processes || {} }
console.log('Updated processes file')
}, { runOnInit: true })

const fetchHBProcesses = () => { return processesFile }
const fetchProcessWhitelist = () => processesFile.PROCESSES || {}

// Create trace database metrics
MetricsClient.gaugeWith({})({
Expand Down Expand Up @@ -322,7 +323,8 @@ export const createApis = async (ctx) => {
GET_RESULT_MAX_RETRIES: ctx.GET_RESULT_MAX_RETRIES,
GET_RESULT_RETRY_DELAY: ctx.GET_RESULT_RETRY_DELAY,
ENABLE_MESSAGE_RECOVERY: ctx.ENABLE_MESSAGE_RECOVERY,
fetchHBProcesses
fetchHBProcesses,
fetchProcessWhitelist
})

const sendAssignLogger = logger.child('sendAssign')
Expand All @@ -332,7 +334,8 @@ export const createApis = async (ctx) => {
writeAssignment: schedulerClient.writeAssignmentWith({ fetch, histogram, logger: sendAssignLogger }),
fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: sendDataItemLogger }),
crank,
logger: sendAssignLogger
logger: sendAssignLogger,
fetchProcessWhitelist
})

/**
Expand Down Expand Up @@ -388,7 +391,8 @@ export const createApis = async (ctx) => {
const monitorProcess = monitorProcessWith({
startProcessMonitor,
createDataItem,
logger: monitorProcessLogger
logger: monitorProcessLogger,
fetchProcessWhitelist
})

const stopMonitorProcessLogger = logger.child('stopMonitorProcess')
Expand Down Expand Up @@ -473,7 +477,7 @@ export const createResultApis = async (ctx) => {
const HB_ROUTER_URL = ctx.HB_ROUTER_URL
const ENABLE_HB_WALLET_CHECK = ctx.ENABLE_HB_WALLET_CHECK
const HB_GRAPHQL_URL = ctx.HB_GRAPHQL_URL
const HB_PROCESSES_URL = ctx.HB_PROCESSES_URL
const PROCESS_WHITELIST_URL = ctx.PROCESS_WHITELIST_URL

const logger = ctx.logger
const fetch = ctx.fetch
Expand Down Expand Up @@ -503,15 +507,15 @@ export const createResultApis = async (ctx) => {

let processesFile = {}
cron.schedule('*/5 * * * *', async () => {
if (!HB_PROCESSES_URL || HB_PROCESSES_URL === '') return
console.log('Updating HB_PROCESSES file after 5 minutes', HB_PROCESSES_URL)
const fetchedProcessesFile = await fetch(HB_PROCESSES_URL)
if (!PROCESS_WHITELIST_URL || PROCESS_WHITELIST_URL === '') return
console.log('Updating process whitelist file after 5 minutes', PROCESS_WHITELIST_URL)
const json = await fetch(PROCESS_WHITELIST_URL)
.then((res) => res.json())
.catch(err => {
console.error('Error updating hb processes file', err)
return {}
})
processesFile = { HB_PROCESSES: fetchedProcessesFile }
processesFile = { HB_PROCESSES: json.hb_processes || {}, PROCESSES: json.processes || {} }
console.log('Updated processes file')
}, { runOnInit: true })

Expand Down
Loading