| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 import errno
23 import os
24 import stat
25 import tempfile
26 import threading
27 import time
28
29 from twisted.internet import defer, reactor, abstract
30
31 from flumotion.common import log, format, common, python
32 from flumotion.component.misc.httpserver import cachestats
33 from flumotion.component.misc.httpserver import fileprovider
34 from flumotion.component.misc.httpserver import localpath
35 from flumotion.component.misc.httpserver.fileprovider import FileClosedError
36 from flumotion.component.misc.httpserver.fileprovider import FileError
37 from flumotion.component.misc.httpserver.fileprovider import NotFoundError
38
39
SEEK_SET = 0 # os.SEEK_SET is not defined in python 2.4
# Default cache size, in megabytes (converted to bytes at initialization).
DEFAULT_CACHE_SIZE = 1000
# Cleanup watermarks, as fractions of the total cache size: cleanup is
# triggered when usage would exceed the high watermark and stops once
# usage drops below the low watermark.
DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0
DEFAULT_CLEANUP_LOW_WATERMARK = 0.6
# Buffer size, in bytes, used when copying a source file to the cache.
FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
# Suffix appended to a cache identifier to build a temporary file name.
TEMP_FILE_POSTFIX = ".tmp"
MAX_LOGNAME_SIZE = 30 # maximum number of characters to use for logging a path
# Maximum number of path -> identifier entries kept in the identifier
# cache before it is cleared, to prevent unbounded growth.
ID_CACHE_MAX_SIZE = 1024


LOG_CATEGORY = "fileprovider-localcached"

# Maps OS errno values to the fileprovider exception raised for them;
# errnos not listed here fall back to the generic FileError.
_errorLookup = {errno.ENOENT: NotFoundError,
                errno.EISDIR: fileprovider.CannotOpenError,
                errno.EACCES: fileprovider.AccessError}
