// Package workerclient implements the worker side of the scraping load
// balancer protocol: it connects to the load balancer over TCP, receives
// "URL <address>" request lines, fetches the page (through a randomly chosen
// proxy when one is available) and replies with the base64-encoded body.
package workerclient

import (
	"bufio"
	b64 "encoding/base64"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"net/http"
	"strings"
	"time"

	"gitlab.com/saburly/kiviscraplib/structures"
	"golang.org/x/net/proxy"
)

// StartClientConnections seeds the random generator used for proxy selection
// and launches the worker connections, each in its own goroutine. It returns
// immediately; the goroutines run until the process exits.
func StartClientConnections() {
	rand.Seed(time.Now().Unix())

	// TODO: Take number of connections from ENV
	numberOfConnections := 1
	for i := 0; i < numberOfConnections; i++ {
		go startSingleConnection(i)
	}

	// TODO: Make one initial global proxy list load
	// TODO: Make goroutine for periodic proxy reload and health check
}

// startSingleConnection starts one single connection waiting for request
// messages from the load balancer server. It never returns: whenever the
// connection cannot be established or a request fails, the connection is
// closed and a new one is dialed after a short delay.
func startSingleConnection(connectionId int) {
	log.Printf("(%d) Starting new client connection\n", connectionId)

	// TODO: Move initial connection timeout to the ENV
	const retryDelay = 2 * time.Second

	// TODO: Move server address to the ENV
	const serverAddress = "127.0.0.1:1338"

	for {
		conn, err := net.Dial("tcp", serverAddress)
		if err != nil {
			log.Printf("(%d) Cannot connect to the load balancer server on %s : %s", connectionId, serverAddress, err)
			time.Sleep(retryDelay)
			continue
		}

		// Blocks until the first failure on this connection.
		serveLoadBalancerRequests(conn, connectionId)

		// Close errors are ignored on purpose: the connection is being
		// discarded and a fresh one is dialed on the next iteration.
		_ = conn.Close()
		time.Sleep(retryDelay)
	}
}

// serveLoadBalancerRequests reads request lines from conn and answers each one
// with the base64-encoded page body followed by a newline. It returns as soon
// as reading, validating, fetching or writing fails; the caller owns conn and
// is responsible for closing it.
func serveLoadBalancerRequests(conn net.Conn, connectionId int) {
	// TODO: Move prefix to the ENV
	const requestMessagePrefix = "URL "

	// One reader per connection: a bufio.Reader may buffer bytes past the
	// first newline, so recreating it for every request would drop them.
	reader := bufio.NewReader(conn)

	for {
		requestMessage, err := reader.ReadString('\n')
		if err != nil {
			log.Printf("(%d) Error receiving request from load balancer server : %s\n", connectionId, err)
			return
		}

		log.Printf("(%d) Received new request message : %s", connectionId, requestMessage)

		if !strings.HasPrefix(requestMessage, requestMessagePrefix) {
			log.Printf("(%d) Request message has to start with %s", connectionId, requestMessagePrefix)
			return
		}

		// Strip the line terminator; tolerate CRLF as well as LF.
		urlToFetch := strings.TrimRight(strings.TrimPrefix(requestMessage, requestMessagePrefix), "\r\n")

		pageBody, err := fetchPage(urlToFetch, connectionId)
		if err != nil {
			// fetchPage has already logged the cause.
			return
		}

		pageBodyBase64 := b64.StdEncoding.EncodeToString([]byte(pageBody))
		if _, err := conn.Write([]byte(pageBodyBase64 + "\n")); err != nil {
			log.Printf("(%d) Cannot write to the load balancer server : %s", connectionId, err)
			return
		}

		log.Printf("(%d) Response sent. Waiting for a new job", connectionId)
	}
}

