// Package workerclient implements the worker side of the scraper: it keeps a
// configurable number of TCP connections open to the load balancer server,
// receives base64-encoded fetch requests over them, downloads the requested
// pages (through a randomly selected proxy when one is available) and sends
// the page bodies back base64-encoded.
package workerclient

import (
	"bufio"
	b64 "encoding/base64"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"

	c "gitlab.com/saburly/kiviscraplib/config"
	"gitlab.com/saburly/kiviscraplib/structures"
	"golang.org/x/net/proxy"
)

// httpTimeout is the upper bound for one complete outgoing HTTP exchange
// (connect, request, reading the whole response). Without it a single dead
// proxy would block a worker connection forever.
// NOTE(review): 60s is a conservative default chosen during review — tune it
// or move it into the client configuration if a different limit is required.
const httpTimeout = 60 * time.Second

// StartClientConnections seeds the random number generator used for proxy
// selection and starts c.ClientConfig.ConnectionsCount worker goroutines, each
// maintaining its own connection to the load balancer server.
//
// The function returns immediately; the started goroutines run until the
// process exits.
func StartClientConnections() {
	rand.Seed(time.Now().Unix())

	numberOfConnections := c.ClientConfig.ConnectionsCount
	for i := 0; i < numberOfConnections; i++ {
		go startSingleConnection(i)
	}

	// TODO: Make one initial global proxy list load
	// TODO: Make goroutine for periodic proxy reload and health check
}

// startSingleConnection keeps one connection to the load balancer server
// alive forever. Whenever dialing fails, or an established connection stops
// being usable, it waits c.ClientConfig.ConnectionTimeout seconds and then
// dials again.
//
// connectionId is only used to tag log messages.
func startSingleConnection(connectionId int) {
	log.Printf("(%d) Starting new client connection\n", connectionId)

	retryDelay := time.Duration(c.ClientConfig.ConnectionTimeout) * time.Second
	for {
		serverAddress := c.ClientConfig.WorkerServerAddress
		conn, err := net.Dial("tcp", serverAddress)
		if err != nil {
			log.Printf("(%d) Cannot connect to the load balancer server on %s : %s", connectionId, serverAddress, err)
			time.Sleep(retryDelay)
			continue
		}

		// Blocks until the connection can no longer be used.
		serveConnection(conn, connectionId)

		// The close error is deliberately ignored: the connection is being
		// discarded and a new one is dialed right after the delay.
		_ = conn.Close()
		time.Sleep(retryDelay)
	}
}

// serveConnection processes request messages arriving on conn until an error
// makes the connection unusable, then returns. It does not close conn; that is
// the caller's responsibility.
//
// Each request is one line containing a base64-encoded message that must start
// with c.ClientConfig.RequestMessagePrefix followed by the URL to fetch. The
// response is the base64-encoded page body terminated by a newline.
func serveConnection(conn net.Conn, connectionId int) {
	// One reader for the whole connection lifetime: a bufio.Reader may read
	// past the '\n' delimiter and keep those bytes in its buffer, so creating
	// a new reader per request would silently drop the beginning of the next
	// request.
	reader := bufio.NewReader(conn)

	for {
		encodedRequestMessage, err := reader.ReadString('\n')
		if err != nil {
			log.Printf("(%d) Error receiving request from load balancer server : %s\n", connectionId, err)
			return
		}

		requestMessageBytes, err := b64.StdEncoding.DecodeString(strings.TrimSpace(encodedRequestMessage))
		if err != nil {
			log.Printf("(%d) Cannot decode request message : %s", connectionId, err)
			return
		}
		requestMessage := string(requestMessageBytes)
		log.Printf("(%d) Received new request message : %s", connectionId, requestMessage)

		requestMessagePrefix := c.ClientConfig.RequestMessagePrefix
		if !strings.HasPrefix(requestMessage, requestMessagePrefix) {
			log.Printf("(%d) Request message has to start with %s", connectionId, requestMessagePrefix)
			return
		}

		urlToFetch := strings.TrimSpace(strings.TrimPrefix(requestMessage, requestMessagePrefix))
		pageBody, err := fetchPage(urlToFetch, connectionId)
		if err != nil {
			// fetchPage has already logged the reason.
			return
		}

		pageBodyBase64 := b64.StdEncoding.EncodeToString([]byte(pageBody))
		if _, err = conn.Write([]byte(pageBodyBase64 + "\n")); err != nil {
			log.Printf("(%d) Cannot write to the load balancer server : %s", connectionId, err)
			return
		}

		log.Printf("(%d) Response sent. Waiting for a new job", connectionId)
	}
}

