The cleanup for ResidentTaskIntegrationTest would sometimes fail
because Marathon would not free all reserved resources. It seems the
code ended up in a loop trying to delete an already failed task.
Resident tasks can be in a failed state when they are restarted too fast.
This change also removes the KillStreamWatcher since KillServiceActor is
already watching the event bus for state changes.
IIRC, knownInstance potentially carries more information, but the KillAction is generally based on the passed taskIds. If Marathon receives a status update for an unknown non-terminal task, it will kill that task. So we might end up with a ToKill that contains a taskId but no knownInstance. Since you default to false, allTerminal will never evaluate to true in that case.
Furthermore, the KillServiceActor listens to UnknownInstanceTerminated events. The idea here is that even if the task associated with a taskId is already terminal, issuing a kill request will end up as such an event in the actor. The ToKill structure is initialized with only non-terminal taskIds, so the following check doesn't add any more functionality AFAICT.
Should the || task.status.mesosStatus.exists(taskStatus => Task.Terminated.isTerminated(taskStatus.getState) check be moved to KillServiceActor.killInstances?
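
To make the knownInstance concern above concrete, here is a minimal sketch. It assumes a simplified model: the Instance and ToKill case classes and both helper functions are hypothetical stand-ins for illustration, not Marathon's actual types or the code in this PR. It only shows how defaulting a missing knownInstance to false keeps an allTerminal-style check from ever succeeding, and contrasts that with a variant where an unknown instance does not block the result.

```scala
// Hypothetical, simplified model for illustration only;
// these are NOT Marathon's real Instance or ToKill types.
final case class Instance(isTerminal: Boolean)
final case class ToKill(taskId: String, knownInstance: Option[Instance])

object AllTerminalSketch {
  // A missing knownInstance is treated as "not terminal" (defaults to false),
  // so a single entry without a knownInstance makes the result false forever.
  def allTerminalDefaultFalse(toKill: Seq[ToKill]): Boolean =
    toKill.forall(_.knownInstance.exists(_.isTerminal))

  // Contrast (an assumption, not a proposal from this PR): entries without
  // a knownInstance do not block the check.
  def allTerminalIgnoringUnknown(toKill: Seq[ToKill]): Boolean =
    toKill.forall(_.knownInstance.forall(_.isTerminal))
}
```

With one terminal known instance and one taskId that has no knownInstance, the first variant stays false while the second becomes true. Whether ignoring unknown instances is actually safe depends on the UnknownInstanceTerminated handling discussed above.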