Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,8 +976,8 @@ func (m *Manager) updateTaskState(taskId string, state string) {
taskPtr.state = st
taskPtr.safeToStop = false
taskPtr.SendEvent(&event.TaskEvent{Name: taskPtr.GetName(), TaskID: taskId, State: state, Hostname: taskPtr.hostname, ClassName: taskPtr.GetClassName()})
if taskPtr.GetParent() != nil {
taskPtr.GetParent().UpdateState(st)
if parent := taskPtr.GetParent(); parent != nil {
parent.UpdateState(st)
}
}

Expand Down Expand Up @@ -1049,8 +1049,8 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
WithField("partition", envId.String()).
Debug("task active (received TASK_RUNNING event from executor)")
taskPtr.status = ACTIVE
if taskPtr.GetParent() != nil {
taskPtr.GetParent().UpdateStatus(ACTIVE)
if parent := taskPtr.GetParent(); parent != nil {
parent.UpdateStatus(ACTIVE)
}
if status.GetAgentID() != nil {
taskPtr.agentId = status.GetAgentID().GetValue()
Expand All @@ -1062,8 +1062,8 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
case mesos.TASK_DROPPED, mesos.TASK_LOST, mesos.TASK_KILLED, mesos.TASK_FAILED, mesos.TASK_ERROR, mesos.TASK_FINISHED:

taskPtr.status = INACTIVE
if taskPtr.GetParent() != nil {
taskPtr.GetParent().UpdateStatus(INACTIVE)
if parent := taskPtr.GetParent(); parent != nil {
parent.UpdateStatus(INACTIVE)
}
}
taskPtr.SendEvent(&event.TaskEvent{Name: taskPtr.GetName(), TaskID: taskId, Status: taskPtr.status.String(), Hostname: taskPtr.hostname, ClassName: taskPtr.GetClassName()})
Expand Down
47 changes: 23 additions & 24 deletions core/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,9 @@ func (t *Task) GetControlMode() controlmode.ControlMode {
// If it's a BASIC task but its parent role uses it as a HOOK,
// we modify the actual control mode of the task.
// The class itself can never be HOOK, only BASIC
if class.Control.Mode == controlmode.BASIC && t.GetParent() != nil {
traits := t.GetParent().GetTaskTraits()
parent := t.GetParent()
if class.Control.Mode == controlmode.BASIC && parent != nil {
traits := parent.GetTaskTraits()
if len(traits.Trigger) > 0 {
return controlmode.HOOK
}
Expand Down Expand Up @@ -409,7 +410,7 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) {
// If it's a HOOK, we must pass the Timeout to the TCI for
// executor-side timeout enforcement
if cmd.ControlMode == controlmode.HOOK || cmd.ControlMode == controlmode.BASIC {
traits := t.GetParent().GetTaskTraits()
traits := role.GetTaskTraits()
cmd.Timeout, err = time.ParseDuration(traits.Timeout)
}

Expand Down Expand Up @@ -579,7 +580,8 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
propMap = make(controlcommands.PropertyMap)
if class := t.GetTaskClass(); class != nil {
if class.Control.Mode != controlmode.BASIC { // if it's NOT a basic task or hook, we template the props
if t.GetParent() == nil {
parent := t.GetParent()
if parent == nil {
err = fmt.Errorf("cannot build property map for parentless task %s (id %s)", t.name, t.taskId)
return
}
Expand All @@ -590,7 +592,7 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
// First we get the full varStack from the parent role, and
// consolidate it.
var varStack map[string]string
varStack, err = t.GetParent().ConsolidatedVarStack()
varStack, err = parent.ConsolidatedVarStack()
if err != nil {
err = fmt.Errorf("cannot fetch variables stack for property map: %w", err)
return
Expand All @@ -616,7 +618,7 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand

// Finally we build the task-specific special values, and write them
// into the varStack (overwriting anything).
specialVarStack := t.buildSpecialVarStack(t.GetParent())
specialVarStack := t.buildSpecialVarStack(parent)
for k, v := range specialVarStack {
varStack[k] = v
}
Expand All @@ -634,7 +636,7 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
// For FAIRMQ tasks, we append FairMQ channel configuration
if class.Control.Mode == controlmode.FAIRMQ ||
class.Control.Mode == controlmode.DIRECT {
for _, inbCh := range channel.MergeInbound(t.GetParent().CollectInboundChannels(), class.Bind) {
for _, inbCh := range channel.MergeInbound(parent.CollectInboundChannels(), class.Bind) {
// We get the FairMQ-formatted propertyMap from the inbound channel spec
var chanProps controlcommands.PropertyMap
chanProps, err = inbCh.ToFMQMap(t.localBindMap)
Expand All @@ -647,7 +649,7 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
propMap[k] = v
}
}
for _, outboundCh := range channel.MergeOutbound(t.GetParent().CollectOutboundChannels(), class.Connect) {
for _, outboundCh := range channel.MergeOutbound(parent.CollectOutboundChannels(), class.Connect) {
// We get the FairMQ-formatted propertyMap from the outbound channel spec
var chanProps controlcommands.PropertyMap
chanProps, err = outboundCh.ToFMQMap(bindMap)
Expand All @@ -672,7 +674,7 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
err = fields.Execute(the.ConfSvc(), t.name, varStack, objStack, nil, make(map[string]texttemplate.Template), nil)
if err != nil {
log.WithError(err).
WithField("partition", t.GetParent().GetEnvironmentId().String()).
WithField("partition", parent.GetEnvironmentId().String()).
WithField("detector", detector).
Error("cannot resolve templates for property map")
return
Expand Down Expand Up @@ -718,13 +720,14 @@ func (t *Task) GetMesosCommandTarget() controlcommands.MesosCommandTarget {
}

func (t *Task) GetProperties() map[string]string {
t.mu.RLock()
defer t.mu.RUnlock()

if t == nil {
log.Warn("attempted to get properties of nil task")
return make(map[string]string)
}

t.mu.RLock()
defer t.mu.RUnlock()

Comment on lines -721 to +730
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch

propertiesMap, err := t.properties.Flattened()
if err != nil {
return make(map[string]string)
Expand All @@ -733,22 +736,24 @@ func (t *Task) GetProperties() map[string]string {
}

func (t *Task) setTaskPID(pid int) {
t.mu.Lock()
defer t.mu.Unlock()

if t == nil {
return
}

t.mu.Lock()
defer t.mu.Unlock()

t.pid = strconv.Itoa(pid)
}

func (t *Task) GetTaskPID() string {
t.mu.RLock()
defer t.mu.RUnlock()

if t == nil {
return ""
}

t.mu.RLock()
defer t.mu.RUnlock()

return t.pid
}

Expand All @@ -767,11 +772,5 @@ func (t *Task) SetParent(parent parentRole) {
}

func (t *Task) GetTask() *Task {
if t == nil {
return nil
}
t.mu.RLock()
defer t.mu.RUnlock()

return t
}