SourceXtractorPlusPlus  0.19
SourceXtractor++, the next generation SExtractor
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Prefetcher.cpp
Go to the documentation of this file.
1 
18 #include <ElementsKernel/Logging.h>
21 
23 
24 
25 namespace SourceXtractor {
26 
/**
 * Scope guard that inverts a lock: the wrapped lock is released when the
 * guard is constructed and re-acquired when the guard is destroyed.
 *
 * It is meant to be used while the caller already holds `lock`, so that a
 * section of code can run without it and the lock is held again afterwards.
 *
 * @tparam Lock
 *  Any type exposing `lock()` and `unlock()` (e.g. std::unique_lock).
 */
template <typename Lock>
struct ReverseLock {
  /// Releases `lock`; the caller must keep `lock` alive for the guard's lifetime.
  explicit ReverseLock(Lock& lock) : m_lock(lock) {
    m_lock.unlock();
  }

  /// Re-acquires the lock released by the constructor.
  ~ReverseLock() {
    m_lock.lock();
  }

  // A copy would call lock() a second time on destruction, so copying is forbidden.
  ReverseLock(const ReverseLock&) = delete;
  ReverseLock& operator=(const ReverseLock&) = delete;

private:
  Lock& m_lock;
};
43 
44 Prefetcher::Prefetcher(const std::shared_ptr<Euclid::ThreadPool>& thread_pool, unsigned max_queue_size)
45  : m_thread_pool(thread_pool), m_stop(false), m_semaphore(max_queue_size) {
46  m_output_thread = Euclid::make_unique<std::thread>(&Prefetcher::outputLoop, this);
47 }
48 
51  wait();
52 }
53 
56 
57  intptr_t source_addr = reinterpret_cast<intptr_t>(message.get());
58  {
60  m_received.emplace_back(EventType::SOURCE, source_addr);
61  }
62 
63  // Pre-fetch in separate threads
64  auto lambda = [this, source_addr, message = std::move(message)]() mutable {
65  for (auto& prop : m_prefetch_set) {
66  message->getProperty(prop);
67  }
68  {
70  m_finished_sources.emplace(source_addr, std::move(message));
71  }
73  };
74  auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
75  (*lambda)();
76  };
77  m_thread_pool->submit(lambda_copyable);
78 }
79 
80 void Prefetcher::requestProperty(const PropertyId& property_id) {
81  m_prefetch_set.emplace(property_id);
82  logger.debug() << "Requesting prefetch of " << property_id.getString();
83 }
84 
86  logger.debug() << "Starting prefetcher output loop";
87 
88  while (m_thread_pool->activeThreads() > 0) {
90 
91  // Wait for something new
93 
94  // Process the output queue
95  // That is, release sources once the front of the received queue has been processed
96  while (!m_received.empty()) {
97  auto next = m_received.front();
98  // If the front is a ProcessSourceEvent, everything received before is done,
99  // so pass downstream
100  if (next.m_event_type == EventType::PROCESS_SOURCE) {
101  auto event = m_event_queue.front();
102  m_event_queue.pop_front();
103  logger.debug() << "ProcessSourceEvent released";
104  {
105  ReverseLock<decltype(output_lock)> release_lock(output_lock);
106  sendProcessSignal(event);
107  }
108  m_received.pop_front();
109  continue;
110  }
111  // Find if the matching source is done
112  auto processed = m_finished_sources.find(next.m_source_addr);
113  // If not, we can't keep going, so exit here
114  if (processed == m_finished_sources.end()) {
115  logger.debug() << "Next source " << next.m_source_addr << " not done yet";
116  break;
117  }
118  // If it is, send it downstream
119  logger.debug() << "Source " << next.m_source_addr << " sent downstream";
120  {
121  ReverseLock<decltype(output_lock)> release_lock(output_lock);
122  sendSource(std::move(processed->second));
123  }
124  m_finished_sources.erase(processed);
125  m_received.pop_front();
127  }
128 
129  if (m_stop && m_received.empty()) {
130  break;
131  }
132  }
133  logger.debug() << "Stopping prefetcher output loop";
134 }
135 
137  {
140  m_event_queue.emplace_back(message);
141  }
143  logger.debug() << "ProcessSourceEvent received";
144 }
145 
147  m_stop = true;
149 }
150 
152  // Wait until the output queue is empty
153  while (true) {
154  {
156  if (m_received.empty()) {
157  break;
158  }
159  else if (m_thread_pool->checkForException(false)) {
160  logger.fatal() << "An exception was thrown from a worker thread";
161  m_thread_pool->checkForException(true);
162  }
163  else if (m_thread_pool->activeThreads() == 0) {
164  throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
165  }
166  }
168  }
169 }
170 
171 } // end of namespace SourceXtractor
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition: Prefetcher.h:113
std::map< intptr_t, std::unique_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition: Prefetcher.h:117
void sendProcessSignal(const ProcessSourcesEvent &event) const
Definition: PipelineStage.h:92
std::deque< EventType > m_received
Queue of the types of received events. Used to pass events downstream in the order in which they were received.
Definition: Prefetcher.h:121
T joinable(T...args)
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition: Prefetcher.cpp:44
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition: Prefetcher.h:111
std::atomic_bool m_stop
Termination condition for the output loop.
Definition: Prefetcher.h:126
Event received by SourceGrouping to request the processing of some of the Sources stored...
Definition: PipelineStage.h:33
T sleep_for(T...args)
void requestProperty(const PropertyId &property_id)
Definition: Prefetcher.cpp:80
static Elements::Logging logger
Definition: Prefetcher.cpp:22
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourceEvent, order preserved.
Definition: Prefetcher.h:119
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition: Prefetcher.cpp:136
void debug(const std::string &logMessage)
std::condition_variable m_new_output
Notifies there is a new source done processing.
Definition: Prefetcher.h:115
void receiveSource(std::unique_ptr< SourceInterface > source) override
Definition: Prefetcher.cpp:54
T join(T...args)
T next(T...args)
T lock(T...args)
void fatal(const std::string &logMessage)
T move(T...args)
T get(T...args)
STL class.
Identifier used to set and retrieve properties.
Definition: PropertyId.h:40
Euclid::Semaphore m_semaphore
Keep the queue under control.
Definition: Prefetcher.h:129
std::string getString() const
Definition: PropertyId.cpp:36
void sendSource(std::unique_ptr< SourceInterface > source) const
Definition: PipelineStage.h:85
static Logging getLogger(const std::string &name="")
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition: Prefetcher.h:109