Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions pkg/connector/chatsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,31 @@ import (
)

func (l *LinkedInClient) deleteConversation(ctx context.Context, conv linkedingo.Conversation) {
l.deletePortal(ctx, l.makePortalKey(conv))
}

func (l *LinkedInClient) deletePortal(ctx context.Context, portalKey networkid.PortalKey) {
l.main.Bridge.QueueRemoteEvent(l.userLogin, &simplevent.ChatDelete{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatDelete,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.Stringer("entity_urn", conv.EntityURN)
},
PortalKey: l.makePortalKey(conv),
Type: bridgev2.RemoteEventChatDelete,
PortalKey: portalKey,
},
OnlyForMe: true,
})
}

func (l *LinkedInClient) deleteURN(ctx context.Context, urn linkedingo.URN) {
portalKey := networkid.PortalKey{
ID: networkid.PortalID(urn.String()),
Receiver: l.userLogin.ID,
}
l.deletePortal(ctx, portalKey)
if !l.main.Bridge.Config.SplitPortals {
portalKey.Receiver = ""
l.deletePortal(ctx, portalKey)
}
}

func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linkedingo.Conversation) {
log := zerolog.Ctx(ctx)

Expand Down Expand Up @@ -185,3 +198,17 @@ func (l *LinkedInClient) syncConversations(ctx context.Context) {
l.handleConversations(ctx, conversations.Elements)
}
}
func (l *LinkedInClient) getConversationsBySyncToken(ctx context.Context) {
convs, err := l.client.GetConversationsBySyncToken(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("failed to get conversations by sync token")
return
}
if convs == nil {
return
}
l.handleConversations(ctx, convs.Elements)
for _, item := range convs.Metadata.DeletedURNs {
l.deleteURN(ctx, item.Conversation.EntityURN)
}
}
10 changes: 10 additions & 0 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge
cookies,
meta.XLIPageInstance,
meta.XLITrack,
meta.ConversationsSyncToken,
linkedingo.Handlers{
Heartbeat: func(ctx context.Context) {
if login.BridgeState.GetPrevUnsent().StateEvent != status.StateConnected {
Expand All @@ -103,6 +104,13 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge
BadCredentials: client.onBadCredentials,
UnknownError: client.onUnknownError,
DecoratedEvent: client.onDecoratedEvent,
ConversationsSyncToken: func(ctx context.Context, syncToken string) {
meta.ConversationsSyncToken = syncToken
err := login.Save(ctx)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to save sync token")
}
},
},
)

Expand Down Expand Up @@ -148,6 +156,8 @@ func (l *LinkedInClient) Connect(ctx context.Context) {
}

l.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnecting})

