Add worker server / move timeout to 100 seconds

This commit is contained in:
Senad Uka
2020-03-18 20:42:43 +01:00
parent 62f2e7c3fd
commit 0882dc4b15
3 changed files with 95 additions and 3 deletions

View File

@@ -10,3 +10,8 @@ type Request struct {
Url string
Response chan Response
}
type WorkerDescription struct {
Ip string
Req chan Request
}

View File

@@ -8,9 +8,9 @@ import (
"time"
)
var requests chan<- structures.Request
var requests chan structures.Request
func ServeHTTP(queue chan<- structures.Request, end chan<- string) {
func ServeHTTP(queue chan structures.Request, end chan<- string) {
requests = queue
http.HandleFunc("/", httpHandler)
err := http.ListenAndServe("127.0.0.1:1337", nil)
@@ -59,7 +59,7 @@ func httpHandler(w http.ResponseWriter, r *http.Request) {
// todo: put timeout in ENV variable
timeout := make(chan bool, 1)
go func() {
time.Sleep(10 * time.Second)
time.Sleep(100 * time.Second)
timeout <- true
}()

View File

@@ -0,0 +1,87 @@
package workerserver
import (
"bufio"
"gitlab.com/saburly/kiviscraplib/structures"
"log"
"math/rand"
"net"
"time"
)
var requests chan structures.Request
var workers chan structures.WorkerDescription
func ServeWorkers(queue chan structures.Request, end chan<- string) {
rand.Seed(time.Now().Unix())
workers = make(chan structures.WorkerDescription, 50) // TODO: move to env var
requests = queue
listener, err := net.Listen("tcp", "127.0.0.1:1338")
if err != nil {
log.Fatal("tcp server listener error:", err)
end <- "tcp server"
return
}
go distributeWork()
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("tcp server accept error", err)
}
go handleConnection(conn)
}
}
func distributeWork() {
for {
request := <-requests
chosenWorker := <-workers
chosenWorker.Req <- request
}
}
func handleConnection(conn net.Conn) {
clientAddr := conn.RemoteAddr().String()
workerDescription := structures.WorkerDescription{
Ip: clientAddr,
Req: make(chan structures.Request),
}
workers <- workerDescription // add new worker to the end of the queue
for {
request := <-workerDescription.Req
_, err := conn.Write([]byte("URL " + request.Url + "\n"))
if err != nil {
log.Println("Cannot send to " + clientAddr)
conn.Close()
requests <- request // return job to some other worker
return // don't return worker to the queue - not healthy
}
bufferBytes, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
log.Println("client left..")
conn.Close()
requests <- request // return job to some other worker
return // don't return worker to the queue - not healthy
}
log.Printf("Recieving response from %s\n", clientAddr)
response := structures.Response{
Url: request.Url,
Content: bufferBytes,
Err: nil,
}
request.Response <- response
workers <- workerDescription // finished working and healthy - return to queue
}
}