Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
## 1.5.0 [unreleased]

### Features

1. [#622](https://github.com/InfluxCommunity/influxdb3-js/pull/622):
- Deprecated ConnectionOptions.timeout.
- Added ConnectionOptions.queryTimeout and ConnectionOptions.writeTimeout.
- Added QueryOptions.timeout and WriteOptions.timeout.
- Users can pass timeout directly to the query and write functions.

### CI

1. [#626](https://github.com/InfluxCommunity/influxdb3-js/pull/626) Fix pipeline not downloading the correct node images.
1. [#626](https://github.com/InfluxCommunity/influxdb3-js/pull/626) Fix pipelines not downloading the correct node images.

### Docs

Expand Down
16 changes: 14 additions & 2 deletions packages/client/src/InfluxDBClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,21 @@ export default class InfluxDBClient {
this._options.host = host.substring(0, host.length - 1)
if (typeof this._options.token !== 'string')
throw new IllegalArgumentError('No token specified!')
this._queryApi = new QueryApiImpl(this._options)
this._queryApi = new QueryApiImpl({
...this._options,
queryTimeout:
this._options.queryOptions?.timeout ?? this._options?.queryTimeout,
})

const writeTimeout =
this._options.timeout ??
this._options.writeOptions?.timeout ??
this._options.writeTimeout

this._transport = impl.writeTransport(this._options)
this._transport = impl.writeTransport({
...this._options,
writeTimeout: writeTimeout,
})
this._writeApi = new WriteApiImpl({
transport: this._transport,
...this._options,
Expand Down
3 changes: 3 additions & 0 deletions packages/client/src/WriteApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export default interface WriteApi {
* Write lines of [Line Protocol](https://bit.ly/2QL99fu).
*
* @param lines - InfluxDB Line Protocol
* @param bucket - InfluxDB bucket to write to
* @param org - InfluxDB organization of bucket
* @param writeOptions - Write call-specific options
*/
doWrite(
lines: string[],
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/impl/QueryApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export default class QueryApiImpl implements QueryApi {

const meta = this.prepareMetadata(options.headers)
const rpcOptions: RpcOptions = {meta}
rpcOptions.timeout = options.timeout ?? this._options.queryTimeout

const flightDataStream = client.doGet(ticket, rpcOptions)

Expand Down
1 change: 1 addition & 0 deletions packages/client/src/impl/WriteApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export default class WriteApiImpl implements WriteApi {
...writeOptions?.headers,
},
gzipThreshold: writeOptionsOrDefault.gzipThreshold,
timeout: writeOptionsOrDefault.timeout,
}

this._transport.send(
Expand Down
10 changes: 9 additions & 1 deletion packages/client/src/impl/node/NodeHttpTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export class NodeHttpTransport implements Transport {
port: url.port,
protocol: url.protocol,
hostname: url.hostname,
timeout:
nodeSupportedOptions.timeout ?? nodeSupportedOptions.writeTimeout,
}
this._contextPath = proxyUrl ? _url : url.path ?? ''
if (this._contextPath.endsWith('/')) {
Expand Down Expand Up @@ -259,11 +261,15 @@ export class NodeHttpTransport implements Transport {
yield chunk
}
}

/**
* Creates configuration for a specific request.
*
* @param path - API path starting with '/' and containing also query parameters
* @param body - request body, will be utf-8 encoded
* @param body - Request body, will be utf-8 encoded
* @param sendOptions - Options for sending a request message
* @param resolve - Resolve callback
* @param reject - Reject callback
* @returns a configuration object that is suitable for making the request
*/
private _createRequestMessage(
Expand All @@ -282,6 +288,7 @@ export class NodeHttpTransport implements Transport {
const authScheme = this._authScheme ?? 'Token'
headers.authorization = `${authScheme} ${this._token}`
}

const options: {[key: string]: any} = {
...this._defaultOptions,
path: this._contextPath + path,
Expand All @@ -290,6 +297,7 @@ export class NodeHttpTransport implements Transport {
...headers,
...sendOptions.headers,
},
timeout: sendOptions.timeout ?? this._defaultOptions.timeout,
}
if (sendOptions.signal) {
options.signal = sendOptions.signal
Expand Down
25 changes: 21 additions & 4 deletions packages/client/src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,31 @@ import {QParamType} from './QueryApi'
export interface ConnectionOptions {
/** base host URL */
host: string

/** authentication token */
token?: string

/** token authentication scheme. Not set for Cloud access, set to 'Bearer' for Edge. */
authScheme?: string

/**
* socket timeout. Not applicable in browser (option is ignored).
* @Deprecated: Please use more specific properties like writeTimeout or WriteOptions.timeout.
*/
timeout?: number

/**
* socket timeout. 10000 milliseconds by default in node.js. Not applicable in browser (option is ignored).
* @defaultValue 10000
*/
timeout?: number
writeTimeout?: number

/**
* stream timeout for query (grpc timeout). The gRPC doesn't apply the socket timeout to operations as is defined above. To successfully close a call to the gRPC endpoint, the queryTimeout must be specified. Without this timeout, a gRPC call might end up in an infinite wait state.
* @defaultValue 60000
*/
queryTimeout?: number

/**
* default database for write query if not present as argument.
*/
Expand Down Expand Up @@ -54,9 +65,11 @@ export interface ConnectionOptions {
}

/** default connection options */
export const DEFAULT_ConnectionOptions: Partial<ConnectionOptions> = {
timeout: 10000,
queryTimeout: 60000,
export const DEFAULT_ConnectionOptions: Readonly<Partial<ConnectionOptions>> = {
// Legacy timeout property. Will be removed in the future. Use writeTimeout or WriteOptions.timeout instead.
timeout: undefined,
writeTimeout: 10_000,
queryTimeout: 60_000,
}

/**
Expand Down Expand Up @@ -131,6 +144,8 @@ export interface WriteOptions {
* ```
*/
defaultTags?: {[key: string]: string}
/** Specific timeout for the writing call. If not set, the value from {@link ConnectionOptions.writeTimeout} is used by default. */
timeout?: number
}

/** default writeOptions */
Expand Down Expand Up @@ -170,6 +185,8 @@ export interface QueryOptions {
/** GRPC specific Parameters to be set when instantiating a client
* See supported channel options in @grpc/grpc-js/README.md. **/
grpcOptions?: Record<string, any>
/** Specific timeout for the query. If not set, the value from {@link ConnectionOptions.queryTimeout} is used by default. */
timeout?: number
}

/** Default QueryOptions */
Expand Down
3 changes: 3 additions & 0 deletions packages/client/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export interface SendOptions {
gzipThreshold?: number
/** Abort signal */
signal?: AbortSignal
/** Timeout of the request */
timeout?: number
}

/**
Expand Down Expand Up @@ -39,6 +41,7 @@ export interface Transport {
* @param path - HTTP request path
* @param requestBody - request body
* @param options - send options
* @param responseStarted - Callback called before issuing the request
* @returns response data
*/
request(
Expand Down
85 changes: 85 additions & 0 deletions packages/client/test/integration/queryAPI.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,89 @@ describe('query api tests', () => {
'RpcError'
)
})

it('timeout in ClientOptions', async function () {
this.timeout(3000)
const client: InfluxDBClient = new InfluxDBClient({
host: `http://localhost:${server.port}`,
token: 'TEST_TOKEN',
database: 'CI_TEST',
timeout: 10_000,
queryTimeout: 0,
})

await expectThrowsAsync(
async () => {
const data = client.query('SELECT * FROM wumpus', 'CI_TEST')
await data.next()
},
/^Deadline exceeded.*/,
'RpcError'
)
})

it('timeout in ClientOptions should not effecting query timeout', async function () {
this.timeout(3000)
const client: InfluxDBClient = new InfluxDBClient({
host: `http://localhost:${server.port}`,
token: 'TEST_TOKEN',
database: 'CI_TEST',
timeout: 10_000,
})

await expectThrowsAsync(
async () => {
const data = client.query('SELECT * FROM wumpus', 'CI_TEST', {
timeout: 0,
})
await data.next()
},
/^Deadline exceeded.*/,
'RpcError'
)
})

it('passing timeout directly to query function', async function () {
this.timeout(3000)
const client: InfluxDBClient = new InfluxDBClient({
host: `http://localhost:${server.port}`,
token: 'TEST_TOKEN',
database: 'CI_TEST',
queryTimeout: 10_000,
timeout: 10_000,
})

await expectThrowsAsync(
async () => {
const data = client.query('SELECT * FROM wumpus', 'CI_TEST', {
timeout: 0,
})
await data.next()
},
/^Deadline exceeded.*/,
'RpcError'
)
})

it('passing timeout directly to queryPoints function', async function () {
this.timeout(3000)
const client: InfluxDBClient = new InfluxDBClient({
host: `http://localhost:${server.port}`,
token: 'TEST_TOKEN',
database: 'CI_TEST',
queryTimeout: 10_000,
timeout: 10_000,
})

await expectThrowsAsync(
async () => {
const data = client.queryPoints('SELECT * FROM wumpus', 'CI_TEST', {
timeout: 0,
})
await data.next()
},
/^Deadline exceeded.*/,
'RpcError'
)
})
})
82 changes: 82 additions & 0 deletions packages/client/test/unit/Write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,5 +439,87 @@ describe('Write', () => {
}
}
})

it('timeout when passing timeout directly to write function will have the highest priority', async function () {
nock(clientOptions.host)
.post(WRITE_PATH_NS)
.delay(1000)
.reply(function (_uri, _requestBody) {
return [200, '', {}]
})
.persist()
const option: ClientOptions = {
...clientOptions,
timeout: 10_000,
writeTimeout: 10_000,
writeOptions: {
timeout: 10_000,
},
}
const client: InfluxDBClient = new InfluxDBClient(option)
try {
await client.write(
Point.measurement('test').setFloatField('value', 1),
DATABASE,
undefined,
{timeout: 100}
)
expect.fail('failure expected')
} catch (e: any) {
expect(e.toString()).to.include('timed')
}
}).timeout(2000)

it('timeout by timeout property in ClientOptions', async function () {
nock(clientOptions.host)
.post(WRITE_PATH_NS)
.delay(1000)
.reply(function (_uri, _requestBody) {
return [200, '', {}]
})
.persist()
const option: ClientOptions = {
...clientOptions,
timeout: 100,
}
const client: InfluxDBClient = new InfluxDBClient(option)
try {
await client.write(
Point.measurement('test').setFloatField('value', 1),
DATABASE
)
expect.fail('failure expected')
} catch (e: any) {
expect(e.toString()).to.include('timed')
}
}).timeout(2000)

it('WriteOptions.timeout > legacy timeout and new property writeTimeout', async function () {
nock(clientOptions.host)
.post(WRITE_PATH_NS)
.delay(1000)
.reply(function (_uri, _requestBody) {
return [200, '', {}]
})
.persist()
const option: ClientOptions = {
...clientOptions,
timeout: 10_000,
writeTimeout: 10_000,
writeOptions: {
timeout: 100,
},
}
const client: InfluxDBClient = new InfluxDBClient(option)
try {
await client.write(
Point.measurement('test').setFloatField('value', 1),
DATABASE
)
expect.fail('failure expected')
} catch (e: any) {
expect(e.toString()).to.include('timed')
}
}).timeout(2000)
})
})
Loading
Loading