diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java index 6326b42ed..1a73fbdb8 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java @@ -55,7 +55,14 @@ private LensConfConstants() throws LensException { * The Constant DRIVER_CLASSES. */ public static final String DRIVER_CLASSES = SERVER_PFX + "drivers"; - + /** + * The Constant DRIVER_SELECTOR_CLASS. + */ + public static final String DRIVER_SELECTOR_CLASS = SERVER_PFX + "driver.selector.class"; + /** + * The Constant ACCEPTOR_CLASSES. + */ + public static final String ACCEPTOR_CLASSES = SERVER_PFX + "acceptors"; /** * The Constant SERVICE_NAMES. */ @@ -835,15 +842,15 @@ public static String getWSFilterImplConfKey(String filterName) { public static final boolean DEFAULT_ENABLE_QUERY_METRICS = false; /** - * Key used to hold value of unique id for query metrics. This wont be passed by user, will be generated and set. - * This is to pass unique id for query across the code flow. + * Key used to hold value of unique id for query metrics. This wont be passed by user, will be generated and set. This + * is to pass unique id for query across the code flow. */ public static final String QUERY_METRIC_UNIQUE_ID_CONF_KEY = QUERY_PFX + "metric.unique.id"; /** - * Key used to hold value query metric name in the stack. This wont be passed by user, will be generated and set. - * When each query looked at by driver, the metric needs to be different for each driver. This name capture the stack - * from which driver the code reached there. + * Key used to hold value query metric name in the stack. This wont be passed by user, will be generated and set. When + * each query looked at by driver, the metric needs to be different for each driver. This name capture the stack from + * which driver the code reached there. */ public static final String QUERY_METRIC_DRIVER_STACK_NAME = QUERY_PFX + "metric.driver.stack.name"; diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 3e0e0db81..5b7340fda 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -126,7 +126,7 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu /** * The accepted queries. */ - private PriorityBlockingQueue acceptedQueries = new PriorityBlockingQueue(); + private PriorityBlockingQueue queuedQueries = new PriorityBlockingQueue(); /** * The launched queries. @@ -262,7 +262,23 @@ public QueryExecutionServiceImpl(CLIService cliService) throws LensException { /** * Initialize query acceptors and listeners. */ - private void initializeQueryAcceptorsAndListeners() { + private void initializeQueryAcceptors() throws LensException { + String[] acceptorClasses = conf.getStrings(LensConfConstants.ACCEPTOR_CLASSES); + if (acceptorClasses != null) { + for (String acceptorClass : acceptorClasses) { + try { + Class clazz = Class.forName(acceptorClass); + QueryAcceptor acceptor = (QueryAcceptor) clazz.newInstance(); + queryAcceptors.add(acceptor); + } catch (Exception e) { + LOG.warn("Could not load the acceptor:" + acceptorClass, e); + throw new LensException("Could not load acceptor" + acceptorClass, e); + } + } + } + } + + private void initializeListeners() { if (conf.getBoolean(LensConfConstants.QUERY_STATE_LOGGER_ENABLED, true)) { getEventService().addListenerForType(new QueryStatusLogger(), StatusChange.class); LOG.info("Registered query state logger"); @@ -281,7 +297,6 @@ private void initializeQueryAcceptorsAndListeners() { * @throws LensException the lens exception */ private void loadDriversAndSelector() throws LensException { - conf.get(LensConfConstants.DRIVER_CLASSES); String[] driverClasses = conf.getStrings(LensConfConstants.DRIVER_CLASSES); if (driverClasses != null) { for (String driverClass : driverClasses) { @@ -304,7 +319,17 @@ private void loadDriversAndSelector() throws LensException { } else { throw new LensException("No drivers specified"); } - driverSelector = new MinQueryCostSelector(); + try { + Class driverSelectorClass = conf.getClass(LensConfConstants.DRIVER_SELECTOR_CLASS, + MinQueryCostSelector.class, + DriverSelector.class); + LOG.info("Using driver selector class: " + driverSelectorClass.getCanonicalName()); + driverSelector = driverSelectorClass.newInstance(); + } catch (Exception e) { + throw new LensException("Couldn't instantiate driver selector class. Class name: " + + conf.get(LensConfConstants.DRIVER_SELECTOR_CLASS) + ". Please supply a valid value for " + + LensConfConstants.DRIVER_SELECTOR_CLASS); + } } protected LensEventService getEventService() { @@ -462,7 +487,7 @@ public void run() { LOG.info("Starting QuerySubmitter thread"); while (!pausedForTest && !stopped && !querySubmitter.isInterrupted()) { try { - QueryContext ctx = acceptedQueries.take(); + QueryContext ctx = queuedQueries.take(); synchronized (ctx) { if (ctx.getStatus().getStatus().equals(Status.QUEUED)) { LOG.info("Launching query:" + ctx.getUserQuery()); @@ -608,7 +633,7 @@ private void updateFinishedQuery(QueryContext ctx, QueryStatus before) { // before would be null in case of server restart if (before != null) { if (before.getStatus().equals(Status.QUEUED)) { - acceptedQueries.remove(ctx); + queuedQueries.remove(ctx); } else { launchedQueries.remove(ctx); } @@ -844,7 +869,12 @@ public void run() { public synchronized void init(HiveConf hiveConf) { super.init(hiveConf); this.conf = hiveConf; - initializeQueryAcceptorsAndListeners(); + try { + initializeQueryAcceptors(); + } catch (LensException e) { + throw new IllegalStateException("Could not load acceptors"); + } + initializeListeners(); try { loadDriversAndSelector(); } catch (LensException e) { @@ -873,7 +903,7 @@ private void initalizeFinishedQueryStore(Configuration conf) { module.addSerializer(ColumnDescriptor.class, new JsonSerializer() { @Override public void serialize(ColumnDescriptor columnDescriptor, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { jsonGenerator.writeStartObject(); jsonGenerator.writeStringField("name", columnDescriptor.getName()); jsonGenerator.writeStringField("comment", columnDescriptor.getComment()); @@ -997,8 +1027,8 @@ public Thread newThread(Runnable r) { private static final String PARALLEL_CALL_GAUGE = "PARALLEL_ESTIMATE"; /** - * Rewrite the query for each driver, and estimate query cost for the rewritten queries. - * Finally, select the driver using driver selector. + * Rewrite the query for each driver, and estimate query cost for the rewritten queries. Finally, select the driver + * using driver selector. * * @param ctx query context * @throws LensException the lens exception @@ -1089,8 +1119,8 @@ private void rewriteAndSelect(final AbstractQueryContext ctx) throws LensExcepti } /** - * Chains driver specific rewrite and estimate of the query in a single runnable, which can be - * processed in a background thread + * Chains driver specific rewrite and estimate of the query in a single runnable, which can be processed in a + * background thread */ public class RewriteEstimateRunnable implements Runnable { @Getter @@ -1231,11 +1261,11 @@ private LensResultSet getResultset(QueryHandle queryHandle) throws LensException if (resultSet == null) { if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) { resultSets - .put(queryHandle, - new LensPersistentResult( - ctx.getQueryOutputFormatter().getMetadata(), - ctx.getQueryOutputFormatter().getFinalOutputPath(), - ctx.getQueryOutputFormatter().getNumRows())); + .put(queryHandle, + new LensPersistentResult( + ctx.getQueryOutputFormatter().getMetadata(), + ctx.getQueryOutputFormatter().getFinalOutputPath(), + ctx.getQueryOutputFormatter().getNumRows())); } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) { resultSet = allQueries.get(queryHandle).getSelectedDriver().fetchResultSet(allQueries.get(queryHandle)); resultSets.put(queryHandle, resultSet); @@ -1471,7 +1501,7 @@ private QueryHandle executeAsyncInternal(LensSessionHandle sessionHandle, QueryC ctx.setLensSessionIdentifier(sessionHandle.getPublicId().toString()); QueryStatus before = ctx.getStatus(); ctx.setStatus(new QueryStatus(0.0, QueryStatus.Status.QUEUED, "Query is queued", false, null, null)); - acceptedQueries.add(ctx); + queuedQueries.add(ctx); allQueries.put(ctx.getQueryHandle(), ctx); fireStatusChangeEvent(ctx, ctx.getStatus(), before); LOG.info("Returning handle " + ctx.getQueryHandle().getHandleId()); @@ -2135,7 +2165,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept switch (ctx.getStatus().getStatus()) { case NEW: case QUEUED: - acceptedQueries.add(ctx); + queuedQueries.add(ctx); break; case LAUNCHED: case RUNNING: @@ -2289,7 +2319,7 @@ Collection getDrivers() { @Override public long getQueuedQueriesCount() { - return acceptedQueries.size(); + return queuedQueries.size(); } @Override @@ -2321,6 +2351,7 @@ protected void handleDriverSessionStart(DriverEvent event) { LOG.warn("Lens session went away for sessionid:" + lensSession); return; } + try { LensSessionImpl session = getSession(sessionHandle); acquire(sessionHandle); @@ -2353,7 +2384,8 @@ protected void handleDriverSessionStart(DriverEvent event) { /** * Add session's resources to selected driver if needed - * @param ctx the query context + * + * @param ctx QueryContext for executinf queries * @throws LensException */ protected void addSessionResourcesToDriver(final AbstractQueryContext ctx) { @@ -2412,17 +2444,18 @@ protected void addSessionResourcesToDriver(final AbstractQueryContext ctx) { /** * Add resources to hive driver, returning resources which failed to be added - * @param resources collection of resources intented to be added to hive driver + * + * @param resources collection of resources intented to be added to hive driver * @param sessionHandle * @param hiveDriver * @return resources which could not be added to hive driver */ private List addResources(Collection resources, - LensSessionHandle sessionHandle, - HiveDriver hiveDriver) { + LensSessionHandle sessionHandle, + HiveDriver hiveDriver) { List failedResources = new ArrayList(); for (ResourceEntry res : resources) { - try{ + try { addSingleResourceToHive(hiveDriver, res, sessionHandle); } catch (LensException exc) { failedResources.add(res); @@ -2434,7 +2467,7 @@ private List addResources(Collection resources, } private void addSingleResourceToHive(HiveDriver driver, ResourceEntry res, - LensSessionHandle sessionHandle) throws LensException { + LensSessionHandle sessionHandle) throws LensException { String sessionIdentifier = sessionHandle.getPublicId().toString(); String uri = res.getLocation(); // Hive doesn't and URIs starting with file:/ correctly, so we have to change it to file:/// diff --git a/lens-server/src/main/resources/lensserver-default.xml b/lens-server/src/main/resources/lensserver-default.xml index c153193a7..d1b96aef6 100644 --- a/lens-server/src/main/resources/lensserver-default.xml +++ b/lens-server/src/main/resources/lensserver-default.xml @@ -28,6 +28,18 @@ Drivers enabled for this lens server instance + + lens.server.driver.selector.class + org.apache.lens.server.api.driver.MinQueryCostSelector + Class for selecting best driver given the query context + + + + lens.server.query.acceptors + + Query Acceptors configuredsk + + lens.server.servicenames session,query,metastore,scheduler,quota