@@ -15,12 +15,12 @@ const { constants } = internalBinding('tcp_wrap');
15
15
16
16
module . exports = RoundRobinHandle ;
17
17
18
- function RoundRobinHandle ( key , address , { port, fd, flags, backlog } ) {
18
+ function RoundRobinHandle ( key , address , { port, fd, flags, backlog, scheduler } ) {
19
19
this . key = key ;
20
- this . all = new SafeMap ( ) ;
21
- this . free = new SafeMap ( ) ;
20
+ this . workers = new SafeMap ( ) ;
22
21
this . handles = init ( ObjectCreate ( null ) ) ;
23
22
this . handle = null ;
23
+ this . scheduler = scheduler ;
24
24
this . server = net . createServer ( assert . fail ) ;
25
25
26
26
if ( fd >= 0 )
@@ -45,8 +45,8 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
45
45
}
46
46
47
47
RoundRobinHandle . prototype . add = function ( worker , send ) {
48
- assert ( this . all . has ( worker . id ) === false ) ;
49
- this . all . set ( worker . id , worker ) ;
48
+ assert ( this . workers . has ( worker . id ) === false ) ;
49
+ this . workers . set ( worker . id , worker ) ;
50
50
51
51
const done = ( ) => {
52
52
if ( this . handle . getsockname ) {
@@ -58,7 +58,7 @@ RoundRobinHandle.prototype.add = function(worker, send) {
58
58
send ( null , null , null ) ; // UNIX socket.
59
59
}
60
60
61
- this . handoff ( worker ) ; // In case there are connections pending.
61
+ this . handoff ( ) ; // In case there are connections pending.
62
62
} ;
63
63
64
64
if ( this . server === null )
@@ -72,14 +72,12 @@ RoundRobinHandle.prototype.add = function(worker, send) {
72
72
} ;
73
73
74
74
RoundRobinHandle . prototype . remove = function ( worker ) {
75
- const existed = this . all . delete ( worker . id ) ;
75
+ const existed = this . workers . delete ( worker . id ) ;
76
76
77
77
if ( ! existed )
78
78
return false ;
79
79
80
- this . free . delete ( worker . id ) ;
81
-
82
- if ( this . all . size !== 0 )
80
+ if ( this . workers . size !== 0 )
83
81
return false ;
84
82
85
83
while ( ! isEmpty ( this . handles ) ) {
@@ -95,25 +93,39 @@ RoundRobinHandle.prototype.remove = function(worker) {
95
93
96
94
RoundRobinHandle . prototype . distribute = function ( err , handle ) {
97
95
append ( this . handles , handle ) ;
98
- // eslint-disable-next-line node-core/no-array-destructuring
99
- const [ workerEntry ] = this . free ; // this.free is a SafeMap
96
+ this . handoff ( ) ;
97
+ } ;
98
+
99
+ RoundRobinHandle . scheduler = function ( workers ) {
100
+ const [ workerEntry ] = workers ;
100
101
101
102
if ( ArrayIsArray ( workerEntry ) ) {
102
103
const { 0 : workerId , 1 : worker } = workerEntry ;
103
- this . free . delete ( workerId ) ;
104
- this . handoff ( worker ) ;
104
+ workers . delete ( workerId ) ;
105
+ workers . set ( workerId , worker )
106
+ return worker ;
105
107
}
106
108
} ;
107
109
108
- RoundRobinHandle . prototype . handoff = function ( worker ) {
109
- if ( ! this . all . has ( worker . id ) ) {
110
- return ; // Worker is closing (or has closed) the server.
111
- }
112
-
110
+ RoundRobinHandle . prototype . handoff = function ( ) {
113
111
const handle = peek ( this . handles ) ;
114
112
115
113
if ( handle === null ) {
116
- this . free . set ( worker . id , worker ) ; // Add to ready queue again.
114
+ return ;
115
+ }
116
+
117
+ let socket ;
118
+ if ( this . scheduler . exposeSocket === true ) {
119
+ socket = new net . Socket ( {
120
+ handle,
121
+ readable : false ,
122
+ writable : false ,
123
+ pauseOnCreate : true
124
+ } ) ;
125
+ }
126
+
127
+ const worker = this . scheduler . execute ( this . workers , socket ) ;
128
+ if ( typeof worker === 'undefined' ) {
117
129
return ;
118
130
}
119
131
@@ -127,6 +139,6 @@ RoundRobinHandle.prototype.handoff = function(worker) {
127
139
else
128
140
this . distribute ( 0 , handle ) ; // Worker is shutting down. Send to another.
129
141
130
- this . handoff ( worker ) ;
142
+ this . handoff ( ) ;
131
143
} ) ;
132
144
} ;
0 commit comments