From 601186352520b6f8d6210752d3655eb823afcb1d Mon Sep 17 00:00:00 2001 From: Bilal Date: Thu, 7 May 2020 10:02:49 +0200 Subject: [PATCH] Implement worker client --- structures/structures.go | 5 + workerclient/workerclient.go | 205 +++++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 workerclient/workerclient.go diff --git a/structures/structures.go b/structures/structures.go index deab078..ce705e5 100644 --- a/structures/structures.go +++ b/structures/structures.go @@ -15,3 +15,8 @@ type WorkerDescription struct { Ip string Req chan Request } + +type ProxyServer struct { + Type string + Address string +} diff --git a/workerclient/workerclient.go b/workerclient/workerclient.go new file mode 100644 index 0000000..7459514 --- /dev/null +++ b/workerclient/workerclient.go @@ -0,0 +1,205 @@ +package workerclient + +import ( + "bufio" + b64 "encoding/base64" + "gitlab.com/saburly/kiviscraplib/structures" + "golang.org/x/net/proxy" + "io/ioutil" + "log" + "math/rand" + "net" + "net/http" + "strings" + "time" +) + +func StartClientConnections() { + rand.Seed(time.Now().Unix()) + + // TODO: Take number of connections from ENV + numberOfConnections := 1 + + for i := 0; i < numberOfConnections; i++ { + go startSingleConnection(i) + } + + // TODO: Make one initial global proxy list load + // TODO: Make goroutine for periodic proxy reload and health check +} + +// startSingleConnection starts one single connection waiting for request message from load balancer server +func startSingleConnection(connectionId int) { + log.Printf("(%d) Starting new client connection\n", connectionId) + + // TODO: Move initial connection timeout to the ENV + connectionTimeout := 2 + + for { + // TODO: Move server address to the ENV + const serverAddress = "127.0.0.1:1338" + conn, err := net.Dial("tcp", serverAddress) + if err != nil { + log.Printf("(%d) Cannot connect to the load balancer server on %s : %s", connectionId, serverAddress, err) + time.Sleep(time.Duration(connectionTimeout) * time.Second) + continue + } + + for { + requestMessage, err := bufio.NewReader(conn).ReadString('\n') + if err != nil { + log.Printf("(%d) Error receiving request from load balancer server : %s\n", connectionId, err) + _ = conn.Close() + time.Sleep(time.Duration(connectionTimeout) * time.Second) + break + } + + log.Printf("(%d) Received new request message : %s", connectionId, requestMessage) + + // TODO: Move prefix to the ENV + const requestMessagePrefix = "URL " + if !strings.HasPrefix(requestMessage, requestMessagePrefix) { + log.Printf("(%d) Request message has to start with %s", connectionId, requestMessagePrefix) + _ = conn.Close() + time.Sleep(time.Duration(connectionTimeout) * time.Second) + break + } + + urlToFetch := strings.TrimSuffix(strings.TrimPrefix(requestMessage, requestMessagePrefix), "\n") + + pageBody, err := fetchPage(urlToFetch, connectionId) + if err != nil { + _ = conn.Close() + time.Sleep(time.Duration(connectionTimeout) * time.Second) + break + } + + pageBodyBase64 := b64.StdEncoding.EncodeToString([]byte(pageBody)) + + _, err = conn.Write([]byte(pageBodyBase64 + "\n")) + if err != nil { + log.Printf("(%d) Cannot write to the load balancer server : %s", connectionId, err) + _ = conn.Close() + time.Sleep(time.Duration(connectionTimeout) * time.Second) + break + } + + log.Printf("(%d) Response sent. Waiting for a new job", connectionId) + } + } +} + +// fetchPage fetches page from desired url using random proxy +func fetchPage(url string, connectionId int) (string, error) { + dialer, proxyType, proxyAddr, err := getDialer() + if err != nil { + log.Printf("(%d) [PROXY] Cannot create connection to the proxy %s %s : %s\n", connectionId, proxyType, proxyAddr, err) + return "", err + } + + log.Printf("(%d) [PROXY] Selected proxy (%s) %s\n", connectionId, proxyType, proxyAddr) + + // setup a http client + httpTransport := &http.Transport{} + httpClient := &http.Client{Transport: httpTransport} + + httpTransport.Dial = dialer.Dial + + // create + req, err := http.NewRequest("GET", url, nil) + if err != nil { + log.Printf("(%d) [PROXY] Cannot create GET request to the %s : %s", connectionId, url, err) + return "", err + } + + // use the http client to fetch the page + resp, err2 := httpClient.Do(req) + if err2 != nil { + log.Printf("(%d) [PROXY] Cannot GET page %s : %s", connectionId, url, err2) + return "", err2 + } + + // TODO: Handle error + defer resp.Body.Close() + + pageBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("(%d) [PROXY] Cannot read body of %s : %s", connectionId, url, err) + return "", err + } + + return string(pageBody), nil +} + +// getDialer selects one random proxy from the slice of all proxies +func getDialer() (proxy.Dialer, string, string, error) { + proxyList := getAllProxies() + + if len(proxyList) == 0 { + return proxy.Direct, "", "", nil // No proxy + } + + // get random proxy from the list + selectedProxy := proxyList[rand.Intn(len(proxyList))] + + if selectedProxy.Type == "https" { + // TODO: Implement https proxy + //parsedProxyURL, err := url.Parse("https://" + selectedProxy.Address) + //if err != nil { + // return nil, "", "", err + //} + // + //dialer, err := proxy.FromURL(parsedProxyURL, proxy.Direct) + //return dialer, selectedProxy.Type, selectedProxy.Address, err + } + + if selectedProxy.Type == "socks5" { + dialer, err := proxy.SOCKS5("tcp", selectedProxy.Address, nil, proxy.Direct) + return dialer, selectedProxy.Type, selectedProxy.Address, err + } + + return proxy.Direct, "", "", nil +} + +// getAllProxies combines all proxy types in one slice +func getAllProxies() []structures.ProxyServer { + //return append(getProxiesList("socks5"), getProxiesList("https")...) + return getProxiesList("socks5") +} + +// getProxiesList fetches list of proxies of specific type (valid types are : "socks5", "https") +func getProxiesList(proxyType string) []structures.ProxyServer { + switch proxyType { + case "https": + case "socks5": + break + default: + log.Printf("%s is not a valid proxy type", proxyType) + return []structures.ProxyServer{} + } + + proxyListUrl := "https://www.proxy-list.download/api/v1/get?type=" + proxyType + + resp, err := http.Get(proxyListUrl) + if err != nil { + log.Printf("Cannot get list of proxies [%s] : %s", proxyType, err) + return []structures.ProxyServer{} + } + + defer resp.Body.Close() + + proxyList, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("Cannot read response for a list of proxies [%s] : %s", proxyType, err) + return []structures.ProxyServer{} + } + + proxyAddresses := strings.Split(strings.TrimSuffix(string(proxyList), "\n"), "\n") + + var result []structures.ProxyServer + for _, addr := range proxyAddresses { + result = append(result, structures.ProxyServer{Type: proxyType, Address: strings.TrimSpace(addr)}) + } + + return result +}