396 lines
10 KiB
Go
396 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"os/exec"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"deadbeef.codes/steven/cyclescaler"
|
|
)
|
|
|
|
// Status holds the slave's most recent progress information. Frame, Mem and
// PeakMem are parsed from blender's "Fra:" stdout lines while rendering;
// Output carries the latest human-readable status message, either the slave's
// own state ("initializing slave", "contacting master for next orders...", …)
// or blender's message text. Only Output is reported to the master, by the
// heartbeat that runs every 15 seconds.
type Status struct {
	Frame   string // frame token from the most recent "Fra:" line
	Mem     string // memory token from the most recent "Fra:" line
	PeakMem string // peak-memory token from the most recent "Fra:" line
	Output  string // latest status message; sent in each heartbeat
}
|
|
|
|
var (
|
|
status *Status
|
|
masterURL string
|
|
slaveName string
|
|
blenderPath string
|
|
so *cyclescaler.SlaveOrder
|
|
)
|
|
|
|
// environment configuration
|
|
func init() {
|
|
if len(os.Getenv("csmasterurl")) == 0 {
|
|
log.Fatalf("environment variable csmasterurl is not set")
|
|
}
|
|
if len(os.Getenv("csslavename")) == 0 {
|
|
log.Fatalf("environment variable csslavename is not set")
|
|
}
|
|
|
|
masterURL = os.Getenv("csmasterurl")
|
|
slaveName = os.Getenv("csslavename")
|
|
|
|
// Path to the blender executable. This depends on the container filesystem.
|
|
// Because we're T H I C C and using Ubuntu right now (containers aren't VMs!) we use a reasonable path.
|
|
blenderPath = "/usr/local/blender/blender"
|
|
|
|
status = &Status{}
|
|
so = &cyclescaler.SlaveOrder{}
|
|
|
|
status.Output = "initializing slave"
|
|
log.Print("starting cyclescaler slave...")
|
|
log.Printf("hi, my name is: %s", slaveName)
|
|
log.Printf("my master is located at: %s", masterURL)
|
|
}
|
|
|
|
// Main program loop
|
|
func main() {
|
|
go heartBeat()
|
|
for {
|
|
var err error
|
|
log.Print("contacting master for next orders...")
|
|
status.Output = "contacting master for next orders..."
|
|
so, err = getWork()
|
|
if err != nil {
|
|
log.Printf("couldn't get work: %v", err)
|
|
time.Sleep(time.Second * 25) // We literally just sleep for 25 seconds before asking for work again. TBD: pubsub
|
|
continue
|
|
}
|
|
log.Print("caching .blend file locally...")
|
|
status.Output = "retrieving .blend file from master..."
|
|
err = getBlendFile(so.Token)
|
|
if err != nil {
|
|
log.Printf("couldn't get blend file for work: %v", err)
|
|
time.Sleep(time.Minute * 5)
|
|
continue
|
|
}
|
|
|
|
log.Print("Ready to render, initializing blender...")
|
|
status.Output = "Initializing blender..."
|
|
err = render()
|
|
if err != nil {
|
|
log.Printf("rendering token '%s', frame '%d' failed: %v", so.Token, so.FrameNumber, err)
|
|
time.Sleep(time.Minute * 5)
|
|
continue
|
|
}
|
|
|
|
log.Print("Posting work to master...")
|
|
status.Output = "Uploading rendered frame to master..."
|
|
err = postWork()
|
|
if err != nil {
|
|
log.Printf("failed to post frame '%d' for token '%s': %v", so.FrameNumber, so.Token, err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sends heartbeat to master every 15 seconds to provide a status update
|
|
func heartBeat() {
|
|
var missedHeartbeats int
|
|
for {
|
|
time.Sleep(time.Second * 15)
|
|
if so == nil {
|
|
continue
|
|
}
|
|
if (*so).Token == "" {
|
|
continue
|
|
}
|
|
|
|
v := url.Values{}
|
|
v.Add("token", so.Token)
|
|
v.Add("framenumber", strconv.Itoa(so.FrameNumber))
|
|
v.Add("status", status.Output)
|
|
|
|
reqURL := fmt.Sprintf("%s/heartbeat?%s", masterURL, v.Encode())
|
|
log.Printf("sending heartbeat - %s", reqURL)
|
|
resp, err := http.Get(reqURL)
|
|
if err != nil {
|
|
log.Printf("http GET error: %v", err)
|
|
continue
|
|
}
|
|
|
|
if resp.StatusCode != 200 {
|
|
missedHeartbeats++
|
|
log.Printf("heartbeat response HTTP status code is '%s', expected 200, master offline?", resp.Status)
|
|
log.Printf("total consecutive missed heartbeats: %d", missedHeartbeats)
|
|
log.Printf("rendering will continue and work will be uploaded once the master is available")
|
|
continue
|
|
}
|
|
|
|
missedHeartbeats = 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// contacts master asking for and receiving next orders
|
|
func getWork() (*cyclescaler.SlaveOrder, error) {
|
|
v := url.Values{}
|
|
v.Add("slavename", slaveName)
|
|
resp, err := http.Get(fmt.Sprintf("%s/getwork?%s", masterURL, v.Encode()))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("http get request to master failed: %v", err)
|
|
}
|
|
so := &cyclescaler.SlaveOrder{}
|
|
|
|
defer resp.Body.Close()
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read response body: %v", err)
|
|
}
|
|
if string(body) == "none" {
|
|
return nil, fmt.Errorf("no work available")
|
|
}
|
|
err = json.Unmarshal(body, so)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal response body to cyclescaler.SlaveOrder{}: %v", err)
|
|
}
|
|
|
|
return so, nil
|
|
}
|
|
|
|
// If it doesn't exist, downloads the hash and blend file.
|
|
// Verifies hash of current file with what master has provided
|
|
func getBlendFile(token string) error {
|
|
blendFileName := fmt.Sprintf("%s.blend", token)
|
|
hashFileName := fmt.Sprintf("%s.sha256", token)
|
|
hash := sha256.New()
|
|
|
|
if _, err := os.Stat(hashFileName); os.IsNotExist(err) { //if it doesn't exist, download it
|
|
out, err := os.Create(hashFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create file '%s': %v", hashFileName, err)
|
|
}
|
|
defer out.Close()
|
|
|
|
resp, err := http.Get(fmt.Sprintf("%s/public/blendfiles/%s.sha256", masterURL, token))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to download hash for blend file from master: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
_, err = io.Copy(out, resp.Body)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save downloaded hash for blend file to local file: %v", err)
|
|
}
|
|
}
|
|
|
|
remoteHash, err := ioutil.ReadFile(hashFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read file '%s': %v", hashFileName, err)
|
|
}
|
|
|
|
if _, err := os.Stat(blendFileName); os.IsNotExist(err) { //if it doesn't exist, download it
|
|
out, err := os.Create(blendFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create file '%s': %v", blendFileName, err)
|
|
}
|
|
defer out.Close()
|
|
|
|
resp, err := http.Get(fmt.Sprintf("%s/public/blendfiles/%s.blend", masterURL, token))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to download blend file from master: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
multiWriter := io.MultiWriter(out, hash)
|
|
_, err = io.Copy(multiWriter, resp.Body)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write .blend file to multiwriter (local file and hash engine): %v", err)
|
|
}
|
|
} else {
|
|
blendFile, err := os.Open(blendFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read file '%s': %v", blendFileName, err)
|
|
}
|
|
defer blendFile.Close()
|
|
|
|
if _, err := io.Copy(hash, blendFile); err != nil {
|
|
return fmt.Errorf("failed to copy blend file to hash engine: %v", err)
|
|
}
|
|
}
|
|
|
|
localHash := hash.Sum(nil)
|
|
if bytes.Compare(localHash, remoteHash) != 0 {
|
|
os.Remove(blendFileName)
|
|
os.Remove(hashFileName)
|
|
return fmt.Errorf("hash of downloaded blend file does not equal hash provided by master")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runs blender with appropriate arguments provided by slave order
|
|
func render() error {
|
|
|
|
//Required parameter
|
|
fileName := fmt.Sprintf("%s.blend", so.Token)
|
|
cmdArgs := []string{"-b",
|
|
fileName,
|
|
"-F",
|
|
"PNG",
|
|
"-o",
|
|
fmt.Sprintf("%s_#####", so.Token),
|
|
"-f",
|
|
strconv.Itoa(so.FrameNumber)}
|
|
|
|
cmd := exec.Command(blenderPath, cmdArgs...)
|
|
cmdReader, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("%v: error creating stdout pipe for blender: %v", os.Stderr, err)
|
|
}
|
|
|
|
scanner := bufio.NewScanner(cmdReader)
|
|
|
|
go func() {
|
|
for scanner.Scan() {
|
|
outputFields := strings.Split(scanner.Text(), " | ")
|
|
//0 = frame/mem
|
|
//1 = time
|
|
//2 = mem
|
|
//3 = scene
|
|
//4 = output
|
|
|
|
if strings.HasPrefix(outputFields[0], "Fra:") {
|
|
frameMem := strings.Split(strings.TrimPrefix(outputFields[0], "Fra:"), " ")
|
|
status.Frame = frameMem[0]
|
|
status.Mem = frameMem[1]
|
|
status.PeakMem = frameMem[4]
|
|
|
|
if len(outputFields) > 4 {
|
|
status.Output = fmt.Sprintf("%v", outputFields[4:])
|
|
}
|
|
}
|
|
|
|
fmt.Printf("Frame: %s | Status: %s\n", status.Frame, status.Output)
|
|
fmt.Println(scanner.Text())
|
|
}
|
|
}()
|
|
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return fmt.Errorf("%v: error starting blender: %v", os.Stderr, err)
|
|
}
|
|
|
|
err = cmd.Wait()
|
|
if err != nil {
|
|
return fmt.Errorf("%v error waiting for blender to finish: %v", os.Stderr, err)
|
|
}
|
|
|
|
if status.Frame == "" { //Reached last frame
|
|
_, err := http.Get(fmt.Sprintf("%s/setfinalframe?token=%s&finalframe=%d", masterURL, so.Token, so.FrameNumber-1))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to inform master last frame was reached: %v", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// validates work and the uploads it to master
|
|
func postWork() error {
|
|
|
|
pngFileName := ""
|
|
|
|
dir, err := os.Open(".")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open current directory: %v", err)
|
|
}
|
|
defer dir.Close()
|
|
|
|
list, _ := dir.Readdirnames(0)
|
|
for _, name := range list {
|
|
if strings.Contains(name, ".png") {
|
|
pngFileName = name
|
|
}
|
|
}
|
|
|
|
if pngFileName == "" {
|
|
return fmt.Errorf("failed to local .png file in local directory - be sure .blend is not configured to save render outside of './'")
|
|
}
|
|
|
|
bodyBuf := &bytes.Buffer{}
|
|
bodyWriter := multipart.NewWriter(bodyBuf)
|
|
|
|
// this step is very important
|
|
fileWriter, err := bodyWriter.CreateFormFile("uploadfile", pngFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed creating form writer: %v", err)
|
|
}
|
|
|
|
// open file handle
|
|
fh, err := os.Open(pngFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("error opening file '%s' : %v", pngFileName, err)
|
|
}
|
|
defer fh.Close()
|
|
|
|
//iocopy
|
|
_, err = io.Copy(fileWriter, fh)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to copy png file '%s' to form file writer: %v", pngFileName, err)
|
|
}
|
|
|
|
contentType := bodyWriter.FormDataContentType()
|
|
|
|
tokenWriter, err := bodyWriter.CreateFormField("token")
|
|
if err != nil {
|
|
return fmt.Errorf("failed creating form writer: %v", err)
|
|
}
|
|
tokenWriter.Write([]byte(so.Token))
|
|
|
|
frameNumberWriter, err := bodyWriter.CreateFormField("framenumber")
|
|
if err != nil {
|
|
return fmt.Errorf("failed creating form writer: %v", err)
|
|
}
|
|
frameNumberWriter.Write([]byte(strconv.Itoa(so.FrameNumber)))
|
|
|
|
bodyWriter.Close()
|
|
|
|
backoffMultiplier := 1
|
|
for {
|
|
resp, err := http.Post(fmt.Sprintf("%s/postwork", masterURL), contentType, bodyBuf)
|
|
if err != nil {
|
|
return fmt.Errorf("http post error: %v", err)
|
|
}
|
|
|
|
if resp.StatusCode == 200 {
|
|
break
|
|
}
|
|
|
|
backoff := 30 * backoffMultiplier
|
|
|
|
log.Printf("failed to post work to master: received response http status '%s', expected 200", resp.Status)
|
|
log.Printf("Retrying in %d seconds...", backoff)
|
|
time.Sleep(time.Second * time.Duration(backoff))
|
|
|
|
if backoffMultiplier < 60 {
|
|
backoffMultiplier++
|
|
}
|
|
}
|
|
|
|
err = os.Remove(pngFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed remove png file after upload: %v", err)
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|