mirror of
https://github.com/neilalexander/yggmail.git
synced 2026-05-06 20:06:28 +03:00
QUIC transport
This commit is contained in:
@@ -31,7 +31,7 @@ func NewIMAPServer(backend *Backend, addr string, insecure bool) (*IMAPServer, *
|
||||
s.server.AllowInsecureAuth = insecure
|
||||
//s.server.Debug = os.Stdout
|
||||
s.server.Enable(idle.NewExtension())
|
||||
s.server.Enable(notify)
|
||||
//s.server.Enable(notify)
|
||||
s.server.EnableAuth(sasl.Login, func(conn server.Conn) sasl.Server {
|
||||
return sasl.NewLoginServer(func(username, password string) error {
|
||||
_, err := s.backend.Login(nil, username, password)
|
||||
|
||||
@@ -39,7 +39,7 @@ func NewQueues(config *config.Config, log *log.Logger, transport transport.Trans
|
||||
Transport: transport,
|
||||
Storage: storage,
|
||||
}
|
||||
go qs.manager()
|
||||
time.AfterFunc(time.Second*5, qs.manager)
|
||||
return qs
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func (qs *Queues) manager() {
|
||||
for _, destination := range destinations {
|
||||
_, _ = qs.queueFor(destination)
|
||||
}
|
||||
time.AfterFunc(time.Minute*10, qs.manager)
|
||||
time.AfterFunc(time.Minute, qs.manager)
|
||||
}
|
||||
|
||||
func (qs *Queues) QueueFor(from string, rcpts []string, content []byte) error {
|
||||
@@ -90,7 +90,7 @@ func (qs *Queues) queueFor(server string) (*Queue, error) {
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("type assertion error")
|
||||
}
|
||||
if q.running.CAS(false, true) {
|
||||
if q.running.CompareAndSwap(false, true) {
|
||||
go q.run()
|
||||
}
|
||||
return q, nil
|
||||
|
||||
@@ -9,24 +9,45 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
iwt "github.com/Arceliar/ironwood/types"
|
||||
"github.com/fatih/color"
|
||||
gologme "github.com/gologme/log"
|
||||
"github.com/neilalexander/utp"
|
||||
"github.com/quic-go/quic-go"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/core"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/multicast"
|
||||
)
|
||||
|
||||
type YggdrasilTransport struct {
|
||||
Sessions *utp.Socket
|
||||
listener *quic.Listener
|
||||
yggdrasil net.PacketConn
|
||||
transport *quic.Transport
|
||||
tlsConfig *tls.Config
|
||||
quicConfig *quic.Config
|
||||
incoming chan *yggdrasilSession
|
||||
sessions sync.Map // string -> quic.Connection
|
||||
dials sync.Map // string -> *yggdrasilDial
|
||||
}
|
||||
|
||||
type yggdrasilSession struct {
|
||||
quic.Connection
|
||||
quic.Stream
|
||||
}
|
||||
|
||||
type yggdrasilDial struct {
|
||||
context.Context
|
||||
context.CancelFunc
|
||||
}
|
||||
|
||||
func NewYggdrasilTransport(log *log.Logger, sk ed25519.PrivateKey, pk ed25519.PublicKey, peers []string, mcast bool) (*YggdrasilTransport, error) {
|
||||
@@ -38,7 +59,9 @@ func NewYggdrasilTransport(log *log.Logger, sk ed25519.PrivateKey, pk ed25519.Pu
|
||||
|
||||
cfg := config.GenerateConfig()
|
||||
copy(cfg.PrivateKey, sk)
|
||||
cfg.GenerateSelfSignedCertificate()
|
||||
if err := cfg.GenerateSelfSignedCertificate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ygg *core.Core
|
||||
var err error
|
||||
@@ -73,25 +96,140 @@ func NewYggdrasilTransport(log *log.Logger, sk ed25519.PrivateKey, pk ed25519.Pu
|
||||
}
|
||||
}
|
||||
|
||||
us, err := utp.NewSocketFromPacketConnNoClose(ygg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("utp.NewSocketFromPacketConnNoClose: %w", err)
|
||||
tr := &YggdrasilTransport{
|
||||
tlsConfig: &tls.Config{
|
||||
ServerName: hex.EncodeToString(ygg.PublicKey()),
|
||||
Certificates: []tls.Certificate{
|
||||
*cfg.Certificate,
|
||||
},
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
quicConfig: &quic.Config{
|
||||
HandshakeIdleTimeout: time.Second * 5,
|
||||
MaxIdleTimeout: time.Second * 60,
|
||||
},
|
||||
transport: &quic.Transport{
|
||||
Conn: ygg,
|
||||
},
|
||||
yggdrasil: ygg,
|
||||
incoming: make(chan *yggdrasilSession, 1),
|
||||
}
|
||||
|
||||
if tr.listener, err = tr.transport.Listen(tr.tlsConfig, tr.quicConfig); err != nil {
|
||||
return nil, fmt.Errorf("quic.Listen: %w", err)
|
||||
}
|
||||
|
||||
go tr.connectionAcceptLoop()
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
func (t *YggdrasilTransport) connectionAcceptLoop() {
|
||||
for {
|
||||
qc, err := t.listener.Accept(context.TODO())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
host := qc.RemoteAddr().String()
|
||||
if eqc, ok := t.sessions.LoadAndDelete(host); ok {
|
||||
eqc := eqc.(quic.Connection)
|
||||
_ = eqc.CloseWithError(0, "Connection replaced")
|
||||
}
|
||||
t.sessions.Store(host, qc)
|
||||
if dial, ok := t.dials.LoadAndDelete(host); ok {
|
||||
dial := dial.(*yggdrasilDial)
|
||||
dial.CancelFunc()
|
||||
}
|
||||
|
||||
go t.streamAcceptLoop(qc)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *YggdrasilTransport) streamAcceptLoop(qc quic.Connection) {
|
||||
host := qc.RemoteAddr().String()
|
||||
|
||||
defer qc.CloseWithError(0, "Timed out") // nolint:errcheck
|
||||
defer t.sessions.Delete(host)
|
||||
|
||||
for {
|
||||
qs, err := qc.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
t.incoming <- &yggdrasilSession{qc, qs}
|
||||
}
|
||||
return &YggdrasilTransport{
|
||||
Sessions: us,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *YggdrasilTransport) Dial(host string) (net.Conn, error) {
|
||||
addr := make(iwt.Addr, ed25519.PublicKeySize)
|
||||
k, err := hex.DecodeString(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
var retry bool
|
||||
retry:
|
||||
qc, ok := t.sessions.Load(host)
|
||||
if !ok {
|
||||
if dial, ok := t.dials.Load(host); ok {
|
||||
<-dial.(*yggdrasilDial).Done()
|
||||
}
|
||||
if qc, ok = t.sessions.Load(host); !ok {
|
||||
dialctx, dialcancel := context.WithCancel(ctx)
|
||||
defer dialcancel()
|
||||
|
||||
t.dials.Store(host, &yggdrasilDial{dialctx, dialcancel})
|
||||
defer t.dials.Delete(host)
|
||||
|
||||
addr := make(iwt.Addr, ed25519.PublicKeySize)
|
||||
k, err := hex.DecodeString(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
copy(addr, k)
|
||||
|
||||
if qc, err = t.transport.Dial(dialctx, addr, t.tlsConfig, t.quicConfig); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
qc := qc.(quic.Connection)
|
||||
t.sessions.Store(host, qc)
|
||||
go t.streamAcceptLoop(qc)
|
||||
}
|
||||
}
|
||||
if qc == nil {
|
||||
return nil, net.ErrClosed
|
||||
} else {
|
||||
qc := qc.(quic.Connection)
|
||||
qs, err := qc.OpenStreamSync(ctx)
|
||||
if err != nil {
|
||||
if !retry {
|
||||
retry = true
|
||||
goto retry
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// For some reason this is needed to kick the stream
|
||||
_, err = qs.Write([]byte(" "))
|
||||
return &yggdrasilSession{qc, qs}, err
|
||||
}
|
||||
copy(addr, k)
|
||||
return t.Sessions.DialAddr(addr)
|
||||
}
|
||||
|
||||
func (t *YggdrasilTransport) Listener() net.Listener {
|
||||
return t.Sessions
|
||||
return &yggdrasilListener{t}
|
||||
}
|
||||
|
||||
type yggdrasilListener struct {
|
||||
*YggdrasilTransport
|
||||
}
|
||||
|
||||
func (t *yggdrasilListener) Accept() (net.Conn, error) {
|
||||
return <-t.incoming, nil
|
||||
}
|
||||
|
||||
func (t *yggdrasilListener) Addr() net.Addr {
|
||||
return t.listener.Addr()
|
||||
}
|
||||
|
||||
func (t *yggdrasilListener) Close() error {
|
||||
if err := t.listener.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return t.yggdrasil.Close()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user