aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go
blob: 03be4275ca201a164fe96546a3941aebe00d0c86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.

package watch

import (
	"log"
	"os"
	"path/filepath"
	"sync"
	"syscall"

	"github.com/hpcloud/tail/util"

	"gopkg.in/fsnotify.v1"
)

// InotifyTracker multiplexes a single fsnotify.Watcher between all watched
// files. Requests reach it over the watch and remove channels and are served
// by its run loop; per-file state lives in the maps below.
type InotifyTracker struct {
	// mux guards the chans, done and watchNums maps.
	mux       sync.Mutex
	// watcher is the underlying fsnotify watcher; it is assigned by run.
	watcher   *fsnotify.Watcher
	// chans maps a cleaned file name to the channel its events are sent on.
	chans     map[string]chan fsnotify.Event
	// done maps a cleaned file name to a channel that remove closes so a
	// pending sendEvent for that file stops waiting.
	done      map[string]chan bool
	// watchNums counts the registrations per path. For create watches both
	// the parent directory and the file name itself are counted.
	watchNums map[string]int
	// watch carries add requests to the run loop.
	watch     chan *watchInfo
	// remove carries remove requests to the run loop.
	remove    chan *watchInfo
	// error carries the result of each add request back to the caller.
	error     chan error
}

// watchInfo describes one add or remove request handed to the shared
// InotifyTracker.
type watchInfo struct {
	// op selects the kind of watch. Only fsnotify.Create is given a special
	// meaning (see isCreate); the zero value is a plain watch on the file.
	op    fsnotify.Op
	// fname is the file the request refers to.
	fname string
}

// isCreate reports whether the request is a create watch, i.e. whether its op
// is exactly fsnotify.Create.
func (w *watchInfo) isCreate() bool {
	return fsnotify.Create == w.op
}

var (
	// shared is the package-wide InotifyTracker. Using one tracker means
	// only one fsnotify.Watcher is ever created.
	shared *InotifyTracker

	// once makes sure goRun is executed a single time.
	once sync.Once

	// goRun builds the shared tracker and starts its run loop in a new
	// goroutine. It is meant to be invoked through once.Do.
	goRun = func() {
		tracker := &InotifyTracker{
			chans:     map[string]chan fsnotify.Event{},
			done:      map[string]chan bool{},
			watchNums: map[string]int{},
			watch:     make(chan *watchInfo),
			remove:    make(chan *watchInfo),
			error:     make(chan error),
		}
		shared = tracker
		go tracker.run()
	}

	// logger writes to standard error with the standard log flags.
	logger = log.New(os.Stderr, "", log.LstdFlags)
)

// Watch signals the run goroutine to begin watching the input filename
func Watch(fname string) error {
	return watch(&watchInfo{
		fname: fname,
	})
}

// Watch create signals the run goroutine to begin watching the input filename
// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
func WatchCreate(fname string) error {
	return watch(&watchInfo{
		op:    fsnotify.Create,
		fname: fname,
	})
}

// watch submits winfo to the run goroutine and waits for its answer.
// The file name is normalised with filepath.Clean before it is sent, and the
// shared tracker is started first if it is not running yet.
func watch(winfo *watchInfo) error {
	once.Do(goRun)

	cleaned := filepath.Clean(winfo.fname)
	winfo.fname = cleaned

	// Hand the request over, then block until the run loop reports back.
	shared.watch <- winfo
	result := <-shared.error
	return result
}

// RemoveWatch signals the run goroutine to remove the watch for the input filename
func RemoveWatch(fname string) {
	remove(&watchInfo{
		fname: fname,
	})
}

// RemoveWatch create signals the run goroutine to remove the watch for the input filename
func RemoveWatchCreate(fname string) {
	remove(&watchInfo{
		op:    fsnotify.Create,
		fname: fname,
	})
}

// remove releases one registration described by winfo.
//
// It works in four steps:
//  1. close and forget the per-file done channel, so a sendEvent that is
//     waiting to deliver an event for this file gives up;
//  2. drop one reference from the watched path, which is the parent directory
//     for create watches and the file itself otherwise;
//  3. when that was the last reference, remove the path from the fsnotify
//     watcher;
//  4. pass winfo to the run goroutine, which closes the events channel.
//
// A path that is not currently counted (for example when the same file is
// removed twice) is left alone. Decrementing it anyway would store a negative
// count, and the next add/remove pair for that path would then never reach
// zero and never unsubscribe from inotify.
func remove(winfo *watchInfo) {
	// start running the shared InotifyTracker if not already running
	once.Do(goRun)

	winfo.fname = filepath.Clean(winfo.fname)

	// Create watches are registered on the parent directory.
	target := winfo.fname
	if winfo.isCreate() {
		target = filepath.Dir(target)
	}

	shared.mux.Lock()
	if done := shared.done[winfo.fname]; done != nil {
		delete(shared.done, winfo.fname)
		close(done)
	}

	// Only touch the counter when there is a live reference to release.
	last := false
	if n := shared.watchNums[target]; n > 0 {
		if n == 1 {
			delete(shared.watchNums, target)
			last = true
		} else {
			shared.watchNums[target] = n - 1
		}
	}
	shared.mux.Unlock()

	// If we were the last ones to watch this path, unsubscribe from inotify.
	// This needs to happen after releasing the lock because fsnotify waits
	// synchronously for the kernel to acknowledge the removal of the watch
	// for this file, which causes us to deadlock if we still held the lock.
	if last {
		// The error is discarded on purpose, as before: remove has no way to
		// report it to its callers.
		// NOTE(review): consider surfacing or logging this error.
		_ = shared.watcher.Remove(target)
	}
	shared.remove <- winfo
}

