Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions packages/datamesh/src/lib/datamodel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ export class DataVar<
return i;
}
});

const _data: Chunk<DType> | Scalar = await get(
this.arr as zarr.Array<DType, AsyncReadable>,
_index as SliceDef
Expand Down Expand Up @@ -432,8 +433,8 @@ export class Dataset<S extends HttpZarr | TempZarr> {
parameters: options.parameters,
timeout: options.timeout,
nocache: options.nocache,
}) as AsyncReadable;
const _zarr = await zarr.withConsolidated(store);
});
const _zarr = await zarr.withConsolidated(store as AsyncReadable);
const root = await zarr.open(_zarr, { kind: "group" });
const vars = {} as Record<string, DataVar<DataType, HttpZarr>>;
const dims = {} as Record<string, number>;
Expand Down Expand Up @@ -479,6 +480,10 @@ export class Dataset<S extends HttpZarr | TempZarr> {
}
const coords = (JSON.parse(root.attrs["_coordinates"] as string) ||
{}) as Coordkeys;
const cache_hash = root.attrs["_datamesh_cache_hash"] as string | null;
if (cache_hash) {
store.set_cache_hash(cache_hash);
}
return new Dataset<HttpZarr>(
dims,
vars,
Expand Down
43 changes: 30 additions & 13 deletions packages/datamesh/src/lib/zarr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,33 @@ export class CachedHTTPStore implements AsyncReadable {
// Create a copy of the auth headers to avoid modifying the original
// Important: We need to preserve all auth headers, including session headers
const headers = { ...authHeaders };


this.params = {};
// Add parameters, chunks, and downsample as headers if provided
if (options.parameters)
if (options.parameters) {
headers["x-parameters"] = JSON.stringify(options.parameters);
if (options.chunks) headers["x-chunks"] = options.chunks;
if (options.downsample)
this.params['parameters']= JSON.stringify(options.parameters);
}
if (options.chunks) {
headers["x-chunks"] = JSON.stringify(options.chunks);
this.params["chunks"] = JSON.stringify(options.chunks);
}
if (options.downsample) {
headers["x-downsample"] = JSON.stringify(options.downsample);
this.params["downsample"] = JSON.stringify(options.downsample);
}

headers["x-filtered"] = "True";

this.params = {};
if (authHeaders["x-datamesh-auth"]) {
this.params["auth"] = authHeaders["x-datamesh-auth"];
}
if (authHeaders["x-datamesh-sig"]) {
this.params["sig"] = authHeaders["x-datamesh-sig"];
}

this.fetchOptions = { headers };

this.url = root;
const datasource = root.split("/").pop();

// Determine if caching should be used
if (options.nocache || typeof window === "undefined") {
this.cache = undefined;
Expand All @@ -78,14 +83,26 @@ export class CachedHTTPStore implements AsyncReadable {
this.timeout = options.timeout || 60000;
}

set_cache_hash(cache_hash: string) {
// Amend the cache prefix with the cache hash to invalidate old cache entries
console.log("Setting cache hash:", cache_hash);
const cache_prefix = this.cache_prefix;
this.cache_prefix = hash({ cache_prefix, cache_hash });
this.params = { ...this.params, cache: cache_hash };
}

async get(
item: AbsolutePath,
options?: RequestInit,
retry = 0
): Promise<Uint8Array | undefined> {
const key = `${this.cache_prefix}${item}`;
let data = null;
if (this.cache) {
const not_time = !item.split('/')[1].includes('time');
const not_metadata = !(item.endsWith(".zmetadata") || item.endsWith(".zattrs"));
const should_cache = this.cache && not_time && not_metadata;
console.log("Should cache:", should_cache, item, not_time, not_metadata);
if (should_cache) {
data = await get_cache(key, this.cache);
if (data) delete this._pending[key];
if (this._pending[key]) {
Expand Down Expand Up @@ -122,21 +139,21 @@ export class CachedHTTPStore implements AsyncReadable {

if (response.status === 404) {
// Item is not found
if (this.cache) await del_cache(key, this.cache);
if (should_cache) await del_cache(key, this.cache);
return undefined;
}
if (response.status >= 400) {
throw new Error(`HTTP error: ${response.status}`);
}
data = new Uint8Array(await response.arrayBuffer());
if (this.cache) await set_cache(key, data, this.cache);
if (should_cache) await set_cache(key, data, this.cache);
} catch (e) {
console.debug("Zarr retry:" + key);
if (retry < this.timeout / 200) {
delete this._pending[key];
return await this.get(item, options, retry + 200);
}
if (this.cache) await del_cache(key, this.cache);
if (should_cache) await del_cache(key, this.cache);
console.error(e);
return undefined;
} finally {
Expand Down
2 changes: 1 addition & 1 deletion packages/datamesh/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export default defineConfig({
// External packages that should not be bundled into your library.
external: [],
},
watch: true,
//watch: true,
},
test: {
watch: true,
Expand Down