diff --git a/structures/structures.go b/structures/structures.go index 894b35a..deab078 100644 --- a/structures/structures.go +++ b/structures/structures.go @@ -10,3 +10,8 @@ type Request struct { Url string Response chan Response } + +type WorkerDescription struct { + Ip string + Req chan Request +} diff --git a/webserver/webserver.go b/webserver/webserver.go index f201a59..09bf000 100644 --- a/webserver/webserver.go +++ b/webserver/webserver.go @@ -8,9 +8,9 @@ import ( "time" ) -var requests chan<- structures.Request +var requests chan structures.Request -func ServeHTTP(queue chan<- structures.Request, end chan<- string) { +func ServeHTTP(queue chan structures.Request, end chan<- string) { requests = queue http.HandleFunc("/", httpHandler) err := http.ListenAndServe("127.0.0.1:1337", nil) @@ -59,7 +59,7 @@ func httpHandler(w http.ResponseWriter, r *http.Request) { // todo: put timeout in ENV variable timeout := make(chan bool, 1) go func() { - time.Sleep(10 * time.Second) + time.Sleep(100 * time.Second) timeout <- true }() diff --git a/workerserver/workerserver.go b/workerserver/workerserver.go new file mode 100644 index 0000000..7d568af --- /dev/null +++ b/workerserver/workerserver.go @@ -0,0 +1,87 @@ +package workerserver + +import ( + "bufio" + "gitlab.com/saburly/kiviscraplib/structures" + "log" + "math/rand" + "net" + "time" +) + +var requests chan structures.Request +var workers chan structures.WorkerDescription + +func ServeWorkers(queue chan structures.Request, end chan<- string) { + + rand.Seed(time.Now().Unix()) + + workers = make(chan structures.WorkerDescription, 50) // TODO: move to env var + requests = queue + + listener, err := net.Listen("tcp", "127.0.0.1:1338") + if err != nil { + log.Fatal("tcp server listener error:", err) + end <- "tcp server" + return + } + + go distributeWork() + + for { + conn, err := listener.Accept() + if err != nil { + log.Fatal("tcp server accept error", err) + } + + go handleConnection(conn) + } +} + +func distributeWork() { + for { + request := <-requests + chosenWorker := <-workers + chosenWorker.Req <- request + } +} + +func handleConnection(conn net.Conn) { + clientAddr := conn.RemoteAddr().String() + workerDescription := structures.WorkerDescription{ + Ip: clientAddr, + Req: make(chan structures.Request), + } + + workers <- workerDescription // add new worker to the end of the queue + for { + request := <-workerDescription.Req + + _, err := conn.Write([]byte("URL " + request.Url + "\n")) + if err != nil { + log.Println("Cannot send to " + clientAddr) + conn.Close() + requests <- request // return job to some other worker + return // don't return worker to the queue - not healthy + } + + bufferBytes, err := bufio.NewReader(conn).ReadBytes('\n') + if err != nil { + log.Println("client left..") + conn.Close() + requests <- request // return job to some other worker + return // don't return worker to the queue - not healthy + } + + log.Printf("Recieving response from %s\n", clientAddr) + response := structures.Response{ + Url: request.Url, + Content: bufferBytes, + Err: nil, + } + + request.Response <- response + workers <- workerDescription // finished working and healthy - return to queue + } + +}