55
56
58 """
59
60 WARNING: Currently does not work properly in combination with rate-control.
61
62 I'm caching the files taken from a mounted
63 network file system to a shared local directory.
64 Multiple instances can share the same cache directory,
65 but it's recommended to use slightly different values
66 for the property cleanup-high-watermark.
67 I'm using the directory access time to know when
68 the cache usage changed and keep an estimation
69 of the cache usage for statistics.
70
71 I'm creating a unique thread to do the file copying block by block,
72 for all files to be copied to the cache.
73 Using a thread instead of a reactor.callLater 'loop' allows for
74 higher copy throughput and does not slow down the main loop when
75 lots of files are copied at the same time.
76 Simulations with real request logs show that using a thread
77 gives better results than the equivalent asynchronous implementation.
78 """
79
80 logCategory = LOG_CATEGORY
81
83 props = args['properties']
84 self._sourceDir = props.get('path')
85 self._cacheDir = props.get('cache-dir', "/tmp")
86 cacheSizeInMB = int(props.get('cache-size', DEFAULT_CACHE_SIZE))
87 self._cacheSize = cacheSizeInMB * 10 ** 6 # in bytes
88 self._cleanupEnabled = props.get('cleanup-enabled', True)
89 highWatermark = props.get('cleanup-high-watermark',
90 DEFAULT_CLEANUP_HIGH_WATERMARK)
91 highWatermark = max(0.0, min(1.0, float(highWatermark)))
92 lowWatermark = props.get('cleanup-low-watermark',
93 DEFAULT_CLEANUP_LOW_WATERMARK)
94 lowWatermark = max(0.0, min(1.0, float(lowWatermark)))
95 self._identifiers = {} # {path: identifier}
96 self._sessions = {} # {CopySession: None}
97 self._index = {} # {path: CopySession}
98
99 self.info("Cached file provider initialized")
100 self.debug("Source directory: '%s'", self._sourceDir)
101 self.debug("Cache directory: '%s'", self._cacheDir)
102 self.debug("Cache size: %d bytes", self._cacheSize)
103 self.debug("Cache cleanup enabled: %s", self._cleanupEnabled)
104
105 common.ensureDir(self._sourceDir, "source")
106 common.ensureDir(self._cacheDir, "cache")
107
108 self._cacheUsage = None # in bytes
109 self._cacheUsageLastUpdate = None
110 self._lastCacheTime = None
111 self._cacheMaxUsage = self._cacheSize * highWatermark # in bytes
112 self._cacheMinUsage = self._cacheSize * lowWatermark # in bytes
113 self.stats = cachestats.CacheStatistics()
114
115 # Initialize cache usage
116 self.updateCacheUsage()
117
118 # Startup copy thread
119 self._thread = CopyThread(self)
120
122 self._thread.start()
123
125 self._thread.stop()
126
128 #FIXME: This is temporary. Should be done with plug UI.
129 # Used for the UI to know which plug is used
130 updater.update("provider-name", "fileprovider-localcached")
131 self.stats.startUpdates(updater)
132
134 self.stats.stopUpdates()
135
140
141
142 ## Protected Methods ##
143
145 """
146 Returns a log name for a path, shortened to a maximum size
147 specified by the global variable MAX_LOGNAME_SIZE.
148 The log name will be the filename part of the path postfixed
149 by the id in brackets if id is not None.
150 """
151 filename = os.path.basename(path)
152 basename, postfix = os.path.splitext(filename)
153 if id is not None:
154 postfix += "[%s]" % id
155 prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
156 if len(basename) > prefixMaxLen:
157 basename = basename[:prefixMaxLen-1] + "*"
158 return basename + postfix
159
161 """
162 Returns an identifier for a path.
163 The identifier is a digest of the path encoded in hex string.
164 The hash function used is SHA1.
165 It caches the identifiers in a dictionary indexed by path and with
166 a maximum number of entry specified by the constant ID_CACHE_MAX_SIZE.
167 """
168 ident = self._identifiers.get(path, None)
169 if ident is None:
170 hash = python.sha1()
171 hash.update(path)
172 ident = hash.digest().encode("hex").strip('\n')
173 # Prevent the cache from growing endlessly
174 if len(self._identifiers) >= ID_CACHE_MAX_SIZE:
175 self._identifiers.clear()
176 self._identifiers[path] = ident
177 return ident
178
182
184 ident = self.getIdentifier(path)
185 return os.path.join(self._cacheDir, ident + TEMP_FILE_POSTFIX)
186
188 self.stats.onEstimateCacheUsage(self._cacheUsage, self._cacheSize)
189
191 """
192 @returns: the cache usage, in bytes
193 """
194 # Only calculate cache usage if the cache directory
195 # modification time changed since the last time we looked at it.
196 cacheTime = os.path.getmtime(self._cacheDir)
197 if ((self._cacheUsage is None) or (self._lastCacheTime < cacheTime)):
198 self._lastCacheTime = cacheTime
199 os.chdir(self._cacheDir)
200
201 # There's a possibility here that we got the filename from
202 # os.listdir, but before we get to os.stat, the file is gone. We'll
203 # get a OSError with a ENOENT errno and we should ignore that file,
204 # since we're just estimating the amount of space taken by the
205 # cache
206 sizes = []
207 for f in os.listdir('.'):
208 try:
209 sizes.append(os.path.getsize(f))
210 except OSError, e:
211 if e.errno == errno.ENOENT:
212 pass
213 else:
214 raise
215
216 self._cacheUsage = sum(sizes)
217 self.updateCacheUsageStatistics()
218 self._cacheUsageLastUpdate = time.time()
219 return self._cacheUsage
220
222 """
223 Try to reserve cache space.
224
225 If there is not enough space and the cache cleanup is enabled,
226 it will delete files from the cache starting with the ones
227 with oldest access time until the cache usage drops below
228 the fraction specified by the property cleanup-low-threshold.
229
230 Returns a 'tag' that should be used to 'free' the cache space
231 using releaseCacheSpace.
232 This tag is needed to better estimate the cache usage,
233 if the cache usage has been updated since cache space
234 has been allocated, freeing up the space should not change
235 the cache usage estimation.
236
237 @param size: size to reserve, in bytes
238 @type size: int
239
240 @returns: an allocation tag or None if the allocation failed.
241 @rtype: tuple
242 """
243 usage = self.updateCacheUsage()
244 if (usage + size) < self._cacheMaxUsage:
245 self._cacheUsage += size
246 self.updateCacheUsageStatistics()
247 return (self._cacheUsageLastUpdate, size)
248
249 self.debug('cache usage will be %sbytes, need more cache',
250 format.formatStorage(usage + size))
251
252 if not self._cleanupEnabled:
253 self.debug('not allowed to clean up cache, so cannot cache')
254 # No space available and cleanup disabled: allocation failed.
255 return None
256
257 # Update cleanup statistics
258 self.stats.onCleanup()
259 # List the cached files with file state
260 os.chdir(self._cacheDir)
261
262 files = []
263 for f in os.listdir('.'):
264 # There's a possibility of getting an error on os.stat here. See
265 # similar comment in updateCacheUsage()
266 try:
267 files.append((f, os.stat(f)))
268 except OSError, e:
269 if e.errno == errno.ENOENT:
270 pass
271 else:
272 raise
273
274 # Calculate the cached file total size
275 usage = sum([d[1].st_size for d in files])
276 # Delete the cached file starting by the oldest accessed ones
277 files.sort(key=lambda d: d[1].st_atime)
278 for path, info in files:
279 try:
280 os.remove(path)
281 usage -= info.st_size
282 except OSError, e:
283 if e.errno == errno.ENOENT:
284 # Already been deleted by another process,
285 # but subtract the size anyway
286 usage -= info.st_size
287 else:
288 self.warning("Error cleaning cached file: %s", str(e))
289 if usage <= self._cacheMinUsage:
290 # We reach the cleanup limit
291 self.debug('cleaned up, cache use is now %sbytes',
292 format.formatStorage(usage))
293 break
294
295 # Update the cache usage
296 self._cacheUsage = usage
297 self._cacheUsageLastUpdate = time.time()
298 if (usage + size) < self._cacheSize:
299 # There is enough space to allocate, allocation succeed
300 self._cacheUsage += size
301 self.updateCacheUsageStatistics()
302 return (self._cacheUsageLastUpdate, size)
303 # There is not enough space; allocation failed
304 self.updateCacheUsageStatistics()
305 return None
306
308 lastUpdate, size = tag
309 if lastUpdate == self._cacheUsageLastUpdate:
310 self._cacheUsage -= size
311 self.updateCacheUsageStatistics()
312
314 return self._index.get(path, None)
315
317 # First outdate existing session for the path
318 self.outdateCopySession(path)
319 # Then create a new one
320 session = CopySession(self, path, file, info)
321 self._index[path] = session
322 return session
323
328
330 path = session.sourcePath
331 if path in self._index:
332 del self._index[path]
333 self.disableSession(session)
334
336 self.debug("Starting Copy Session '%s' (%d)",
337 session.logName, len(self._sessions))
338 if session in self._sessions:
339 return
340 self._sessions[session] = None
341 self._activateCopyLoop()
342
344 self.debug("Stopping Copy Session '%s' (%d)",
345 session.logName, len(self._sessions))
346 if session in self._sessions:
347 del self._sessions[session]
348 if not self._sessions:
349 self._disableCopyLoop()
350
352 self._thread.wakeup()
353
355 self._thread.sleep()
356
357
359
360 logCategory = LOG_CATEGORY
361
363 localpath.LocalPath.__init__(self, path)
364 self.logName = plug.getLogName(path)
365 self.plug = plug
366
370
372 if not os.path.exists(self.path):
373 # Delete the cached file and outdate the copying session
374 self.plug.outdateCopySession(self.path)
375 self._removeCachedFile(self.path)
376 raise NotFoundError("Path '%s' not found" % self.path)
377 return CachedFile(self.plug, self.path, self.mimeType)
378
379
380 ## Private Methods ##
381
390
391
393
394 logCategory = LOG_CATEGORY
395
397 threading.Thread.__init__(self)
398 self.plug = plug
399 self._running = True
400 self._event = threading.Event()
401
406
408 self._event.set()
409
411 self._event.clear()
412
414 while self._running:
415 sessions = self.plug._sessions.keys()
416 for session in sessions:
417 try:
418 session.doServe()
419 except Exception, e:
420 log.warning("Error during async file serving: %s",
421 log.getExceptionMessage(e))
422 try:
423 session.doCopy()
424 except Exception, e:
425 log.warning("Error during file copy: %s",
426 log.getExceptionMessage(e))
427 self._event.wait()
428
429
432
433
435 """
436 I'm serving a file at the same time I'm copying it
437 from the network file system to the cache.
438 If the client ask for data not yet copied, the source file
439 read operation is delegated to the copy thread as an asynchronous
440 operation because file seeking/reading is not thread safe.
441
442 The copy session has to open the temporary file twice,
443 one handle for read-only and one for write-only,
444 because closing a read/write file changes the modification time.
445 We want the modification time to be set to a known value
446 when the copy is finished even keeping read access to the file.
447
448 The session manages a reference counter to know how many TempFileDelegate
449 instances are using the session to delegate read operations.
450 This is done for two reasons:
451 - To avoid circular references by having the session manage
452 a list of delegate instances.
453 - If not cancelled, sessions should not be deleted
454 when no delegates reference them anymore. So weakref cannot be used.
455 """
456
457 logCategory = LOG_CATEGORY
458
460 self.plug = plug
461 self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
462 self.copying = None # Not yet started
463 self.sourcePath = sourcePath
464 self.tempPath = plug.getTempPath(sourcePath)
465 self.cachePath = plug.getCachePath(sourcePath)
466 # The size and modification time is not supposed to change over time
467 self.mtime = sourceInfo[stat.ST_MTIME]
468 self.size = sourceInfo[stat.ST_SIZE]
469 self._sourceFile = sourceFile
470 self._cancelled = False # True when a session has been outdated
471 self._wTempFile = None
472 self._rTempFile = None
473 self._allocTag = None # Tag used to identify cache allocations
474 self._waitCancel = None
475 # List of the pending read from source file
476 self._pending = [] # [(position, size, defer),]
477 self._refCount = 0
478 self._copied = 0 # None when the file is fully copied
479 self._correction = 0 # Used to take into account copies data for stats
480 self._startCopying()
481
485
487 # If the temporary file is open for reading
488 if self._rTempFile:
489 # And the needed data is already downloaded
490 # Safe to read because it's not used by the copy thread
491 if (self._copied is None) or ((position + size) <= self._copied):
492 try:
493 self._rTempFile.seek(position)
494 data = self._rTempFile.read(size)
495 # Adjust the cache/source values to take copy into account
496 size = len(data)
497 # It's safe to use and modify self._correction even if
498 # it's used by the copy thread because the copy thread
499 # only add and the main thread only subtract.
501 # The only thing that could happen is a less accurate
501 # correction...
502 diff = min(self._correction, size)
503 self._correction -= diff
504 stats.onBytesRead(0, size, diff)
505 return data
506 except Exception, e:
507 self.warning("Failed to read from temporary file: %s",
508 log.getExceptionMessage(e))
509 self._cancelSession()
510 # If the source file is not open anymore, we can't continue
511 if self._sourceFile is None:
512 raise FileError("File caching error, cannot proceed")
513 # Otherwise read the data directly from the source
514 try:
515 # It's safe to not use Lock, because simple type operations
516 # are thread safe, and even if the copying state change
517 # from True to False, _onCopyFinished will be called
518 # later in the same thread and will process pending reads.
519 if self.copying:
520 # If we are currently copying the source file,
521 # we defer the file read to the copying thread
522 # because we can't read a file from two threads.
523 d = defer.Deferred()
524
525 def updateStats(data):
526 stats.onBytesRead(len(data), 0, 0)
527 return data
528
529 d.addCallback(updateStats)
530 self._pending.append((position, size, d))
531 return d
532 # Not copying, it's safe to read directly
533 self._sourceFile.seek(position)
534 data = self._sourceFile.read(size)
535 stats.onBytesRead(len(data), 0, 0)
536 return data
537 except IOError, e:
538 cls = _errorLookup.get(e.errno, FileError)
539 raise cls("Failed to read source file: %s" % str(e))
540
543
545 self._refCount -= 1
546 # If there is only one client and the session has been cancelled,
547 # stop copying and serve the source file directly
548 if (self._refCount == 1) and self._cancelled:
549 # Cancel the copy and close the writing temporary file.
550 self._cancelCopy(False, True)
551 # We close if not still copying source file
552 if (self._refCount == 0) and (self._wTempFile is None):
553 self.close()
554
556 if self.plug is not None:
557 self.log("Closing copy session")
558 # Cancel the copy, close the source file and the writing temp file.
559 self._cancelCopy(True, True)
560 self._closeReadTempFile()
561 self.plug.removeCopySession(self)
562 self.plug = None
563
565 if not (self.copying and self._pending):
566 # Nothing to do anymore.
567 return False
568 # We have pending source file read operations
569 position, size, d = self._pending.pop(0)
570 self._sourceFile.seek(position)
571 data = self._sourceFile.read(size)
572 # Call the deferred in the main thread
573 reactor.callFromThread(d.callback, data)
574 return len(self._pending) > 0
575
577 # Called in the copy thread context.
578 if not self.copying:
579 # Nothing to do anymore.
580 return False
581 # Copy a buffer from the source file to the temporary writing file
582 cont = True
583 try:
584 # It's safe to use self._copied, because it's only set
585 # by the copy thread during copy.
586 self._sourceFile.seek(self._copied)
587 self._wTempFile.seek(self._copied)
588 data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE)
589 self._wTempFile.write(data)
590 self._wTempFile.flush()
591 except IOError, e:
592 self.warning("Failed to copy source file: %s",
593 log.getExceptionMessage(e))
594 # Abort copy and cancel the session
595 self.copying = False
596 reactor.callFromThread(self.plug.disableSession, self)
597 reactor.callFromThread(self._cancelSession)
598 # Do not continue
599 cont = False
600 else:
601 size = len(data)
602 self._copied += size
603 self._correction += size
604 if size < FILE_COPY_BUFFER_SIZE:
605 # Stop copying
606 self.copying = False
607 reactor.callFromThread(self.plug.disableSession, self)
608 reactor.callFromThread(self._onCopyFinished)
609 cont = False
610 # Check for cancellation
611 if self._waitCancel:
612 # Copy has been cancelled
613 self.copying = False
614 reactor.callFromThread(self.plug.disableSession, self)
615 reactor.callFromThread(self._onCopyCancelled, *self._waitCancel)
616 return False
617 return cont
618
619
620 ## Private Methods ##
621
623 # Retrieve a cache allocation tag, used to track the cache free space
624 tag = self.plug.allocateCacheSpace(self.size)
625 if tag is None:
626 return False
627 self._allocTag = tag
628 return True
629
631 if not (self._cancelled or self._allocTag is None):
632 self.plug.releaseCacheSpace(self._allocTag)
633 self._allocTag = None
634
636 if not self._cancelled:
637 self.log("Canceling copy session")
638 # Not a valid copy session anymore
639 self._cancelled = True
640 # If there is no more than 1 client using the session,
641 # stop copying and serve the source file directly
642 if self._refCount <= 1:
643 # Cancel and close the temp write file.
644 self._cancelCopy(False, True)
645
647 self.log("Start copy session")
648 # First ensure there is not already a temporary file
649 self._removeTempFile()
650 # Reserve cache space, may trigger a cache cleanup
651 if not self._allocCacheSpace():
652 # No free space, proxying source file directly
653 self._cancelSession()
654 return
655 self.plug.stats.onCopyStarted()
656 # Then open a transient temporary files
657 try:
658 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
659 self.log("Created transient file '%s'", transientPath)
660 self._wTempFile = os.fdopen(fd, "wb")
661 self.log("Opened temporary file for writing [fd %d]",
662 self._wTempFile.fileno())
663 self._rTempFile = file(transientPath, "rb")
664 self.log("Opened temporary file for reading [fd %d]",
665 self._rTempFile.fileno())
666 except IOError, e:
667 self.warning("Failed to open temporary file: %s",
668 log.getExceptionMessage(e))
669 self._cancelSession()
670 return
671 # Truncate it to the source size
672 try:
673 self.log("Truncating temporary file to size %d", self.size)
674 self._wTempFile.truncate(self.size)
675 except IOError, e:
676 self.warning("Failed to truncate temporary file: %s",
677 log.getExceptionMessage(e))
678 self._cancelSession()
679 return
680 # And move it to the real temporary file path
681 try:
682 self.log("Renaming transient file to '%s'", self.tempPath)
683 os.rename(transientPath, self.tempPath)
684 except IOError, e:
685 self.warning("Failed to rename transient temporary file: %s",
686 log.getExceptionMessage(e))
687 # And start copying
688 self.debug("Start caching '%s' [fd %d]",
689 self.sourcePath, self._sourceFile.fileno())
690 # Activate the copy
691 self.copying = True
692 self.plug.activateSession(self)
693
695 if self.copying:
696 self.log("Canceling file copy")
697 if self._waitCancel:
698 # Already waiting for cancellation.
699 return
700 self.debug("Cancel caching '%s' [fd %d]",
701 self.sourcePath, self._sourceFile.fileno())
702 # Disable the copy, we do not modify copying directly
703 # to let the copying thread terminate current operations.
704 # The file close operation are deferred.
705 self._waitCancel = (closeSource, closeTempWrite)
706 return
707 # No pending copy, we can close the files
708 if closeSource:
709 self._closeSourceFile()
710 if closeTempWrite:
711 self._closeWriteTempFile()
712
714 self.log("Copy session cancelled")
715 # Called when the copy thread really stopped to read/write
716 self._waitCancel = None
717 self.plug.stats.onCopyCancelled(self.size, self._copied)
718 # Resolve all pending source read operations
719 for position, size, d in self._pending:
720 if self._sourceFile is None:
721 d.errback(CopySessionCancelled())
722 else:
723 try:
724 self._sourceFile.seek(position)
725 data = self._sourceFile.read(size)
726 d.callback(data)
727 except Exception, e:
728 self.warning("Failed to read from source file: %s",
729 log.getExceptionMessage(e))
730 d.errback(e)
731 self._pending = []
732 # then we can safely close files
733 if closeSource:
734 self._closeSourceFile()
735 if closeTempWrite:
736 self._closeWriteTempFile()
737
739 # Called when the copy thread really stopped to read/write
740 self.debug("Finished caching '%s' [fd %d]",
741 self.sourcePath, self._sourceFile.fileno())
742 self.plug.stats.onCopyFinished(self.size)
743 # Set the copy as finished to prevent the temporary file
744 # to be deleted when closed
745 self._copied = None
746 # Closing source and write files
747 self._closeSourceFile()
748 self._closeWriteTempFile()
749 # Setting the modification time on the temporary file
750 try:
751 mtime = self.mtime
752 atime = int(time.time())
753 self.log("Setting temporary file modification time to %d", mtime)
754 # FIXME: Should use futimes, but it's not wrapped by python
755 os.utime(self.tempPath, (atime, mtime))
756 except OSError, e:
757 if e.errno == errno.ENOENT:
758 # The file may have been deleted by another process
759 self._releaseCacheSpace()
760 else:
761 self.warning("Failed to update modification time of temporary "
762 "file: %s", log.getExceptionMessage(e))
763 self._cancelSession()
764 try:
765 self.log("Renaming temporary file to '%s'", self.cachePath)
766 os.rename(self.tempPath, self.cachePath)
767 except OSError, e:
768 if e.errno == errno.ENOENT:
769 self._releaseCacheSpace()
770 else:
771 self.warning("Failed to rename temporary file: %s",
772 log.getExceptionMessage(e))
773 self._cancelSession()
774 # Complete all pending source read operations with the temporary file.
775 for position, size, d in self._pending:
776 try:
777 self._rTempFile.seek(position)
778 data = self._rTempFile.read(size)
779 d.callback(data)
780 except Exception, e:
781 self.warning("Failed to read from temporary file: %s",
782 log.getExceptionMessage(e))
783 d.errback(e)
784 self._pending = []
785 if self._refCount == 0:
786 # We were waiting for the file to be copied to close it.
787 self.close()
788
790 try:
791 os.remove(self.tempPath)
792 self.log("Deleted temporary file '%s'", self.tempPath)
793 # Inform the plug that cache space has been released
794 self._releaseCacheSpace()
795 except OSError, e:
796 if e.errno == errno.ENOENT:
797 if self._wTempFile is not None:
798 # Already deleted but inform the plug anyway
799 self._releaseCacheSpace()
800 else:
801 self.warning("Error deleting temporary file: %s",
802 log.getExceptionMessage(e))
803
805 if self._sourceFile is not None:
806 self.log("Closing source file [fd %d]", self._sourceFile.fileno())
807 try:
808 try:
809 self._sourceFile.close()
810 finally:
811 self._sourceFile = None
812 except IOError, e:
813 self.warning("Failed to close source file: %s",
814 log.getExceptionMessage(e))
815
817 if self._rTempFile is not None:
818 self.log("Closing temporary file for reading [fd %d]",
819 self._rTempFile.fileno())
820 try:
821 try:
822 self._rTempFile.close()
823 finally:
824 self._rTempFile = None
825 except IOError, e:
826 self.warning("Failed to close temporary file for reading: %s",
827 log.getExceptionMessage(e))
828
830 if self._wTempFile is not None:
831 # If the copy is not finished, remove the temporary file
832 if not self._cancelled and self._copied is not None:
833 self._removeTempFile()
834 self.log("Closing temporary file for writing [fd %d]",
835 self._wTempFile.fileno())
836 try:
837 try:
838 self._wTempFile.close()
839 finally:
840 self._wTempFile = None
841 except Exception, e:
842 self.warning("Failed to close temporary file for writing: %s",
843 log.getExceptionMessage(e))
844
845
847
848 logCategory = LOG_CATEGORY
849
851 self.logName = plug.getLogName(session.sourcePath)
852 self.mtime = session.mtime
853 self.size = session.size
854 self._session = session
855 self._reading = False
856 self._position = 0
857 session.incRef()
858
861
864
866 assert not self._reading, "Simultaneous read not supported"
867 d = self._session.read(self._position, size, stats)
868 if isinstance(d, defer.Deferred):
869 self._reading = True
870 return d.addCallback(self._cbGotData)
871 self._position += len(d)
872 return d
873
878
879
880 ## Private Methods ##
881
886
887
889
890 logCategory = LOG_CATEGORY
891
892 # Default values
893 _file = None
894
896 self.logName = plug.getLogName(path, file.fileno())
897 self._file = file
898 # The size and modification time is not supposed to change over time
899 self.mtime = info[stat.ST_MTIME]
900 self.size = info[stat.ST_SIZE]
901
903 try:
904 return self._file.tell()
905 except IOError, e:
906 cls = _errorLookup.get(e.errno, FileError)
907 raise cls("Failed to tell position in file: %s" % str(e))
908
910 try:
911 self._file.seek(offset, SEEK_SET)
912 except IOError, e:
913 cls = _errorLookup.get(e.errno, FileError)
914 raise cls("Failed to seek in cached file: %s" % str(e))
915
917 try:
918 return self._file.read(size)
919 except IOError, e:
920 cls = _errorLookup.get(e.errno, FileError)
921 raise cls("Failed to read data from file: %s" % str(e))
922
933
934
936
938 data = DirectFileDelegate.read(self, size)
939 stats.onBytesRead(0, len(data), 0)
940 return data
941
946
947
949
950 logCategory = LOG_CATEGORY
951
952 # Overriding parent class properties to become attribute
953 mimeType = None
954
955 # Default values
956 _delegate = None
957
959 self.logName = plug.getLogName(path)
960 self.plug = plug
961 self._path = path
962 self.mimeType = mimeType
963 self.stats = cachestats.RequestStatistics(plug.stats)
964 self._delegate = self._selectDelegate()
965
968
970 if self._delegate is None:
971 raise FileClosedError("File closed")
972 return self._delegate.mtime
973
975 if self._delegate is None:
976 raise FileClosedError("File closed")
977 return self._delegate.size
978
980 if self._delegate is None:
981 raise FileClosedError("File closed")
982 return self._delegate.tell()
983
985 if self._delegate is None:
986 raise FileClosedError("File closed")
987 return self._delegate.seek(offset)
988
990 if self._delegate is None:
991 raise FileClosedError("File closed")
992 try:
993 d = self._delegate.read(size, self.stats)
994 if isinstance(d, defer.Deferred):
995 return d
996 return defer.succeed(d)
997 except IOError, e:
998 cls = _errorLookup.get(e.errno, FileError)
999 return defer.fail(cls("Failed to read cached data: %s", str(e)))
1000 except:
1001 return defer.fail()
1002
1004 if self._delegate:
1005 self.stats.onClosed()
1006 self._delegate.close()
1007 self._delegate = None
1008
1010 self.close()
1011
1013 return self.stats.getLogFields()
1014
1015
1016 ## Private Methods ##
1017
1019 """
1020 @rtype: (file, statinfo)
1021 """
1022 try:
1023 file = open(path, 'rb')
1024 fd = file.fileno()
1025 except IOError, e:
1026 cls = _errorLookup.get(e.errno, FileError)
1027 raise cls("Failed to open file '%s': %s" % (path, str(e)))
1028 try:
1029 info = os.fstat(fd)
1030 except OSError, e:
1031 cls = _errorLookup.get(e.errno, FileError)
1032 raise cls("Failed to stat file '%s': %s" % (path, str(e)))
1033 return file, info
1034
1036 self.log("Closing source file [fd %d]", sourceFile.fileno())
1037 try:
1038 sourceFile.close()
1039 except Exception, e:
1040 self.warning("Failed to close source file: %s",
1041 log.getExceptionMessage(e))
1042
1044 sourcePath = self._path
1045 cachedPath = self.plug.getCachePath(sourcePath)
1046 # Opening source file
1047 try:
1048 sourceFile, sourceInfo = self._open(sourcePath)
1049 self.log("Opened source file [fd %d]", sourceFile.fileno())
1050 except NotFoundError:
1051 self.debug("Source file not found")
1052 self.plug.outdateCopySession(sourcePath)
1053 self._removeCachedFile(cachedPath)
1054 raise
1055 # Update the log name
1056 self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
1057 # Opening cached file
1058 try:
1059 cachedFile, cachedInfo = self._open(cachedPath)
1060 self.log("Opened cached file [fd %d]", cachedFile.fileno())
1061 except NotFoundError:
1062 self.debug("Did not find cached file '%s'", cachedPath)
1063 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
1064 except FileError, e:
1065 self.debug("Failed to open cached file: %s", str(e))
1066 self._removeCachedFile(cachedPath)
1067 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
1068 # Found a cached file, now check the modification time
1069 self.debug("Found cached file '%s'", cachedPath)
1070 sourceTime = sourceInfo[stat.ST_MTIME]
1071 cacheTime = cachedInfo[stat.ST_MTIME]
1072 if sourceTime != cacheTime:
1073 # Source file changed, remove file and start caching again
1074 self.debug("Cached file out-of-date (%d != %d)",
1075 sourceTime, cacheTime)
1076 self.stats.onCacheOutdated()
1077 self.plug.outdateCopySession(sourcePath)
1078 self._removeCachedFile(cachedPath)
1079 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
1080 self._closeSourceFile(sourceFile)
1081 # We have a valid cached file, just delegate to it.
1082 self.debug("Serving cached file '%s'", cachedPath)
1083 delegate = CachedFileDelegate(self.plug, cachedPath,
1084 cachedFile, cachedInfo)
1085 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
1086 return delegate
1087
1089 try:
1090 os.remove(cachePath)
1091 self.debug("Deleted cached file '%s'", cachePath)
1092 except OSError, e:
1093 if e.errno != errno.ENOENT:
1094 self.warning("Error deleting cached file: %s", str(e))
1095
1097 session = self.plug.getCopySession(sourcePath)
1098 if session is None:
1099 self.debug("No copy sessions found")
1100 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
1101 self.debug("Copy session found")
1102 if sourceInfo[stat.ST_MTIME] != session.mtime:
1103 self.debug("Copy session out-of-date (%d != %d)",
1104 sourceInfo[stat.ST_MTIME], session.mtime)
1105 self.stats.onCacheOutdated()
1106 session.outdate()
1107 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
1108 self._closeSourceFile(sourceFile)
1109 # We have a valid session, just delegate to it.
1110 self.debug("Serving temporary file '%s'", session.tempPath)
1111 delegate = TempFileDelegate(self.plug, session)
1112 self.stats.onStarted(delegate.size, cachestats.TEMP_HIT)
1113 return delegate
1114
1116 session = self.plug.createCopySession(sourcePath, sourceFile,
1117 sourceInfo)
1118 self.debug("Serving temporary file '%s'", session.tempPath)
1119 delegate = TempFileDelegate(self.plug, session)
1120 self.stats.onStarted(delegate.size, cachestats.CACHE_MISS)
1121 return delegate
1122
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Sun Nov 15 09:17:52 2009 | http://epydoc.sourceforge.net |