96 lines
2.3 KiB
Go
96 lines
2.3 KiB
Go
package workerserver
|
|
|
|
import (
|
|
"bufio"
|
|
b64 "encoding/base64"
|
|
c "gitlab.com/saburly/kiviscraplib/config"
|
|
"gitlab.com/saburly/kiviscraplib/structures"
|
|
"log"
|
|
"math/rand"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
var requests chan structures.Request
|
|
var workers chan structures.WorkerDescription
|
|
|
|
func ServeWorkers(queue chan structures.Request, end chan<- string) {
|
|
rand.Seed(time.Now().Unix())
|
|
|
|
workers = make(chan structures.WorkerDescription, c.WorkerServerConfig.WorkersCount)
|
|
requests = queue
|
|
|
|
listener, err := net.Listen("tcp", c.WorkerServerConfig.Address)
|
|
if err != nil {
|
|
log.Fatal("tcp server listener error:", err)
|
|
end <- "tcp server"
|
|
return
|
|
}
|
|
|
|
go distributeWork()
|
|
|
|
for {
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
log.Fatal("tcp server accept error", err)
|
|
}
|
|
|
|
go handleConnection(conn)
|
|
}
|
|
}
|
|
|
|
func distributeWork() {
|
|
for {
|
|
request := <-requests
|
|
chosenWorker := <-workers
|
|
chosenWorker.Req <- request
|
|
}
|
|
}
|
|
|
|
func handleConnection(conn net.Conn) {
|
|
clientAddr := conn.RemoteAddr().String()
|
|
workerDescription := structures.WorkerDescription{
|
|
Ip: clientAddr,
|
|
Req: make(chan structures.Request),
|
|
}
|
|
|
|
log.Printf("New worker client connected from %s\n", clientAddr)
|
|
|
|
workers <- workerDescription // add new worker to the end of the queue
|
|
for {
|
|
request := <-workerDescription.Req
|
|
|
|
requestBase64 := b64.StdEncoding.EncodeToString([]byte(c.WorkerServerConfig.RequestMessagePrefix + request.Url))
|
|
|
|
_, err := conn.Write([]byte(requestBase64 + "\n"))
|
|
if err != nil {
|
|
log.Println("Cannot send to " + clientAddr)
|
|
conn.Close()
|
|
requests <- request // return job to some other worker
|
|
return // don't return worker to the queue - not healthy
|
|
}
|
|
|
|
bufferString, err := bufio.NewReader(conn).ReadString('\n')
|
|
if err != nil {
|
|
log.Println("client left..")
|
|
conn.Close()
|
|
requests <- request // return job to some other worker
|
|
return // don't return worker to the queue - not healthy
|
|
}
|
|
|
|
decodedPageBody, _ := b64.StdEncoding.DecodeString(strings.TrimSpace(bufferString))
|
|
|
|
log.Printf("Recieving response from %s\n", clientAddr)
|
|
response := structures.Response{
|
|
Url: request.Url,
|
|
Content: decodedPageBody,
|
|
Err: nil,
|
|
}
|
|
|
|
request.Response <- response
|
|
workers <- workerDescription // finished working and healthy - return to queue
|
|
}
|
|
|
|
}
|