Workers - what can they do and what do they look like? Let's make a model in Go.

Today I want to talk about work in software. So, naturally (or unnaturally), I gravitate towards orchestrators, for reasons that will become clear shortly. Borg and K8s come to mind. What do these have in common, and what do they contain that makes them indispensable to modern computing? I think it's the ability to make distributed systems easy to deploy (K8s), and perhaps also that they hide much of the complexity of failures and of writing software to handle those cases. What I love about them, though, is the concept of work. It's the same reason I like the MapReduce system: the concept of doing work as a whole resonates with me. No, this isn't an AI assistant writing for me; I use the word resonate because I like that word. So in this post I will model a Worker and look at what that actually means for the system, i.e. what do Workers do?


A Manager (M), Workers (W) and KV stores (KV) make up the system we will think about today.

Let's start with the following:

  • Worker comes to work and registers to take assignments.
  • Worker gets stuff to do and keeps a record of stuff completed.
  • Worker stops working and cleans up his work station.
So what characteristics would a Worker have? I'm thinking it needs to know who the manager is, where to put stuff (data or results) and what state it is in. Also, let's assume that the Worker and the manager are remote, just to make it fun. The Worker then needs to listen for calls from the manager and keep track of any work the manager gives it.
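The Worker's state is stored in a wState type with a Waiting value, but its definition isn't shown. Here is a minimal sketch of how the lifecycle in the list above could be modelled. It assumes two extra states, Working and Stopped, and a hypothetical canTransition helper; none of these come from the original code.

```go
package main

import "fmt"

// wState models where a Worker is in its lifecycle. Only Waiting appears in
// the post; Working and Stopped are hypothetical additions for this sketch.
type wState int

const (
	Waiting wState = iota // registered, no assignment yet
	Working               // processing an assignment
	Stopped               // shut down, workstation cleaned up
)

func (s wState) String() string {
	switch s {
	case Waiting:
		return "Waiting"
	case Working:
		return "Working"
	case Stopped:
		return "Stopped"
	default:
		return fmt.Sprintf("wState(%d)", int(s))
	}
}

// canTransition (hypothetical) reports whether a state change follows the
// lifecycle: come to work and wait, do work, then stop for good.
func canTransition(from, to wState) bool {
	switch from {
	case Waiting:
		return to == Working || to == Stopped
	case Working:
		return to == Waiting || to == Stopped
	default: // Stopped is terminal
		return false
	}
}

func main() {
	fmt.Println(Waiting, "->", Working, canTransition(Waiting, Working))
	fmt.Println(Stopped, "->", Working, canTransition(Stopped, Working))
}
```

Making Stopped terminal mirrors the last bullet: once the Worker has cleaned up its workstation, it has to be created and registered again rather than silently resuming.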
OK, so let's start with the properties of our Worker and construct it as well.

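type Worker struct {
	ctx                context.Context
	sync.Mutex                           // guards the Worker's mutable state
	AllWorkerAddresses []string
	ID                 uuid.UUID         // unique ID (github.com/google/uuid)
	Queue              common.Queue      // pending tasks, processed FIFO
	Channel            chan string
	State              wState            // lifecycle state, starts as Waiting
	Address            string            // where this Worker's RPC server listens
	l                  net.Listener      // RPC listener, set by StartWorkerRPC
	managerIP          string            // Manager's address, set on registration
	shutdown           chan struct{}     // signals the accept loop to stop
	kVStore            map[string]string // in-memory KV store
	storageFile        string            // file the KV store is persisted to
	EncodedData        [][]byte
}

// CreateWorker builds a Worker in the Waiting state and loads any KV data
// from its storage file.
func CreateWorker(address string) (wk *Worker) {
	wk = new(Worker)
	wk.Channel = make(chan string)
	wk.ID = uuid.New()
	wk.State = Waiting
	wk.Queue = common.Queue{Items: make([]string, 0, 5)}
	wk.Address = address
	wk.shutdown = make(chan struct{}, 1) // buffered so signalling shutdown never blocks
	wk.initKVStore()
	wk.loadFromFile()
	return
}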
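CreateWorker calls initKVStore and loadFromFile, which aren't shown. As a hypothetical sketch of that JSON-backed CRUD, assuming the store is a map persisted as a single JSON file: the kvWorker type, saveToFile, the file naming scheme and the dir/id parameters on initKVStore are my own additions (the post calls initKVStore() with no arguments), made only so the example runs on its own.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// kvWorker carries only the KV-related fields of the Worker above.
type kvWorker struct {
	kVStore     map[string]string
	storageFile string
}

// initKVStore allocates an empty map. The file name scheme is hypothetical.
func (w *kvWorker) initKVStore(dir, id string) {
	w.kVStore = make(map[string]string)
	w.storageFile = filepath.Join(dir, "kv-"+id+".json")
}

// loadFromFile restores a previously saved store. A missing file is not an
// error: a brand-new Worker simply starts empty.
func (w *kvWorker) loadFromFile() error {
	data, err := os.ReadFile(w.storageFile)
	if errors.Is(err, os.ErrNotExist) {
		return nil
	}
	if err != nil {
		return err
	}
	return json.Unmarshal(data, &w.kVStore)
}

// saveToFile (hypothetical) persists the whole map as JSON.
func (w *kvWorker) saveToFile() error {
	data, err := json.Marshal(w.kVStore)
	if err != nil {
		return err
	}
	return os.WriteFile(w.storageFile, data, 0o644)
}

// demo saves a value, then loads it into a fresh "restarted" Worker.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "worker")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	w := &kvWorker{}
	w.initKVStore(dir, "demo")
	w.kVStore["task-1"] = "done"
	if err := w.saveToFile(); err != nil {
		return "", err
	}

	restarted := &kvWorker{}
	restarted.initKVStore(dir, "demo")
	if err := restarted.loadFromFile(); err != nil {
		return "", err
	}
	return restarted.kVStore["task-1"], nil
}

func main() {
	v, err := demo()
	fmt.Println(v, err) // done <nil>
}
```

Rewriting the whole file on every save keeps the sketch simple; for a store that changes often, an append-only log would avoid rewriting everything each time.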

Then this Worker should register interest in doing some work, otherwise the manager might send work to a Worker who is perhaps away. Before registering, though, it has to be reachable, so it starts an RPC server. Notice I don't talk about the KV store much, as it's just some CRUD operations on JSON-encoded KV data. Pro-tip: add logs, logs, and more logs when you do distributed systems design, because what you think is happening might not be what is actually happening.

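func (w *Worker) StartWorkerRPC() {
	rpcs := rpc.NewServer()
	if err := rpcs.Register(w); err != nil {
		log.Fatalf("failed to register worker with rpc server: %v", err)
	}
	// Remove a stale socket file from a previous run; this matters when
	// common.Protocol is "unix" and the address is a socket path.
	os.Remove(w.Address)
	l, err := net.Listen(common.Protocol, w.Address)
	if err != nil {
		log.Fatalf("worker RPC server not initiated: %v", err)
	}
	w.l = l
	log.Println("worker rpc seems to be live!")

	go func() {
		for {
			// Check for a shutdown signal between connections. Accept itself
			// blocks, so shutdown is only noticed promptly if the listener is
			// also closed (handled in the error branch below).
			select {
			case <-w.shutdown:
				return
			default:
			}
			conn, err := w.l.Accept()
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return // listener closed: stop accepting instead of spinning
				}
				log.Printf("error accepting request from client: %v", err)
				continue
			}
			go func() {
				rpcs.ServeConn(conn) // serves calls until the client hangs up
				conn.Close()
				log.Println("worker finished serving an RPC connection")
			}()
		}
	}()
}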

Great, so now our Worker is listening for calls from the manager over RPC. And if you can start working, you should be able to stop working, right? Here we make an RPC call so that the manager can tell the Worker that all the work is done and it can stop: StopWorkerRPC invokes the Shutdown method exposed by the Worker's RPC server.

// StopWorkerRPC asks the Worker listening at w.Address to stop by invoking
// its Worker.Shutdown RPC method.
func (w *Worker) StopWorkerRPC() error {
	args := &common.WorkerShutdownArgs{}
	reply := &common.WorkerShutdownReply{}
	rpcName := "Worker.Shutdown"

	if ok := common.RpcCall(w.Address, rpcName, args, reply); !ok {
		return fmt.Errorf("failed to call %v rpc method", rpcName)
	}
	return nil
}
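The Worker.Shutdown method that StopWorkerRPC calls isn't shown. One way it could work, sketched under assumptions: Shutdown closes the listener (which is what actually unblocks a goroutine sitting in Accept) and closes the shutdown channel, guarded by sync.Once so a second call is harmless. The From and OK fields are hypothetical stand-ins for the unseen common types, and the sketch listens on TCP at a random localhost port rather than using common.Protocol.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// Local stand-ins for common.WorkerShutdownArgs/Reply; fields are hypothetical.
type WorkerShutdownArgs struct{ From string }
type WorkerShutdownReply struct{ OK bool }

// Worker keeps only the fields this sketch needs.
type Worker struct {
	l        net.Listener
	shutdown chan struct{}
	once     sync.Once
}

// Shutdown is what StopWorkerRPC reaches as "Worker.Shutdown". Closing the
// listener unblocks Accept; closing shutdown lets other goroutines notice.
func (w *Worker) Shutdown(args *WorkerShutdownArgs, reply *WorkerShutdownReply) error {
	w.once.Do(func() {
		close(w.shutdown)
		w.l.Close()
	})
	reply.OK = true
	return nil
}

// serve accepts connections until the listener is closed, then closes done.
func (w *Worker) serve(rpcs *rpc.Server, done chan<- struct{}) {
	defer close(done)
	for {
		conn, err := w.l.Accept()
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				return // closed by Shutdown: stop accepting
			}
			fmt.Println("accept error:", err)
			continue
		}
		go rpcs.ServeConn(conn) // ServeConn closes conn when the client hangs up
	}
}

// demo starts a Worker, calls Worker.Shutdown over RPC and waits for serve to exit.
func demo() (bool, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	w := &Worker{l: l, shutdown: make(chan struct{})}
	rpcs := rpc.NewServer()
	if err := rpcs.Register(w); err != nil {
		return false, err
	}
	done := make(chan struct{})
	go w.serve(rpcs, done)

	client, err := rpc.Dial("tcp", l.Addr().String())
	if err != nil {
		return false, err
	}
	defer client.Close()
	reply := &WorkerShutdownReply{}
	if err := client.Call("Worker.Shutdown", &WorkerShutdownArgs{From: "manager"}, reply); err != nil {
		return false, err
	}
	<-done // the serve loop returned because the listener was closed
	return reply.OK, nil
}

func main() {
	ok, err := demo()
	fmt.Println("shutdown acknowledged:", ok, "err:", err)
}
```

The reply still reaches the caller because closing a listener only stops new connections; the connection already carrying the Shutdown call stays open.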

Well OK, that looks nice for now. So what about getting work from the manager? Let's stick with RPC communication: the Worker calls a manager RPC method and says "I am ready to work!".


func (w *Worker) RegisterWithManager(address string) error {
	// Hold the lock only while updating local state. If the manager called
	// back into AssignWork (which also locks) before replying, holding the
	// lock across the RPC would deadlock.
	w.Lock()
	w.managerIP = address
	w.Unlock()

	args := &common.RegisterArgs{WorkerAddress: w.Address}
	reply := &common.RegisterResult{}
	rpcName := "Manager.Register"
	if ok := common.RpcCall(address, rpcName, args, reply); !ok {
		return fmt.Errorf("failed to call %v rpc method", rpcName)
	}
	return nil
}
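Neither common.RpcCall nor the manager's Register handler is shown, and the Manager is left as an exercise. Still, a toy version helps to see both ends of the call. This sketch assumes RpcCall simply dials, makes one call and closes the connection (here it also takes the protocol explicitly), and that the Manager just records worker addresses. The Accepted field, the Manager's fields and the example addresses are hypothetical.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"sync"
)

// Stand-ins for common.RegisterArgs/RegisterResult. WorkerAddress comes from
// the post; Accepted is an assumed field.
type RegisterArgs struct{ WorkerAddress string }
type RegisterResult struct{ Accepted bool }

// Manager is a toy manager that only remembers who registered.
type Manager struct {
	mu      sync.Mutex
	workers []string
}

// Register is the "Manager.Register" RPC method the Worker calls.
func (m *Manager) Register(args *RegisterArgs, reply *RegisterResult) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.workers = append(m.workers, args.WorkerAddress)
	reply.Accepted = true
	return nil
}

// rpcCall is a guess at what common.RpcCall does: dial, make one call,
// close, and report success as a bool.
func rpcCall(protocol, address, rpcName string, args, reply any) bool {
	client, err := rpc.Dial(protocol, address)
	if err != nil {
		log.Printf("dial %s: %v", address, err)
		return false
	}
	defer client.Close()
	if err := client.Call(rpcName, args, reply); err != nil {
		log.Printf("%s: %v", rpcName, err)
		return false
	}
	return true
}

// demo registers two (fake) worker addresses and returns how many the
// Manager recorded. The addresses are only stored, never dialled.
func demo() (int, error) {
	m := &Manager{}
	rpcs := rpc.NewServer()
	if err := rpcs.Register(m); err != nil {
		return 0, err
	}
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	go rpcs.Accept(l)

	for _, addr := range []string{"127.0.0.1:9001", "127.0.0.1:9002"} {
		reply := &RegisterResult{}
		if !rpcCall("tcp", l.Addr().String(), "Manager.Register", &RegisterArgs{WorkerAddress: addr}, reply) {
			return 0, fmt.Errorf("register %s failed", addr)
		}
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.workers), nil
}

func main() {
	n, err := demo()
	fmt.Println("registered workers:", n, "err:", err)
}
```

Dialling per call keeps the helper stateless at the cost of a new connection each time; a longer-lived client per peer is the usual optimisation once call volume grows.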

And now our Worker is registered to do work, and the manager can assign work to our Workers.
This will likely also require an RPC method on the Worker, which in turn updates its work assignments locally.

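// AssignWork is called by the manager over RPC to hand this Worker a task.
func (wk *Worker) AssignWork(args *common.AssignWorkArgs, result *common.AssignWorkResults) error {
	wk.Mutex.Lock()
	defer wk.Mutex.Unlock()
	// The assignment still needs to be recorded here (for example by pushing
	// it onto wk.Queue); the fields of common.AssignWorkArgs aren't shown.
	result.WorkIsGiven = true
	fmt.Printf("work was assigned to the Worker at %v\n", wk.Address)
	return nil
}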
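As written, AssignWork acknowledges the work but doesn't record it. Here is a sketch of the "update its work assignments locally" step. It assumes AssignWorkArgs carries a hypothetical TaskID string and that assignments are appended to the Queue's Items slice, the one field of common.Queue the post shows.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical shapes: the post does not show common.AssignWorkArgs'
// fields, so TaskID is an assumption. WorkIsGiven comes from the post.
type AssignWorkArgs struct{ TaskID string }
type AssignWorkResults struct{ WorkIsGiven bool }

// Queue mirrors the Items field the post's constructor initialises.
type Queue struct{ Items []string }

type Worker struct {
	sync.Mutex
	Address string
	Queue   Queue
}

// AssignWork records the task on the Worker's queue before acknowledging it,
// so the Worker keeps a record of what it was given.
func (wk *Worker) AssignWork(args *AssignWorkArgs, result *AssignWorkResults) error {
	if args.TaskID == "" {
		return fmt.Errorf("worker %s: empty task id", wk.Address)
	}
	wk.Lock()
	defer wk.Unlock()
	wk.Queue.Items = append(wk.Queue.Items, args.TaskID)
	result.WorkIsGiven = true
	fmt.Printf("task %s assigned to the Worker at %v\n", args.TaskID, wk.Address)
	return nil
}

func main() {
	w := &Worker{Address: "127.0.0.1:9001", Queue: Queue{Items: make([]string, 0, 5)}}
	for _, id := range []string{"map-0", "map-1"} {
		res := &AssignWorkResults{}
		if err := w.AssignWork(&AssignWorkArgs{TaskID: id}, res); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println("queued:", w.Queue.Items)
}
```

Returning an error for a malformed assignment lets the manager see the rejection in its RPC reply instead of assuming the work was accepted.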

The Worker can then either do the work immediately or push the task onto a queue that is repeatedly dequeued, so tasks are done in first-in-first-out (FIFO) order.
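The queue option can be sketched with a small FIFO. Enqueue, Dequeue and processAll are hypothetical names (the post's common.Queue only shows an Items field), and the mutex is there on the assumption that an RPC handler such as AssignWork and a processing loop touch the queue from different goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a hypothetical FIFO with the same Items field as common.Queue.
type Queue struct {
	mu    sync.Mutex
	Items []string
}

// Enqueue adds a task at the back of the queue.
func (q *Queue) Enqueue(task string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.Items = append(q.Items, task)
}

// Dequeue removes and returns the oldest task; ok is false when empty.
func (q *Queue) Dequeue() (task string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.Items) == 0 {
		return "", false
	}
	task = q.Items[0]
	q.Items[0] = "" // clear the slot so the backing array doesn't keep the string alive
	q.Items = q.Items[1:]
	return task, true
}

// processAll dequeues and runs tasks until the queue is empty, returning the
// completed tasks in the order they were done. It stops at the first error.
func processAll(q *Queue, do func(string) error) ([]string, error) {
	var completed []string
	for {
		task, ok := q.Dequeue()
		if !ok {
			return completed, nil
		}
		if err := do(task); err != nil {
			return completed, fmt.Errorf("task %s: %w", task, err)
		}
		completed = append(completed, task)
	}
}

func main() {
	q := &Queue{Items: make([]string, 0, 5)}
	for _, t := range []string{"map-0", "map-1", "reduce-0"} {
		q.Enqueue(t)
	}
	done, err := processAll(q, func(t string) error {
		fmt.Println("working on", t)
		return nil
	})
	fmt.Println(done, err)
}
```

The completed slice doubles as the Worker's "record of stuff completed" from the list at the top; a real Worker would likely persist it in its KV store.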
Finally, if the Worker should clean up after itself, specifically its KV store, then we need a method for that behaviour, so ...

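// CleanupKVStore deletes the file backing the Worker's KV store. A file that
// is already gone counts as clean; any other error is logged.
func (w *Worker) CleanupKVStore() {
	if err := os.Remove(w.storageFile); err != nil && !errors.Is(err, os.ErrNotExist) {
		log.Printf("failed to remove KV store file %s: %v", w.storageFile, err)
	}
}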

I reckon this is a pretty cool concept to describe in incomplete Go code. The point is to take an idea, run with it as fast as possible and describe behaviour in the simplest terms. If you put this code into an IDE with Go support, most of the imports should be added automatically (the common package and the KV store helpers are not shown here). I leave the Manager up to you to play with; paint in your own strokes, happy reader! If you have no interest in doing this yourself but wish to see the code, you can visit the GitHub repo at the link. Have a great week ahead and all the best!

