Skip to content

Commit 2c6c607

Browse files
committed
Merge pull request #42 from Microsoft/NotificationWaiter
New notification design allows for multiple waiters
2 parents c247079 + d7dbe6b commit 2c6c607

File tree

4 files changed

+139
-105
lines changed

4 files changed

+139
-105
lines changed

callback.go

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,39 +40,36 @@ type hcsNotification uint32
4040
type notificationChannel chan error
4141

4242
type notifcationWatcherContext struct {
43-
channel notificationChannel
44-
expectedNotification hcsNotification
45-
handle hcsCallback
43+
channels notificationChannels
44+
handle hcsCallback
45+
}
46+
47+
type notificationChannels map[hcsNotification]notificationChannel
48+
49+
func newChannels() notificationChannels {
50+
channels := make(notificationChannels)
51+
52+
channels[hcsNotificationSystemExited] = make(notificationChannel, 1)
53+
channels[hcsNotificationSystemCreateCompleted] = make(notificationChannel, 1)
54+
channels[hcsNotificationSystemStartCompleted] = make(notificationChannel, 1)
55+
channels[hcsNotificationSystemPauseCompleted] = make(notificationChannel, 1)
56+
channels[hcsNotificationSystemResumeCompleted] = make(notificationChannel, 1)
57+
channels[hcsNotificationProcessExited] = make(notificationChannel, 1)
58+
channels[hcsNotificationServiceDisconnect] = make(notificationChannel, 1)
59+
return channels
4660
}
4761

4862
func notificationWatcher(notificationType hcsNotification, callbackNumber uintptr, notificationStatus uintptr, notificationData *uint16) uintptr {
49-
var (
50-
result error
51-
completeWait = false
52-
)
63+
var result error
64+
if int32(notificationStatus) < 0 {
65+
result = syscall.Errno(win32FromHresult(notificationStatus))
66+
}
5367

5468
callbackMapLock.RLock()
55-
context := callbackMap[callbackNumber]
69+
channels := callbackMap[callbackNumber].channels
5670
callbackMapLock.RUnlock()
5771

58-
if notificationType == context.expectedNotification {
59-
if int32(notificationStatus) < 0 {
60-
result = syscall.Errno(win32FromHresult(notificationStatus))
61-
} else {
62-
result = nil
63-
}
64-
completeWait = true
65-
} else if notificationType == hcsNotificationSystemExited {
66-
result = ErrUnexpectedContainerExit
67-
completeWait = true
68-
} else if notificationType == hcsNotificationServiceDisconnect {
69-
result = ErrUnexpectedProcessAbort
70-
completeWait = true
71-
}
72-
73-
if completeWait {
74-
context.channel <- result
75-
}
72+
channels[notificationType] <- result
7673

7774
return 0
7875
}

container.go

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ type ContainerError struct {
2626
}
2727

2828
type container struct {
29-
handle hcsSystem
30-
id string
29+
handle hcsSystem
30+
id string
31+
callbackNumber uintptr
3132
}
3233

