
Conversation

Contributor

@ChrisJar ChrisJar commented May 2, 2022

No description provided.

@randerzander randerzander requested a review from VibhuJawa June 6, 2022 20:49
Member

@VibhuJawa VibhuJawa left a comment

Thanks for working on this, I have requested changes.


if isinstance(x, cudf.Series):
    vectorizer = HashingVectorizer
    preprocessor = lambda s:s.str.lower()
Member

This is common to both, so it should be outside the if statement.

Contributor Author

They need to use different hashing vectorizers (HashingVectorizer vs SKHashingVectorizer), and the lambda functions need to be slightly different (s:s.str.lower() vs s:s.lower()).
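
For illustration, a minimal sketch of the branching described above, assuming HashingVectorizer is the cuML GPU vectorizer and SKHashingVectorizer is the scikit-learn import alias used in this file (the helper function name is illustrative):

import cudf
from cuml.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import HashingVectorizer as SKHashingVectorizer

def pick_vectorizer(x):
    # cuDF string Series expose .str, so the GPU preprocessor lowercases with s.str.lower(),
    # while the scikit-learn path receives plain Python strings and uses s.lower().
    if isinstance(x, cudf.Series):
        vectorizer = HashingVectorizer
        preprocessor = lambda s: s.str.lower()
    else:
        vectorizer = SKHashingVectorizer
        preprocessor = lambda s: s.lower()
    return vectorizer, preprocessor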

Comment on lines +96 to +98
    output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32))
else:
    output_ser = pd.Series(2, index=ser.index, dtype=np.int32)
Member

We should have similar behavior here. We should not use cudf-specific APIs when they are not needed.

Let's use cupy and numpy arrays here:

pd.Series(np.full(shape=len(ser), fill_value=2, dtype=np.int32))
cudf.Series(cp.full(shape=len(ser), fill_value=2, dtype=cp.int32))
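
For completeness, a minimal sketch of how the two suggested constructors could sit in the branch, assuming ser is the input series and the isinstance check chooses the backend (the helper name and fill-value default are illustrative):

import cudf
import cupy as cp
import numpy as np
import pandas as pd

def constant_label_series(ser, fill_value=2):
    # Build a series of the constant label with the input's length; the GPU path
    # goes through a CuPy array and the CPU path through NumPy, so no
    # cudf-internal column API is needed.
    if isinstance(ser, cudf.Series):
        return cudf.Series(cp.full(shape=len(ser), fill_value=fill_value, dtype=cp.int32))
    return pd.Series(np.full(shape=len(ser), fill_value=fill_value, dtype=np.int32))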


if isinstance(reviews_df, dask_cudf.DataFrame):
    y = y.map_partitions(lambda x: cp.asarray(x, np.int32)).persist()
    y._meta = cp.array(y._meta)
Member

Why do we need to do this?

Contributor Author

Basically, the map_partitions call creates a dask array with a cupy chunktype, but for some reason the metadata shows a numpy chunktype. This issue has a deeper explanation: rapidsai/cudf#4309.
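
A short illustration of the mismatch and the workaround, assuming y is the persisted collection from the snippet above (the exact types printed may vary by dask/cudf version):

import cupy as cp

# The partitions produced by map_partitions are CuPy arrays, but the recorded
# metadata (_meta) can still report a NumPy chunk type.
print(type(y._meta))           # may show numpy.ndarray despite CuPy-backed partitions
y._meta = cp.array(y._meta)    # overwrite the metadata so it matches the real chunk type
print(type(y._meta))           # cupy.ndarray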

    raise ValueError("Single class precision is not yet supported")

ddh = DistributedDataHandler.create([y, y_pred])
gpu_futures = client.sync(_extract_partitions, [y, y_pred], client)
Member

Why are they called gpu_futures? Won't this be for both CPU and GPU?

Comment on lines +221 to +222
global_tp = cp.sum(res[:, 0])
global_fp = cp.sum(res[:, 1])
Member

Suggested change:
-    global_tp = cp.sum(res[:, 0])
-    global_fp = cp.sum(res[:, 1])
+    global_tp = res[:, 0].sum()
+    global_fp = res[:, 1].sum()

    model.fit(X_train, y_train)
else:
    model = ParallelPostFit(estimator=MultNB(alpha=0.001))
    model.fit(X_train.compute(), y_train.compute())
Member

You are going to train this on the client process, is this intentional? We should not do anything on the client process.

Contributor Author

Yep, that was a placeholder I meant to replace. Do you have any suggestions for the best way to parallelize scikit-learn here? Unfortunately, the dask-ml naive Bayes model is incompatible with the sparse matrices we use in this query.

Member

We should still train on the worker processes. We can probably do something like the following:

from dask import delayed  # the lazy fit task below is built with dask.delayed

# Move all the training data into a single partition and fit on a worker,
# not on the client process. MultNB and ParallelPostFit are the estimators
# already used in this file.
est_model = MultNB(alpha=0.001)
X_d = X_train.repartition(npartitions=1).to_delayed()
y_d = y_train.repartition(npartitions=1).to_delayed()

# fit returns the estimator itself, so computing the delayed call yields a
# fitted model that can then be wrapped for distributed inference.
delayed_model = [delayed(est_model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
del est_model
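
If helpful, a hedged usage sketch of the wrapped model after this change (X_test is illustrative): prediction stays lazy and runs partition-wise on the workers, so nothing heavy happens on the client.

# ParallelPostFit dispatches predict across the dask partitions of X_test.
predictions = model.predict(X_test)
results = predictions.compute()  # materialize only when the values are needed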
