1818
1919package org .apache .hadoop .yarn .server .timelineservice .documentstore .reader .cosmosdb ;
2020
21- import com .microsoft .azure .documentdb .Document ;
22- import com .microsoft .azure .documentdb .DocumentClient ;
23- import com .microsoft .azure .documentdb .FeedOptions ;
21+ import com .google .common .annotations .VisibleForTesting ;
22+ import com .google .common .collect .Sets ;
23+ import com .microsoft .azure .cosmosdb .FeedOptions ;
24+ import com .microsoft .azure .cosmosdb .FeedResponse ;
25+ import com .microsoft .azure .cosmosdb .rx .AsyncDocumentClient ;
2426import org .apache .hadoop .conf .Configuration ;
2527import org .apache .hadoop .yarn .server .timelineservice .reader .TimelineReaderContext ;
2628import org .apache .hadoop .yarn .server .timelineservice .documentstore .DocumentStoreUtils ;
3032import org .apache .hadoop .yarn .server .timelineservice .documentstore .reader .DocumentStoreReader ;
3133import org .slf4j .Logger ;
3234import org .slf4j .LoggerFactory ;
35+ import rx .Observable ;
36+ import rx .Scheduler ;
37+ import rx .schedulers .Schedulers ;
3338
34- import java .util .ArrayList ;
35- import java .util .HashSet ;
36- import java .util .Iterator ;
3739import java .util .List ;
3840import java .util .Set ;
41+ import java .util .concurrent .ExecutorService ;
42+ import java .util .concurrent .Executors ;
3943
4044
4145/**
@@ -49,7 +53,7 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
4953 .getLogger (CosmosDBDocumentStoreReader .class );
5054 private static final int DEFAULT_DOCUMENTS_SIZE = 1 ;
5155
52- private static volatile DocumentClient client ;
56+ private static AsyncDocumentClient client ;
5357 private final String databaseName ;
5458 private final static String COLLECTION_LINK = "/dbs/%s/colls/%s" ;
5559 private final static String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * " +
@@ -66,17 +70,24 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
6670 "\" %s\" ) " ;
6771 private final static String ORDER_BY_CLAUSE = " ORDER BY c.createdTime" ;
6872
73+ // creating thread pool of size, half of the total available threads from JVM
74+ private static ExecutorService executorService = Executors .newFixedThreadPool (
75+ Runtime .getRuntime ().availableProcessors () / 2 );
76+ private static Scheduler schedulerForBlockingWork =
77+ Schedulers .from (executorService );
78+
/**
 * Builds a reader for the Cosmos DB database named in the configuration and
 * ensures the Cosmos DB client used for queries has been initialised.
 *
 * @param conf configuration the database name and client are derived from
 */