// Events returns the channel on which fsnotify events for fname are
// delivered, or nil when no channel is registered for that name.
//
// fname is normalised with filepath.Clean, the same way Watch and RemoveWatch
// normalise the names they store, so "./a.log" and "a.log" resolve to the
// same channel. The channel is closed once the run goroutine has processed
// the removal of the watch.
func Events(fname string) <-chan fsnotify.Event {
	// Make sure the shared tracker exists. Without this, a call made before
	// the first Watch would dereference a nil tracker.
	once.Do(goRun)

	fname = filepath.Clean(fname)

	shared.mux.Lock()
	defer shared.mux.Unlock()

	return shared.chans[fname]
}

// Cleanup removes the watch for the input filename if necessary.
func Cleanup(fname string) {
	RemoveWatch(fname)
}

// addWatch registers winfo with the tracker and returns the error from
// fsnotify, if any.
//
// It makes sure an events channel and a done channel exist for the file, then
// takes one reference on the watched path. That path is the parent directory
// for create watches and the file itself otherwise. The fsnotify watcher is
// only asked to add the path when nobody holds a reference to it yet. For
// create watches the file name itself is counted as well; removeWatch
// releases that second count.
//
// When fsnotify rejects the path, any channels created by this call are
// removed again so that a failed request leaves no entries behind.
func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
	shared.mux.Lock()
	defer shared.mux.Unlock()

	created := false
	if shared.chans[winfo.fname] == nil {
		shared.chans[winfo.fname] = make(chan fsnotify.Event)
		shared.done[winfo.fname] = make(chan bool)
		created = true
	}

	fname := winfo.fname
	if winfo.isCreate() {
		// Watch for new files to be created in the parent directory.
		fname = filepath.Dir(fname)
	}

	// Not in the inotify watch yet: subscribe before counting the reference.
	if shared.watchNums[fname] <= 0 {
		if err := shared.watcher.Add(fname); err != nil {
			if created {
				// The lock has been held since the channels were made, so
				// nobody else can have picked them up; dropping them is safe.
				delete(shared.chans, winfo.fname)
				delete(shared.done, winfo.fname)
			}
			return err
		}
	}

	shared.watchNums[fname]++
	if winfo.isCreate() {
		shared.watchNums[winfo.fname]++
	}
	return nil
}

// removeWatch closes and forgets the events channel registered for
// winfo.fname. It does nothing when no channel is registered. For create
// watches it also releases the count that addWatch keeps under the file name
// itself, deleting the entry when it reaches zero.
func (shared *InotifyTracker) removeWatch(winfo *watchInfo) {
	shared.mux.Lock()
	defer shared.mux.Unlock()

	events := shared.chans[winfo.fname]
	if events == nil {
		return
	}
	close(events)
	delete(shared.chans, winfo.fname)

	if winfo.isCreate() {
		remaining := shared.watchNums[winfo.fname] - 1
		if remaining == 0 {
			delete(shared.watchNums, winfo.fname)
		} else {
			shared.watchNums[winfo.fname] = remaining
		}
	}
}

// sendEvent delivers event on the channel registered for the event's cleaned
// file name. The event is dropped when either the events channel or the done
// channel for that name is missing. Otherwise the call blocks until the event
// has been received or the done channel is closed.
func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
	key := filepath.Clean(event.Name)

	// Copy both channels out under the lock, then release it before the
	// send, which may block.
	shared.mux.Lock()
	events, stop := shared.chans[key], shared.done[key]
	shared.mux.Unlock()

	if events == nil || stop == nil {
		return
	}

	select {
	case <-stop:
	case events <- event:
	}
}

// run creates the fsnotify watcher and then serves the tracker until the
// watcher's Events or Errors channel is closed. In each iteration it handles
// one of:
//   - an add request, answering with the result of addWatch;
//   - a remove request, handled by removeWatch;
//   - a file system event, forwarded through sendEvent;
//   - a watcher error, which is logged unless it is nil or an
//     *os.SyscallError wrapping EINTR.
func (shared *InotifyTracker) run() {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		util.Fatal("failed to create Watcher")
	}
	shared.watcher = w

	for {
		select {
		case req := <-shared.watch:
			result := shared.addWatch(req)
			shared.error <- result

		case req := <-shared.remove:
			shared.removeWatch(req)

		case ev, ok := <-shared.watcher.Events:
			if !ok {
				return
			}
			shared.sendEvent(ev)

		case watchErr, ok := <-shared.watcher.Errors:
			if !ok {
				return
			}
			if watchErr == nil {
				continue
			}
			// Interrupted system calls are expected and not worth reporting.
			if sysErr, isSys := watchErr.(*os.SyscallError); isSys && sysErr.Err == syscall.EINTR {
				continue
			}
			logger.Printf("Error in Watcher Error channel: %s", watchErr)
		}
	}
}