1414#include < detail/stream_impl.hpp>
1515
1616#include < chrono>
17+ #include < cstdio>
1718#include < memory>
1819#include < mutex>
1920#include < set>
@@ -152,48 +153,75 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
152153 GraphProcessor::waitForEvent (std::move (Event));
153154}
154155
156+ static void deallocateStreams (
157+ std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
158+ // Deallocate buffers for stream objects of the finished commands. Iterate in
159+ // reverse order because it is the order of commands execution.
160+ for (auto StreamImplPtr = StreamsToDeallocate.rbegin ();
161+ StreamImplPtr != StreamsToDeallocate.rend (); ++StreamImplPtr)
162+ detail::Scheduler::getInstance ().deallocateStreamBuffers (
163+ StreamImplPtr->get ());
164+ }
165+
155166void Scheduler::cleanupFinishedCommands (EventImplPtr FinishedEvent) {
156- // Avoiding deadlock situation, where one thread is in the process of
157- // enqueueing (with a locked mutex) a currently blocked task that waits for
158- // another thread which is stuck at attempting cleanup.
159- std::unique_lock<std::shared_timed_mutex> Lock (MGraphLock, std::try_to_lock);
160- if (Lock.owns_lock ()) {
161- Command *FinishedCmd = static_cast <Command *>(FinishedEvent->getCommand ());
162- // The command might have been cleaned up (and set to nullptr) by another
163- // thread
164- if (FinishedCmd)
165- MGraphBuilder.cleanupFinishedCommands (FinishedCmd);
167+ // We are going to traverse a graph of finished commands. Gather stream
168+ // objects from these commands if any and deallocate buffers for these stream
169+ // objects, this is needed to guarantee that streamed data is printed and
170+ // resources are released.
171+ std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
172+ {
173+ // Avoiding deadlock situation, where one thread is in the process of
174+ // enqueueing (with a locked mutex) a currently blocked task that waits for
175+ // another thread which is stuck at attempting cleanup.
176+ std::unique_lock<std::shared_timed_mutex> Lock (MGraphLock,
177+ std::try_to_lock);
178+ if (Lock.owns_lock ()) {
179+ auto FinishedCmd = static_cast <Command *>(FinishedEvent->getCommand ());
180+ // The command might have been cleaned up (and set to nullptr) by another
181+ // thread
182+ if (FinishedCmd)
183+ MGraphBuilder.cleanupFinishedCommands (FinishedCmd, StreamsToDeallocate);
184+ }
166185 }
186+ deallocateStreams (StreamsToDeallocate);
167187}
168188
169189void Scheduler::removeMemoryObject (detail::SYCLMemObjI *MemObj) {
170- MemObjRecord *Record = nullptr ;
171- std::unique_lock<std::shared_timed_mutex> Lock (MGraphLock, std::defer_lock);
172-
190+ // We are going to traverse a graph of finished commands. Gather stream
191+ // objects from these commands if any and deallocate buffers for these stream
192+ // objects, this is needed to guarantee that streamed data is printed and
193+ // resources are released.
194+ std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
173195 {
174- lockSharedTimedMutex (Lock);
196+ MemObjRecord *Record = nullptr ;
197+ std::unique_lock<std::shared_timed_mutex> Lock (MGraphLock, std::defer_lock);
175198
176- Record = MGraphBuilder.getMemObjRecord (MemObj);
177- if (!Record)
178- // No operations were performed on the mem object
179- return ;
199+ {
200+ lockSharedTimedMutex (Lock);
180201
181- Lock.unlock ();
182- }
202+ Record = MGraphBuilder.getMemObjRecord (MemObj);
203+ if (!Record)
204+ // No operations were performed on the mem object
205+ return ;
183206
184- {
185- // This only needs a shared mutex as it only involves enqueueing and
186- // awaiting for events
187- std::shared_lock<std::shared_timed_mutex> Lock (MGraphLock);
188- waitForRecordToFinish (Record);
189- }
207+ Lock.unlock ();
208+ }
190209
191- {
192- lockSharedTimedMutex (Lock);
193- MGraphBuilder.decrementLeafCountersForRecord (Record);
194- MGraphBuilder.cleanupCommandsForRecord (Record);
195- MGraphBuilder.removeRecordForMemObj (MemObj);
210+ {
211+ // This only needs a shared mutex as it only involves enqueueing and
212+ // awaiting for events
213+ std::shared_lock<std::shared_timed_mutex> Lock (MGraphLock);
214+ waitForRecordToFinish (Record);
215+ }
216+
217+ {
218+ lockSharedTimedMutex (Lock);
219+ MGraphBuilder.decrementLeafCountersForRecord (Record);
220+ MGraphBuilder.cleanupCommandsForRecord (Record, StreamsToDeallocate);
221+ MGraphBuilder.removeRecordForMemObj (MemObj);
222+ }
196223 }
224+ deallocateStreams (StreamsToDeallocate);
197225}
198226
199227EventImplPtr Scheduler::addHostAccessor (Requirement *Req) {
@@ -243,11 +271,12 @@ void Scheduler::allocateStreamBuffers(stream_impl *Impl,
243271 size_t FlushBufferSize) {
244272 std::lock_guard<std::mutex> lock (StreamBuffersPoolMutex);
245273 StreamBuffersPool.insert (
246- {Impl, StreamBuffers (StreamBufferSize, FlushBufferSize)});
274+ {Impl, new StreamBuffers (StreamBufferSize, FlushBufferSize)});
247275}
248276
249277void Scheduler::deallocateStreamBuffers (stream_impl *Impl) {
250278 std::lock_guard<std::mutex> lock (StreamBuffersPoolMutex);
279+ delete StreamBuffersPool[Impl];
251280 StreamBuffersPool.erase (Impl);
252281}
253282
@@ -258,6 +287,24 @@ Scheduler::Scheduler() {
258287 /* PropList=*/ {}));
259288}
260289
290+ Scheduler::~Scheduler () {
291+ // By specification there are several possible sync points: buffer
292+ // destruction, wait() method of a queue or event. Stream doesn't introduce
293+ // any synchronization point. It is guaranteed that stream is flushed and
294+ // resources are released only if one of the listed sync points was used for
295+ // the kernel. Otherwise resources for stream will not be released, issue a
296+ // warning in this case.
297+ if (pi::trace (pi::TraceLevel::PI_TRACE_BASIC)) {
298+ std::lock_guard<std::mutex> lock (StreamBuffersPoolMutex);
299+ if (!StreamBuffersPool.empty ())
300+ fprintf (
301+ stderr,
302+ " \n WARNING: Some commands may have not finished the execution and "
303+ " not all resources were released. Please be sure that all kernels "
304+ " have synchronization points.\n\n " );
305+ }
306+ }
307+
261308void Scheduler::lockSharedTimedMutex (
262309 std::unique_lock<std::shared_timed_mutex> &Lock) {
263310#ifdef _WIN32
0 commit comments