Skip to content
This repository was archived by the owner on Sep 9, 2020. It is now read-only.

gps: simplify shutdown cleanup #1154

Merged
merged 4 commits into from
Sep 11, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 41 additions & 73 deletions internal/gps/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,42 +275,22 @@ func (sm *SourceMgr) HandleSignals(sigch chan os.Signal) {
// Run a new goroutine with the input sigch and the fresh qch
go func(sch chan os.Signal, qch <-chan struct{}) {
defer signal.Stop(sch)
for {
select {
case <-sch:
// Set up a timer to uninstall the signal handler after three
// seconds, so that the user can easily force termination with a
// second ctrl-c
go func(c <-chan time.Time) {
<-c
signal.Stop(sch)
}(time.After(3 * time.Second))

if !atomic.CompareAndSwapInt32(&sm.releasing, 0, 1) {
// Something's already called Release() on this sm, so we
// don't have to do anything, as we'd just be redoing
// that work. Instead, deregister and return.
return
}
Copy link
Collaborator

@jmank88 jmank88 Sep 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a small race regarding what count will be logged.

This block was setting the atomic prior to the count() call, which ensured that no more ops launch, fixing the count. Currently, more ops could launch between the count() call and the atomic swap at the start of Release(). What about moving this count/log line inside of Release(), just after the atomic swap?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose the 'signal received' portion of the message won't apply from inside the method, so maybe that should still be logged separately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> This block was setting the atomic prior to the count() call, which ensured that no more ops launch, fixing the count. Currently, more ops could launch between the count() call and the atomic swap at the start of Release(). What about moving this count/log line inside of Release(), just after the atomic swap?

(just noting that this sounds like the class of concern that motivated the original design)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tamird thoughts on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could put that print statement inside the Release method - would that suit you?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, let's keep the print statement outside, where it is.

In general, we try to treat gps like a pure library within dep. For the most part, gps isn't aware that dep exists - it only shares some low-level filesystem manipulation methods and some test helpers. To that end, we also try really, really hard to avoid direct print statements in gps itself. This one print statement is the sole place, in all of gps, where we actually print directly, and I only let it in there because it would have required too much refactoring to arrange the signal handling otherwise.

So yeah, let's call this PR a victory on its own terms, and we can think about ways of refactoring for a more controlled, informative shutdown experience later.


opc := sm.suprvsr.count()
if opc > 0 {
fmt.Printf("Signal received: waiting for %v ops to complete...\n", opc)
}

// Mutex interaction in a signal handler is, as a general rule,
// unsafe. I'm not clear on whether the guarantees Go provides
// around signal handling, or having passed this through a
// channel in general, obviate those concerns, but it's a lot
// easier to just rely on the mutex contained in the Once right
// now, so do that until it proves problematic or someone
// provides a clear explanation.
sm.relonce.Do(func() { sm.doRelease() })
return
case <-qch:
// quit channel triggered - deregister our sigch and return
return
select {
case <-sch:
// Set up a timer to uninstall the signal handler after three
// seconds, so that the user can easily force termination with a
// second ctrl-c
time.AfterFunc(3*time.Second, func() {
signal.Stop(sch)
})

if opc := sm.suprvsr.count(); opc > 0 {
fmt.Printf("Signal received: waiting for %v ops to complete...\n", opc)
}

sm.Release()
case <-qch:
// quit channel triggered - deregister our sigch and return
}
}(sigch, sm.qch)
// Try to ensure handler is blocked in select before releasing the mutex
Expand Down Expand Up @@ -349,46 +329,34 @@ func (e CouldNotCreateLockError) Error() string {
// longer safe to call methods against it; all method calls will immediately
// result in errors.
func (sm *SourceMgr) Release() {
// Set sm.releasing before entering the Once func to guarantee that no
// _more_ method calls will stack up if/while waiting.
atomic.CompareAndSwapInt32(&sm.releasing, 0, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notwithstanding my substantive comment, this is kind of awkward - it essentially just amounts to 1) making any subsequently caught signal immediately deregister the signal handler and 2) not printing anything about waiting for ops to complete. Could definitely be improved.

atomic.StoreInt32(&sm.releasing, 1)

// Whether 'releasing' is set or not, we don't want this function to return
// until after the doRelease process is done, as doing so could cause the
// process to terminate before a signal-driven doRelease() call has a chance
// to finish its cleanup.
sm.relonce.Do(sm.doRelease)
}
sm.relonce.Do(func() {
// Send the signal to the supervisor to cancel all running calls.
sm.cancelAll()
sm.suprvsr.wait()

// doRelease actually releases physical resources (files on disk, etc.).
//
// This must be called only and exactly once. Calls to it should be wrapped in
// the sm.relonce sync.Once instance.
func (sm *SourceMgr) doRelease() {
// Send the signal to the supervisor to cancel all running calls
sm.cancelAll()
sm.suprvsr.wait()

// Close the source coordinator.
sm.srcCoord.close()

// Close the file handle for the lock file and remove it from disk
sm.lf.Unlock()
os.Remove(filepath.Join(sm.cachedir, "sm.lock"))

// Close the qch, if non-nil, so the signal handlers run out. This will
// also deregister the sig channel, if any has been set up.
if sm.qch != nil {
close(sm.qch)
}
// Close the source coordinator.
sm.srcCoord.close()

// Close the file handle for the lock file and remove it from disk
sm.lf.Unlock()
os.Remove(filepath.Join(sm.cachedir, "sm.lock"))

// Close the qch, if non-nil, so the signal handlers run out. This will
// also deregister the sig channel, if any has been set up.
if sm.qch != nil {
close(sm.qch)
}
})
}

// GetManifestAndLock returns manifest and lock information for the provided
// ProjectIdentifier, at the provided Version. The work of producing the
// manifest and lock is delegated to the provided ProjectAnalyzer's
// DeriveManifestAndLock() method.
func (sm *SourceMgr) GetManifestAndLock(id ProjectIdentifier, v Version, an ProjectAnalyzer) (Manifest, Lock, error) {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return nil, nil, smIsReleased{}
}

Expand All @@ -403,7 +371,7 @@ func (sm *SourceMgr) GetManifestAndLock(id ProjectIdentifier, v Version, an Proj
// ListPackages parses the tree of the Go packages at and below the ProjectRoot
// of the given ProjectIdentifier, at the given version.
func (sm *SourceMgr) ListPackages(id ProjectIdentifier, v Version) (pkgtree.PackageTree, error) {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, good call - we don't need the more complex CAS, as reaching 1 is the address's terminal state 👍

return pkgtree.PackageTree{}, smIsReleased{}
}

Expand All @@ -428,7 +396,7 @@ func (sm *SourceMgr) ListPackages(id ProjectIdentifier, v Version) (pkgtree.Pack
// is not accessible (network outage, access issues, or the resource actually
// went away), an error will be returned.
func (sm *SourceMgr) ListVersions(id ProjectIdentifier) ([]PairedVersion, error) {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return nil, smIsReleased{}
}

Expand All @@ -444,7 +412,7 @@ func (sm *SourceMgr) ListVersions(id ProjectIdentifier) ([]PairedVersion, error)
// RevisionPresentIn indicates whether the provided Revision is present in the given
// repository.
func (sm *SourceMgr) RevisionPresentIn(id ProjectIdentifier, r Revision) (bool, error) {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return false, smIsReleased{}
}

Expand All @@ -460,7 +428,7 @@ func (sm *SourceMgr) RevisionPresentIn(id ProjectIdentifier, r Revision) (bool,
// SourceExists checks if a repository exists, either upstream or in the cache,
// for the provided ProjectIdentifier.
func (sm *SourceMgr) SourceExists(id ProjectIdentifier) (bool, error) {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return false, smIsReleased{}
}

Expand All @@ -478,7 +446,7 @@ func (sm *SourceMgr) SourceExists(id ProjectIdentifier) (bool, error) {
//
// The primary use case for this is prefetching.
func (sm *SourceMgr) SyncSourceFor(id ProjectIdentifier) error {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return smIsReleased{}
}

Expand All @@ -493,7 +461,7 @@ func (sm *SourceMgr) SyncSourceFor(id ProjectIdentifier) error {
// ExportProject writes out the tree of the provided ProjectIdentifier's
// ProjectRoot, at the provided version, to the provided directory.
func (sm *SourceMgr) ExportProject(id ProjectIdentifier, v Version, to string) error {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return smIsReleased{}
}

Expand All @@ -513,7 +481,7 @@ func (sm *SourceMgr) ExportProject(id ProjectIdentifier, v Version, to string) e
// paths. (A special exception is written for gopkg.in to minimize network
// activity, as its behavior is well-structured)
func (sm *SourceMgr) DeduceProjectRoot(ip string) (ProjectRoot, error) {
if atomic.CompareAndSwapInt32(&sm.releasing, 1, 1) {
if atomic.LoadInt32(&sm.releasing) == 1 {
return "", smIsReleased{}
}

Expand Down