diff options
Diffstat (limited to 'vendor/github.com/hpcloud/tail/watch/inotify_tracker.go')
-rw-r--r-- | vendor/github.com/hpcloud/tail/watch/inotify_tracker.go | 260 |
1 files changed, 260 insertions, 0 deletions
diff --git a/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go b/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go new file mode 100644 index 000000000..03be4275c --- /dev/null +++ b/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go @@ -0,0 +1,260 @@ +// Copyright (c) 2015 HPE Software Inc. All rights reserved. +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "log" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/hpcloud/tail/util" + + "gopkg.in/fsnotify.v1" +) + +type InotifyTracker struct { + mux sync.Mutex + watcher *fsnotify.Watcher + chans map[string]chan fsnotify.Event + done map[string]chan bool + watchNums map[string]int + watch chan *watchInfo + remove chan *watchInfo + error chan error +} + +type watchInfo struct { + op fsnotify.Op + fname string +} + +func (this *watchInfo) isCreate() bool { + return this.op == fsnotify.Create +} + +var ( + // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used + shared *InotifyTracker + + // these are used to ensure the shared InotifyTracker is run exactly once + once = sync.Once{} + goRun = func() { + shared = &InotifyTracker{ + mux: sync.Mutex{}, + chans: make(map[string]chan fsnotify.Event), + done: make(map[string]chan bool), + watchNums: make(map[string]int), + watch: make(chan *watchInfo), + remove: make(chan *watchInfo), + error: make(chan error), + } + go shared.run() + } + + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Watch signals the run goroutine to begin watching the input filename +func Watch(fname string) error { + return watch(&watchInfo{ + fname: fname, + }) +} + +// Watch create signals the run goroutine to begin watching the input filename +// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate +func WatchCreate(fname string) error { + return watch(&watchInfo{ + op: fsnotify.Create, + fname: fname, + }) +} + +func watch(winfo *watchInfo) error { + // start running the shared InotifyTracker if not already running + once.Do(goRun) + + winfo.fname = filepath.Clean(winfo.fname) + shared.watch <- winfo + return <-shared.error +} + +// RemoveWatch signals the run goroutine to remove the watch for the input filename +func RemoveWatch(fname string) { + remove(&watchInfo{ + fname: fname, + }) +} + +// RemoveWatch create signals the run goroutine to remove the watch for the input filename +func RemoveWatchCreate(fname string) { + remove(&watchInfo{ + op: fsnotify.Create, + fname: fname, + }) +} + +func remove(winfo *watchInfo) { + // start running the shared InotifyTracker if not already running + once.Do(goRun) + + winfo.fname = filepath.Clean(winfo.fname) + shared.mux.Lock() + done := shared.done[winfo.fname] + if done != nil { + delete(shared.done, winfo.fname) + close(done) + } + + fname := winfo.fname + if winfo.isCreate() { + // Watch for new files to be created in the parent directory. + fname = filepath.Dir(fname) + } + shared.watchNums[fname]-- + watchNum := shared.watchNums[fname] + if watchNum == 0 { + delete(shared.watchNums, fname) + } + shared.mux.Unlock() + + // If we were the last ones to watch this file, unsubscribe from inotify. + // This needs to happen after releasing the lock because fsnotify waits + // synchronously for the kernel to acknowledge the removal of the watch + // for this file, which causes us to deadlock if we still held the lock. + if watchNum == 0 { + shared.watcher.Remove(fname) + } + shared.remove <- winfo +} + +// Events returns a channel to which FileEvents corresponding to the input filename +// will be sent. This channel will be closed when removeWatch is called on this +// filename. +func Events(fname string) <-chan fsnotify.Event { + shared.mux.Lock() + defer shared.mux.Unlock() + + return shared.chans[fname] +} + +// Cleanup removes the watch for the input filename if necessary. +func Cleanup(fname string) { + RemoveWatch(fname) +} + +// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) addWatch(winfo *watchInfo) error { + shared.mux.Lock() + defer shared.mux.Unlock() + + if shared.chans[winfo.fname] == nil { + shared.chans[winfo.fname] = make(chan fsnotify.Event) + shared.done[winfo.fname] = make(chan bool) + } + + fname := winfo.fname + if winfo.isCreate() { + // Watch for new files to be created in the parent directory. + fname = filepath.Dir(fname) + } + + // already in inotify watch + if shared.watchNums[fname] > 0 { + shared.watchNums[fname]++ + if winfo.isCreate() { + shared.watchNums[winfo.fname]++ + } + return nil + } + + err := shared.watcher.Add(fname) + if err == nil { + shared.watchNums[fname]++ + if winfo.isCreate() { + shared.watchNums[winfo.fname]++ + } + } + return err +} + +// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) removeWatch(winfo *watchInfo) { + shared.mux.Lock() + defer shared.mux.Unlock() + + ch := shared.chans[winfo.fname] + if ch == nil { + return + } + + delete(shared.chans, winfo.fname) + close(ch) + + if !winfo.isCreate() { + return + } + + shared.watchNums[winfo.fname]-- + if shared.watchNums[winfo.fname] == 0 { + delete(shared.watchNums, winfo.fname) + } +} + +// sendEvent sends the input event to the appropriate Tail. +func (shared *InotifyTracker) sendEvent(event fsnotify.Event) { + name := filepath.Clean(event.Name) + + shared.mux.Lock() + ch := shared.chans[name] + done := shared.done[name] + shared.mux.Unlock() + + if ch != nil && done != nil { + select { + case ch <- event: + case <-done: + } + } +} + +// run starts the goroutine in which the shared struct reads events from its +// Watcher's Event channel and sends the events to the appropriate Tail. +func (shared *InotifyTracker) run() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("failed to create Watcher") + } + shared.watcher = watcher + + for { + select { + case winfo := <-shared.watch: + shared.error <- shared.addWatch(winfo) + + case winfo := <-shared.remove: + shared.removeWatch(winfo) + + case event, open := <-shared.watcher.Events: + if !open { + return + } + shared.sendEvent(event) + + case err, open := <-shared.watcher.Errors: + if !open { + return + } else if err != nil { + sysErr, ok := err.(*os.SyscallError) + if !ok || sysErr.Err != syscall.EINTR { + logger.Printf("Error in Watcher Error channel: %s", err) + } + } + } + } +} |