package workerclient import ( "bufio" b64 "encoding/base64" "errors" c "gitlab.com/saburly/kiviscraplib/config" "gitlab.com/saburly/kiviscraplib/structures" "golang.org/x/net/proxy" "io/ioutil" "log" "math/rand" "net" "net/http" "net/url" "strings" "time" ) var proxyList []structures.ProxyServer func StartClientConnections() { rand.Seed(time.Now().Unix()) reloadProxyList(false) numberOfConnections := c.ClientConfig.ConnectionsCount for i := 0; i < numberOfConnections; i++ { go startSingleConnection(i) } go reloadProxyList(true) } func reloadProxyList(async bool) { for { proxyList = getAllProxies() log.Printf("[CLIENT] proxy list reloaded with %d servers : \n", len(proxyList)) // TODO: Add proxy health check if !async { return } time.Sleep(time.Duration(c.ClientConfig.ProxyListReloadInterval) * time.Minute) } } // startSingleConnection starts one single connection waiting for request message from load balancer server func startSingleConnection(connectionId int) { log.Printf("(%d) Starting new client connection\n", connectionId) connectionTimeout := c.ClientConfig.ConnectionTimeout for { serverAddress := c.ClientConfig.WorkerServerAddress conn, err := net.Dial("tcp", serverAddress) if err != nil { log.Printf("(%d) Cannot connect to the load balancer server on %s : %s", connectionId, serverAddress, err) time.Sleep(time.Duration(connectionTimeout) * time.Second) continue } for { timeout := make(chan bool, 1) go func() { time.Sleep(time.Duration(c.ClientConfig.WaitingTimeout) * time.Second) timeout <- true }() requestChann := make(chan bool, 1) var err error var encodedRequestMessage string go func() { encodedRequestMessage, err = bufio.NewReader(conn).ReadString('\n') requestChann <- true }() timeoutConnection := false select { case <-requestChann: timeoutConnection = false case <-timeout: timeoutConnection = true } if timeoutConnection { log.Printf("(%d) Server not sending requests for too long, closing connection\n", connectionId) _ = conn.Close() break } if err != nil { log.Printf("(%d) Error receiving request from load balancer server : %s\n", connectionId, err) _ = conn.Close() time.Sleep(time.Duration(connectionTimeout) * time.Second) break } requestMessageBytes, _ := b64.StdEncoding.DecodeString(strings.TrimSpace(encodedRequestMessage)) requestMessage := string(requestMessageBytes) log.Printf("(%d) Received new request message : %s", connectionId, requestMessage) requestMessagePrefix := c.ClientConfig.RequestMessagePrefix if !strings.HasPrefix(requestMessage, requestMessagePrefix) { log.Printf("(%d) Request message has to start with %s", connectionId, requestMessagePrefix) _ = conn.Close() time.Sleep(time.Duration(connectionTimeout) * time.Second) break } urlToFetch := strings.TrimSpace(strings.TrimPrefix(requestMessage, requestMessagePrefix)) pageBody, err := fetchPage(urlToFetch, connectionId) if err != nil { _ = conn.Close() time.Sleep(time.Duration(connectionTimeout) * time.Second) break } pageBodyBase64 := b64.StdEncoding.EncodeToString([]byte(pageBody)) _, err = conn.Write([]byte(pageBodyBase64 + "\n")) if err != nil { log.Printf("(%d) Cannot write to the load balancer server : %s", connectionId, err) _ = conn.Close() time.Sleep(time.Duration(connectionTimeout) * time.Second) break } log.Printf("(%d) Response sent. Waiting for a new job", connectionId) } } } // fetchPage fetches page from desired url using random proxy func fetchPage(url string, connectionId int) (string, error) { // create request req, err := http.NewRequest("GET", url, nil) if err != nil { log.Printf("(%d) Cannot create GET request to the %s : %s\n", connectionId, url, err) return "", err } // create httpClient to execute request httpClient, err := getHttpClient(connectionId) if err != nil { return "", err } // use the http client to fetch the page resp, err2 := httpClient.Do(req) if err2 != nil { log.Printf("(%d) [PROXY] Cannot GET page %s : %s\n", connectionId, url, err2) return "", err2 } // TODO: Handle error defer resp.Body.Close() pageBody, err := ioutil.ReadAll(resp.Body) if err != nil { log.Printf("(%d) [PROXY] Cannot read body of %s : %s\n", connectionId, url, err) return "", err } return string(pageBody), nil } // getHttpClient creates client with proxy connection; Proxy is selected randomly from list of proxies func getHttpClient(connectionId int) (*http.Client, error) { // setup a http client httpTransport := &http.Transport{} httpClient := &http.Client{Transport: httpTransport} if len(proxyList) == 0 { log.Printf("(%d) [PROXY] No proxy found, will continue without proxy!\n", connectionId) return httpClient, nil } // get random proxy from the list proxyListLength := len(proxyList) var selectedProxy structures.ProxyServer if proxyListLength > 0 { selectedProxy = proxyList[rand.Intn(proxyListLength)] } switch selectedProxy.Type { case "https": proxyUrl, err := url.Parse("http://" + selectedProxy.Address) if err != nil { log.Printf("(%d) [PROXY] Cannot parse proxy address (%s) %s : %s\n", connectionId, selectedProxy.Type, selectedProxy.Address, err) return nil, err } httpTransport.Proxy = http.ProxyURL(proxyUrl) break case "socks5": dialer, err := proxy.SOCKS5("tcp", selectedProxy.Address, nil, proxy.Direct) if err != nil { log.Printf("(%d) [PROXY] Cannot create connection to the proxy (%s) %s : %s\n", connectionId, selectedProxy.Type, selectedProxy.Address, err) return nil, err } httpTransport.Dial = dialer.Dial break default: log.Printf("(%d) [PROXY] Failed to select proxy\n", connectionId) return nil, errors.New("failed to select proxy") } log.Printf("(%d) [PROXY] Selected proxy (%s) %s\n", connectionId, selectedProxy.Type, selectedProxy.Address) return httpClient, nil } // getAllProxies combines all proxy types in one slice func getAllProxies() []structures.ProxyServer { var result []structures.ProxyServer result = append(result, getProxiesList("socks5")...) result = append(result, getProxiesList("https")...) result = append(result, c.ClientConfig.CustomSOCKS5ProxyList...) return result } // getProxiesList fetches list of proxies of specific type (valid types are : "socks5", "https") func getProxiesList(proxyType string) []structures.ProxyServer { if len(c.ClientConfig.ProxyListBaseURL) == 0 { return []structures.ProxyServer{} } switch proxyType { case "https": case "socks5": break default: log.Printf("%s is not a valid proxy type", proxyType) return []structures.ProxyServer{} } proxyListUrl := c.ClientConfig.ProxyListBaseURL + proxyType client := http.Client{ Timeout: time.Duration(c.ClientConfig.ProxyListTimeout) * time.Second, } resp, err := client.Get(proxyListUrl) if err != nil { log.Printf("Cannot get list of proxies [%s] : %s", proxyType, err) return []structures.ProxyServer{} } defer resp.Body.Close() proxyList, err := ioutil.ReadAll(resp.Body) if err != nil { log.Printf("Cannot read response for a list of proxies [%s] : %s", proxyType, err) return []structures.ProxyServer{} } if strings.Contains(string(proxyList), "html") { // Something is wrong, expected to receive proxy list but something else is returned log.Println("Proxy list malformed") return []structures.ProxyServer{} } proxyAddresses := strings.Split(strings.TrimSpace(string(proxyList)), "\n") var result []structures.ProxyServer for _, addr := range proxyAddresses { result = append(result, structures.ProxyServer{Type: proxyType, Address: strings.TrimSpace(addr)}) } return result }