Queue messages for re-send

This commit is contained in:
Neil Alexander
2021-07-09 23:43:09 +01:00
parent a0c2c595f0
commit 5838d89581
9 changed files with 262 additions and 155 deletions

View File

@@ -126,7 +126,7 @@ func main() {
panic(err)
}
queues := smtpsender.NewQueues(cfg, log, transport)
queues := smtpsender.NewQueues(cfg, log, transport, storage)
go func() {
defer wg.Done()

View File

@@ -1,58 +0,0 @@
package smtpsender
import (
"sync"
)
type fifoQueue struct {
frames []interface{}
count int
mutex sync.Mutex
notifs chan struct{}
}
func newFIFOQueue() *fifoQueue {
q := &fifoQueue{
notifs: make(chan struct{}),
}
return q
}
func (q *fifoQueue) push(frame interface{}) bool {
q.mutex.Lock()
defer q.mutex.Unlock()
q.frames = append(q.frames, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
return true
}
func (q *fifoQueue) pop() (interface{}, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.frames[0]
q.frames[0] = nil
q.frames = q.frames[1:]
q.count--
if q.count == 0 {
q.frames = nil
}
return frame, true
}
func (q *fifoQueue) wait() <-chan struct{} {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count > 0 {
ch := make(chan struct{})
close(ch)
return ch
}
return q.notifs
}

View File

@@ -4,13 +4,15 @@ import (
"encoding/hex"
"fmt"
"log"
"math"
"net/mail"
"sync"
"time"
"github.com/emersion/go-smtp"
"github.com/neilalexander/yggmail/internal/config"
"github.com/neilalexander/yggmail/internal/storage"
"github.com/neilalexander/yggmail/internal/transport"
"github.com/neilalexander/yggmail/internal/utils"
"go.uber.org/atomic"
)
@@ -18,22 +20,63 @@ type Queues struct {
Config *config.Config
Log *log.Logger
Transport transport.Transport
Storage storage.Storage
queues sync.Map // servername -> *Queue
}
func NewQueues(config *config.Config, log *log.Logger, transport transport.Transport) *Queues {
return &Queues{
func NewQueues(config *config.Config, log *log.Logger, transport transport.Transport, storage storage.Storage) *Queues {
qs := &Queues{
Config: config,
Log: log,
Transport: transport,
Storage: storage,
}
go qs.manager()
return qs
}
func (qs *Queues) QueueFor(server string) (*Queue, error) {
func (qs *Queues) manager() {
destinations, err := qs.Storage.QueueListDestinations()
if err != nil {
return
}
for _, destination := range destinations {
_, _ = qs.queueFor(destination)
}
time.AfterFunc(time.Minute*5, qs.manager)
}
func (qs *Queues) QueueFor(from string, rcpts []string, content []byte) error {
pid, err := qs.Storage.MailCreate("Outbox", content)
if err != nil {
return fmt.Errorf("q.queues.Storage.MailCreate: %w", err)
}
for _, rcpt := range rcpts {
addr, err := mail.ParseAddress(rcpt)
if err != nil {
return fmt.Errorf("mail.ParseAddress: %w", err)
}
pk, err := utils.ParseAddress(addr.Address)
if err != nil {
return fmt.Errorf("parseAddress: %w", err)
}
host := hex.EncodeToString(pk)
if err := qs.Storage.QueueInsertDestinationForID(host, pid, from, rcpt); err != nil {
return fmt.Errorf("qs.Storage.QueueInsertDestinationForID: %w", err)
}
_, _ = qs.queueFor(host)
}
return nil
}
func (qs *Queues) queueFor(server string) (*Queue, error) {
v, _ := qs.queues.LoadOrStore(server, &Queue{
queues: qs,
destination: server,
fifo: newFIFOQueue(),
})
q, ok := v.(*Queue)
if !ok {
@@ -49,40 +92,32 @@ type Queue struct {
queues *Queues
destination string
running atomic.Bool
backoff atomic.Int64
fifo *fifoQueue
}
func (q *Queue) Queue(mail *QueuedMail) error {
q.fifo.push(mail)
if q.running.CAS(false, true) {
go q.run()
}
return nil
}
func (q *Queue) run() {
defer q.running.Store(false)
for {
select {
case <-q.fifo.wait():
case <-time.After(time.Second * 10):
return
defer q.queues.Storage.MailExpunge("Outbox") // nolint:errcheck
refs, err := q.queues.Storage.QueueMailIDsForDestination(q.destination)
if err != nil {
q.queues.Log.Println("Error with queue:", err)
}
q.queues.Log.Println("There are", len(refs), "mail(s) queued for", q.destination)
for _, ref := range refs {
_, mail, err := q.queues.Storage.MailSelect("Outbox", ref.ID)
if err != nil {
q.queues.Log.Println("Failed to get mail", ref.ID, "due to error:", err)
continue
}
item, ok := q.fifo.pop()
if !ok {
continue
}
mail, ok := item.(*QueuedMail)
if !ok {
continue
}
q.queues.Log.Println("Processing mail from", mail.From, "to", mail.Destination)
q.queues.Log.Println("Sending mail from", ref.From, "to", q.destination)
if err := func() error {
conn, err := q.queues.Transport.Dial(q.destination)
if err != nil {
q.queues.Log.Println("Failed to dial destination", q.destination, "due to error:", err)
return fmt.Errorf("q.queues.Transport.Dial: %w", err)
}
defer conn.Close()
@@ -98,14 +133,12 @@ func (q *Queue) run() {
return fmt.Errorf("client.Hello: %w", err)
}
q.backoff.Store(0)
if err := client.Mail(mail.From, nil); err != nil {
if err := client.Mail(ref.From, nil); err != nil {
q.queues.Log.Println("Remote server", q.destination, "did not accept MAIL:", err)
return fmt.Errorf("client.Mail: %w", err)
}
if err := client.Rcpt(mail.Rcpt); err != nil {
if err := client.Rcpt(ref.Rcpt); err != nil {
q.queues.Log.Println("Remote server", q.destination, "did not accept RCPT:", err)
return fmt.Errorf("client.Rcpt: %w", err)
}
@@ -116,24 +149,26 @@ func (q *Queue) run() {
}
defer writer.Close()
if _, err := writer.Write(mail.Content); err != nil {
if _, err := writer.Write(mail.Mail); err != nil {
return fmt.Errorf("writer.Write: %w", err)
}
if err := q.queues.Storage.QueueDeleteDestinationForID(q.destination, ref.ID); err != nil {
return fmt.Errorf("q.queues.Storage.QueueDeleteDestinationForID: %w", err)
}
if remaining, err := q.queues.Storage.QueueSelectIsMessagePendingSend("Outbox", ref.ID); err != nil {
return fmt.Errorf("q.queues.Storage.QueueSelectIsMessagePendingSend: %w", err)
} else if !remaining {
return q.queues.Storage.MailDelete("Outbox", ref.ID)
}
return nil
}(); err != nil {
retry := time.Second * time.Duration(math.Exp2(float64(q.backoff.Inc())))
q.queues.Log.Println("Queue error:", err, "- will retry in", retry)
time.Sleep(retry)
q.queues.Log.Println("Failed to send message:", err, "- will retry")
// TODO: Send a mail to the inbox on the first instance?
} else {
q.queues.Log.Println("Sent mail from", mail.From, "to", mail.Destination)
q.queues.Log.Println("Sent mail from", ref.From, "to", q.destination)
}
}
}
type QueuedMail struct {
From string // mail address
Rcpt string // mail addresses
Destination string // server name
Content []byte
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/emersion/go-message"
"github.com/emersion/go-smtp"
"github.com/neilalexander/yggmail/internal/smtpsender"
"github.com/neilalexander/yggmail/internal/utils"
)
@@ -55,55 +54,17 @@ func (s *SessionLocal) Data(r io.Reader) error {
),
)
servers := make(map[string]struct{})
for _, rcpt := range s.rcpt {
pk, err := utils.ParseAddress(rcpt)
if err != nil {
return fmt.Errorf("parseAddress: %w", err)
}
host := hex.EncodeToString(pk)
if _, ok := servers[host]; ok {
continue
}
servers[host] = struct{}{}
if pk.Equal(s.backend.Config.PublicKey) {
var b bytes.Buffer
if err := m.WriteTo(&b); err != nil {
return fmt.Errorf("m.WriteTo: %w", err)
}
if _, err := s.backend.Storage.MailCreate("INBOX", b.Bytes()); err != nil {
return fmt.Errorf("s.backend.Storage.StoreMessageFor: %w", err)
}
continue
}
queue, err := s.backend.Queues.QueueFor(host)
if err != nil {
return fmt.Errorf("s.backend.Queues.QueueFor: %w", err)
}
mail := &smtpsender.QueuedMail{
From: s.from,
Rcpt: rcpt,
Destination: host,
}
var b bytes.Buffer
if err := m.WriteTo(&b); err != nil {
return fmt.Errorf("m.WriteTo: %w", err)
}
mail.Content = b.Bytes()
if err := queue.Queue(mail); err != nil {
return fmt.Errorf("queue.Queue: %w", err)
}
s.backend.Log.Println("Queued mail for", mail.Destination)
var b bytes.Buffer
if err := m.WriteTo(&b); err != nil {
return fmt.Errorf("m.WriteTo: %w", err)
}
if err := s.backend.Queues.QueueFor(s.from, s.rcpt, b.Bytes()); err != nil {
return fmt.Errorf("s.backend.Queues.QueueFor: %w", err)
}
s.backend.Log.Println("Queued mail for", s.rcpt)
return nil
}

View File

@@ -11,6 +11,7 @@ type SQLite3Storage struct {
*TableConfig
*TableMailboxes
*TableMails
*TableQueue
}
func NewSQLite3StorageStorage(filename string) (*SQLite3Storage, error) {
@@ -31,5 +32,9 @@ func NewSQLite3StorageStorage(filename string) (*SQLite3Storage, error) {
if err != nil {
return nil, fmt.Errorf("NewTableMails: %w", err)
}
s.TableQueue, err = NewTableQueue(db)
if err != nil {
return nil, fmt.Errorf("NewTableQueue: %w", err)
}
return s, nil
}

View File

@@ -163,11 +163,13 @@ func (t *TableMails) MailCreate(mailbox string, data []byte) (int, error) {
func (t *TableMails) MailSelect(mailbox string, id int) (int, *types.Mail, error) {
var seq int
var datetime int64
mail := &types.Mail{}
err := t.selectMail.QueryRow(mailbox, id).Scan(
&seq, &mail.ID, &mail.Mail, &mail.Date,
&seq, &mail.ID, &mail.Mail, &datetime,
&mail.Seen, &mail.Answered, &mail.Flagged, &mail.Deleted,
)
mail.Date = time.Unix(datetime, 0)
return seq, mail, err
}
@@ -211,7 +213,7 @@ func (t *TableMails) MailUpdateFlags(mailbox string, id int, seen, answered, fla
return err
}
func (t *TableMails) MailDelete(mailbox, id string) error {
func (t *TableMails) MailDelete(mailbox string, id int) error {
_, err := t.deleteMail.Exec(mailbox, id)
return err
}

View File

@@ -0,0 +1,150 @@
package sqlite3
import (
"database/sql"
"fmt"
"github.com/neilalexander/yggmail/internal/storage/types"
)
type TableQueue struct {
db *sql.DB
queueSelectDestinations *sql.Stmt
queueSelectIDsForDestination *sql.Stmt
queueInsertDestinationForID *sql.Stmt
queueDeleteIDForDestination *sql.Stmt
queueSelectIsMessagePendingSend *sql.Stmt
}
const queueSchema = `
CREATE TABLE IF NOT EXISTS queue (
destination TEXT NOT NULL,
mailbox TEXT NOT NULL,
id INTEGER NOT NULL,
mail TEXT NOT NULL,
rcpt TEXT NOT NULL,
PRIMARY KEY (destination, mailbox, id),
FOREIGN KEY (mailbox, id) REFERENCES mails(mailbox, id) ON DELETE CASCADE ON UPDATE CASCADE
);
`
const queueSelectDestinationsStmt = `
SELECT DISTINCT destination FROM queue
`
const queueSelectIDsForDestinationStmt = `
SELECT id, mail, rcpt FROM queue WHERE destination = $1
ORDER BY id DESC
`
const queueInsertDestinationForIDStmt = `
INSERT INTO queue (destination, mailbox, id, mail, rcpt) VALUES($1, $2, $3, $4, $5)
`
const deleteDestinationForIDStmt = `
DELETE FROM queue WHERE destination = $1 AND mailbox = $2 AND id = $3
`
const queueSelectIsMessagePendingSendStmt = `
SELECT COUNT(*) FROM queue WHERE mailbox = $1 AND id = $2
`
func NewTableQueue(db *sql.DB) (*TableQueue, error) {
t := &TableQueue{
db: db,
}
_, err := db.Exec(queueSchema)
if err != nil {
return nil, fmt.Errorf("db.Exec: %w", err)
}
t.queueSelectDestinations, err = db.Prepare(queueSelectDestinationsStmt)
if err != nil {
return nil, fmt.Errorf("db.Prepare(queueSelectDestinationsStmt): %w", err)
}
t.queueSelectIDsForDestination, err = db.Prepare(queueSelectIDsForDestinationStmt)
if err != nil {
return nil, fmt.Errorf("db.Prepare(queueSelectIDsForDestinationStmt): %w", err)
}
t.queueInsertDestinationForID, err = db.Prepare(queueInsertDestinationForIDStmt)
if err != nil {
return nil, fmt.Errorf("db.Prepare(queueInsertDestinationForIDStmt): %w", err)
}
t.queueDeleteIDForDestination, err = db.Prepare(deleteDestinationForIDStmt)
if err != nil {
return nil, fmt.Errorf("db.Prepare(deleteDestinationForIDStmt): %w", err)
}
t.queueSelectIsMessagePendingSend, err = db.Prepare(queueSelectIsMessagePendingSendStmt)
if err != nil {
return nil, fmt.Errorf("db.Prepare(queueSelectIsMessagePendingSendStmt): %w", err)
}
return t, nil
}
func (t *TableQueue) QueueListDestinations() ([]string, error) {
rows, err := t.queueSelectDestinations.Query()
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("t.queueSelectDestinations.Query: %w", err)
}
defer rows.Close()
var destinations []string
for rows.Next() {
var destination string
if err := rows.Scan(&destination); err != nil {
return nil, fmt.Errorf("rows.Scan: %w", err)
}
destinations = append(destinations, destination)
}
return destinations, nil
}
func (t *TableQueue) QueueMailIDsForDestination(destination string) ([]types.QueuedMail, error) {
rows, err := t.queueSelectIDsForDestination.Query(destination)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("t.queueSelectDestinations.Query: %w", err)
}
defer rows.Close()
var ids []types.QueuedMail
for rows.Next() {
var id int
var from, rcpt string
if err := rows.Scan(&id, &from, &rcpt); err != nil {
return nil, fmt.Errorf("rows.Scan: %w", err)
}
ids = append(ids, types.QueuedMail{
ID: id,
From: from,
Rcpt: rcpt,
})
}
return ids, nil
}
func (t *TableQueue) QueueInsertDestinationForID(destination string, id int, from, rcpt string) error {
_, err := t.queueInsertDestinationForID.Exec(destination, "Outbox", id, from, rcpt)
return err
}
func (t *TableQueue) QueueDeleteDestinationForID(destination string, id int) error {
_, err := t.queueDeleteIDForDestination.Exec(destination, "Outbox", id)
return err
}
func (t *TableQueue) QueueSelectIsMessagePendingSend(mailbox string, id int) (bool, error) {
row := t.queueSelectIsMessagePendingSend.QueryRow(mailbox, id)
if err := row.Err(); err != nil && err != sql.ErrNoRows {
return false, fmt.Errorf("row.Err: %w", err)
} else if err == sql.ErrNoRows {
return false, nil
}
var count int
if err := row.Scan(&count); err != nil {
return false, fmt.Errorf("row.Scan: %w", err)
}
return count > 0, nil
}

View File

@@ -22,7 +22,13 @@ type Storage interface {
MailSelect(mailbox string, id int) (int, *types.Mail, error)
MailSearch(mailbox string) ([]uint32, error)
MailUpdateFlags(mailbox string, id int, seen, answered, flagged, deleted bool) error
MailDelete(mailbox, id string) error
MailDelete(mailbox string, id int) error
MailExpunge(mailbox string) error
MailCount(mailbox string) (int, error)
QueueListDestinations() ([]string, error)
QueueMailIDsForDestination(destination string) ([]types.QueuedMail, error)
QueueInsertDestinationForID(destination string, id int, from, rcpt string) error
QueueDeleteDestinationForID(destination string, id int) error
QueueSelectIsMessagePendingSend(mailbox string, id int) (bool, error)
}

View File

@@ -12,3 +12,9 @@ type Mail struct {
Flagged bool
Deleted bool
}
type QueuedMail struct {
ID int
From string
Rcpt string
}