Skip to content

Commit 2cf28f3

Browse files
committed
Implementing list table apis for olap (#8187)
* implementing clickhouse list table apis * adding list tables apis for pinot and druid * list table apis for duckdb * adding tests * revert deleted api * fix duckdb * review comment * review
1 parent 6465847 commit 2cf28f3

File tree

13 files changed

+1125
-145
lines changed

13 files changed

+1125
-145
lines changed

proto/rill/runtime/v1/connectors.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,4 +298,4 @@ message BigQueryListTablesRequest {
298298
message BigQueryListTablesResponse {
299299
string next_page_token = 1;
300300
repeated string names = 2;
301-
}
301+
}

runtime/drivers/clickhouse/clickhouse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ func (c *Connection) AsOLAP(instanceID string) (drivers.OLAPStore, bool) {
490490

491491
// AsInformationSchema implements drivers.Handle.
492492
func (c *Connection) AsInformationSchema() (drivers.InformationSchema, bool) {
493-
return nil, false
493+
return c, true
494494
}
495495

496496
// Migrate implements drivers.Handle.

runtime/drivers/clickhouse/information_schema.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,197 @@ import (
1313
"github.com/rilldata/rill/runtime/pkg/pagination"
1414
)
1515

16+
func (c *Connection) ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*drivers.DatabaseSchemaInfo, string, error) {
17+
limit := pagination.ValidPageSize(pageSize, drivers.DefaultPageSize)
18+
19+
var args []any
20+
var condFilter string
21+
if c.config.DatabaseWhitelist != "" {
22+
dbs := strings.Split(c.config.DatabaseWhitelist, ",")
23+
var sb strings.Builder
24+
for i, db := range dbs {
25+
if i > 0 {
26+
sb.WriteString(", ")
27+
}
28+
sb.WriteString("?")
29+
args = append(args, strings.TrimSpace(db))
30+
}
31+
condFilter = fmt.Sprintf("(schema_name IN (%s))", sb.String())
32+
} else {
33+
condFilter = "(schema_name == currentDatabase() OR lower(schema_name) NOT IN ('information_schema', 'system'))"
34+
}
35+
36+
if pageToken != "" {
37+
var startAfter string
38+
if err := pagination.UnmarshalPageToken(pageToken, &startAfter); err != nil {
39+
return nil, "", fmt.Errorf("invalid page token: %w", err)
40+
}
41+
condFilter += " AND schema_name > ?"
42+
args = append(args, startAfter)
43+
}
44+
45+
q := fmt.Sprintf(`
46+
SELECT
47+
schema_name
48+
FROM information_schema.schemata
49+
WHERE %s
50+
ORDER BY schema_name
51+
LIMIT ?
52+
`, condFilter)
53+
args = append(args, limit+1)
54+
55+
conn, release, err := c.acquireMetaConn(ctx)
56+
if err != nil {
57+
return nil, "", err
58+
}
59+
defer func() { _ = release() }()
60+
61+
rows, err := conn.QueryxContext(ctx, q, args...)
62+
if err != nil {
63+
return nil, "", err
64+
}
65+
defer rows.Close()
66+
67+
var res []*drivers.DatabaseSchemaInfo
68+
var schema string
69+
for rows.Next() {
70+
if err := rows.Scan(&schema); err != nil {
71+
return nil, "", err
72+
}
73+
res = append(res, &drivers.DatabaseSchemaInfo{
74+
Database: "",
75+
DatabaseSchema: schema,
76+
})
77+
}
78+
79+
if err := rows.Err(); err != nil {
80+
return nil, "", err
81+
}
82+
83+
next := ""
84+
if len(res) > limit {
85+
res = res[:limit]
86+
next = pagination.MarshalPageToken(res[len(res)-1].DatabaseSchema)
87+
}
88+
return res, next, nil
89+
}
90+
91+
func (c *Connection) ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*drivers.TableInfo, string, error) {
92+
limit := pagination.ValidPageSize(pageSize, drivers.DefaultPageSize)
93+
94+
q := `
95+
SELECT
96+
table_name,
97+
CASE WHEN table_type = 'VIEW' THEN true ELSE false END AS view
98+
FROM information_schema.tables
99+
WHERE table_schema = ?
100+
`
101+
args := []any{databaseSchema}
102+
if pageToken != "" {
103+
var startAfter string
104+
if err := pagination.UnmarshalPageToken(pageToken, &startAfter); err != nil {
105+
return nil, "", fmt.Errorf("invalid page token: %w", err)
106+
}
107+
q += " AND table_name > ?"
108+
args = append(args, startAfter)
109+
}
110+
q += `
111+
ORDER BY table_name
112+
LIMIT ?
113+
`
114+
args = append(args, limit+1)
115+
116+
conn, release, err := c.acquireMetaConn(ctx)
117+
if err != nil {
118+
return nil, "", err
119+
}
120+
defer func() { _ = release() }()
121+
122+
rows, err := conn.QueryxContext(ctx, q, args...)
123+
if err != nil {
124+
return nil, "", err
125+
}
126+
defer rows.Close()
127+
128+
var res []*drivers.TableInfo
129+
var name string
130+
var view bool
131+
for rows.Next() {
132+
if err := rows.Scan(&name, &view); err != nil {
133+
return nil, "", err
134+
}
135+
res = append(res, &drivers.TableInfo{
136+
Name: name,
137+
View: view,
138+
})
139+
}
140+
141+
if err := rows.Err(); err != nil {
142+
return nil, "", err
143+
}
144+
145+
next := ""
146+
if len(res) > limit {
147+
res = res[:limit]
148+
next = pagination.MarshalPageToken(res[len(res)-1].Name)
149+
}
150+
return res, next, nil
151+
}
152+
153+
func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) {
154+
conn, release, err := c.acquireMetaConn(ctx)
155+
if err != nil {
156+
return nil, err
157+
}
158+
defer func() { _ = release() }()
159+
160+
q := `
161+
SELECT
162+
CASE WHEN table_type = 'VIEW' THEN true ELSE false END AS view,
163+
c.column_name,
164+
c.data_type
165+
FROM information_schema.tables t
166+
LEFT JOIN information_schema.columns c
167+
ON t.table_schema = c.table_schema AND t.table_name = c.table_name
168+
WHERE t.table_schema = ? AND t.table_name = ?
169+
ORDER BY c.ordinal_position
170+
`
171+
rows, err := conn.QueryxContext(ctx, q, databaseSchema, table)
172+
if err != nil {
173+
return nil, err
174+
}
175+
defer rows.Close()
176+
177+
schemaMap := make(map[string]string)
178+
var view bool
179+
var colName, dataType string
180+
for rows.Next() {
181+
if err := rows.Scan(&view, &colName, &dataType); err != nil {
182+
return nil, err
183+
}
184+
if pbType, err := databaseTypeToPB(dataType, false); err != nil {
185+
if errors.Is(err, errUnsupportedType) {
186+
schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType)
187+
} else {
188+
return nil, err
189+
}
190+
} else if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED {
191+
schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType)
192+
} else {
193+
schemaMap[colName] = strings.TrimPrefix(pbType.Code.String(), "CODE_")
194+
}
195+
}
196+
197+
if err := rows.Err(); err != nil {
198+
return nil, err
199+
}
200+
201+
return &drivers.TableMetadata{
202+
Schema: schemaMap,
203+
View: view,
204+
}, nil
205+
}
206+
16207
func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) {
17208
conn, release, err := c.acquireMetaConn(ctx)
18209
if err != nil {

runtime/drivers/clickhouse/information_schema_test.go

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ func TestInformationSchema(t *testing.T) {
2020
dsn := testclickhouse.Start(t)
2121
conn, err := drivers.Open("clickhouse", "default", map[string]any{"dsn": dsn, "mode": "readwrite"}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
2222
require.NoError(t, err)
23+
24+
infoSchema, ok := conn.AsInformationSchema()
25+
require.True(t, ok)
26+
2327
prepareConn(t, conn)
2428
t.Run("testInformationSchemaAll", func(t *testing.T) { testInformationSchemaAll(t, conn) })
2529
t.Run("testInformationSchemaAllLike", func(t *testing.T) { testInformationSchemaAllLike(t, conn) })
@@ -29,8 +33,13 @@ func TestInformationSchema(t *testing.T) {
2933
testInformationSchemaSystemAllLike(t, conn)
3034
})
3135
t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, conn) })
32-
t.Run("testInformationSchemaPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, conn) })
33-
t.Run("testInformationSchemaPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, conn) })
36+
t.Run("testInformationSchemaAllPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, conn) })
37+
t.Run("testInformationSchemaAllPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, conn) })
38+
t.Run("testInformationSchemaListDatabaseSchemas", func(t *testing.T) { testInformationSchemaListDatabaseSchemas(t, infoSchema) })
39+
t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema) })
40+
t.Run("testInformationSchemaGetTable", func(t *testing.T) { testInformationSchemaGetTable(t, infoSchema) })
41+
t.Run("testInformationSchemaListDatabaseSchemasPagination", func(t *testing.T) { testInformationSchemaListDatabaseSchemasPagination(t, infoSchema) })
42+
t.Run("testInformationSchemaListTablesPagination", func(t *testing.T) { testInformationSchemaListTablesPagination(t, infoSchema) })
3443
}
3544

