From 89164ae00632083a61a44ac51df70435e49e9901 Mon Sep 17 00:00:00 2001 From: Lukasz Janyst Date: Wed, 9 Nov 2022 10:09:46 +0100 Subject: [PATCH] imap: Fix a race when sending updates to the clients --- pkg/imap/backend.go | 4 ++-- pkg/imap/updates.go | 52 ++++++++++++++++++++++++++++++++------------- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/pkg/imap/backend.go b/pkg/imap/backend.go index 45e7faa..b21493e 100644 --- a/pkg/imap/backend.go +++ b/pkg/imap/backend.go @@ -165,7 +165,7 @@ func (ib *imapBackend) Login(_ *imap.ConnInfo, username, password string) (goIMA } if err := imapUser.user.CheckCredentials(slot, password); err != nil { - log.WithError(err).Error("Could not check bridge password") + log.WithError(err).Errorf("Could not check bridge password: %s %s", username, slot) if err := imapUser.Logout(); err != nil { log.WithError(err).Warn("Could not logout user after unsuccessful login check") } @@ -188,7 +188,7 @@ func (ib *imapBackend) Login(_ *imap.ConnInfo, username, password string) (goIMA // Updates returns a channel of updates for IMAP IDLE extension. func (ib *imapBackend) Updates() <-chan goIMAPBackend.Update { - return ib.updates.ch + return ib.updates.chout } func (ib *imapBackend) CreateMessageLimit() *uint32 { diff --git a/pkg/imap/updates.go b/pkg/imap/updates.go index 8c24e31..d286758 100644 --- a/pkg/imap/updates.go +++ b/pkg/imap/updates.go @@ -22,11 +22,11 @@ import ( "sync" "time" - "github.com/ljanyst/peroxide/pkg/store" - "github.com/ljanyst/peroxide/pkg/message" - "github.com/ljanyst/peroxide/pkg/pmapi" imap "github.com/emersion/go-imap" goIMAPBackend "github.com/emersion/go-imap/backend" + "github.com/ljanyst/peroxide/pkg/message" + "github.com/ljanyst/peroxide/pkg/pmapi" + "github.com/ljanyst/peroxide/pkg/store" "github.com/sirupsen/logrus" ) @@ -37,20 +37,46 @@ const ( operationDeleteMessage operation = "expunge" ) +type updateHelper struct { + data goIMAPBackend.Update + expiration time.Time +} + type imapUpdates struct { lock sync.Locker blocking map[string]bool delayedExpunges map[string][]chan struct{} - ch chan goIMAPBackend.Update + chout chan goIMAPBackend.Update + chin chan updateHelper } func newIMAPUpdates() *imapUpdates { - return &imapUpdates{ + iu := &imapUpdates{ lock: &sync.Mutex{}, blocking: map[string]bool{}, delayedExpunges: map[string][]chan struct{}{}, - ch: make(chan goIMAPBackend.Update), + chout: make(chan goIMAPBackend.Update), + chin: make(chan updateHelper, 1000), } + + go func() { + for { + upd := <-iu.chin + + if time.Now().After(upd.expiration) { + log.Warn("IMAP update could not be sent (timeout)") + continue + } + + select { + case iu.chout <- upd.data: + case <-time.After(1 * time.Second): + log.Warn("IMAP update could not be sent (timeout)") + } + } + }() + + return iu } func (iu *imapUpdates) block(address, mailboxName string, op operation) { @@ -192,20 +218,16 @@ func (iu *imapUpdates) MailboxStatus(address, mailboxName string, total, unread, } func (iu *imapUpdates) sendIMAPUpdate(update goIMAPBackend.Update, isBlocking bool) { - if iu.ch == nil { + if iu.chout == nil { log.Trace("IMAP IDLE unavailable") return } done := update.Done() - go func() { - select { - case <-time.After(1 * time.Second): - log.Warn("IMAP update could not be sent (timeout)") - return - case iu.ch <- update: - } - }() + iu.chin <- updateHelper{ + data: update, + expiration: time.Now().Add(1 * time.Second), + } if !isBlocking { return