Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions go-mta.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"fmt"
"log"
"io"
"io/ioutil"
"os"
"os/signal"
"runtime"
"syscall"
)

func Init(
Expand Down Expand Up @@ -56,13 +57,15 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
Info.Println("Starting the Go MTA Server.\n")

service := NewServiceHandler()

// Start the Server listener
cl := NewConnectionListener(smtpServerChan)
go cl.start()

// Handle new server connections
for i := 0; i < SmtpServerConnectionCount; i++ {
go handleSmtpServerConnections(smtpServerChan, envelopeChan)
go service.handleSmtpServerConnections(smtpServerChan, envelopeChan)
}
for i := 0; i < DispatcherThreads; i++ {
go handleDispatcher(envelopeChan, smtpClientChan)
Expand All @@ -71,7 +74,11 @@ func main() {
go handleSmtpClientConnections(smtpClientChan)
}

// Die after input is read.
var input string
fmt.Scanln(&input)
// Handle SIGINT and SIGTERM.
signalCh := make(chan os.Signal)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
Info.Println(<-signalCh)

// Stop the service gracefully.
service.Stop()
}
35 changes: 35 additions & 0 deletions servicehandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"sync"
)

type ServiceHandler struct {
ch chan bool
wg *sync.WaitGroup
}

func NewServiceHandler() *ServiceHandler {
s := &ServiceHandler {
ch: make(chan bool),
wg: &sync.WaitGroup{},
}
s.wg.Add(1) // Add main wait. Closed on Stop().
return s
}

func (s *ServiceHandler) addWatchedProcess() {
s.wg.Add(1)
}

func (s *ServiceHandler) finishProcess() {
s.wg.Done()
}

// Stop the service by closing the service's channel. Block until the service
// is really stopped.
func (s *ServiceHandler) Stop() {
close(s.ch)
s.wg.Done()
s.wg.Wait()
}
31 changes: 26 additions & 5 deletions smtpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,45 @@ import (
"bytes"
"net"
"strings"
// "time"
)

type SmtpServer struct {
conn net.Conn
}

func handleSmtpServerConnections(smtpServerChan chan *SmtpServer, envelopeChan chan *envelope) {
func (sh ServiceHandler) handleSmtpServerConnections(smtpServerChan chan *SmtpServer,
envelopeChan chan *envelope) {
Info.Println("SmtpServer Connection Handler Started.")
for {
// handle an SMTP Conversation
server := <- smtpServerChan
Info.Println("Received new SmtpServer, processing inbound SMTP Conversation.")
receiveSmtp(server.conn, envelopeChan)
select {
case <-sh.ch:
Info.Println("Terminating SmtpServer Connection Handler.")
return
case server := <- smtpServerChan:
// handle an SMTP Conversation
sh.addWatchedProcess()
Info.Println("Received new SmtpServer, processing inbound SMTP Conversation.")
receiveSmtp(server.conn, envelopeChan)
sh.finishProcess()
}
}
}

//func receiveWithTimeout(conn net.Conn, envelopeChan chan *envelope) {
//
// timeoutChan := make(chan bool, 1)
// go func() {
// time.Sleep(3 * time.Second)
// timeoutChan <- true
// }()
//
// receiveSmtp(conn, envelopeChan, timeoutChan)
//}

func receiveSmtp(conn net.Conn, envelopeChan chan *envelope) {


remoteIp := conn.RemoteAddr()
Info.Println("Received Connection from [", remoteIp, "]")

Expand Down