3334
type containerProperties struct {
@@ -61,19 +62,25 @@ func CreateContainer(id string, c *ContainerConfig) (Container, error) {
6162
logrus.Debugf(title+" id=%s config=%s", id, configuration)
6263

6364
var (
64-
handle hcsSystem
6565
resultp *uint16
6666
createError error
6767
)
6868
if hcsCallbacksSupported {
6969
var identity syscall.Handle
70-
createError = hcsCreateComputeSystem(id, configuration, identity, &handle, &resultp)
70+
createError = hcsCreateComputeSystem(id, configuration, identity, &container.handle, &resultp)
71+
72+
if createError == nil || createError == ErrVmcomputeOperationPending {
73+
if err := container.registerCallback(); err != nil {
74+
err := &ContainerError{Container: container, Operation: operation, Err: err}
75+
logrus.Error(err)
76+
return nil, err
77+
}
78+
}
7179
} else {
72-
createError = hcsCreateComputeSystemTP5(id, configuration, &handle, &resultp)
80+
createError = hcsCreateComputeSystemTP5(id, configuration, &container.handle, &resultp)
7381
}
74-
container.handle = handle
7582

76-
err = processAsyncHcsResult(container, createError, resultp, hcsNotificationSystemCreateCompleted, &defaultTimeout)
83+
err = processAsyncHcsResult(createError, resultp, container.callbackNumber, hcsNotificationSystemCreateCompleted, &defaultTimeout)
7784
if err != nil {
7885
err := &ContainerError{Container: container, Operation: operation, ExtraInfo: configuration, Err: err}
7986
logrus.Error(err)
@@ -122,7 +129,7 @@ func (container *container) Start() error {
122129

123130
var resultp *uint16
124131
err := hcsStartComputeSystemTP5(container.handle, nil, &resultp)
125-
err = processAsyncHcsResult(container, err, resultp, hcsNotificationSystemStartCompleted, &defaultTimeout)
132+
err = processAsyncHcsResult(err, resultp, container.callbackNumber, hcsNotificationSystemStartCompleted, &defaultTimeout)
126133
if err != nil {
127134
err := &ContainerError{Container: container, Operation: operation, Err: err}
128135
logrus.Error(err)
@@ -186,7 +193,7 @@ func (container *container) Wait() error {
186193
logrus.Debugf(title+" id=%s", container.id)
187194

188195
if hcsCallbacksSupported {
189-
err := registerAndWaitForCallback(container, hcsNotificationSystemExited)
196+
err := waitForNotification(container.callbackNumber, hcsNotificationSystemExited, nil)
190197
if err != nil {
191198
err := &ContainerError{Container: container, Operation: operation, Err: err}
192199
logrus.Error(err)
@@ -217,7 +224,7 @@ func (container *container) WaitTimeout(timeout time.Duration) error {
217224
logrus.Debugf(title+" id=%s", container.id)
218225

219226
if hcsCallbacksSupported {
220-
err := registerAndWaitForCallbackTimeout(container, hcsNotificationSystemExited, timeout)
227+
err := waitForNotification(container.callbackNumber, hcsNotificationSystemExited, &timeout)
221228
if err == ErrTimeout {
222229
return ErrTimeout
223230
} else if err != nil {
@@ -304,7 +311,7 @@ func (container *container) Pause() error {
304311

305312
var resultp *uint16
306313
err := hcsPauseComputeSystemTP5(container.handle, nil, &resultp)
307-
err = processAsyncHcsResult(container, err, resultp, hcsNotificationSystemPauseCompleted, &defaultTimeout)
314+
err = processAsyncHcsResult(err, resultp, container.callbackNumber, hcsNotificationSystemPauseCompleted, &defaultTimeout)
308315
if err != nil {
309316
err := &ContainerError{Container: container, Operation: operation, Err: err}
310317
logrus.Error(err)
@@ -325,7 +332,7 @@ func (container *container) Resume() error {
325332
)
326333

327334
err := hcsResumeComputeSystemTP5(container.handle, nil, &resultp)
328-
err = processAsyncHcsResult(container, err, resultp, hcsNotificationSystemResumeCompleted, &defaultTimeout)
335+
err = processAsyncHcsResult(err, resultp, container.callbackNumber, hcsNotificationSystemResumeCompleted, &defaultTimeout)
329336
if err != nil {
330337
err := &ContainerError{Container: container, Operation: operation, Err: err}
331338
logrus.Error(err)
@@ -379,6 +386,14 @@ func (container *container) CreateProcess(c *ProcessConfig) (Process, error) {
379386
},
380387
}
381388

389+
if hcsCallbacksSupported {
390+
if err := process.registerCallback(); err != nil {
391+
err = &ContainerError{Container: container, Operation: operation, Err: err}
392+
logrus.Error(err)
393+
return nil, err
394+
}
395+
}
396+
382397
logrus.Debugf(title+" succeeded id=%s processid=%s", container.id, process.processID)
383398
runtime.SetFinalizer(process, closeProcess)
384399
return process, nil
@@ -408,6 +423,12 @@ func (container *container) OpenProcess(pid int) (Process, error) {
408423
container: container,
409424
}
410425

426+
if err := process.registerCallback(); err != nil {
427+
err = &ContainerError{Container: container, Operation: operation, Err: err}
428+
logrus.Error(err)
429+
return nil, err
430+
}
431+
411432
logrus.Debugf(title+" succeeded id=%s processid=%s", container.id, process.processID)
412433
runtime.SetFinalizer(process, closeProcess)
413434
return process, nil
@@ -424,6 +445,14 @@ func (container *container) Close() error {
424445
return nil
425446
}
426447

448+
if hcsCallbacksSupported {
449+
if err := container.unregisterCallback(); err != nil {
450+
err = &ContainerError{Container: container, Operation: operation, Err: err}
451+
logrus.Error(err)
452+
return err
453+
}
454+
}
455+
427456
if err := hcsCloseComputeSystem(container.handle); err != nil {
428457
err = &ContainerError{Container: container, Operation: operation, Err: err}
429458
logrus.Error(err)
@@ -441,30 +470,32 @@ func closeContainer(container *container) {
441470
container.Close()
442471
}
443472

444-
func (container *container) registerCallback(expectedNotification hcsNotification) (uintptr, error) {
473+
func (container *container) registerCallback() error {
445474
callbackMapLock.Lock()
446475
defer callbackMapLock.Unlock()
447476

448477
callbackNumber := nextCallback
449478
nextCallback++
450479

451480
context := &notifcationWatcherContext{
452-
expectedNotification: expectedNotification,
453-
channel: make(chan error, 1),
481+
channels: newChannels(),
454482
}
455483
callbackMap[callbackNumber] = context
456484

457485
var callbackHandle hcsCallback
458486
err := hcsRegisterComputeSystemCallback(container.handle, notificationWatcherCallback, callbackNumber, &callbackHandle)
459487
if err != nil {
460-
return 0, err
488+
return err
461489
}
462490
context.handle = callbackHandle
491+
container.callbackNumber = callbackNumber
463492

464-
return callbackNumber, nil
493+
return nil
465494
}
466495

467-
func (container *container) unregisterCallback(callbackNumber uintptr) error {
496+
func (container *container) unregisterCallback() error {
497+
callbackNumber := container.callbackNumber
498+
468499
callbackMapLock.Lock()
469500
defer callbackMapLock.Unlock()
470501

process.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ type ProcessError struct {
2222
}
2323

2424
type process struct {
25-
handle hcsProcess
26-
processID int
27-
container *container
28-
cachedPipes *cachedPipes
29-
killCallbackNumber uintptr
25+
handle hcsProcess
26+
processID int
27+
container *container
28+
cachedPipes *cachedPipes
29+
callbackNumber uintptr
3030
}
3131

3232
type cachedPipes struct {
@@ -101,7 +101,7 @@ func (process *process) Wait() error {
101101
logrus.Debugf(title+" processid=%d", process.processID)
102102

103103
if hcsCallbacksSupported {
104-
err := registerAndWaitForCallback(process, hcsNotificationProcessExited)
104+
err := waitForNotification(process.callbackNumber, hcsNotificationProcessExited, nil)
105105
if err != nil {
106106
err := &ProcessError{Operation: operation, Process: process, Err: err}
107107
logrus.Error(err)
@@ -128,7 +128,7 @@ func (process *process) WaitTimeout(timeout time.Duration) error {
128128
logrus.Debugf(title+" processid=%d", process.processID)
129129

130130
if hcsCallbacksSupported {
131-
err := registerAndWaitForCallbackTimeout(process, hcsNotificationProcessExited, timeout)
131+
err := waitForNotification(process.callbackNumber, hcsNotificationProcessExited, &timeout)
132132
if err == ErrTimeout {
133133
return ErrTimeout
134134
} else if err != nil {
@@ -344,6 +344,14 @@ func (process *process) Close() error {
344344
return nil
345345
}
346346

347+
if hcsCallbacksSupported {
348+
if err := process.unregisterCallback(); err != nil {
349+
err = &ProcessError{Operation: operation, Process: process, Err: err}
350+
logrus.Error(err)
351+
return err
352+
}
353+
}
354+
347355
if err := hcsCloseProcess(process.handle); err != nil {
348356
err = &ProcessError{Operation: operation, Process: process, Err: err}
349357
logrus.Error(err)
@@ -361,30 +369,32 @@ func closeProcess(process *process) {
361369
process.Close()
362370
}
363371

364-
func (process *process) registerCallback(expectedNotification hcsNotification) (uintptr, error) {
372+
func (process *process) registerCallback() error {
365373
callbackMapLock.Lock()
366374
defer callbackMapLock.Unlock()
367375

368376
callbackNumber := nextCallback
369377
nextCallback++
370378

371379
context := &notifcationWatcherContext{
372-
expectedNotification: expectedNotification,
373-
channel: make(chan error, 1),
380+
channels: newChannels(),
374381
}
375382
callbackMap[callbackNumber] = context
376383

377384
var callbackHandle hcsCallback
378385
err := hcsRegisterProcessCallback(process.handle, notificationWatcherCallback, callbackNumber, &callbackHandle)
379386
if err != nil {
380-
return 0, err
387+
return err
381388
}
382389
context.handle = callbackHandle
390+
process.callbackNumber = callbackNumber
383391

384-
return callbackNumber, nil
392+
return nil
385393
}
386394

387-
func (process *process) unregisterCallback(callbackNumber uintptr) error {
395+
func (process *process) unregisterCallback() error {
396+
callbackNumber := process.callbackNumber
397+
388398
callbackMapLock.Lock()
389399
defer callbackMapLock.Unlock()
390400
handle := callbackMap[callbackNumber].handle

0 commit comments

Comments
 (0)