1+ /*
2+ Copyright 2022 The Dapr Authors
3+ Licensed under the Apache License, Version 2.0 (the "License");
4+ you may not use this file except in compliance with the License.
5+ You may obtain a copy of the License at
6+ http://www.apache.org/licenses/LICENSE-2.0
7+ Unless required by applicable law or agreed to in writing, software
8+ distributed under the License is distributed on an "AS IS" BASIS,
9+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+ See the License for the specific language governing permissions and
11+ limitations under the License.
12+ */
13+
14+ import GRPCClient from './GRPCClient' ;
15+ import * as grpc from "@grpc/grpc-js" ;
16+ import { GetConfigurationRequest , GetConfigurationResponse , SubscribeConfigurationRequest , SubscribeConfigurationResponse , UnsubscribeConfigurationRequest , UnsubscribeConfigurationResponse } from '../../../proto/dapr/proto/runtime/v1/dapr_pb' ;
17+ import IClientConfiguration from '../../../interfaces/Client/IClientConfiguration' ;
18+ import { KeyValueType } from '../../../types/KeyValue.type' ;
19+ import { GetConfigurationResponse as GetConfigurationResponseResult } from '../../../types/configuration/GetConfigurationResponse' ;
20+ import { SubscribeConfigurationResponse as SubscribeConfigurationResponseResult } from '../../../types/configuration/SubscribeConfigurationResponse' ;
21+ import { SubscribeConfigurationCallback } from '../../../types/configuration/SubscribeConfigurationCallback' ;
22+ import { SubscribeConfigurationStream } from '../../../types/configuration/SubscribeConfigurationStream' ;
23+
24+ export default class GRPCClientConfiguration implements IClientConfiguration {
25+ client : GRPCClient ;
26+
27+ constructor ( client : GRPCClient ) {
28+ this . client = client ;
29+ }
30+
31+ async get ( storeName : string , keys : string [ ] , metadataObj ?: KeyValueType ) : Promise < GetConfigurationResponseResult > {
32+ const metadata = new grpc . Metadata ( ) ;
33+
34+ const msg = new GetConfigurationRequest ( ) ;
35+ msg . setStoreName ( storeName ) ;
36+
37+ if ( keys && keys . length > 0 ) {
38+ msg . setKeysList ( keys . filter ( i => i !== "" ) ) ;
39+ }
40+
41+ if ( metadataObj ) {
42+ for ( const [ key , value ] of Object . entries ( metadataObj ) ) {
43+ metadata . add ( key , value ) ;
44+ }
45+ }
46+
47+ return new Promise ( ( resolve , reject ) => {
48+ const client = this . client . getClient ( ) ;
49+ client . getConfigurationAlpha1 ( msg , metadata , ( err , res : GetConfigurationResponse ) => {
50+ if ( err ) {
51+ return reject ( err ) ;
52+ }
53+
54+ const wrapped : GetConfigurationResponseResult = {
55+ items : res . getItemsList ( ) . map ( ( item ) => ( {
56+ key : item . getKey ( ) ,
57+ value : item . getValue ( ) ,
58+ version : item . getVersion ( ) ,
59+ metadata : item . getMetadataMap ( ) . toObject ( ) . reduce ( ( result : object , [ key , value ] ) => {
60+ // @ts -ignore
61+ result [ key ] = value ;
62+ return result
63+ } , { } ) ,
64+ } ) )
65+ }
66+
67+ return resolve ( wrapped ) ;
68+ } ) ;
69+ } ) ;
70+ }
71+
72+ async subscribe ( storeName : string , cb : SubscribeConfigurationCallback ) : Promise < SubscribeConfigurationStream > {
73+ return this . _subscribe ( storeName , cb )
74+ }
75+
76+ async subscribeWithKeys ( storeName : string , keys : string [ ] , cb : SubscribeConfigurationCallback ) : Promise < SubscribeConfigurationStream > {
77+ return this . _subscribe ( storeName , cb , keys )
78+ }
79+
80+ async subscribeWithMetadata ( storeName : string , keys : string [ ] , metadata : KeyValueType , cb : SubscribeConfigurationCallback ) : Promise < SubscribeConfigurationStream > {
81+ return this . _subscribe ( storeName , cb , keys , metadata )
82+ }
83+
84+ async _subscribe ( storeName : string , cb : SubscribeConfigurationCallback , keys ?: string [ ] , metadataObj ?: KeyValueType ) : Promise < SubscribeConfigurationStream > {
85+ const metadata = new grpc . Metadata ( ) ;
86+
87+ const msg = new SubscribeConfigurationRequest ( ) ;
88+ msg . setStoreName ( storeName ) ;
89+
90+ if ( keys && keys . length > 0 ) {
91+ msg . setKeysList ( keys . filter ( i => i !== "" ) ) ;
92+ } else {
93+ msg . setKeysList ( [ ] ) ;
94+ }
95+
96+ if ( metadataObj ) {
97+ for ( const [ key , value ] of Object . entries ( metadataObj ) ) {
98+ metadata . add ( key , value ) ;
99+ }
100+ }
101+
102+ const client = this . client . getClient ( ) ;
103+
104+ // Open a stream. Note that this is a never-ending stream
105+ // and will stay open as long as the client is open
106+ // we will thus create a set with our listeners so we don't
107+ // break on multi listeners
108+ const stream = client . subscribeConfigurationAlpha1 ( msg , metadata ) ;
109+ let streamId : string ;
110+
111+ stream . on ( "data" , async ( data : SubscribeConfigurationResponse ) => {
112+ streamId = data . getId ( ) ;
113+
114+ const wrapped : SubscribeConfigurationResponseResult = {
115+ items : data . getItemsList ( ) . map ( ( item ) => ( {
116+ key : item . getKey ( ) ,
117+ value : item . getValue ( ) ,
118+ version : item . getVersion ( ) ,
119+ metadata : item . getMetadataMap ( ) . toObject ( ) . reduce ( ( result : object , [ key , value ] ) => {
120+ // @ts -ignore
121+ result [ key ] = value ;
122+ return result
123+ } , { } ) ,
124+ } ) )
125+ }
126+
127+ await cb ( wrapped ) ;
128+ } ) ;
129+
130+ return {
131+ stop : async ( ) => {
132+ return new Promise ( ( resolve , reject ) => {
133+ const req = new UnsubscribeConfigurationRequest ( ) ;
134+ req . setStoreName ( storeName ) ;
135+ req . setId ( streamId ) ;
136+
137+ client . unsubscribeConfigurationAlpha1 ( req , ( err , res : UnsubscribeConfigurationResponse ) => {
138+ if ( err || ! res . getOk ( ) ) {
139+ return reject ( res . getMessage ( ) ) ;
140+ }
141+
142+ // Clean up the node.js event emitter
143+ stream . removeAllListeners ( ) ;
144+ stream . destroy ( ) ;
145+
146+ return resolve ( ) ;
147+ } ) ;
148+ } )
149+ }
150+ } ;
151+ }
152+ }
0 commit comments