// fetchPage fetches the page at url using a random proxy (or a direct
// connection when no proxy is available) and returns the response body as a
// string. Every failure is logged here and returned to the caller.
func fetchPage(url string, connectionId int) (string, error) {
	// Upper bound for the whole request (dial, headers and body) so that a
	// dead proxy cannot block the worker forever.
	// TODO: Move fetch timeout to the ENV
	const fetchTimeout = 30 * time.Second

	dialer, proxyType, proxyAddr, err := getDialer()
	if err != nil {
		log.Printf("(%d) [PROXY] Cannot create connection to the proxy %s %s : %s\n", connectionId, proxyType, proxyAddr, err)
		return "", err
	}

	log.Printf("(%d) [PROXY] Selected proxy (%s) %s\n", connectionId, proxyType, proxyAddr)

	// A new transport is built per request because the proxy changes every
	// time. Keep-alives are disabled so the connection is closed once the
	// response is consumed instead of lingering in an idle pool that nothing
	// will ever reuse.
	httpClient := &http.Client{
		Transport: &http.Transport{
			Dial:              dialer.Dial,
			DisableKeepAlives: true,
		},
		Timeout: fetchTimeout,
	}

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		log.Printf("(%d) [PROXY] Cannot create GET request to the %s : %s", connectionId, url, err)
		return "", err
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		log.Printf("(%d) [PROXY] Cannot GET page %s : %s", connectionId, url, err)
		return "", err
	}
	// TODO: Handle error
	defer resp.Body.Close()

	pageBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("(%d) [PROXY] Cannot read body of %s : %s", connectionId, url, err)
		return "", err
	}

	return string(pageBody), nil
}

// getDialer selects one random proxy from the slice of all proxies and returns
// a dialer for it together with the proxy type and address. When the list is
// empty, or the selected proxy type is not supported yet, it falls back to a
// direct connection and returns empty type and address strings.
func getDialer() (proxy.Dialer, string, string, error) {
	proxyList := getAllProxies()
	if len(proxyList) == 0 {
		return proxy.Direct, "", "", nil // No proxy
	}

	// get random proxy from the list
	selectedProxy := proxyList[rand.Intn(len(proxyList))]

	switch selectedProxy.Type {
	case "socks5":
		dialer, err := proxy.SOCKS5("tcp", selectedProxy.Address, nil, proxy.Direct)
		return dialer, selectedProxy.Type, selectedProxy.Address, err
	case "https":
		// TODO: Implement https proxy
		//parsedProxyURL, err := url.Parse("https://" + selectedProxy.Address)
		//if err != nil {
		//	return nil, "", "", err
		//}
		//
		//dialer, err := proxy.FromURL(parsedProxyURL, proxy.Direct)
		//return dialer, selectedProxy.Type, selectedProxy.Address, err
	}

	return proxy.Direct, "", "", nil
}

// getAllProxies combines all proxy types in one slice. Only socks5 proxies are
// returned for now because https proxies are not implemented in getDialer.
func getAllProxies() []structures.ProxyServer {
	//return append(getProxiesList("socks5"), getProxiesList("https")...)
	return getProxiesList("socks5")
}

// getProxiesList fetches the list of proxies of a specific type (valid types
// are: "socks5", "https"). The remote list is expected to contain one address
// per line; blank lines are skipped. On any failure the problem is logged and
// an empty list is returned.
func getProxiesList(proxyType string) []structures.ProxyServer {
	switch proxyType {
	case "https", "socks5":
		// supported
	default:
		log.Printf("%s is not a valid proxy type", proxyType)
		return nil
	}

	proxyListUrl := "https://www.proxy-list.download/api/v1/get?type=" + proxyType
	resp, err := http.Get(proxyListUrl)
	if err != nil {
		log.Printf("Cannot get list of proxies [%s] : %s", proxyType, err)
		return nil
	}
	defer resp.Body.Close()

	// An error page must not be parsed as if it were a list of addresses.
	if resp.StatusCode != http.StatusOK {
		log.Printf("Cannot get list of proxies [%s] : unexpected status %s", proxyType, resp.Status)
		return nil
	}

	proxyList, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("Cannot read response for a list of proxies [%s] : %s", proxyType, err)
		return nil
	}

	proxyAddresses := strings.Split(string(proxyList), "\n")
	result := make([]structures.ProxyServer, 0, len(proxyAddresses))
	for _, addr := range proxyAddresses {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			// Splitting an empty or newline-terminated body yields empty
			// entries; a proxy without an address can never be dialed.
			continue
		}
		result = append(result, structures.ProxyServer{Type: proxyType, Address: addr})
	}
	return result
}