Files
yggmail/internal/smtpsender/sender.go
Neil Alexander f9ae101d38 Validation fixes
2021-07-08 22:38:18 +01:00

140 lines
3.1 KiB
Go

package smtpsender
import (
"fmt"
"log"
"math"
"sync"
"time"
"github.com/emersion/go-smtp"
"github.com/jxskiss/base62"
"github.com/neilalexander/yggmail/internal/config"
"github.com/neilalexander/yggmail/internal/transport"
"go.uber.org/atomic"
)
// Queues holds the per-destination outbound mail queues together with the
// configuration, logger and transport that those queues share.
type Queues struct {
// Config supplies the public key that is announced in the SMTP HELLO.
Config *config.Config
// Log receives progress and error messages from the queues.
Log *log.Logger
// Transport is used to dial the destination servers.
Transport transport.Transport
queues sync.Map // servername -> *Queue
}
// NewQueues returns an empty set of outbound queues that will use the given
// configuration, logger and transport. Individual queues are created lazily
// by QueueFor.
func NewQueues(config *config.Config, log *log.Logger, transport transport.Transport) *Queues {
	qs := new(Queues)
	qs.Config = config
	qs.Log = log
	qs.Transport = transport
	return qs
}
// QueueFor returns the queue for the given destination server, creating it
// on first use, and makes sure a worker goroutine is running for it.
//
// An error is returned only if the stored value is not a *Queue.
func (qs *Queues) QueueFor(server string) (*Queue, error) {
	// Fast path: most lookups hit an existing queue, so avoid building a
	// throwaway Queue and FIFO just to hand them to LoadOrStore.
	v, ok := qs.queues.Load(server)
	if !ok {
		// LoadOrStore keeps whichever queue won the race if two callers
		// create the same destination concurrently.
		v, _ = qs.queues.LoadOrStore(server, &Queue{
			queues:      qs,
			destination: server,
			fifo:        newFIFOQueue(),
		})
	}
	q, ok := v.(*Queue)
	if !ok {
		return nil, fmt.Errorf("type assertion error")
	}
	// Only the caller that flips running from false to true starts the worker.
	if q.running.CAS(false, true) {
		go q.run()
	}
	return q, nil
}
// Queue is the outbound mail queue for a single destination server. Mail is
// buffered in fifo and drained by the run goroutine.
type Queue struct {
// queues is the parent Queues, providing the Config, Log and Transport.
queues *Queues
// destination is the server name that is dialled and handed to the SMTP client.
destination string
// running is true while a run goroutine is active for this queue.
running atomic.Bool
// backoff is incremented on each failed delivery attempt and reset to zero
// once the remote accepts HELLO; it is the exponent of the retry delay.
backoff atomic.Int64
// fifo holds the pending *QueuedMail items.
fifo *fifoQueue
}
// Queue appends mail to the queue and starts the worker goroutine if one is
// not already running.
//
// A nil mail is rejected with an error: it would otherwise be accepted here
// and dereferenced later by the worker goroutine.
func (q *Queue) Queue(mail *QueuedMail) error {
	if mail == nil {
		return fmt.Errorf("cannot queue nil mail")
	}
	q.fifo.push(mail)
	// Only the caller that flips running from false to true starts the worker.
	if q.running.CAS(false, true) {
		go q.run()
	}
	return nil
}
// run drains the queue, delivering each mail to the destination server over
// SMTP. It returns, clearing the running flag, once the FIFO has not
// signalled any mail for ten seconds.
//
// A failed delivery is logged and followed by a sleep of 2^backoff seconds
// before the next item is processed.
//
// NOTE(review): a mail whose delivery fails has already been popped and is
// not pushed back by this loop, so despite the "will retry" log message it is
// not re-sent from here — confirm whether that is intended.
func (q *Queue) run() {
	defer q.running.Store(false)
	for {
		select {
		case <-q.fifo.wait():
		case <-time.After(time.Second * 10):
			return
		}
		item, ok := q.fifo.pop()
		if !ok {
			continue
		}
		mail, ok := item.(*QueuedMail)
		if !ok || mail == nil {
			// A typed nil pointer passes the assertion; skip it rather than
			// dereferencing it below.
			continue
		}
		q.queues.Log.Println("Processing mail from", mail.From, "to", mail.Destination)
		if err := q.deliver(mail); err != nil {
			retry := time.Second * time.Duration(math.Exp2(float64(q.backoff.Inc())))
			q.queues.Log.Println("Queue error:", err, "- will retry in", retry)
			time.Sleep(retry)
			continue
		}
		q.queues.Log.Println("Sent mail from", mail.From, "to", mail.Destination)
	}
}

// deliver performs one complete SMTP exchange (HELLO, MAIL, RCPT, DATA) for
// mail over a freshly dialled connection to the queue's destination. It
// returns nil only once the DATA writer has been closed without error.
func (q *Queue) deliver(mail *QueuedMail) error {
	conn, err := q.queues.Transport.Dial(q.destination)
	if err != nil {
		return fmt.Errorf("q.queues.Transport.Dial: %w", err)
	}
	defer conn.Close()

	client, err := smtp.NewClient(conn, q.destination)
	if err != nil {
		return fmt.Errorf("smtp.NewClient: %w", err)
	}
	defer client.Close()

	if err := client.Hello(base62.EncodeToString(q.queues.Config.PublicKey)); err != nil {
		q.queues.Log.Println("Remote server", q.destination, "did not accept HELLO:", err)
		return fmt.Errorf("client.Hello: %w", err)
	}
	// The remote is reachable and accepted HELLO, so reset the retry backoff.
	q.backoff.Store(0)

	if err := client.Mail(mail.From, nil); err != nil {
		q.queues.Log.Println("Remote server", q.destination, "did not accept MAIL:", err)
		return fmt.Errorf("client.Mail: %w", err)
	}
	if err := client.Rcpt(mail.Rcpt); err != nil {
		q.queues.Log.Println("Remote server", q.destination, "did not accept RCPT:", err)
		return fmt.Errorf("client.Rcpt: %w", err)
	}

	writer, err := client.Data()
	if err != nil {
		return fmt.Errorf("client.Data: %w", err)
	}
	if _, err := writer.Write(mail.Content); err != nil {
		// Best effort: release the DATA writer; the write error is reported.
		_ = writer.Close()
		return fmt.Errorf("writer.Write: %w", err)
	}
	// Closing the writer ends the DATA command and yields the server's final
	// reply, so its error must be checked before the mail counts as sent.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("writer.Close: %w", err)
	}
	return nil
}
// QueuedMail is a single message waiting to be delivered to a destination
// server.
type QueuedMail struct {
From string // sender mail address, passed to the SMTP MAIL command
Rcpt string // recipient mail address, passed to the SMTP RCPT command
Destination string // server name
// Content is the message data written after the SMTP DATA command.
Content []byte
}