3645
func testInformationSchemaAll(t *testing.T, conn drivers.Handle) {
@@ -186,6 +195,119 @@ func testInformationSchemaAllPaginationWithLike(t *testing.T, conn drivers.Handl
186195
require.Empty(t, nextToken)
187196
}
188197

198+
func testInformationSchemaListDatabaseSchemas(t *testing.T, infoSchema drivers.InformationSchema) {
199+
databaseSchemaInfo, _, err := infoSchema.ListDatabaseSchemas(context.Background(), 0, "")
200+
require.NoError(t, err)
201+
require.Equal(t, 3, len(databaseSchemaInfo))
202+
203+
require.Equal(t, "", databaseSchemaInfo[0].Database)
204+
require.Equal(t, "clickhouse", databaseSchemaInfo[0].DatabaseSchema)
205+
require.Equal(t, "", databaseSchemaInfo[1].Database)
206+
require.Equal(t, "default", databaseSchemaInfo[1].DatabaseSchema)
207+
require.Equal(t, "", databaseSchemaInfo[2].Database)
208+
require.Equal(t, "other", databaseSchemaInfo[2].DatabaseSchema)
209+
}
210+
211+
func testInformationSchemaListTables(t *testing.T, infoSchema drivers.InformationSchema) {
212+
tables, _, err := infoSchema.ListTables(context.Background(), "", "default", 0, "")
213+
require.NoError(t, err)
214+
require.Equal(t, len(tables), 3)
215+
216+
require.Equal(t, "bar", tables[0].Name)
217+
require.Equal(t, false, tables[0].View)
218+
require.Equal(t, "foo", tables[1].Name)
219+
require.Equal(t, false, tables[1].View)
220+
require.Equal(t, "model", tables[2].Name)
221+
require.Equal(t, true, tables[2].View)
222+
223+
tables, _, err = infoSchema.ListTables(context.Background(), "", "other", 0, "")
224+
require.NoError(t, err)
225+
require.Equal(t, len(tables), 2)
226+
227+
require.Equal(t, "bar", tables[0].Name)
228+
require.Equal(t, false, tables[0].View)
229+
require.Equal(t, "foo", tables[1].Name)
230+
require.Equal(t, false, tables[1].View)
231+
}
232+
233+
func testInformationSchemaGetTable(t *testing.T, infoSchema drivers.InformationSchema) {
234+
ctx := context.Background()
235+
236+
// Existing table
237+
foo, err := infoSchema.GetTable(ctx, "", "default", "foo")
238+
require.NoError(t, err)
239+
require.Len(t, foo.Schema, 2)
240+
require.Equal(t, "STRING", foo.Schema["bar"])
241+
require.Equal(t, "INT32", foo.Schema["baz"])
242+
require.False(t, foo.View)
243+
244+
// Non-existent table
245+
noTable, err := infoSchema.GetTable(ctx, "", "default", "nonexistent_table")
246+
require.NoError(t, err)
247+
require.Empty(t, noTable.Schema)
248+
249+
// View
250+
model, err := infoSchema.GetTable(ctx, "", "default", "model")
251+
require.NoError(t, err)
252+
require.Equal(t, "UINT8", model.Schema["1"])
253+
require.Equal(t, "UINT8", model.Schema["2"])
254+
require.Equal(t, "UINT8", model.Schema["3"])
255+
require.True(t, model.View)
256+
257+
ofoo, err := infoSchema.GetTable(ctx, "", "other", "foo")
258+
require.NoError(t, err)
259+
require.Equal(t, "STRING", ofoo.Schema["bar"])
260+
require.Equal(t, "INT32", ofoo.Schema["baz"])
261+
require.Equal(t, false, ofoo.View)
262+
263+
}
264+
265+
func testInformationSchemaListDatabaseSchemasPagination(t *testing.T, infoSchema drivers.InformationSchema) {
266+
ctx := context.Background()
267+
pageSize := 2
268+
269+
// First page
270+
page1, token1, err := infoSchema.ListDatabaseSchemas(ctx, uint32(pageSize), "")
271+
require.NoError(t, err)
272+
require.Len(t, page1, pageSize)
273+
require.NotEmpty(t, token1)
274+
275+
// second page
276+
page2, token2, err := infoSchema.ListDatabaseSchemas(ctx, uint32(pageSize), token1)
277+
require.NoError(t, err)
278+
require.NotEmpty(t, page2)
279+
require.Empty(t, token2)
280+
281+
// Page size 0
282+
all, token, err := infoSchema.ListDatabaseSchemas(ctx, 0, "")
283+
require.NoError(t, err)
284+
require.Equal(t, len(all), 3)
285+
require.Empty(t, token)
286+
}
287+
288+
func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers.InformationSchema) {
289+
ctx := context.Background()
290+
pageSize := 2
291+
292+
// First page
293+
page1, token1, err := infoSchema.ListTables(ctx, "", "default", uint32(pageSize), "")
294+
require.NoError(t, err)
295+
require.Len(t, page1, pageSize)
296+
require.NotEmpty(t, token1)
297+
298+
// Second page
299+
page2, token2, err := infoSchema.ListTables(ctx, "", "default", uint32(pageSize), token1)
300+
require.NoError(t, err)
301+
require.NotEmpty(t, page2)
302+
require.Empty(t, token2)
303+
304+
// Page size 0
305+
all, token, err := infoSchema.ListTables(ctx, "", "default", 0, "")
306+
require.NoError(t, err)
307+
require.GreaterOrEqual(t, len(all), 3)
308+
require.Empty(t, token)
309+
}
310+
189311
func prepareConn(t *testing.T, conn drivers.Handle) {
190312
olap, ok := conn.AsOLAP("")
191313
require.True(t, ok)

runtime/drivers/druid/druid.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (c *connection) AsOLAP(instanceID string) (drivers.OLAPStore, bool) {
285285

286286
// AsInformationSchema implements drivers.Handle.
287287
func (c *connection) AsInformationSchema() (drivers.InformationSchema, bool) {
288-
return nil, false
288+
return c, true
289289
}
290290

291291
// Migrate implements drivers.Handle.

0 commit comments

Comments
 (0)