diff --git a/cmd/yggmail/main.go b/cmd/yggmail/main.go index 03657c2..a8f0c86 100644 --- a/cmd/yggmail/main.go +++ b/cmd/yggmail/main.go @@ -126,7 +126,7 @@ func main() { panic(err) } - queues := smtpsender.NewQueues(cfg, log, transport) + queues := smtpsender.NewQueues(cfg, log, transport, storage) go func() { defer wg.Done() diff --git a/internal/smtpsender/fifo.go b/internal/smtpsender/fifo.go deleted file mode 100644 index 82ab0d7..0000000 --- a/internal/smtpsender/fifo.go +++ /dev/null @@ -1,58 +0,0 @@ -package smtpsender - -import ( - "sync" -) - -type fifoQueue struct { - frames []interface{} - count int - mutex sync.Mutex - notifs chan struct{} -} - -func newFIFOQueue() *fifoQueue { - q := &fifoQueue{ - notifs: make(chan struct{}), - } - return q -} - -func (q *fifoQueue) push(frame interface{}) bool { - q.mutex.Lock() - defer q.mutex.Unlock() - q.frames = append(q.frames, frame) - q.count++ - select { - case q.notifs <- struct{}{}: - default: - } - return true -} - -func (q *fifoQueue) pop() (interface{}, bool) { - q.mutex.Lock() - defer q.mutex.Unlock() - if q.count == 0 { - return nil, false - } - frame := q.frames[0] - q.frames[0] = nil - q.frames = q.frames[1:] - q.count-- - if q.count == 0 { - q.frames = nil - } - return frame, true -} - -func (q *fifoQueue) wait() <-chan struct{} { - q.mutex.Lock() - defer q.mutex.Unlock() - if q.count > 0 { - ch := make(chan struct{}) - close(ch) - return ch - } - return q.notifs -} diff --git a/internal/smtpsender/sender.go b/internal/smtpsender/sender.go index 4de9bc3..a8f355b 100644 --- a/internal/smtpsender/sender.go +++ b/internal/smtpsender/sender.go @@ -4,13 +4,15 @@ import ( "encoding/hex" "fmt" "log" - "math" + "net/mail" "sync" "time" "github.com/emersion/go-smtp" "github.com/neilalexander/yggmail/internal/config" + "github.com/neilalexander/yggmail/internal/storage" "github.com/neilalexander/yggmail/internal/transport" + "github.com/neilalexander/yggmail/internal/utils" "go.uber.org/atomic" ) @@ -18,22 +20,63 @@ type Queues struct { Config *config.Config Log *log.Logger Transport transport.Transport + Storage storage.Storage queues sync.Map // servername -> *Queue } -func NewQueues(config *config.Config, log *log.Logger, transport transport.Transport) *Queues { - return &Queues{ +func NewQueues(config *config.Config, log *log.Logger, transport transport.Transport, storage storage.Storage) *Queues { + qs := &Queues{ Config: config, Log: log, Transport: transport, + Storage: storage, } + go qs.manager() + return qs } -func (qs *Queues) QueueFor(server string) (*Queue, error) { +func (qs *Queues) manager() { + destinations, err := qs.Storage.QueueListDestinations() + if err != nil { + return + } + for _, destination := range destinations { + _, _ = qs.queueFor(destination) + } + time.AfterFunc(time.Minute*5, qs.manager) +} + +func (qs *Queues) QueueFor(from string, rcpts []string, content []byte) error { + pid, err := qs.Storage.MailCreate("Outbox", content) + if err != nil { + return fmt.Errorf("q.queues.Storage.MailCreate: %w", err) + } + + for _, rcpt := range rcpts { + addr, err := mail.ParseAddress(rcpt) + if err != nil { + return fmt.Errorf("mail.ParseAddress: %w", err) + } + pk, err := utils.ParseAddress(addr.Address) + if err != nil { + return fmt.Errorf("parseAddress: %w", err) + } + host := hex.EncodeToString(pk) + + if err := qs.Storage.QueueInsertDestinationForID(host, pid, from, rcpt); err != nil { + return fmt.Errorf("qs.Storage.QueueInsertDestinationForID: %w", err) + } + + _, _ = qs.queueFor(host) + } + + return nil +} + +func (qs *Queues) queueFor(server string) (*Queue, error) { v, _ := qs.queues.LoadOrStore(server, &Queue{ queues: qs, destination: server, - fifo: newFIFOQueue(), }) q, ok := v.(*Queue) if !ok { @@ -49,40 +92,32 @@ type Queue struct { queues *Queues destination string running atomic.Bool - backoff atomic.Int64 - fifo *fifoQueue -} - -func (q *Queue) Queue(mail *QueuedMail) error { - q.fifo.push(mail) - if q.running.CAS(false, true) { - go q.run() - } - return nil } func (q *Queue) run() { defer q.running.Store(false) - for { - select { - case <-q.fifo.wait(): - case <-time.After(time.Second * 10): - return + defer q.queues.Storage.MailExpunge("Outbox") // nolint:errcheck + + refs, err := q.queues.Storage.QueueMailIDsForDestination(q.destination) + if err != nil { + q.queues.Log.Println("Error with queue:", err) + } + + q.queues.Log.Println("There are", len(refs), "mail(s) queued for", q.destination) + + for _, ref := range refs { + _, mail, err := q.queues.Storage.MailSelect("Outbox", ref.ID) + if err != nil { + q.queues.Log.Println("Failed to get mail", ref.ID, "due to error:", err) + continue } - item, ok := q.fifo.pop() - if !ok { - continue - } - mail, ok := item.(*QueuedMail) - if !ok { - continue - } - q.queues.Log.Println("Processing mail from", mail.From, "to", mail.Destination) + q.queues.Log.Println("Sending mail from", ref.From, "to", q.destination) if err := func() error { conn, err := q.queues.Transport.Dial(q.destination) if err != nil { + q.queues.Log.Println("Failed to dial destination", q.destination, "due to error:", err) return fmt.Errorf("q.queues.Transport.Dial: %w", err) } defer conn.Close() @@ -98,14 +133,12 @@ func (q *Queue) run() { return fmt.Errorf("client.Hello: %w", err) } - q.backoff.Store(0) - - if err := client.Mail(mail.From, nil); err != nil { + if err := client.Mail(ref.From, nil); err != nil { q.queues.Log.Println("Remote server", q.destination, "did not accept MAIL:", err) return fmt.Errorf("client.Mail: %w", err) } - if err := client.Rcpt(mail.Rcpt); err != nil { + if err := client.Rcpt(ref.Rcpt); err != nil { q.queues.Log.Println("Remote server", q.destination, "did not accept RCPT:", err) return fmt.Errorf("client.Rcpt: %w", err) } @@ -116,24 +149,26 @@ func (q *Queue) run() { } defer writer.Close() - if _, err := writer.Write(mail.Content); err != nil { + if _, err := writer.Write(mail.Mail); err != nil { return fmt.Errorf("writer.Write: %w", err) } + if err := q.queues.Storage.QueueDeleteDestinationForID(q.destination, ref.ID); err != nil { + return fmt.Errorf("q.queues.Storage.QueueDeleteDestinationForID: %w", err) + } + + if remaining, err := q.queues.Storage.QueueSelectIsMessagePendingSend("Outbox", ref.ID); err != nil { + return fmt.Errorf("q.queues.Storage.QueueSelectIsMessagePendingSend: %w", err) + } else if !remaining { + return q.queues.Storage.MailDelete("Outbox", ref.ID) + } + return nil }(); err != nil { - retry := time.Second * time.Duration(math.Exp2(float64(q.backoff.Inc()))) - q.queues.Log.Println("Queue error:", err, "- will retry in", retry) - time.Sleep(retry) + q.queues.Log.Println("Failed to send message:", err, "- will retry") + // TODO: Send a mail to the inbox on the first instance? } else { - q.queues.Log.Println("Sent mail from", mail.From, "to", mail.Destination) + q.queues.Log.Println("Sent mail from", ref.From, "to", q.destination) } } } - -type QueuedMail struct { - From string // mail address - Rcpt string // mail addresses - Destination string // server name - Content []byte -} diff --git a/internal/smtpserver/session_local.go b/internal/smtpserver/session_local.go index 8250fc3..18b34f0 100644 --- a/internal/smtpserver/session_local.go +++ b/internal/smtpserver/session_local.go @@ -9,7 +9,6 @@ import ( "github.com/emersion/go-message" "github.com/emersion/go-smtp" - "github.com/neilalexander/yggmail/internal/smtpsender" "github.com/neilalexander/yggmail/internal/utils" ) @@ -55,55 +54,17 @@ func (s *SessionLocal) Data(r io.Reader) error { ), ) - servers := make(map[string]struct{}) - - for _, rcpt := range s.rcpt { - pk, err := utils.ParseAddress(rcpt) - if err != nil { - return fmt.Errorf("parseAddress: %w", err) - } - host := hex.EncodeToString(pk) - - if _, ok := servers[host]; ok { - continue - } - servers[host] = struct{}{} - - if pk.Equal(s.backend.Config.PublicKey) { - var b bytes.Buffer - if err := m.WriteTo(&b); err != nil { - return fmt.Errorf("m.WriteTo: %w", err) - } - if _, err := s.backend.Storage.MailCreate("INBOX", b.Bytes()); err != nil { - return fmt.Errorf("s.backend.Storage.StoreMessageFor: %w", err) - } - continue - } - - queue, err := s.backend.Queues.QueueFor(host) - if err != nil { - return fmt.Errorf("s.backend.Queues.QueueFor: %w", err) - } - - mail := &smtpsender.QueuedMail{ - From: s.from, - Rcpt: rcpt, - Destination: host, - } - - var b bytes.Buffer - if err := m.WriteTo(&b); err != nil { - return fmt.Errorf("m.WriteTo: %w", err) - } - mail.Content = b.Bytes() - - if err := queue.Queue(mail); err != nil { - return fmt.Errorf("queue.Queue: %w", err) - } - - s.backend.Log.Println("Queued mail for", mail.Destination) + var b bytes.Buffer + if err := m.WriteTo(&b); err != nil { + return fmt.Errorf("m.WriteTo: %w", err) } + if err := s.backend.Queues.QueueFor(s.from, s.rcpt, b.Bytes()); err != nil { + return fmt.Errorf("s.backend.Queues.QueueFor: %w", err) + } + + s.backend.Log.Println("Queued mail for", s.rcpt) + return nil } diff --git a/internal/storage/sqlite3/sqlite3.go b/internal/storage/sqlite3/sqlite3.go index 5a3769d..b7f045d 100644 --- a/internal/storage/sqlite3/sqlite3.go +++ b/internal/storage/sqlite3/sqlite3.go @@ -11,6 +11,7 @@ type SQLite3Storage struct { *TableConfig *TableMailboxes *TableMails + *TableQueue } func NewSQLite3StorageStorage(filename string) (*SQLite3Storage, error) { @@ -31,5 +32,9 @@ func NewSQLite3StorageStorage(filename string) (*SQLite3Storage, error) { if err != nil { return nil, fmt.Errorf("NewTableMails: %w", err) } + s.TableQueue, err = NewTableQueue(db) + if err != nil { + return nil, fmt.Errorf("NewTableQueue: %w", err) + } return s, nil } diff --git a/internal/storage/sqlite3/table_mails.go b/internal/storage/sqlite3/table_mails.go index 5549cef..edef371 100644 --- a/internal/storage/sqlite3/table_mails.go +++ b/internal/storage/sqlite3/table_mails.go @@ -163,11 +163,13 @@ func (t *TableMails) MailCreate(mailbox string, data []byte) (int, error) { func (t *TableMails) MailSelect(mailbox string, id int) (int, *types.Mail, error) { var seq int + var datetime int64 mail := &types.Mail{} err := t.selectMail.QueryRow(mailbox, id).Scan( - &seq, &mail.ID, &mail.Mail, &mail.Date, + &seq, &mail.ID, &mail.Mail, &datetime, &mail.Seen, &mail.Answered, &mail.Flagged, &mail.Deleted, ) + mail.Date = time.Unix(datetime, 0) return seq, mail, err } @@ -211,7 +213,7 @@ func (t *TableMails) MailUpdateFlags(mailbox string, id int, seen, answered, fla return err } -func (t *TableMails) MailDelete(mailbox, id string) error { +func (t *TableMails) MailDelete(mailbox string, id int) error { _, err := t.deleteMail.Exec(mailbox, id) return err } diff --git a/internal/storage/sqlite3/table_queue.go b/internal/storage/sqlite3/table_queue.go new file mode 100644 index 0000000..518bdf0 --- /dev/null +++ b/internal/storage/sqlite3/table_queue.go @@ -0,0 +1,150 @@ +package sqlite3 + +import ( + "database/sql" + "fmt" + + "github.com/neilalexander/yggmail/internal/storage/types" +) + +type TableQueue struct { + db *sql.DB + queueSelectDestinations *sql.Stmt + queueSelectIDsForDestination *sql.Stmt + queueInsertDestinationForID *sql.Stmt + queueDeleteIDForDestination *sql.Stmt + queueSelectIsMessagePendingSend *sql.Stmt +} + +const queueSchema = ` + CREATE TABLE IF NOT EXISTS queue ( + destination TEXT NOT NULL, + mailbox TEXT NOT NULL, + id INTEGER NOT NULL, + mail TEXT NOT NULL, + rcpt TEXT NOT NULL, + PRIMARY KEY (destination, mailbox, id), + FOREIGN KEY (mailbox, id) REFERENCES mails(mailbox, id) ON DELETE CASCADE ON UPDATE CASCADE + ); +` + +const queueSelectDestinationsStmt = ` + SELECT DISTINCT destination FROM queue +` + +const queueSelectIDsForDestinationStmt = ` + SELECT id, mail, rcpt FROM queue WHERE destination = $1 + ORDER BY id DESC +` + +const queueInsertDestinationForIDStmt = ` + INSERT INTO queue (destination, mailbox, id, mail, rcpt) VALUES($1, $2, $3, $4, $5) +` + +const deleteDestinationForIDStmt = ` + DELETE FROM queue WHERE destination = $1 AND mailbox = $2 AND id = $3 +` + +const queueSelectIsMessagePendingSendStmt = ` + SELECT COUNT(*) FROM queue WHERE mailbox = $1 AND id = $2 +` + +func NewTableQueue(db *sql.DB) (*TableQueue, error) { + t := &TableQueue{ + db: db, + } + _, err := db.Exec(queueSchema) + if err != nil { + return nil, fmt.Errorf("db.Exec: %w", err) + } + t.queueSelectDestinations, err = db.Prepare(queueSelectDestinationsStmt) + if err != nil { + return nil, fmt.Errorf("db.Prepare(queueSelectDestinationsStmt): %w", err) + } + t.queueSelectIDsForDestination, err = db.Prepare(queueSelectIDsForDestinationStmt) + if err != nil { + return nil, fmt.Errorf("db.Prepare(queueSelectIDsForDestinationStmt): %w", err) + } + t.queueInsertDestinationForID, err = db.Prepare(queueInsertDestinationForIDStmt) + if err != nil { + return nil, fmt.Errorf("db.Prepare(queueInsertDestinationForIDStmt): %w", err) + } + t.queueDeleteIDForDestination, err = db.Prepare(deleteDestinationForIDStmt) + if err != nil { + return nil, fmt.Errorf("db.Prepare(deleteDestinationForIDStmt): %w", err) + } + t.queueSelectIsMessagePendingSend, err = db.Prepare(queueSelectIsMessagePendingSendStmt) + if err != nil { + return nil, fmt.Errorf("db.Prepare(queueSelectIsMessagePendingSendStmt): %w", err) + } + return t, nil +} + +func (t *TableQueue) QueueListDestinations() ([]string, error) { + rows, err := t.queueSelectDestinations.Query() + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("t.queueSelectDestinations.Query: %w", err) + } + defer rows.Close() + var destinations []string + for rows.Next() { + var destination string + if err := rows.Scan(&destination); err != nil { + return nil, fmt.Errorf("rows.Scan: %w", err) + } + destinations = append(destinations, destination) + } + return destinations, nil +} + +func (t *TableQueue) QueueMailIDsForDestination(destination string) ([]types.QueuedMail, error) { + rows, err := t.queueSelectIDsForDestination.Query(destination) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("t.queueSelectDestinations.Query: %w", err) + } + defer rows.Close() + var ids []types.QueuedMail + for rows.Next() { + var id int + var from, rcpt string + if err := rows.Scan(&id, &from, &rcpt); err != nil { + return nil, fmt.Errorf("rows.Scan: %w", err) + } + ids = append(ids, types.QueuedMail{ + ID: id, + From: from, + Rcpt: rcpt, + }) + } + return ids, nil +} + +func (t *TableQueue) QueueInsertDestinationForID(destination string, id int, from, rcpt string) error { + _, err := t.queueInsertDestinationForID.Exec(destination, "Outbox", id, from, rcpt) + return err +} + +func (t *TableQueue) QueueDeleteDestinationForID(destination string, id int) error { + _, err := t.queueDeleteIDForDestination.Exec(destination, "Outbox", id) + return err +} + +func (t *TableQueue) QueueSelectIsMessagePendingSend(mailbox string, id int) (bool, error) { + row := t.queueSelectIsMessagePendingSend.QueryRow(mailbox, id) + if err := row.Err(); err != nil && err != sql.ErrNoRows { + return false, fmt.Errorf("row.Err: %w", err) + } else if err == sql.ErrNoRows { + return false, nil + } + var count int + if err := row.Scan(&count); err != nil { + return false, fmt.Errorf("row.Scan: %w", err) + } + return count > 0, nil +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 24c6298..f8094eb 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -22,7 +22,13 @@ type Storage interface { MailSelect(mailbox string, id int) (int, *types.Mail, error) MailSearch(mailbox string) ([]uint32, error) MailUpdateFlags(mailbox string, id int, seen, answered, flagged, deleted bool) error - MailDelete(mailbox, id string) error + MailDelete(mailbox string, id int) error MailExpunge(mailbox string) error MailCount(mailbox string) (int, error) + + QueueListDestinations() ([]string, error) + QueueMailIDsForDestination(destination string) ([]types.QueuedMail, error) + QueueInsertDestinationForID(destination string, id int, from, rcpt string) error + QueueDeleteDestinationForID(destination string, id int) error + QueueSelectIsMessagePendingSend(mailbox string, id int) (bool, error) } diff --git a/internal/storage/types/types.go b/internal/storage/types/types.go index 97490d2..f2e4c32 100644 --- a/internal/storage/types/types.go +++ b/internal/storage/types/types.go @@ -12,3 +12,9 @@ type Mail struct { Flagged bool Deleted bool } + +type QueuedMail struct { + ID int + From string + Rcpt string +}