public CosmosDBDocumentStoreReader(Configuration conf) {
  LOG.info("Initializing Cosmos DB DocumentStoreReader...");
  this.databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
  initCosmosDBClient(conf);
}
84+
85+ private synchronized void initCosmosDBClient (Configuration conf ) {
86+ // making CosmosDB Async Client Singleton
7387 if (client == null ) {
74- synchronized (this ) {
75- if (client == null ) {
76- LOG .info ("Creating Cosmos DB Client..." );
77- client = DocumentStoreUtils .createCosmosDBClient (conf );
78- }
79- }
88+ LOG .info ("Creating Cosmos DB Reader Async Client..." );
89+ client = DocumentStoreUtils .createCosmosDBAsyncClient (conf );
90+ addShutdownHook ();
8091 }
8192 }
8293
@@ -104,15 +115,16 @@ public Set<String> fetchEntityTypes(String collectionName,
104115 LOG .debug ("Querying Collection : {} , with query {}" , collectionName ,
105116 sqlQuery );
106117
107- Set <String > entityTypes = new HashSet <>();
108- Iterator <Document > documentIterator = client .queryDocuments (
118+ return Sets .newHashSet (client .queryDocuments (
109119 String .format (COLLECTION_LINK , databaseName , collectionName ),
110- sqlQuery , null ).getQueryIterator ();
111- while (documentIterator .hasNext ()) {
112- Document document = documentIterator .next ();
113- entityTypes .add (document .getString (ENTITY_TYPE_COLUMN ));
114- }
115- return entityTypes ;
120+ sqlQuery , new FeedOptions ())
121+ .map (FeedResponse ::getResults ) // Map the page to the list of documents
122+ .concatMap (Observable ::from )
123+ .map (document -> String .valueOf (document .get (ENTITY_TYPE_COLUMN )))
124+ .toList ()
125+ .subscribeOn (schedulerForBlockingWork )
126+ .toBlocking ()
127+ .single ());
116128 }
117129
118130 @ Override
@@ -133,25 +145,25 @@ private List<TimelineDoc> queryDocuments(String collectionName,
133145 final long maxDocumentsSize ) {
134146 final String sqlQuery = buildQueryWithPredicates (context , collectionName ,
135147 maxDocumentsSize );
136- List <TimelineDoc > timelineDocs = new ArrayList <>();
137148 LOG .debug ("Querying Collection : {} , with query {}" , collectionName ,
138149 sqlQuery );
139150
140- FeedOptions feedOptions = new FeedOptions ();
141- feedOptions .setPageSize ((int ) maxDocumentsSize );
142- Iterator <Document > documentIterator = client .queryDocuments (
143- String .format (COLLECTION_LINK , databaseName , collectionName ),
144- sqlQuery , feedOptions ).getQueryIterator ();
145- while (documentIterator .hasNext ()) {
146- Document document = documentIterator .next ();
147- TimelineDoc resultDoc = document .toObject (docClass );
148- if (resultDoc .getCreatedTime () == 0 &&
149- document .getTimestamp () != null ) {
150- resultDoc .setCreatedTime (document .getTimestamp ().getTime ());
151- }
152- timelineDocs .add (resultDoc );
153- }
154- return timelineDocs ;
151+ return client .queryDocuments (String .format (COLLECTION_LINK ,
152+ databaseName , collectionName ), sqlQuery , new FeedOptions ())
153+ .map (FeedResponse ::getResults ) // Map the page to the list of documents
154+ .concatMap (Observable ::from )
155+ .map (document -> {
156+ TimelineDoc resultDoc = document .toObject (docClass );
157+ if (resultDoc .getCreatedTime () == 0 &&
158+ document .getTimestamp () != null ) {
159+ resultDoc .setCreatedTime (document .getTimestamp ().getTime ());
160+ }
161+ return resultDoc ;
162+ })
163+ .toList ()
164+ .subscribeOn (schedulerForBlockingWork )
165+ .toBlocking ()
166+ .single ();
155167 }
156168
157169 private String buildQueryWithPredicates (TimelineReaderContext context ,
@@ -168,33 +180,34 @@ private String buildQueryWithPredicates(TimelineReaderContext context,
168180 return addPredicates (context , collectionName , queryStrBuilder );
169181 }
170182
171- private String addPredicates (TimelineReaderContext context ,
183+ @ VisibleForTesting
184+ String addPredicates (TimelineReaderContext context ,
172185 String collectionName , StringBuilder queryStrBuilder ) {
173186 boolean hasPredicate = false ;
174187
175188 queryStrBuilder .append (WHERE_CLAUSE );
176189
177- if (context .getClusterId () != null ) {
190+ if (! DocumentStoreUtils . isNullOrEmpty ( context .getClusterId ()) ) {
178191 hasPredicate = true ;
179192 queryStrBuilder .append (String .format (CONTAINS_FUNC_FOR_ID ,
180193 context .getClusterId ()));
181194 }
182- if (context .getUserId () != null ) {
195+ if (! DocumentStoreUtils . isNullOrEmpty ( context .getUserId ()) ) {
183196 hasPredicate = true ;
184197 queryStrBuilder .append (AND_OPERATOR )
185198 .append (String .format (CONTAINS_FUNC_FOR_ID , context .getUserId ()));
186199 }
187- if (context .getFlowName () != null ) {
200+ if (! DocumentStoreUtils . isNullOrEmpty ( context .getFlowName ()) ) {
188201 hasPredicate = true ;
189202 queryStrBuilder .append (AND_OPERATOR )
190203 .append (String .format (CONTAINS_FUNC_FOR_ID , context .getFlowName ()));
191204 }
192- if (context .getAppId () != null ) {
205+ if (! DocumentStoreUtils . isNullOrEmpty ( context .getAppId ()) ) {
193206 hasPredicate = true ;
194207 queryStrBuilder .append (AND_OPERATOR )
195208 .append (String .format (CONTAINS_FUNC_FOR_ID , context .getAppId ()));
196209 }
197- if (context .getEntityId () != null ) {
210+ if (! DocumentStoreUtils . isNullOrEmpty ( context .getEntityId ()) ) {
198211 hasPredicate = true ;
199212 queryStrBuilder .append (AND_OPERATOR )
200213 .append (String .format (CONTAINS_FUNC_FOR_ID , context .getEntityId ()));
@@ -204,7 +217,7 @@ private String addPredicates(TimelineReaderContext context,
204217 queryStrBuilder .append (AND_OPERATOR )
205218 .append (String .format (CONTAINS_FUNC_FOR_ID , context .getFlowRunId ()));
206219 }
207- if (context .getEntityType () != null ){
220+ if (! DocumentStoreUtils . isNullOrEmpty ( context .getEntityType ()) ){
208221 hasPredicate = true ;
209222 queryStrBuilder .append (AND_OPERATOR )
210223 .append (String .format (CONTAINS_FUNC_FOR_TYPE ,
@@ -224,9 +237,17 @@ private String addPredicates(TimelineReaderContext context,
224237 @ Override
225238 public synchronized void close () {
226239 if (client != null ) {
227- LOG .info ("Closing Cosmos DB Client..." );
240+ LOG .info ("Closing Cosmos DB Reader Async Client..." );
228241 client .close ();
229242 client = null ;
230243 }
231244 }
232- }
245+
246+ private void addShutdownHook () {
247+ Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
248+ if (executorService != null ) {
249+ executorService .shutdown ();
250+ }
251+ }));
252+ }
253+ }
0 commit comments