@@ -15,12 +15,12 @@ const { constants } = internalBinding('tcp_wrap');
15
15
16
16
module . exports = RoundRobinHandle ;
17
17
18
- function RoundRobinHandle ( key , address , { port, fd, flags, backlog } ) {
18
+ function RoundRobinHandle ( key , address , { port, fd, flags, backlog, scheduler } ) {
19
19
this . key = key ;
20
- this . all = new SafeMap ( ) ;
21
- this . free = new SafeMap ( ) ;
20
+ this . workers = new SafeMap ( ) ;
22
21
this . handles = init ( ObjectCreate ( null ) ) ;
23
22
this . handle = null ;
23
+ this . scheduler = scheduler ;
24
24
this . server = net . createServer ( assert . fail ) ;
25
25
26
26
if ( fd >= 0 )
@@ -45,8 +45,8 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
45
45
}
46
46
47
47
RoundRobinHandle . prototype . add = function ( worker , send ) {
48
- assert ( this . all . has ( worker . id ) === false ) ;
49
- this . all . set ( worker . id , worker ) ;
48
+ assert ( this . workers . has ( worker . id ) === false ) ;
49
+ this . workers . set ( worker . id , worker ) ;
50
50
51
51
const done = ( ) => {
52
52
if ( this . handle . getsockname ) {
@@ -58,7 +58,7 @@ RoundRobinHandle.prototype.add = function(worker, send) {
58
58
send ( null , null , null ) ; // UNIX socket.
59
59
}
60
60
61
- this . handoff ( worker ) ; // In case there are connections pending.
61
+ this . handoff ( ) ; // In case there are connections pending.
62
62
} ;
63
63
64
64
if ( this . server === null )
@@ -72,14 +72,12 @@ RoundRobinHandle.prototype.add = function(worker, send) {
72
72
} ;
73
73
74
74
RoundRobinHandle . prototype . remove = function ( worker ) {
75
- const existed = this . all . delete ( worker . id ) ;
75
+ const existed = this . workers . delete ( worker . id ) ;
76
76
77
77
if ( ! existed )
78
78
return false ;
79
79
80
- this . free . delete ( worker . id ) ;
81
-
82
- if ( this . all . size !== 0 )
80
+ if ( this . workers . size !== 0 )
83
81
return false ;
84
82
85
83
while ( ! isEmpty ( this . handles ) ) {
@@ -95,25 +93,40 @@ RoundRobinHandle.prototype.remove = function(worker) {
95
93
96
94
RoundRobinHandle . prototype . distribute = function ( err , handle ) {
97
95
append ( this . handles , handle ) ;
96
+ this . handoff ( ) ;
97
+ } ;
98
+
99
+ RoundRobinHandle . scheduler = function ( workers ) {
98
100
// eslint-disable-next-line node-core/no-array-destructuring
99
- const [ workerEntry ] = this . free ; // this.free is a SafeMap
101
+ const [ workerEntry ] = workers ;
100
102
101
103
if ( ArrayIsArray ( workerEntry ) ) {
102
104
const { 0 : workerId , 1 : worker } = workerEntry ;
103
- this . free . delete ( workerId ) ;
104
- this . handoff ( worker ) ;
105
+ workers . delete ( workerId ) ;
106
+ workers . set ( workerId , worker ) ;
107
+ return worker ;
105
108
}
106
109
} ;
107
110
108
- RoundRobinHandle . prototype . handoff = function ( worker ) {
109
- if ( ! this . all . has ( worker . id ) ) {
110
- return ; // Worker is closing (or has closed) the server.
111
- }
112
-
111
+ RoundRobinHandle . prototype . handoff = function ( ) {
113
112
const handle = peek ( this . handles ) ;
114
113
115
114
if ( handle === null ) {
116
- this . free . set ( worker . id , worker ) ; // Add to ready queue again.
115
+ return ;
116
+ }
117
+
118
+ let socket ;
119
+ if ( this . scheduler . exposeSocket === true ) {
120
+ socket = new net . Socket ( {
121
+ handle,
122
+ readable : false ,
123
+ writable : false ,
124
+ pauseOnCreate : true
125
+ } ) ;
126
+ }
127
+
128
+ const worker = this . scheduler . execute ( this . workers , socket ) ;
129
+ if ( typeof worker === 'undefined' ) {
117
130
return ;
118
131
}
119
132
@@ -127,6 +140,6 @@ RoundRobinHandle.prototype.handoff = function(worker) {
127
140
else
128
141
this . distribute ( 0 , handle ) ; // Worker is shutting down. Send to another.
129
142
130
- this . handoff ( worker ) ;
143
+ this . handoff ( ) ;
131
144
} ) ;
132
145
} ;
0 commit comments