l.getConversationsBySyncToken(ctx)
if err := l.client.RealtimeConnect(ctx); err != nil {
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/dbmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func (lc *LinkedInConnector) GetDBMetaTypes() database.MetaTypes {
}

type UserLoginMetadata struct {
Cookies *linkedingo.StringCookieJar `json:"cookies,omitempty"`
XLITrack string `json:"x_li_track,omitempty"`
XLIPageInstance string `json:"x_li_page_instance,omitempty"`
Cookies *linkedingo.StringCookieJar `json:"cookies,omitempty"`
XLITrack string `json:"x_li_track,omitempty"`
XLIPageInstance string `json:"x_li_page_instance,omitempty"`
ConversationsSyncToken string `json:"conversations_sync_token,omitempty"`
}

type MessageMetadata struct {
Expand Down
12 changes: 3 additions & 9 deletions pkg/connector/handlelinkedin.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,13 @@ func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *l
}

func (l *LinkedInClient) onRealtimeConversations(ctx context.Context, conv *linkedingo.Conversation) {
var convs []linkedingo.Conversation
if conv != nil {
convs = []linkedingo.Conversation{*conv}
convs := []linkedingo.Conversation{*conv}
l.handleConversations(ctx, convs)
} else {
convsRes, err := l.client.GetConversations(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("failed to get conversations")
return
}
convs = convsRes.Elements
l.getConversationsBySyncToken(ctx)
}

l.handleConversations(ctx, convs)
}

func (l *LinkedInClient) onRealtimeConversationDelete(ctx context.Context, conv linkedingo.Conversation) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *CookieLogin) SubmitCookies(ctx context.Context, cookies map[string]stri
pageInstance := cookies[CookieLoginXLIPageInstanceField]
xLiTrack := cookies[CookieLoginXLITrackField]

loginClient := linkedingo.NewClient(ctx, linkedingo.NewURN(""), jar, pageInstance, xLiTrack, linkedingo.Handlers{})
loginClient := linkedingo.NewClient(ctx, linkedingo.NewURN(""), jar, pageInstance, xLiTrack, "", linkedingo.Handlers{})
profile, err := loginClient.GetCurrentUserProfile(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get current user profile: %w", err)
Expand Down
38 changes: 23 additions & 15 deletions pkg/linkedingo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ type Client struct {
xLITrack string
serviceVersion string

syncToken string
conversationsSyncToken string
}

func NewClient(ctx context.Context, userEntityURN URN, jar *StringCookieJar, pageInstance, xLiTrack string, handlers Handlers) *Client {
func NewClient(ctx context.Context, userEntityURN URN, jar *StringCookieJar, pageInstance, xLiTrack, conversationsSyncToken string, handlers Handlers) *Client {
log := zerolog.Ctx(ctx)
if xLiTrack == "" {
log.Warn().Msg("x-li-track is empty, using default")
Expand Down Expand Up @@ -87,13 +87,14 @@ func NewClient(ctx context.Context, userEntityURN URN, jar *StringCookieJar, pag
}

cli := &Client{
userEntityURN: userEntityURN,
jar: jar,
pageInstance: pageInstance,
xLITrack: xLiTrack,
serviceVersion: serviceVersion,
realtimeSessionID: uuid.New(),
handlers: handlers,
userEntityURN: userEntityURN,
jar: jar,
pageInstance: pageInstance,
xLITrack: xLiTrack,
serviceVersion: serviceVersion,
realtimeSessionID: uuid.New(),
handlers: handlers,
conversationsSyncToken: conversationsSyncToken,
}
cli.http = &http.Client{
Transport: &http.Transport{
Expand All @@ -110,12 +111,13 @@ func (c *Client) IsLoggedIn() bool {
}

type Handlers struct {
Heartbeat func(context.Context)
ClientConnection func(context.Context, *ClientConnection)
TransientDisconnect func(context.Context, error)
BadCredentials func(context.Context, error)
UnknownError func(context.Context, error)
DecoratedEvent func(context.Context, *DecoratedEvent)
Heartbeat func(context.Context)
ClientConnection func(context.Context, *ClientConnection)
TransientDisconnect func(context.Context, error)
BadCredentials func(context.Context, error)
UnknownError func(context.Context, error)
DecoratedEvent func(context.Context, *DecoratedEvent)
ConversationsSyncToken func(context.Context, string)
}

func (h Handlers) onHeartbeat(ctx context.Context) {
Expand Down Expand Up @@ -153,3 +155,9 @@ func (h Handlers) onDecoratedEvent(ctx context.Context, decoratedEvent *Decorate
h.DecoratedEvent(ctx, decoratedEvent)
}
}

func (h Handlers) onConversationsSyncToken(ctx context.Context, conversationsSyncToken string) {
if h.ConversationsSyncToken != nil {
h.ConversationsSyncToken(ctx, conversationsSyncToken)
}
}
21 changes: 15 additions & 6 deletions pkg/linkedingo/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ type ConversationCursorMetadata struct {
NextCursor string `json:"nextCursor,omitempty"`
}

// ConversationSyncMetadata represents a com.linkedin.messenger.SyncMetadata object.
type DeletedURN struct {
Message any `json:"message"`
Conversation struct {
EntityURN URN `json:"entityUrn"`
} `json:"conversation"`
}

type ConversationSyncMetadata struct {
NewSyncToken string `json:"newSyncToken,omitempty"`
DeletedURNs []DeletedURN `json:"deletedUrns,omitempty"`
NewSyncToken string `json:"newSyncToken,omitempty"`
}

// MessageMetadata represents a com.linkedin.messenger.MessageMetadata object.
Expand Down Expand Up @@ -173,14 +180,14 @@ func (c *Client) GetConversationsUpdatedBefore(ctx context.Context, updatedBefor
return response.Data.MessengerConversationsByCategoryQuery, nil
}

func (c *Client) GetConversations(ctx context.Context) (*CollectionResponse[ConversationSyncMetadata, Conversation], error) {
func (c *Client) GetConversationsBySyncToken(ctx context.Context) (*CollectionResponse[ConversationSyncMetadata, Conversation], error) {
zerolog.Ctx(ctx).Info().Msg("Getting conversations")
req := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL)
mailboxUrn := url.QueryEscape(c.userEntityURN.WithPrefix("urn", "li", "fsd_profile").String())
if c.syncToken != "" {
if c.conversationsSyncToken != "" {
req.WithGraphQLQuery(graphQLQueryIDMessengerConversationsWithSyncToken, map[string]string{
"mailboxUrn": mailboxUrn,
"syncToken": c.syncToken,
"syncToken": c.conversationsSyncToken,
})
} else {
req.WithGraphQLQuery(graphQLQueryIDMessengerConversations, map[string]string{
Expand All @@ -194,7 +201,9 @@ func (c *Client) GetConversations(ctx context.Context) (*CollectionResponse[Conv
return nil, err
}

c.syncToken = response.Data.MessengerConversationsBySyncToken.Metadata.NewSyncToken
newToken := response.Data.MessengerConversationsBySyncToken.Metadata.NewSyncToken
c.conversationsSyncToken = newToken
c.handlers.onConversationsSyncToken(ctx, newToken)
return response.Data.MessengerConversationsBySyncToken, nil
}

Expand Down