/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2016 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17*d4b3338bSTrond Norbye #include "scrubber_task.h"
18*d4b3338bSTrond Norbye 
196232d98aSTrond Norbye #include "default_engine_internal.h"
206232d98aSTrond Norbye #include "engine_manager.h"
216232d98aSTrond Norbye 
scrubber_task_main(void * arg)226232d98aSTrond Norbye static void scrubber_task_main(void* arg) {
236232d98aSTrond Norbye     ScrubberTask* task = reinterpret_cast<ScrubberTask*>(arg);
246232d98aSTrond Norbye     task->run();
256232d98aSTrond Norbye }
266232d98aSTrond Norbye 
ScrubberTask(EngineManager & manager)27*d4b3338bSTrond Norbye ScrubberTask::ScrubberTask(EngineManager& manager)
28*d4b3338bSTrond Norbye     : state(State::Idle),
29*d4b3338bSTrond Norbye       shuttingdown(false),
306232d98aSTrond Norbye       engineManager(manager) {
31*d4b3338bSTrond Norbye     std::unique_lock<std::mutex> lck(lock);
326232d98aSTrond Norbye     if (cb_create_named_thread(&scrubberThread, &scrubber_task_main, this, 0,
336232d98aSTrond Norbye                                "mc:item scrub") != 0) {
346232d98aSTrond Norbye         throw std::runtime_error("Error creating 'mc:item scrub' thread");
356232d98aSTrond Norbye     }
366232d98aSTrond Norbye }
376232d98aSTrond Norbye 
shutdown()386232d98aSTrond Norbye void ScrubberTask::shutdown() {
39*d4b3338bSTrond Norbye     std::unique_lock<std::mutex> lck(lock);
406232d98aSTrond Norbye     shuttingdown = true;
416232d98aSTrond Norbye     // Serialize with ::run
426232d98aSTrond Norbye     cvar.notify_one();
436232d98aSTrond Norbye }
446232d98aSTrond Norbye 
joinThread()456232d98aSTrond Norbye void ScrubberTask::joinThread() {
466232d98aSTrond Norbye     cb_join_thread(scrubberThread);
476232d98aSTrond Norbye }
486232d98aSTrond Norbye 
placeOnWorkQueue(struct default_engine * engine,bool destroy)49*d4b3338bSTrond Norbye void ScrubberTask::placeOnWorkQueue(struct default_engine* engine,
50*d4b3338bSTrond Norbye                                     bool destroy) {
516232d98aSTrond Norbye     std::lock_guard<std::mutex> lck(lock);
52*d4b3338bSTrond Norbye     if (!shuttingdown) {
536232d98aSTrond Norbye         engine->scrubber.force_delete = destroy;
546232d98aSTrond Norbye         workQueue.push_back(std::make_pair(engine, destroy));
556232d98aSTrond Norbye         cvar.notify_one();
566232d98aSTrond Norbye     }
576232d98aSTrond Norbye }
586232d98aSTrond Norbye 
run()596232d98aSTrond Norbye void ScrubberTask::run() {
606232d98aSTrond Norbye     std::unique_lock<std::mutex> lck(lock);
61*d4b3338bSTrond Norbye     while (!shuttingdown) {
626232d98aSTrond Norbye         if (!workQueue.empty()) {
636232d98aSTrond Norbye             auto engine = workQueue.front();
646232d98aSTrond Norbye             workQueue.pop_front();
65*d4b3338bSTrond Norbye             state = State::Scrubbing;
666232d98aSTrond Norbye             lck.unlock();
67*d4b3338bSTrond Norbye             // Run the task without holding the lock
686232d98aSTrond Norbye             item_scrubber_main(engine.first);
69*d4b3338bSTrond Norbye             engineManager.notifyScrubComplete(engine.first, engine.second);
706232d98aSTrond Norbye 
71*d4b3338bSTrond Norbye             // relock so lck can safely unlock when destroyed at loop end.
72*d4b3338bSTrond Norbye             lck.lock();
736232d98aSTrond Norbye         } else {
74*d4b3338bSTrond Norbye             state = State::Idle;
75*d4b3338bSTrond Norbye             cvar.wait(lck);
766232d98aSTrond Norbye         }
776232d98aSTrond Norbye     }
78*d4b3338bSTrond Norbye     state = State::Stopped;
796232d98aSTrond Norbye }