Also:
- fix issue in which CommandFailed would not be sent (see the sketch below)
- remove magic exception catching logic from withLockFor
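For reference, a minimal sketch of the reply flow after the fix, using plain Futures and simplified stand-ins (the Event/CommandFailed case classes and the scale stub below are illustrative only, not the real Marathon types): a failed scale future is recovered into a CommandFailed reply instead of being dropped.

  import scala.concurrent.{ Await, Future }
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  object ScaleReplySketch extends App {
    // Simplified stand-ins for Marathon's event types.
    sealed trait Event
    case class RunSpecScaled(id: String) extends Event
    case class CommandFailed(id: String, reason: Throwable) extends Event

    // Hypothetical scale operation that fails.
    def scale(id: String): Future[Unit] =
      Future.failed(new RuntimeException("scale failed"))

    // Success is mapped to the answer event; failure is recovered into
    // CommandFailed, so the sender always gets a reply.
    val reply: Future[Event] =
      scale("/my-app")
        .map(_ => RunSpecScaled("/my-app"): Event)
        .recover { case ex => CommandFailed("/my-app", ex) }

    println(Await.result(reply, 5.seconds)) // CommandFailed(/my-app, ...)
  }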
Lint: Automatic diff as part of commit; lint not applicable.
Unit: Automatic diff as part of commit; unit tests not applicable.
Path | Packages
---|---
M  src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala (61 lines) |
Commit | Tree | Parents | Author | Summary | Date
---|---|---|---|---|---
974ee7a464c6 | d4569978be16 | 97494535d49f | Tim Harper | Log lock failures in MarathonSchedulerActor | Mar 13 2017, 9:33 PM
[… first 24 lines of the file unchanged …]

 import org.apache.mesos
 import org.apache.mesos.Protos.{ Status, TaskState }
 import org.apache.mesos.SchedulerDriver
 import org.slf4j.LoggerFactory

 import scala.async.Async.{ async, await }
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.util.control.NonFatal
-import scala.util.{ Failure, Success, Try }
+import scala.util.{ Failure, Success }

-class LockingFailedException(msg: String) extends Exception(msg)
-
 class MarathonSchedulerActor private (
     groupRepository: GroupRepository,
     schedulerActions: SchedulerActions,
     deploymentManager: DeploymentManager,
     deploymentRepository: DeploymentRepository,
     historyActorProps: Props,
     healthCheckManager: HealthCheckManager,
     killService: KillService,

[… 99 unchanged lines omitted …]

     case ReconcileHealthChecks =>
       schedulerActions.reconcileHealthChecks()

     case ScaleRunSpecs => scaleRunSpecs()

     case cmd @ ScaleRunSpec(runSpecId) =>
       logger.debug("Receive scale run spec for {}", runSpecId)
-      val origSender = sender()
-      @SuppressWarnings(Array("all")) /* async/await */
-      def scaleAndAnswer(): Done = {
-        val res: Future[Done] = async {
-          await(schedulerActions.scale(runSpecId))
+
+      withLockFor(Set(runSpecId)) {
+        val result: Future[Event] = schedulerActions.scale(runSpecId).map { _ =>
           self ! cmd.answer
-          Done
+          cmd.answer
+        }.recover { case ex => CommandFailed(cmd, ex) }
+        if (sender != context.system.deadLetters)
+          result.pipeTo(sender)
+      } match {
+        case None =>
+          // ScaleRunSpec is not a user initiated command
+          logger.debug(s"Did not try to scale run spec ${runSpecId}; it is locked")
+        case _ =>
       }

-        if (origSender != context.system.deadLetters) {
-          res.asTry.onComplete {
-            case Success(_) => origSender ! cmd.answer
-            case Failure(t) => origSender ! CommandFailed(cmd, t)
-          }
-        }
-        Done
-      }
-      withLockFor(runSpecId) { scaleAndAnswer() }
-
     case cmd @ CancelDeployment(plan) =>
       deploymentManager.cancel(plan).onComplete{
         case Success(d) => self ! cmd.answer
         case Failure(e) => logger.error(s"Failed to cancel a deployment ${plan.id} due to: ", e)
       }

     case cmd @ Deploy(plan, force) =>
       deploy(sender(), cmd)

     case cmd @ KillTasks(runSpecId, tasks) =>
       @SuppressWarnings(Array("all")) /* async/await */
       def killTasks(): Future[Event] = {
         logger.debug("Received kill tasks {} of run spec {}", tasks, runSpecId)
         async {
           await(killService.killInstances(tasks, KillReason.KillingTasksViaApi))
           await(schedulerActions.scale(runSpecId))
           self ! cmd.answer
           cmd.answer
         }.recover {
           case t: Throwable =>
             CommandFailed(cmd, t)
         }
       }

-      val result = withLockFor(runSpecId) {
+      withLockFor(Set(runSpecId)) {
         killTasks().pipeTo(sender)
+      } match {
+        case None =>
+          // KillTasks is user initiated. If we don't process it, then we should make it obvious as to why.
+          logger.warn(
+            s"Could not acquire lock while killing tasks ${tasks.map(_.instanceId).toList} for ${runSpecId}")
+        case _ =>
       }

     case DeploymentFinished(plan) =>
       removeLocks(plan.affectedRunSpecIds)
More or less... ¯\_(ツ)_/¯ We had multiple layers of Try before. I moved those and decided it'd be better to log the failure rather than silently drop it. I looked again, and the killTasks function will not throw (its body is async). So I just removed the logging and the extra Try altogether. Hope this is okay with you.
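A tiny sketch of why the extra Try is redundant, using a plain Future for brevity (scala-async's async { } behaves the same way here: an exception thrown in the body completes the returned future with a failure rather than being thrown at the call site):

  import scala.concurrent.{ Await, Future }
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.Try

  object NoThrowSketch extends App {
    // Hypothetical stand-in for killTasks(): the body throws, but the caller
    // never sees a thrown exception, only a failed Future.
    def boom(): Future[String] = Future {
      throw new IllegalStateException("boom")
    }

    println(Try(boom()).isSuccess)                  // true: the outer Try never observes the failure
    println(Await.result(boom().failed, 5.seconds)) // java.lang.IllegalStateException: boom
  }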
       deploymentSuccess(plan)

     case DeploymentFailed(plan, reason) =>
       removeLocks(plan.affectedRunSpecIds)
       deploymentFailed(plan, reason)

     case RunSpecScaled(id) => removeLock(id)

     case TasksKilled(runSpecId, _) => removeLock(runSpecId)

     case msg => logger.warn(s"Received unexpected message from ${sender()}: $msg")
   }

   def scaleRunSpecs(): Unit = {
     groupRepository.root().foreach { root =>
       root.transitiveRunSpecs.foreach(spec => self ! ScaleRunSpec(spec.id))
     }
   }

   /**
     * Tries to acquire the lock for the given runSpecIds.
-    * If it succeeds it executes the given function,
-    * otherwise the result will contain an LockingFailedException.
+    * If it succeeds, it evaluates the by-name reference, returning Some(result).
+    * Otherwise, returns None, which should be interpreted as lock acquisition failure.
+    *
+    * @param runSpecIds the set of runSpecIds for which to acquire the lock
+    * @param f the by-name reference that is evaluated if the lock acquisition is successful
     */
-  def withLockFor[A](runSpecIds: Set[PathId])(f: => A): Try[A] = {
+  def withLockFor[A](runSpecIds: Set[PathId])(f: => A): Option[A] = {
     // there's no need for synchronization here, because this is being
     // executed inside an actor, i.e. single threaded
     if (noConflictsWith(runSpecIds)) {
       addLocks(runSpecIds)
-      Try(f)
+      Some(f)
     } else {
-      Failure(new LockingFailedException("Failed to acquire locks."))
+      None
     }
   }
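As a usage note, a condensed, self-contained sketch of the new contract (with a hypothetical stand-alone lock map rather than the actor's state): Some(result) means the locks were taken and f ran, None means a conflicting lock was already held.

  import scala.collection.mutable

  object WithLockForSketch extends App {
    // Hypothetical stand-in for the actor's reference-counted lock map.
    val lockedRunSpecs = mutable.Map.empty[String, Int].withDefaultValue(0)

    def noConflictsWith(ids: Set[String]): Boolean =
      (lockedRunSpecs.keySet intersect ids).isEmpty

    def addLocks(ids: Set[String]): Unit = ids.foreach(id => lockedRunSpecs(id) += 1)

    def removeLock(id: String): Unit =
      if (lockedRunSpecs.contains(id)) {
        val remaining = lockedRunSpecs(id) - 1
        if (remaining <= 0) lockedRunSpecs -= id else lockedRunSpecs(id) = remaining
      }

    // Same shape as the new withLockFor: Some(result) if the locks were
    // acquired and f was evaluated, None if any id was already locked.
    def withLockFor[A](ids: Set[String])(f: => A): Option[A] =
      if (noConflictsWith(ids)) { addLocks(ids); Some(f) } else None

    println(withLockFor(Set("/app")) { "scaled" })        // Some(scaled): lock taken, f ran
    println(withLockFor(Set("/app")) { "scaled again" })  // None: /app is still locked
    removeLock("/app")                                    // e.g. what RunSpecScaled triggers
    println(withLockFor(Set("/app")) { "scaled later" })  // Some(scaled later)
  }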

   def noConflictsWith(runSpecIds: Set[PathId]): Boolean = {
     val conflicts = lockedRunSpecs.keySet intersect runSpecIds
     conflicts.isEmpty
   }

   def removeLocks(runSpecIds: Set[PathId]): Unit = runSpecIds.foreach(removeLock)
   def removeLock(runSpecId: PathId): Unit = {
     if (lockedRunSpecs.contains(runSpecId)) {
       val locks = lockedRunSpecs(runSpecId) - 1
       if (locks <= 0) lockedRunSpecs -= runSpecId else lockedRunSpecs(runSpecId) -= 1
       logger.debug(s"Removed lock for run spec: id=$runSpecId locks=$locks lockedRunSpec=$lockedRunSpecs")
     }
   }

   def addLocks(runSpecIds: Set[PathId]): Unit = runSpecIds.foreach(addLock)
   def addLock(runSpecId: PathId): Unit = {
     lockedRunSpecs(runSpecId) += 1
     logger.debug(s"Added to lock for run spec: id=$runSpecId locks=${lockedRunSpecs(runSpecId)} lockedRunSpec=$lockedRunSpecs")
   }
-
-  /**
-    * Tries to acquire the lock for the given runSpecId.
-    * If it succeeds it executes the given function,
-    * otherwise the result will contain an AppLockedException.
-    */
-  def withLockFor[A](runSpecId: PathId)(f: => A): Try[A] =
-    withLockFor(Set(runSpecId))(f)

   // there has to be a better way...
   @SuppressWarnings(Array("OptionGet"))
   def driver: SchedulerDriver = marathonSchedulerDriverHolder.driver.get

   def deploy(origSender: ActorRef, cmd: Deploy): Unit = {
     val plan = cmd.plan
     val runSpecIds = plan.affectedRunSpecIds

[… remaining 281 lines of the file unchanged …]
Not sure this is really necessary ;).