From 70bfcf114a2b9651c4db7620dcac0c3e55536295 Mon Sep 17 00:00:00 2001 From: Ping Chen Date: Tue, 3 Feb 2026 15:03:32 +0900 Subject: [PATCH] chatsync: use sync token to handle deleted conversations --- pkg/connector/chatsync.go | 37 +++++++++++++++++++++++++++----- pkg/connector/client.go | 10 +++++++++ pkg/connector/dbmeta.go | 7 +++--- pkg/connector/handlelinkedin.go | 12 +++-------- pkg/connector/login.go | 2 +- pkg/linkedingo/client.go | 38 ++++++++++++++++++++------------- pkg/linkedingo/conversations.go | 21 ++++++++++++------ 7 files changed, 88 insertions(+), 39 deletions(-) diff --git a/pkg/connector/chatsync.go b/pkg/connector/chatsync.go index a2eef11..fb86efb 100644 --- a/pkg/connector/chatsync.go +++ b/pkg/connector/chatsync.go @@ -30,18 +30,31 @@ import ( ) func (l *LinkedInClient) deleteConversation(ctx context.Context, conv linkedingo.Conversation) { + l.deletePortal(ctx, l.makePortalKey(conv)) +} + +func (l *LinkedInClient) deletePortal(ctx context.Context, portalKey networkid.PortalKey) { l.main.Bridge.QueueRemoteEvent(l.userLogin, &simplevent.ChatDelete{ EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatDelete, - LogContext: func(c zerolog.Context) zerolog.Context { - return c.Stringer("entity_urn", conv.EntityURN) - }, - PortalKey: l.makePortalKey(conv), + Type: bridgev2.RemoteEventChatDelete, + PortalKey: portalKey, }, OnlyForMe: true, }) } +func (l *LinkedInClient) deleteURN(ctx context.Context, urn linkedingo.URN) { + portalKey := networkid.PortalKey{ + ID: networkid.PortalID(urn.String()), + Receiver: l.userLogin.ID, + } + l.deletePortal(ctx, portalKey) + if !l.main.Bridge.Config.SplitPortals { + portalKey.Receiver = "" + l.deletePortal(ctx, portalKey) + } +} + func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linkedingo.Conversation) { log := zerolog.Ctx(ctx) @@ -185,3 +198,17 @@ func (l *LinkedInClient) syncConversations(ctx context.Context) { l.handleConversations(ctx, conversations.Elements) } } +func (l *LinkedInClient) getConversationsBySyncToken(ctx context.Context) { + convs, err := l.client.GetConversationsBySyncToken(ctx) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("failed to get conversations by sync token") + return + } + if convs == nil { + return + } + l.handleConversations(ctx, convs.Elements) + for _, item := range convs.Metadata.DeletedURNs { + l.deleteURN(ctx, item.Conversation.EntityURN) + } +} diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 5385244..80a769a 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -81,6 +81,7 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge cookies, meta.XLIPageInstance, meta.XLITrack, + meta.ConversationsSyncToken, linkedingo.Handlers{ Heartbeat: func(ctx context.Context) { if login.BridgeState.GetPrevUnsent().StateEvent != status.StateConnected { @@ -103,6 +104,13 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge BadCredentials: client.onBadCredentials, UnknownError: client.onUnknownError, DecoratedEvent: client.onDecoratedEvent, + ConversationsSyncToken: func(ctx context.Context, syncToken string) { + meta.ConversationsSyncToken = syncToken + err := login.Save(ctx) + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to save sync token") + } + }, }, ) @@ -148,6 +156,8 @@ func (l *LinkedInClient) Connect(ctx context.Context) { } l.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnecting}) + + l.getConversationsBySyncToken(ctx) if err := l.client.RealtimeConnect(ctx); err != nil { l.userLogin.BridgeState.Send(status.BridgeState{ StateEvent: status.StateBadCredentials, diff --git a/pkg/connector/dbmeta.go b/pkg/connector/dbmeta.go index c8b87a6..739ee5c 100644 --- a/pkg/connector/dbmeta.go +++ b/pkg/connector/dbmeta.go @@ -37,9 +37,10 @@ func (lc *LinkedInConnector) GetDBMetaTypes() database.MetaTypes { } type UserLoginMetadata struct { - Cookies *linkedingo.StringCookieJar `json:"cookies,omitempty"` - XLITrack string `json:"x_li_track,omitempty"` - XLIPageInstance string `json:"x_li_page_instance,omitempty"` + Cookies *linkedingo.StringCookieJar `json:"cookies,omitempty"` + XLITrack string `json:"x_li_track,omitempty"` + XLIPageInstance string `json:"x_li_page_instance,omitempty"` + ConversationsSyncToken string `json:"conversations_sync_token,omitempty"` } type MessageMetadata struct { diff --git a/pkg/connector/handlelinkedin.go b/pkg/connector/handlelinkedin.go index 0e76982..7fb3343 100644 --- a/pkg/connector/handlelinkedin.go +++ b/pkg/connector/handlelinkedin.go @@ -87,19 +87,13 @@ func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *l } func (l *LinkedInClient) onRealtimeConversations(ctx context.Context, conv *linkedingo.Conversation) { - var convs []linkedingo.Conversation if conv != nil { - convs = []linkedingo.Conversation{*conv} + convs := []linkedingo.Conversation{*conv} + l.handleConversations(ctx, convs) } else { - convsRes, err := l.client.GetConversations(ctx) - if err != nil { - zerolog.Ctx(ctx).Err(err).Msg("failed to get conversations") - return - } - convs = convsRes.Elements + l.getConversationsBySyncToken(ctx) } - l.handleConversations(ctx, convs) } func (l *LinkedInClient) onRealtimeConversationDelete(ctx context.Context, conv linkedingo.Conversation) { diff --git a/pkg/connector/login.go b/pkg/connector/login.go index a1effc9..bec6314 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -129,7 +129,7 @@ func (c *CookieLogin) SubmitCookies(ctx context.Context, cookies map[string]stri pageInstance := cookies[CookieLoginXLIPageInstanceField] xLiTrack := cookies[CookieLoginXLITrackField] - loginClient := linkedingo.NewClient(ctx, linkedingo.NewURN(""), jar, pageInstance, xLiTrack, linkedingo.Handlers{}) + loginClient := linkedingo.NewClient(ctx, linkedingo.NewURN(""), jar, pageInstance, xLiTrack, "", linkedingo.Handlers{}) profile, err := loginClient.GetCurrentUserProfile(ctx) if err != nil { return nil, fmt.Errorf("failed to get current user profile: %w", err) diff --git a/pkg/linkedingo/client.go b/pkg/linkedingo/client.go index 0a0bc5a..73fc97d 100644 --- a/pkg/linkedingo/client.go +++ b/pkg/linkedingo/client.go @@ -54,10 +54,10 @@ type Client struct { xLITrack string serviceVersion string - syncToken string + conversationsSyncToken string } -func NewClient(ctx context.Context, userEntityURN URN, jar *StringCookieJar, pageInstance, xLiTrack string, handlers Handlers) *Client { +func NewClient(ctx context.Context, userEntityURN URN, jar *StringCookieJar, pageInstance, xLiTrack, conversationsSyncToken string, handlers Handlers) *Client { log := zerolog.Ctx(ctx) if xLiTrack == "" { log.Warn().Msg("x-li-track is empty, using default") @@ -87,13 +87,14 @@ func NewClient(ctx context.Context, userEntityURN URN, jar *StringCookieJar, pag } cli := &Client{ - userEntityURN: userEntityURN, - jar: jar, - pageInstance: pageInstance, - xLITrack: xLiTrack, - serviceVersion: serviceVersion, - realtimeSessionID: uuid.New(), - handlers: handlers, + userEntityURN: userEntityURN, + jar: jar, + pageInstance: pageInstance, + xLITrack: xLiTrack, + serviceVersion: serviceVersion, + realtimeSessionID: uuid.New(), + handlers: handlers, + conversationsSyncToken: conversationsSyncToken, } cli.http = &http.Client{ Transport: &http.Transport{ @@ -110,12 +111,13 @@ func (c *Client) IsLoggedIn() bool { } type Handlers struct { - Heartbeat func(context.Context) - ClientConnection func(context.Context, *ClientConnection) - TransientDisconnect func(context.Context, error) - BadCredentials func(context.Context, error) - UnknownError func(context.Context, error) - DecoratedEvent func(context.Context, *DecoratedEvent) + Heartbeat func(context.Context) + ClientConnection func(context.Context, *ClientConnection) + TransientDisconnect func(context.Context, error) + BadCredentials func(context.Context, error) + UnknownError func(context.Context, error) + DecoratedEvent func(context.Context, *DecoratedEvent) + ConversationsSyncToken func(context.Context, string) } func (h Handlers) onHeartbeat(ctx context.Context) { @@ -153,3 +155,9 @@ func (h Handlers) onDecoratedEvent(ctx context.Context, decoratedEvent *Decorate h.DecoratedEvent(ctx, decoratedEvent) } } + +func (h Handlers) onConversationsSyncToken(ctx context.Context, conversationsSyncToken string) { + if h.ConversationsSyncToken != nil { + h.ConversationsSyncToken(ctx, conversationsSyncToken) + } +} diff --git a/pkg/linkedingo/conversations.go b/pkg/linkedingo/conversations.go index 2521f7c..641c45e 100644 --- a/pkg/linkedingo/conversations.go +++ b/pkg/linkedingo/conversations.go @@ -99,9 +99,16 @@ type ConversationCursorMetadata struct { NextCursor string `json:"nextCursor,omitempty"` } -// ConversationSyncMetadata represents a com.linkedin.messenger.SyncMetadata object. +type DeletedURN struct { + Message any `json:"message"` + Conversation struct { + EntityURN URN `json:"entityUrn"` + } `json:"conversation"` +} + type ConversationSyncMetadata struct { - NewSyncToken string `json:"newSyncToken,omitempty"` + DeletedURNs []DeletedURN `json:"deletedUrns,omitempty"` + NewSyncToken string `json:"newSyncToken,omitempty"` } // MessageMetadata represents a com.linkedin.messenger.MessageMetadata object. @@ -173,14 +180,14 @@ func (c *Client) GetConversationsUpdatedBefore(ctx context.Context, updatedBefor return response.Data.MessengerConversationsByCategoryQuery, nil } -func (c *Client) GetConversations(ctx context.Context) (*CollectionResponse[ConversationSyncMetadata, Conversation], error) { +func (c *Client) GetConversationsBySyncToken(ctx context.Context) (*CollectionResponse[ConversationSyncMetadata, Conversation], error) { zerolog.Ctx(ctx).Info().Msg("Getting conversations") req := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL) mailboxUrn := url.QueryEscape(c.userEntityURN.WithPrefix("urn", "li", "fsd_profile").String()) - if c.syncToken != "" { + if c.conversationsSyncToken != "" { req.WithGraphQLQuery(graphQLQueryIDMessengerConversationsWithSyncToken, map[string]string{ "mailboxUrn": mailboxUrn, - "syncToken": c.syncToken, + "syncToken": c.conversationsSyncToken, }) } else { req.WithGraphQLQuery(graphQLQueryIDMessengerConversations, map[string]string{ @@ -194,7 +201,9 @@ func (c *Client) GetConversations(ctx context.Context) (*CollectionResponse[Conv return nil, err } - c.syncToken = response.Data.MessengerConversationsBySyncToken.Metadata.NewSyncToken + newToken := response.Data.MessengerConversationsBySyncToken.Metadata.NewSyncToken + c.conversationsSyncToken = newToken + c.handlers.onConversationsSyncToken(ctx, newToken) return response.Data.MessengerConversationsBySyncToken, nil }