package main import ( "bitbucket.org/outfrontmedia/vendor-scheduler-service-golang/internal/pkg/onexqueue" "encoding/json" "fmt" "io" "log" "net/http" "os" "sync" "time" ) var netClient = &http.Client{} func init() { tr := &http.Transport{ MaxIdleConns: 0, MaxIdleConnsPerHost: 0, } netClient = &http.Client{Transport: tr} } var numberMessagesToProcess = 15000 var maxConnections = 500 func main() { defer func(start time.Time) { elapsed := time.Since(start) log.Printf("vendorscheduler.main took %s", elapsed) }(time.Now()) conn, ch := onexqueue.Connect(os.Getenv("ONEX_QUEUE_URL")) defer conn.Close() defer ch.Close() messages := onexqueue.Consume(ch, "hello") apiWork := make(chan onexqueue.Message) apiResults := make(chan string) go func() { for { result, ok := <-apiResults if !ok { return } log.Print(result) } }() wg := sync.WaitGroup{} for i := 0; i < maxConnections; i++ { wg.Add(1) go func() { for { work, ok := <-apiWork if !ok { wg.Done() return } _ = work CallVistarAPI(apiResults) } }() } queueCount := 0 for d := range messages { queueCount++ msg := onexqueue.Message{} _ = json.Unmarshal(d.Body, &msg) log.Printf("Received message %d", queueCount) apiWork <- msg if numberMessagesToProcess == queueCount { break } } close(apiWork) wg.Wait() close(apiResults) } func CallVistarAPI(apiResults chan string) { r, err := netClient.Get(os.Getenv("VISTAR_API_URL")) if err != nil { apiResults <- fmt.Sprintf("ERROR: %s", err) return } defer r.Body.Close() body, err := io.ReadAll(r.Body) if err != nil { apiResults <- fmt.Sprintf("%s", err) return } apiResults <- fmt.Sprintf("Response received: %v", string(body)) }