// fetchPage downloads url with a GET request using the client returned by
// getHttpClient and returns the response body as a string.
//
// The body is returned regardless of the HTTP status code. Any failure is
// logged (tagged with connectionId) and returned as a non-nil error together
// with an empty string.
func fetchPage(url string, connectionId int) (string, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		log.Printf("(%d) Cannot create GET request to the %s : %s\n", connectionId, url, err)
		return "", err
	}

	httpClient, err := getHttpClient(connectionId)
	if err != nil {
		return "", err
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		log.Printf("(%d) [PROXY] Cannot GET page %s : %s\n", connectionId, url, err)
		return "", err
	}
	// TODO: Handle error
	defer resp.Body.Close()

	pageBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("(%d) [PROXY] Cannot read body of %s : %s\n", connectionId, url, err)
		return "", err
	}

	return string(pageBody), nil
}

// getHttpClient builds an HTTP client for a single page fetch. When the proxy
// list is empty the client connects directly; otherwise one proxy is picked at
// random and the client is routed through it ("https" entries are used as HTTP
// proxies, "socks5" entries as SOCKS5 dialers).
//
// It returns an error when the selected proxy's address cannot be parsed or
// the SOCKS5 dialer cannot be created.
func getHttpClient(connectionId int) (*http.Client, error) {
	// A new transport is created for every fetch, so its idle connections
	// could never be reused; disabling keep-alives stops them from piling up.
	httpTransport := &http.Transport{DisableKeepAlives: true}
	httpClient := &http.Client{Transport: httpTransport, Timeout: httpTimeout}

	proxyList := getAllProxies()
	if len(proxyList) == 0 {
		log.Printf("(%d) [PROXY] No proxy found, will continue without proxy!\n", connectionId)
		return httpClient, nil
	}

	// get random proxy from the list
	selectedProxy := proxyList[rand.Intn(len(proxyList))]

	switch selectedProxy.Type {
	case "https":
		proxyUrl, err := url.Parse("http://" + selectedProxy.Address)
		if err != nil {
			log.Printf("(%d) [PROXY] Cannot parse proxy address (%s) %s : %s\n", connectionId, selectedProxy.Type, selectedProxy.Address, err)
			return nil, err
		}
		httpTransport.Proxy = http.ProxyURL(proxyUrl)
	case "socks5":
		dialer, err := proxy.SOCKS5("tcp", selectedProxy.Address, nil, proxy.Direct)
		if err != nil {
			log.Printf("(%d) [PROXY] Cannot create connection to the proxy (%s) %s : %s\n", connectionId, selectedProxy.Type, selectedProxy.Address, err)
			return nil, err
		}
		httpTransport.Dial = dialer.Dial
	}

	log.Printf("(%d) [PROXY] Selected proxy (%s) %s\n", connectionId, selectedProxy.Type, selectedProxy.Address)
	return httpClient, nil
}

// getAllProxies combines all proxy types in one slice: the "socks5" proxies
// first, followed by the "https" proxies.
func getAllProxies() []structures.ProxyServer {
	return append(getProxiesList("socks5"), getProxiesList("https")...)
}

// getProxiesList fetches the list of proxies of a specific type (valid types
// are "socks5" and "https") from c.ClientConfig.ProxyListBaseURL + proxyType.
// The response is expected to contain one proxy address per line; blank lines
// are skipped.
//
// On any failure (invalid type, request error, non-2xx status, unreadable
// body) the problem is logged and an empty slice is returned.
func getProxiesList(proxyType string) []structures.ProxyServer {
	switch proxyType {
	case "https", "socks5":
		// valid type
	default:
		log.Printf("%s is not a valid proxy type", proxyType)
		return []structures.ProxyServer{}
	}

	proxyListUrl := c.ClientConfig.ProxyListBaseURL + proxyType
	listClient := &http.Client{Timeout: httpTimeout}
	resp, err := listClient.Get(proxyListUrl)
	if err != nil {
		log.Printf("Cannot get list of proxies [%s] : %s", proxyType, err)
		return []structures.ProxyServer{}
	}
	defer resp.Body.Close()

	// An error page must not be parsed as if it were a list of addresses.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Printf("Cannot get list of proxies [%s] : unexpected status %s", proxyType, resp.Status)
		return []structures.ProxyServer{}
	}

	proxyList, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("Cannot read response for a list of proxies [%s] : %s", proxyType, err)
		return []structures.ProxyServer{}
	}

	lines := strings.Split(string(proxyList), "\n")
	result := make([]structures.ProxyServer, 0, len(lines))
	for _, line := range lines {
		addr := strings.TrimSpace(line)
		if addr == "" {
			// strings.Split returns one empty element for an empty body;
			// without this check an empty list would yield a proxy with no
			// address and defeat the caller's "no proxy" fallback.
			continue
		}
		result = append(result, structures.ProxyServer{Type: proxyType, Address: addr})
	}

	return result
}