package main import ( "bufio" "bytes" "crypto/sha256" "encoding/json" "fmt" "io" "io/ioutil" "log" "mime/multipart" "net/http" "net/url" "os" "os/exec" "strconv" "strings" "time" "deadbeef.codes/steven/cyclescaler" ) // Status contains data parsed from the blender stdoutput, this gets passed to the master in the heartbeat sent every 30 seconds type Status struct { Frame string Mem string PeakMem string Output string } var ( status *Status masterURL string slaveName string blenderPath string so *cyclescaler.SlaveOrder ) // environment configuration func init() { if len(os.Getenv("csmasterurl")) == 0 { log.Fatalf("environment variable csmasterurl is not set") } if len(os.Getenv("csslavename")) == 0 { log.Fatalf("environment variable csslavename is not set") } masterURL = os.Getenv("csmasterurl") slaveName = os.Getenv("csslavename") // Path to the blender executable. This depends on the container filesystem. // Because we're T H I C C and using Ubuntu right now (containers aren't VMs!) we use a reasonable path. blenderPath = "/usr/local/blender/blender" status = &Status{} so = &cyclescaler.SlaveOrder{} status.Output = "initializing slave" log.Print("starting cyclescaler slave...") log.Printf("hi, my name is: %s", slaveName) log.Printf("my master is located at: %s", masterURL) } // Main program loop func main() { go heartBeat() for { var err error log.Print("contacting master for next orders...") status.Output = "contacting master for next orders..." so, err = getWork() if err != nil { log.Printf("couldn't get work: %v", err) time.Sleep(time.Second * 25) // We literally just sleep for 25 seconds before asking for work again. TBD: pubsub continue } log.Print("caching .blend file locally...") status.Output = "retrieving .blend file from master..." err = getBlendFile(so.Token) if err != nil { log.Printf("couldn't get blend file for work: %v", err) time.Sleep(time.Minute * 5) continue } log.Print("Ready to render, initializing blender...") status.Output = "Initializing blender..." err = render() if err != nil { log.Printf("rendering token '%s', frame '%d' failed: %v", so.Token, so.FrameNumber, err) time.Sleep(time.Minute * 5) continue } log.Print("Posting work to master...") status.Output = "Uploading rendered frame to master..." err = postWork() if err != nil { log.Printf("failed to post frame '%d' for token '%s': %v", so.FrameNumber, so.Token, err) continue } } } // Sends heartbeat to master every 15 seconds to provide a status update func heartBeat() { var missedHeartbeats int for { time.Sleep(time.Second * 15) if so == nil { continue } if (*so).Token == "" { continue } v := url.Values{} v.Add("token", so.Token) v.Add("framenumber", strconv.Itoa(so.FrameNumber)) v.Add("status", status.Output) reqURL := fmt.Sprintf("%s/heartbeat?%s", masterURL, v.Encode()) log.Printf("sending heartbeat - %s", reqURL) resp, err := http.Get(reqURL) if err != nil { log.Printf("http GET error: %v", err) continue } if resp.StatusCode != 200 { missedHeartbeats++ log.Printf("heartbeat response HTTP status code is '%s', expected 200, master offline?", resp.Status) log.Printf("total consecutive missed heartbeats: %d", missedHeartbeats) log.Printf("rendering will continue and work will be uploaded once the master is available") continue } missedHeartbeats = 0 } } // contacts master asking for and receiving next orders func getWork() (*cyclescaler.SlaveOrder, error) { v := url.Values{} v.Add("slavename", slaveName) resp, err := http.Get(fmt.Sprintf("%s/getwork?%s", masterURL, v.Encode())) if err != nil { return nil, fmt.Errorf("http get request to master failed: %v", err) } so := &cyclescaler.SlaveOrder{} defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %v", err) } if string(body) == "none" { return nil, fmt.Errorf("no work available") } err = json.Unmarshal(body, so) if err != nil { return nil, fmt.Errorf("failed to unmarshal response body to cyclescaler.SlaveOrder{}: %v", err) } return so, nil } // If it doesn't exist, downloads the hash and blend file. // Verifies hash of current file with what master has provided func getBlendFile(token string) error { blendFileName := fmt.Sprintf("%s.blend", token) hashFileName := fmt.Sprintf("%s.sha256", token) hash := sha256.New() if _, err := os.Stat(hashFileName); os.IsNotExist(err) { //if it doesn't exist, download it out, err := os.Create(hashFileName) if err != nil { return fmt.Errorf("failed to create file '%s': %v", hashFileName, err) } defer out.Close() resp, err := http.Get(fmt.Sprintf("%s/public/blendfiles/%s.sha256", masterURL, token)) if err != nil { return fmt.Errorf("failed to download hash for blend file from master: %v", err) } defer resp.Body.Close() _, err = io.Copy(out, resp.Body) if err != nil { return fmt.Errorf("failed to save downloaded hash for blend file to local file: %v", err) } } remoteHash, err := ioutil.ReadFile(hashFileName) if err != nil { return fmt.Errorf("failed to read file '%s': %v", hashFileName, err) } if _, err := os.Stat(blendFileName); os.IsNotExist(err) { //if it doesn't exist, download it out, err := os.Create(blendFileName) if err != nil { return fmt.Errorf("failed to create file '%s': %v", blendFileName, err) } defer out.Close() resp, err := http.Get(fmt.Sprintf("%s/public/blendfiles/%s.blend", masterURL, token)) if err != nil { return fmt.Errorf("failed to download blend file from master: %v", err) } defer resp.Body.Close() multiWriter := io.MultiWriter(out, hash) _, err = io.Copy(multiWriter, resp.Body) if err != nil { return fmt.Errorf("failed to write .blend file to multiwriter (local file and hash engine): %v", err) } } else { blendFile, err := os.Open(blendFileName) if err != nil { return fmt.Errorf("failed to read file '%s': %v", blendFileName, err) } defer blendFile.Close() if _, err := io.Copy(hash, blendFile); err != nil { return fmt.Errorf("failed to copy blend file to hash engine: %v", err) } } localHash := hash.Sum(nil) if bytes.Compare(localHash, remoteHash) != 0 { os.Remove(blendFileName) os.Remove(hashFileName) return fmt.Errorf("hash of downloaded blend file does not equal hash provided by master") } return nil } // runs blender with appropriate arguments provided by slave order func render() error { //Required parameter fileName := fmt.Sprintf("%s.blend", so.Token) cmdArgs := []string{"-b", fileName, "-F", "PNG", "-o", fmt.Sprintf("%s_#####", so.Token), "-f", strconv.Itoa(so.FrameNumber)} cmd := exec.Command(blenderPath, cmdArgs...) cmdReader, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("%v: error creating stdout pipe for blender: %v", os.Stderr, err) } scanner := bufio.NewScanner(cmdReader) go func() { for scanner.Scan() { outputFields := strings.Split(scanner.Text(), " | ") //0 = frame/mem //1 = time //2 = mem //3 = scene //4 = output if strings.HasPrefix(outputFields[0], "Fra:") { frameMem := strings.Split(strings.TrimPrefix(outputFields[0], "Fra:"), " ") status.Frame = frameMem[0] status.Mem = frameMem[1] status.PeakMem = frameMem[4] if len(outputFields) > 4 { status.Output = fmt.Sprintf("%v", outputFields[4:]) } } fmt.Printf("Frame: %s | Status: %s\n", status.Frame, status.Output) fmt.Println(scanner.Text()) } }() err = cmd.Start() if err != nil { return fmt.Errorf("%v: error starting blender: %v", os.Stderr, err) } err = cmd.Wait() if err != nil { return fmt.Errorf("%v error waiting for blender to finish: %v", os.Stderr, err) } if status.Frame == "" { //Reached last frame _, err := http.Get(fmt.Sprintf("%s/setfinalframe?token=%s&finalframe=%d", masterURL, so.Token, so.FrameNumber-1)) if err != nil { return fmt.Errorf("failed to inform master last frame was reached: %v", err) } } return nil } // validates work and the uploads it to master func postWork() error { pngFileName := "" dir, err := os.Open(".") if err != nil { return fmt.Errorf("failed to open current directory: %v", err) } defer dir.Close() list, _ := dir.Readdirnames(0) for _, name := range list { if strings.Contains(name, ".png") { pngFileName = name } } if pngFileName == "" { return fmt.Errorf("failed to local .png file in local directory - be sure .blend is not configured to save render outside of './'") } bodyBuf := &bytes.Buffer{} bodyWriter := multipart.NewWriter(bodyBuf) // this step is very important fileWriter, err := bodyWriter.CreateFormFile("uploadfile", pngFileName) if err != nil { return fmt.Errorf("failed creating form writer: %v", err) } // open file handle fh, err := os.Open(pngFileName) if err != nil { return fmt.Errorf("error opening file '%s' : %v", pngFileName, err) } defer fh.Close() //iocopy _, err = io.Copy(fileWriter, fh) if err != nil { return fmt.Errorf("failed to copy png file '%s' to form file writer: %v", pngFileName, err) } contentType := bodyWriter.FormDataContentType() tokenWriter, err := bodyWriter.CreateFormField("token") if err != nil { return fmt.Errorf("failed creating form writer: %v", err) } tokenWriter.Write([]byte(so.Token)) frameNumberWriter, err := bodyWriter.CreateFormField("framenumber") if err != nil { return fmt.Errorf("failed creating form writer: %v", err) } frameNumberWriter.Write([]byte(strconv.Itoa(so.FrameNumber))) bodyWriter.Close() backoffMultiplier := 1 for { resp, err := http.Post(fmt.Sprintf("%s/postwork", masterURL), contentType, bodyBuf) if err != nil { return fmt.Errorf("http post error: %v", err) } if resp.StatusCode == 200 { break } backoff := 30 * backoffMultiplier log.Printf("failed to post work to master: received response http status '%s', expected 200", resp.Status) log.Printf("Retrying in %d seconds...", backoff) time.Sleep(time.Second * time.Duration(backoff)) if backoffMultiplier < 60 { backoffMultiplier++ } } err = os.Remove(pngFileName) if err != nil { return fmt.Errorf("failed remove png file after upload: %v", err) } return nil }